// Package whip provides a WHIP (WebRTC HTTP Ingestion Protocol, RFC 9328) server. // // The server listens on an HTTP port for WHIP publishing requests. Clients // (OBS Studio 30+, GStreamer, FFmpeg, browsers) POST an SDP offer to // /whip/{streamID}. The server allocates UDP receive sockets, responds with // an SDP answer containing the server's RTP receive ports, and forwards the // received RTP packets to internal relay sockets that FFmpeg reads via a // dynamically-generated SDP served at /whip/{streamID}/sdp. // // Usage in a Restreamer process config (input address template): // // http://localhost:8555/whip/{name}/sdp // // This uses the {whip} template, which expands to the above URL pattern. // // # WebRTC / ICE / DTLS-SRTP // // This implementation uses plain UDP RTP without ICE negotiation or DTLS-SRTP // encryption. It is compatible with encoders that support plain-RTP mode // (e.g. FFmpeg with -f whip pointed at this server, GStreamer without DTLS). // // For full browser and OBS WHIP support (which requires ICE + DTLS-SRTP), // inject a WebRTCProvider implementation (e.g. via github.com/pion/webrtc/v3) // through Config.WebRTCProvider. When set, the provider handles ICE negotiation // and SRTP decryption and delivers decrypted RTP to the server's relay layer. package whip import ( "context" "encoding/base64" "fmt" "io" "net" "net/http" "strings" "sync" "time" "github.com/datarhei/core/v16/log" "github.com/datarhei/core/v16/session" ) // ErrServerClosed is returned by ListenAndServe when the server is closed via Close(). var ErrServerClosed = http.ErrServerClosed // WebRTCProvider is an optional interface for full WebRTC (ICE + DTLS-SRTP) support. // When provided, the server delegates SDP negotiation and media reception to the // provider rather than using plain UDP RTP. // // To add browser/OBS WHIP support, implement this interface using pion/webrtc/v3: // // type pionProvider struct { ... } // func (p *pionProvider) OpenSession(offerSDP string, videoPort, audioPort int) (answerSDP string, err error) { ... } type WebRTCProvider interface { // OpenSession performs ICE + DTLS-SRTP negotiation. // offerSDP is the SDP offer from the client. // videoPort and audioPort are the local UDP ports where the provider should // deliver decrypted RTP after the handshake. // Returns the SDP answer to send back to the client, or an error. OpenSession(offerSDP string, videoPort, audioPort int) (answerSDP string, err error) } // whepSubscriber is implemented by each active WHEP subscriber session. // WriteVideo/WriteAudio receive plain RTP packets from the publisher relay and // forward them to the remote browser via a separate DTLS-SRTP connection. type whepSubscriber interface { ID() string WriteVideo([]byte) WriteAudio([]byte) Done() <-chan struct{} Close() } // Config is the configuration for the WHIP server. type Config struct { // Addr is the HTTP listen address for the WHIP endpoint, e.g. ":8555". Addr string // Token is an optional bearer token required from WHIP clients. // If empty, no authentication is required. Token string // Logger is optional. Logger log.Logger // Collector is optional. Used for session statistics. Collector session.Collector // WebRTCProvider is optional. When provided, it handles ICE negotiation and // DTLS-SRTP for full browser/OBS WebRTC support. When nil, the server falls // back to plain UDP RTP (no ICE, no encryption). WebRTCProvider WebRTCProvider // RTSPRelayAddr is the TCP listen address for the internal RTSP relay server, // e.g. ":8554". When set, all WHIP streams are also served over RTSP at // rtsp://127.0.0.1:/live/, allowing multiple FFmpeg egress // processes to consume the same source independently. // Leave empty to disable the RTSP relay. RTSPRelayAddr string } // Server defines the WHIP server interface. type Server interface { // ListenAndServe starts the WHIP HTTP server. Blocks until Close() is called. ListenAndServe() error // Close shuts down the server and releases all resources. Close() // Channels returns the currently active (publishing) WHIP streams. Channels() Channels } // Channels describes the active WHIP streams. type Channels struct { // Publisher maps stream name to its publish start time. Publisher map[string]time.Time // Subscribers maps stream name to the number of active WHEP subscribers. Subscribers map[string]int } // channel holds the relay sockets and state for a single WHIP stream. type channel struct { name string // rtspRelay is the optional internal RTSP relay server. When non-nil, all // received RTP packets are also forwarded to RTSP consumers. rtspRelay *rtspRelay // videoRelayPort / audioRelayPort: loopback UDP ports that FFmpeg reads from. // Ports are pre-allocated by briefly binding to :0, then released so that // FFmpeg (or any RTP consumer) can bind to them. videoRelayPort int audioRelayPort int // videoRx / audioRx: UDP sockets that receive RTP from the WHIP client. // These are present only while a client is actively publishing. videoRx *net.UDPConn audioRx *net.UDPConn // codec parameters negotiated from the SDP offer. videoPayloadType int videoCodec string videoClockRate int videoFmtp string // extra fmtp params from offer (e.g. sprop-parameter-sets) // SPS/PPS captured from in-band RTP stream (populated after first keyframe). videoSPS []byte videoPPS []byte spropped bool // true once sprop-parameter-sets has been built from in-band NALs audioPayloadType int audioCodec string audioClockRate int audioChannels int publishedAt time.Time collector session.Collector rxCancel context.CancelFunc lock sync.RWMutex // WHEP fan-out: active browser subscribers receiving this stream via WebRTC. whepSubs map[string]whepSubscriber whepLock sync.RWMutex } func newChannel(name string, collector session.Collector) (*channel, error) { ch := &channel{ name: name, collector: collector, // Defaults matching the most common WebRTC codecs. videoPayloadType: 96, videoCodec: "H264", videoClockRate: 90000, audioPayloadType: 111, audioCodec: "opus", audioClockRate: 48000, audioChannels: 2, } // Pre-allocate relay port numbers. We bind briefly to get an ephemeral port // assigned by the OS, then immediately close so FFmpeg can bind to that port. vp, err := allocateRelayPort() if err != nil { return nil, fmt.Errorf("whip: allocate video relay port: %w", err) } ch.videoRelayPort = vp ap, err := allocateRelayPort() if err != nil { return nil, fmt.Errorf("whip: allocate audio relay port: %w", err) } ch.audioRelayPort = ap ch.whepSubs = make(map[string]whepSubscriber) return ch, nil } // allocateRelayPort binds a UDP socket on a random loopback port, records the // port number, then immediately closes the socket so that an external process // (FFmpeg) can bind to the same port shortly afterwards. func allocateRelayPort() (int, error) { conn, err := net.ListenUDP("udp4", &net.UDPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0}) if err != nil { return 0, err } port := conn.LocalAddr().(*net.UDPAddr).Port conn.Close() return port, nil } // relayVideoPort returns the relay port for the video RTP stream (FFmpeg reads this). func (ch *channel) relayVideoPort() int { return ch.videoRelayPort } // relayAudioPort returns the relay port for the audio RTP stream (FFmpeg reads this). func (ch *channel) relayAudioPort() int { return ch.audioRelayPort } // ffmpegSDP returns the SDP describing the relay streams for FFmpeg to consume. func (ch *channel) ffmpegSDP() string { ch.lock.RLock() vpt := ch.videoPayloadType vpc := ch.videoCodec vclk := ch.videoClockRate vfmtp := ch.videoFmtp apt := ch.audioPayloadType apc := ch.audioCodec aclk := ch.audioClockRate ach := ch.audioChannels ch.lock.RUnlock() videoPort := ch.relayVideoPort() audioPort := ch.relayAudioPort() sdp := "v=0\r\n" sdp += "o=- 0 0 IN IP4 127.0.0.1\r\n" sdp += "s=WHIP Stream\r\n" sdp += "c=IN IP4 127.0.0.1\r\n" sdp += "t=0 0\r\n" sdp += fmt.Sprintf("m=video %d RTP/AVP %d\r\n", videoPort, vpt) sdp += fmt.Sprintf("a=rtpmap:%d %s/%d\r\n", vpt, vpc, vclk) if vfmtp != "" { sdp += fmt.Sprintf("a=fmtp:%d %s\r\n", vpt, vfmtp) } else if vpc == "H264" { sdp += fmt.Sprintf("a=fmtp:%d packetization-mode=1\r\n", vpt) } sdp += fmt.Sprintf("m=audio %d RTP/AVP %d\r\n", audioPort, apt) if ach > 1 { sdp += fmt.Sprintf("a=rtpmap:%d %s/%d/%d\r\n", apt, apc, aclk, ach) } else { sdp += fmt.Sprintf("a=rtpmap:%d %s/%d\r\n", apt, apc, aclk) } return sdp } // startRx allocates receive sockets, starts the relay goroutines, and begins // forwarding incoming RTP packets from the WHIP client to the relay sockets. // It returns the SDP answer to send to the WHIP client. func (ch *channel) startRx(serverIP string, provider WebRTCProvider, offerSDP string) (answerSDP string, videoRxPort, audioRxPort int, err error) { ch.lock.Lock() defer ch.lock.Unlock() // Stop any previous rx session. if ch.rxCancel != nil { ch.rxCancel() } if ch.videoRx != nil { ch.videoRx.Close() } if ch.audioRx != nil { ch.audioRx.Close() } // Parse codec parameters from the SDP offer (best-effort). ch.updateCodecsFromSDP(offerSDP) // Sync initial codec info to the RTSP relay so DESCRIBE works before SPS/PPS snoop. if ch.rtspRelay != nil { ch.rtspRelay.UpdateCodecs(ch.name, ch.currentCodecParams()) } // Allocate rx sockets for the WHIP client to send RTP to. ch.videoRx, err = net.ListenUDP("udp4", &net.UDPAddr{IP: net.ParseIP(serverIP), Port: 0}) if err != nil { return "", 0, 0, fmt.Errorf("whip: allocate video rx socket: %w", err) } ch.audioRx, err = net.ListenUDP("udp4", &net.UDPAddr{IP: net.ParseIP(serverIP), Port: 0}) if err != nil { ch.videoRx.Close() ch.videoRx = nil return "", 0, 0, fmt.Errorf("whip: allocate audio rx socket: %w", err) } videoRxPort = ch.videoRx.LocalAddr().(*net.UDPAddr).Port audioRxPort = ch.audioRx.LocalAddr().(*net.UDPAddr).Port ctx, cancel := context.WithCancel(context.Background()) ch.rxCancel = cancel ch.publishedAt = time.Now() // Build the SDP answer for the WHIP client. if provider != nil { answerSDP, err = provider.OpenSession(offerSDP, videoRxPort, audioRxPort) if err != nil { cancel() ch.videoRx.Close() ch.audioRx.Close() ch.videoRx = nil ch.audioRx = nil return "", 0, 0, fmt.Errorf("whip: WebRTC provider: %w", err) } } else { answerSDP = ch.buildPlainRTPAnswer(serverIP, videoRxPort, audioRxPort) } // Start relay goroutines: forward RTP to FFmpeg relay port and fan out to WHEP subscribers. go ch.relayUDP(ctx, ch.videoRx, ch.videoRelayPort, ch.videoOnPacket) go ch.relayUDP(ctx, ch.audioRx, ch.audioRelayPort, ch.audioOnPacket) // Track session bandwidth ingress. if ch.collector.IsCollectableIP(serverIP) { ch.collector.RegisterAndActivate(ch.name, ch.name, "publish:"+ch.name, serverIP) } return answerSDP, videoRxPort, audioRxPort, nil } // stopRx tears down the receive side of the channel. func (ch *channel) stopRx() { ch.lock.Lock() defer ch.lock.Unlock() if ch.rxCancel != nil { ch.rxCancel() ch.rxCancel = nil } if ch.videoRx != nil { ch.videoRx.Close() ch.videoRx = nil } if ch.audioRx != nil { ch.audioRx.Close() ch.audioRx = nil } ch.publishedAt = time.Time{} // Reset SPS/PPS snoop so next publisher gets fresh capture. ch.videoSPS = nil ch.videoPPS = nil ch.spropped = false } // close releases all resources held by the channel. func (ch *channel) close() { ch.stopRx() // Close all active WHEP subscribers. ch.whepLock.Lock() for id, sub := range ch.whepSubs { sub.Close() delete(ch.whepSubs, id) } ch.whepLock.Unlock() } // addWHEPSub registers a WHEP subscriber for this channel. func (ch *channel) addWHEPSub(sub whepSubscriber) { ch.whepLock.Lock() ch.whepSubs[sub.ID()] = sub ch.whepLock.Unlock() } // removeWHEPSub removes a WHEP subscriber by ID. func (ch *channel) removeWHEPSub(id string) { ch.whepLock.Lock() delete(ch.whepSubs, id) ch.whepLock.Unlock() } // whepSubCount returns the number of active WHEP subscribers. func (ch *channel) whepSubCount() int { ch.whepLock.RLock() defer ch.whepLock.RUnlock() return len(ch.whepSubs) } // videoOnPacket is called for each received video RTP packet. // It runs SPS/PPS snoop, fans out to WHEP subscribers, and forwards to the RTSP relay. func (ch *channel) videoOnPacket(pkt []byte) { ch.snoopH264NALs(pkt) ch.whepLock.RLock() subs := make([]whepSubscriber, 0, len(ch.whepSubs)) for _, s := range ch.whepSubs { subs = append(subs, s) } ch.whepLock.RUnlock() for _, s := range subs { s.WriteVideo(pkt) } if ch.rtspRelay != nil { ch.rtspRelay.WriteVideo(ch.name, pkt) } } // audioOnPacket is called for each received audio RTP packet. // It fans out to WHEP subscribers and forwards to the RTSP relay. func (ch *channel) audioOnPacket(pkt []byte) { ch.whepLock.RLock() subs := make([]whepSubscriber, 0, len(ch.whepSubs)) for _, s := range ch.whepSubs { subs = append(subs, s) } ch.whepLock.RUnlock() for _, s := range subs { s.WriteAudio(pkt) } if ch.rtspRelay != nil { ch.rtspRelay.WriteAudio(ch.name, pkt) } } // isPublishing returns true when a WHIP client is currently sending to the channel. func (ch *channel) isPublishing() bool { ch.lock.RLock() defer ch.lock.RUnlock() return !ch.publishedAt.IsZero() } // publishedSince returns when the current publisher started. func (ch *channel) publishedSince() time.Time { ch.lock.RLock() defer ch.lock.RUnlock() return ch.publishedAt } // relayUDP reads RTP packets from rx and forwards them to relayPort on loopback. // FFmpeg (or any RTP consumer) is expected to be listening on relayPort. // onPacket, if non-nil, is called with a copy of each RTP packet before forwarding. func (ch *channel) relayUDP(ctx context.Context, rx *net.UDPConn, relayPort int, onPacket func([]byte)) { buf := make([]byte, 1500) // Send to loopback so FFmpeg (listening on relayPort) receives. dst := &net.UDPAddr{IP: net.IPv4(127, 0, 0, 1), Port: relayPort} sendConn, err := net.DialUDP("udp4", nil, dst) if err != nil { return } defer sendConn.Close() for { select { case <-ctx.Done(): return default: } rx.SetReadDeadline(time.Now().Add(500 * time.Millisecond)) n, _, err := rx.ReadFromUDP(buf) if err != nil { if isNetTimeout(err) { continue } return } sendConn.Write(buf[:n]) if onPacket != nil { pkt := make([]byte, n) copy(pkt, buf[:n]) go onPacket(pkt) } } } // snoopH264NALs inspects an RTP packet for H.264 SPS (type 7) and PPS (type 8) // NAL units. Once both are found, it updates ch.videoFmtp with sprop-parameter-sets // so that FFmpeg can determine the video resolution from the relay SDP. func (ch *channel) snoopH264NALs(pkt []byte) { ch.lock.RLock() done := ch.spropped ch.lock.RUnlock() if done { return } nals := parseH264NALsFromRTP(pkt) if len(nals) == 0 { return } ch.lock.Lock() defer ch.lock.Unlock() if ch.spropped { return } for _, nal := range nals { if len(nal) == 0 { continue } nalType := nal[0] & 0x1f if nalType == 7 && ch.videoSPS == nil { ch.videoSPS = nal } else if nalType == 8 && ch.videoPPS == nil { ch.videoPPS = nal } } if ch.videoSPS != nil && ch.videoPPS != nil { ch.videoFmtp = "sprop-parameter-sets=" + base64.StdEncoding.EncodeToString(ch.videoSPS) + "," + base64.StdEncoding.EncodeToString(ch.videoPPS) + ";packetization-mode=1" ch.spropped = true // Sync updated fmtp (with sprop-parameter-sets) to the RTSP relay // so that DESCRIBE responses include the correct SPS/PPS. if ch.rtspRelay != nil { ch.rtspRelay.UpdateCodecs(ch.name, ch.currentCodecParams()) } } } // parseH264NALsFromRTP returns the H.264 NAL units contained in a raw RTP packet. // It handles Single NAL unit packets and STAP-A aggregation packets. // FU-A fragmented packets are skipped (SPS/PPS are never fragmented in practice). func parseH264NALsFromRTP(rtpPkt []byte) [][]byte { if len(rtpPkt) < 13 { return nil } // Skip fixed 12-byte RTP header + optional CSRC list. cc := int(rtpPkt[0] & 0x0f) offset := 12 + cc*4 // Skip RTP extension header if present. if rtpPkt[0]&0x10 != 0 { if offset+4 > len(rtpPkt) { return nil } extLen := int(rtpPkt[offset+2])<<8 | int(rtpPkt[offset+3]) offset += 4 + extLen*4 } if offset >= len(rtpPkt) { return nil } payload := rtpPkt[offset:] nalType := payload[0] & 0x1f switch { case nalType >= 1 && nalType <= 23: // Single NAL unit packet return [][]byte{payload} case nalType == 24: // STAP-A var nals [][]byte pos := 1 for pos+2 <= len(payload) { size := int(payload[pos])<<8 | int(payload[pos+1]) pos += 2 if size == 0 || pos+size > len(payload) { break } nals = append(nals, payload[pos:pos+size]) pos += size } return nals } // FU-A and other types: ignore return nil } // buildPlainRTPAnswer constructs a minimal SDP answer for plain (non-DTLS) RTP. func (ch *channel) buildPlainRTPAnswer(serverIP string, videoPort, audioPort int) string { sdp := "v=0\r\n" sdp += "o=- 0 0 IN IP4 " + serverIP + "\r\n" sdp += "s=WHIP Answer\r\n" sdp += "c=IN IP4 " + serverIP + "\r\n" sdp += "t=0 0\r\n" sdp += fmt.Sprintf("m=video %d RTP/AVP %d\r\n", videoPort, ch.videoPayloadType) sdp += fmt.Sprintf("a=rtpmap:%d %s/%d\r\n", ch.videoPayloadType, ch.videoCodec, ch.videoClockRate) sdp += "a=recvonly\r\n" sdp += fmt.Sprintf("m=audio %d RTP/AVP %d\r\n", audioPort, ch.audioPayloadType) if ch.audioChannels > 1 { sdp += fmt.Sprintf("a=rtpmap:%d %s/%d/%d\r\n", ch.audioPayloadType, ch.audioCodec, ch.audioClockRate, ch.audioChannels) } else { sdp += fmt.Sprintf("a=rtpmap:%d %s/%d\r\n", ch.audioPayloadType, ch.audioCodec, ch.audioClockRate) } sdp += "a=recvonly\r\n" return sdp } // updateCodecsFromSDP performs a best-effort parse of the SDP offer to extract // the first video and audio payload types and codec names. func (ch *channel) updateCodecsFromSDP(sdp string) { type mediaState struct { inVideo bool inAudio bool } var ms mediaState for _, line := range strings.Split(sdp, "\n") { line = strings.TrimRight(line, "\r") if strings.HasPrefix(line, "m=video") { ms.inVideo = true ms.inAudio = false // Extract payload type from "m=video 9 ... 96 97 ..." parts := strings.Fields(line) if len(parts) >= 4 { var pt int if _, err := fmt.Sscanf(parts[3], "%d", &pt); err == nil { ch.videoPayloadType = pt } } } else if strings.HasPrefix(line, "m=audio") { ms.inAudio = true ms.inVideo = false parts := strings.Fields(line) if len(parts) >= 4 { var pt int if _, err := fmt.Sscanf(parts[3], "%d", &pt); err == nil { ch.audioPayloadType = pt } } } else if strings.HasPrefix(line, "a=fmtp:") { // Format: a=fmtp: var pt int if _, err := fmt.Sscanf(line, "a=fmtp:%d ", &pt); err == nil { params := strings.SplitN(line, " ", 2) if len(params) == 2 && ms.inVideo && pt == ch.videoPayloadType { ch.videoFmtp = params[1] } } } else if strings.HasPrefix(line, "a=rtpmap:") { // Format: a=rtpmap: /[/] var pt int var rest string if _, err := fmt.Sscanf(line, "a=rtpmap:%d %s", &pt, &rest); err != nil { continue } parts := strings.Split(rest, "/") codec := parts[0] var clk int if len(parts) >= 2 { fmt.Sscanf(parts[1], "%d", &clk) } var channels int = 1 if len(parts) >= 3 { fmt.Sscanf(parts[2], "%d", &channels) } if ms.inVideo && pt == ch.videoPayloadType { ch.videoCodec = codec if clk > 0 { ch.videoClockRate = clk } } else if ms.inAudio && pt == ch.audioPayloadType { ch.audioCodec = codec if clk > 0 { ch.audioClockRate = clk } ch.audioChannels = channels } } } } // --- Server implementation --- type server struct { addr string token string logger log.Logger collector session.Collector provider WebRTCProvider rtspRelay *rtspRelay httpServer *http.Server channels map[string]*channel lock sync.RWMutex } // New creates a new WHIP server from the given config. func New(config Config) (Server, error) { if config.Addr == "" { return nil, fmt.Errorf("whip: listen address is required") } s := &server{ addr: config.Addr, token: config.Token, logger: config.Logger, collector: config.Collector, provider: config.WebRTCProvider, channels: make(map[string]*channel), } if config.RTSPRelayAddr != "" { s.rtspRelay = newRTSPRelay(config.RTSPRelayAddr, config.Logger) } if s.logger == nil { s.logger = log.New("") } if s.collector == nil { s.collector = session.NewNullCollector() } mux := http.NewServeMux() mux.HandleFunc("/whip/", s.dispatch) s.httpServer = &http.Server{ Addr: s.addr, Handler: mux, ReadHeaderTimeout: 10 * time.Second, IdleTimeout: 60 * time.Second, } return s, nil } // ListenAndServe starts the WHIP HTTP server and optionally the RTSP relay. func (s *server) ListenAndServe() error { if s.rtspRelay != nil { go func() { if err := s.rtspRelay.ListenAndServe(); err != nil { s.logger.Error().WithField("error", err).Log("RTSP relay error") } }() } s.logger.Info().Log("Server started") err := s.httpServer.ListenAndServe() if err == http.ErrServerClosed { return ErrServerClosed } return err } // Close shuts down the WHIP server and the RTSP relay. func (s *server) Close() { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() s.httpServer.Shutdown(ctx) if s.rtspRelay != nil { s.rtspRelay.Close() } s.lock.Lock() defer s.lock.Unlock() for _, ch := range s.channels { ch.close() } s.channels = make(map[string]*channel) s.logger.Info().Log("Server closed") } // Channels returns the current active publishers and subscriber counts. func (s *server) Channels() Channels { s.lock.RLock() defer s.lock.RUnlock() c := Channels{ Publisher: make(map[string]time.Time), Subscribers: make(map[string]int), } for name, ch := range s.channels { if ch.isPublishing() { c.Publisher[name] = ch.publishedSince() } if count := ch.whepSubCount(); count > 0 { c.Subscribers[name] = count } } return c } // dispatch routes incoming HTTP requests to the appropriate handler. func (s *server) dispatch(w http.ResponseWriter, r *http.Request) { // Trim "/whip/" prefix. path := strings.TrimPrefix(r.URL.Path, "/whip/") path = strings.TrimSuffix(path, "/") // GET /whip/{name}/sdp → SDP for FFmpeg to consume the relay. if strings.HasSuffix(path, "/sdp") && r.Method == http.MethodGet { name := strings.TrimSuffix(path, "/sdp") if !isValidStreamName(name) { http.Error(w, "invalid stream name", http.StatusBadRequest) return } s.handleSDPRead(w, r, name) return } // OPTIONS: CORS preflight for browser WHEP clients. if r.Method == http.MethodOptions { w.Header().Set("Access-Control-Allow-Origin", "*") w.Header().Set("Access-Control-Allow-Headers", "Content-Type, Authorization") w.Header().Set("Access-Control-Allow-Methods", "POST, DELETE, OPTIONS") w.Header().Set("Access-Control-Expose-Headers", "Location") w.Header().Set("Access-Control-Max-Age", "86400") w.WriteHeader(http.StatusNoContent) return } // POST /whip/{name}/whep → WHEP subscribe (RFC draft-ietf-wish-whep). if strings.HasSuffix(path, "/whep") && r.Method == http.MethodPost { name := strings.TrimSuffix(path, "/whep") if !isValidStreamName(name) { http.Error(w, "invalid stream name", http.StatusBadRequest) return } if s.token != "" && !s.checkToken(r) { w.Header().Set("WWW-Authenticate", `Bearer realm="WHEP"`) http.Error(w, "Unauthorized", http.StatusUnauthorized) return } s.handleWHEP(w, r, name) return } // DELETE /whip/{name}/whep/{subid} → WHEP unsubscribe. if strings.Contains(path, "/whep/") && r.Method == http.MethodDelete { if s.token != "" && !s.checkToken(r) { w.Header().Set("WWW-Authenticate", `Bearer realm="WHEP"`) http.Error(w, "Unauthorized", http.StatusUnauthorized) return } parts := strings.SplitN(path, "/whep/", 2) if len(parts) == 2 && isValidStreamName(parts[0]) { s.handleWHEPDelete(w, r, parts[0], parts[1]) return } http.Error(w, "invalid path", http.StatusBadRequest) return } // All other methods require token validation. if s.token != "" && !s.checkToken(r) { w.Header().Set("WWW-Authenticate", `Bearer realm="WHIP"`) http.Error(w, "Unauthorized", http.StatusUnauthorized) return } name := path if !isValidStreamName(name) { http.Error(w, "invalid stream name", http.StatusBadRequest) return } switch r.Method { case http.MethodPost: s.handlePublish(w, r, name) case http.MethodDelete: s.handleDelete(w, r, name) case http.MethodPatch: // Trickle ICE (RFC 9328 §4.3) — respond 405 if not supported. if s.provider == nil { http.Error(w, "trickle ICE not supported in plain-RTP mode", http.StatusMethodNotAllowed) return } http.Error(w, "trickle ICE not implemented", http.StatusNotImplemented) default: http.Error(w, "method not allowed", http.StatusMethodNotAllowed) } } // handleSDPRead serves the SDP file that FFmpeg uses to read the WHIP relay stream. // The relay sockets (and thus their ports) are pre-allocated here if the channel // does not yet exist, so that FFmpeg can bind to them before a WHIP client arrives. // If a publisher is active, this handler waits up to 5 seconds for in-band SPS/PPS // to be captured so that FFmpeg can determine the video resolution immediately. func (s *server) handleSDPRead(w http.ResponseWriter, r *http.Request, name string) { ch := s.getOrCreateChannel(name) // If a publisher is active, wait for SPS/PPS to be snooped from the RTP stream. if ch.isPublishing() { deadline := time.Now().Add(5 * time.Second) for time.Now().Before(deadline) { ch.lock.RLock() done := ch.spropped ch.lock.RUnlock() if done { break } time.Sleep(50 * time.Millisecond) } } sdp := ch.ffmpegSDP() w.Header().Set("Content-Type", "application/sdp") w.Header().Set("Cache-Control", "no-cache") w.WriteHeader(http.StatusOK) io.WriteString(w, sdp) } // handlePublish handles the WHIP POST request (SDP offer/answer exchange). func (s *server) handlePublish(w http.ResponseWriter, r *http.Request, name string) { ct := r.Header.Get("Content-Type") if !strings.Contains(ct, "application/sdp") { http.Error(w, "Content-Type must be application/sdp", http.StatusUnsupportedMediaType) return } body, err := io.ReadAll(io.LimitReader(r.Body, 16*1024)) if err != nil { http.Error(w, "failed to read SDP offer", http.StatusBadRequest) return } offerSDP := string(body) // Identify the client IP so we can send RTP back on the same interface. clientIP, _, _ := net.SplitHostPort(r.RemoteAddr) serverIP := s.serverIPFor(clientIP) ch := s.getOrCreateChannel(name) answerSDP, _, _, err := ch.startRx(serverIP, s.provider, offerSDP) if err != nil { s.logger.Error().WithField("stream", name).WithField("error", err).Log("Failed to start WHIP session") http.Error(w, "internal server error", http.StatusInternalServerError) return } s.logger.Info().WithField("stream", name).WithField("client", clientIP).Log("WHIP publisher connected") // Respond per RFC 9328: 201 Created with Location header and SDP answer. location := "/whip/" + name w.Header().Set("Content-Type", "application/sdp") w.Header().Set("Location", location) w.WriteHeader(http.StatusCreated) io.WriteString(w, answerSDP) } // handleDelete ends a WHIP session (RFC 9328 §4.2). func (s *server) handleDelete(w http.ResponseWriter, r *http.Request, name string) { s.lock.RLock() ch, ok := s.channels[name] s.lock.RUnlock() if !ok || !ch.isPublishing() { http.Error(w, "stream not found", http.StatusNotFound) return } ch.stopRx() clientIP, _, _ := net.SplitHostPort(r.RemoteAddr) s.logger.Info().WithField("stream", name).WithField("client", clientIP).Log("WHIP publisher disconnected") w.WriteHeader(http.StatusOK) } // getOrCreateChannel returns the channel for the given name, creating it if needed. func (s *server) getOrCreateChannel(name string) *channel { s.lock.Lock() defer s.lock.Unlock() ch, ok := s.channels[name] if !ok { var err error ch, err = newChannel(name, s.collector) if err != nil { s.logger.Error().WithField("stream", name).WithField("error", err).Log("Failed to create channel") // Return a dummy channel rather than nil to avoid panics; it won't relay. ch = &channel{name: name, whepSubs: make(map[string]whepSubscriber)} } // Attach the RTSP relay so the channel can forward packets to RTSP consumers. ch.rtspRelay = s.rtspRelay s.channels[name] = ch } return ch } // currentCodecParams returns the codec parameters for this channel as an RTSPCodecParams. // Caller must hold ch.lock (at least read). func (ch *channel) currentCodecParams() RTSPCodecParams { return RTSPCodecParams{ VideoPayloadType: ch.videoPayloadType, VideoCodec: ch.videoCodec, VideoClockRate: ch.videoClockRate, VideoFmtp: ch.videoFmtp, AudioPayloadType: ch.audioPayloadType, AudioCodec: ch.audioCodec, AudioClockRate: ch.audioClockRate, AudioChannels: ch.audioChannels, } } // checkToken validates the bearer token from the Authorization header or ?token= query param. func (s *server) checkToken(r *http.Request) bool { if t := r.URL.Query().Get("token"); t != "" { return t == s.token } auth := r.Header.Get("Authorization") if strings.HasPrefix(auth, "Bearer ") { return strings.TrimPrefix(auth, "Bearer ") == s.token } return false } // serverIPFor returns the server-side IP to use in SDP based on the client IP. // For loopback clients, returns 127.0.0.1; for external clients returns 0.0.0.0 // (the OS will choose an appropriate local address when binding). func (s *server) serverIPFor(clientIP string) string { ip := net.ParseIP(clientIP) if ip == nil || ip.IsLoopback() { return "127.0.0.1" } return "0.0.0.0" } // handleWHEP handles a WHEP subscription request (RFC draft-ietf-wish-whep). // The browser POSTs an SDP offer; the server creates a pion PeerConnection, // adds sendonly tracks fed from the publisher relay, and returns the SDP answer. func (s *server) handleWHEP(w http.ResponseWriter, r *http.Request, name string) { ct := r.Header.Get("Content-Type") if !strings.Contains(ct, "application/sdp") { http.Error(w, "Content-Type must be application/sdp", http.StatusUnsupportedMediaType) return } body, err := io.ReadAll(io.LimitReader(r.Body, 16*1024)) if err != nil { http.Error(w, "failed to read SDP offer", http.StatusBadRequest) return } ch := s.getOrCreateChannel(name) ch.lock.RLock() params := whepCodecParams{ VideoMime: "video/" + ch.videoCodec, VideoClockRate: uint32(ch.videoClockRate), VideoFmtp: ch.videoFmtp, VideoPT: uint8(ch.videoPayloadType), AudioMime: "audio/" + ch.audioCodec, AudioClockRate: uint32(ch.audioClockRate), AudioChannels: uint16(ch.audioChannels), AudioPT: uint8(ch.audioPayloadType), } ch.lock.RUnlock() subID := fmt.Sprintf("%x", time.Now().UnixNano()) sess, answerSDP, err := newWHEPPionSession(subID, string(body), params) if err != nil { s.logger.Error().WithField("stream", name).WithField("error", err).Log("Failed to create WHEP session") http.Error(w, "failed to create WHEP session", http.StatusInternalServerError) return } ch.addWHEPSub(sess) clientIP, _, _ := net.SplitHostPort(r.RemoteAddr) s.logger.Info().WithField("stream", name).WithField("subscriber", subID).WithField("client", clientIP).Log("WHEP subscriber connected") // Clean up automatically when the pion session closes. go func() { <-sess.Done() ch.removeWHEPSub(subID) s.logger.Info().WithField("stream", name).WithField("subscriber", subID).Log("WHEP subscriber disconnected") }() w.Header().Set("Access-Control-Allow-Origin", "*") w.Header().Set("Access-Control-Expose-Headers", "Location") w.Header().Set("Content-Type", "application/sdp") w.Header().Set("Location", "/whip/"+name+"/whep/"+subID) w.WriteHeader(http.StatusCreated) io.WriteString(w, answerSDP) } // handleWHEPDelete ends a specific WHEP subscription. func (s *server) handleWHEPDelete(w http.ResponseWriter, r *http.Request, name, subID string) { s.lock.RLock() ch, ok := s.channels[name] s.lock.RUnlock() if !ok { http.Error(w, "stream not found", http.StatusNotFound) return } ch.whepLock.Lock() sub, ok := ch.whepSubs[subID] if !ok { ch.whepLock.Unlock() http.Error(w, "subscriber not found", http.StatusNotFound) return } delete(ch.whepSubs, subID) ch.whepLock.Unlock() sub.Close() w.WriteHeader(http.StatusOK) } // isValidStreamName checks that a stream name contains only safe characters. func isValidStreamName(name string) bool { if name == "" { return false } for _, c := range name { if !((c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z') || (c >= '0' && c <= '9') || c == '-' || c == '_' || c == '.') { return false } } return true } // isNetTimeout returns true for Go net timeout errors. func isNetTimeout(err error) bool { if ne, ok := err.(net.Error); ok { return ne.Timeout() } return false }