core/whip/whip.go
Cesar Mendivil c9304b7b63 feat: add RTSP relay support to WHIP server
- Introduced a new RTSP relay server that allows WHIP streams to be served over RTSP.
- Added `RTSPRelayAddr` configuration option to enable the RTSP relay.
- Implemented methods for updating codec parameters and forwarding RTP packets to RTSP clients.
- Enhanced the WHIP server to handle RTSP connections and manage multiple clients.
- Added comprehensive tests for RTSP relay functionality, including OPTIONS, DESCRIBE, SETUP, PLAY, and TEARDOWN requests.
2026-03-17 20:28:06 -07:00

1121 lines
33 KiB
Go

// Package whip provides a WHIP (WebRTC HTTP Ingestion Protocol, RFC 9328) server.
//
// The server listens on an HTTP port for WHIP publishing requests. Clients
// (OBS Studio 30+, GStreamer, FFmpeg, browsers) POST an SDP offer to
// /whip/{streamID}. The server allocates UDP receive sockets, responds with
// an SDP answer containing the server's RTP receive ports, and forwards the
// received RTP packets to internal relay sockets that FFmpeg reads via a
// dynamically-generated SDP served at /whip/{streamID}/sdp.
//
// Usage in a Restreamer process config (input address template):
//
// http://localhost:8555/whip/{name}/sdp
//
// This uses the {whip} template, which expands to the above URL pattern.
//
// # WebRTC / ICE / DTLS-SRTP
//
// This implementation uses plain UDP RTP without ICE negotiation or DTLS-SRTP
// encryption. It is compatible with encoders that support plain-RTP mode
// (e.g. FFmpeg with -f whip pointed at this server, GStreamer without DTLS).
//
// For full browser and OBS WHIP support (which requires ICE + DTLS-SRTP),
// inject a WebRTCProvider implementation (e.g. via github.com/pion/webrtc/v3)
// through Config.WebRTCProvider. When set, the provider handles ICE negotiation
// and SRTP decryption and delivers decrypted RTP to the server's relay layer.
package whip
import (
"context"
"encoding/base64"
"fmt"
"io"
"net"
"net/http"
"strings"
"sync"
"time"
"github.com/datarhei/core/v16/log"
"github.com/datarhei/core/v16/session"
)
// ErrServerClosed is returned by ListenAndServe when the server is closed via Close().
var ErrServerClosed = http.ErrServerClosed
// WebRTCProvider is an optional interface for full WebRTC (ICE + DTLS-SRTP) support.
// When provided, the server delegates SDP negotiation and media reception to the
// provider rather than using plain UDP RTP.
//
// To add browser/OBS WHIP support, implement this interface using pion/webrtc/v3:
//
// type pionProvider struct { ... }
// func (p *pionProvider) OpenSession(offerSDP string, videoPort, audioPort int) (answerSDP string, err error) { ... }
type WebRTCProvider interface {
// OpenSession performs ICE + DTLS-SRTP negotiation.
// offerSDP is the SDP offer from the client.
// videoPort and audioPort are the local UDP ports where the provider should
// deliver decrypted RTP after the handshake.
// Returns the SDP answer to send back to the client, or an error.
OpenSession(offerSDP string, videoPort, audioPort int) (answerSDP string, err error)
}
// whepSubscriber is implemented by each active WHEP subscriber session.
// WriteVideo/WriteAudio receive plain RTP packets from the publisher relay and
// forward them to the remote browser via a separate DTLS-SRTP connection.
type whepSubscriber interface {
ID() string
WriteVideo([]byte)
WriteAudio([]byte)
Done() <-chan struct{}
Close()
}
// Config is the configuration for the WHIP server.
type Config struct {
// Addr is the HTTP listen address for the WHIP endpoint, e.g. ":8555".
Addr string
// Token is an optional bearer token required from WHIP clients.
// If empty, no authentication is required.
Token string
// Logger is optional.
Logger log.Logger
// Collector is optional. Used for session statistics.
Collector session.Collector
// WebRTCProvider is optional. When provided, it handles ICE negotiation and
// DTLS-SRTP for full browser/OBS WebRTC support. When nil, the server falls
// back to plain UDP RTP (no ICE, no encryption).
WebRTCProvider WebRTCProvider
// RTSPRelayAddr is the TCP listen address for the internal RTSP relay server,
// e.g. ":8554". When set, all WHIP streams are also served over RTSP at
// rtsp://127.0.0.1:<port>/live/<name>, allowing multiple FFmpeg egress
// processes to consume the same source independently.
// Leave empty to disable the RTSP relay.
RTSPRelayAddr string
}
// Server defines the WHIP server interface.
type Server interface {
// ListenAndServe starts the WHIP HTTP server. Blocks until Close() is called.
ListenAndServe() error
// Close shuts down the server and releases all resources.
Close()
// Channels returns the currently active (publishing) WHIP streams.
Channels() Channels
}
// Channels describes the active WHIP streams.
type Channels struct {
// Publisher maps stream name to its publish start time.
Publisher map[string]time.Time
// Subscribers maps stream name to the number of active WHEP subscribers.
Subscribers map[string]int
}
// channel holds the relay sockets and state for a single WHIP stream.
type channel struct {
name string
// rtspRelay is the optional internal RTSP relay server. When non-nil, all
// received RTP packets are also forwarded to RTSP consumers.
rtspRelay *rtspRelay
// videoRelayPort / audioRelayPort: loopback UDP ports that FFmpeg reads from.
// Ports are pre-allocated by briefly binding to :0, then released so that
// FFmpeg (or any RTP consumer) can bind to them.
videoRelayPort int
audioRelayPort int
// videoRx / audioRx: UDP sockets that receive RTP from the WHIP client.
// These are present only while a client is actively publishing.
videoRx *net.UDPConn
audioRx *net.UDPConn
// codec parameters negotiated from the SDP offer.
videoPayloadType int
videoCodec string
videoClockRate int
videoFmtp string // extra fmtp params from offer (e.g. sprop-parameter-sets)
// SPS/PPS captured from in-band RTP stream (populated after first keyframe).
videoSPS []byte
videoPPS []byte
spropped bool // true once sprop-parameter-sets has been built from in-band NALs
audioPayloadType int
audioCodec string
audioClockRate int
audioChannels int
publishedAt time.Time
collector session.Collector
rxCancel context.CancelFunc
lock sync.RWMutex
// WHEP fan-out: active browser subscribers receiving this stream via WebRTC.
whepSubs map[string]whepSubscriber
whepLock sync.RWMutex
}
func newChannel(name string, collector session.Collector) (*channel, error) {
ch := &channel{
name: name,
collector: collector,
// Defaults matching the most common WebRTC codecs.
videoPayloadType: 96,
videoCodec: "H264",
videoClockRate: 90000,
audioPayloadType: 111,
audioCodec: "opus",
audioClockRate: 48000,
audioChannels: 2,
}
// Pre-allocate relay port numbers. We bind briefly to get an ephemeral port
// assigned by the OS, then immediately close so FFmpeg can bind to that port.
vp, err := allocateRelayPort()
if err != nil {
return nil, fmt.Errorf("whip: allocate video relay port: %w", err)
}
ch.videoRelayPort = vp
ap, err := allocateRelayPort()
if err != nil {
return nil, fmt.Errorf("whip: allocate audio relay port: %w", err)
}
ch.audioRelayPort = ap
ch.whepSubs = make(map[string]whepSubscriber)
return ch, nil
}
// allocateRelayPort binds a UDP socket on a random loopback port, records the
// port number, then immediately closes the socket so that an external process
// (FFmpeg) can bind to the same port shortly afterwards.
func allocateRelayPort() (int, error) {
conn, err := net.ListenUDP("udp4", &net.UDPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0})
if err != nil {
return 0, err
}
port := conn.LocalAddr().(*net.UDPAddr).Port
conn.Close()
return port, nil
}
// relayVideoPort returns the relay port for the video RTP stream (FFmpeg reads this).
func (ch *channel) relayVideoPort() int {
return ch.videoRelayPort
}
// relayAudioPort returns the relay port for the audio RTP stream (FFmpeg reads this).
func (ch *channel) relayAudioPort() int {
return ch.audioRelayPort
}
// ffmpegSDP returns the SDP describing the relay streams for FFmpeg to consume.
func (ch *channel) ffmpegSDP() string {
ch.lock.RLock()
vpt := ch.videoPayloadType
vpc := ch.videoCodec
vclk := ch.videoClockRate
vfmtp := ch.videoFmtp
apt := ch.audioPayloadType
apc := ch.audioCodec
aclk := ch.audioClockRate
ach := ch.audioChannels
ch.lock.RUnlock()
videoPort := ch.relayVideoPort()
audioPort := ch.relayAudioPort()
sdp := "v=0\r\n"
sdp += "o=- 0 0 IN IP4 127.0.0.1\r\n"
sdp += "s=WHIP Stream\r\n"
sdp += "c=IN IP4 127.0.0.1\r\n"
sdp += "t=0 0\r\n"
sdp += fmt.Sprintf("m=video %d RTP/AVP %d\r\n", videoPort, vpt)
sdp += fmt.Sprintf("a=rtpmap:%d %s/%d\r\n", vpt, vpc, vclk)
if vfmtp != "" {
sdp += fmt.Sprintf("a=fmtp:%d %s\r\n", vpt, vfmtp)
} else if vpc == "H264" {
sdp += fmt.Sprintf("a=fmtp:%d packetization-mode=1\r\n", vpt)
}
sdp += fmt.Sprintf("m=audio %d RTP/AVP %d\r\n", audioPort, apt)
if ach > 1 {
sdp += fmt.Sprintf("a=rtpmap:%d %s/%d/%d\r\n", apt, apc, aclk, ach)
} else {
sdp += fmt.Sprintf("a=rtpmap:%d %s/%d\r\n", apt, apc, aclk)
}
return sdp
}
// startRx allocates receive sockets, starts the relay goroutines, and begins
// forwarding incoming RTP packets from the WHIP client to the relay sockets.
// It returns the SDP answer to send to the WHIP client.
func (ch *channel) startRx(serverIP string, provider WebRTCProvider, offerSDP string) (answerSDP string, videoRxPort, audioRxPort int, err error) {
ch.lock.Lock()
defer ch.lock.Unlock()
// Stop any previous rx session.
if ch.rxCancel != nil {
ch.rxCancel()
}
if ch.videoRx != nil {
ch.videoRx.Close()
}
if ch.audioRx != nil {
ch.audioRx.Close()
}
// Parse codec parameters from the SDP offer (best-effort).
ch.updateCodecsFromSDP(offerSDP)
// Sync initial codec info to the RTSP relay so DESCRIBE works before SPS/PPS snoop.
if ch.rtspRelay != nil {
ch.rtspRelay.UpdateCodecs(ch.name, ch.currentCodecParams())
}
// Allocate rx sockets for the WHIP client to send RTP to.
ch.videoRx, err = net.ListenUDP("udp4", &net.UDPAddr{IP: net.ParseIP(serverIP), Port: 0})
if err != nil {
return "", 0, 0, fmt.Errorf("whip: allocate video rx socket: %w", err)
}
ch.audioRx, err = net.ListenUDP("udp4", &net.UDPAddr{IP: net.ParseIP(serverIP), Port: 0})
if err != nil {
ch.videoRx.Close()
ch.videoRx = nil
return "", 0, 0, fmt.Errorf("whip: allocate audio rx socket: %w", err)
}
videoRxPort = ch.videoRx.LocalAddr().(*net.UDPAddr).Port
audioRxPort = ch.audioRx.LocalAddr().(*net.UDPAddr).Port
ctx, cancel := context.WithCancel(context.Background())
ch.rxCancel = cancel
ch.publishedAt = time.Now()
// Build the SDP answer for the WHIP client.
if provider != nil {
answerSDP, err = provider.OpenSession(offerSDP, videoRxPort, audioRxPort)
if err != nil {
cancel()
ch.videoRx.Close()
ch.audioRx.Close()
ch.videoRx = nil
ch.audioRx = nil
return "", 0, 0, fmt.Errorf("whip: WebRTC provider: %w", err)
}
} else {
answerSDP = ch.buildPlainRTPAnswer(serverIP, videoRxPort, audioRxPort)
}
// Start relay goroutines: forward RTP to FFmpeg relay port and fan out to WHEP subscribers.
go ch.relayUDP(ctx, ch.videoRx, ch.videoRelayPort, ch.videoOnPacket)
go ch.relayUDP(ctx, ch.audioRx, ch.audioRelayPort, ch.audioOnPacket)
// Track session bandwidth ingress.
if ch.collector.IsCollectableIP(serverIP) {
ch.collector.RegisterAndActivate(ch.name, ch.name, "publish:"+ch.name, serverIP)
}
return answerSDP, videoRxPort, audioRxPort, nil
}
// stopRx tears down the receive side of the channel.
func (ch *channel) stopRx() {
ch.lock.Lock()
defer ch.lock.Unlock()
if ch.rxCancel != nil {
ch.rxCancel()
ch.rxCancel = nil
}
if ch.videoRx != nil {
ch.videoRx.Close()
ch.videoRx = nil
}
if ch.audioRx != nil {
ch.audioRx.Close()
ch.audioRx = nil
}
ch.publishedAt = time.Time{}
// Reset SPS/PPS snoop so next publisher gets fresh capture.
ch.videoSPS = nil
ch.videoPPS = nil
ch.spropped = false
}
// close releases all resources held by the channel.
func (ch *channel) close() {
ch.stopRx()
// Close all active WHEP subscribers.
ch.whepLock.Lock()
for id, sub := range ch.whepSubs {
sub.Close()
delete(ch.whepSubs, id)
}
ch.whepLock.Unlock()
}
// addWHEPSub registers a WHEP subscriber for this channel.
func (ch *channel) addWHEPSub(sub whepSubscriber) {
ch.whepLock.Lock()
ch.whepSubs[sub.ID()] = sub
ch.whepLock.Unlock()
}
// removeWHEPSub removes a WHEP subscriber by ID.
func (ch *channel) removeWHEPSub(id string) {
ch.whepLock.Lock()
delete(ch.whepSubs, id)
ch.whepLock.Unlock()
}
// whepSubCount returns the number of active WHEP subscribers.
func (ch *channel) whepSubCount() int {
ch.whepLock.RLock()
defer ch.whepLock.RUnlock()
return len(ch.whepSubs)
}
// videoOnPacket is called for each received video RTP packet.
// It runs SPS/PPS snoop, fans out to WHEP subscribers, and forwards to the RTSP relay.
func (ch *channel) videoOnPacket(pkt []byte) {
ch.snoopH264NALs(pkt)
ch.whepLock.RLock()
subs := make([]whepSubscriber, 0, len(ch.whepSubs))
for _, s := range ch.whepSubs {
subs = append(subs, s)
}
ch.whepLock.RUnlock()
for _, s := range subs {
s.WriteVideo(pkt)
}
if ch.rtspRelay != nil {
ch.rtspRelay.WriteVideo(ch.name, pkt)
}
}
// audioOnPacket is called for each received audio RTP packet.
// It fans out to WHEP subscribers and forwards to the RTSP relay.
func (ch *channel) audioOnPacket(pkt []byte) {
ch.whepLock.RLock()
subs := make([]whepSubscriber, 0, len(ch.whepSubs))
for _, s := range ch.whepSubs {
subs = append(subs, s)
}
ch.whepLock.RUnlock()
for _, s := range subs {
s.WriteAudio(pkt)
}
if ch.rtspRelay != nil {
ch.rtspRelay.WriteAudio(ch.name, pkt)
}
}
// isPublishing returns true when a WHIP client is currently sending to the channel.
func (ch *channel) isPublishing() bool {
ch.lock.RLock()
defer ch.lock.RUnlock()
return !ch.publishedAt.IsZero()
}
// publishedSince returns when the current publisher started.
func (ch *channel) publishedSince() time.Time {
ch.lock.RLock()
defer ch.lock.RUnlock()
return ch.publishedAt
}
// relayUDP reads RTP packets from rx and forwards them to relayPort on loopback.
// FFmpeg (or any RTP consumer) is expected to be listening on relayPort.
// onPacket, if non-nil, is called with a copy of each RTP packet before forwarding.
func (ch *channel) relayUDP(ctx context.Context, rx *net.UDPConn, relayPort int, onPacket func([]byte)) {
buf := make([]byte, 1500)
// Send to loopback so FFmpeg (listening on relayPort) receives.
dst := &net.UDPAddr{IP: net.IPv4(127, 0, 0, 1), Port: relayPort}
sendConn, err := net.DialUDP("udp4", nil, dst)
if err != nil {
return
}
defer sendConn.Close()
for {
select {
case <-ctx.Done():
return
default:
}
rx.SetReadDeadline(time.Now().Add(500 * time.Millisecond))
n, _, err := rx.ReadFromUDP(buf)
if err != nil {
if isNetTimeout(err) {
continue
}
return
}
sendConn.Write(buf[:n])
if onPacket != nil {
pkt := make([]byte, n)
copy(pkt, buf[:n])
go onPacket(pkt)
}
}
}
// snoopH264NALs inspects an RTP packet for H.264 SPS (type 7) and PPS (type 8)
// NAL units. Once both are found, it updates ch.videoFmtp with sprop-parameter-sets
// so that FFmpeg can determine the video resolution from the relay SDP.
func (ch *channel) snoopH264NALs(pkt []byte) {
ch.lock.RLock()
done := ch.spropped
ch.lock.RUnlock()
if done {
return
}
nals := parseH264NALsFromRTP(pkt)
if len(nals) == 0 {
return
}
ch.lock.Lock()
defer ch.lock.Unlock()
if ch.spropped {
return
}
for _, nal := range nals {
if len(nal) == 0 {
continue
}
nalType := nal[0] & 0x1f
if nalType == 7 && ch.videoSPS == nil {
ch.videoSPS = nal
} else if nalType == 8 && ch.videoPPS == nil {
ch.videoPPS = nal
}
}
if ch.videoSPS != nil && ch.videoPPS != nil {
ch.videoFmtp = "sprop-parameter-sets=" +
base64.StdEncoding.EncodeToString(ch.videoSPS) + "," +
base64.StdEncoding.EncodeToString(ch.videoPPS) +
";packetization-mode=1"
ch.spropped = true
// Sync updated fmtp (with sprop-parameter-sets) to the RTSP relay
// so that DESCRIBE responses include the correct SPS/PPS.
if ch.rtspRelay != nil {
ch.rtspRelay.UpdateCodecs(ch.name, ch.currentCodecParams())
}
}
}
// parseH264NALsFromRTP returns the H.264 NAL units contained in a raw RTP packet.
// It handles Single NAL unit packets and STAP-A aggregation packets.
// FU-A fragmented packets are skipped (SPS/PPS are never fragmented in practice).
func parseH264NALsFromRTP(rtpPkt []byte) [][]byte {
if len(rtpPkt) < 13 {
return nil
}
// Skip fixed 12-byte RTP header + optional CSRC list.
cc := int(rtpPkt[0] & 0x0f)
offset := 12 + cc*4
// Skip RTP extension header if present.
if rtpPkt[0]&0x10 != 0 {
if offset+4 > len(rtpPkt) {
return nil
}
extLen := int(rtpPkt[offset+2])<<8 | int(rtpPkt[offset+3])
offset += 4 + extLen*4
}
if offset >= len(rtpPkt) {
return nil
}
payload := rtpPkt[offset:]
nalType := payload[0] & 0x1f
switch {
case nalType >= 1 && nalType <= 23: // Single NAL unit packet
return [][]byte{payload}
case nalType == 24: // STAP-A
var nals [][]byte
pos := 1
for pos+2 <= len(payload) {
size := int(payload[pos])<<8 | int(payload[pos+1])
pos += 2
if size == 0 || pos+size > len(payload) {
break
}
nals = append(nals, payload[pos:pos+size])
pos += size
}
return nals
}
// FU-A and other types: ignore
return nil
}
// buildPlainRTPAnswer constructs a minimal SDP answer for plain (non-DTLS) RTP.
func (ch *channel) buildPlainRTPAnswer(serverIP string, videoPort, audioPort int) string {
sdp := "v=0\r\n"
sdp += "o=- 0 0 IN IP4 " + serverIP + "\r\n"
sdp += "s=WHIP Answer\r\n"
sdp += "c=IN IP4 " + serverIP + "\r\n"
sdp += "t=0 0\r\n"
sdp += fmt.Sprintf("m=video %d RTP/AVP %d\r\n", videoPort, ch.videoPayloadType)
sdp += fmt.Sprintf("a=rtpmap:%d %s/%d\r\n", ch.videoPayloadType, ch.videoCodec, ch.videoClockRate)
sdp += "a=recvonly\r\n"
sdp += fmt.Sprintf("m=audio %d RTP/AVP %d\r\n", audioPort, ch.audioPayloadType)
if ch.audioChannels > 1 {
sdp += fmt.Sprintf("a=rtpmap:%d %s/%d/%d\r\n", ch.audioPayloadType, ch.audioCodec, ch.audioClockRate, ch.audioChannels)
} else {
sdp += fmt.Sprintf("a=rtpmap:%d %s/%d\r\n", ch.audioPayloadType, ch.audioCodec, ch.audioClockRate)
}
sdp += "a=recvonly\r\n"
return sdp
}
// updateCodecsFromSDP performs a best-effort parse of the SDP offer to extract
// the first video and audio payload types and codec names.
func (ch *channel) updateCodecsFromSDP(sdp string) {
type mediaState struct {
inVideo bool
inAudio bool
}
var ms mediaState
for _, line := range strings.Split(sdp, "\n") {
line = strings.TrimRight(line, "\r")
if strings.HasPrefix(line, "m=video") {
ms.inVideo = true
ms.inAudio = false
// Extract payload type from "m=video 9 ... 96 97 ..."
parts := strings.Fields(line)
if len(parts) >= 4 {
var pt int
if _, err := fmt.Sscanf(parts[3], "%d", &pt); err == nil {
ch.videoPayloadType = pt
}
}
} else if strings.HasPrefix(line, "m=audio") {
ms.inAudio = true
ms.inVideo = false
parts := strings.Fields(line)
if len(parts) >= 4 {
var pt int
if _, err := fmt.Sscanf(parts[3], "%d", &pt); err == nil {
ch.audioPayloadType = pt
}
}
} else if strings.HasPrefix(line, "a=fmtp:") {
// Format: a=fmtp:<pt> <params>
var pt int
if _, err := fmt.Sscanf(line, "a=fmtp:%d ", &pt); err == nil {
params := strings.SplitN(line, " ", 2)
if len(params) == 2 && ms.inVideo && pt == ch.videoPayloadType {
ch.videoFmtp = params[1]
}
}
} else if strings.HasPrefix(line, "a=rtpmap:") {
// Format: a=rtpmap:<pt> <codec>/<clockrate>[/<channels>]
var pt int
var rest string
if _, err := fmt.Sscanf(line, "a=rtpmap:%d %s", &pt, &rest); err != nil {
continue
}
parts := strings.Split(rest, "/")
codec := parts[0]
var clk int
if len(parts) >= 2 {
fmt.Sscanf(parts[1], "%d", &clk)
}
var channels int = 1
if len(parts) >= 3 {
fmt.Sscanf(parts[2], "%d", &channels)
}
if ms.inVideo && pt == ch.videoPayloadType {
ch.videoCodec = codec
if clk > 0 {
ch.videoClockRate = clk
}
} else if ms.inAudio && pt == ch.audioPayloadType {
ch.audioCodec = codec
if clk > 0 {
ch.audioClockRate = clk
}
ch.audioChannels = channels
}
}
}
}
// --- Server implementation ---
type server struct {
addr string
token string
logger log.Logger
collector session.Collector
provider WebRTCProvider
rtspRelay *rtspRelay
httpServer *http.Server
channels map[string]*channel
lock sync.RWMutex
}
// New creates a new WHIP server from the given config.
func New(config Config) (Server, error) {
if config.Addr == "" {
return nil, fmt.Errorf("whip: listen address is required")
}
s := &server{
addr: config.Addr,
token: config.Token,
logger: config.Logger,
collector: config.Collector,
provider: config.WebRTCProvider,
channels: make(map[string]*channel),
}
if config.RTSPRelayAddr != "" {
s.rtspRelay = newRTSPRelay(config.RTSPRelayAddr, config.Logger)
}
if s.logger == nil {
s.logger = log.New("")
}
if s.collector == nil {
s.collector = session.NewNullCollector()
}
mux := http.NewServeMux()
mux.HandleFunc("/whip/", s.dispatch)
s.httpServer = &http.Server{
Addr: s.addr,
Handler: mux,
ReadHeaderTimeout: 10 * time.Second,
IdleTimeout: 60 * time.Second,
}
return s, nil
}
// ListenAndServe starts the WHIP HTTP server and optionally the RTSP relay.
func (s *server) ListenAndServe() error {
if s.rtspRelay != nil {
go func() {
if err := s.rtspRelay.ListenAndServe(); err != nil {
s.logger.Error().WithField("error", err).Log("RTSP relay error")
}
}()
}
s.logger.Info().Log("Server started")
err := s.httpServer.ListenAndServe()
if err == http.ErrServerClosed {
return ErrServerClosed
}
return err
}
// Close shuts down the WHIP server and the RTSP relay.
func (s *server) Close() {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
s.httpServer.Shutdown(ctx)
if s.rtspRelay != nil {
s.rtspRelay.Close()
}
s.lock.Lock()
defer s.lock.Unlock()
for _, ch := range s.channels {
ch.close()
}
s.channels = make(map[string]*channel)
s.logger.Info().Log("Server closed")
}
// Channels returns the current active publishers and subscriber counts.
func (s *server) Channels() Channels {
s.lock.RLock()
defer s.lock.RUnlock()
c := Channels{
Publisher: make(map[string]time.Time),
Subscribers: make(map[string]int),
}
for name, ch := range s.channels {
if ch.isPublishing() {
c.Publisher[name] = ch.publishedSince()
}
if count := ch.whepSubCount(); count > 0 {
c.Subscribers[name] = count
}
}
return c
}
// dispatch routes incoming HTTP requests to the appropriate handler.
func (s *server) dispatch(w http.ResponseWriter, r *http.Request) {
// Trim "/whip/" prefix.
path := strings.TrimPrefix(r.URL.Path, "/whip/")
path = strings.TrimSuffix(path, "/")
// GET /whip/{name}/sdp → SDP for FFmpeg to consume the relay.
if strings.HasSuffix(path, "/sdp") && r.Method == http.MethodGet {
name := strings.TrimSuffix(path, "/sdp")
if !isValidStreamName(name) {
http.Error(w, "invalid stream name", http.StatusBadRequest)
return
}
s.handleSDPRead(w, r, name)
return
}
// OPTIONS: CORS preflight for browser WHEP clients.
if r.Method == http.MethodOptions {
w.Header().Set("Access-Control-Allow-Origin", "*")
w.Header().Set("Access-Control-Allow-Headers", "Content-Type, Authorization")
w.Header().Set("Access-Control-Allow-Methods", "POST, DELETE, OPTIONS")
w.Header().Set("Access-Control-Expose-Headers", "Location")
w.Header().Set("Access-Control-Max-Age", "86400")
w.WriteHeader(http.StatusNoContent)
return
}
// POST /whip/{name}/whep → WHEP subscribe (RFC draft-ietf-wish-whep).
if strings.HasSuffix(path, "/whep") && r.Method == http.MethodPost {
name := strings.TrimSuffix(path, "/whep")
if !isValidStreamName(name) {
http.Error(w, "invalid stream name", http.StatusBadRequest)
return
}
if s.token != "" && !s.checkToken(r) {
w.Header().Set("WWW-Authenticate", `Bearer realm="WHEP"`)
http.Error(w, "Unauthorized", http.StatusUnauthorized)
return
}
s.handleWHEP(w, r, name)
return
}
// DELETE /whip/{name}/whep/{subid} → WHEP unsubscribe.
if strings.Contains(path, "/whep/") && r.Method == http.MethodDelete {
if s.token != "" && !s.checkToken(r) {
w.Header().Set("WWW-Authenticate", `Bearer realm="WHEP"`)
http.Error(w, "Unauthorized", http.StatusUnauthorized)
return
}
parts := strings.SplitN(path, "/whep/", 2)
if len(parts) == 2 && isValidStreamName(parts[0]) {
s.handleWHEPDelete(w, r, parts[0], parts[1])
return
}
http.Error(w, "invalid path", http.StatusBadRequest)
return
}
// All other methods require token validation.
if s.token != "" && !s.checkToken(r) {
w.Header().Set("WWW-Authenticate", `Bearer realm="WHIP"`)
http.Error(w, "Unauthorized", http.StatusUnauthorized)
return
}
name := path
if !isValidStreamName(name) {
http.Error(w, "invalid stream name", http.StatusBadRequest)
return
}
switch r.Method {
case http.MethodPost:
s.handlePublish(w, r, name)
case http.MethodDelete:
s.handleDelete(w, r, name)
case http.MethodPatch:
// Trickle ICE (RFC 9328 §4.3) — respond 405 if not supported.
if s.provider == nil {
http.Error(w, "trickle ICE not supported in plain-RTP mode", http.StatusMethodNotAllowed)
return
}
http.Error(w, "trickle ICE not implemented", http.StatusNotImplemented)
default:
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
}
}
// handleSDPRead serves the SDP file that FFmpeg uses to read the WHIP relay stream.
// The relay sockets (and thus their ports) are pre-allocated here if the channel
// does not yet exist, so that FFmpeg can bind to them before a WHIP client arrives.
// If a publisher is active, this handler waits up to 5 seconds for in-band SPS/PPS
// to be captured so that FFmpeg can determine the video resolution immediately.
func (s *server) handleSDPRead(w http.ResponseWriter, r *http.Request, name string) {
ch := s.getOrCreateChannel(name)
// If a publisher is active, wait for SPS/PPS to be snooped from the RTP stream.
if ch.isPublishing() {
deadline := time.Now().Add(5 * time.Second)
for time.Now().Before(deadline) {
ch.lock.RLock()
done := ch.spropped
ch.lock.RUnlock()
if done {
break
}
time.Sleep(50 * time.Millisecond)
}
}
sdp := ch.ffmpegSDP()
w.Header().Set("Content-Type", "application/sdp")
w.Header().Set("Cache-Control", "no-cache")
w.WriteHeader(http.StatusOK)
io.WriteString(w, sdp)
}
// handlePublish handles the WHIP POST request (SDP offer/answer exchange).
func (s *server) handlePublish(w http.ResponseWriter, r *http.Request, name string) {
ct := r.Header.Get("Content-Type")
if !strings.Contains(ct, "application/sdp") {
http.Error(w, "Content-Type must be application/sdp", http.StatusUnsupportedMediaType)
return
}
body, err := io.ReadAll(io.LimitReader(r.Body, 16*1024))
if err != nil {
http.Error(w, "failed to read SDP offer", http.StatusBadRequest)
return
}
offerSDP := string(body)
// Identify the client IP so we can send RTP back on the same interface.
clientIP, _, _ := net.SplitHostPort(r.RemoteAddr)
serverIP := s.serverIPFor(clientIP)
ch := s.getOrCreateChannel(name)
answerSDP, _, _, err := ch.startRx(serverIP, s.provider, offerSDP)
if err != nil {
s.logger.Error().WithField("stream", name).WithField("error", err).Log("Failed to start WHIP session")
http.Error(w, "internal server error", http.StatusInternalServerError)
return
}
s.logger.Info().WithField("stream", name).WithField("client", clientIP).Log("WHIP publisher connected")
// Respond per RFC 9328: 201 Created with Location header and SDP answer.
location := "/whip/" + name
w.Header().Set("Content-Type", "application/sdp")
w.Header().Set("Location", location)
w.WriteHeader(http.StatusCreated)
io.WriteString(w, answerSDP)
}
// handleDelete ends a WHIP session (RFC 9328 §4.2).
func (s *server) handleDelete(w http.ResponseWriter, r *http.Request, name string) {
s.lock.RLock()
ch, ok := s.channels[name]
s.lock.RUnlock()
if !ok || !ch.isPublishing() {
http.Error(w, "stream not found", http.StatusNotFound)
return
}
ch.stopRx()
clientIP, _, _ := net.SplitHostPort(r.RemoteAddr)
s.logger.Info().WithField("stream", name).WithField("client", clientIP).Log("WHIP publisher disconnected")
w.WriteHeader(http.StatusOK)
}
// getOrCreateChannel returns the channel for the given name, creating it if needed.
func (s *server) getOrCreateChannel(name string) *channel {
s.lock.Lock()
defer s.lock.Unlock()
ch, ok := s.channels[name]
if !ok {
var err error
ch, err = newChannel(name, s.collector)
if err != nil {
s.logger.Error().WithField("stream", name).WithField("error", err).Log("Failed to create channel")
// Return a dummy channel rather than nil to avoid panics; it won't relay.
ch = &channel{name: name, whepSubs: make(map[string]whepSubscriber)}
}
// Attach the RTSP relay so the channel can forward packets to RTSP consumers.
ch.rtspRelay = s.rtspRelay
s.channels[name] = ch
}
return ch
}
// currentCodecParams returns the codec parameters for this channel as an RTSPCodecParams.
// Caller must hold ch.lock (at least read).
func (ch *channel) currentCodecParams() RTSPCodecParams {
return RTSPCodecParams{
VideoPayloadType: ch.videoPayloadType,
VideoCodec: ch.videoCodec,
VideoClockRate: ch.videoClockRate,
VideoFmtp: ch.videoFmtp,
AudioPayloadType: ch.audioPayloadType,
AudioCodec: ch.audioCodec,
AudioClockRate: ch.audioClockRate,
AudioChannels: ch.audioChannels,
}
}
// checkToken validates the bearer token from the Authorization header or ?token= query param.
func (s *server) checkToken(r *http.Request) bool {
if t := r.URL.Query().Get("token"); t != "" {
return t == s.token
}
auth := r.Header.Get("Authorization")
if strings.HasPrefix(auth, "Bearer ") {
return strings.TrimPrefix(auth, "Bearer ") == s.token
}
return false
}
// serverIPFor returns the server-side IP to use in SDP based on the client IP.
// For loopback clients, returns 127.0.0.1; for external clients returns 0.0.0.0
// (the OS will choose an appropriate local address when binding).
func (s *server) serverIPFor(clientIP string) string {
ip := net.ParseIP(clientIP)
if ip == nil || ip.IsLoopback() {
return "127.0.0.1"
}
return "0.0.0.0"
}
// handleWHEP handles a WHEP subscription request (RFC draft-ietf-wish-whep).
// The browser POSTs an SDP offer; the server creates a pion PeerConnection,
// adds sendonly tracks fed from the publisher relay, and returns the SDP answer.
func (s *server) handleWHEP(w http.ResponseWriter, r *http.Request, name string) {
ct := r.Header.Get("Content-Type")
if !strings.Contains(ct, "application/sdp") {
http.Error(w, "Content-Type must be application/sdp", http.StatusUnsupportedMediaType)
return
}
body, err := io.ReadAll(io.LimitReader(r.Body, 16*1024))
if err != nil {
http.Error(w, "failed to read SDP offer", http.StatusBadRequest)
return
}
ch := s.getOrCreateChannel(name)
ch.lock.RLock()
params := whepCodecParams{
VideoMime: "video/" + ch.videoCodec,
VideoClockRate: uint32(ch.videoClockRate),
VideoFmtp: ch.videoFmtp,
VideoPT: uint8(ch.videoPayloadType),
AudioMime: "audio/" + ch.audioCodec,
AudioClockRate: uint32(ch.audioClockRate),
AudioChannels: uint16(ch.audioChannels),
AudioPT: uint8(ch.audioPayloadType),
}
ch.lock.RUnlock()
subID := fmt.Sprintf("%x", time.Now().UnixNano())
sess, answerSDP, err := newWHEPPionSession(subID, string(body), params)
if err != nil {
s.logger.Error().WithField("stream", name).WithField("error", err).Log("Failed to create WHEP session")
http.Error(w, "failed to create WHEP session", http.StatusInternalServerError)
return
}
ch.addWHEPSub(sess)
clientIP, _, _ := net.SplitHostPort(r.RemoteAddr)
s.logger.Info().WithField("stream", name).WithField("subscriber", subID).WithField("client", clientIP).Log("WHEP subscriber connected")
// Clean up automatically when the pion session closes.
go func() {
<-sess.Done()
ch.removeWHEPSub(subID)
s.logger.Info().WithField("stream", name).WithField("subscriber", subID).Log("WHEP subscriber disconnected")
}()
w.Header().Set("Access-Control-Allow-Origin", "*")
w.Header().Set("Access-Control-Expose-Headers", "Location")
w.Header().Set("Content-Type", "application/sdp")
w.Header().Set("Location", "/whip/"+name+"/whep/"+subID)
w.WriteHeader(http.StatusCreated)
io.WriteString(w, answerSDP)
}
// handleWHEPDelete ends a specific WHEP subscription.
func (s *server) handleWHEPDelete(w http.ResponseWriter, r *http.Request, name, subID string) {
s.lock.RLock()
ch, ok := s.channels[name]
s.lock.RUnlock()
if !ok {
http.Error(w, "stream not found", http.StatusNotFound)
return
}
ch.whepLock.Lock()
sub, ok := ch.whepSubs[subID]
if !ok {
ch.whepLock.Unlock()
http.Error(w, "subscriber not found", http.StatusNotFound)
return
}
delete(ch.whepSubs, subID)
ch.whepLock.Unlock()
sub.Close()
w.WriteHeader(http.StatusOK)
}
// isValidStreamName checks that a stream name contains only safe characters.
func isValidStreamName(name string) bool {
if name == "" {
return false
}
for _, c := range name {
if !((c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z') ||
(c >= '0' && c <= '9') || c == '-' || c == '_' || c == '.') {
return false
}
}
return true
}
// isNetTimeout returns true for Go net timeout errors.
func isNetTimeout(err error) bool {
if ne, ok := err.(net.Error); ok {
return ne.Timeout()
}
return false
}