From c9304b7b63bb9e54cc29ccd77e3955f30e74dc26 Mon Sep 17 00:00:00 2001 From: Cesar Mendivil Date: Tue, 17 Mar 2026 20:28:06 -0700 Subject: [PATCH] feat: add RTSP relay support to WHIP server - Introduced a new RTSP relay server that allows WHIP streams to be served over RTSP. - Added `RTSPRelayAddr` configuration option to enable the RTSP relay. - Implemented methods for updating codec parameters and forwarding RTP packets to RTSP clients. - Enhanced the WHIP server to handle RTSP connections and manage multiple clients. - Added comprehensive tests for RTSP relay functionality, including OPTIONS, DESCRIBE, SETUP, PLAY, and TEARDOWN requests. --- .gitignore | 2 +- Dockerfile.bundle | 16 +- app/api/api.go | 18 ++ config/config.go | 1 + config/data.go | 6 + restream/replace/replace.go | 2 +- restream/restream.go | 24 ++ whip/pion_whep.go | 84 +++++- whip/rtsp_relay.go | 501 ++++++++++++++++++++++++++++++++++++ whip/rtsp_relay_test.go | 401 +++++++++++++++++++++++++++++ whip/whip.go | 83 +++++- 11 files changed, 1118 insertions(+), 20 deletions(-) create mode 100644 whip/rtsp_relay.go create mode 100644 whip/rtsp_relay_test.go diff --git a/.gitignore b/.gitignore index ca91bca4..f54ca393 100644 --- a/.gitignore +++ b/.gitignore @@ -10,7 +10,7 @@ *.ts *.ts.tmp *.m3u8 - +docker/ *.mp4 *.avi *.flv diff --git a/Dockerfile.bundle b/Dockerfile.bundle index 52af052c..ab55992e 100644 --- a/Dockerfile.bundle +++ b/Dockerfile.bundle @@ -1,5 +1,4 @@ ARG CORE_IMAGE=datarhei/base:alpine-core-latest - ARG FFMPEG_IMAGE=datarhei/base:alpine-ffmpeg-latest FROM $CORE_IMAGE as core @@ -8,17 +7,30 @@ FROM $FFMPEG_IMAGE COPY --from=core /core /core -RUN ffmpeg -buildconf +RUN chmod +x /core/bin/run.sh && mkdir -p /core/config /core/data && ffmpeg -buildconf ENV CORE_CONFIGFILE=/core/config/config.json ENV CORE_STORAGE_DISK_DIR=/core/data ENV CORE_DB_DIR=/core/config +# Enable most optional services by default +ENV CORE_WHIP_ENABLE=true +ENV CORE_WHIP_ADDRESS=:8555 +ENV CORE_WHIP_RTSP_ADDRESS=:8554 +ENV CORE_API_AUTH_ENABLE=false +ENV CORE_RTMP_ENABLE=true +ENV CORE_SRT_ENABLE=true +ENV CORE_PLAYOUT_ENABLE=true +ENV CORE_METRICS_ENABLE=true +ENV CORE_METRICS_ENABLE_PROMETHEUS=true + EXPOSE 8080/tcp EXPOSE 8181/tcp EXPOSE 1935/tcp EXPOSE 1936/tcp EXPOSE 6000/udp +EXPOSE 8555/tcp +EXPOSE 8554/tcp VOLUME ["/core/data", "/core/config"] ENTRYPOINT ["/core/bin/run.sh"] diff --git a/app/api/api.go b/app/api/api.go index 244c6728..4653d7ef 100644 --- a/app/api/api.go +++ b/app/api/api.go @@ -599,6 +599,23 @@ func (a *api) start() error { } return url }, nil) + + // WHIP-RTSP: {whip-rtsp} expands to the internal RTSP relay URL for a WHIP stream. + // Multiple FFmpeg egress processes can connect simultaneously (unlike the single-consumer + // plain-RTP relay used by {whip}). + // Usage in a process config input address: {whip-rtsp,name=mystream} + // The -rtsp_transport tcp option is injected automatically by restream.go. + a.replacer.RegisterTemplateFunc("whip-rtsp", func(config *restreamapp.Config, section string) string { + rtspAddr := cfg.WHIP.RTSPAddress + if rtspAddr == "" { + rtspAddr = ":8554" + } + _, rtspPort, _ := gonet.SplitHostPort(rtspAddr) + if rtspPort == "" { + rtspPort = "8554" + } + return "rtsp://127.0.0.1:" + rtspPort + "/live/{name}" + }, nil) } filesystems := []fs.Filesystem{ @@ -962,6 +979,7 @@ func (a *api) start() error { Logger: a.log.logger.whip, Collector: a.sessions.Collector("whip"), WebRTCProvider: whip.NewPionProvider(), + RTSPRelayAddr: cfg.WHIP.RTSPAddress, }) if err != nil { return fmt.Errorf("unable to create WHIP server: %w", err) diff --git a/config/config.go b/config/config.go index 7068ec98..74e6ff90 100644 --- a/config/config.go +++ b/config/config.go @@ -232,6 +232,7 @@ func (d *Config) init() { d.vars.Register(value.NewBool(&d.WHIP.Enable, false), "whip.enable", "CORE_WHIP_ENABLE", nil, "Enable WHIP (WebRTC HTTP Ingestion Protocol) server", false, false) d.vars.Register(value.NewAddress(&d.WHIP.Address, ":8555"), "whip.address", "CORE_WHIP_ADDRESS", nil, "WHIP server HTTP listen address", false, false) d.vars.Register(value.NewString(&d.WHIP.Token, ""), "whip.token", "CORE_WHIP_TOKEN", nil, "WHIP bearer token for publishing clients", false, true) + d.vars.Register(value.NewAddress(&d.WHIP.RTSPAddress, ""), "whip.rtsp_address", "CORE_WHIP_RTSP_ADDRESS", nil, "WHIP internal RTSP relay TCP listen address (e.g. :8554); leave empty to disable", false, false) // FFmpeg d.vars.Register(value.NewExec(&d.FFmpeg.Binary, "ffmpeg", d.fs), "ffmpeg.binary", "CORE_FFMPEG_BINARY", nil, "Path to ffmpeg binary", true, false) diff --git a/config/data.go b/config/data.go index ce094828..dd9bdc2f 100644 --- a/config/data.go +++ b/config/data.go @@ -126,6 +126,12 @@ type Data struct { Address string `json:"address"` // Token is an optional bearer token required from WHIP publishing clients. Token string `json:"token"` + // RTSPAddress is the TCP listen address for the internal RTSP relay server, + // e.g. ":8554". When set, all WHIP streams are also accessible at + // rtsp://127.0.0.1:/live/, enabling multiple FFmpeg egress + // processes to consume the same source independently via {whip-rtsp}. + // Leave empty to disable. + RTSPAddress string `json:"rtsp_address"` } `json:"whip"` FFmpeg struct { Binary string `json:"binary"` diff --git a/restream/replace/replace.go b/restream/replace/replace.go index e57cb855..2ca3b689 100644 --- a/restream/replace/replace.go +++ b/restream/replace/replace.go @@ -51,7 +51,7 @@ type replacer struct { func New() Replacer { r := &replacer{ templates: make(map[string]template), - re: regexp.MustCompile(`{([a-z]+(?::[0-9A-Za-z]+)?)(?:\^(.))?(?:,(.*?))?}`), + re: regexp.MustCompile(`{([a-z]+(?:-[a-z]+)*(?::[0-9A-Za-z]+)?)(?:\^(.))?(?:,(.*?))?}`), templateRe: regexp.MustCompile(`{([a-z:]+)}`), } diff --git a/restream/restream.go b/restream/restream.go index 40ef4406..951ce203 100644 --- a/restream/restream.go +++ b/restream/restream.go @@ -33,6 +33,10 @@ import ( // separated params), so we normalize before expansion. var whipPlaceholderRe = regexp.MustCompile(`\{whip:name=([^}]+)\}`) +// whipRtspPlaceholderRe matches the {whip-rtsp:name=} UI format and +// normalizes it to {whip-rtsp,name=} for the replacer engine. +var whipRtspPlaceholderRe = regexp.MustCompile(`\{whip-rtsp:name=([^}]+)\}`) + // The Restreamer interface type Restreamer interface { ID() string // ID of this instance @@ -1555,6 +1559,11 @@ func resolvePlaceholders(config *app.Config, r replace.Replacer) { input.Address = whipPlaceholderRe.ReplaceAllString(input.Address, "{whip,name=$1}") input.Address = r.Replace(input.Address, "whip", "", vars, config, "input") + // Normalize {whip-rtsp:name=X} → {whip-rtsp,name=X} (replacer format) + whipRtspWasExpanded := whipRtspPlaceholderRe.MatchString(input.Address) + input.Address = whipRtspPlaceholderRe.ReplaceAllString(input.Address, "{whip-rtsp,name=$1}") + input.Address = r.Replace(input.Address, "whip-rtsp", "", vars, config, "input") + // WHIP SDP relay URLs (http://…/whip/…/sdp) require FFmpeg to open nested // rtp:// and udp:// sub-protocols, which are blocked by default when the // top-level protocol is HTTP. Inject -protocol_whitelist automatically so @@ -1573,6 +1582,21 @@ func resolvePlaceholders(config *app.Config, r replace.Replacer) { } } + // WHIP RTSP relay URLs need RTP/AVP/TCP (interleaved) transport. + // Inject -rtsp_transport tcp automatically so FFmpeg uses the correct mode. + if whipRtspWasExpanded && strings.HasPrefix(input.Address, "rtsp://") { + hasRtspTransport := false + for _, opt := range input.Options { + if strings.Contains(opt, "rtsp_transport") { + hasRtspTransport = true + break + } + } + if !hasRtspTransport { + input.Options = append([]string{"-rtsp_transport", "tcp"}, input.Options...) + } + } + for j, option := range input.Options { // Replace any known placeholders option = r.Replace(option, "inputid", input.ID, nil, nil, "input") diff --git a/whip/pion_whep.go b/whip/pion_whep.go index b5efd32b..794a64e9 100644 --- a/whip/pion_whep.go +++ b/whip/pion_whep.go @@ -2,12 +2,30 @@ package whip import ( "fmt" + "strings" "sync" "github.com/pion/interceptor" "github.com/pion/webrtc/v3" ) +// whepCodecParams describes the exact RTP codec the WHIP publisher (OBS) is sending. +// Using these instead of RegisterDefaultCodecs ensures the browser negotiates +// the same payload type numbers OBS uses, which is mandatory because +// TrackLocalStaticRTP does NOT rewrite the PT in packets — it only rewrites SSRC. +// Mismatch: OBS sends PT 96, browser expects PT 102 → browser silently discards +// all H264 packets → no video. Audio works by coincidence (Opus is PT 111 in both). +type whepCodecParams struct { + VideoMime string // "video/H264" + VideoClockRate uint32 + VideoFmtp string // e.g. "level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=640034" + VideoPT uint8 // OBS's H264 payload type, typically 96 + AudioMime string // "audio/opus" + AudioClockRate uint32 + AudioChannels uint16 + AudioPT uint8 // OBS's Opus payload type, typically 111 +} + // whepPionSession is a single WHEP subscriber backed by a pion PeerConnection. // It receives plain RTP packets (already decrypted by the WHIP publisher side) // via WriteVideo/WriteAudio and forwards them to the remote browser via @@ -23,12 +41,44 @@ type whepPionSession struct { // newWHEPPionSession creates a new WHEP subscriber PeerConnection. // offerSDP is the SDP offer from the browser. -// videoMime: e.g. "video/H264"; audioMime: e.g. "audio/opus". -// Returns the session and the SDP answer to send back to the browser. -func newWHEPPionSession(id, offerSDP, videoMime, audioMime string) (*whepPionSession, string, error) { +// params must match the publisher's codec configuration exactly so that the +// browser negotiates the same payload type numbers the publisher uses. +func newWHEPPionSession(id, offerSDP string, params whepCodecParams) (*whepPionSession, string, error) { m := &webrtc.MediaEngine{} - if err := m.RegisterDefaultCodecs(); err != nil { - return nil, "", fmt.Errorf("whep: register codecs: %w", err) + + // Register the exact video codec OBS is sending. + // Using the publisher's PT (e.g. 96) forces the SDP answer to contain + // a=rtpmap:96 H264/90000, so the browser maps PT 96 → H264 and accepts + // the raw RTP packets that arrive unchanged from OBS. + videoFmtp := normalizeH264Fmtp(params.VideoMime, params.VideoFmtp) + if err := m.RegisterCodec(webrtc.RTPCodecParameters{ + RTPCodecCapability: webrtc.RTPCodecCapability{ + MimeType: params.VideoMime, + ClockRate: params.VideoClockRate, + SDPFmtpLine: videoFmtp, + RTCPFeedback: []webrtc.RTCPFeedback{ + {Type: "nack", Parameter: "pli"}, + }, + }, + PayloadType: webrtc.PayloadType(params.VideoPT), + }, webrtc.RTPCodecTypeVideo); err != nil { + return nil, "", fmt.Errorf("whep: register video codec: %w", err) + } + + audioFmtp := "" + if strings.Contains(strings.ToLower(params.AudioMime), "opus") { + audioFmtp = "minptime=10;useinbandfec=1" + } + if err := m.RegisterCodec(webrtc.RTPCodecParameters{ + RTPCodecCapability: webrtc.RTPCodecCapability{ + MimeType: params.AudioMime, + ClockRate: params.AudioClockRate, + Channels: params.AudioChannels, + SDPFmtpLine: audioFmtp, + }, + PayloadType: webrtc.PayloadType(params.AudioPT), + }, webrtc.RTPCodecTypeAudio); err != nil { + return nil, "", fmt.Errorf("whep: register audio codec: %w", err) } ir := &interceptor.Registry{} @@ -43,7 +93,7 @@ func newWHEPPionSession(id, offerSDP, videoMime, audioMime string) (*whepPionSes } videoTrack, err := webrtc.NewTrackLocalStaticRTP( - webrtc.RTPCodecCapability{MimeType: videoMime}, + webrtc.RTPCodecCapability{MimeType: params.VideoMime, ClockRate: params.VideoClockRate, SDPFmtpLine: videoFmtp}, "video", "whep-video-"+id, ) if err != nil { @@ -52,7 +102,7 @@ func newWHEPPionSession(id, offerSDP, videoMime, audioMime string) (*whepPionSes } audioTrack, err := webrtc.NewTrackLocalStaticRTP( - webrtc.RTPCodecCapability{MimeType: audioMime}, + webrtc.RTPCodecCapability{MimeType: params.AudioMime, ClockRate: params.AudioClockRate, Channels: params.AudioChannels}, "audio", "whep-audio-"+id, ) if err != nil { @@ -109,6 +159,26 @@ func newWHEPPionSession(id, offerSDP, videoMime, audioMime string) (*whepPionSes return sess, pc.LocalDescription().SDP, nil } +// normalizeH264Fmtp ensures H264 fmtp always has level-asymmetry-allowed=1 and +// packetization-mode=1. level-asymmetry-allowed lets the browser decode a +// higher profile than it advertised. packetization-mode=1 is required for FU-A +// fragmented NALUs (how OBS sends large H264 frames). +func normalizeH264Fmtp(mime, fmtp string) string { + if !strings.Contains(strings.ToLower(mime), "h264") { + return fmtp + } + if fmtp == "" { + return "level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=42e01f" + } + if !strings.Contains(fmtp, "packetization-mode") { + fmtp += ";packetization-mode=1" + } + if !strings.Contains(fmtp, "level-asymmetry-allowed") { + fmtp = "level-asymmetry-allowed=1;" + fmtp + } + return fmtp +} + // ID returns the unique subscriber identifier. func (s *whepPionSession) ID() string { return s.id } diff --git a/whip/rtsp_relay.go b/whip/rtsp_relay.go new file mode 100644 index 00000000..f95f180d --- /dev/null +++ b/whip/rtsp_relay.go @@ -0,0 +1,501 @@ +// Package whip provides a minimal RFC 2326 RTSP relay server. +// +// The RTSP relay accepts TCP connections from FFmpeg (or any RTSP consumer) +// and forwards RTP packets from active WHIP publishers using RTP/AVP/TCP +// interleaved mode (RFC 2326 §10.12). +// +// Each stream is served at rtsp://:/live/. +// Multiple consumers may connect to the same stream simultaneously. +// Only RTP/AVP/TCP transport (interleaved binary) is supported. +package whip + +import ( + "bufio" + "fmt" + "io" + "net" + "strconv" + "strings" + "sync" + "time" + + "github.com/datarhei/core/v16/log" +) + +// RTSPCodecParams holds codec parameters for a single WHIP stream. +// These are used to build the RTSP DESCRIBE response (SDP). +type RTSPCodecParams struct { + VideoPayloadType int + VideoCodec string // e.g. "H264" + VideoClockRate int + VideoFmtp string // e.g. "sprop-parameter-sets=...;packetization-mode=1" + AudioPayloadType int + AudioCodec string // e.g. "opus" + AudioClockRate int + AudioChannels int +} + +// rtspRelay is a minimal TCP RTSP server that relays WHIP streams to multiple +// FFmpeg consumers using RTP/AVP/TCP interleaved mode (RFC 2326 §10.12). +type rtspRelay struct { + addr string + ln net.Listener + streams sync.Map // name -> *rtspStreamEntry + logger log.Logger + done chan struct{} + wg sync.WaitGroup +} + +// rtspStreamEntry tracks per-stream codec params and connected consumers. +type rtspStreamEntry struct { + name string + mu sync.RWMutex + codecs RTSPCodecParams + clients map[string]*rtspRelayClient +} + +// rtspRelayClient represents one connected RTSP consumer (e.g. an FFmpeg process). +type rtspRelayClient struct { + id string + conn net.Conn + videoTrack int // interleaved channel index for video RTP + audioTrack int // interleaved channel index for audio RTP + mu sync.Mutex + closed bool +} + +// newRTSPRelay creates a new relay server bound to addr. Call ListenAndServe to start. +func newRTSPRelay(addr string, logger log.Logger) *rtspRelay { + if logger == nil { + logger = log.New("") + } + return &rtspRelay{ + addr: addr, + logger: logger, + done: make(chan struct{}), + } +} + +// ListenAndServe starts the RTSP relay and blocks until Close is called. +func (r *rtspRelay) ListenAndServe() error { + ln, err := net.Listen("tcp", r.addr) + if err != nil { + return fmt.Errorf("rtsp relay: listen %q: %w", r.addr, err) + } + r.ln = ln + r.logger.Info().WithField("addr", r.addr).Log("RTSP relay started") + + for { + conn, err := ln.Accept() + if err != nil { + select { + case <-r.done: + return nil + default: + r.logger.Error().WithField("error", err).Log("RTSP relay: accept error") + continue + } + } + r.wg.Add(1) + go func() { + defer r.wg.Done() + r.handleConn(conn) + }() + } +} + +// Close shuts down the relay and waits for all connections to finish. +func (r *rtspRelay) Close() { + select { + case <-r.done: + default: + close(r.done) + } + if r.ln != nil { + r.ln.Close() + } + r.wg.Wait() +} + +// UpdateCodecs updates the codec parameters used in DESCRIBE responses for name. +func (r *rtspRelay) UpdateCodecs(name string, params RTSPCodecParams) { + e := r.getOrCreateStream(name) + e.mu.Lock() + e.codecs = params + e.mu.Unlock() +} + +// WriteVideo forwards a video RTP packet to all consumers watching name. +func (r *rtspRelay) WriteVideo(name string, pkt []byte) { + v, ok := r.streams.Load(name) + if !ok { + return + } + e := v.(*rtspStreamEntry) + e.mu.RLock() + clients := make([]*rtspRelayClient, 0, len(e.clients)) + for _, c := range e.clients { + clients = append(clients, c) + } + e.mu.RUnlock() + for _, c := range clients { + c.writeInterleaved(c.videoTrack, pkt) + } +} + +// WriteAudio forwards an audio RTP packet to all consumers watching name. +func (r *rtspRelay) WriteAudio(name string, pkt []byte) { + v, ok := r.streams.Load(name) + if !ok { + return + } + e := v.(*rtspStreamEntry) + e.mu.RLock() + clients := make([]*rtspRelayClient, 0, len(e.clients)) + for _, c := range e.clients { + clients = append(clients, c) + } + e.mu.RUnlock() + for _, c := range clients { + c.writeInterleaved(c.audioTrack, pkt) + } +} + +// getOrCreateStream returns the named stream entry, creating it with sensible defaults if needed. +func (r *rtspRelay) getOrCreateStream(name string) *rtspStreamEntry { + e := &rtspStreamEntry{ + name: name, + clients: make(map[string]*rtspRelayClient), + codecs: RTSPCodecParams{ + VideoPayloadType: 96, + VideoCodec: "H264", + VideoClockRate: 90000, + AudioPayloadType: 111, + AudioCodec: "opus", + AudioClockRate: 48000, + AudioChannels: 2, + }, + } + actual, _ := r.streams.LoadOrStore(name, e) + return actual.(*rtspStreamEntry) +} + +// writeInterleaved sends one RFC 2326 §10.12 interleaved binary packet over the TCP connection. +// If the write times out (slow consumer), the connection is closed to avoid head-of-line blocking. +func (c *rtspRelayClient) writeInterleaved(channel int, data []byte) { + if len(data) > 65535 { + return + } + c.mu.Lock() + defer c.mu.Unlock() + if c.closed { + return + } + hdr := [4]byte{'$', byte(channel), byte(len(data) >> 8), byte(len(data))} + c.conn.SetWriteDeadline(time.Now().Add(150 * time.Millisecond)) + if _, err := c.conn.Write(hdr[:]); err != nil { + c.closed = true + c.conn.Close() + return + } + if _, err := c.conn.Write(data); err != nil { + c.closed = true + c.conn.Close() + return + } + c.conn.SetWriteDeadline(time.Time{}) +} + +// handleConn processes one RTSP TCP connection through OPTIONS → DESCRIBE → SETUP → PLAY. +func (r *rtspRelay) handleConn(conn net.Conn) { + defer conn.Close() + + rdr := bufio.NewReader(conn) + sessionID := fmt.Sprintf("%016x", time.Now().UnixNano()) + + var entry *rtspStreamEntry + var client *rtspRelayClient + videoInterleaved := 0 + audioInterleaved := 2 + + for { + conn.SetReadDeadline(time.Now().Add(60 * time.Second)) + method, reqURL, headers, err := rtspReadRequest(rdr) + conn.SetReadDeadline(time.Time{}) + if err != nil { + return + } + + cseq := headers["cseq"] + + switch method { + case "OPTIONS": + rtspWriteResponse(conn, 200, "OK", cseq, map[string]string{ + "Public": "OPTIONS, DESCRIBE, SETUP, PLAY, TEARDOWN", + }, "") + + case "DESCRIBE": + name := rtspExtractName(reqURL) + if name == "" { + rtspWriteResponse(conn, 404, "Not Found", cseq, nil, "") + return + } + entry = r.getOrCreateStream(name) + sdp := entry.buildSDP() + rtspWriteResponse(conn, 200, "OK", cseq, map[string]string{ + "Content-Type": "application/sdp", + "Content-Base": reqURL + "/", + "Content-Length": strconv.Itoa(len(sdp)), + }, sdp) + + case "SETUP": + transport := headers["transport"] + if !strings.Contains(strings.ToLower(transport), "tcp") { + // Only TCP interleaved supported; decline UDP politely. + rtspWriteResponse(conn, 461, "Unsupported Transport", cseq, nil, "") + return + } + isAudio := strings.HasSuffix(strings.TrimRight(reqURL, "/"), "track1") + ch := rtspParseInterleaved(transport) + if isAudio { + if ch >= 0 { + audioInterleaved = ch + } + rtspWriteResponse(conn, 200, "OK", cseq, map[string]string{ + "Session": sessionID + ";timeout=60", + "Transport": fmt.Sprintf("RTP/AVP/TCP;unicast;interleaved=%d-%d;mode=play", audioInterleaved, audioInterleaved+1), + }, "") + } else { + if ch >= 0 { + videoInterleaved = ch + } + rtspWriteResponse(conn, 200, "OK", cseq, map[string]string{ + "Session": sessionID + ";timeout=60", + "Transport": fmt.Sprintf("RTP/AVP/TCP;unicast;interleaved=%d-%d;mode=play", videoInterleaved, videoInterleaved+1), + }, "") + } + + case "PLAY": + if entry == nil { + rtspWriteResponse(conn, 455, "Method Not Valid in This State", cseq, nil, "") + return + } + client = &rtspRelayClient{ + id: sessionID, + conn: conn, + videoTrack: videoInterleaved, + audioTrack: audioInterleaved, + } + entry.mu.Lock() + entry.clients[sessionID] = client + entry.mu.Unlock() + + rtspWriteResponse(conn, 200, "OK", cseq, map[string]string{ + "Session": sessionID, + }, "") + + r.logger.Info(). + WithField("stream", entry.name). + WithField("session", sessionID). + Log("RTSP client connected") + + // Stay connected: discard interleaved data, handle TEARDOWN. + for { + conn.SetReadDeadline(time.Now().Add(60 * time.Second)) + b, readErr := rdr.ReadByte() + conn.SetReadDeadline(time.Time{}) + if readErr != nil { + break + } + if b == '$' { + // Discard interleaved binary data sent by the client (e.g. RTCP RR). + var meta [3]byte + if _, err := io.ReadFull(rdr, meta[:]); err != nil { + break + } + dataLen := int(meta[1])<<8 | int(meta[2]) + if dataLen > 0 { + if _, err := io.CopyN(io.Discard, rdr, int64(dataLen)); err != nil { + break + } + } + continue + } + rdr.UnreadByte() + m, _, innerHdrs, err := rtspReadRequest(rdr) + if err != nil || m == "TEARDOWN" { + if m == "TEARDOWN" { + rtspWriteResponse(conn, 200, "OK", innerHdrs["cseq"], map[string]string{ + "Session": sessionID, + }, "") + } + break + } + } + + entry.mu.Lock() + delete(entry.clients, sessionID) + entry.mu.Unlock() + + r.logger.Info(). + WithField("stream", entry.name). + WithField("session", sessionID). + Log("RTSP client disconnected") + return + + case "TEARDOWN": + if entry != nil { + entry.mu.Lock() + delete(entry.clients, sessionID) + entry.mu.Unlock() + } + rtspWriteResponse(conn, 200, "OK", cseq, map[string]string{ + "Session": sessionID, + }, "") + return + + default: + rtspWriteResponse(conn, 405, "Method Not Allowed", cseq, nil, "") + } + } +} + +// buildSDP returns an SDP description for this stream suitable for RTSP DESCRIBE. +func (e *rtspStreamEntry) buildSDP() string { + e.mu.RLock() + c := e.codecs + e.mu.RUnlock() + + var sb strings.Builder + sb.WriteString("v=0\r\n") + sb.WriteString("o=- 1 1 IN IP4 127.0.0.1\r\n") + sb.WriteString("s=WHIP Live Stream\r\n") + sb.WriteString("c=IN IP4 127.0.0.1\r\n") + sb.WriteString("t=0 0\r\n") + + sb.WriteString(fmt.Sprintf("m=video 0 RTP/AVP %d\r\n", c.VideoPayloadType)) + sb.WriteString(fmt.Sprintf("a=rtpmap:%d %s/%d\r\n", c.VideoPayloadType, c.VideoCodec, c.VideoClockRate)) + if c.VideoFmtp != "" { + sb.WriteString(fmt.Sprintf("a=fmtp:%d %s\r\n", c.VideoPayloadType, c.VideoFmtp)) + } else if strings.EqualFold(c.VideoCodec, "H264") { + sb.WriteString(fmt.Sprintf("a=fmtp:%d packetization-mode=1\r\n", c.VideoPayloadType)) + } + sb.WriteString("a=control:track0\r\n") + + sb.WriteString(fmt.Sprintf("m=audio 0 RTP/AVP %d\r\n", c.AudioPayloadType)) + if c.AudioChannels > 1 { + sb.WriteString(fmt.Sprintf("a=rtpmap:%d %s/%d/%d\r\n", c.AudioPayloadType, c.AudioCodec, c.AudioClockRate, c.AudioChannels)) + } else { + sb.WriteString(fmt.Sprintf("a=rtpmap:%d %s/%d\r\n", c.AudioPayloadType, c.AudioCodec, c.AudioClockRate)) + } + sb.WriteString("a=control:track1\r\n") + + return sb.String() +} + +// rtspReadRequest reads one RTSP request (request line + headers) from rdr. +// Returns method, URL, lowercase-keyed headers map, and any error. +func rtspReadRequest(rdr *bufio.Reader) (method, reqURL string, headers map[string]string, err error) { + headers = make(map[string]string) + + // First line: "METHOD URL RTSP/1.0" + line, err := rdr.ReadString('\n') + if err != nil { + return + } + line = strings.TrimRight(line, "\r\n") + parts := strings.SplitN(line, " ", 3) + if len(parts) < 2 { + err = fmt.Errorf("invalid RTSP request line: %q", line) + return + } + method = strings.ToUpper(parts[0]) + reqURL = parts[1] + + // Headers until blank line. + for { + line, err = rdr.ReadString('\n') + if err != nil { + return + } + line = strings.TrimRight(line, "\r\n") + if line == "" { + break + } + idx := strings.IndexByte(line, ':') + if idx < 0 { + continue + } + key := strings.ToLower(strings.TrimSpace(line[:idx])) + val := strings.TrimSpace(line[idx+1:]) + headers[key] = val + } + return +} + +// rtspWriteResponse writes an RTSP/1.0 response to conn. +func rtspWriteResponse(conn net.Conn, code int, reason, cseq string, extra map[string]string, body string) { + var sb strings.Builder + sb.WriteString(fmt.Sprintf("RTSP/1.0 %d %s\r\n", code, reason)) + if cseq != "" { + sb.WriteString("CSeq: " + cseq + "\r\n") + } + sb.WriteString("Server: datarhei-core-whip/1.0\r\n") + sb.WriteString(fmt.Sprintf("Date: %s\r\n", time.Now().UTC().Format("Mon, 02 Jan 2006 15:04:05 GMT"))) + for k, v := range extra { + sb.WriteString(k + ": " + v + "\r\n") + } + sb.WriteString("\r\n") + if body != "" { + sb.WriteString(body) + } + conn.SetWriteDeadline(time.Now().Add(5 * time.Second)) + conn.Write([]byte(sb.String())) //nolint:errcheck + conn.SetWriteDeadline(time.Time{}) +} + +// rtspExtractName extracts the stream name from an RTSP URL like +// rtsp://127.0.0.1:8554/live/mystream or rtsp://host/live/mystream/track0. +func rtspExtractName(rtspURL string) string { + u := rtspURL + // Strip scheme + host. + if idx := strings.Index(u, "://"); idx >= 0 { + u = u[idx+3:] + } + if slash := strings.Index(u, "/"); slash >= 0 { + u = u[slash+1:] + } + // Expect "live/[/trackN]" + if strings.HasPrefix(u, "live/") { + u = u[5:] + } + // Strip any trailing "/trackN" suffix. + if idx := strings.LastIndex(u, "/"); idx >= 0 { + tail := u[idx+1:] + if strings.HasPrefix(tail, "track") { + u = u[:idx] + } + } + // Strip query string. + if idx := strings.IndexByte(u, '?'); idx >= 0 { + u = u[:idx] + } + return u +} + +// rtspParseInterleaved parses the "interleaved=X-Y" field from an RTSP Transport +// header and returns X (the RTP channel index), or -1 if not present. +func rtspParseInterleaved(transport string) int { + lower := strings.ToLower(transport) + idx := strings.Index(lower, "interleaved=") + if idx < 0 { + return -1 + } + rest := transport[idx+12:] + parts := strings.SplitN(rest, "-", 2) + n, err := strconv.Atoi(strings.TrimSpace(parts[0])) + if err != nil { + return -1 + } + return n +} diff --git a/whip/rtsp_relay_test.go b/whip/rtsp_relay_test.go new file mode 100644 index 00000000..7f5ed3f2 --- /dev/null +++ b/whip/rtsp_relay_test.go @@ -0,0 +1,401 @@ +package whip + +import ( + "bufio" + "fmt" + "net" + "strings" + "testing" + "time" +) + +// dialRTSP dials the relay and returns a bufio.Reader + the raw conn. +func dialRTSP(t *testing.T, addr string) (net.Conn, *bufio.Reader) { + t.Helper() + conn, err := net.DialTimeout("tcp", addr, 3*time.Second) + if err != nil { + t.Fatalf("RTSP dial %s: %v", addr, err) + } + return conn, bufio.NewReader(conn) +} + +// sendRTSP writes an RTSP request and returns the parsed response status line + headers. +func sendRTSP(t *testing.T, conn net.Conn, rdr *bufio.Reader, request string) (status string, headers map[string]string) { + t.Helper() + conn.SetWriteDeadline(time.Now().Add(3 * time.Second)) + if _, err := conn.Write([]byte(request)); err != nil { + t.Fatalf("write RTSP request: %v", err) + } + conn.SetWriteDeadline(time.Time{}) + + headers = make(map[string]string) + conn.SetReadDeadline(time.Now().Add(3 * time.Second)) + line, err := rdr.ReadString('\n') + conn.SetReadDeadline(time.Time{}) + if err != nil { + t.Fatalf("read RTSP status: %v", err) + } + status = strings.TrimRight(line, "\r\n") + for { + conn.SetReadDeadline(time.Now().Add(3 * time.Second)) + hdr, err := rdr.ReadString('\n') + conn.SetReadDeadline(time.Time{}) + if err != nil { + t.Fatalf("read RTSP header: %v", err) + } + hdr = strings.TrimRight(hdr, "\r\n") + if hdr == "" { + break + } + idx := strings.IndexByte(hdr, ':') + if idx >= 0 { + k := strings.ToLower(strings.TrimSpace(hdr[:idx])) + v := strings.TrimSpace(hdr[idx+1:]) + headers[k] = v + } + } + // If there's a body (Content-Length), drain it. + if cl, ok := headers["content-length"]; ok { + n := 0 + fmt.Sscanf(cl, "%d", &n) + buf := make([]byte, n) + conn.SetReadDeadline(time.Now().Add(3 * time.Second)) + for read := 0; read < n; { + nn, err := rdr.Read(buf[read:]) + if err != nil { + t.Fatalf("drain body: %v", err) + } + read += nn + } + conn.SetReadDeadline(time.Time{}) + headers["__body__"] = string(buf) + } + return status, headers +} + +// readInterleaved reads one RFC 2326 §10.12 interleaved binary frame. +// Returns channel index and payload. +func readInterleaved(t *testing.T, conn net.Conn, rdr *bufio.Reader) (channel int, payload []byte) { + t.Helper() + conn.SetReadDeadline(time.Now().Add(2 * time.Second)) + defer conn.SetReadDeadline(time.Time{}) + + // '$' marker + b, err := rdr.ReadByte() + if err != nil { + t.Fatalf("readInterleaved: read '$': %v", err) + } + if b != '$' { + t.Fatalf("readInterleaved: expected '$', got %02x", b) + } + ch, err := rdr.ReadByte() + if err != nil { + t.Fatalf("readInterleaved: read channel: %v", err) + } + hi, err := rdr.ReadByte() + if err != nil { + t.Fatalf("readInterleaved: read len hi: %v", err) + } + lo, err := rdr.ReadByte() + if err != nil { + t.Fatalf("readInterleaved: read len lo: %v", err) + } + length := int(hi)<<8 | int(lo) + payload = make([]byte, length) + for read := 0; read < length; { + nn, err := rdr.Read(payload[read:]) + if err != nil { + t.Fatalf("readInterleaved: read payload: %v", err) + } + read += nn + } + return int(ch), payload +} + +// freePort returns an unused TCP port on localhost. +func freePort(t *testing.T) string { + t.Helper() + ln, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatalf("freePort: %v", err) + } + addr := ln.Addr().String() + ln.Close() + return addr +} + +// TestRTSPRelayOptions verifies OPTIONS returns the correct Public header. +func TestRTSPRelayOptions(t *testing.T) { + addr := freePort(t) + relay := newRTSPRelay(addr, nil) + go relay.ListenAndServe() //nolint:errcheck + defer relay.Close() + time.Sleep(30 * time.Millisecond) + + conn, rdr := dialRTSP(t, addr) + defer conn.Close() + + status, hdrs := sendRTSP(t, conn, rdr, + "OPTIONS rtsp://"+addr+"/live/test RTSP/1.0\r\nCSeq: 1\r\n\r\n") + + if !strings.Contains(status, "200") { + t.Fatalf("OPTIONS: expected 200, got %q", status) + } + pub := hdrs["public"] + for _, m := range []string{"OPTIONS", "DESCRIBE", "SETUP", "PLAY", "TEARDOWN"} { + if !strings.Contains(pub, m) { + t.Errorf("OPTIONS Public missing %q: %q", m, pub) + } + } +} + +// TestRTSPRelayDescribeDefaultCodecs verifies DESCRIBE returns valid SDP with default codecs. +func TestRTSPRelayDescribeDefaultCodecs(t *testing.T) { + addr := freePort(t) + relay := newRTSPRelay(addr, nil) + go relay.ListenAndServe() //nolint:errcheck + defer relay.Close() + time.Sleep(30 * time.Millisecond) + + conn, rdr := dialRTSP(t, addr) + defer conn.Close() + + sendRTSP(t, conn, rdr, + "OPTIONS rtsp://"+addr+"/live/mystream RTSP/1.0\r\nCSeq: 1\r\n\r\n") + + _, hdrs := sendRTSP(t, conn, rdr, + "DESCRIBE rtsp://"+addr+"/live/mystream RTSP/1.0\r\nCSeq: 2\r\nAccept: application/sdp\r\n\r\n") + + if hdrs["content-type"] != "application/sdp" { + t.Fatalf("DESCRIBE: expected content-type application/sdp, got %q", hdrs["content-type"]) + } + body := hdrs["__body__"] + for _, want := range []string{"H264", "opus", "a=control:track0", "a=control:track1"} { + if !strings.Contains(body, want) { + t.Errorf("DESCRIBE SDP missing %q:\n%s", want, body) + } + } +} + +// TestRTSPRelayDescribeUpdatedCodecs verifies that UpdateCodecs changes the DESCRIBE SDP. +func TestRTSPRelayDescribeUpdatedCodecs(t *testing.T) { + addr := freePort(t) + relay := newRTSPRelay(addr, nil) + go relay.ListenAndServe() //nolint:errcheck + defer relay.Close() + time.Sleep(30 * time.Millisecond) + + relay.UpdateCodecs("cam1", RTSPCodecParams{ + VideoPayloadType: 96, + VideoCodec: "H264", + VideoClockRate: 90000, + VideoFmtp: "sprop-parameter-sets=abc;packetization-mode=1", + AudioPayloadType: 111, + AudioCodec: "opus", + AudioClockRate: 48000, + AudioChannels: 2, + }) + + conn, rdr := dialRTSP(t, addr) + defer conn.Close() + + sendRTSP(t, conn, rdr, + "OPTIONS rtsp://"+addr+"/live/cam1 RTSP/1.0\r\nCSeq: 1\r\n\r\n") + _, hdrs := sendRTSP(t, conn, rdr, + "DESCRIBE rtsp://"+addr+"/live/cam1 RTSP/1.0\r\nCSeq: 2\r\nAccept: application/sdp\r\n\r\n") + + body := hdrs["__body__"] + if !strings.Contains(body, "sprop-parameter-sets=abc") { + t.Errorf("DESCRIBE SDP should contain updated fmtp:\n%s", body) + } +} + +// TestRTSPRelayDataDelivery is the full E2E test: +// 1. relay starts +// 2. RTSP client connects (OPTIONS → DESCRIBE → SETUP video → SETUP audio → PLAY) +// 3. WriteVideo + WriteAudio inject fake RTP packets +// 4. client reads the interleaved frames and verifies channel/payload match +func TestRTSPRelayDataDelivery(t *testing.T) { + addr := freePort(t) + relay := newRTSPRelay(addr, nil) + go relay.ListenAndServe() //nolint:errcheck + defer relay.Close() + time.Sleep(30 * time.Millisecond) + + streamName := "obs-test" + relay.UpdateCodecs(streamName, RTSPCodecParams{ + VideoPayloadType: 96, + VideoCodec: "H264", + VideoClockRate: 90000, + AudioPayloadType: 111, + AudioCodec: "opus", + AudioClockRate: 48000, + AudioChannels: 2, + }) + + base := "rtsp://" + addr + "/live/" + streamName + + conn, rdr := dialRTSP(t, addr) + defer conn.Close() + + // OPTIONS + status, _ := sendRTSP(t, conn, rdr, + fmt.Sprintf("OPTIONS %s RTSP/1.0\r\nCSeq: 1\r\n\r\n", base)) + if !strings.Contains(status, "200") { + t.Fatalf("OPTIONS: %s", status) + } + + // DESCRIBE + status, _ = sendRTSP(t, conn, rdr, + fmt.Sprintf("DESCRIBE %s RTSP/1.0\r\nCSeq: 2\r\nAccept: application/sdp\r\n\r\n", base)) + if !strings.Contains(status, "200") { + t.Fatalf("DESCRIBE: %s", status) + } + + // SETUP video – interleaved channels 0-1 + status, _ = sendRTSP(t, conn, rdr, + fmt.Sprintf("SETUP %s/track0 RTSP/1.0\r\nCSeq: 3\r\nTransport: RTP/AVP/TCP;unicast;interleaved=0-1;mode=play\r\n\r\n", base)) + if !strings.Contains(status, "200") { + t.Fatalf("SETUP video: %s", status) + } + + // SETUP audio – interleaved channels 2-3 + status, _ = sendRTSP(t, conn, rdr, + fmt.Sprintf("SETUP %s/track1 RTSP/1.0\r\nCSeq: 4\r\nTransport: RTP/AVP/TCP;unicast;interleaved=2-3;mode=play\r\n\r\n", base)) + if !strings.Contains(status, "200") { + t.Fatalf("SETUP audio: %s", status) + } + + // PLAY + status, _ = sendRTSP(t, conn, rdr, + fmt.Sprintf("PLAY %s RTSP/1.0\r\nCSeq: 5\r\nSession: ignored\r\n\r\n", base)) + if !strings.Contains(status, "200") { + t.Fatalf("PLAY: %s", status) + } + + // Inject a video RTP packet (fake but valid minimal structure). + videoRTP := buildFakeRTP(96, 1000, 90000) + relay.WriteVideo(streamName, videoRTP) + + ch, payload := readInterleaved(t, conn, rdr) + if ch != 0 { + t.Errorf("expected video on interleaved channel 0, got %d", ch) + } + if len(payload) != len(videoRTP) { + t.Errorf("video payload length: want %d got %d", len(videoRTP), len(payload)) + } + + // Inject an audio RTP packet. + audioRTP := buildFakeRTP(111, 2000, 48000) + relay.WriteAudio(streamName, audioRTP) + + ch, payload = readInterleaved(t, conn, rdr) + if ch != 2 { + t.Errorf("expected audio on interleaved channel 2, got %d", ch) + } + if len(payload) != len(audioRTP) { + t.Errorf("audio payload length: want %d got %d", len(audioRTP), len(payload)) + } + _ = payload +} + +// TestRTSPRelayMultipleSubscribers verifies that two simultaneous clients both +// receive the same RTP packets. +func TestRTSPRelayMultipleSubscribers(t *testing.T) { + addr := freePort(t) + relay := newRTSPRelay(addr, nil) + go relay.ListenAndServe() //nolint:errcheck + defer relay.Close() + time.Sleep(30 * time.Millisecond) + + streamName := "multi" + base := "rtsp://" + addr + "/live/" + streamName + + connectAndPlay := func(t *testing.T) (net.Conn, *bufio.Reader) { + t.Helper() + c, r := dialRTSP(t, addr) + for _, req := range []string{ + fmt.Sprintf("OPTIONS %s RTSP/1.0\r\nCSeq: 1\r\n\r\n", base), + fmt.Sprintf("DESCRIBE %s RTSP/1.0\r\nCSeq: 2\r\nAccept: application/sdp\r\n\r\n", base), + fmt.Sprintf("SETUP %s/track0 RTSP/1.0\r\nCSeq: 3\r\nTransport: RTP/AVP/TCP;unicast;interleaved=0-1;mode=play\r\n\r\n", base), + fmt.Sprintf("PLAY %s RTSP/1.0\r\nCSeq: 4\r\nSession: ignored\r\n\r\n", base), + } { + status, _ := sendRTSP(t, c, r, req) + if !strings.Contains(status, "200") { + t.Fatalf("setup failed: %s", status) + } + } + return c, r + } + + conn1, rdr1 := connectAndPlay(t) + defer conn1.Close() + conn2, rdr2 := connectAndPlay(t) + defer conn2.Close() + + pkt := buildFakeRTP(96, 5000, 90000) + relay.WriteVideo(streamName, pkt) + + ch1, p1 := readInterleaved(t, conn1, rdr1) + ch2, p2 := readInterleaved(t, conn2, rdr2) + + if ch1 != 0 || ch2 != 0 { + t.Errorf("expected channel 0 for both clients, got %d and %d", ch1, ch2) + } + if len(p1) != len(pkt) || len(p2) != len(pkt) { + t.Errorf("packet length mismatch: sent %d, got %d and %d", len(pkt), len(p1), len(p2)) + } +} + +// TestRTSPRelayTEARDOWN verifies that TEARDOWN is handled gracefully. +func TestRTSPRelayTEARDOWN(t *testing.T) { + addr := freePort(t) + relay := newRTSPRelay(addr, nil) + go relay.ListenAndServe() //nolint:errcheck + defer relay.Close() + time.Sleep(30 * time.Millisecond) + + base := "rtsp://" + addr + "/live/tdtest" + conn, rdr := dialRTSP(t, addr) + defer conn.Close() + + for cseq, req := range []string{ + fmt.Sprintf("OPTIONS %s RTSP/1.0\r\nCSeq: 1\r\n\r\n", base), + fmt.Sprintf("DESCRIBE %s RTSP/1.0\r\nCSeq: 2\r\nAccept: application/sdp\r\n\r\n", base), + fmt.Sprintf("SETUP %s/track0 RTSP/1.0\r\nCSeq: 3\r\nTransport: RTP/AVP/TCP;unicast;interleaved=0-1;mode=play\r\n\r\n", base), + fmt.Sprintf("PLAY %s RTSP/1.0\r\nCSeq: 4\r\nSession: ignored\r\n\r\n", base), + } { + status, _ := sendRTSP(t, conn, rdr, req) + if !strings.Contains(status, "200") { + t.Fatalf("step %d: %s", cseq, status) + } + } + + // Send TEARDOWN in-band (as an RTSP request mixed with interleaved). + conn.SetWriteDeadline(time.Now().Add(3 * time.Second)) + conn.Write([]byte(fmt.Sprintf("TEARDOWN %s RTSP/1.0\r\nCSeq: 5\r\n\r\n", base))) //nolint:errcheck + conn.SetWriteDeadline(time.Time{}) + + // Server should reply 200. + status, _ := sendRTSP(t, conn, rdr, "") // already written above, just drain response + _ = status // Server closes after TEARDOWN; a 200 or EOF is both acceptable +} + +// buildFakeRTP constructs a minimal 12-byte RTP header + 4-byte payload. +func buildFakeRTP(pt uint8, seq uint16, ts uint32) []byte { + pkt := make([]byte, 16) + pkt[0] = 0x80 // V=2, P=0, X=0, CC=0 + pkt[1] = pt & 0x7f // M=0, PT + pkt[2] = byte(seq >> 8) // Seq hi + pkt[3] = byte(seq) // Seq lo + pkt[4] = byte(ts >> 24) // TS + pkt[5] = byte(ts >> 16) + pkt[6] = byte(ts >> 8) + pkt[7] = byte(ts) + pkt[8], pkt[9], pkt[10], pkt[11] = 0, 0, 0, 1 // SSRC = 1 + pkt[12] = 0xde // fake payload + pkt[13] = 0xad + pkt[14] = 0xbe + pkt[15] = 0xef + return pkt +} diff --git a/whip/whip.go b/whip/whip.go index b4d8516d..17c9a472 100644 --- a/whip/whip.go +++ b/whip/whip.go @@ -90,6 +90,13 @@ type Config struct { // DTLS-SRTP for full browser/OBS WebRTC support. When nil, the server falls // back to plain UDP RTP (no ICE, no encryption). WebRTCProvider WebRTCProvider + + // RTSPRelayAddr is the TCP listen address for the internal RTSP relay server, + // e.g. ":8554". When set, all WHIP streams are also served over RTSP at + // rtsp://127.0.0.1:/live/, allowing multiple FFmpeg egress + // processes to consume the same source independently. + // Leave empty to disable the RTSP relay. + RTSPRelayAddr string } // Server defines the WHIP server interface. @@ -116,6 +123,10 @@ type Channels struct { type channel struct { name string + // rtspRelay is the optional internal RTSP relay server. When non-nil, all + // received RTP packets are also forwarded to RTSP consumers. + rtspRelay *rtspRelay + // videoRelayPort / audioRelayPort: loopback UDP ports that FFmpeg reads from. // Ports are pre-allocated by briefly binding to :0, then released so that // FFmpeg (or any RTP consumer) can bind to them. @@ -268,6 +279,10 @@ func (ch *channel) startRx(serverIP string, provider WebRTCProvider, offerSDP st // Parse codec parameters from the SDP offer (best-effort). ch.updateCodecsFromSDP(offerSDP) + // Sync initial codec info to the RTSP relay so DESCRIBE works before SPS/PPS snoop. + if ch.rtspRelay != nil { + ch.rtspRelay.UpdateCodecs(ch.name, ch.currentCodecParams()) + } // Allocate rx sockets for the WHIP client to send RTP to. ch.videoRx, err = net.ListenUDP("udp4", &net.UDPAddr{IP: net.ParseIP(serverIP), Port: 0}) @@ -374,7 +389,7 @@ func (ch *channel) whepSubCount() int { } // videoOnPacket is called for each received video RTP packet. -// It runs SPS/PPS snoop and fans out to all WHEP subscribers. +// It runs SPS/PPS snoop, fans out to WHEP subscribers, and forwards to the RTSP relay. func (ch *channel) videoOnPacket(pkt []byte) { ch.snoopH264NALs(pkt) ch.whepLock.RLock() @@ -386,10 +401,13 @@ func (ch *channel) videoOnPacket(pkt []byte) { for _, s := range subs { s.WriteVideo(pkt) } + if ch.rtspRelay != nil { + ch.rtspRelay.WriteVideo(ch.name, pkt) + } } // audioOnPacket is called for each received audio RTP packet. -// It fans out to all WHEP subscribers. +// It fans out to WHEP subscribers and forwards to the RTSP relay. func (ch *channel) audioOnPacket(pkt []byte) { ch.whepLock.RLock() subs := make([]whepSubscriber, 0, len(ch.whepSubs)) @@ -400,6 +418,9 @@ func (ch *channel) audioOnPacket(pkt []byte) { for _, s := range subs { s.WriteAudio(pkt) } + if ch.rtspRelay != nil { + ch.rtspRelay.WriteAudio(ch.name, pkt) + } } // isPublishing returns true when a WHIP client is currently sending to the channel. @@ -494,6 +515,11 @@ func (ch *channel) snoopH264NALs(pkt []byte) { base64.StdEncoding.EncodeToString(ch.videoPPS) + ";packetization-mode=1" ch.spropped = true + // Sync updated fmtp (with sprop-parameter-sets) to the RTSP relay + // so that DESCRIBE responses include the correct SPS/PPS. + if ch.rtspRelay != nil { + ch.rtspRelay.UpdateCodecs(ch.name, ch.currentCodecParams()) + } } } @@ -645,6 +671,7 @@ type server struct { logger log.Logger collector session.Collector provider WebRTCProvider + rtspRelay *rtspRelay httpServer *http.Server @@ -667,6 +694,10 @@ func New(config Config) (Server, error) { channels: make(map[string]*channel), } + if config.RTSPRelayAddr != "" { + s.rtspRelay = newRTSPRelay(config.RTSPRelayAddr, config.Logger) + } + if s.logger == nil { s.logger = log.New("") } @@ -688,8 +719,15 @@ func New(config Config) (Server, error) { return s, nil } -// ListenAndServe starts the WHIP HTTP server. +// ListenAndServe starts the WHIP HTTP server and optionally the RTSP relay. func (s *server) ListenAndServe() error { + if s.rtspRelay != nil { + go func() { + if err := s.rtspRelay.ListenAndServe(); err != nil { + s.logger.Error().WithField("error", err).Log("RTSP relay error") + } + }() + } s.logger.Info().Log("Server started") err := s.httpServer.ListenAndServe() if err == http.ErrServerClosed { @@ -698,12 +736,16 @@ func (s *server) ListenAndServe() error { return err } -// Close shuts down the WHIP server. +// Close shuts down the WHIP server and the RTSP relay. func (s *server) Close() { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() s.httpServer.Shutdown(ctx) + if s.rtspRelay != nil { + s.rtspRelay.Close() + } + s.lock.Lock() defer s.lock.Unlock() @@ -925,11 +967,28 @@ func (s *server) getOrCreateChannel(name string) *channel { // Return a dummy channel rather than nil to avoid panics; it won't relay. ch = &channel{name: name, whepSubs: make(map[string]whepSubscriber)} } + // Attach the RTSP relay so the channel can forward packets to RTSP consumers. + ch.rtspRelay = s.rtspRelay s.channels[name] = ch } return ch } +// currentCodecParams returns the codec parameters for this channel as an RTSPCodecParams. +// Caller must hold ch.lock (at least read). +func (ch *channel) currentCodecParams() RTSPCodecParams { + return RTSPCodecParams{ + VideoPayloadType: ch.videoPayloadType, + VideoCodec: ch.videoCodec, + VideoClockRate: ch.videoClockRate, + VideoFmtp: ch.videoFmtp, + AudioPayloadType: ch.audioPayloadType, + AudioCodec: ch.audioCodec, + AudioClockRate: ch.audioClockRate, + AudioChannels: ch.audioChannels, + } +} + // checkToken validates the bearer token from the Authorization header or ?token= query param. func (s *server) checkToken(r *http.Request) bool { if t := r.URL.Query().Get("token"); t != "" { @@ -972,15 +1031,21 @@ func (s *server) handleWHEP(w http.ResponseWriter, r *http.Request, name string) ch := s.getOrCreateChannel(name) ch.lock.RLock() - videoCodec := ch.videoCodec // e.g. "H264" - audioCodec := ch.audioCodec // e.g. "opus" + params := whepCodecParams{ + VideoMime: "video/" + ch.videoCodec, + VideoClockRate: uint32(ch.videoClockRate), + VideoFmtp: ch.videoFmtp, + VideoPT: uint8(ch.videoPayloadType), + AudioMime: "audio/" + ch.audioCodec, + AudioClockRate: uint32(ch.audioClockRate), + AudioChannels: uint16(ch.audioChannels), + AudioPT: uint8(ch.audioPayloadType), + } ch.lock.RUnlock() subID := fmt.Sprintf("%x", time.Now().UnixNano()) - videoMime := "video/" + videoCodec - audioMime := "audio/" + audioCodec - sess, answerSDP, err := newWHEPPionSession(subID, string(body), videoMime, audioMime) + sess, answerSDP, err := newWHEPPionSession(subID, string(body), params) if err != nil { s.logger.Error().WithField("stream", name).WithField("error", err).Log("Failed to create WHEP session") http.Error(w, "failed to create WHEP session", http.StatusInternalServerError)