feat: implement WHEP subscription support with SDP handling and subscriber management

This commit is contained in:
Cesar Mendivil 2026-03-14 14:34:45 -07:00
parent c761b4e9b8
commit 603818cd44
5 changed files with 490 additions and 10 deletions

View File

@ -1992,6 +1992,66 @@
}
}
},
"/api/v3/whip/{name}/whep": {
"post": {
"description": "Browser POSTs an SDP offer; server returns an SDP answer. The stream is relayed from the active WHIP publisher to the browser subscriber via ICE+DTLS-SRTP with zero transcoding.",
"consumes": [
"application/sdp"
],
"produces": [
"application/sdp"
],
"tags": [
"v16.?.?"
],
"summary": "Subscribe to a WHIP stream via WHEP (WebRTC HTTP Egress Protocol)",
"operationId": "whep-3-subscribe",
"parameters": [
{
"type": "string",
"description": "Stream key",
"name": "name",
"in": "path",
"required": true
},
{
"description": "SDP offer from the browser",
"name": "body",
"in": "body",
"required": true,
"schema": {
"type": "string"
}
}
],
"responses": {
"201": {
"description": "SDP answer",
"headers": {
"Location": {
"type": "string",
"description": "DELETE this URL to end the subscription"
}
},
"schema": {
"type": "string"
}
},
"400": {
"description": "Bad Request",
"schema": {
"$ref": "#/definitions/api.Error"
}
},
"500": {
"description": "Internal Server Error",
"schema": {
"$ref": "#/definitions/api.Error"
}
}
}
}
},
"/api/v3/widget/process/{id}": {
"get": {
"description": "Fetch minimal statistics about a process, which is not protected by any auth.",
@ -3546,6 +3606,10 @@
"description": "RFC 3339 timestamp when the publisher connected.",
"type": "string",
"format": "date-time"
},
"subscribers": {
"description": "Number of active WHEP subscribers watching this stream.",
"type": "integer"
}
}
},
@ -3563,6 +3627,10 @@
"stream_key": {
"description": "Stream key component used in both URLs.",
"type": "string"
},
"whep_url": {
"description": "WebRTC egress URL for browser subscribers (WHEP).",
"type": "string"
}
}
},
@ -3577,6 +3645,10 @@
"description": "Base internal SDP relay URL.",
"type": "string"
},
"base_whep_url": {
"description": "Base WHEP URL for browser subscribers. Append stream key and /whep.",
"type": "string"
},
"example_obs_url": {
"description": "Example URL with placeholder stream key.",
"type": "string"

View File

@ -985,6 +985,9 @@ definitions:
description: RFC 3339 timestamp when the publisher connected.
format: date-time
type: string
subscribers:
description: Number of active WHEP subscribers watching this stream.
type: integer
type: object
api.WHIPURLs:
properties:
@ -997,6 +1000,9 @@ definitions:
stream_key:
description: Stream key component used in both URLs.
type: string
whep_url:
description: WebRTC egress URL for browser subscribers (WHEP).
type: string
type: object
api.WHIPServerInfo:
properties:
@ -1006,6 +1012,9 @@ definitions:
base_sdp_url:
description: Base internal SDP relay URL.
type: string
base_whep_url:
description: Base WHEP URL for browser subscribers. Append stream key and /whep.
type: string
example_obs_url:
description: Example URL with placeholder stream key.
type: string
@ -3289,6 +3298,48 @@ paths:
summary: Get WHIP publish URL for a stream key
tags:
- v16.?.?
/api/v3/whip/{name}/whep:
post:
consumes:
- application/sdp
description: Browser POSTs an SDP offer; server returns an SDP answer. The
stream is relayed from the active WHIP publisher to the browser subscriber
via ICE+DTLS-SRTP with zero transcoding.
operationId: whep-3-subscribe
parameters:
- description: Stream key
in: path
name: name
required: true
type: string
- description: SDP offer from the browser
in: body
name: body
required: true
schema:
type: string
produces:
- application/sdp
responses:
"201":
description: SDP answer
headers:
Location:
description: DELETE this URL to end the subscription
type: string
schema:
type: string
"400":
description: Bad Request
schema:
$ref: '#/definitions/api.Error'
"500":
description: Internal Server Error
schema:
$ref: '#/definitions/api.Error'
summary: Subscribe to a WHIP stream via WHEP (WebRTC HTTP Egress Protocol)
tags:
- v16.?.?
/api/v3/widget/process/{id}:
get:
description: Fetch minimal statistics about a process, which is not protected

View File

@ -32,6 +32,8 @@ type WHIPChannel struct {
Name string `json:"name" jsonschema:"minLength=1"`
// PublishedAt is the RFC 3339 timestamp when the publisher connected.
PublishedAt time.Time `json:"published_at"`
// Subscribers is the number of active WHEP subscribers watching this stream.
Subscribers int `json:"subscribers"`
}
// WHIPURLs holds the publish URL (for OBS) and the internal SDP relay URL (for FFmpeg).
@ -44,6 +46,9 @@ type WHIPURLs struct {
SDPURL string `json:"sdp_url"`
// StreamKey is the key component used in both URLs.
StreamKey string `json:"stream_key"`
// WHEPURL is the WebRTC egress URL for browsers (WHEP).
// Format: http://<public_host>:<port>/whip/<stream_key>/whep
WHEPURL string `json:"whep_url,omitempty"`
}
// whipPublicBase returns "http://<host>:<port>" derived from the active config.
@ -96,6 +101,7 @@ func (h *WHIPHandler) GetURL(c echo.Context) error {
PublishURL: fmt.Sprintf("%s/whip/%s", publicBase, name),
SDPURL: fmt.Sprintf("%s/whip/%s/sdp", localBase, name),
StreamKey: name,
WHEPURL: fmt.Sprintf("%s/whip/%s/whep", publicBase, name),
})
}
@ -120,10 +126,11 @@ func (h *WHIPHandler) GetServerURL(c echo.Context) error {
}
return c.JSON(http.StatusOK, WHIPServerInfo{
BasePublishURL: fmt.Sprintf("%s/whip/", publicBase),
BaseSDPURL: fmt.Sprintf("%s/whip/", localBase),
HasToken: token != "",
ExampleOBS: fmt.Sprintf("%s/whip/<stream-key>", publicBase),
BasePublishURL: fmt.Sprintf("%s/whip/", publicBase),
BaseSDPURL: fmt.Sprintf("%s/whip/", localBase),
BaseWHEPURL: fmt.Sprintf("%s/whip/", publicBase),
HasToken: token != "",
ExampleOBS: fmt.Sprintf("%s/whip/<stream-key>", publicBase),
InputAddressTemplate: "{whip:name=<stream-key>}",
})
}
@ -134,6 +141,8 @@ type WHIPServerInfo struct {
BasePublishURL string `json:"base_publish_url"`
// BaseSDPURL is the base internal SDP relay URL.
BaseSDPURL string `json:"base_sdp_url"`
// BaseWHEPURL is the base URL for WHEP browser subscribers. Append the stream key and /whep.
BaseWHEPURL string `json:"base_whep_url"`
// HasToken indicates whether a bearer token is required.
HasToken bool `json:"has_token"`
// ExampleOBS shows an example URL with placeholder stream key.
@ -159,6 +168,7 @@ func (h *WHIPHandler) ListChannels(c echo.Context) error {
list = append(list, WHIPChannel{
Name: name,
PublishedAt: since,
Subscribers: channels.Subscribers[name],
})
}

144
whip/pion_whep.go Normal file
View File

@ -0,0 +1,144 @@
package whip
import (
"fmt"
"sync"
"github.com/pion/interceptor"
"github.com/pion/webrtc/v3"
)
// whepPionSession is a single WHEP subscriber backed by a pion PeerConnection.
// It receives plain RTP packets (already decrypted by the WHIP publisher side)
// via WriteVideo/WriteAudio and forwards them to the remote browser via
// ICE + DTLS-SRTP.
type whepPionSession struct {
pc *webrtc.PeerConnection
videoTrack *webrtc.TrackLocalStaticRTP
audioTrack *webrtc.TrackLocalStaticRTP
id string
done chan struct{}
once sync.Once
}
// newWHEPPionSession creates a new WHEP subscriber PeerConnection.
// offerSDP is the SDP offer from the browser.
// videoMime: e.g. "video/H264"; audioMime: e.g. "audio/opus".
// Returns the session and the SDP answer to send back to the browser.
func newWHEPPionSession(id, offerSDP, videoMime, audioMime string) (*whepPionSession, string, error) {
m := &webrtc.MediaEngine{}
if err := m.RegisterDefaultCodecs(); err != nil {
return nil, "", fmt.Errorf("whep: register codecs: %w", err)
}
ir := &interceptor.Registry{}
if err := webrtc.RegisterDefaultInterceptors(m, ir); err != nil {
return nil, "", fmt.Errorf("whep: register interceptors: %w", err)
}
api := webrtc.NewAPI(webrtc.WithMediaEngine(m), webrtc.WithInterceptorRegistry(ir))
pc, err := api.NewPeerConnection(webrtc.Configuration{})
if err != nil {
return nil, "", fmt.Errorf("whep: new peer connection: %w", err)
}
videoTrack, err := webrtc.NewTrackLocalStaticRTP(
webrtc.RTPCodecCapability{MimeType: videoMime},
"video", "whep-video-"+id,
)
if err != nil {
pc.Close()
return nil, "", fmt.Errorf("whep: create video track: %w", err)
}
audioTrack, err := webrtc.NewTrackLocalStaticRTP(
webrtc.RTPCodecCapability{MimeType: audioMime},
"audio", "whep-audio-"+id,
)
if err != nil {
pc.Close()
return nil, "", fmt.Errorf("whep: create audio track: %w", err)
}
if _, err = pc.AddTrack(videoTrack); err != nil {
pc.Close()
return nil, "", fmt.Errorf("whep: add video track: %w", err)
}
if _, err = pc.AddTrack(audioTrack); err != nil {
pc.Close()
return nil, "", fmt.Errorf("whep: add audio track: %w", err)
}
if err = pc.SetRemoteDescription(webrtc.SessionDescription{
Type: webrtc.SDPTypeOffer,
SDP: offerSDP,
}); err != nil {
pc.Close()
return nil, "", fmt.Errorf("whep: set remote description: %w", err)
}
answer, err := pc.CreateAnswer(nil)
if err != nil {
pc.Close()
return nil, "", fmt.Errorf("whep: create answer: %w", err)
}
gatherComplete := webrtc.GatheringCompletePromise(pc)
if err = pc.SetLocalDescription(answer); err != nil {
pc.Close()
return nil, "", fmt.Errorf("whep: set local description: %w", err)
}
<-gatherComplete
sess := &whepPionSession{
pc: pc,
videoTrack: videoTrack,
audioTrack: audioTrack,
id: id,
done: make(chan struct{}),
}
pc.OnConnectionStateChange(func(state webrtc.PeerConnectionState) {
if state == webrtc.PeerConnectionStateClosed ||
state == webrtc.PeerConnectionStateFailed ||
state == webrtc.PeerConnectionStateDisconnected {
sess.Close()
}
})
return sess, pc.LocalDescription().SDP, nil
}
// ID returns the unique subscriber identifier.
func (s *whepPionSession) ID() string { return s.id }
// WriteVideo forwards a decrypted RTP video packet to the subscriber via DTLS-SRTP.
func (s *whepPionSession) WriteVideo(pkt []byte) {
select {
case <-s.done:
return
default:
s.videoTrack.Write(pkt) //nolint:errcheck
}
}
// WriteAudio forwards a decrypted RTP audio packet to the subscriber via DTLS-SRTP.
func (s *whepPionSession) WriteAudio(pkt []byte) {
select {
case <-s.done:
return
default:
s.audioTrack.Write(pkt) //nolint:errcheck
}
}
// Done returns a channel that is closed when the subscriber disconnects.
func (s *whepPionSession) Done() <-chan struct{} { return s.done }
// Close tears down the subscriber PeerConnection.
func (s *whepPionSession) Close() {
s.once.Do(func() {
s.pc.Close()
close(s.done)
})
}

View File

@ -60,6 +60,17 @@ type WebRTCProvider interface {
OpenSession(offerSDP string, videoPort, audioPort int) (answerSDP string, err error)
}
// whepSubscriber is implemented by each active WHEP subscriber session.
// WriteVideo/WriteAudio receive plain RTP packets from the publisher relay and
// forward them to the remote browser via a separate DTLS-SRTP connection.
type whepSubscriber interface {
ID() string
WriteVideo([]byte)
WriteAudio([]byte)
Done() <-chan struct{}
Close()
}
// Config is the configuration for the WHIP server.
type Config struct {
// Addr is the HTTP listen address for the WHIP endpoint, e.g. ":8555".
@ -97,6 +108,8 @@ type Server interface {
type Channels struct {
// Publisher maps stream name to its publish start time.
Publisher map[string]time.Time
// Subscribers maps stream name to the number of active WHEP subscribers.
Subscribers map[string]int
}
// channel holds the relay sockets and state for a single WHIP stream.
@ -136,6 +149,10 @@ type channel struct {
rxCancel context.CancelFunc
lock sync.RWMutex
// WHEP fan-out: active browser subscribers receiving this stream via WebRTC.
whepSubs map[string]whepSubscriber
whepLock sync.RWMutex
}
func newChannel(name string, collector session.Collector) (*channel, error) {
@ -165,6 +182,7 @@ func newChannel(name string, collector session.Collector) (*channel, error) {
return nil, fmt.Errorf("whip: allocate audio relay port: %w", err)
}
ch.audioRelayPort = ap
ch.whepSubs = make(map[string]whepSubscriber)
return ch, nil
}
@ -286,9 +304,9 @@ func (ch *channel) startRx(serverIP string, provider WebRTCProvider, offerSDP st
answerSDP = ch.buildPlainRTPAnswer(serverIP, videoRxPort, audioRxPort)
}
// Start relay goroutines (send decrypted RTP to the loopback ports where FFmpeg listens).
go ch.relayUDP(ctx, ch.videoRx, ch.videoRelayPort, ch.snoopH264NALs)
go ch.relayUDP(ctx, ch.audioRx, ch.audioRelayPort, nil)
// Start relay goroutines: forward RTP to FFmpeg relay port and fan out to WHEP subscribers.
go ch.relayUDP(ctx, ch.videoRx, ch.videoRelayPort, ch.videoOnPacket)
go ch.relayUDP(ctx, ch.audioRx, ch.audioRelayPort, ch.audioOnPacket)
// Track session bandwidth ingress.
if ch.collector.IsCollectableIP(serverIP) {
@ -325,6 +343,63 @@ func (ch *channel) stopRx() {
// close releases all resources held by the channel.
func (ch *channel) close() {
ch.stopRx()
// Close all active WHEP subscribers.
ch.whepLock.Lock()
for id, sub := range ch.whepSubs {
sub.Close()
delete(ch.whepSubs, id)
}
ch.whepLock.Unlock()
}
// addWHEPSub registers a WHEP subscriber for this channel.
func (ch *channel) addWHEPSub(sub whepSubscriber) {
ch.whepLock.Lock()
ch.whepSubs[sub.ID()] = sub
ch.whepLock.Unlock()
}
// removeWHEPSub removes a WHEP subscriber by ID.
func (ch *channel) removeWHEPSub(id string) {
ch.whepLock.Lock()
delete(ch.whepSubs, id)
ch.whepLock.Unlock()
}
// whepSubCount returns the number of active WHEP subscribers.
func (ch *channel) whepSubCount() int {
ch.whepLock.RLock()
defer ch.whepLock.RUnlock()
return len(ch.whepSubs)
}
// videoOnPacket is called for each received video RTP packet.
// It runs SPS/PPS snoop and fans out to all WHEP subscribers.
func (ch *channel) videoOnPacket(pkt []byte) {
ch.snoopH264NALs(pkt)
ch.whepLock.RLock()
subs := make([]whepSubscriber, 0, len(ch.whepSubs))
for _, s := range ch.whepSubs {
subs = append(subs, s)
}
ch.whepLock.RUnlock()
for _, s := range subs {
s.WriteVideo(pkt)
}
}
// audioOnPacket is called for each received audio RTP packet.
// It fans out to all WHEP subscribers.
func (ch *channel) audioOnPacket(pkt []byte) {
ch.whepLock.RLock()
subs := make([]whepSubscriber, 0, len(ch.whepSubs))
for _, s := range ch.whepSubs {
subs = append(subs, s)
}
ch.whepLock.RUnlock()
for _, s := range subs {
s.WriteAudio(pkt)
}
}
// isPublishing returns true when a WHIP client is currently sending to the channel.
@ -640,16 +715,22 @@ func (s *server) Close() {
s.logger.Info().Log("Server closed")
}
// Channels returns the current active publishers.
// Channels returns the current active publishers and subscriber counts.
func (s *server) Channels() Channels {
s.lock.RLock()
defer s.lock.RUnlock()
c := Channels{Publisher: make(map[string]time.Time)}
c := Channels{
Publisher: make(map[string]time.Time),
Subscribers: make(map[string]int),
}
for name, ch := range s.channels {
if ch.isPublishing() {
c.Publisher[name] = ch.publishedSince()
}
if count := ch.whepSubCount(); count > 0 {
c.Subscribers[name] = count
}
}
return c
}
@ -671,6 +752,49 @@ func (s *server) dispatch(w http.ResponseWriter, r *http.Request) {
return
}
// OPTIONS: CORS preflight for browser WHEP clients.
if r.Method == http.MethodOptions {
w.Header().Set("Access-Control-Allow-Origin", "*")
w.Header().Set("Access-Control-Allow-Headers", "Content-Type, Authorization")
w.Header().Set("Access-Control-Allow-Methods", "POST, DELETE, OPTIONS")
w.Header().Set("Access-Control-Expose-Headers", "Location")
w.Header().Set("Access-Control-Max-Age", "86400")
w.WriteHeader(http.StatusNoContent)
return
}
// POST /whip/{name}/whep → WHEP subscribe (RFC draft-ietf-wish-whep).
if strings.HasSuffix(path, "/whep") && r.Method == http.MethodPost {
name := strings.TrimSuffix(path, "/whep")
if !isValidStreamName(name) {
http.Error(w, "invalid stream name", http.StatusBadRequest)
return
}
if s.token != "" && !s.checkToken(r) {
w.Header().Set("WWW-Authenticate", `Bearer realm="WHEP"`)
http.Error(w, "Unauthorized", http.StatusUnauthorized)
return
}
s.handleWHEP(w, r, name)
return
}
// DELETE /whip/{name}/whep/{subid} → WHEP unsubscribe.
if strings.Contains(path, "/whep/") && r.Method == http.MethodDelete {
if s.token != "" && !s.checkToken(r) {
w.Header().Set("WWW-Authenticate", `Bearer realm="WHEP"`)
http.Error(w, "Unauthorized", http.StatusUnauthorized)
return
}
parts := strings.SplitN(path, "/whep/", 2)
if len(parts) == 2 && isValidStreamName(parts[0]) {
s.handleWHEPDelete(w, r, parts[0], parts[1])
return
}
http.Error(w, "invalid path", http.StatusBadRequest)
return
}
// All other methods require token validation.
if s.token != "" && !s.checkToken(r) {
w.Header().Set("WWW-Authenticate", `Bearer realm="WHIP"`)
@ -799,7 +923,7 @@ func (s *server) getOrCreateChannel(name string) *channel {
if err != nil {
s.logger.Error().WithField("stream", name).WithField("error", err).Log("Failed to create channel")
// Return a dummy channel rather than nil to avoid panics; it won't relay.
ch = &channel{name: name}
ch = &channel{name: name, whepSubs: make(map[string]whepSubscriber)}
}
s.channels[name] = ch
}
@ -829,6 +953,85 @@ func (s *server) serverIPFor(clientIP string) string {
return "0.0.0.0"
}
// handleWHEP handles a WHEP subscription request (RFC draft-ietf-wish-whep).
// The browser POSTs an SDP offer; the server creates a pion PeerConnection,
// adds sendonly tracks fed from the publisher relay, and returns the SDP answer.
func (s *server) handleWHEP(w http.ResponseWriter, r *http.Request, name string) {
ct := r.Header.Get("Content-Type")
if !strings.Contains(ct, "application/sdp") {
http.Error(w, "Content-Type must be application/sdp", http.StatusUnsupportedMediaType)
return
}
body, err := io.ReadAll(io.LimitReader(r.Body, 16*1024))
if err != nil {
http.Error(w, "failed to read SDP offer", http.StatusBadRequest)
return
}
ch := s.getOrCreateChannel(name)
ch.lock.RLock()
videoCodec := ch.videoCodec // e.g. "H264"
audioCodec := ch.audioCodec // e.g. "opus"
ch.lock.RUnlock()
subID := fmt.Sprintf("%x", time.Now().UnixNano())
videoMime := "video/" + videoCodec
audioMime := "audio/" + audioCodec
sess, answerSDP, err := newWHEPPionSession(subID, string(body), videoMime, audioMime)
if err != nil {
s.logger.Error().WithField("stream", name).WithField("error", err).Log("Failed to create WHEP session")
http.Error(w, "failed to create WHEP session", http.StatusInternalServerError)
return
}
ch.addWHEPSub(sess)
clientIP, _, _ := net.SplitHostPort(r.RemoteAddr)
s.logger.Info().WithField("stream", name).WithField("subscriber", subID).WithField("client", clientIP).Log("WHEP subscriber connected")
// Clean up automatically when the pion session closes.
go func() {
<-sess.Done()
ch.removeWHEPSub(subID)
s.logger.Info().WithField("stream", name).WithField("subscriber", subID).Log("WHEP subscriber disconnected")
}()
w.Header().Set("Access-Control-Allow-Origin", "*")
w.Header().Set("Access-Control-Expose-Headers", "Location")
w.Header().Set("Content-Type", "application/sdp")
w.Header().Set("Location", "/whip/"+name+"/whep/"+subID)
w.WriteHeader(http.StatusCreated)
io.WriteString(w, answerSDP)
}
// handleWHEPDelete ends a specific WHEP subscription.
func (s *server) handleWHEPDelete(w http.ResponseWriter, r *http.Request, name, subID string) {
s.lock.RLock()
ch, ok := s.channels[name]
s.lock.RUnlock()
if !ok {
http.Error(w, "stream not found", http.StatusNotFound)
return
}
ch.whepLock.Lock()
sub, ok := ch.whepSubs[subID]
if !ok {
ch.whepLock.Unlock()
http.Error(w, "subscriber not found", http.StatusNotFound)
return
}
delete(ch.whepSubs, subID)
ch.whepLock.Unlock()
sub.Close()
w.WriteHeader(http.StatusOK)
}
// isValidStreamName checks that a stream name contains only safe characters.
func isValidStreamName(name string) bool {
if name == "" {