diff --git a/docs/swagger.json b/docs/swagger.json index 5eab3577..251902c5 100644 --- a/docs/swagger.json +++ b/docs/swagger.json @@ -1992,6 +1992,66 @@ } } }, + "/api/v3/whip/{name}/whep": { + "post": { + "description": "Browser POSTs an SDP offer; server returns an SDP answer. The stream is relayed from the active WHIP publisher to the browser subscriber via ICE+DTLS-SRTP with zero transcoding.", + "consumes": [ + "application/sdp" + ], + "produces": [ + "application/sdp" + ], + "tags": [ + "v16.?.?" + ], + "summary": "Subscribe to a WHIP stream via WHEP (WebRTC HTTP Egress Protocol)", + "operationId": "whep-3-subscribe", + "parameters": [ + { + "type": "string", + "description": "Stream key", + "name": "name", + "in": "path", + "required": true + }, + { + "description": "SDP offer from the browser", + "name": "body", + "in": "body", + "required": true, + "schema": { + "type": "string" + } + } + ], + "responses": { + "201": { + "description": "SDP answer", + "headers": { + "Location": { + "type": "string", + "description": "DELETE this URL to end the subscription" + } + }, + "schema": { + "type": "string" + } + }, + "400": { + "description": "Bad Request", + "schema": { + "$ref": "#/definitions/api.Error" + } + }, + "500": { + "description": "Internal Server Error", + "schema": { + "$ref": "#/definitions/api.Error" + } + } + } + } + }, "/api/v3/widget/process/{id}": { "get": { "description": "Fetch minimal statistics about a process, which is not protected by any auth.", @@ -3546,6 +3606,10 @@ "description": "RFC 3339 timestamp when the publisher connected.", "type": "string", "format": "date-time" + }, + "subscribers": { + "description": "Number of active WHEP subscribers watching this stream.", + "type": "integer" } } }, @@ -3563,6 +3627,10 @@ "stream_key": { "description": "Stream key component used in both URLs.", "type": "string" + }, + "whep_url": { + "description": "WebRTC egress URL for browser subscribers (WHEP).", + "type": "string" } } }, @@ -3577,6 +3645,10 @@ "description": "Base internal SDP relay URL.", "type": "string" }, + "base_whep_url": { + "description": "Base WHEP URL for browser subscribers. Append stream key and /whep.", + "type": "string" + }, "example_obs_url": { "description": "Example URL with placeholder stream key.", "type": "string" diff --git a/docs/swagger.yaml b/docs/swagger.yaml index 5119e4c5..193df1b0 100644 --- a/docs/swagger.yaml +++ b/docs/swagger.yaml @@ -985,6 +985,9 @@ definitions: description: RFC 3339 timestamp when the publisher connected. format: date-time type: string + subscribers: + description: Number of active WHEP subscribers watching this stream. + type: integer type: object api.WHIPURLs: properties: @@ -997,6 +1000,9 @@ definitions: stream_key: description: Stream key component used in both URLs. type: string + whep_url: + description: WebRTC egress URL for browser subscribers (WHEP). + type: string type: object api.WHIPServerInfo: properties: @@ -1006,6 +1012,9 @@ definitions: base_sdp_url: description: Base internal SDP relay URL. type: string + base_whep_url: + description: Base WHEP URL for browser subscribers. Append stream key and /whep. + type: string example_obs_url: description: Example URL with placeholder stream key. type: string @@ -3289,6 +3298,48 @@ paths: summary: Get WHIP publish URL for a stream key tags: - v16.?.? + /api/v3/whip/{name}/whep: + post: + consumes: + - application/sdp + description: Browser POSTs an SDP offer; server returns an SDP answer. The + stream is relayed from the active WHIP publisher to the browser subscriber + via ICE+DTLS-SRTP with zero transcoding. + operationId: whep-3-subscribe + parameters: + - description: Stream key + in: path + name: name + required: true + type: string + - description: SDP offer from the browser + in: body + name: body + required: true + schema: + type: string + produces: + - application/sdp + responses: + "201": + description: SDP answer + headers: + Location: + description: DELETE this URL to end the subscription + type: string + schema: + type: string + "400": + description: Bad Request + schema: + $ref: '#/definitions/api.Error' + "500": + description: Internal Server Error + schema: + $ref: '#/definitions/api.Error' + summary: Subscribe to a WHIP stream via WHEP (WebRTC HTTP Egress Protocol) + tags: + - v16.?.? /api/v3/widget/process/{id}: get: description: Fetch minimal statistics about a process, which is not protected diff --git a/http/handler/api/whip.go b/http/handler/api/whip.go index fe3fd04b..279d9210 100644 --- a/http/handler/api/whip.go +++ b/http/handler/api/whip.go @@ -32,6 +32,8 @@ type WHIPChannel struct { Name string `json:"name" jsonschema:"minLength=1"` // PublishedAt is the RFC 3339 timestamp when the publisher connected. PublishedAt time.Time `json:"published_at"` + // Subscribers is the number of active WHEP subscribers watching this stream. + Subscribers int `json:"subscribers"` } // WHIPURLs holds the publish URL (for OBS) and the internal SDP relay URL (for FFmpeg). @@ -44,6 +46,9 @@ type WHIPURLs struct { SDPURL string `json:"sdp_url"` // StreamKey is the key component used in both URLs. StreamKey string `json:"stream_key"` + // WHEPURL is the WebRTC egress URL for browsers (WHEP). + // Format: http://:/whip//whep + WHEPURL string `json:"whep_url,omitempty"` } // whipPublicBase returns "http://:" derived from the active config. @@ -96,6 +101,7 @@ func (h *WHIPHandler) GetURL(c echo.Context) error { PublishURL: fmt.Sprintf("%s/whip/%s", publicBase, name), SDPURL: fmt.Sprintf("%s/whip/%s/sdp", localBase, name), StreamKey: name, + WHEPURL: fmt.Sprintf("%s/whip/%s/whep", publicBase, name), }) } @@ -120,10 +126,11 @@ func (h *WHIPHandler) GetServerURL(c echo.Context) error { } return c.JSON(http.StatusOK, WHIPServerInfo{ - BasePublishURL: fmt.Sprintf("%s/whip/", publicBase), - BaseSDPURL: fmt.Sprintf("%s/whip/", localBase), - HasToken: token != "", - ExampleOBS: fmt.Sprintf("%s/whip/", publicBase), + BasePublishURL: fmt.Sprintf("%s/whip/", publicBase), + BaseSDPURL: fmt.Sprintf("%s/whip/", localBase), + BaseWHEPURL: fmt.Sprintf("%s/whip/", publicBase), + HasToken: token != "", + ExampleOBS: fmt.Sprintf("%s/whip/", publicBase), InputAddressTemplate: "{whip:name=}", }) } @@ -134,6 +141,8 @@ type WHIPServerInfo struct { BasePublishURL string `json:"base_publish_url"` // BaseSDPURL is the base internal SDP relay URL. BaseSDPURL string `json:"base_sdp_url"` + // BaseWHEPURL is the base URL for WHEP browser subscribers. Append the stream key and /whep. + BaseWHEPURL string `json:"base_whep_url"` // HasToken indicates whether a bearer token is required. HasToken bool `json:"has_token"` // ExampleOBS shows an example URL with placeholder stream key. @@ -159,6 +168,7 @@ func (h *WHIPHandler) ListChannels(c echo.Context) error { list = append(list, WHIPChannel{ Name: name, PublishedAt: since, + Subscribers: channels.Subscribers[name], }) } diff --git a/whip/pion_whep.go b/whip/pion_whep.go new file mode 100644 index 00000000..b5efd32b --- /dev/null +++ b/whip/pion_whep.go @@ -0,0 +1,144 @@ +package whip + +import ( + "fmt" + "sync" + + "github.com/pion/interceptor" + "github.com/pion/webrtc/v3" +) + +// whepPionSession is a single WHEP subscriber backed by a pion PeerConnection. +// It receives plain RTP packets (already decrypted by the WHIP publisher side) +// via WriteVideo/WriteAudio and forwards them to the remote browser via +// ICE + DTLS-SRTP. +type whepPionSession struct { + pc *webrtc.PeerConnection + videoTrack *webrtc.TrackLocalStaticRTP + audioTrack *webrtc.TrackLocalStaticRTP + id string + done chan struct{} + once sync.Once +} + +// newWHEPPionSession creates a new WHEP subscriber PeerConnection. +// offerSDP is the SDP offer from the browser. +// videoMime: e.g. "video/H264"; audioMime: e.g. "audio/opus". +// Returns the session and the SDP answer to send back to the browser. +func newWHEPPionSession(id, offerSDP, videoMime, audioMime string) (*whepPionSession, string, error) { + m := &webrtc.MediaEngine{} + if err := m.RegisterDefaultCodecs(); err != nil { + return nil, "", fmt.Errorf("whep: register codecs: %w", err) + } + + ir := &interceptor.Registry{} + if err := webrtc.RegisterDefaultInterceptors(m, ir); err != nil { + return nil, "", fmt.Errorf("whep: register interceptors: %w", err) + } + + api := webrtc.NewAPI(webrtc.WithMediaEngine(m), webrtc.WithInterceptorRegistry(ir)) + pc, err := api.NewPeerConnection(webrtc.Configuration{}) + if err != nil { + return nil, "", fmt.Errorf("whep: new peer connection: %w", err) + } + + videoTrack, err := webrtc.NewTrackLocalStaticRTP( + webrtc.RTPCodecCapability{MimeType: videoMime}, + "video", "whep-video-"+id, + ) + if err != nil { + pc.Close() + return nil, "", fmt.Errorf("whep: create video track: %w", err) + } + + audioTrack, err := webrtc.NewTrackLocalStaticRTP( + webrtc.RTPCodecCapability{MimeType: audioMime}, + "audio", "whep-audio-"+id, + ) + if err != nil { + pc.Close() + return nil, "", fmt.Errorf("whep: create audio track: %w", err) + } + + if _, err = pc.AddTrack(videoTrack); err != nil { + pc.Close() + return nil, "", fmt.Errorf("whep: add video track: %w", err) + } + if _, err = pc.AddTrack(audioTrack); err != nil { + pc.Close() + return nil, "", fmt.Errorf("whep: add audio track: %w", err) + } + + if err = pc.SetRemoteDescription(webrtc.SessionDescription{ + Type: webrtc.SDPTypeOffer, + SDP: offerSDP, + }); err != nil { + pc.Close() + return nil, "", fmt.Errorf("whep: set remote description: %w", err) + } + + answer, err := pc.CreateAnswer(nil) + if err != nil { + pc.Close() + return nil, "", fmt.Errorf("whep: create answer: %w", err) + } + + gatherComplete := webrtc.GatheringCompletePromise(pc) + if err = pc.SetLocalDescription(answer); err != nil { + pc.Close() + return nil, "", fmt.Errorf("whep: set local description: %w", err) + } + <-gatherComplete + + sess := &whepPionSession{ + pc: pc, + videoTrack: videoTrack, + audioTrack: audioTrack, + id: id, + done: make(chan struct{}), + } + + pc.OnConnectionStateChange(func(state webrtc.PeerConnectionState) { + if state == webrtc.PeerConnectionStateClosed || + state == webrtc.PeerConnectionStateFailed || + state == webrtc.PeerConnectionStateDisconnected { + sess.Close() + } + }) + + return sess, pc.LocalDescription().SDP, nil +} + +// ID returns the unique subscriber identifier. +func (s *whepPionSession) ID() string { return s.id } + +// WriteVideo forwards a decrypted RTP video packet to the subscriber via DTLS-SRTP. +func (s *whepPionSession) WriteVideo(pkt []byte) { + select { + case <-s.done: + return + default: + s.videoTrack.Write(pkt) //nolint:errcheck + } +} + +// WriteAudio forwards a decrypted RTP audio packet to the subscriber via DTLS-SRTP. +func (s *whepPionSession) WriteAudio(pkt []byte) { + select { + case <-s.done: + return + default: + s.audioTrack.Write(pkt) //nolint:errcheck + } +} + +// Done returns a channel that is closed when the subscriber disconnects. +func (s *whepPionSession) Done() <-chan struct{} { return s.done } + +// Close tears down the subscriber PeerConnection. +func (s *whepPionSession) Close() { + s.once.Do(func() { + s.pc.Close() + close(s.done) + }) +} diff --git a/whip/whip.go b/whip/whip.go index 5c0b8ca6..b4d8516d 100644 --- a/whip/whip.go +++ b/whip/whip.go @@ -60,6 +60,17 @@ type WebRTCProvider interface { OpenSession(offerSDP string, videoPort, audioPort int) (answerSDP string, err error) } +// whepSubscriber is implemented by each active WHEP subscriber session. +// WriteVideo/WriteAudio receive plain RTP packets from the publisher relay and +// forward them to the remote browser via a separate DTLS-SRTP connection. +type whepSubscriber interface { + ID() string + WriteVideo([]byte) + WriteAudio([]byte) + Done() <-chan struct{} + Close() +} + // Config is the configuration for the WHIP server. type Config struct { // Addr is the HTTP listen address for the WHIP endpoint, e.g. ":8555". @@ -97,6 +108,8 @@ type Server interface { type Channels struct { // Publisher maps stream name to its publish start time. Publisher map[string]time.Time + // Subscribers maps stream name to the number of active WHEP subscribers. + Subscribers map[string]int } // channel holds the relay sockets and state for a single WHIP stream. @@ -136,6 +149,10 @@ type channel struct { rxCancel context.CancelFunc lock sync.RWMutex + + // WHEP fan-out: active browser subscribers receiving this stream via WebRTC. + whepSubs map[string]whepSubscriber + whepLock sync.RWMutex } func newChannel(name string, collector session.Collector) (*channel, error) { @@ -165,6 +182,7 @@ func newChannel(name string, collector session.Collector) (*channel, error) { return nil, fmt.Errorf("whip: allocate audio relay port: %w", err) } ch.audioRelayPort = ap + ch.whepSubs = make(map[string]whepSubscriber) return ch, nil } @@ -286,9 +304,9 @@ func (ch *channel) startRx(serverIP string, provider WebRTCProvider, offerSDP st answerSDP = ch.buildPlainRTPAnswer(serverIP, videoRxPort, audioRxPort) } - // Start relay goroutines (send decrypted RTP to the loopback ports where FFmpeg listens). - go ch.relayUDP(ctx, ch.videoRx, ch.videoRelayPort, ch.snoopH264NALs) - go ch.relayUDP(ctx, ch.audioRx, ch.audioRelayPort, nil) + // Start relay goroutines: forward RTP to FFmpeg relay port and fan out to WHEP subscribers. + go ch.relayUDP(ctx, ch.videoRx, ch.videoRelayPort, ch.videoOnPacket) + go ch.relayUDP(ctx, ch.audioRx, ch.audioRelayPort, ch.audioOnPacket) // Track session bandwidth ingress. if ch.collector.IsCollectableIP(serverIP) { @@ -325,6 +343,63 @@ func (ch *channel) stopRx() { // close releases all resources held by the channel. func (ch *channel) close() { ch.stopRx() + // Close all active WHEP subscribers. + ch.whepLock.Lock() + for id, sub := range ch.whepSubs { + sub.Close() + delete(ch.whepSubs, id) + } + ch.whepLock.Unlock() +} + +// addWHEPSub registers a WHEP subscriber for this channel. +func (ch *channel) addWHEPSub(sub whepSubscriber) { + ch.whepLock.Lock() + ch.whepSubs[sub.ID()] = sub + ch.whepLock.Unlock() +} + +// removeWHEPSub removes a WHEP subscriber by ID. +func (ch *channel) removeWHEPSub(id string) { + ch.whepLock.Lock() + delete(ch.whepSubs, id) + ch.whepLock.Unlock() +} + +// whepSubCount returns the number of active WHEP subscribers. +func (ch *channel) whepSubCount() int { + ch.whepLock.RLock() + defer ch.whepLock.RUnlock() + return len(ch.whepSubs) +} + +// videoOnPacket is called for each received video RTP packet. +// It runs SPS/PPS snoop and fans out to all WHEP subscribers. +func (ch *channel) videoOnPacket(pkt []byte) { + ch.snoopH264NALs(pkt) + ch.whepLock.RLock() + subs := make([]whepSubscriber, 0, len(ch.whepSubs)) + for _, s := range ch.whepSubs { + subs = append(subs, s) + } + ch.whepLock.RUnlock() + for _, s := range subs { + s.WriteVideo(pkt) + } +} + +// audioOnPacket is called for each received audio RTP packet. +// It fans out to all WHEP subscribers. +func (ch *channel) audioOnPacket(pkt []byte) { + ch.whepLock.RLock() + subs := make([]whepSubscriber, 0, len(ch.whepSubs)) + for _, s := range ch.whepSubs { + subs = append(subs, s) + } + ch.whepLock.RUnlock() + for _, s := range subs { + s.WriteAudio(pkt) + } } // isPublishing returns true when a WHIP client is currently sending to the channel. @@ -640,16 +715,22 @@ func (s *server) Close() { s.logger.Info().Log("Server closed") } -// Channels returns the current active publishers. +// Channels returns the current active publishers and subscriber counts. func (s *server) Channels() Channels { s.lock.RLock() defer s.lock.RUnlock() - c := Channels{Publisher: make(map[string]time.Time)} + c := Channels{ + Publisher: make(map[string]time.Time), + Subscribers: make(map[string]int), + } for name, ch := range s.channels { if ch.isPublishing() { c.Publisher[name] = ch.publishedSince() } + if count := ch.whepSubCount(); count > 0 { + c.Subscribers[name] = count + } } return c } @@ -671,6 +752,49 @@ func (s *server) dispatch(w http.ResponseWriter, r *http.Request) { return } + // OPTIONS: CORS preflight for browser WHEP clients. + if r.Method == http.MethodOptions { + w.Header().Set("Access-Control-Allow-Origin", "*") + w.Header().Set("Access-Control-Allow-Headers", "Content-Type, Authorization") + w.Header().Set("Access-Control-Allow-Methods", "POST, DELETE, OPTIONS") + w.Header().Set("Access-Control-Expose-Headers", "Location") + w.Header().Set("Access-Control-Max-Age", "86400") + w.WriteHeader(http.StatusNoContent) + return + } + + // POST /whip/{name}/whep → WHEP subscribe (RFC draft-ietf-wish-whep). + if strings.HasSuffix(path, "/whep") && r.Method == http.MethodPost { + name := strings.TrimSuffix(path, "/whep") + if !isValidStreamName(name) { + http.Error(w, "invalid stream name", http.StatusBadRequest) + return + } + if s.token != "" && !s.checkToken(r) { + w.Header().Set("WWW-Authenticate", `Bearer realm="WHEP"`) + http.Error(w, "Unauthorized", http.StatusUnauthorized) + return + } + s.handleWHEP(w, r, name) + return + } + + // DELETE /whip/{name}/whep/{subid} → WHEP unsubscribe. + if strings.Contains(path, "/whep/") && r.Method == http.MethodDelete { + if s.token != "" && !s.checkToken(r) { + w.Header().Set("WWW-Authenticate", `Bearer realm="WHEP"`) + http.Error(w, "Unauthorized", http.StatusUnauthorized) + return + } + parts := strings.SplitN(path, "/whep/", 2) + if len(parts) == 2 && isValidStreamName(parts[0]) { + s.handleWHEPDelete(w, r, parts[0], parts[1]) + return + } + http.Error(w, "invalid path", http.StatusBadRequest) + return + } + // All other methods require token validation. if s.token != "" && !s.checkToken(r) { w.Header().Set("WWW-Authenticate", `Bearer realm="WHIP"`) @@ -799,7 +923,7 @@ func (s *server) getOrCreateChannel(name string) *channel { if err != nil { s.logger.Error().WithField("stream", name).WithField("error", err).Log("Failed to create channel") // Return a dummy channel rather than nil to avoid panics; it won't relay. - ch = &channel{name: name} + ch = &channel{name: name, whepSubs: make(map[string]whepSubscriber)} } s.channels[name] = ch } @@ -829,6 +953,85 @@ func (s *server) serverIPFor(clientIP string) string { return "0.0.0.0" } +// handleWHEP handles a WHEP subscription request (RFC draft-ietf-wish-whep). +// The browser POSTs an SDP offer; the server creates a pion PeerConnection, +// adds sendonly tracks fed from the publisher relay, and returns the SDP answer. +func (s *server) handleWHEP(w http.ResponseWriter, r *http.Request, name string) { + ct := r.Header.Get("Content-Type") + if !strings.Contains(ct, "application/sdp") { + http.Error(w, "Content-Type must be application/sdp", http.StatusUnsupportedMediaType) + return + } + + body, err := io.ReadAll(io.LimitReader(r.Body, 16*1024)) + if err != nil { + http.Error(w, "failed to read SDP offer", http.StatusBadRequest) + return + } + + ch := s.getOrCreateChannel(name) + + ch.lock.RLock() + videoCodec := ch.videoCodec // e.g. "H264" + audioCodec := ch.audioCodec // e.g. "opus" + ch.lock.RUnlock() + + subID := fmt.Sprintf("%x", time.Now().UnixNano()) + videoMime := "video/" + videoCodec + audioMime := "audio/" + audioCodec + + sess, answerSDP, err := newWHEPPionSession(subID, string(body), videoMime, audioMime) + if err != nil { + s.logger.Error().WithField("stream", name).WithField("error", err).Log("Failed to create WHEP session") + http.Error(w, "failed to create WHEP session", http.StatusInternalServerError) + return + } + + ch.addWHEPSub(sess) + + clientIP, _, _ := net.SplitHostPort(r.RemoteAddr) + s.logger.Info().WithField("stream", name).WithField("subscriber", subID).WithField("client", clientIP).Log("WHEP subscriber connected") + + // Clean up automatically when the pion session closes. + go func() { + <-sess.Done() + ch.removeWHEPSub(subID) + s.logger.Info().WithField("stream", name).WithField("subscriber", subID).Log("WHEP subscriber disconnected") + }() + + w.Header().Set("Access-Control-Allow-Origin", "*") + w.Header().Set("Access-Control-Expose-Headers", "Location") + w.Header().Set("Content-Type", "application/sdp") + w.Header().Set("Location", "/whip/"+name+"/whep/"+subID) + w.WriteHeader(http.StatusCreated) + io.WriteString(w, answerSDP) +} + +// handleWHEPDelete ends a specific WHEP subscription. +func (s *server) handleWHEPDelete(w http.ResponseWriter, r *http.Request, name, subID string) { + s.lock.RLock() + ch, ok := s.channels[name] + s.lock.RUnlock() + + if !ok { + http.Error(w, "stream not found", http.StatusNotFound) + return + } + + ch.whepLock.Lock() + sub, ok := ch.whepSubs[subID] + if !ok { + ch.whepLock.Unlock() + http.Error(w, "subscriber not found", http.StatusNotFound) + return + } + delete(ch.whepSubs, subID) + ch.whepLock.Unlock() + + sub.Close() + w.WriteHeader(http.StatusOK) +} + // isValidStreamName checks that a stream name contains only safe characters. func isValidStreamName(name string) bool { if name == "" {