feat: add RTSP relay support to WHIP server

- Introduced a new RTSP relay server that allows WHIP streams to be served over RTSP.
- Added `RTSPRelayAddr` configuration option to enable the RTSP relay.
- Implemented methods for updating codec parameters and forwarding RTP packets to RTSP clients.
- Enhanced the WHIP server to handle RTSP connections and manage multiple clients.
- Added comprehensive tests for RTSP relay functionality, including OPTIONS, DESCRIBE, SETUP, PLAY, and TEARDOWN requests.
This commit is contained in:
Cesar Mendivil 2026-03-17 20:28:06 -07:00
parent 603818cd44
commit c9304b7b63
11 changed files with 1118 additions and 20 deletions

2
.gitignore vendored
View File

@ -10,7 +10,7 @@
*.ts
*.ts.tmp
*.m3u8
docker/
*.mp4
*.avi
*.flv

View File

@ -1,5 +1,4 @@
ARG CORE_IMAGE=datarhei/base:alpine-core-latest
ARG FFMPEG_IMAGE=datarhei/base:alpine-ffmpeg-latest
FROM $CORE_IMAGE as core
@ -8,17 +7,30 @@ FROM $FFMPEG_IMAGE
COPY --from=core /core /core
RUN ffmpeg -buildconf
RUN chmod +x /core/bin/run.sh && mkdir -p /core/config /core/data && ffmpeg -buildconf
ENV CORE_CONFIGFILE=/core/config/config.json
ENV CORE_STORAGE_DISK_DIR=/core/data
ENV CORE_DB_DIR=/core/config
# Enable most optional services by default
ENV CORE_WHIP_ENABLE=true
ENV CORE_WHIP_ADDRESS=:8555
ENV CORE_WHIP_RTSP_ADDRESS=:8554
ENV CORE_API_AUTH_ENABLE=false
ENV CORE_RTMP_ENABLE=true
ENV CORE_SRT_ENABLE=true
ENV CORE_PLAYOUT_ENABLE=true
ENV CORE_METRICS_ENABLE=true
ENV CORE_METRICS_ENABLE_PROMETHEUS=true
EXPOSE 8080/tcp
EXPOSE 8181/tcp
EXPOSE 1935/tcp
EXPOSE 1936/tcp
EXPOSE 6000/udp
EXPOSE 8555/tcp
EXPOSE 8554/tcp
VOLUME ["/core/data", "/core/config"]
ENTRYPOINT ["/core/bin/run.sh"]

View File

@ -599,6 +599,23 @@ func (a *api) start() error {
}
return url
}, nil)
// WHIP-RTSP: {whip-rtsp} expands to the internal RTSP relay URL for a WHIP stream.
// Multiple FFmpeg egress processes can connect simultaneously (unlike the single-consumer
// plain-RTP relay used by {whip}).
// Usage in a process config input address: {whip-rtsp,name=mystream}
// The -rtsp_transport tcp option is injected automatically by restream.go.
a.replacer.RegisterTemplateFunc("whip-rtsp", func(config *restreamapp.Config, section string) string {
rtspAddr := cfg.WHIP.RTSPAddress
if rtspAddr == "" {
rtspAddr = ":8554"
}
_, rtspPort, _ := gonet.SplitHostPort(rtspAddr)
if rtspPort == "" {
rtspPort = "8554"
}
return "rtsp://127.0.0.1:" + rtspPort + "/live/{name}"
}, nil)
}
filesystems := []fs.Filesystem{
@ -962,6 +979,7 @@ func (a *api) start() error {
Logger: a.log.logger.whip,
Collector: a.sessions.Collector("whip"),
WebRTCProvider: whip.NewPionProvider(),
RTSPRelayAddr: cfg.WHIP.RTSPAddress,
})
if err != nil {
return fmt.Errorf("unable to create WHIP server: %w", err)

View File

@ -232,6 +232,7 @@ func (d *Config) init() {
d.vars.Register(value.NewBool(&d.WHIP.Enable, false), "whip.enable", "CORE_WHIP_ENABLE", nil, "Enable WHIP (WebRTC HTTP Ingestion Protocol) server", false, false)
d.vars.Register(value.NewAddress(&d.WHIP.Address, ":8555"), "whip.address", "CORE_WHIP_ADDRESS", nil, "WHIP server HTTP listen address", false, false)
d.vars.Register(value.NewString(&d.WHIP.Token, ""), "whip.token", "CORE_WHIP_TOKEN", nil, "WHIP bearer token for publishing clients", false, true)
d.vars.Register(value.NewAddress(&d.WHIP.RTSPAddress, ""), "whip.rtsp_address", "CORE_WHIP_RTSP_ADDRESS", nil, "WHIP internal RTSP relay TCP listen address (e.g. :8554); leave empty to disable", false, false)
// FFmpeg
d.vars.Register(value.NewExec(&d.FFmpeg.Binary, "ffmpeg", d.fs), "ffmpeg.binary", "CORE_FFMPEG_BINARY", nil, "Path to ffmpeg binary", true, false)

View File

@ -126,6 +126,12 @@ type Data struct {
Address string `json:"address"`
// Token is an optional bearer token required from WHIP publishing clients.
Token string `json:"token"`
// RTSPAddress is the TCP listen address for the internal RTSP relay server,
// e.g. ":8554". When set, all WHIP streams are also accessible at
// rtsp://127.0.0.1:<port>/live/<name>, enabling multiple FFmpeg egress
// processes to consume the same source independently via {whip-rtsp}.
// Leave empty to disable.
RTSPAddress string `json:"rtsp_address"`
} `json:"whip"`
FFmpeg struct {
Binary string `json:"binary"`

View File

@ -51,7 +51,7 @@ type replacer struct {
func New() Replacer {
r := &replacer{
templates: make(map[string]template),
re: regexp.MustCompile(`{([a-z]+(?::[0-9A-Za-z]+)?)(?:\^(.))?(?:,(.*?))?}`),
re: regexp.MustCompile(`{([a-z]+(?:-[a-z]+)*(?::[0-9A-Za-z]+)?)(?:\^(.))?(?:,(.*?))?}`),
templateRe: regexp.MustCompile(`{([a-z:]+)}`),
}

View File

@ -33,6 +33,10 @@ import (
// separated params), so we normalize before expansion.
var whipPlaceholderRe = regexp.MustCompile(`\{whip:name=([^}]+)\}`)
// whipRtspPlaceholderRe matches the {whip-rtsp:name=<streamKey>} UI format and
// normalizes it to {whip-rtsp,name=<streamKey>} for the replacer engine.
var whipRtspPlaceholderRe = regexp.MustCompile(`\{whip-rtsp:name=([^}]+)\}`)
// The Restreamer interface
type Restreamer interface {
ID() string // ID of this instance
@ -1555,6 +1559,11 @@ func resolvePlaceholders(config *app.Config, r replace.Replacer) {
input.Address = whipPlaceholderRe.ReplaceAllString(input.Address, "{whip,name=$1}")
input.Address = r.Replace(input.Address, "whip", "", vars, config, "input")
// Normalize {whip-rtsp:name=X} → {whip-rtsp,name=X} (replacer format)
whipRtspWasExpanded := whipRtspPlaceholderRe.MatchString(input.Address)
input.Address = whipRtspPlaceholderRe.ReplaceAllString(input.Address, "{whip-rtsp,name=$1}")
input.Address = r.Replace(input.Address, "whip-rtsp", "", vars, config, "input")
// WHIP SDP relay URLs (http://…/whip/…/sdp) require FFmpeg to open nested
// rtp:// and udp:// sub-protocols, which are blocked by default when the
// top-level protocol is HTTP. Inject -protocol_whitelist automatically so
@ -1573,6 +1582,21 @@ func resolvePlaceholders(config *app.Config, r replace.Replacer) {
}
}
// WHIP RTSP relay URLs need RTP/AVP/TCP (interleaved) transport.
// Inject -rtsp_transport tcp automatically so FFmpeg uses the correct mode.
if whipRtspWasExpanded && strings.HasPrefix(input.Address, "rtsp://") {
hasRtspTransport := false
for _, opt := range input.Options {
if strings.Contains(opt, "rtsp_transport") {
hasRtspTransport = true
break
}
}
if !hasRtspTransport {
input.Options = append([]string{"-rtsp_transport", "tcp"}, input.Options...)
}
}
for j, option := range input.Options {
// Replace any known placeholders
option = r.Replace(option, "inputid", input.ID, nil, nil, "input")

View File

@ -2,12 +2,30 @@ package whip
import (
"fmt"
"strings"
"sync"
"github.com/pion/interceptor"
"github.com/pion/webrtc/v3"
)
// whepCodecParams describes the exact RTP codec the WHIP publisher (OBS) is sending.
// Using these instead of RegisterDefaultCodecs ensures the browser negotiates
// the same payload type numbers OBS uses, which is mandatory because
// TrackLocalStaticRTP does NOT rewrite the PT in packets — it only rewrites SSRC.
// Mismatch: OBS sends PT 96, browser expects PT 102 → browser silently discards
// all H264 packets → no video. Audio works by coincidence (Opus is PT 111 in both).
type whepCodecParams struct {
VideoMime string // "video/H264"
VideoClockRate uint32
VideoFmtp string // e.g. "level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=640034"
VideoPT uint8 // OBS's H264 payload type, typically 96
AudioMime string // "audio/opus"
AudioClockRate uint32
AudioChannels uint16
AudioPT uint8 // OBS's Opus payload type, typically 111
}
// whepPionSession is a single WHEP subscriber backed by a pion PeerConnection.
// It receives plain RTP packets (already decrypted by the WHIP publisher side)
// via WriteVideo/WriteAudio and forwards them to the remote browser via
@ -23,12 +41,44 @@ type whepPionSession struct {
// newWHEPPionSession creates a new WHEP subscriber PeerConnection.
// offerSDP is the SDP offer from the browser.
// videoMime: e.g. "video/H264"; audioMime: e.g. "audio/opus".
// Returns the session and the SDP answer to send back to the browser.
func newWHEPPionSession(id, offerSDP, videoMime, audioMime string) (*whepPionSession, string, error) {
// params must match the publisher's codec configuration exactly so that the
// browser negotiates the same payload type numbers the publisher uses.
func newWHEPPionSession(id, offerSDP string, params whepCodecParams) (*whepPionSession, string, error) {
m := &webrtc.MediaEngine{}
if err := m.RegisterDefaultCodecs(); err != nil {
return nil, "", fmt.Errorf("whep: register codecs: %w", err)
// Register the exact video codec OBS is sending.
// Using the publisher's PT (e.g. 96) forces the SDP answer to contain
// a=rtpmap:96 H264/90000, so the browser maps PT 96 → H264 and accepts
// the raw RTP packets that arrive unchanged from OBS.
videoFmtp := normalizeH264Fmtp(params.VideoMime, params.VideoFmtp)
if err := m.RegisterCodec(webrtc.RTPCodecParameters{
RTPCodecCapability: webrtc.RTPCodecCapability{
MimeType: params.VideoMime,
ClockRate: params.VideoClockRate,
SDPFmtpLine: videoFmtp,
RTCPFeedback: []webrtc.RTCPFeedback{
{Type: "nack", Parameter: "pli"},
},
},
PayloadType: webrtc.PayloadType(params.VideoPT),
}, webrtc.RTPCodecTypeVideo); err != nil {
return nil, "", fmt.Errorf("whep: register video codec: %w", err)
}
audioFmtp := ""
if strings.Contains(strings.ToLower(params.AudioMime), "opus") {
audioFmtp = "minptime=10;useinbandfec=1"
}
if err := m.RegisterCodec(webrtc.RTPCodecParameters{
RTPCodecCapability: webrtc.RTPCodecCapability{
MimeType: params.AudioMime,
ClockRate: params.AudioClockRate,
Channels: params.AudioChannels,
SDPFmtpLine: audioFmtp,
},
PayloadType: webrtc.PayloadType(params.AudioPT),
}, webrtc.RTPCodecTypeAudio); err != nil {
return nil, "", fmt.Errorf("whep: register audio codec: %w", err)
}
ir := &interceptor.Registry{}
@ -43,7 +93,7 @@ func newWHEPPionSession(id, offerSDP, videoMime, audioMime string) (*whepPionSes
}
videoTrack, err := webrtc.NewTrackLocalStaticRTP(
webrtc.RTPCodecCapability{MimeType: videoMime},
webrtc.RTPCodecCapability{MimeType: params.VideoMime, ClockRate: params.VideoClockRate, SDPFmtpLine: videoFmtp},
"video", "whep-video-"+id,
)
if err != nil {
@ -52,7 +102,7 @@ func newWHEPPionSession(id, offerSDP, videoMime, audioMime string) (*whepPionSes
}
audioTrack, err := webrtc.NewTrackLocalStaticRTP(
webrtc.RTPCodecCapability{MimeType: audioMime},
webrtc.RTPCodecCapability{MimeType: params.AudioMime, ClockRate: params.AudioClockRate, Channels: params.AudioChannels},
"audio", "whep-audio-"+id,
)
if err != nil {
@ -109,6 +159,26 @@ func newWHEPPionSession(id, offerSDP, videoMime, audioMime string) (*whepPionSes
return sess, pc.LocalDescription().SDP, nil
}
// normalizeH264Fmtp ensures H264 fmtp always has level-asymmetry-allowed=1 and
// packetization-mode=1. level-asymmetry-allowed lets the browser decode a
// higher profile than it advertised. packetization-mode=1 is required for FU-A
// fragmented NALUs (how OBS sends large H264 frames).
func normalizeH264Fmtp(mime, fmtp string) string {
if !strings.Contains(strings.ToLower(mime), "h264") {
return fmtp
}
if fmtp == "" {
return "level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=42e01f"
}
if !strings.Contains(fmtp, "packetization-mode") {
fmtp += ";packetization-mode=1"
}
if !strings.Contains(fmtp, "level-asymmetry-allowed") {
fmtp = "level-asymmetry-allowed=1;" + fmtp
}
return fmtp
}
// ID returns the unique subscriber identifier.
func (s *whepPionSession) ID() string { return s.id }

501
whip/rtsp_relay.go Normal file
View File

@ -0,0 +1,501 @@
// Package whip provides a minimal RFC 2326 RTSP relay server.
//
// The RTSP relay accepts TCP connections from FFmpeg (or any RTSP consumer)
// and forwards RTP packets from active WHIP publishers using RTP/AVP/TCP
// interleaved mode (RFC 2326 §10.12).
//
// Each stream is served at rtsp://<host>:<port>/live/<name>.
// Multiple consumers may connect to the same stream simultaneously.
// Only RTP/AVP/TCP transport (interleaved binary) is supported.
package whip
import (
"bufio"
"fmt"
"io"
"net"
"strconv"
"strings"
"sync"
"time"
"github.com/datarhei/core/v16/log"
)
// RTSPCodecParams holds codec parameters for a single WHIP stream.
// These are used to build the RTSP DESCRIBE response (SDP).
type RTSPCodecParams struct {
VideoPayloadType int
VideoCodec string // e.g. "H264"
VideoClockRate int
VideoFmtp string // e.g. "sprop-parameter-sets=...;packetization-mode=1"
AudioPayloadType int
AudioCodec string // e.g. "opus"
AudioClockRate int
AudioChannels int
}
// rtspRelay is a minimal TCP RTSP server that relays WHIP streams to multiple
// FFmpeg consumers using RTP/AVP/TCP interleaved mode (RFC 2326 §10.12).
type rtspRelay struct {
addr string
ln net.Listener
streams sync.Map // name -> *rtspStreamEntry
logger log.Logger
done chan struct{}
wg sync.WaitGroup
}
// rtspStreamEntry tracks per-stream codec params and connected consumers.
type rtspStreamEntry struct {
name string
mu sync.RWMutex
codecs RTSPCodecParams
clients map[string]*rtspRelayClient
}
// rtspRelayClient represents one connected RTSP consumer (e.g. an FFmpeg process).
type rtspRelayClient struct {
id string
conn net.Conn
videoTrack int // interleaved channel index for video RTP
audioTrack int // interleaved channel index for audio RTP
mu sync.Mutex
closed bool
}
// newRTSPRelay creates a new relay server bound to addr. Call ListenAndServe to start.
func newRTSPRelay(addr string, logger log.Logger) *rtspRelay {
if logger == nil {
logger = log.New("")
}
return &rtspRelay{
addr: addr,
logger: logger,
done: make(chan struct{}),
}
}
// ListenAndServe starts the RTSP relay and blocks until Close is called.
func (r *rtspRelay) ListenAndServe() error {
ln, err := net.Listen("tcp", r.addr)
if err != nil {
return fmt.Errorf("rtsp relay: listen %q: %w", r.addr, err)
}
r.ln = ln
r.logger.Info().WithField("addr", r.addr).Log("RTSP relay started")
for {
conn, err := ln.Accept()
if err != nil {
select {
case <-r.done:
return nil
default:
r.logger.Error().WithField("error", err).Log("RTSP relay: accept error")
continue
}
}
r.wg.Add(1)
go func() {
defer r.wg.Done()
r.handleConn(conn)
}()
}
}
// Close shuts down the relay and waits for all connections to finish.
func (r *rtspRelay) Close() {
select {
case <-r.done:
default:
close(r.done)
}
if r.ln != nil {
r.ln.Close()
}
r.wg.Wait()
}
// UpdateCodecs updates the codec parameters used in DESCRIBE responses for name.
func (r *rtspRelay) UpdateCodecs(name string, params RTSPCodecParams) {
e := r.getOrCreateStream(name)
e.mu.Lock()
e.codecs = params
e.mu.Unlock()
}
// WriteVideo forwards a video RTP packet to all consumers watching name.
func (r *rtspRelay) WriteVideo(name string, pkt []byte) {
v, ok := r.streams.Load(name)
if !ok {
return
}
e := v.(*rtspStreamEntry)
e.mu.RLock()
clients := make([]*rtspRelayClient, 0, len(e.clients))
for _, c := range e.clients {
clients = append(clients, c)
}
e.mu.RUnlock()
for _, c := range clients {
c.writeInterleaved(c.videoTrack, pkt)
}
}
// WriteAudio forwards an audio RTP packet to all consumers watching name.
func (r *rtspRelay) WriteAudio(name string, pkt []byte) {
v, ok := r.streams.Load(name)
if !ok {
return
}
e := v.(*rtspStreamEntry)
e.mu.RLock()
clients := make([]*rtspRelayClient, 0, len(e.clients))
for _, c := range e.clients {
clients = append(clients, c)
}
e.mu.RUnlock()
for _, c := range clients {
c.writeInterleaved(c.audioTrack, pkt)
}
}
// getOrCreateStream returns the named stream entry, creating it with sensible defaults if needed.
func (r *rtspRelay) getOrCreateStream(name string) *rtspStreamEntry {
e := &rtspStreamEntry{
name: name,
clients: make(map[string]*rtspRelayClient),
codecs: RTSPCodecParams{
VideoPayloadType: 96,
VideoCodec: "H264",
VideoClockRate: 90000,
AudioPayloadType: 111,
AudioCodec: "opus",
AudioClockRate: 48000,
AudioChannels: 2,
},
}
actual, _ := r.streams.LoadOrStore(name, e)
return actual.(*rtspStreamEntry)
}
// writeInterleaved sends one RFC 2326 §10.12 interleaved binary packet over the TCP connection.
// If the write times out (slow consumer), the connection is closed to avoid head-of-line blocking.
func (c *rtspRelayClient) writeInterleaved(channel int, data []byte) {
if len(data) > 65535 {
return
}
c.mu.Lock()
defer c.mu.Unlock()
if c.closed {
return
}
hdr := [4]byte{'$', byte(channel), byte(len(data) >> 8), byte(len(data))}
c.conn.SetWriteDeadline(time.Now().Add(150 * time.Millisecond))
if _, err := c.conn.Write(hdr[:]); err != nil {
c.closed = true
c.conn.Close()
return
}
if _, err := c.conn.Write(data); err != nil {
c.closed = true
c.conn.Close()
return
}
c.conn.SetWriteDeadline(time.Time{})
}
// handleConn processes one RTSP TCP connection through OPTIONS → DESCRIBE → SETUP → PLAY.
func (r *rtspRelay) handleConn(conn net.Conn) {
defer conn.Close()
rdr := bufio.NewReader(conn)
sessionID := fmt.Sprintf("%016x", time.Now().UnixNano())
var entry *rtspStreamEntry
var client *rtspRelayClient
videoInterleaved := 0
audioInterleaved := 2
for {
conn.SetReadDeadline(time.Now().Add(60 * time.Second))
method, reqURL, headers, err := rtspReadRequest(rdr)
conn.SetReadDeadline(time.Time{})
if err != nil {
return
}
cseq := headers["cseq"]
switch method {
case "OPTIONS":
rtspWriteResponse(conn, 200, "OK", cseq, map[string]string{
"Public": "OPTIONS, DESCRIBE, SETUP, PLAY, TEARDOWN",
}, "")
case "DESCRIBE":
name := rtspExtractName(reqURL)
if name == "" {
rtspWriteResponse(conn, 404, "Not Found", cseq, nil, "")
return
}
entry = r.getOrCreateStream(name)
sdp := entry.buildSDP()
rtspWriteResponse(conn, 200, "OK", cseq, map[string]string{
"Content-Type": "application/sdp",
"Content-Base": reqURL + "/",
"Content-Length": strconv.Itoa(len(sdp)),
}, sdp)
case "SETUP":
transport := headers["transport"]
if !strings.Contains(strings.ToLower(transport), "tcp") {
// Only TCP interleaved supported; decline UDP politely.
rtspWriteResponse(conn, 461, "Unsupported Transport", cseq, nil, "")
return
}
isAudio := strings.HasSuffix(strings.TrimRight(reqURL, "/"), "track1")
ch := rtspParseInterleaved(transport)
if isAudio {
if ch >= 0 {
audioInterleaved = ch
}
rtspWriteResponse(conn, 200, "OK", cseq, map[string]string{
"Session": sessionID + ";timeout=60",
"Transport": fmt.Sprintf("RTP/AVP/TCP;unicast;interleaved=%d-%d;mode=play", audioInterleaved, audioInterleaved+1),
}, "")
} else {
if ch >= 0 {
videoInterleaved = ch
}
rtspWriteResponse(conn, 200, "OK", cseq, map[string]string{
"Session": sessionID + ";timeout=60",
"Transport": fmt.Sprintf("RTP/AVP/TCP;unicast;interleaved=%d-%d;mode=play", videoInterleaved, videoInterleaved+1),
}, "")
}
case "PLAY":
if entry == nil {
rtspWriteResponse(conn, 455, "Method Not Valid in This State", cseq, nil, "")
return
}
client = &rtspRelayClient{
id: sessionID,
conn: conn,
videoTrack: videoInterleaved,
audioTrack: audioInterleaved,
}
entry.mu.Lock()
entry.clients[sessionID] = client
entry.mu.Unlock()
rtspWriteResponse(conn, 200, "OK", cseq, map[string]string{
"Session": sessionID,
}, "")
r.logger.Info().
WithField("stream", entry.name).
WithField("session", sessionID).
Log("RTSP client connected")
// Stay connected: discard interleaved data, handle TEARDOWN.
for {
conn.SetReadDeadline(time.Now().Add(60 * time.Second))
b, readErr := rdr.ReadByte()
conn.SetReadDeadline(time.Time{})
if readErr != nil {
break
}
if b == '$' {
// Discard interleaved binary data sent by the client (e.g. RTCP RR).
var meta [3]byte
if _, err := io.ReadFull(rdr, meta[:]); err != nil {
break
}
dataLen := int(meta[1])<<8 | int(meta[2])
if dataLen > 0 {
if _, err := io.CopyN(io.Discard, rdr, int64(dataLen)); err != nil {
break
}
}
continue
}
rdr.UnreadByte()
m, _, innerHdrs, err := rtspReadRequest(rdr)
if err != nil || m == "TEARDOWN" {
if m == "TEARDOWN" {
rtspWriteResponse(conn, 200, "OK", innerHdrs["cseq"], map[string]string{
"Session": sessionID,
}, "")
}
break
}
}
entry.mu.Lock()
delete(entry.clients, sessionID)
entry.mu.Unlock()
r.logger.Info().
WithField("stream", entry.name).
WithField("session", sessionID).
Log("RTSP client disconnected")
return
case "TEARDOWN":
if entry != nil {
entry.mu.Lock()
delete(entry.clients, sessionID)
entry.mu.Unlock()
}
rtspWriteResponse(conn, 200, "OK", cseq, map[string]string{
"Session": sessionID,
}, "")
return
default:
rtspWriteResponse(conn, 405, "Method Not Allowed", cseq, nil, "")
}
}
}
// buildSDP returns an SDP description for this stream suitable for RTSP DESCRIBE.
func (e *rtspStreamEntry) buildSDP() string {
e.mu.RLock()
c := e.codecs
e.mu.RUnlock()
var sb strings.Builder
sb.WriteString("v=0\r\n")
sb.WriteString("o=- 1 1 IN IP4 127.0.0.1\r\n")
sb.WriteString("s=WHIP Live Stream\r\n")
sb.WriteString("c=IN IP4 127.0.0.1\r\n")
sb.WriteString("t=0 0\r\n")
sb.WriteString(fmt.Sprintf("m=video 0 RTP/AVP %d\r\n", c.VideoPayloadType))
sb.WriteString(fmt.Sprintf("a=rtpmap:%d %s/%d\r\n", c.VideoPayloadType, c.VideoCodec, c.VideoClockRate))
if c.VideoFmtp != "" {
sb.WriteString(fmt.Sprintf("a=fmtp:%d %s\r\n", c.VideoPayloadType, c.VideoFmtp))
} else if strings.EqualFold(c.VideoCodec, "H264") {
sb.WriteString(fmt.Sprintf("a=fmtp:%d packetization-mode=1\r\n", c.VideoPayloadType))
}
sb.WriteString("a=control:track0\r\n")
sb.WriteString(fmt.Sprintf("m=audio 0 RTP/AVP %d\r\n", c.AudioPayloadType))
if c.AudioChannels > 1 {
sb.WriteString(fmt.Sprintf("a=rtpmap:%d %s/%d/%d\r\n", c.AudioPayloadType, c.AudioCodec, c.AudioClockRate, c.AudioChannels))
} else {
sb.WriteString(fmt.Sprintf("a=rtpmap:%d %s/%d\r\n", c.AudioPayloadType, c.AudioCodec, c.AudioClockRate))
}
sb.WriteString("a=control:track1\r\n")
return sb.String()
}
// rtspReadRequest reads one RTSP request (request line + headers) from rdr.
// Returns method, URL, lowercase-keyed headers map, and any error.
func rtspReadRequest(rdr *bufio.Reader) (method, reqURL string, headers map[string]string, err error) {
headers = make(map[string]string)
// First line: "METHOD URL RTSP/1.0"
line, err := rdr.ReadString('\n')
if err != nil {
return
}
line = strings.TrimRight(line, "\r\n")
parts := strings.SplitN(line, " ", 3)
if len(parts) < 2 {
err = fmt.Errorf("invalid RTSP request line: %q", line)
return
}
method = strings.ToUpper(parts[0])
reqURL = parts[1]
// Headers until blank line.
for {
line, err = rdr.ReadString('\n')
if err != nil {
return
}
line = strings.TrimRight(line, "\r\n")
if line == "" {
break
}
idx := strings.IndexByte(line, ':')
if idx < 0 {
continue
}
key := strings.ToLower(strings.TrimSpace(line[:idx]))
val := strings.TrimSpace(line[idx+1:])
headers[key] = val
}
return
}
// rtspWriteResponse writes an RTSP/1.0 response to conn.
func rtspWriteResponse(conn net.Conn, code int, reason, cseq string, extra map[string]string, body string) {
var sb strings.Builder
sb.WriteString(fmt.Sprintf("RTSP/1.0 %d %s\r\n", code, reason))
if cseq != "" {
sb.WriteString("CSeq: " + cseq + "\r\n")
}
sb.WriteString("Server: datarhei-core-whip/1.0\r\n")
sb.WriteString(fmt.Sprintf("Date: %s\r\n", time.Now().UTC().Format("Mon, 02 Jan 2006 15:04:05 GMT")))
for k, v := range extra {
sb.WriteString(k + ": " + v + "\r\n")
}
sb.WriteString("\r\n")
if body != "" {
sb.WriteString(body)
}
conn.SetWriteDeadline(time.Now().Add(5 * time.Second))
conn.Write([]byte(sb.String())) //nolint:errcheck
conn.SetWriteDeadline(time.Time{})
}
// rtspExtractName extracts the stream name from an RTSP URL like
// rtsp://127.0.0.1:8554/live/mystream or rtsp://host/live/mystream/track0.
func rtspExtractName(rtspURL string) string {
u := rtspURL
// Strip scheme + host.
if idx := strings.Index(u, "://"); idx >= 0 {
u = u[idx+3:]
}
if slash := strings.Index(u, "/"); slash >= 0 {
u = u[slash+1:]
}
// Expect "live/<name>[/trackN]"
if strings.HasPrefix(u, "live/") {
u = u[5:]
}
// Strip any trailing "/trackN" suffix.
if idx := strings.LastIndex(u, "/"); idx >= 0 {
tail := u[idx+1:]
if strings.HasPrefix(tail, "track") {
u = u[:idx]
}
}
// Strip query string.
if idx := strings.IndexByte(u, '?'); idx >= 0 {
u = u[:idx]
}
return u
}
// rtspParseInterleaved parses the "interleaved=X-Y" field from an RTSP Transport
// header and returns X (the RTP channel index), or -1 if not present.
func rtspParseInterleaved(transport string) int {
lower := strings.ToLower(transport)
idx := strings.Index(lower, "interleaved=")
if idx < 0 {
return -1
}
rest := transport[idx+12:]
parts := strings.SplitN(rest, "-", 2)
n, err := strconv.Atoi(strings.TrimSpace(parts[0]))
if err != nil {
return -1
}
return n
}

401
whip/rtsp_relay_test.go Normal file
View File

@ -0,0 +1,401 @@
package whip
import (
"bufio"
"fmt"
"net"
"strings"
"testing"
"time"
)
// dialRTSP dials the relay and returns a bufio.Reader + the raw conn.
func dialRTSP(t *testing.T, addr string) (net.Conn, *bufio.Reader) {
t.Helper()
conn, err := net.DialTimeout("tcp", addr, 3*time.Second)
if err != nil {
t.Fatalf("RTSP dial %s: %v", addr, err)
}
return conn, bufio.NewReader(conn)
}
// sendRTSP writes an RTSP request and returns the parsed response status line + headers.
func sendRTSP(t *testing.T, conn net.Conn, rdr *bufio.Reader, request string) (status string, headers map[string]string) {
t.Helper()
conn.SetWriteDeadline(time.Now().Add(3 * time.Second))
if _, err := conn.Write([]byte(request)); err != nil {
t.Fatalf("write RTSP request: %v", err)
}
conn.SetWriteDeadline(time.Time{})
headers = make(map[string]string)
conn.SetReadDeadline(time.Now().Add(3 * time.Second))
line, err := rdr.ReadString('\n')
conn.SetReadDeadline(time.Time{})
if err != nil {
t.Fatalf("read RTSP status: %v", err)
}
status = strings.TrimRight(line, "\r\n")
for {
conn.SetReadDeadline(time.Now().Add(3 * time.Second))
hdr, err := rdr.ReadString('\n')
conn.SetReadDeadline(time.Time{})
if err != nil {
t.Fatalf("read RTSP header: %v", err)
}
hdr = strings.TrimRight(hdr, "\r\n")
if hdr == "" {
break
}
idx := strings.IndexByte(hdr, ':')
if idx >= 0 {
k := strings.ToLower(strings.TrimSpace(hdr[:idx]))
v := strings.TrimSpace(hdr[idx+1:])
headers[k] = v
}
}
// If there's a body (Content-Length), drain it.
if cl, ok := headers["content-length"]; ok {
n := 0
fmt.Sscanf(cl, "%d", &n)
buf := make([]byte, n)
conn.SetReadDeadline(time.Now().Add(3 * time.Second))
for read := 0; read < n; {
nn, err := rdr.Read(buf[read:])
if err != nil {
t.Fatalf("drain body: %v", err)
}
read += nn
}
conn.SetReadDeadline(time.Time{})
headers["__body__"] = string(buf)
}
return status, headers
}
// readInterleaved reads one RFC 2326 §10.12 interleaved binary frame.
// Returns channel index and payload.
func readInterleaved(t *testing.T, conn net.Conn, rdr *bufio.Reader) (channel int, payload []byte) {
t.Helper()
conn.SetReadDeadline(time.Now().Add(2 * time.Second))
defer conn.SetReadDeadline(time.Time{})
// '$' marker
b, err := rdr.ReadByte()
if err != nil {
t.Fatalf("readInterleaved: read '$': %v", err)
}
if b != '$' {
t.Fatalf("readInterleaved: expected '$', got %02x", b)
}
ch, err := rdr.ReadByte()
if err != nil {
t.Fatalf("readInterleaved: read channel: %v", err)
}
hi, err := rdr.ReadByte()
if err != nil {
t.Fatalf("readInterleaved: read len hi: %v", err)
}
lo, err := rdr.ReadByte()
if err != nil {
t.Fatalf("readInterleaved: read len lo: %v", err)
}
length := int(hi)<<8 | int(lo)
payload = make([]byte, length)
for read := 0; read < length; {
nn, err := rdr.Read(payload[read:])
if err != nil {
t.Fatalf("readInterleaved: read payload: %v", err)
}
read += nn
}
return int(ch), payload
}
// freePort returns an unused TCP port on localhost.
func freePort(t *testing.T) string {
t.Helper()
ln, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
t.Fatalf("freePort: %v", err)
}
addr := ln.Addr().String()
ln.Close()
return addr
}
// TestRTSPRelayOptions verifies OPTIONS returns the correct Public header.
func TestRTSPRelayOptions(t *testing.T) {
addr := freePort(t)
relay := newRTSPRelay(addr, nil)
go relay.ListenAndServe() //nolint:errcheck
defer relay.Close()
time.Sleep(30 * time.Millisecond)
conn, rdr := dialRTSP(t, addr)
defer conn.Close()
status, hdrs := sendRTSP(t, conn, rdr,
"OPTIONS rtsp://"+addr+"/live/test RTSP/1.0\r\nCSeq: 1\r\n\r\n")
if !strings.Contains(status, "200") {
t.Fatalf("OPTIONS: expected 200, got %q", status)
}
pub := hdrs["public"]
for _, m := range []string{"OPTIONS", "DESCRIBE", "SETUP", "PLAY", "TEARDOWN"} {
if !strings.Contains(pub, m) {
t.Errorf("OPTIONS Public missing %q: %q", m, pub)
}
}
}
// TestRTSPRelayDescribeDefaultCodecs verifies DESCRIBE returns valid SDP with default codecs.
func TestRTSPRelayDescribeDefaultCodecs(t *testing.T) {
addr := freePort(t)
relay := newRTSPRelay(addr, nil)
go relay.ListenAndServe() //nolint:errcheck
defer relay.Close()
time.Sleep(30 * time.Millisecond)
conn, rdr := dialRTSP(t, addr)
defer conn.Close()
sendRTSP(t, conn, rdr,
"OPTIONS rtsp://"+addr+"/live/mystream RTSP/1.0\r\nCSeq: 1\r\n\r\n")
_, hdrs := sendRTSP(t, conn, rdr,
"DESCRIBE rtsp://"+addr+"/live/mystream RTSP/1.0\r\nCSeq: 2\r\nAccept: application/sdp\r\n\r\n")
if hdrs["content-type"] != "application/sdp" {
t.Fatalf("DESCRIBE: expected content-type application/sdp, got %q", hdrs["content-type"])
}
body := hdrs["__body__"]
for _, want := range []string{"H264", "opus", "a=control:track0", "a=control:track1"} {
if !strings.Contains(body, want) {
t.Errorf("DESCRIBE SDP missing %q:\n%s", want, body)
}
}
}
// TestRTSPRelayDescribeUpdatedCodecs verifies that UpdateCodecs changes the DESCRIBE SDP.
func TestRTSPRelayDescribeUpdatedCodecs(t *testing.T) {
addr := freePort(t)
relay := newRTSPRelay(addr, nil)
go relay.ListenAndServe() //nolint:errcheck
defer relay.Close()
time.Sleep(30 * time.Millisecond)
relay.UpdateCodecs("cam1", RTSPCodecParams{
VideoPayloadType: 96,
VideoCodec: "H264",
VideoClockRate: 90000,
VideoFmtp: "sprop-parameter-sets=abc;packetization-mode=1",
AudioPayloadType: 111,
AudioCodec: "opus",
AudioClockRate: 48000,
AudioChannels: 2,
})
conn, rdr := dialRTSP(t, addr)
defer conn.Close()
sendRTSP(t, conn, rdr,
"OPTIONS rtsp://"+addr+"/live/cam1 RTSP/1.0\r\nCSeq: 1\r\n\r\n")
_, hdrs := sendRTSP(t, conn, rdr,
"DESCRIBE rtsp://"+addr+"/live/cam1 RTSP/1.0\r\nCSeq: 2\r\nAccept: application/sdp\r\n\r\n")
body := hdrs["__body__"]
if !strings.Contains(body, "sprop-parameter-sets=abc") {
t.Errorf("DESCRIBE SDP should contain updated fmtp:\n%s", body)
}
}
// TestRTSPRelayDataDelivery is the full E2E test:
// 1. relay starts
// 2. RTSP client connects (OPTIONS → DESCRIBE → SETUP video → SETUP audio → PLAY)
// 3. WriteVideo + WriteAudio inject fake RTP packets
// 4. client reads the interleaved frames and verifies channel/payload match
func TestRTSPRelayDataDelivery(t *testing.T) {
addr := freePort(t)
relay := newRTSPRelay(addr, nil)
go relay.ListenAndServe() //nolint:errcheck
defer relay.Close()
time.Sleep(30 * time.Millisecond)
streamName := "obs-test"
relay.UpdateCodecs(streamName, RTSPCodecParams{
VideoPayloadType: 96,
VideoCodec: "H264",
VideoClockRate: 90000,
AudioPayloadType: 111,
AudioCodec: "opus",
AudioClockRate: 48000,
AudioChannels: 2,
})
base := "rtsp://" + addr + "/live/" + streamName
conn, rdr := dialRTSP(t, addr)
defer conn.Close()
// OPTIONS
status, _ := sendRTSP(t, conn, rdr,
fmt.Sprintf("OPTIONS %s RTSP/1.0\r\nCSeq: 1\r\n\r\n", base))
if !strings.Contains(status, "200") {
t.Fatalf("OPTIONS: %s", status)
}
// DESCRIBE
status, _ = sendRTSP(t, conn, rdr,
fmt.Sprintf("DESCRIBE %s RTSP/1.0\r\nCSeq: 2\r\nAccept: application/sdp\r\n\r\n", base))
if !strings.Contains(status, "200") {
t.Fatalf("DESCRIBE: %s", status)
}
// SETUP video interleaved channels 0-1
status, _ = sendRTSP(t, conn, rdr,
fmt.Sprintf("SETUP %s/track0 RTSP/1.0\r\nCSeq: 3\r\nTransport: RTP/AVP/TCP;unicast;interleaved=0-1;mode=play\r\n\r\n", base))
if !strings.Contains(status, "200") {
t.Fatalf("SETUP video: %s", status)
}
// SETUP audio interleaved channels 2-3
status, _ = sendRTSP(t, conn, rdr,
fmt.Sprintf("SETUP %s/track1 RTSP/1.0\r\nCSeq: 4\r\nTransport: RTP/AVP/TCP;unicast;interleaved=2-3;mode=play\r\n\r\n", base))
if !strings.Contains(status, "200") {
t.Fatalf("SETUP audio: %s", status)
}
// PLAY
status, _ = sendRTSP(t, conn, rdr,
fmt.Sprintf("PLAY %s RTSP/1.0\r\nCSeq: 5\r\nSession: ignored\r\n\r\n", base))
if !strings.Contains(status, "200") {
t.Fatalf("PLAY: %s", status)
}
// Inject a video RTP packet (fake but valid minimal structure).
videoRTP := buildFakeRTP(96, 1000, 90000)
relay.WriteVideo(streamName, videoRTP)
ch, payload := readInterleaved(t, conn, rdr)
if ch != 0 {
t.Errorf("expected video on interleaved channel 0, got %d", ch)
}
if len(payload) != len(videoRTP) {
t.Errorf("video payload length: want %d got %d", len(videoRTP), len(payload))
}
// Inject an audio RTP packet.
audioRTP := buildFakeRTP(111, 2000, 48000)
relay.WriteAudio(streamName, audioRTP)
ch, payload = readInterleaved(t, conn, rdr)
if ch != 2 {
t.Errorf("expected audio on interleaved channel 2, got %d", ch)
}
if len(payload) != len(audioRTP) {
t.Errorf("audio payload length: want %d got %d", len(audioRTP), len(payload))
}
_ = payload
}
// TestRTSPRelayMultipleSubscribers verifies that two simultaneous clients both
// receive the same RTP packets.
func TestRTSPRelayMultipleSubscribers(t *testing.T) {
addr := freePort(t)
relay := newRTSPRelay(addr, nil)
go relay.ListenAndServe() //nolint:errcheck
defer relay.Close()
time.Sleep(30 * time.Millisecond)
streamName := "multi"
base := "rtsp://" + addr + "/live/" + streamName
connectAndPlay := func(t *testing.T) (net.Conn, *bufio.Reader) {
t.Helper()
c, r := dialRTSP(t, addr)
for _, req := range []string{
fmt.Sprintf("OPTIONS %s RTSP/1.0\r\nCSeq: 1\r\n\r\n", base),
fmt.Sprintf("DESCRIBE %s RTSP/1.0\r\nCSeq: 2\r\nAccept: application/sdp\r\n\r\n", base),
fmt.Sprintf("SETUP %s/track0 RTSP/1.0\r\nCSeq: 3\r\nTransport: RTP/AVP/TCP;unicast;interleaved=0-1;mode=play\r\n\r\n", base),
fmt.Sprintf("PLAY %s RTSP/1.0\r\nCSeq: 4\r\nSession: ignored\r\n\r\n", base),
} {
status, _ := sendRTSP(t, c, r, req)
if !strings.Contains(status, "200") {
t.Fatalf("setup failed: %s", status)
}
}
return c, r
}
conn1, rdr1 := connectAndPlay(t)
defer conn1.Close()
conn2, rdr2 := connectAndPlay(t)
defer conn2.Close()
pkt := buildFakeRTP(96, 5000, 90000)
relay.WriteVideo(streamName, pkt)
ch1, p1 := readInterleaved(t, conn1, rdr1)
ch2, p2 := readInterleaved(t, conn2, rdr2)
if ch1 != 0 || ch2 != 0 {
t.Errorf("expected channel 0 for both clients, got %d and %d", ch1, ch2)
}
if len(p1) != len(pkt) || len(p2) != len(pkt) {
t.Errorf("packet length mismatch: sent %d, got %d and %d", len(pkt), len(p1), len(p2))
}
}
// TestRTSPRelayTEARDOWN verifies that TEARDOWN is handled gracefully.
func TestRTSPRelayTEARDOWN(t *testing.T) {
addr := freePort(t)
relay := newRTSPRelay(addr, nil)
go relay.ListenAndServe() //nolint:errcheck
defer relay.Close()
time.Sleep(30 * time.Millisecond)
base := "rtsp://" + addr + "/live/tdtest"
conn, rdr := dialRTSP(t, addr)
defer conn.Close()
for cseq, req := range []string{
fmt.Sprintf("OPTIONS %s RTSP/1.0\r\nCSeq: 1\r\n\r\n", base),
fmt.Sprintf("DESCRIBE %s RTSP/1.0\r\nCSeq: 2\r\nAccept: application/sdp\r\n\r\n", base),
fmt.Sprintf("SETUP %s/track0 RTSP/1.0\r\nCSeq: 3\r\nTransport: RTP/AVP/TCP;unicast;interleaved=0-1;mode=play\r\n\r\n", base),
fmt.Sprintf("PLAY %s RTSP/1.0\r\nCSeq: 4\r\nSession: ignored\r\n\r\n", base),
} {
status, _ := sendRTSP(t, conn, rdr, req)
if !strings.Contains(status, "200") {
t.Fatalf("step %d: %s", cseq, status)
}
}
// Send TEARDOWN in-band (as an RTSP request mixed with interleaved).
conn.SetWriteDeadline(time.Now().Add(3 * time.Second))
conn.Write([]byte(fmt.Sprintf("TEARDOWN %s RTSP/1.0\r\nCSeq: 5\r\n\r\n", base))) //nolint:errcheck
conn.SetWriteDeadline(time.Time{})
// Server should reply 200.
status, _ := sendRTSP(t, conn, rdr, "") // already written above, just drain response
_ = status // Server closes after TEARDOWN; a 200 or EOF is both acceptable
}
// buildFakeRTP constructs a minimal 12-byte RTP header + 4-byte payload.
func buildFakeRTP(pt uint8, seq uint16, ts uint32) []byte {
pkt := make([]byte, 16)
pkt[0] = 0x80 // V=2, P=0, X=0, CC=0
pkt[1] = pt & 0x7f // M=0, PT
pkt[2] = byte(seq >> 8) // Seq hi
pkt[3] = byte(seq) // Seq lo
pkt[4] = byte(ts >> 24) // TS
pkt[5] = byte(ts >> 16)
pkt[6] = byte(ts >> 8)
pkt[7] = byte(ts)
pkt[8], pkt[9], pkt[10], pkt[11] = 0, 0, 0, 1 // SSRC = 1
pkt[12] = 0xde // fake payload
pkt[13] = 0xad
pkt[14] = 0xbe
pkt[15] = 0xef
return pkt
}

View File

@ -90,6 +90,13 @@ type Config struct {
// DTLS-SRTP for full browser/OBS WebRTC support. When nil, the server falls
// back to plain UDP RTP (no ICE, no encryption).
WebRTCProvider WebRTCProvider
// RTSPRelayAddr is the TCP listen address for the internal RTSP relay server,
// e.g. ":8554". When set, all WHIP streams are also served over RTSP at
// rtsp://127.0.0.1:<port>/live/<name>, allowing multiple FFmpeg egress
// processes to consume the same source independently.
// Leave empty to disable the RTSP relay.
RTSPRelayAddr string
}
// Server defines the WHIP server interface.
@ -116,6 +123,10 @@ type Channels struct {
type channel struct {
name string
// rtspRelay is the optional internal RTSP relay server. When non-nil, all
// received RTP packets are also forwarded to RTSP consumers.
rtspRelay *rtspRelay
// videoRelayPort / audioRelayPort: loopback UDP ports that FFmpeg reads from.
// Ports are pre-allocated by briefly binding to :0, then released so that
// FFmpeg (or any RTP consumer) can bind to them.
@ -268,6 +279,10 @@ func (ch *channel) startRx(serverIP string, provider WebRTCProvider, offerSDP st
// Parse codec parameters from the SDP offer (best-effort).
ch.updateCodecsFromSDP(offerSDP)
// Sync initial codec info to the RTSP relay so DESCRIBE works before SPS/PPS snoop.
if ch.rtspRelay != nil {
ch.rtspRelay.UpdateCodecs(ch.name, ch.currentCodecParams())
}
// Allocate rx sockets for the WHIP client to send RTP to.
ch.videoRx, err = net.ListenUDP("udp4", &net.UDPAddr{IP: net.ParseIP(serverIP), Port: 0})
@ -374,7 +389,7 @@ func (ch *channel) whepSubCount() int {
}
// videoOnPacket is called for each received video RTP packet.
// It runs SPS/PPS snoop and fans out to all WHEP subscribers.
// It runs SPS/PPS snoop, fans out to WHEP subscribers, and forwards to the RTSP relay.
func (ch *channel) videoOnPacket(pkt []byte) {
ch.snoopH264NALs(pkt)
ch.whepLock.RLock()
@ -386,10 +401,13 @@ func (ch *channel) videoOnPacket(pkt []byte) {
for _, s := range subs {
s.WriteVideo(pkt)
}
if ch.rtspRelay != nil {
ch.rtspRelay.WriteVideo(ch.name, pkt)
}
}
// audioOnPacket is called for each received audio RTP packet.
// It fans out to all WHEP subscribers.
// It fans out to WHEP subscribers and forwards to the RTSP relay.
func (ch *channel) audioOnPacket(pkt []byte) {
ch.whepLock.RLock()
subs := make([]whepSubscriber, 0, len(ch.whepSubs))
@ -400,6 +418,9 @@ func (ch *channel) audioOnPacket(pkt []byte) {
for _, s := range subs {
s.WriteAudio(pkt)
}
if ch.rtspRelay != nil {
ch.rtspRelay.WriteAudio(ch.name, pkt)
}
}
// isPublishing returns true when a WHIP client is currently sending to the channel.
@ -494,6 +515,11 @@ func (ch *channel) snoopH264NALs(pkt []byte) {
base64.StdEncoding.EncodeToString(ch.videoPPS) +
";packetization-mode=1"
ch.spropped = true
// Sync updated fmtp (with sprop-parameter-sets) to the RTSP relay
// so that DESCRIBE responses include the correct SPS/PPS.
if ch.rtspRelay != nil {
ch.rtspRelay.UpdateCodecs(ch.name, ch.currentCodecParams())
}
}
}
@ -645,6 +671,7 @@ type server struct {
logger log.Logger
collector session.Collector
provider WebRTCProvider
rtspRelay *rtspRelay
httpServer *http.Server
@ -667,6 +694,10 @@ func New(config Config) (Server, error) {
channels: make(map[string]*channel),
}
if config.RTSPRelayAddr != "" {
s.rtspRelay = newRTSPRelay(config.RTSPRelayAddr, config.Logger)
}
if s.logger == nil {
s.logger = log.New("")
}
@ -688,8 +719,15 @@ func New(config Config) (Server, error) {
return s, nil
}
// ListenAndServe starts the WHIP HTTP server.
// ListenAndServe starts the WHIP HTTP server and optionally the RTSP relay.
func (s *server) ListenAndServe() error {
if s.rtspRelay != nil {
go func() {
if err := s.rtspRelay.ListenAndServe(); err != nil {
s.logger.Error().WithField("error", err).Log("RTSP relay error")
}
}()
}
s.logger.Info().Log("Server started")
err := s.httpServer.ListenAndServe()
if err == http.ErrServerClosed {
@ -698,12 +736,16 @@ func (s *server) ListenAndServe() error {
return err
}
// Close shuts down the WHIP server.
// Close shuts down the WHIP server and the RTSP relay.
func (s *server) Close() {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
s.httpServer.Shutdown(ctx)
if s.rtspRelay != nil {
s.rtspRelay.Close()
}
s.lock.Lock()
defer s.lock.Unlock()
@ -925,11 +967,28 @@ func (s *server) getOrCreateChannel(name string) *channel {
// Return a dummy channel rather than nil to avoid panics; it won't relay.
ch = &channel{name: name, whepSubs: make(map[string]whepSubscriber)}
}
// Attach the RTSP relay so the channel can forward packets to RTSP consumers.
ch.rtspRelay = s.rtspRelay
s.channels[name] = ch
}
return ch
}
// currentCodecParams returns the codec parameters for this channel as an RTSPCodecParams.
// Caller must hold ch.lock (at least read).
func (ch *channel) currentCodecParams() RTSPCodecParams {
return RTSPCodecParams{
VideoPayloadType: ch.videoPayloadType,
VideoCodec: ch.videoCodec,
VideoClockRate: ch.videoClockRate,
VideoFmtp: ch.videoFmtp,
AudioPayloadType: ch.audioPayloadType,
AudioCodec: ch.audioCodec,
AudioClockRate: ch.audioClockRate,
AudioChannels: ch.audioChannels,
}
}
// checkToken validates the bearer token from the Authorization header or ?token= query param.
func (s *server) checkToken(r *http.Request) bool {
if t := r.URL.Query().Get("token"); t != "" {
@ -972,15 +1031,21 @@ func (s *server) handleWHEP(w http.ResponseWriter, r *http.Request, name string)
ch := s.getOrCreateChannel(name)
ch.lock.RLock()
videoCodec := ch.videoCodec // e.g. "H264"
audioCodec := ch.audioCodec // e.g. "opus"
params := whepCodecParams{
VideoMime: "video/" + ch.videoCodec,
VideoClockRate: uint32(ch.videoClockRate),
VideoFmtp: ch.videoFmtp,
VideoPT: uint8(ch.videoPayloadType),
AudioMime: "audio/" + ch.audioCodec,
AudioClockRate: uint32(ch.audioClockRate),
AudioChannels: uint16(ch.audioChannels),
AudioPT: uint8(ch.audioPayloadType),
}
ch.lock.RUnlock()
subID := fmt.Sprintf("%x", time.Now().UnixNano())
videoMime := "video/" + videoCodec
audioMime := "audio/" + audioCodec
sess, answerSDP, err := newWHEPPionSession(subID, string(body), videoMime, audioMime)
sess, answerSDP, err := newWHEPPionSession(subID, string(body), params)
if err != nil {
s.logger.Error().WithField("stream", name).WithField("error", err).Log("Failed to create WHEP session")
http.Error(w, "failed to create WHEP session", http.StatusInternalServerError)