- Introduced a new Dockerfile for building the WHIP server with Go and FFmpeg. - Implemented the WHIPHandler for managing WHIP publish sessions and generating URLs for clients. - Added pionProvider for handling WebRTC sessions using the pion/webrtc library. - Created the main WHIP server logic to handle SDP offers and manage active publishing streams. - Implemented HTTP handlers for publishing, deleting, and retrieving stream information. - Added support for SDP generation for FFmpeg consumption.
853 lines
24 KiB
Go
853 lines
24 KiB
Go
// Package whip provides a WHIP (WebRTC-HTTP Ingestion Protocol, RFC 9725) server.
|
|
//
|
|
// The server listens on an HTTP port for WHIP publishing requests. Clients
|
|
// (OBS Studio 30+, GStreamer, FFmpeg, browsers) POST an SDP offer to
|
|
// /whip/{streamID}. The server allocates UDP receive sockets, responds with
|
|
// an SDP answer containing the server's RTP receive ports, and forwards the
|
|
// received RTP packets to internal relay sockets that FFmpeg reads via a
|
|
// dynamically-generated SDP served at /whip/{streamID}/sdp.
|
|
//
|
|
// Usage in a Restreamer process config (input address template):
|
|
//
|
|
// http://localhost:8555/whip/{name}/sdp
|
|
//
|
|
// This uses the {whip} template, which expands to the above URL pattern.
|
|
//
|
|
// # WebRTC / ICE / DTLS-SRTP
|
|
//
|
|
// This implementation uses plain UDP RTP without ICE negotiation or DTLS-SRTP
|
|
// encryption. It is compatible with encoders that support plain-RTP mode
|
|
// (e.g. FFmpeg with -f whip pointed at this server, GStreamer without DTLS).
|
|
//
|
|
// For full browser and OBS WHIP support (which requires ICE + DTLS-SRTP),
|
|
// inject a WebRTCProvider implementation (e.g. via github.com/pion/webrtc/v3)
|
|
// through Config.WebRTCProvider. When set, the provider handles ICE negotiation
|
|
// and SRTP decryption and delivers decrypted RTP to the server's relay layer.
|
|
package whip
|
|
|
|
import (
|
|
"context"
|
|
"encoding/base64"
|
|
"fmt"
|
|
"io"
|
|
"net"
|
|
"net/http"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/datarhei/core/v16/log"
|
|
"github.com/datarhei/core/v16/session"
|
|
)
|
|
|
|
// ErrServerClosed is the error ListenAndServe reports once the server has been
// shut down through Close. It is the same value as http.ErrServerClosed.
var ErrServerClosed = http.ErrServerClosed
|
|
|
|
// WebRTCProvider is an optional hook for full WebRTC (ICE + DTLS-SRTP) ingest.
// When a provider is configured, the server hands it the client's SDP offer
// and replies with the answer it returns instead of building a plain UDP RTP
// answer itself.
//
// A provider can be built on top of pion/webrtc/v3, for example:
//
//	type pionProvider struct { ... }
//	func (p *pionProvider) OpenSession(offerSDP string, videoPort, audioPort int) (answerSDP string, err error) { ... }
type WebRTCProvider interface {
	// OpenSession performs the ICE + DTLS-SRTP negotiation for one publisher.
	//
	// offerSDP is the SDP offer received from the client. videoPort and
	// audioPort are local UDP ports to which the provider should deliver the
	// decrypted RTP once the handshake has completed.
	//
	// It returns the SDP answer to send back to the client, or an error.
	OpenSession(offerSDP string, videoPort, audioPort int) (answerSDP string, err error)
}
|
|
|
|
// Config is the configuration for the WHIP server.
|
|
type Config struct {
|
|
// Addr is the HTTP listen address for the WHIP endpoint, e.g. ":8555".
|
|
Addr string
|
|
|
|
// Token is an optional bearer token required from WHIP clients.
|
|
// If empty, no authentication is required.
|
|
Token string
|
|
|
|
// Logger is optional.
|
|
Logger log.Logger
|
|
|
|
// Collector is optional. Used for session statistics.
|
|
Collector session.Collector
|
|
|
|
// WebRTCProvider is optional. When provided, it handles ICE negotiation and
|
|
// DTLS-SRTP for full browser/OBS WebRTC support. When nil, the server falls
|
|
// back to plain UDP RTP (no ICE, no encryption).
|
|
WebRTCProvider WebRTCProvider
|
|
}
|
|
|
|
// Server defines the WHIP server interface.
|
|
type Server interface {
|
|
// ListenAndServe starts the WHIP HTTP server. Blocks until Close() is called.
|
|
ListenAndServe() error
|
|
|
|
// Close shuts down the server and releases all resources.
|
|
Close()
|
|
|
|
// Channels returns the currently active (publishing) WHIP streams.
|
|
Channels() Channels
|
|
}
|
|
|
|
// Channels is a snapshot of the WHIP streams that are being published.
type Channels struct {
	// Publisher maps each stream name to the time its publisher started.
	Publisher map[string]time.Time
}
|
|
|
|
// channel holds the relay sockets and state for a single WHIP stream.
|
|
type channel struct {
|
|
name string
|
|
|
|
// videoRelayPort / audioRelayPort: loopback UDP ports that FFmpeg reads from.
|
|
// Ports are pre-allocated by briefly binding to :0, then released so that
|
|
// FFmpeg (or any RTP consumer) can bind to them.
|
|
videoRelayPort int
|
|
audioRelayPort int
|
|
|
|
// videoRx / audioRx: UDP sockets that receive RTP from the WHIP client.
|
|
// These are present only while a client is actively publishing.
|
|
videoRx *net.UDPConn
|
|
audioRx *net.UDPConn
|
|
|
|
// codec parameters negotiated from the SDP offer.
|
|
videoPayloadType int
|
|
videoCodec string
|
|
videoClockRate int
|
|
videoFmtp string // extra fmtp params from offer (e.g. sprop-parameter-sets)
|
|
|
|
// SPS/PPS captured from in-band RTP stream (populated after first keyframe).
|
|
videoSPS []byte
|
|
videoPPS []byte
|
|
spropped bool // true once sprop-parameter-sets has been built from in-band NALs
|
|
|
|
audioPayloadType int
|
|
audioCodec string
|
|
audioClockRate int
|
|
audioChannels int
|
|
|
|
publishedAt time.Time
|
|
collector session.Collector
|
|
|
|
rxCancel context.CancelFunc
|
|
|
|
lock sync.RWMutex
|
|
}
|
|
|
|
func newChannel(name string, collector session.Collector) (*channel, error) {
|
|
ch := &channel{
|
|
name: name,
|
|
collector: collector,
|
|
// Defaults matching the most common WebRTC codecs.
|
|
videoPayloadType: 96,
|
|
videoCodec: "H264",
|
|
videoClockRate: 90000,
|
|
audioPayloadType: 111,
|
|
audioCodec: "opus",
|
|
audioClockRate: 48000,
|
|
audioChannels: 2,
|
|
}
|
|
|
|
// Pre-allocate relay port numbers. We bind briefly to get an ephemeral port
|
|
// assigned by the OS, then immediately close so FFmpeg can bind to that port.
|
|
vp, err := allocateRelayPort()
|
|
if err != nil {
|
|
return nil, fmt.Errorf("whip: allocate video relay port: %w", err)
|
|
}
|
|
ch.videoRelayPort = vp
|
|
|
|
ap, err := allocateRelayPort()
|
|
if err != nil {
|
|
return nil, fmt.Errorf("whip: allocate audio relay port: %w", err)
|
|
}
|
|
ch.audioRelayPort = ap
|
|
|
|
return ch, nil
|
|
}
|
|
|
|
// allocateRelayPort asks the OS for a free UDP port on 127.0.0.1 by binding a
// socket to port 0, notes the assigned port number and closes the socket
// again, leaving the port available for an external process (FFmpeg) to bind.
//
// Nothing holds the port once the socket is closed, so another process may
// claim it before the intended consumer does.
func allocateRelayPort() (int, error) {
	loopback := &net.UDPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0}

	sock, err := net.ListenUDP("udp4", loopback)
	if err != nil {
		return 0, err
	}
	defer sock.Close()

	return sock.LocalAddr().(*net.UDPAddr).Port, nil
}
|
|
|
|
// relayVideoPort returns the relay port for the video RTP stream (FFmpeg reads this).
|
|
func (ch *channel) relayVideoPort() int {
|
|
return ch.videoRelayPort
|
|
}
|
|
|
|
// relayAudioPort returns the relay port for the audio RTP stream (FFmpeg reads this).
|
|
func (ch *channel) relayAudioPort() int {
|
|
return ch.audioRelayPort
|
|
}
|
|
|
|
// ffmpegSDP returns the SDP describing the relay streams for FFmpeg to consume.
|
|
func (ch *channel) ffmpegSDP() string {
|
|
ch.lock.RLock()
|
|
vpt := ch.videoPayloadType
|
|
vpc := ch.videoCodec
|
|
vclk := ch.videoClockRate
|
|
vfmtp := ch.videoFmtp
|
|
apt := ch.audioPayloadType
|
|
apc := ch.audioCodec
|
|
aclk := ch.audioClockRate
|
|
ach := ch.audioChannels
|
|
ch.lock.RUnlock()
|
|
|
|
videoPort := ch.relayVideoPort()
|
|
audioPort := ch.relayAudioPort()
|
|
|
|
sdp := "v=0\r\n"
|
|
sdp += "o=- 0 0 IN IP4 127.0.0.1\r\n"
|
|
sdp += "s=WHIP Stream\r\n"
|
|
sdp += "c=IN IP4 127.0.0.1\r\n"
|
|
sdp += "t=0 0\r\n"
|
|
sdp += fmt.Sprintf("m=video %d RTP/AVP %d\r\n", videoPort, vpt)
|
|
sdp += fmt.Sprintf("a=rtpmap:%d %s/%d\r\n", vpt, vpc, vclk)
|
|
if vfmtp != "" {
|
|
sdp += fmt.Sprintf("a=fmtp:%d %s\r\n", vpt, vfmtp)
|
|
} else if vpc == "H264" {
|
|
sdp += fmt.Sprintf("a=fmtp:%d packetization-mode=1\r\n", vpt)
|
|
}
|
|
sdp += fmt.Sprintf("m=audio %d RTP/AVP %d\r\n", audioPort, apt)
|
|
if ach > 1 {
|
|
sdp += fmt.Sprintf("a=rtpmap:%d %s/%d/%d\r\n", apt, apc, aclk, ach)
|
|
} else {
|
|
sdp += fmt.Sprintf("a=rtpmap:%d %s/%d\r\n", apt, apc, aclk)
|
|
}
|
|
|
|
return sdp
|
|
}
|
|
|
|
// startRx allocates receive sockets, starts the relay goroutines, and begins
|
|
// forwarding incoming RTP packets from the WHIP client to the relay sockets.
|
|
// It returns the SDP answer to send to the WHIP client.
|
|
func (ch *channel) startRx(serverIP string, provider WebRTCProvider, offerSDP string) (answerSDP string, videoRxPort, audioRxPort int, err error) {
|
|
ch.lock.Lock()
|
|
defer ch.lock.Unlock()
|
|
|
|
// Stop any previous rx session.
|
|
if ch.rxCancel != nil {
|
|
ch.rxCancel()
|
|
}
|
|
if ch.videoRx != nil {
|
|
ch.videoRx.Close()
|
|
}
|
|
if ch.audioRx != nil {
|
|
ch.audioRx.Close()
|
|
}
|
|
|
|
// Parse codec parameters from the SDP offer (best-effort).
|
|
ch.updateCodecsFromSDP(offerSDP)
|
|
|
|
// Allocate rx sockets for the WHIP client to send RTP to.
|
|
ch.videoRx, err = net.ListenUDP("udp4", &net.UDPAddr{IP: net.ParseIP(serverIP), Port: 0})
|
|
if err != nil {
|
|
return "", 0, 0, fmt.Errorf("whip: allocate video rx socket: %w", err)
|
|
}
|
|
|
|
ch.audioRx, err = net.ListenUDP("udp4", &net.UDPAddr{IP: net.ParseIP(serverIP), Port: 0})
|
|
if err != nil {
|
|
ch.videoRx.Close()
|
|
ch.videoRx = nil
|
|
return "", 0, 0, fmt.Errorf("whip: allocate audio rx socket: %w", err)
|
|
}
|
|
|
|
videoRxPort = ch.videoRx.LocalAddr().(*net.UDPAddr).Port
|
|
audioRxPort = ch.audioRx.LocalAddr().(*net.UDPAddr).Port
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
ch.rxCancel = cancel
|
|
ch.publishedAt = time.Now()
|
|
|
|
// Build the SDP answer for the WHIP client.
|
|
if provider != nil {
|
|
answerSDP, err = provider.OpenSession(offerSDP, videoRxPort, audioRxPort)
|
|
if err != nil {
|
|
cancel()
|
|
ch.videoRx.Close()
|
|
ch.audioRx.Close()
|
|
ch.videoRx = nil
|
|
ch.audioRx = nil
|
|
return "", 0, 0, fmt.Errorf("whip: WebRTC provider: %w", err)
|
|
}
|
|
} else {
|
|
answerSDP = ch.buildPlainRTPAnswer(serverIP, videoRxPort, audioRxPort)
|
|
}
|
|
|
|
// Start relay goroutines (send decrypted RTP to the loopback ports where FFmpeg listens).
|
|
go ch.relayUDP(ctx, ch.videoRx, ch.videoRelayPort, ch.snoopH264NALs)
|
|
go ch.relayUDP(ctx, ch.audioRx, ch.audioRelayPort, nil)
|
|
|
|
// Track session bandwidth ingress.
|
|
if ch.collector.IsCollectableIP(serverIP) {
|
|
ch.collector.RegisterAndActivate(ch.name, ch.name, "publish:"+ch.name, serverIP)
|
|
}
|
|
|
|
return answerSDP, videoRxPort, audioRxPort, nil
|
|
}
|
|
|
|
// stopRx tears down the receive side of the channel.
|
|
func (ch *channel) stopRx() {
|
|
ch.lock.Lock()
|
|
defer ch.lock.Unlock()
|
|
|
|
if ch.rxCancel != nil {
|
|
ch.rxCancel()
|
|
ch.rxCancel = nil
|
|
}
|
|
if ch.videoRx != nil {
|
|
ch.videoRx.Close()
|
|
ch.videoRx = nil
|
|
}
|
|
if ch.audioRx != nil {
|
|
ch.audioRx.Close()
|
|
ch.audioRx = nil
|
|
}
|
|
ch.publishedAt = time.Time{}
|
|
// Reset SPS/PPS snoop so next publisher gets fresh capture.
|
|
ch.videoSPS = nil
|
|
ch.videoPPS = nil
|
|
ch.spropped = false
|
|
}
|
|
|
|
// close releases all resources held by the channel.
|
|
func (ch *channel) close() {
|
|
ch.stopRx()
|
|
}
|
|
|
|
// isPublishing returns true when a WHIP client is currently sending to the channel.
|
|
func (ch *channel) isPublishing() bool {
|
|
ch.lock.RLock()
|
|
defer ch.lock.RUnlock()
|
|
return !ch.publishedAt.IsZero()
|
|
}
|
|
|
|
// publishedSince returns when the current publisher started.
|
|
func (ch *channel) publishedSince() time.Time {
|
|
ch.lock.RLock()
|
|
defer ch.lock.RUnlock()
|
|
return ch.publishedAt
|
|
}
|
|
|
|
// relayUDP reads RTP packets from rx and forwards them to relayPort on loopback.
|
|
// FFmpeg (or any RTP consumer) is expected to be listening on relayPort.
|
|
// onPacket, if non-nil, is called with a copy of each RTP packet before forwarding.
|
|
func (ch *channel) relayUDP(ctx context.Context, rx *net.UDPConn, relayPort int, onPacket func([]byte)) {
|
|
buf := make([]byte, 1500)
|
|
|
|
// Send to loopback so FFmpeg (listening on relayPort) receives.
|
|
dst := &net.UDPAddr{IP: net.IPv4(127, 0, 0, 1), Port: relayPort}
|
|
|
|
sendConn, err := net.DialUDP("udp4", nil, dst)
|
|
if err != nil {
|
|
return
|
|
}
|
|
defer sendConn.Close()
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
default:
|
|
}
|
|
|
|
rx.SetReadDeadline(time.Now().Add(500 * time.Millisecond))
|
|
n, _, err := rx.ReadFromUDP(buf)
|
|
if err != nil {
|
|
if isNetTimeout(err) {
|
|
continue
|
|
}
|
|
return
|
|
}
|
|
|
|
sendConn.Write(buf[:n])
|
|
if onPacket != nil {
|
|
pkt := make([]byte, n)
|
|
copy(pkt, buf[:n])
|
|
go onPacket(pkt)
|
|
}
|
|
}
|
|
}
|
|
|
|
// snoopH264NALs inspects an RTP packet for H.264 SPS (type 7) and PPS (type 8)
|
|
// NAL units. Once both are found, it updates ch.videoFmtp with sprop-parameter-sets
|
|
// so that FFmpeg can determine the video resolution from the relay SDP.
|
|
func (ch *channel) snoopH264NALs(pkt []byte) {
|
|
ch.lock.RLock()
|
|
done := ch.spropped
|
|
ch.lock.RUnlock()
|
|
if done {
|
|
return
|
|
}
|
|
|
|
nals := parseH264NALsFromRTP(pkt)
|
|
if len(nals) == 0 {
|
|
return
|
|
}
|
|
|
|
ch.lock.Lock()
|
|
defer ch.lock.Unlock()
|
|
if ch.spropped {
|
|
return
|
|
}
|
|
for _, nal := range nals {
|
|
if len(nal) == 0 {
|
|
continue
|
|
}
|
|
nalType := nal[0] & 0x1f
|
|
if nalType == 7 && ch.videoSPS == nil {
|
|
ch.videoSPS = nal
|
|
} else if nalType == 8 && ch.videoPPS == nil {
|
|
ch.videoPPS = nal
|
|
}
|
|
}
|
|
if ch.videoSPS != nil && ch.videoPPS != nil {
|
|
ch.videoFmtp = "sprop-parameter-sets=" +
|
|
base64.StdEncoding.EncodeToString(ch.videoSPS) + "," +
|
|
base64.StdEncoding.EncodeToString(ch.videoPPS) +
|
|
";packetization-mode=1"
|
|
ch.spropped = true
|
|
}
|
|
}
|
|
|
|
// parseH264NALsFromRTP returns the H.264 NAL units contained in a raw RTP
// packet. It handles Single NAL unit packets (types 1-23) and STAP-A
// aggregation packets (type 24). FU-A fragments and all other payload types
// yield nil (SPS/PPS are never fragmented in practice).
//
// The RTP header is skipped including the CSRC list and, when the X bit is
// set, the header extension. When the P bit is set the trailing padding,
// whose length is given by the last octet of the packet, is stripped so that
// it does not end up inside a NAL unit. Malformed or truncated packets yield
// nil.
//
// The returned slices alias rtpPkt.
func parseH264NALsFromRTP(rtpPkt []byte) [][]byte {
	if len(rtpPkt) < 13 {
		return nil
	}

	// Skip fixed 12-byte RTP header + optional CSRC list.
	cc := int(rtpPkt[0] & 0x0f)
	offset := 12 + cc*4

	// Skip RTP extension header if present.
	if rtpPkt[0]&0x10 != 0 {
		if offset+4 > len(rtpPkt) {
			return nil
		}
		extLen := int(rtpPkt[offset+2])<<8 | int(rtpPkt[offset+3])
		offset += 4 + extLen*4
	}

	// Strip padding if present: the last octet counts the padding octets,
	// itself included. An oversized count is caught by the check below.
	end := len(rtpPkt)
	if rtpPkt[0]&0x20 != 0 {
		end -= int(rtpPkt[end-1])
	}

	if offset >= end {
		return nil
	}
	payload := rtpPkt[offset:end]
	nalType := payload[0] & 0x1f

	switch {
	case nalType >= 1 && nalType <= 23: // Single NAL unit packet
		return [][]byte{payload}
	case nalType == 24: // STAP-A: 1-byte header, then (16-bit size, NAL) pairs
		var nals [][]byte
		pos := 1
		for pos+2 <= len(payload) {
			size := int(payload[pos])<<8 | int(payload[pos+1])
			pos += 2
			if size == 0 || pos+size > len(payload) {
				break
			}
			nals = append(nals, payload[pos:pos+size])
			pos += size
		}
		return nals
	}

	// FU-A and other types: ignore
	return nil
}
|
|
|
|
// buildPlainRTPAnswer constructs a minimal SDP answer for plain (non-DTLS) RTP.
|
|
func (ch *channel) buildPlainRTPAnswer(serverIP string, videoPort, audioPort int) string {
|
|
sdp := "v=0\r\n"
|
|
sdp += "o=- 0 0 IN IP4 " + serverIP + "\r\n"
|
|
sdp += "s=WHIP Answer\r\n"
|
|
sdp += "c=IN IP4 " + serverIP + "\r\n"
|
|
sdp += "t=0 0\r\n"
|
|
sdp += fmt.Sprintf("m=video %d RTP/AVP %d\r\n", videoPort, ch.videoPayloadType)
|
|
sdp += fmt.Sprintf("a=rtpmap:%d %s/%d\r\n", ch.videoPayloadType, ch.videoCodec, ch.videoClockRate)
|
|
sdp += "a=recvonly\r\n"
|
|
sdp += fmt.Sprintf("m=audio %d RTP/AVP %d\r\n", audioPort, ch.audioPayloadType)
|
|
if ch.audioChannels > 1 {
|
|
sdp += fmt.Sprintf("a=rtpmap:%d %s/%d/%d\r\n", ch.audioPayloadType, ch.audioCodec, ch.audioClockRate, ch.audioChannels)
|
|
} else {
|
|
sdp += fmt.Sprintf("a=rtpmap:%d %s/%d\r\n", ch.audioPayloadType, ch.audioCodec, ch.audioClockRate)
|
|
}
|
|
sdp += "a=recvonly\r\n"
|
|
return sdp
|
|
}
|
|
|
|
// updateCodecsFromSDP performs a best-effort parse of the SDP offer to extract
|
|
// the first video and audio payload types and codec names.
|
|
func (ch *channel) updateCodecsFromSDP(sdp string) {
|
|
type mediaState struct {
|
|
inVideo bool
|
|
inAudio bool
|
|
}
|
|
var ms mediaState
|
|
|
|
for _, line := range strings.Split(sdp, "\n") {
|
|
line = strings.TrimRight(line, "\r")
|
|
if strings.HasPrefix(line, "m=video") {
|
|
ms.inVideo = true
|
|
ms.inAudio = false
|
|
// Extract payload type from "m=video 9 ... 96 97 ..."
|
|
parts := strings.Fields(line)
|
|
if len(parts) >= 4 {
|
|
var pt int
|
|
if _, err := fmt.Sscanf(parts[3], "%d", &pt); err == nil {
|
|
ch.videoPayloadType = pt
|
|
}
|
|
}
|
|
} else if strings.HasPrefix(line, "m=audio") {
|
|
ms.inAudio = true
|
|
ms.inVideo = false
|
|
parts := strings.Fields(line)
|
|
if len(parts) >= 4 {
|
|
var pt int
|
|
if _, err := fmt.Sscanf(parts[3], "%d", &pt); err == nil {
|
|
ch.audioPayloadType = pt
|
|
}
|
|
}
|
|
} else if strings.HasPrefix(line, "a=fmtp:") {
|
|
// Format: a=fmtp:<pt> <params>
|
|
var pt int
|
|
if _, err := fmt.Sscanf(line, "a=fmtp:%d ", &pt); err == nil {
|
|
params := strings.SplitN(line, " ", 2)
|
|
if len(params) == 2 && ms.inVideo && pt == ch.videoPayloadType {
|
|
ch.videoFmtp = params[1]
|
|
}
|
|
}
|
|
} else if strings.HasPrefix(line, "a=rtpmap:") {
|
|
// Format: a=rtpmap:<pt> <codec>/<clockrate>[/<channels>]
|
|
var pt int
|
|
var rest string
|
|
if _, err := fmt.Sscanf(line, "a=rtpmap:%d %s", &pt, &rest); err != nil {
|
|
continue
|
|
}
|
|
parts := strings.Split(rest, "/")
|
|
codec := parts[0]
|
|
var clk int
|
|
if len(parts) >= 2 {
|
|
fmt.Sscanf(parts[1], "%d", &clk)
|
|
}
|
|
var channels int = 1
|
|
if len(parts) >= 3 {
|
|
fmt.Sscanf(parts[2], "%d", &channels)
|
|
}
|
|
|
|
if ms.inVideo && pt == ch.videoPayloadType {
|
|
ch.videoCodec = codec
|
|
if clk > 0 {
|
|
ch.videoClockRate = clk
|
|
}
|
|
} else if ms.inAudio && pt == ch.audioPayloadType {
|
|
ch.audioCodec = codec
|
|
if clk > 0 {
|
|
ch.audioClockRate = clk
|
|
}
|
|
ch.audioChannels = channels
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// --- Server implementation ---
|
|
|
|
type server struct {
|
|
addr string
|
|
token string
|
|
logger log.Logger
|
|
collector session.Collector
|
|
provider WebRTCProvider
|
|
|
|
httpServer *http.Server
|
|
|
|
channels map[string]*channel
|
|
lock sync.RWMutex
|
|
}
|
|
|
|
// New creates a new WHIP server from the given config.
|
|
func New(config Config) (Server, error) {
|
|
if config.Addr == "" {
|
|
return nil, fmt.Errorf("whip: listen address is required")
|
|
}
|
|
|
|
s := &server{
|
|
addr: config.Addr,
|
|
token: config.Token,
|
|
logger: config.Logger,
|
|
collector: config.Collector,
|
|
provider: config.WebRTCProvider,
|
|
channels: make(map[string]*channel),
|
|
}
|
|
|
|
if s.logger == nil {
|
|
s.logger = log.New("")
|
|
}
|
|
|
|
if s.collector == nil {
|
|
s.collector = session.NewNullCollector()
|
|
}
|
|
|
|
mux := http.NewServeMux()
|
|
mux.HandleFunc("/whip/", s.dispatch)
|
|
|
|
s.httpServer = &http.Server{
|
|
Addr: s.addr,
|
|
Handler: mux,
|
|
ReadHeaderTimeout: 10 * time.Second,
|
|
IdleTimeout: 60 * time.Second,
|
|
}
|
|
|
|
return s, nil
|
|
}
|
|
|
|
// ListenAndServe starts the WHIP HTTP server.
|
|
func (s *server) ListenAndServe() error {
|
|
s.logger.Info().Log("Server started")
|
|
err := s.httpServer.ListenAndServe()
|
|
if err == http.ErrServerClosed {
|
|
return ErrServerClosed
|
|
}
|
|
return err
|
|
}
|
|
|
|
// Close shuts down the WHIP server.
|
|
func (s *server) Close() {
|
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
|
defer cancel()
|
|
s.httpServer.Shutdown(ctx)
|
|
|
|
s.lock.Lock()
|
|
defer s.lock.Unlock()
|
|
|
|
for _, ch := range s.channels {
|
|
ch.close()
|
|
}
|
|
s.channels = make(map[string]*channel)
|
|
|
|
s.logger.Info().Log("Server closed")
|
|
}
|
|
|
|
// Channels returns the current active publishers.
|
|
func (s *server) Channels() Channels {
|
|
s.lock.RLock()
|
|
defer s.lock.RUnlock()
|
|
|
|
c := Channels{Publisher: make(map[string]time.Time)}
|
|
for name, ch := range s.channels {
|
|
if ch.isPublishing() {
|
|
c.Publisher[name] = ch.publishedSince()
|
|
}
|
|
}
|
|
return c
|
|
}
|
|
|
|
// dispatch routes incoming HTTP requests to the appropriate handler.
|
|
func (s *server) dispatch(w http.ResponseWriter, r *http.Request) {
|
|
// Trim "/whip/" prefix.
|
|
path := strings.TrimPrefix(r.URL.Path, "/whip/")
|
|
path = strings.TrimSuffix(path, "/")
|
|
|
|
// GET /whip/{name}/sdp → SDP for FFmpeg to consume the relay.
|
|
if strings.HasSuffix(path, "/sdp") && r.Method == http.MethodGet {
|
|
name := strings.TrimSuffix(path, "/sdp")
|
|
if !isValidStreamName(name) {
|
|
http.Error(w, "invalid stream name", http.StatusBadRequest)
|
|
return
|
|
}
|
|
s.handleSDPRead(w, r, name)
|
|
return
|
|
}
|
|
|
|
// All other methods require token validation.
|
|
if s.token != "" && !s.checkToken(r) {
|
|
w.Header().Set("WWW-Authenticate", `Bearer realm="WHIP"`)
|
|
http.Error(w, "Unauthorized", http.StatusUnauthorized)
|
|
return
|
|
}
|
|
|
|
name := path
|
|
if !isValidStreamName(name) {
|
|
http.Error(w, "invalid stream name", http.StatusBadRequest)
|
|
return
|
|
}
|
|
|
|
switch r.Method {
|
|
case http.MethodPost:
|
|
s.handlePublish(w, r, name)
|
|
case http.MethodDelete:
|
|
s.handleDelete(w, r, name)
|
|
case http.MethodPatch:
|
|
// Trickle ICE (RFC 9328 §4.3) — respond 405 if not supported.
|
|
if s.provider == nil {
|
|
http.Error(w, "trickle ICE not supported in plain-RTP mode", http.StatusMethodNotAllowed)
|
|
return
|
|
}
|
|
http.Error(w, "trickle ICE not implemented", http.StatusNotImplemented)
|
|
default:
|
|
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
|
|
}
|
|
}
|
|
|
|
// handleSDPRead serves the SDP file that FFmpeg uses to read the WHIP relay stream.
|
|
// The relay sockets (and thus their ports) are pre-allocated here if the channel
|
|
// does not yet exist, so that FFmpeg can bind to them before a WHIP client arrives.
|
|
// If a publisher is active, this handler waits up to 5 seconds for in-band SPS/PPS
|
|
// to be captured so that FFmpeg can determine the video resolution immediately.
|
|
func (s *server) handleSDPRead(w http.ResponseWriter, r *http.Request, name string) {
|
|
ch := s.getOrCreateChannel(name)
|
|
|
|
// If a publisher is active, wait for SPS/PPS to be snooped from the RTP stream.
|
|
if ch.isPublishing() {
|
|
deadline := time.Now().Add(5 * time.Second)
|
|
for time.Now().Before(deadline) {
|
|
ch.lock.RLock()
|
|
done := ch.spropped
|
|
ch.lock.RUnlock()
|
|
if done {
|
|
break
|
|
}
|
|
time.Sleep(50 * time.Millisecond)
|
|
}
|
|
}
|
|
|
|
sdp := ch.ffmpegSDP()
|
|
w.Header().Set("Content-Type", "application/sdp")
|
|
w.Header().Set("Cache-Control", "no-cache")
|
|
w.WriteHeader(http.StatusOK)
|
|
io.WriteString(w, sdp)
|
|
}
|
|
|
|
// handlePublish handles the WHIP POST request (SDP offer/answer exchange).
|
|
func (s *server) handlePublish(w http.ResponseWriter, r *http.Request, name string) {
|
|
ct := r.Header.Get("Content-Type")
|
|
if !strings.Contains(ct, "application/sdp") {
|
|
http.Error(w, "Content-Type must be application/sdp", http.StatusUnsupportedMediaType)
|
|
return
|
|
}
|
|
|
|
body, err := io.ReadAll(io.LimitReader(r.Body, 16*1024))
|
|
if err != nil {
|
|
http.Error(w, "failed to read SDP offer", http.StatusBadRequest)
|
|
return
|
|
}
|
|
offerSDP := string(body)
|
|
|
|
// Identify the client IP so we can send RTP back on the same interface.
|
|
clientIP, _, _ := net.SplitHostPort(r.RemoteAddr)
|
|
serverIP := s.serverIPFor(clientIP)
|
|
|
|
ch := s.getOrCreateChannel(name)
|
|
|
|
answerSDP, _, _, err := ch.startRx(serverIP, s.provider, offerSDP)
|
|
if err != nil {
|
|
s.logger.Error().WithField("stream", name).WithField("error", err).Log("Failed to start WHIP session")
|
|
http.Error(w, "internal server error", http.StatusInternalServerError)
|
|
return
|
|
}
|
|
|
|
s.logger.Info().WithField("stream", name).WithField("client", clientIP).Log("WHIP publisher connected")
|
|
|
|
// Respond per RFC 9328: 201 Created with Location header and SDP answer.
|
|
location := "/whip/" + name
|
|
w.Header().Set("Content-Type", "application/sdp")
|
|
w.Header().Set("Location", location)
|
|
w.WriteHeader(http.StatusCreated)
|
|
io.WriteString(w, answerSDP)
|
|
}
|
|
|
|
// handleDelete ends a WHIP session (RFC 9328 §4.2).
|
|
func (s *server) handleDelete(w http.ResponseWriter, r *http.Request, name string) {
|
|
s.lock.RLock()
|
|
ch, ok := s.channels[name]
|
|
s.lock.RUnlock()
|
|
|
|
if !ok || !ch.isPublishing() {
|
|
http.Error(w, "stream not found", http.StatusNotFound)
|
|
return
|
|
}
|
|
|
|
ch.stopRx()
|
|
|
|
clientIP, _, _ := net.SplitHostPort(r.RemoteAddr)
|
|
s.logger.Info().WithField("stream", name).WithField("client", clientIP).Log("WHIP publisher disconnected")
|
|
|
|
w.WriteHeader(http.StatusOK)
|
|
}
|
|
|
|
// getOrCreateChannel returns the channel for the given name, creating it if needed.
|
|
func (s *server) getOrCreateChannel(name string) *channel {
|
|
s.lock.Lock()
|
|
defer s.lock.Unlock()
|
|
|
|
ch, ok := s.channels[name]
|
|
if !ok {
|
|
var err error
|
|
ch, err = newChannel(name, s.collector)
|
|
if err != nil {
|
|
s.logger.Error().WithField("stream", name).WithField("error", err).Log("Failed to create channel")
|
|
// Return a dummy channel rather than nil to avoid panics; it won't relay.
|
|
ch = &channel{name: name}
|
|
}
|
|
s.channels[name] = ch
|
|
}
|
|
return ch
|
|
}
|
|
|
|
// checkToken validates the bearer token from the Authorization header or ?token= query param.
|
|
func (s *server) checkToken(r *http.Request) bool {
|
|
if t := r.URL.Query().Get("token"); t != "" {
|
|
return t == s.token
|
|
}
|
|
auth := r.Header.Get("Authorization")
|
|
if strings.HasPrefix(auth, "Bearer ") {
|
|
return strings.TrimPrefix(auth, "Bearer ") == s.token
|
|
}
|
|
return false
|
|
}
|
|
|
|
// serverIPFor returns the server-side IP to use in SDP based on the client IP.
|
|
// For loopback clients, returns 127.0.0.1; for external clients returns 0.0.0.0
|
|
// (the OS will choose an appropriate local address when binding).
|
|
func (s *server) serverIPFor(clientIP string) string {
|
|
ip := net.ParseIP(clientIP)
|
|
if ip == nil || ip.IsLoopback() {
|
|
return "127.0.0.1"
|
|
}
|
|
return "0.0.0.0"
|
|
}
|
|
|
|
// isValidStreamName reports whether name is usable as a stream name: it must
// be non-empty and consist solely of ASCII letters, digits, '-', '_' and '.'.
func isValidStreamName(name string) bool {
	if len(name) == 0 {
		return false
	}

	for _, r := range name {
		switch {
		case 'a' <= r && r <= 'z', 'A' <= r && r <= 'Z', '0' <= r && r <= '9':
			// letter or digit
		case r == '-', r == '_', r == '.':
			// permitted punctuation
		default:
			return false
		}
	}

	return true
}
|
|
|
|
// isNetTimeout reports whether err is a net.Error that signals a timeout.
// Errors that do not implement net.Error themselves (including nil) yield
// false; wrapped errors are not unwrapped.
func isNetTimeout(err error) bool {
	netErr, ok := err.(net.Error)
	return ok && netErr.Timeout()
}
|