Compare commits

..

5 Commits

Author SHA1 Message Date
8f93210970 fix: update Dockerfile configurations for WHIP and expose additional ports
Some checks are pending
tests / build (push) Waiting to run
2026-03-17 21:00:34 -07:00
c9304b7b63 feat: add RTSP relay support to WHIP server
- Introduced a new RTSP relay server that allows WHIP streams to be served over RTSP.
- Added `RTSPRelayAddr` configuration option to enable the RTSP relay.
- Implemented methods for updating codec parameters and forwarding RTP packets to RTSP clients.
- Enhanced the WHIP server to handle RTSP connections and manage multiple clients.
- Added comprehensive tests for RTSP relay functionality, including OPTIONS, DESCRIBE, SETUP, PLAY, and TEARDOWN requests.
2026-03-17 20:28:06 -07:00
603818cd44 feat: implement WHEP subscription support with SDP handling and subscriber management 2026-03-14 14:34:45 -07:00
c761b4e9b8 fix: normalize WHIP placeholder format in address resolution 2026-03-14 12:40:00 -07:00
7a8073eedd feat: add WHIP server implementation with Docker support
- Introduced a new Dockerfile for building the WHIP server with Go and FFmpeg.
- Implemented the WHIPHandler for managing WHIP publish sessions and generating URLs for clients.
- Added pionProvider for handling WebRTC sessions using the pion/webrtc library.
- Created the main WHIP server logic to handle SDP offers and manage active publishing streams.
- Implemented HTTP handlers for publishing, deleting, and retrieving stream information.
- Added support for SDP generation for FFmpeg consumption.
2026-03-14 12:26:24 -07:00
21 changed files with 3444 additions and 6 deletions

4
.gitignore vendored
View File

@ -6,11 +6,11 @@
/data/**
/test/**
.vscode
/vendor
*.ts
*.ts.tmp
*.m3u8
docker/
*.mp4
*.avi
*.flv

View File

@ -1,24 +1,36 @@
ARG CORE_IMAGE=datarhei/base:alpine-core-latest
ARG FFMPEG_IMAGE=datarhei/base:alpine-ffmpeg-latest
FROM $CORE_IMAGE as core
FROM $CORE_IMAGE AS core
FROM $FFMPEG_IMAGE
COPY --from=core /core /core
RUN ffmpeg -buildconf
RUN chmod +x /core/bin/run.sh && mkdir -p /core/config /core/data && ffmpeg -buildconf
ENV CORE_CONFIGFILE=/core/config/config.json
ENV CORE_STORAGE_DISK_DIR=/core/data
ENV CORE_DB_DIR=/core/config
# Enable most optional services by default
ENV CORE_WHIP_ENABLE=true
ENV CORE_WHIP_ADDRESS=:8555
ENV CORE_WHIP_RTSP_ADDRESS=:8554
ENV CORE_API_AUTH_ENABLE=false
ENV CORE_RTMP_ENABLE=true
ENV CORE_SRT_ENABLE=true
ENV CORE_PLAYOUT_ENABLE=true
ENV CORE_METRICS_ENABLE=true
ENV CORE_METRICS_ENABLE_PROMETHEUS=true
EXPOSE 8080/tcp
EXPOSE 8181/tcp
EXPOSE 1935/tcp
EXPOSE 1936/tcp
EXPOSE 6000/udp
EXPOSE 8555/tcp
EXPOSE 8554/tcp
VOLUME ["/core/data", "/core/config"]
ENTRYPOINT ["/core/bin/run.sh"]

51
Dockerfile.whip-test Normal file
View File

@ -0,0 +1,51 @@
ARG GOLANG_IMAGE=golang:1.22-alpine3.19
ARG FFMPEG_IMAGE=datarhei/base:alpine-ffmpeg-latest
FROM --platform=$BUILDPLATFORM $GOLANG_IMAGE AS builder
RUN apk add git make
# Cache go module downloads separately from source code.
# This layer only rebuilds when go.mod/go.sum/vendor change.
WORKDIR /dist/core
COPY go.mod go.sum ./
COPY vendor/ ./vendor/
# Now copy source and build. This layer rebuilds on any .go file change.
COPY . .
RUN make release && make import && make ffmigrate
FROM $FFMPEG_IMAGE
COPY --from=builder /dist/core/core /core/bin/core
COPY --from=builder /dist/core/import /core/bin/import
COPY --from=builder /dist/core/ffmigrate /core/bin/ffmigrate
COPY --from=builder /dist/core/mime.types /core/mime.types
COPY --from=builder /dist/core/run.sh /core/bin/run.sh
RUN chmod +x /core/bin/run.sh && mkdir -p /core/config /core/data
ENV CORE_CONFIGFILE=/core/config/config.json
ENV CORE_STORAGE_DISK_DIR=/core/data
ENV CORE_DB_DIR=/core/config
ENV CORE_WHIP_ENABLE=true
ENV CORE_WHIP_ADDRESS=:8555
ENV CORE_WHIP_RTSP_ADDRESS=:8554
ENV CORE_API_AUTH_ENABLE=false
ENV CORE_RTMP_ENABLE=true
ENV CORE_SRT_ENABLE=true
ENV CORE_PLAYOUT_ENABLE=true
ENV CORE_METRICS_ENABLE=true
ENV CORE_METRICS_ENABLE_PROMETHEUS=true
EXPOSE 8080/tcp
EXPOSE 8181/tcp
EXPOSE 1935/tcp
EXPOSE 1936/tcp
EXPOSE 6000/udp
EXPOSE 8555/tcp
EXPOSE 8554/tcp
VOLUME ["/core/data", "/core/config"]
ENTRYPOINT ["/core/bin/run.sh"]
WORKDIR /core

View File

@ -40,6 +40,7 @@ import (
"github.com/datarhei/core/v16/session"
"github.com/datarhei/core/v16/srt"
"github.com/datarhei/core/v16/update"
"github.com/datarhei/core/v16/whip"
"github.com/caddyserver/certmagic"
"go.uber.org/zap"
@ -73,6 +74,7 @@ type api struct {
s3fs map[string]fs.Filesystem
rtmpserver rtmp.Server
srtserver srt.Server
whipserver whip.Server
metrics monitor.HistoryMonitor
prom prometheus.Metrics
service service.Service
@ -98,6 +100,7 @@ type api struct {
rtmp log.Logger
rtmps log.Logger
srt log.Logger
whip log.Logger
}
}
@ -348,6 +351,11 @@ func (a *api) start() error {
return fmt.Errorf("unable to register session collector: %w", err)
}
whipCollector, err := sessions.Register("whip", config)
if err != nil {
return fmt.Errorf("unable to register session collector: %w", err)
}
if _, err := sessions.Register("http", config); err != nil {
return fmt.Errorf("unable to register session collector: %w", err)
}
@ -373,9 +381,13 @@ func (a *api) start() error {
srt.AddCompanion(ffmpeg)
srt.AddCompanion(rtmp)
whipCollector.AddCompanion(hls)
whipCollector.AddCompanion(ffmpeg)
ffmpeg.AddCompanion(hls)
ffmpeg.AddCompanion(rtmp)
ffmpeg.AddCompanion(srt)
ffmpeg.AddCompanion(whipCollector)
} else {
sessions, _ := session.New(session.Config{})
a.sessions = sessions
@ -570,6 +582,40 @@ func (a *api) start() error {
}, map[string]string{
"latency": "20000", // 20 milliseconds, FFmpeg requires microseconds
})
// WHIP: {whip} in an input address expands to the FFmpeg-readable SDP relay
// endpoint served by the local WHIP server (http://localhost:{port}/whip/{name}/sdp).
// Encoders and browsers push to the companion URL:
// http://{host}:{port}/whip/{name} (plain RTP / no ICE)
// or, with a WebRTCProvider injected, full WebRTC is negotiated.
a.replacer.RegisterTemplateFunc("whip", func(config *restreamapp.Config, section string) string {
_, whipPort, _ := gonet.SplitHostPort(cfg.WHIP.Address)
if whipPort == "" {
whipPort = "8555"
}
url := "http://localhost:" + whipPort + "/whip/{name}/sdp"
if len(cfg.WHIP.Token) != 0 {
url += "?token=" + cfg.WHIP.Token
}
return url
}, nil)
// WHIP-RTSP: {whip-rtsp} expands to the internal RTSP relay URL for a WHIP stream.
// Multiple FFmpeg egress processes can connect simultaneously (unlike the single-consumer
// plain-RTP relay used by {whip}).
// Usage in a process config input address: {whip-rtsp,name=mystream}
// The -rtsp_transport tcp option is injected automatically by restream.go.
a.replacer.RegisterTemplateFunc("whip-rtsp", func(config *restreamapp.Config, section string) string {
rtspAddr := cfg.WHIP.RTSPAddress
if rtspAddr == "" {
rtspAddr = ":8554"
}
_, rtspPort, _ := gonet.SplitHostPort(rtspAddr)
if rtspPort == "" {
rtspPort = "8554"
}
return "rtsp://127.0.0.1:" + rtspPort + "/live/{name}"
}, nil)
}
filesystems := []fs.Filesystem{
@ -924,6 +970,24 @@ func (a *api) start() error {
a.srtserver = srtserver
}
if cfg.WHIP.Enable {
a.log.logger.whip = a.log.logger.core.WithComponent("WHIP").WithField("address", cfg.WHIP.Address)
whipserver, err := whip.New(whip.Config{
Addr: cfg.WHIP.Address,
Token: cfg.WHIP.Token,
Logger: a.log.logger.whip,
Collector: a.sessions.Collector("whip"),
WebRTCProvider: whip.NewPionProvider(),
RTSPRelayAddr: cfg.WHIP.RTSPAddress,
})
if err != nil {
return fmt.Errorf("unable to create WHIP server: %w", err)
}
a.whipserver = whipserver
}
logcontext := "HTTP"
if cfg.TLS.Enable {
logcontext = "HTTPS"
@ -1014,6 +1078,7 @@ func (a *api) start() error {
},
RTMP: a.rtmpserver,
SRT: a.srtserver,
WHIP: a.whipserver,
JWT: a.httpjwt,
Config: a.config.store,
Sessions: a.sessions,
@ -1194,6 +1259,32 @@ func (a *api) start() error {
}()
}
if a.whipserver != nil {
wgStart.Add(1)
a.wgStop.Add(1)
go func() {
logger := a.log.logger.whip
defer func() {
logger.Info().Log("Server exited")
a.wgStop.Done()
}()
wgStart.Done()
logger.Info().Log("Server started")
err := a.whipserver.ListenAndServe()
if err != nil && err != whip.ErrServerClosed {
err = fmt.Errorf("WHIP server: %w", err)
} else {
err = nil
}
sendError(err)
}()
}
wgStart.Add(1)
a.wgStop.Add(1)
@ -1354,6 +1445,14 @@ func (a *api) stop() {
a.srtserver = nil
}
// Stop the WHIP server
if a.whipserver != nil {
a.log.logger.whip.Info().Log("Stopping ...")
a.whipserver.Close()
a.whipserver = nil
}
// Stop the RTMP server
if a.rtmpserver != nil {
a.log.logger.rtmp.Info().Log("Stopping ...")

View File

@ -98,6 +98,7 @@ func (d *Config) Clone() *Config {
data.Storage = d.Storage
data.RTMP = d.RTMP
data.SRT = d.SRT
data.WHIP = d.WHIP
data.FFmpeg = d.FFmpeg
data.Playout = d.Playout
data.Debug = d.Debug
@ -227,6 +228,12 @@ func (d *Config) init() {
d.vars.Register(value.NewBool(&d.SRT.Log.Enable, false), "srt.log.enable", "CORE_SRT_LOG_ENABLE", nil, "Enable SRT server logging", false, false)
d.vars.Register(value.NewStringList(&d.SRT.Log.Topics, []string{}, ","), "srt.log.topics", "CORE_SRT_LOG_TOPICS", nil, "List of topics to log", false, false)
// WHIP
d.vars.Register(value.NewBool(&d.WHIP.Enable, false), "whip.enable", "CORE_WHIP_ENABLE", nil, "Enable WHIP (WebRTC HTTP Ingestion Protocol) server", false, false)
d.vars.Register(value.NewAddress(&d.WHIP.Address, ":8555"), "whip.address", "CORE_WHIP_ADDRESS", nil, "WHIP server HTTP listen address", false, false)
d.vars.Register(value.NewString(&d.WHIP.Token, ""), "whip.token", "CORE_WHIP_TOKEN", nil, "WHIP bearer token for publishing clients", false, true)
d.vars.Register(value.NewAddress(&d.WHIP.RTSPAddress, ""), "whip.rtsp_address", "CORE_WHIP_RTSP_ADDRESS", nil, "WHIP internal RTSP relay TCP listen address (e.g. :8554); leave empty to disable", false, false)
// FFmpeg
d.vars.Register(value.NewExec(&d.FFmpeg.Binary, "ffmpeg", d.fs), "ffmpeg.binary", "CORE_FFMPEG_BINARY", nil, "Path to ffmpeg binary", true, false)
d.vars.Register(value.NewInt64(&d.FFmpeg.MaxProcesses, 0), "ffmpeg.max_processes", "CORE_FFMPEG_MAXPROCESSES", nil, "Max. allowed simultaneously running ffmpeg instances, 0 for unlimited", false, false)

View File

@ -113,6 +113,26 @@ type Data struct {
Topics []string `json:"topics"`
} `json:"log"`
} `json:"srt"`
// WHIP is the configuration for the WHIP (WebRTC HTTP Ingestion Protocol) server.
// The server accepts WebRTC/RTP streams from encoders and browsers, making them
// available to FFmpeg via an SDP relay endpoint.
//
// FFmpeg input address template: http://localhost:{Address}/whip/{name}/sdp
// (Use the {whip} replacer in process configs.)
WHIP struct {
// Enable activates the WHIP HTTP ingestion server.
Enable bool `json:"enable"`
// Address is the HTTP listen address for the WHIP endpoint, e.g. ":8555".
Address string `json:"address"`
// Token is an optional bearer token required from WHIP publishing clients.
Token string `json:"token"`
// RTSPAddress is the TCP listen address for the internal RTSP relay server,
// e.g. ":8554". When set, all WHIP streams are also accessible at
// rtsp://127.0.0.1:<port>/live/<name>, enabling multiple FFmpeg egress
// processes to consume the same source independently via {whip-rtsp}.
// Leave empty to disable.
RTSPAddress string `json:"rtsp_address"`
} `json:"whip"`
FFmpeg struct {
Binary string `json:"binary"`
MaxProcesses int64 `json:"max_processes" format:"int64"`

View File

@ -1896,6 +1896,162 @@
}
}
},
"/api/v3/whip": {
"get": {
"security": [
{
"ApiKeyAuth": []
}
],
"description": "List all currently active WHIP (WebRTC HTTP Ingestion Protocol) streams.",
"produces": [
"application/json"
],
"tags": [
"v16.?.?"
],
"summary": "List all publishing WHIP streams",
"operationId": "whip-3-list-channels",
"responses": {
"200": {
"description": "OK",
"schema": {
"type": "array",
"items": {
"$ref": "#/definitions/api.WHIPChannel"
}
}
}
}
}
},
"/api/v3/whip/url": {
"get": {
"security": [
{
"ApiKeyAuth": []
}
],
"description": "Returns the base publish URL and configuration so the UI can display it in a WHIP Server panel.",
"produces": [
"application/json"
],
"tags": [
"v16.?.?"
],
"summary": "Get WHIP server base URL",
"operationId": "whip-3-get-server-url",
"responses": {
"200": {
"description": "OK",
"schema": {
"$ref": "#/definitions/api.WHIPServerInfo"
}
}
}
}
},
"/api/v3/whip/{name}/url": {
"get": {
"security": [
{
"ApiKeyAuth": []
}
],
"description": "Returns the URL to configure in OBS and the internal SDP relay URL for FFmpeg.",
"produces": [
"application/json"
],
"tags": [
"v16.?.?"
],
"summary": "Get WHIP publish URL for a stream key",
"operationId": "whip-3-get-url",
"parameters": [
{
"type": "string",
"description": "Stream key",
"name": "name",
"in": "path",
"required": true
}
],
"responses": {
"200": {
"description": "OK",
"schema": {
"$ref": "#/definitions/api.WHIPURLs"
}
},
"400": {
"description": "Bad Request",
"schema": {
"$ref": "#/definitions/api.Error"
}
}
}
}
},
"/api/v3/whip/{name}/whep": {
"post": {
"description": "Browser POSTs an SDP offer; server returns an SDP answer. The stream is relayed from the active WHIP publisher to the browser subscriber via ICE+DTLS-SRTP with zero transcoding.",
"consumes": [
"application/sdp"
],
"produces": [
"application/sdp"
],
"tags": [
"v16.?.?"
],
"summary": "Subscribe to a WHIP stream via WHEP (WebRTC HTTP Egress Protocol)",
"operationId": "whep-3-subscribe",
"parameters": [
{
"type": "string",
"description": "Stream key",
"name": "name",
"in": "path",
"required": true
},
{
"description": "SDP offer from the browser",
"name": "body",
"in": "body",
"required": true,
"schema": {
"type": "string"
}
}
],
"responses": {
"201": {
"description": "SDP answer",
"headers": {
"Location": {
"type": "string",
"description": "DELETE this URL to end the subscription"
}
},
"schema": {
"type": "string"
}
},
"400": {
"description": "Bad Request",
"schema": {
"$ref": "#/definitions/api.Error"
}
},
"500": {
"description": "Internal Server Error",
"schema": {
"$ref": "#/definitions/api.Error"
}
}
}
}
},
"/api/v3/widget/process/{id}": {
"get": {
"description": "Fetch minimal statistics about a process, which is not protected by any auth.",
@ -3439,6 +3595,74 @@
}
}
},
"api.WHIPChannel": {
"type": "object",
"properties": {
"name": {
"description": "Stream key used in the WHIP URL path.",
"type": "string"
},
"published_at": {
"description": "RFC 3339 timestamp when the publisher connected.",
"type": "string",
"format": "date-time"
},
"subscribers": {
"description": "Number of active WHEP subscribers watching this stream.",
"type": "integer"
}
}
},
"api.WHIPURLs": {
"type": "object",
"properties": {
"publish_url": {
"description": "URL to enter in OBS Settings Stream Server.",
"type": "string"
},
"sdp_url": {
"description": "Internal FFmpeg-readable relay SDP URL.",
"type": "string"
},
"stream_key": {
"description": "Stream key component used in both URLs.",
"type": "string"
},
"whep_url": {
"description": "WebRTC egress URL for browser subscribers (WHEP).",
"type": "string"
}
}
},
"api.WHIPServerInfo": {
"type": "object",
"properties": {
"base_publish_url": {
"description": "Base URL for OBS. Append the stream key.",
"type": "string"
},
"base_sdp_url": {
"description": "Base internal SDP relay URL.",
"type": "string"
},
"base_whep_url": {
"description": "Base WHEP URL for browser subscribers. Append stream key and /whep.",
"type": "string"
},
"example_obs_url": {
"description": "Example URL with placeholder stream key.",
"type": "string"
},
"has_token": {
"description": "Whether a bearer token is required.",
"type": "boolean"
},
"input_address_template": {
"description": "Process input address template for WHIP.",
"type": "string"
}
}
},
"api.SRTChannels": {
"type": "object",
"properties": {

View File

@ -976,6 +976,55 @@ definitions:
name:
type: string
type: object
api.WHIPChannel:
properties:
name:
description: Stream key used in the WHIP URL path.
type: string
published_at:
description: RFC 3339 timestamp when the publisher connected.
format: date-time
type: string
subscribers:
description: Number of active WHEP subscribers watching this stream.
type: integer
type: object
api.WHIPURLs:
properties:
publish_url:
description: URL to enter in OBS Settings Stream Server.
type: string
sdp_url:
description: Internal FFmpeg-readable relay SDP URL.
type: string
stream_key:
description: Stream key component used in both URLs.
type: string
whep_url:
description: WebRTC egress URL for browser subscribers (WHEP).
type: string
type: object
api.WHIPServerInfo:
properties:
base_publish_url:
description: Base URL for OBS. Append the stream key.
type: string
base_sdp_url:
description: Base internal SDP relay URL.
type: string
base_whep_url:
description: Base WHEP URL for browser subscribers. Append stream key and /whep.
type: string
example_obs_url:
description: Example URL with placeholder stream key.
type: string
has_token:
description: Whether a bearer token is required.
type: boolean
input_address_template:
description: Process input address template for WHIP.
type: string
type: object
api.SRTChannels:
properties:
connections:
@ -3186,6 +3235,111 @@ paths:
summary: List all publishing SRT treams
tags:
- v16.9.0
/api/v3/whip:
get:
description: List all currently active WHIP (WebRTC HTTP Ingestion Protocol)
streams.
operationId: whip-3-list-channels
produces:
- application/json
responses:
"200":
description: OK
schema:
items:
$ref: '#/definitions/api.WHIPChannel'
type: array
security:
- ApiKeyAuth: []
summary: List all publishing WHIP streams
tags:
- v16.?.?
/api/v3/whip/url:
get:
description: Returns the base publish URL and configuration so the UI can display
it in a WHIP Server panel.
operationId: whip-3-get-server-url
produces:
- application/json
responses:
"200":
description: OK
schema:
$ref: '#/definitions/api.WHIPServerInfo'
security:
- ApiKeyAuth: []
summary: Get WHIP server base URL
tags:
- v16.?.?
/api/v3/whip/{name}/url:
get:
description: Returns the URL to configure in OBS and the internal SDP relay
URL for FFmpeg.
operationId: whip-3-get-url
parameters:
- description: Stream key
in: path
name: name
required: true
type: string
produces:
- application/json
responses:
"200":
description: OK
schema:
$ref: '#/definitions/api.WHIPURLs'
"400":
description: Bad Request
schema:
$ref: '#/definitions/api.Error'
security:
- ApiKeyAuth: []
summary: Get WHIP publish URL for a stream key
tags:
- v16.?.?
/api/v3/whip/{name}/whep:
post:
consumes:
- application/sdp
description: Browser POSTs an SDP offer; server returns an SDP answer. The
stream is relayed from the active WHIP publisher to the browser subscriber
via ICE+DTLS-SRTP with zero transcoding.
operationId: whep-3-subscribe
parameters:
- description: Stream key
in: path
name: name
required: true
type: string
- description: SDP offer from the browser
in: body
name: body
required: true
schema:
type: string
produces:
- application/sdp
responses:
"201":
description: SDP answer
headers:
Location:
description: DELETE this URL to end the subscription
type: string
schema:
type: string
"400":
description: Bad Request
schema:
$ref: '#/definitions/api.Error'
"500":
description: Internal Server Error
schema:
$ref: '#/definitions/api.Error'
summary: Subscribe to a WHIP stream via WHEP (WebRTC HTTP Egress Protocol)
tags:
- v16.?.?
/api/v3/widget/process/{id}:
get:
description: Fetch minimal statistics about a process, which is not protected

17
go.mod
View File

@ -22,6 +22,8 @@ require (
github.com/lithammer/shortuuid/v4 v4.0.0
github.com/mattn/go-isatty v0.0.20
github.com/minio/minio-go/v7 v7.0.70
github.com/pion/interceptor v0.1.29
github.com/pion/webrtc/v3 v3.3.6
github.com/prep/average v0.0.0-20200506183628-d26c465f48c3
github.com/prometheus/client_golang v1.19.1
github.com/puzpuzpuz/xsync/v3 v3.1.0
@ -73,6 +75,20 @@ require (
github.com/miekg/dns v1.1.59 // indirect
github.com/minio/md5-simd v1.1.2 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/pion/datachannel v1.5.8 // indirect
github.com/pion/dtls/v2 v2.2.12 // indirect
github.com/pion/ice/v2 v2.3.38 // indirect
github.com/pion/logging v0.2.2 // indirect
github.com/pion/mdns v0.0.12 // indirect
github.com/pion/randutil v0.1.0 // indirect
github.com/pion/rtcp v1.2.14 // indirect
github.com/pion/rtp v1.8.7 // indirect
github.com/pion/sctp v1.8.19 // indirect
github.com/pion/sdp/v3 v3.0.9 // indirect
github.com/pion/srtp/v2 v2.0.20 // indirect
github.com/pion/stun v0.6.1 // indirect
github.com/pion/transport/v2 v2.2.10 // indirect
github.com/pion/turn/v2 v2.1.6 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/power-devops/perfstat v0.0.0-20240221224432-82ca36839d55 // indirect
github.com/prometheus/client_model v0.6.1 // indirect
@ -88,6 +104,7 @@ require (
github.com/urfave/cli/v2 v2.27.2 // indirect
github.com/valyala/bytebufferpool v1.0.0 // indirect
github.com/valyala/fasttemplate v1.2.2 // indirect
github.com/wlynxg/anet v0.0.3 // indirect
github.com/xeipuuv/gojsonpointer v0.0.0-20190905194746-02993c407bfb // indirect
github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415 // indirect
github.com/xrash/smetrics v0.0.0-20240312152122-5f08fbb34913 // indirect

96
go.sum
View File

@ -77,6 +77,7 @@ github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeN
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.3.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/gorilla/websocket v1.5.1 h1:gmztn0JnHVt9JZquRuzLw3g4wouNVzKL15iLr/zn/QY=
@ -98,8 +99,11 @@ github.com/klauspost/cpuid/v2 v2.0.1/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa02
github.com/klauspost/cpuid/v2 v2.0.12/go.mod h1:g2LTdtYhdyuGPqyWyv7qRAmj1WBqxuObKfj5c0PQa7c=
github.com/klauspost/cpuid/v2 v2.2.7 h1:ZWSB3igEs+d0qvnxR/ZBzXVmxkgt8DdzP6m9pfuVLDM=
github.com/klauspost/cpuid/v2 v2.2.7/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/labstack/echo-jwt v0.0.0-20221127215225-c84d41a71003 h1:FyalHKl9hnJvhNbrABJXXjC2hG7gvIF0ioW9i0xHNQU=
@ -134,6 +138,48 @@ github.com/minio/minio-go/v7 v7.0.70 h1:1u9NtMgfK1U42kUxcsl5v0yj6TEOPR497OAQxpJn
github.com/minio/minio-go/v7 v7.0.70/go.mod h1:4yBA8v80xGA30cfM3fz0DKYMXunWl/AV/6tWEs9ryzo=
github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY=
github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
github.com/pion/datachannel v1.5.8 h1:ph1P1NsGkazkjrvyMfhRBUAWMxugJjq2HfQifaOoSNo=
github.com/pion/datachannel v1.5.8/go.mod h1:PgmdpoaNBLX9HNzNClmdki4DYW5JtI7Yibu8QzbL3tI=
github.com/pion/dtls/v2 v2.2.7/go.mod h1:8WiMkebSHFD0T+dIU+UeBaoV7kDhOW5oDCzZ7WZ/F9s=
github.com/pion/dtls/v2 v2.2.12 h1:KP7H5/c1EiVAAKUmXyCzPiQe5+bCJrpOeKg/L05dunk=
github.com/pion/dtls/v2 v2.2.12/go.mod h1:d9SYc9fch0CqK90mRk1dC7AkzzpwJj6u2GU3u+9pqFE=
github.com/pion/ice/v2 v2.3.38 h1:DEpt13igPfvkE2+1Q+6e8mP30dtWnQD3CtMIKoRDRmA=
github.com/pion/ice/v2 v2.3.38/go.mod h1:mBF7lnigdqgtB+YHkaY/Y6s6tsyRyo4u4rPGRuOjUBQ=
github.com/pion/interceptor v0.1.29 h1:39fsnlP1U8gw2JzOFWdfCU82vHvhW9o0rZnZF56wF+M=
github.com/pion/interceptor v0.1.29/go.mod h1:ri+LGNjRUc5xUNtDEPzfdkmSqISixVTBF/z/Zms/6T4=
github.com/pion/logging v0.2.2 h1:M9+AIj/+pxNsDfAT64+MAVgJO0rsyLnoJKCqf//DoeY=
github.com/pion/logging v0.2.2/go.mod h1:k0/tDVsRCX2Mb2ZEmTqNa7CWsQPc+YYCB7Q+5pahoms=
github.com/pion/mdns v0.0.12 h1:CiMYlY+O0azojWDmxdNr7ADGrnZ+V6Ilfner+6mSVK8=
github.com/pion/mdns v0.0.12/go.mod h1:VExJjv8to/6Wqm1FXK+Ii/Z9tsVk/F5sD/N70cnYFbk=
github.com/pion/randutil v0.1.0 h1:CFG1UdESneORglEsnimhUjf33Rwjubwj6xfiOXBa3mA=
github.com/pion/randutil v0.1.0/go.mod h1:XcJrSMMbbMRhASFVOlj/5hQial/Y8oH/HVo7TBZq+j8=
github.com/pion/rtcp v1.2.12/go.mod h1:sn6qjxvnwyAkkPzPULIbVqSKI5Dv54Rv7VG0kNxh9L4=
github.com/pion/rtcp v1.2.14 h1:KCkGV3vJ+4DAJmvP0vaQShsb0xkRfWkO540Gy102KyE=
github.com/pion/rtcp v1.2.14/go.mod h1:sn6qjxvnwyAkkPzPULIbVqSKI5Dv54Rv7VG0kNxh9L4=
github.com/pion/rtp v1.8.3/go.mod h1:pBGHaFt/yW7bf1jjWAoUjpSNoDnw98KTMg+jWWvziqU=
github.com/pion/rtp v1.8.7 h1:qslKkG8qxvQ7hqaxkmL7Pl0XcUm+/Er7nMnu6Vq+ZxM=
github.com/pion/rtp v1.8.7/go.mod h1:pBGHaFt/yW7bf1jjWAoUjpSNoDnw98KTMg+jWWvziqU=
github.com/pion/sctp v1.8.19 h1:2CYuw+SQ5vkQ9t0HdOPccsCz1GQMDuVy5PglLgKVBW8=
github.com/pion/sctp v1.8.19/go.mod h1:P6PbDVA++OJMrVNg2AL3XtYHV4uD6dvfyOovCgMs0PE=
github.com/pion/sdp/v3 v3.0.9 h1:pX++dCHoHUwq43kuwf3PyJfHlwIj4hXA7Vrifiq0IJY=
github.com/pion/sdp/v3 v3.0.9/go.mod h1:B5xmvENq5IXJimIO4zfp6LAe1fD9N+kFv+V/1lOdz8M=
github.com/pion/srtp/v2 v2.0.20 h1:HNNny4s+OUmG280ETrCdgFndp4ufx3/uy85EawYEhTk=
github.com/pion/srtp/v2 v2.0.20/go.mod h1:0KJQjA99A6/a0DOVTu1PhDSw0CXF2jTkqOoMg3ODqdA=
github.com/pion/stun v0.6.1 h1:8lp6YejULeHBF8NmV8e2787BogQhduZugh5PdhDyyN4=
github.com/pion/stun v0.6.1/go.mod h1:/hO7APkX4hZKu/D0f2lHzNyvdkTGtIy3NDmLR7kSz/8=
github.com/pion/transport/v2 v2.2.1/go.mod h1:cXXWavvCnFF6McHTft3DWS9iic2Mftcz1Aq29pGcU5g=
github.com/pion/transport/v2 v2.2.3/go.mod h1:q2U/tf9FEfnSBGSW6w5Qp5PFWRLRj3NjLhCCgpRK4p0=
github.com/pion/transport/v2 v2.2.4/go.mod h1:q2U/tf9FEfnSBGSW6w5Qp5PFWRLRj3NjLhCCgpRK4p0=
github.com/pion/transport/v2 v2.2.10 h1:ucLBLE8nuxiHfvkFKnkDQRYWYfp8ejf4YBOPfaQpw6Q=
github.com/pion/transport/v2 v2.2.10/go.mod h1:sq1kSLWs+cHW9E+2fJP95QudkzbK7wscs8yYgQToO5E=
github.com/pion/transport/v3 v3.0.1/go.mod h1:UY7kiITrlMv7/IKgd5eTUcaahZx5oUN3l9SzK5f5xE0=
github.com/pion/transport/v3 v3.0.2 h1:r+40RJR25S9w3jbA6/5uEPTzcdn7ncyU44RWCbHkLg4=
github.com/pion/transport/v3 v3.0.2/go.mod h1:nIToODoOlb5If2jF9y2Igfx3PFYWfuXi37m0IlWa/D0=
github.com/pion/turn/v2 v2.1.3/go.mod h1:huEpByKKHix2/b9kmTAM3YoX6MKP+/D//0ClgUYR2fY=
github.com/pion/turn/v2 v2.1.6 h1:Xr2niVsiPTB0FPtt+yAWKFUkU1eotQbGgpTIld4x1Gc=
github.com/pion/turn/v2 v2.1.6/go.mod h1:huEpByKKHix2/b9kmTAM3YoX6MKP+/D//0ClgUYR2fY=
github.com/pion/webrtc/v3 v3.3.6 h1:7XAh4RPtlY1Vul6/GmZrv7z+NnxKA6If0KStXBI2ZLE=
github.com/pion/webrtc/v3 v3.3.6/go.mod h1:zyN7th4mZpV27eXybfR/cnUf3J2DRy8zw/mdjD9JTNM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
@ -176,6 +222,7 @@ github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UV
github.com/stretchr/testify v1.3.1-0.20190311161405-34c6fa2dc709/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.3/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
@ -199,6 +246,8 @@ github.com/valyala/fasttemplate v1.2.2 h1:lxLXG0uE3Qnshl9QyaK6XJxMXlQZELvChBOCmQ
github.com/valyala/fasttemplate v1.2.2/go.mod h1:KHLXt3tVN2HBp8eijSv/kGJopbvo7S+qRAEEKiv+SiQ=
github.com/vektah/gqlparser/v2 v2.5.12 h1:COMhVVnql6RoaF7+aTBWiTADdpLGyZWU3K/NwW0ph98=
github.com/vektah/gqlparser/v2 v2.5.12/go.mod h1:WQQjFc+I1YIzoPvZBhUQX7waZgg3pMLi0r8KymvAE2w=
github.com/wlynxg/anet v0.0.3 h1:PvR53psxFXstc12jelG6f1Lv4MWqE0tI76/hHGjh9rg=
github.com/wlynxg/anet v0.0.3/go.mod h1:eay5PRQr7fIVAMbTbchTnO9gG65Hg/uYGdc7mguHxoA=
github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU=
github.com/xeipuuv/gojsonpointer v0.0.0-20190905194746-02993c407bfb h1:zGWFAtiMcyryUHoUjUJX0/lt1H2+i2Ka2n+D3DImSNo=
github.com/xeipuuv/gojsonpointer v0.0.0-20190905194746-02993c407bfb/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU=
@ -208,6 +257,7 @@ github.com/xeipuuv/gojsonschema v1.2.0 h1:LhYJRs+L4fBtjZUfuSZIKGeVu0QRy8e5Xi7D17
github.com/xeipuuv/gojsonschema v1.2.0/go.mod h1:anYRn/JVcOK2ZgGU+IjEV4nwlhoK5sQluxsYJ78Id3Y=
github.com/xrash/smetrics v0.0.0-20240312152122-5f08fbb34913 h1:+qGGcbkzsfDQNPPe9UDgpxAWQrhbbBXOYJFQDq/dtJw=
github.com/xrash/smetrics v0.0.0-20240312152122-5f08fbb34913/go.mod h1:4aEEwZQutDLsQv2Deui4iYQ6DWTxR14g6m8Wv88+Xqk=
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
github.com/yusufpapurcu/wmi v1.2.4 h1:zFUKzehAFReQwLys1b/iSMl+JQGSCSjtVqQn9bBrPo0=
github.com/yusufpapurcu/wmi v1.2.4/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0=
github.com/zeebo/assert v1.1.0 h1:hU1L1vLTHsnO8x8c9KAR5GmM5QscxHg5RNU5z5qbUWY=
@ -222,35 +272,81 @@ go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0=
go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8=
go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.8.0/go.mod h1:mRqEX+O9/h5TFCrQhkgjo2yKi0yYA+9ecGkdQoHrywE=
golang.org/x/crypto v0.12.0/go.mod h1:NF0Gs7EO5K4qLn+Ylc+fih8BSTeIjAP05siRnAh98yw=
golang.org/x/crypto v0.18.0/go.mod h1:R0j02AL6hcrfOiy9T4ZYp/rcWeMxM3L6QYxlOuEG1mg=
golang.org/x/crypto v0.23.0 h1:dIJU/v2J8Mdglj/8rJ6UUOM3Zc9zLZxVZwwxMooUSAI=
golang.org/x/crypto v0.23.0/go.mod h1:CKFgDieR+mRhux2Lsu27y0fO304Db0wZe70UKqHu0v8=
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/mod v0.17.0 h1:zY54UmvipHiNd+pm+m0x9KhZ9hl1/7QNMyxXbc6ICqA=
golang.org/x/mod v0.17.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
golang.org/x/net v0.9.0/go.mod h1:d48xBJpPfHeWQsugry2m+kC02ZBRGRgulfHnEXEuWns=
golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg=
golang.org/x/net v0.14.0/go.mod h1:PpSgVXXLK0OxS0F31C1/tv6XNguvCrnXIDrFMspZIUI=
golang.org/x/net v0.20.0/go.mod h1:z8BVo6PvndSri0LbOE3hAn0apkU+1YvI6E70E9jsnvY=
golang.org/x/net v0.25.0 h1:d/OCCoBEUq33pjydKrGQhw7IlUPI2Oylr+8qLx49kac=
golang.org/x/net v0.25.0/go.mod h1:JkAGAh7GEvH74S6FOH42FLoXpXbE/aqXSrIQjXgsiwM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M=
golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201204225414-ed752295db88/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.7.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.9.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.19.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y=
golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k=
golang.org/x/term v0.7.0/go.mod h1:P32HKFT3hSsZrRxla30E9HqToFYAQPCMs/zFMBUFqPY=
golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo=
golang.org/x/term v0.11.0/go.mod h1:zC9APTIj3jG3FdV/Ons+XE1riIZXG4aZ4GTHiPZJPIU=
golang.org/x/term v0.16.0/go.mod h1:yn7UURbUtPyrVJPGPq404EukNFxcm/foM+bV/bfcDsY=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
golang.org/x/text v0.12.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/text v0.15.0 h1:h1V/4gjBv8v9cjcR6+AR5+/cIYK5N/WAgiv4xlsEtAk=
golang.org/x/text v0.15.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk=
golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU=
golang.org/x/tools v0.21.0 h1:qc0xYgIbsSDt9EyWz05J5wfa7LOVW0YTLOXrqdLAWIw=
golang.org/x/tools v0.21.0/go.mod h1:aiJjzUbINMkxbQROHiO6hDPo2LHcIPhhQsa9DLh0yGk=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/protobuf v1.34.1 h1:9ddQBjfCyZPOHPUiPxpYESBLc+T8P3E+Vo4IbKZgFWg=
google.golang.org/protobuf v1.34.1/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
gopkg.in/ini.v1 v1.67.0 h1:Dgnx+6+nfE+IfzjUEISNeydPJh9AXNNsWbGP9KzCsOA=

View File

@ -107,6 +107,7 @@ func (rscfg *SetConfig) MergeTo(cfg *config.Config) {
cfg.Storage = rscfg.Storage
cfg.RTMP = rscfg.RTMP
cfg.SRT = rscfg.SRT
cfg.WHIP = rscfg.WHIP
cfg.FFmpeg = rscfg.FFmpeg
cfg.Playout = rscfg.Playout
cfg.Debug = rscfg.Debug

176
http/handler/api/whip.go Normal file
View File

@ -0,0 +1,176 @@
package api
import (
"fmt"
"net/http"
"strings"
"time"
cfgstore "github.com/datarhei/core/v16/config/store"
"github.com/datarhei/core/v16/whip"
"github.com/labstack/echo/v4"
)
// WHIPHandler provides HTTP API access to the WHIP server channel information.
type WHIPHandler struct {
whip whip.Server
config cfgstore.Store
}
// NewWHIP returns a new WHIPHandler backed by the provided WHIP server.
func NewWHIP(whip whip.Server, config cfgstore.Store) *WHIPHandler {
return &WHIPHandler{
whip: whip,
config: config,
}
}
// WHIPChannel represents a currently active WHIP publish session.
type WHIPChannel struct {
// Name is the stream identifier used in the WHIP URL path.
Name string `json:"name" jsonschema:"minLength=1"`
// PublishedAt is the RFC 3339 timestamp when the publisher connected.
PublishedAt time.Time `json:"published_at"`
// Subscribers is the number of active WHEP subscribers watching this stream.
Subscribers int `json:"subscribers"`
}
// WHIPURLs holds the publish URL (for OBS) and the internal SDP relay URL (for FFmpeg).
type WHIPURLs struct {
// PublishURL is the URL to enter in OBS → Settings → Stream → Server.
// Format: http://<public_host>:<port>/whip/<stream_key>
PublishURL string `json:"publish_url"`
// SDPURL is the internal FFmpeg-readable relay URL.
// Format: http://localhost:<port>/whip/<stream_key>/sdp
SDPURL string `json:"sdp_url"`
// StreamKey is the key component used in both URLs.
StreamKey string `json:"stream_key"`
// WHEPURL is the WebRTC egress URL for browsers (WHEP).
// Format: http://<public_host>:<port>/whip/<stream_key>/whep
WHEPURL string `json:"whep_url,omitempty"`
}
// whipPublicBase returns "http://<host>:<port>" derived from the active config.
func (h *WHIPHandler) whipPublicBase() (string, string, error) {
if h.config == nil {
return "", "", fmt.Errorf("config not available")
}
cfg := h.config.GetActive()
addr := cfg.WHIP.Address // e.g. ":8555" or "0.0.0.0:8555"
// Extract port from address.
port := addr
if idx := strings.LastIndex(addr, ":"); idx >= 0 {
port = addr[idx+1:]
}
// Pick public host.
host := "localhost"
if len(cfg.Host.Name) > 0 && cfg.Host.Name[0] != "" {
host = cfg.Host.Name[0]
}
return fmt.Sprintf("http://%s:%s", host, port),
fmt.Sprintf("http://localhost:%s", port),
nil
}
// GetURL returns the WHIP publish URL for a given stream key.
// @Summary Get WHIP publish URL for a stream key
// @Description Returns the URL to configure in OBS and the internal SDP relay URL for FFmpeg.
// @Tags v16.?.?
// @ID whip-3-get-url
// @Produce json
// @Param name path string true "Stream key"
// @Success 200 {object} WHIPURLs
// @Security ApiKeyAuth
// @Router /api/v3/whip/{name}/url [get]
func (h *WHIPHandler) GetURL(c echo.Context) error {
name := c.Param("name")
if name == "" {
return c.JSON(http.StatusBadRequest, map[string]string{"error": "stream key is required"})
}
publicBase, localBase, err := h.whipPublicBase()
if err != nil {
return c.JSON(http.StatusServiceUnavailable, map[string]string{"error": err.Error()})
}
return c.JSON(http.StatusOK, WHIPURLs{
PublishURL: fmt.Sprintf("%s/whip/%s", publicBase, name),
SDPURL: fmt.Sprintf("%s/whip/%s/sdp", localBase, name),
StreamKey: name,
WHEPURL: fmt.Sprintf("%s/whip/%s/whep", publicBase, name),
})
}
// GetServerURL returns the base WHIP server URL (without a stream key).
// @Summary Get WHIP server base URL
// @Description Returns the base publish URL and configuration so the UI can display it in a WHIP Server panel.
// @Tags v16.?.?
// @ID whip-3-get-server-url
// @Produce json
// @Success 200 {object} WHIPServerInfo
// @Security ApiKeyAuth
// @Router /api/v3/whip/url [get]
func (h *WHIPHandler) GetServerURL(c echo.Context) error {
publicBase, localBase, err := h.whipPublicBase()
if err != nil {
return c.JSON(http.StatusServiceUnavailable, map[string]string{"error": err.Error()})
}
var token string
if h.config != nil {
token = h.config.GetActive().WHIP.Token
}
return c.JSON(http.StatusOK, WHIPServerInfo{
BasePublishURL: fmt.Sprintf("%s/whip/", publicBase),
BaseSDPURL: fmt.Sprintf("%s/whip/", localBase),
BaseWHEPURL: fmt.Sprintf("%s/whip/", publicBase),
HasToken: token != "",
ExampleOBS: fmt.Sprintf("%s/whip/<stream-key>", publicBase),
InputAddressTemplate: "{whip:name=<stream-key>}",
})
}
// WHIPServerInfo holds the base URLs and metadata shown in the WHIP Server panel.
type WHIPServerInfo struct {
// BasePublishURL is the base URL for OBS. Append the stream key.
BasePublishURL string `json:"base_publish_url"`
// BaseSDPURL is the base internal SDP relay URL.
BaseSDPURL string `json:"base_sdp_url"`
// BaseWHEPURL is the base URL for WHEP browser subscribers. Append the stream key and /whep.
BaseWHEPURL string `json:"base_whep_url"`
// HasToken indicates whether a bearer token is required.
HasToken bool `json:"has_token"`
// ExampleOBS shows an example URL with placeholder stream key.
ExampleOBS string `json:"example_obs_url"`
// InputAddressTemplate is the process input address template for WHIP.
InputAddressTemplate string `json:"input_address_template"`
}
// ListChannels lists all currently active WHIP publishers.
// @Summary List all publishing WHIP streams
// @Description List all currently active WHIP (WebRTC HTTP Ingestion Protocol) streams.
// @Tags v16.?.?
// @ID whip-3-list-channels
// @Produce json
// @Success 200 {array} WHIPChannel
// @Security ApiKeyAuth
// @Router /api/v3/whip [get]
func (h *WHIPHandler) ListChannels(c echo.Context) error {
channels := h.whip.Channels()
list := make([]WHIPChannel, 0, len(channels.Publisher))
for name, since := range channels.Publisher {
list = append(list, WHIPChannel{
Name: name,
PublishedAt: since,
Subscribers: channels.Subscribers[name],
})
}
return c.JSON(http.StatusOK, list)
}

View File

@ -51,6 +51,7 @@ import (
"github.com/datarhei/core/v16/rtmp"
"github.com/datarhei/core/v16/session"
"github.com/datarhei/core/v16/srt"
"github.com/datarhei/core/v16/whip"
mwcache "github.com/datarhei/core/v16/http/middleware/cache"
mwcors "github.com/datarhei/core/v16/http/middleware/cors"
@ -86,6 +87,7 @@ type Config struct {
Cors CorsConfig
RTMP rtmp.Server
SRT srt.Server
WHIP whip.Server
JWT jwt.JWT
Config cfgstore.Store
Cache cache.Cacher
@ -120,6 +122,7 @@ type server struct {
playout *api.PlayoutHandler
rtmp *api.RTMPHandler
srt *api.SRTHandler
whip *api.WHIPHandler
config *api.ConfigHandler
session *api.SessionHandler
widget *api.WidgetHandler
@ -268,6 +271,13 @@ func NewServer(config Config) (Server, error) {
)
}
if config.WHIP != nil {
s.v3handler.whip = api.NewWHIP(
config.WHIP,
config.Config,
)
}
if config.Config != nil {
s.v3handler.config = api.NewConfig(
config.Config,
@ -628,6 +638,13 @@ func (s *server) setRoutesV3(v3 *echo.Group) {
v3.GET("/srt", s.v3handler.srt.ListChannels)
}
// v3 WHIP
if s.v3handler.whip != nil {
v3.GET("/whip", s.v3handler.whip.ListChannels)
v3.GET("/whip/url", s.v3handler.whip.GetServerURL)
v3.GET("/whip/:name/url", s.v3handler.whip.GetURL)
}
// v3 Config
if s.v3handler.config != nil {
v3.GET("/config", s.v3handler.config.Get)

View File

@ -51,7 +51,7 @@ type replacer struct {
func New() Replacer {
r := &replacer{
templates: make(map[string]template),
re: regexp.MustCompile(`{([a-z]+(?::[0-9A-Za-z]+)?)(?:\^(.))?(?:,(.*?))?}`),
re: regexp.MustCompile(`{([a-z]+(?:-[a-z]+)*(?::[0-9A-Za-z]+)?)(?:\^(.))?(?:,(.*?))?}`),
templateRe: regexp.MustCompile(`{([a-z:]+)}`),
}

View File

@ -28,6 +28,15 @@ import (
"github.com/Masterminds/semver/v3"
)
// whipPlaceholderRe matches the {whip:name=<streamKey>} format emitted by the
// Restreamer UI. The replacer engine expects {whip,name=<streamKey>} (comma
// separated params), so we normalize before expansion.
var whipPlaceholderRe = regexp.MustCompile(`\{whip:name=([^}]+)\}`)
// whipRtspPlaceholderRe matches the {whip-rtsp:name=<streamKey>} UI format and
// normalizes it to {whip-rtsp,name=<streamKey>} for the replacer engine.
var whipRtspPlaceholderRe = regexp.MustCompile(`\{whip-rtsp:name=([^}]+)\}`)
// The Restreamer interface
type Restreamer interface {
ID() string // ID of this instance
@ -1546,6 +1555,47 @@ func resolvePlaceholders(config *app.Config, r replace.Replacer) {
input.Address = r.Replace(input.Address, "fs:*", "", vars, config, "input")
input.Address = r.Replace(input.Address, "rtmp", "", vars, config, "input")
input.Address = r.Replace(input.Address, "srt", "", vars, config, "input")
// Normalize {whip:name=X} (UI format) → {whip,name=X} (replacer format)
input.Address = whipPlaceholderRe.ReplaceAllString(input.Address, "{whip,name=$1}")
input.Address = r.Replace(input.Address, "whip", "", vars, config, "input")
// Normalize {whip-rtsp:name=X} → {whip-rtsp,name=X} (replacer format)
whipRtspWasExpanded := whipRtspPlaceholderRe.MatchString(input.Address)
input.Address = whipRtspPlaceholderRe.ReplaceAllString(input.Address, "{whip-rtsp,name=$1}")
input.Address = r.Replace(input.Address, "whip-rtsp", "", vars, config, "input")
// WHIP SDP relay URLs (http://…/whip/…/sdp) require FFmpeg to open nested
// rtp:// and udp:// sub-protocols, which are blocked by default when the
// top-level protocol is HTTP. Inject -protocol_whitelist automatically so
// that both the probe and the live process can open the relay without
// requiring the user to set it manually.
if strings.Contains(input.Address, "/whip/") && strings.HasSuffix(input.Address, "/sdp") {
hasWhitelist := false
for _, opt := range input.Options {
if strings.Contains(opt, "protocol_whitelist") {
hasWhitelist = true
break
}
}
if !hasWhitelist {
input.Options = append([]string{"-protocol_whitelist", "file,crypto,http,rtp,udp,tcp"}, input.Options...)
}
}
// WHIP RTSP relay URLs need RTP/AVP/TCP (interleaved) transport.
// Inject -rtsp_transport tcp automatically so FFmpeg uses the correct mode.
if whipRtspWasExpanded && strings.HasPrefix(input.Address, "rtsp://") {
hasRtspTransport := false
for _, opt := range input.Options {
if strings.Contains(opt, "rtsp_transport") {
hasRtspTransport = true
break
}
}
if !hasRtspTransport {
input.Options = append([]string{"-rtsp_transport", "tcp"}, input.Options...)
}
}
for j, option := range input.Options {
// Replace any known placeholders

102
vendor/modules.txt vendored
View File

@ -255,6 +255,102 @@ github.com/minio/minio-go/v7/pkg/tags
# github.com/mitchellh/mapstructure v1.5.0
## explicit; go 1.14
github.com/mitchellh/mapstructure
# github.com/pion/datachannel v1.5.8
## explicit; go 1.19
github.com/pion/datachannel
# github.com/pion/dtls/v2 v2.2.12
## explicit; go 1.13
github.com/pion/dtls/v2
github.com/pion/dtls/v2/internal/ciphersuite
github.com/pion/dtls/v2/internal/ciphersuite/types
github.com/pion/dtls/v2/internal/closer
github.com/pion/dtls/v2/internal/util
github.com/pion/dtls/v2/pkg/crypto/ccm
github.com/pion/dtls/v2/pkg/crypto/ciphersuite
github.com/pion/dtls/v2/pkg/crypto/clientcertificate
github.com/pion/dtls/v2/pkg/crypto/elliptic
github.com/pion/dtls/v2/pkg/crypto/fingerprint
github.com/pion/dtls/v2/pkg/crypto/hash
github.com/pion/dtls/v2/pkg/crypto/prf
github.com/pion/dtls/v2/pkg/crypto/signature
github.com/pion/dtls/v2/pkg/crypto/signaturehash
github.com/pion/dtls/v2/pkg/protocol
github.com/pion/dtls/v2/pkg/protocol/alert
github.com/pion/dtls/v2/pkg/protocol/extension
github.com/pion/dtls/v2/pkg/protocol/handshake
github.com/pion/dtls/v2/pkg/protocol/recordlayer
# github.com/pion/ice/v2 v2.3.38
## explicit; go 1.13
github.com/pion/ice/v2
github.com/pion/ice/v2/internal/atomic
github.com/pion/ice/v2/internal/fakenet
github.com/pion/ice/v2/internal/stun
# github.com/pion/interceptor v0.1.29
## explicit; go 1.19
github.com/pion/interceptor
github.com/pion/interceptor/internal/ntp
github.com/pion/interceptor/internal/sequencenumber
github.com/pion/interceptor/pkg/nack
github.com/pion/interceptor/pkg/report
github.com/pion/interceptor/pkg/twcc
# github.com/pion/logging v0.2.2
## explicit; go 1.12
github.com/pion/logging
# github.com/pion/mdns v0.0.12
## explicit; go 1.19
github.com/pion/mdns
# github.com/pion/randutil v0.1.0
## explicit; go 1.14
github.com/pion/randutil
# github.com/pion/rtcp v1.2.14
## explicit; go 1.13
github.com/pion/rtcp
# github.com/pion/rtp v1.8.7
## explicit; go 1.19
github.com/pion/rtp
github.com/pion/rtp/codecs
github.com/pion/rtp/codecs/av1/obu
github.com/pion/rtp/codecs/vp9
# github.com/pion/sctp v1.8.19
## explicit; go 1.19
github.com/pion/sctp
# github.com/pion/sdp/v3 v3.0.9
## explicit; go 1.13
github.com/pion/sdp/v3
# github.com/pion/srtp/v2 v2.0.20
## explicit; go 1.14
github.com/pion/srtp/v2
# github.com/pion/stun v0.6.1
## explicit; go 1.12
github.com/pion/stun
github.com/pion/stun/internal/hmac
# github.com/pion/transport/v2 v2.2.10
## explicit; go 1.12
github.com/pion/transport/v2
github.com/pion/transport/v2/connctx
github.com/pion/transport/v2/deadline
github.com/pion/transport/v2/packetio
github.com/pion/transport/v2/replaydetector
github.com/pion/transport/v2/stdnet
github.com/pion/transport/v2/udp
github.com/pion/transport/v2/utils/xor
github.com/pion/transport/v2/vnet
# github.com/pion/turn/v2 v2.1.6
## explicit; go 1.13
github.com/pion/turn/v2
github.com/pion/turn/v2/internal/allocation
github.com/pion/turn/v2/internal/client
github.com/pion/turn/v2/internal/ipnet
github.com/pion/turn/v2/internal/proto
github.com/pion/turn/v2/internal/server
# github.com/pion/webrtc/v3 v3.3.6
## explicit; go 1.17
github.com/pion/webrtc/v3
github.com/pion/webrtc/v3/internal/fmtp
github.com/pion/webrtc/v3/internal/mux
github.com/pion/webrtc/v3/internal/util
github.com/pion/webrtc/v3/pkg/media
github.com/pion/webrtc/v3/pkg/rtcerr
# github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2
## explicit
github.com/pmezard/go-difflib/difflib
@ -343,6 +439,9 @@ github.com/vektah/gqlparser/v2/lexer
github.com/vektah/gqlparser/v2/parser
github.com/vektah/gqlparser/v2/validator
github.com/vektah/gqlparser/v2/validator/rules
# github.com/wlynxg/anet v0.0.3
## explicit; go 1.20
github.com/wlynxg/anet
# github.com/xeipuuv/gojsonpointer v0.0.0-20190905194746-02993c407bfb
## explicit
github.com/xeipuuv/gojsonpointer
@ -392,6 +491,8 @@ golang.org/x/crypto/argon2
golang.org/x/crypto/blake2b
golang.org/x/crypto/cryptobyte
golang.org/x/crypto/cryptobyte/asn1
golang.org/x/crypto/curve25519
golang.org/x/crypto/curve25519/internal/field
golang.org/x/crypto/ocsp
golang.org/x/crypto/pbkdf2
golang.org/x/crypto/sha3
@ -403,6 +504,7 @@ golang.org/x/mod/semver
# golang.org/x/net v0.25.0
## explicit; go 1.18
golang.org/x/net/bpf
golang.org/x/net/dns/dnsmessage
golang.org/x/net/html
golang.org/x/net/html/atom
golang.org/x/net/http/httpguts

176
whip/pion_provider.go Normal file
View File

@ -0,0 +1,176 @@
package whip
import (
"fmt"
"net"
"strings"
"sync"
"time"
"github.com/pion/interceptor"
"github.com/pion/rtcp"
"github.com/pion/webrtc/v3"
)
// pionProvider implements WebRTCProvider using pion/webrtc v3.
// It handles full ICE + DTLS-SRTP negotiation and forwards the decrypted
// RTP to the local UDP relay sockets that FFmpeg reads.
type pionProvider struct {
mu sync.Mutex
pcs map[string]*webrtc.PeerConnection
}
// NewPionProvider returns a WebRTCProvider backed by pion/webrtc v3.
func NewPionProvider() WebRTCProvider {
return &pionProvider{
pcs: make(map[string]*webrtc.PeerConnection),
}
}
// OpenSession implements WebRTCProvider.
func (p *pionProvider) OpenSession(offerSDP string, videoPort, audioPort int) (string, error) {
m := &webrtc.MediaEngine{}
if err := m.RegisterDefaultCodecs(); err != nil {
return "", fmt.Errorf("pion: register codecs: %w", err)
}
ir := &interceptor.Registry{}
if err := webrtc.RegisterDefaultInterceptors(m, ir); err != nil {
return "", fmt.Errorf("pion: register interceptors: %w", err)
}
api := webrtc.NewAPI(webrtc.WithMediaEngine(m), webrtc.WithInterceptorRegistry(ir))
pc, err := api.NewPeerConnection(webrtc.Configuration{})
if err != nil {
return "", fmt.Errorf("pion: new peer connection: %w", err)
}
_, err = pc.AddTransceiverFromKind(webrtc.RTPCodecTypeVideo,
webrtc.RTPTransceiverInit{Direction: webrtc.RTPTransceiverDirectionRecvonly})
if err != nil {
pc.Close()
return "", fmt.Errorf("pion: add video transceiver: %w", err)
}
_, err = pc.AddTransceiverFromKind(webrtc.RTPCodecTypeAudio,
webrtc.RTPTransceiverInit{Direction: webrtc.RTPTransceiverDirectionRecvonly})
if err != nil {
pc.Close()
return "", fmt.Errorf("pion: add audio transceiver: %w", err)
}
if err := pc.SetRemoteDescription(webrtc.SessionDescription{
Type: webrtc.SDPTypeOffer,
SDP: offerSDP,
}); err != nil {
pc.Close()
return "", fmt.Errorf("pion: set remote description: %w", err)
}
answer, err := pc.CreateAnswer(nil)
if err != nil {
pc.Close()
return "", fmt.Errorf("pion: create answer: %w", err)
}
gatherComplete := webrtc.GatheringCompletePromise(pc)
if err := pc.SetLocalDescription(answer); err != nil {
pc.Close()
return "", fmt.Errorf("pion: set local description: %w", err)
}
<-gatherComplete
finalSDP := pc.LocalDescription().SDP
videoDst, err := net.ResolveUDPAddr("udp4", fmt.Sprintf("127.0.0.1:%d", videoPort))
if err != nil {
pc.Close()
return "", fmt.Errorf("pion: resolve video relay addr: %w", err)
}
audioDst, err := net.ResolveUDPAddr("udp4", fmt.Sprintf("127.0.0.1:%d", audioPort))
if err != nil {
pc.Close()
return "", fmt.Errorf("pion: resolve audio relay addr: %w", err)
}
videoConn, err := net.DialUDP("udp4", nil, videoDst)
if err != nil {
pc.Close()
return "", fmt.Errorf("pion: dial video relay: %w", err)
}
audioConn, err := net.DialUDP("udp4", nil, audioDst)
if err != nil {
videoConn.Close()
pc.Close()
return "", fmt.Errorf("pion: dial audio relay: %w", err)
}
pc.OnTrack(func(track *webrtc.TrackRemote, receiver *webrtc.RTPReceiver) {
var dst *net.UDPConn
if strings.EqualFold(track.Kind().String(), "video") {
dst = videoConn
// Request a keyframe immediately so consumers (ffprobe, FFmpeg) can
// determine the video resolution without waiting for the next IDR.
// Then send PLI every 2 s to keep keyframes flowing.
go func() {
ticker := time.NewTicker(2 * time.Second)
defer ticker.Stop()
for {
pc.WriteRTCP([]rtcp.Packet{&rtcp.PictureLossIndication{
MediaSSRC: uint32(track.SSRC()),
}})
<-ticker.C
if pc.ConnectionState() != webrtc.PeerConnectionStateConnected {
return
}
}
}()
} else {
dst = audioConn
}
buf := make([]byte, 1500)
for {
n, _, err := track.Read(buf)
if err != nil {
return
}
dst.Write(buf[:n])
}
})
ufrag := extractICEUfrag(offerSDP)
p.mu.Lock()
p.pcs[ufrag] = pc
p.mu.Unlock()
pc.OnConnectionStateChange(func(state webrtc.PeerConnectionState) {
if state == webrtc.PeerConnectionStateClosed ||
state == webrtc.PeerConnectionStateFailed ||
state == webrtc.PeerConnectionStateDisconnected {
videoConn.Close()
audioConn.Close()
p.mu.Lock()
delete(p.pcs, ufrag)
p.mu.Unlock()
}
})
return finalSDP, nil
}
func extractICEUfrag(sdp string) string {
for _, line := range strings.Split(sdp, "\n") {
line = strings.TrimRight(line, "\r")
if strings.HasPrefix(line, "a=ice-ufrag:") {
return strings.TrimPrefix(line, "a=ice-ufrag:")
}
}
if len(sdp) > 32 {
return sdp[:32]
}
return sdp
}

214
whip/pion_whep.go Normal file
View File

@ -0,0 +1,214 @@
package whip
import (
"fmt"
"strings"
"sync"
"github.com/pion/interceptor"
"github.com/pion/webrtc/v3"
)
// whepCodecParams describes the exact RTP codec the WHIP publisher (OBS) is sending.
// Using these instead of RegisterDefaultCodecs ensures the browser negotiates
// the same payload type numbers OBS uses, which is mandatory because
// TrackLocalStaticRTP does NOT rewrite the PT in packets — it only rewrites SSRC.
// Mismatch: OBS sends PT 96, browser expects PT 102 → browser silently discards
// all H264 packets → no video. Audio works by coincidence (Opus is PT 111 in both).
type whepCodecParams struct {
VideoMime string // "video/H264"
VideoClockRate uint32
VideoFmtp string // e.g. "level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=640034"
VideoPT uint8 // OBS's H264 payload type, typically 96
AudioMime string // "audio/opus"
AudioClockRate uint32
AudioChannels uint16
AudioPT uint8 // OBS's Opus payload type, typically 111
}
// whepPionSession is a single WHEP subscriber backed by a pion PeerConnection.
// It receives plain RTP packets (already decrypted by the WHIP publisher side)
// via WriteVideo/WriteAudio and forwards them to the remote browser via
// ICE + DTLS-SRTP.
type whepPionSession struct {
pc *webrtc.PeerConnection
videoTrack *webrtc.TrackLocalStaticRTP
audioTrack *webrtc.TrackLocalStaticRTP
id string
done chan struct{}
once sync.Once
}
// newWHEPPionSession creates a new WHEP subscriber PeerConnection.
// offerSDP is the SDP offer from the browser.
// params must match the publisher's codec configuration exactly so that the
// browser negotiates the same payload type numbers the publisher uses.
func newWHEPPionSession(id, offerSDP string, params whepCodecParams) (*whepPionSession, string, error) {
m := &webrtc.MediaEngine{}
// Register the exact video codec OBS is sending.
// Using the publisher's PT (e.g. 96) forces the SDP answer to contain
// a=rtpmap:96 H264/90000, so the browser maps PT 96 → H264 and accepts
// the raw RTP packets that arrive unchanged from OBS.
videoFmtp := normalizeH264Fmtp(params.VideoMime, params.VideoFmtp)
if err := m.RegisterCodec(webrtc.RTPCodecParameters{
RTPCodecCapability: webrtc.RTPCodecCapability{
MimeType: params.VideoMime,
ClockRate: params.VideoClockRate,
SDPFmtpLine: videoFmtp,
RTCPFeedback: []webrtc.RTCPFeedback{
{Type: "nack", Parameter: "pli"},
},
},
PayloadType: webrtc.PayloadType(params.VideoPT),
}, webrtc.RTPCodecTypeVideo); err != nil {
return nil, "", fmt.Errorf("whep: register video codec: %w", err)
}
audioFmtp := ""
if strings.Contains(strings.ToLower(params.AudioMime), "opus") {
audioFmtp = "minptime=10;useinbandfec=1"
}
if err := m.RegisterCodec(webrtc.RTPCodecParameters{
RTPCodecCapability: webrtc.RTPCodecCapability{
MimeType: params.AudioMime,
ClockRate: params.AudioClockRate,
Channels: params.AudioChannels,
SDPFmtpLine: audioFmtp,
},
PayloadType: webrtc.PayloadType(params.AudioPT),
}, webrtc.RTPCodecTypeAudio); err != nil {
return nil, "", fmt.Errorf("whep: register audio codec: %w", err)
}
ir := &interceptor.Registry{}
if err := webrtc.RegisterDefaultInterceptors(m, ir); err != nil {
return nil, "", fmt.Errorf("whep: register interceptors: %w", err)
}
api := webrtc.NewAPI(webrtc.WithMediaEngine(m), webrtc.WithInterceptorRegistry(ir))
pc, err := api.NewPeerConnection(webrtc.Configuration{})
if err != nil {
return nil, "", fmt.Errorf("whep: new peer connection: %w", err)
}
videoTrack, err := webrtc.NewTrackLocalStaticRTP(
webrtc.RTPCodecCapability{MimeType: params.VideoMime, ClockRate: params.VideoClockRate, SDPFmtpLine: videoFmtp},
"video", "whep-video-"+id,
)
if err != nil {
pc.Close()
return nil, "", fmt.Errorf("whep: create video track: %w", err)
}
audioTrack, err := webrtc.NewTrackLocalStaticRTP(
webrtc.RTPCodecCapability{MimeType: params.AudioMime, ClockRate: params.AudioClockRate, Channels: params.AudioChannels},
"audio", "whep-audio-"+id,
)
if err != nil {
pc.Close()
return nil, "", fmt.Errorf("whep: create audio track: %w", err)
}
if _, err = pc.AddTrack(videoTrack); err != nil {
pc.Close()
return nil, "", fmt.Errorf("whep: add video track: %w", err)
}
if _, err = pc.AddTrack(audioTrack); err != nil {
pc.Close()
return nil, "", fmt.Errorf("whep: add audio track: %w", err)
}
if err = pc.SetRemoteDescription(webrtc.SessionDescription{
Type: webrtc.SDPTypeOffer,
SDP: offerSDP,
}); err != nil {
pc.Close()
return nil, "", fmt.Errorf("whep: set remote description: %w", err)
}
answer, err := pc.CreateAnswer(nil)
if err != nil {
pc.Close()
return nil, "", fmt.Errorf("whep: create answer: %w", err)
}
gatherComplete := webrtc.GatheringCompletePromise(pc)
if err = pc.SetLocalDescription(answer); err != nil {
pc.Close()
return nil, "", fmt.Errorf("whep: set local description: %w", err)
}
<-gatherComplete
sess := &whepPionSession{
pc: pc,
videoTrack: videoTrack,
audioTrack: audioTrack,
id: id,
done: make(chan struct{}),
}
pc.OnConnectionStateChange(func(state webrtc.PeerConnectionState) {
if state == webrtc.PeerConnectionStateClosed ||
state == webrtc.PeerConnectionStateFailed ||
state == webrtc.PeerConnectionStateDisconnected {
sess.Close()
}
})
return sess, pc.LocalDescription().SDP, nil
}
// normalizeH264Fmtp ensures H264 fmtp always has level-asymmetry-allowed=1 and
// packetization-mode=1. level-asymmetry-allowed lets the browser decode a
// higher profile than it advertised. packetization-mode=1 is required for FU-A
// fragmented NALUs (how OBS sends large H264 frames).
func normalizeH264Fmtp(mime, fmtp string) string {
if !strings.Contains(strings.ToLower(mime), "h264") {
return fmtp
}
if fmtp == "" {
return "level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=42e01f"
}
if !strings.Contains(fmtp, "packetization-mode") {
fmtp += ";packetization-mode=1"
}
if !strings.Contains(fmtp, "level-asymmetry-allowed") {
fmtp = "level-asymmetry-allowed=1;" + fmtp
}
return fmtp
}
// ID returns the unique subscriber identifier.
func (s *whepPionSession) ID() string { return s.id }
// WriteVideo forwards a decrypted RTP video packet to the subscriber via DTLS-SRTP.
func (s *whepPionSession) WriteVideo(pkt []byte) {
select {
case <-s.done:
return
default:
s.videoTrack.Write(pkt) //nolint:errcheck
}
}
// WriteAudio forwards a decrypted RTP audio packet to the subscriber via DTLS-SRTP.
func (s *whepPionSession) WriteAudio(pkt []byte) {
select {
case <-s.done:
return
default:
s.audioTrack.Write(pkt) //nolint:errcheck
}
}
// Done returns a channel that is closed when the subscriber disconnects.
func (s *whepPionSession) Done() <-chan struct{} { return s.done }
// Close tears down the subscriber PeerConnection.
func (s *whepPionSession) Close() {
s.once.Do(func() {
s.pc.Close()
close(s.done)
})
}

501
whip/rtsp_relay.go Normal file
View File

@ -0,0 +1,501 @@
// Package whip provides a minimal RFC 2326 RTSP relay server.
//
// The RTSP relay accepts TCP connections from FFmpeg (or any RTSP consumer)
// and forwards RTP packets from active WHIP publishers using RTP/AVP/TCP
// interleaved mode (RFC 2326 §10.12).
//
// Each stream is served at rtsp://<host>:<port>/live/<name>.
// Multiple consumers may connect to the same stream simultaneously.
// Only RTP/AVP/TCP transport (interleaved binary) is supported.
package whip
import (
"bufio"
"fmt"
"io"
"net"
"strconv"
"strings"
"sync"
"time"
"github.com/datarhei/core/v16/log"
)
// RTSPCodecParams holds codec parameters for a single WHIP stream.
// These are used to build the RTSP DESCRIBE response (SDP).
type RTSPCodecParams struct {
VideoPayloadType int
VideoCodec string // e.g. "H264"
VideoClockRate int
VideoFmtp string // e.g. "sprop-parameter-sets=...;packetization-mode=1"
AudioPayloadType int
AudioCodec string // e.g. "opus"
AudioClockRate int
AudioChannels int
}
// rtspRelay is a minimal TCP RTSP server that relays WHIP streams to multiple
// FFmpeg consumers using RTP/AVP/TCP interleaved mode (RFC 2326 §10.12).
type rtspRelay struct {
addr string
ln net.Listener
streams sync.Map // name -> *rtspStreamEntry
logger log.Logger
done chan struct{}
wg sync.WaitGroup
}
// rtspStreamEntry tracks per-stream codec params and connected consumers.
type rtspStreamEntry struct {
name string
mu sync.RWMutex
codecs RTSPCodecParams
clients map[string]*rtspRelayClient
}
// rtspRelayClient represents one connected RTSP consumer (e.g. an FFmpeg process).
type rtspRelayClient struct {
id string
conn net.Conn
videoTrack int // interleaved channel index for video RTP
audioTrack int // interleaved channel index for audio RTP
mu sync.Mutex
closed bool
}
// newRTSPRelay creates a new relay server bound to addr. Call ListenAndServe to start.
func newRTSPRelay(addr string, logger log.Logger) *rtspRelay {
if logger == nil {
logger = log.New("")
}
return &rtspRelay{
addr: addr,
logger: logger,
done: make(chan struct{}),
}
}
// ListenAndServe starts the RTSP relay and blocks until Close is called.
func (r *rtspRelay) ListenAndServe() error {
ln, err := net.Listen("tcp", r.addr)
if err != nil {
return fmt.Errorf("rtsp relay: listen %q: %w", r.addr, err)
}
r.ln = ln
r.logger.Info().WithField("addr", r.addr).Log("RTSP relay started")
for {
conn, err := ln.Accept()
if err != nil {
select {
case <-r.done:
return nil
default:
r.logger.Error().WithField("error", err).Log("RTSP relay: accept error")
continue
}
}
r.wg.Add(1)
go func() {
defer r.wg.Done()
r.handleConn(conn)
}()
}
}
// Close shuts down the relay and waits for all connections to finish.
func (r *rtspRelay) Close() {
select {
case <-r.done:
default:
close(r.done)
}
if r.ln != nil {
r.ln.Close()
}
r.wg.Wait()
}
// UpdateCodecs updates the codec parameters used in DESCRIBE responses for name.
func (r *rtspRelay) UpdateCodecs(name string, params RTSPCodecParams) {
e := r.getOrCreateStream(name)
e.mu.Lock()
e.codecs = params
e.mu.Unlock()
}
// WriteVideo forwards a video RTP packet to all consumers watching name.
func (r *rtspRelay) WriteVideo(name string, pkt []byte) {
v, ok := r.streams.Load(name)
if !ok {
return
}
e := v.(*rtspStreamEntry)
e.mu.RLock()
clients := make([]*rtspRelayClient, 0, len(e.clients))
for _, c := range e.clients {
clients = append(clients, c)
}
e.mu.RUnlock()
for _, c := range clients {
c.writeInterleaved(c.videoTrack, pkt)
}
}
// WriteAudio forwards an audio RTP packet to all consumers watching name.
func (r *rtspRelay) WriteAudio(name string, pkt []byte) {
v, ok := r.streams.Load(name)
if !ok {
return
}
e := v.(*rtspStreamEntry)
e.mu.RLock()
clients := make([]*rtspRelayClient, 0, len(e.clients))
for _, c := range e.clients {
clients = append(clients, c)
}
e.mu.RUnlock()
for _, c := range clients {
c.writeInterleaved(c.audioTrack, pkt)
}
}
// getOrCreateStream returns the named stream entry, creating it with sensible defaults if needed.
func (r *rtspRelay) getOrCreateStream(name string) *rtspStreamEntry {
e := &rtspStreamEntry{
name: name,
clients: make(map[string]*rtspRelayClient),
codecs: RTSPCodecParams{
VideoPayloadType: 96,
VideoCodec: "H264",
VideoClockRate: 90000,
AudioPayloadType: 111,
AudioCodec: "opus",
AudioClockRate: 48000,
AudioChannels: 2,
},
}
actual, _ := r.streams.LoadOrStore(name, e)
return actual.(*rtspStreamEntry)
}
// writeInterleaved sends one RFC 2326 §10.12 interleaved binary packet over the TCP connection.
// If the write times out (slow consumer), the connection is closed to avoid head-of-line blocking.
func (c *rtspRelayClient) writeInterleaved(channel int, data []byte) {
if len(data) > 65535 {
return
}
c.mu.Lock()
defer c.mu.Unlock()
if c.closed {
return
}
hdr := [4]byte{'$', byte(channel), byte(len(data) >> 8), byte(len(data))}
c.conn.SetWriteDeadline(time.Now().Add(150 * time.Millisecond))
if _, err := c.conn.Write(hdr[:]); err != nil {
c.closed = true
c.conn.Close()
return
}
if _, err := c.conn.Write(data); err != nil {
c.closed = true
c.conn.Close()
return
}
c.conn.SetWriteDeadline(time.Time{})
}
// handleConn processes one RTSP TCP connection through OPTIONS → DESCRIBE → SETUP → PLAY.
func (r *rtspRelay) handleConn(conn net.Conn) {
defer conn.Close()
rdr := bufio.NewReader(conn)
sessionID := fmt.Sprintf("%016x", time.Now().UnixNano())
var entry *rtspStreamEntry
var client *rtspRelayClient
videoInterleaved := 0
audioInterleaved := 2
for {
conn.SetReadDeadline(time.Now().Add(60 * time.Second))
method, reqURL, headers, err := rtspReadRequest(rdr)
conn.SetReadDeadline(time.Time{})
if err != nil {
return
}
cseq := headers["cseq"]
switch method {
case "OPTIONS":
rtspWriteResponse(conn, 200, "OK", cseq, map[string]string{
"Public": "OPTIONS, DESCRIBE, SETUP, PLAY, TEARDOWN",
}, "")
case "DESCRIBE":
name := rtspExtractName(reqURL)
if name == "" {
rtspWriteResponse(conn, 404, "Not Found", cseq, nil, "")
return
}
entry = r.getOrCreateStream(name)
sdp := entry.buildSDP()
rtspWriteResponse(conn, 200, "OK", cseq, map[string]string{
"Content-Type": "application/sdp",
"Content-Base": reqURL + "/",
"Content-Length": strconv.Itoa(len(sdp)),
}, sdp)
case "SETUP":
transport := headers["transport"]
if !strings.Contains(strings.ToLower(transport), "tcp") {
// Only TCP interleaved supported; decline UDP politely.
rtspWriteResponse(conn, 461, "Unsupported Transport", cseq, nil, "")
return
}
isAudio := strings.HasSuffix(strings.TrimRight(reqURL, "/"), "track1")
ch := rtspParseInterleaved(transport)
if isAudio {
if ch >= 0 {
audioInterleaved = ch
}
rtspWriteResponse(conn, 200, "OK", cseq, map[string]string{
"Session": sessionID + ";timeout=60",
"Transport": fmt.Sprintf("RTP/AVP/TCP;unicast;interleaved=%d-%d;mode=play", audioInterleaved, audioInterleaved+1),
}, "")
} else {
if ch >= 0 {
videoInterleaved = ch
}
rtspWriteResponse(conn, 200, "OK", cseq, map[string]string{
"Session": sessionID + ";timeout=60",
"Transport": fmt.Sprintf("RTP/AVP/TCP;unicast;interleaved=%d-%d;mode=play", videoInterleaved, videoInterleaved+1),
}, "")
}
case "PLAY":
if entry == nil {
rtspWriteResponse(conn, 455, "Method Not Valid in This State", cseq, nil, "")
return
}
client = &rtspRelayClient{
id: sessionID,
conn: conn,
videoTrack: videoInterleaved,
audioTrack: audioInterleaved,
}
entry.mu.Lock()
entry.clients[sessionID] = client
entry.mu.Unlock()
rtspWriteResponse(conn, 200, "OK", cseq, map[string]string{
"Session": sessionID,
}, "")
r.logger.Info().
WithField("stream", entry.name).
WithField("session", sessionID).
Log("RTSP client connected")
// Stay connected: discard interleaved data, handle TEARDOWN.
for {
conn.SetReadDeadline(time.Now().Add(60 * time.Second))
b, readErr := rdr.ReadByte()
conn.SetReadDeadline(time.Time{})
if readErr != nil {
break
}
if b == '$' {
// Discard interleaved binary data sent by the client (e.g. RTCP RR).
var meta [3]byte
if _, err := io.ReadFull(rdr, meta[:]); err != nil {
break
}
dataLen := int(meta[1])<<8 | int(meta[2])
if dataLen > 0 {
if _, err := io.CopyN(io.Discard, rdr, int64(dataLen)); err != nil {
break
}
}
continue
}
rdr.UnreadByte()
m, _, innerHdrs, err := rtspReadRequest(rdr)
if err != nil || m == "TEARDOWN" {
if m == "TEARDOWN" {
rtspWriteResponse(conn, 200, "OK", innerHdrs["cseq"], map[string]string{
"Session": sessionID,
}, "")
}
break
}
}
entry.mu.Lock()
delete(entry.clients, sessionID)
entry.mu.Unlock()
r.logger.Info().
WithField("stream", entry.name).
WithField("session", sessionID).
Log("RTSP client disconnected")
return
case "TEARDOWN":
if entry != nil {
entry.mu.Lock()
delete(entry.clients, sessionID)
entry.mu.Unlock()
}
rtspWriteResponse(conn, 200, "OK", cseq, map[string]string{
"Session": sessionID,
}, "")
return
default:
rtspWriteResponse(conn, 405, "Method Not Allowed", cseq, nil, "")
}
}
}
// buildSDP returns an SDP description for this stream suitable for RTSP DESCRIBE.
func (e *rtspStreamEntry) buildSDP() string {
e.mu.RLock()
c := e.codecs
e.mu.RUnlock()
var sb strings.Builder
sb.WriteString("v=0\r\n")
sb.WriteString("o=- 1 1 IN IP4 127.0.0.1\r\n")
sb.WriteString("s=WHIP Live Stream\r\n")
sb.WriteString("c=IN IP4 127.0.0.1\r\n")
sb.WriteString("t=0 0\r\n")
sb.WriteString(fmt.Sprintf("m=video 0 RTP/AVP %d\r\n", c.VideoPayloadType))
sb.WriteString(fmt.Sprintf("a=rtpmap:%d %s/%d\r\n", c.VideoPayloadType, c.VideoCodec, c.VideoClockRate))
if c.VideoFmtp != "" {
sb.WriteString(fmt.Sprintf("a=fmtp:%d %s\r\n", c.VideoPayloadType, c.VideoFmtp))
} else if strings.EqualFold(c.VideoCodec, "H264") {
sb.WriteString(fmt.Sprintf("a=fmtp:%d packetization-mode=1\r\n", c.VideoPayloadType))
}
sb.WriteString("a=control:track0\r\n")
sb.WriteString(fmt.Sprintf("m=audio 0 RTP/AVP %d\r\n", c.AudioPayloadType))
if c.AudioChannels > 1 {
sb.WriteString(fmt.Sprintf("a=rtpmap:%d %s/%d/%d\r\n", c.AudioPayloadType, c.AudioCodec, c.AudioClockRate, c.AudioChannels))
} else {
sb.WriteString(fmt.Sprintf("a=rtpmap:%d %s/%d\r\n", c.AudioPayloadType, c.AudioCodec, c.AudioClockRate))
}
sb.WriteString("a=control:track1\r\n")
return sb.String()
}
// rtspReadRequest reads one RTSP request (request line + headers) from rdr.
// Returns method, URL, lowercase-keyed headers map, and any error.
func rtspReadRequest(rdr *bufio.Reader) (method, reqURL string, headers map[string]string, err error) {
headers = make(map[string]string)
// First line: "METHOD URL RTSP/1.0"
line, err := rdr.ReadString('\n')
if err != nil {
return
}
line = strings.TrimRight(line, "\r\n")
parts := strings.SplitN(line, " ", 3)
if len(parts) < 2 {
err = fmt.Errorf("invalid RTSP request line: %q", line)
return
}
method = strings.ToUpper(parts[0])
reqURL = parts[1]
// Headers until blank line.
for {
line, err = rdr.ReadString('\n')
if err != nil {
return
}
line = strings.TrimRight(line, "\r\n")
if line == "" {
break
}
idx := strings.IndexByte(line, ':')
if idx < 0 {
continue
}
key := strings.ToLower(strings.TrimSpace(line[:idx]))
val := strings.TrimSpace(line[idx+1:])
headers[key] = val
}
return
}
// rtspWriteResponse writes an RTSP/1.0 response to conn.
func rtspWriteResponse(conn net.Conn, code int, reason, cseq string, extra map[string]string, body string) {
var sb strings.Builder
sb.WriteString(fmt.Sprintf("RTSP/1.0 %d %s\r\n", code, reason))
if cseq != "" {
sb.WriteString("CSeq: " + cseq + "\r\n")
}
sb.WriteString("Server: datarhei-core-whip/1.0\r\n")
sb.WriteString(fmt.Sprintf("Date: %s\r\n", time.Now().UTC().Format("Mon, 02 Jan 2006 15:04:05 GMT")))
for k, v := range extra {
sb.WriteString(k + ": " + v + "\r\n")
}
sb.WriteString("\r\n")
if body != "" {
sb.WriteString(body)
}
conn.SetWriteDeadline(time.Now().Add(5 * time.Second))
conn.Write([]byte(sb.String())) //nolint:errcheck
conn.SetWriteDeadline(time.Time{})
}
// rtspExtractName extracts the stream name from an RTSP URL like
// rtsp://127.0.0.1:8554/live/mystream or rtsp://host/live/mystream/track0.
func rtspExtractName(rtspURL string) string {
u := rtspURL
// Strip scheme + host.
if idx := strings.Index(u, "://"); idx >= 0 {
u = u[idx+3:]
}
if slash := strings.Index(u, "/"); slash >= 0 {
u = u[slash+1:]
}
// Expect "live/<name>[/trackN]"
if strings.HasPrefix(u, "live/") {
u = u[5:]
}
// Strip any trailing "/trackN" suffix.
if idx := strings.LastIndex(u, "/"); idx >= 0 {
tail := u[idx+1:]
if strings.HasPrefix(tail, "track") {
u = u[:idx]
}
}
// Strip query string.
if idx := strings.IndexByte(u, '?'); idx >= 0 {
u = u[:idx]
}
return u
}
// rtspParseInterleaved parses the "interleaved=X-Y" field from an RTSP Transport
// header and returns X (the RTP channel index), or -1 if not present.
func rtspParseInterleaved(transport string) int {
lower := strings.ToLower(transport)
idx := strings.Index(lower, "interleaved=")
if idx < 0 {
return -1
}
rest := transport[idx+12:]
parts := strings.SplitN(rest, "-", 2)
n, err := strconv.Atoi(strings.TrimSpace(parts[0]))
if err != nil {
return -1
}
return n
}

401
whip/rtsp_relay_test.go Normal file
View File

@ -0,0 +1,401 @@
package whip
import (
"bufio"
"fmt"
"net"
"strings"
"testing"
"time"
)
// dialRTSP dials the relay and returns a bufio.Reader + the raw conn.
func dialRTSP(t *testing.T, addr string) (net.Conn, *bufio.Reader) {
t.Helper()
conn, err := net.DialTimeout("tcp", addr, 3*time.Second)
if err != nil {
t.Fatalf("RTSP dial %s: %v", addr, err)
}
return conn, bufio.NewReader(conn)
}
// sendRTSP writes an RTSP request and returns the parsed response status line + headers.
func sendRTSP(t *testing.T, conn net.Conn, rdr *bufio.Reader, request string) (status string, headers map[string]string) {
t.Helper()
conn.SetWriteDeadline(time.Now().Add(3 * time.Second))
if _, err := conn.Write([]byte(request)); err != nil {
t.Fatalf("write RTSP request: %v", err)
}
conn.SetWriteDeadline(time.Time{})
headers = make(map[string]string)
conn.SetReadDeadline(time.Now().Add(3 * time.Second))
line, err := rdr.ReadString('\n')
conn.SetReadDeadline(time.Time{})
if err != nil {
t.Fatalf("read RTSP status: %v", err)
}
status = strings.TrimRight(line, "\r\n")
for {
conn.SetReadDeadline(time.Now().Add(3 * time.Second))
hdr, err := rdr.ReadString('\n')
conn.SetReadDeadline(time.Time{})
if err != nil {
t.Fatalf("read RTSP header: %v", err)
}
hdr = strings.TrimRight(hdr, "\r\n")
if hdr == "" {
break
}
idx := strings.IndexByte(hdr, ':')
if idx >= 0 {
k := strings.ToLower(strings.TrimSpace(hdr[:idx]))
v := strings.TrimSpace(hdr[idx+1:])
headers[k] = v
}
}
// If there's a body (Content-Length), drain it.
if cl, ok := headers["content-length"]; ok {
n := 0
fmt.Sscanf(cl, "%d", &n)
buf := make([]byte, n)
conn.SetReadDeadline(time.Now().Add(3 * time.Second))
for read := 0; read < n; {
nn, err := rdr.Read(buf[read:])
if err != nil {
t.Fatalf("drain body: %v", err)
}
read += nn
}
conn.SetReadDeadline(time.Time{})
headers["__body__"] = string(buf)
}
return status, headers
}
// readInterleaved reads one RFC 2326 §10.12 interleaved binary frame.
// Returns channel index and payload.
func readInterleaved(t *testing.T, conn net.Conn, rdr *bufio.Reader) (channel int, payload []byte) {
t.Helper()
conn.SetReadDeadline(time.Now().Add(2 * time.Second))
defer conn.SetReadDeadline(time.Time{})
// '$' marker
b, err := rdr.ReadByte()
if err != nil {
t.Fatalf("readInterleaved: read '$': %v", err)
}
if b != '$' {
t.Fatalf("readInterleaved: expected '$', got %02x", b)
}
ch, err := rdr.ReadByte()
if err != nil {
t.Fatalf("readInterleaved: read channel: %v", err)
}
hi, err := rdr.ReadByte()
if err != nil {
t.Fatalf("readInterleaved: read len hi: %v", err)
}
lo, err := rdr.ReadByte()
if err != nil {
t.Fatalf("readInterleaved: read len lo: %v", err)
}
length := int(hi)<<8 | int(lo)
payload = make([]byte, length)
for read := 0; read < length; {
nn, err := rdr.Read(payload[read:])
if err != nil {
t.Fatalf("readInterleaved: read payload: %v", err)
}
read += nn
}
return int(ch), payload
}
// freePort returns an unused TCP port on localhost.
func freePort(t *testing.T) string {
t.Helper()
ln, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
t.Fatalf("freePort: %v", err)
}
addr := ln.Addr().String()
ln.Close()
return addr
}
// TestRTSPRelayOptions verifies OPTIONS returns the correct Public header.
func TestRTSPRelayOptions(t *testing.T) {
addr := freePort(t)
relay := newRTSPRelay(addr, nil)
go relay.ListenAndServe() //nolint:errcheck
defer relay.Close()
time.Sleep(30 * time.Millisecond)
conn, rdr := dialRTSP(t, addr)
defer conn.Close()
status, hdrs := sendRTSP(t, conn, rdr,
"OPTIONS rtsp://"+addr+"/live/test RTSP/1.0\r\nCSeq: 1\r\n\r\n")
if !strings.Contains(status, "200") {
t.Fatalf("OPTIONS: expected 200, got %q", status)
}
pub := hdrs["public"]
for _, m := range []string{"OPTIONS", "DESCRIBE", "SETUP", "PLAY", "TEARDOWN"} {
if !strings.Contains(pub, m) {
t.Errorf("OPTIONS Public missing %q: %q", m, pub)
}
}
}
// TestRTSPRelayDescribeDefaultCodecs verifies DESCRIBE returns valid SDP with default codecs.
func TestRTSPRelayDescribeDefaultCodecs(t *testing.T) {
addr := freePort(t)
relay := newRTSPRelay(addr, nil)
go relay.ListenAndServe() //nolint:errcheck
defer relay.Close()
time.Sleep(30 * time.Millisecond)
conn, rdr := dialRTSP(t, addr)
defer conn.Close()
sendRTSP(t, conn, rdr,
"OPTIONS rtsp://"+addr+"/live/mystream RTSP/1.0\r\nCSeq: 1\r\n\r\n")
_, hdrs := sendRTSP(t, conn, rdr,
"DESCRIBE rtsp://"+addr+"/live/mystream RTSP/1.0\r\nCSeq: 2\r\nAccept: application/sdp\r\n\r\n")
if hdrs["content-type"] != "application/sdp" {
t.Fatalf("DESCRIBE: expected content-type application/sdp, got %q", hdrs["content-type"])
}
body := hdrs["__body__"]
for _, want := range []string{"H264", "opus", "a=control:track0", "a=control:track1"} {
if !strings.Contains(body, want) {
t.Errorf("DESCRIBE SDP missing %q:\n%s", want, body)
}
}
}
// TestRTSPRelayDescribeUpdatedCodecs verifies that UpdateCodecs changes the DESCRIBE SDP.
func TestRTSPRelayDescribeUpdatedCodecs(t *testing.T) {
addr := freePort(t)
relay := newRTSPRelay(addr, nil)
go relay.ListenAndServe() //nolint:errcheck
defer relay.Close()
time.Sleep(30 * time.Millisecond)
relay.UpdateCodecs("cam1", RTSPCodecParams{
VideoPayloadType: 96,
VideoCodec: "H264",
VideoClockRate: 90000,
VideoFmtp: "sprop-parameter-sets=abc;packetization-mode=1",
AudioPayloadType: 111,
AudioCodec: "opus",
AudioClockRate: 48000,
AudioChannels: 2,
})
conn, rdr := dialRTSP(t, addr)
defer conn.Close()
sendRTSP(t, conn, rdr,
"OPTIONS rtsp://"+addr+"/live/cam1 RTSP/1.0\r\nCSeq: 1\r\n\r\n")
_, hdrs := sendRTSP(t, conn, rdr,
"DESCRIBE rtsp://"+addr+"/live/cam1 RTSP/1.0\r\nCSeq: 2\r\nAccept: application/sdp\r\n\r\n")
body := hdrs["__body__"]
if !strings.Contains(body, "sprop-parameter-sets=abc") {
t.Errorf("DESCRIBE SDP should contain updated fmtp:\n%s", body)
}
}
// TestRTSPRelayDataDelivery is the full E2E test:
// 1. relay starts
// 2. RTSP client connects (OPTIONS → DESCRIBE → SETUP video → SETUP audio → PLAY)
// 3. WriteVideo + WriteAudio inject fake RTP packets
// 4. client reads the interleaved frames and verifies channel/payload match
func TestRTSPRelayDataDelivery(t *testing.T) {
addr := freePort(t)
relay := newRTSPRelay(addr, nil)
go relay.ListenAndServe() //nolint:errcheck
defer relay.Close()
time.Sleep(30 * time.Millisecond)
streamName := "obs-test"
relay.UpdateCodecs(streamName, RTSPCodecParams{
VideoPayloadType: 96,
VideoCodec: "H264",
VideoClockRate: 90000,
AudioPayloadType: 111,
AudioCodec: "opus",
AudioClockRate: 48000,
AudioChannels: 2,
})
base := "rtsp://" + addr + "/live/" + streamName
conn, rdr := dialRTSP(t, addr)
defer conn.Close()
// OPTIONS
status, _ := sendRTSP(t, conn, rdr,
fmt.Sprintf("OPTIONS %s RTSP/1.0\r\nCSeq: 1\r\n\r\n", base))
if !strings.Contains(status, "200") {
t.Fatalf("OPTIONS: %s", status)
}
// DESCRIBE
status, _ = sendRTSP(t, conn, rdr,
fmt.Sprintf("DESCRIBE %s RTSP/1.0\r\nCSeq: 2\r\nAccept: application/sdp\r\n\r\n", base))
if !strings.Contains(status, "200") {
t.Fatalf("DESCRIBE: %s", status)
}
// SETUP video interleaved channels 0-1
status, _ = sendRTSP(t, conn, rdr,
fmt.Sprintf("SETUP %s/track0 RTSP/1.0\r\nCSeq: 3\r\nTransport: RTP/AVP/TCP;unicast;interleaved=0-1;mode=play\r\n\r\n", base))
if !strings.Contains(status, "200") {
t.Fatalf("SETUP video: %s", status)
}
// SETUP audio interleaved channels 2-3
status, _ = sendRTSP(t, conn, rdr,
fmt.Sprintf("SETUP %s/track1 RTSP/1.0\r\nCSeq: 4\r\nTransport: RTP/AVP/TCP;unicast;interleaved=2-3;mode=play\r\n\r\n", base))
if !strings.Contains(status, "200") {
t.Fatalf("SETUP audio: %s", status)
}
// PLAY
status, _ = sendRTSP(t, conn, rdr,
fmt.Sprintf("PLAY %s RTSP/1.0\r\nCSeq: 5\r\nSession: ignored\r\n\r\n", base))
if !strings.Contains(status, "200") {
t.Fatalf("PLAY: %s", status)
}
// Inject a video RTP packet (fake but valid minimal structure).
videoRTP := buildFakeRTP(96, 1000, 90000)
relay.WriteVideo(streamName, videoRTP)
ch, payload := readInterleaved(t, conn, rdr)
if ch != 0 {
t.Errorf("expected video on interleaved channel 0, got %d", ch)
}
if len(payload) != len(videoRTP) {
t.Errorf("video payload length: want %d got %d", len(videoRTP), len(payload))
}
// Inject an audio RTP packet.
audioRTP := buildFakeRTP(111, 2000, 48000)
relay.WriteAudio(streamName, audioRTP)
ch, payload = readInterleaved(t, conn, rdr)
if ch != 2 {
t.Errorf("expected audio on interleaved channel 2, got %d", ch)
}
if len(payload) != len(audioRTP) {
t.Errorf("audio payload length: want %d got %d", len(audioRTP), len(payload))
}
_ = payload
}
// TestRTSPRelayMultipleSubscribers verifies that two simultaneous clients both
// receive the same RTP packets.
func TestRTSPRelayMultipleSubscribers(t *testing.T) {
addr := freePort(t)
relay := newRTSPRelay(addr, nil)
go relay.ListenAndServe() //nolint:errcheck
defer relay.Close()
time.Sleep(30 * time.Millisecond)
streamName := "multi"
base := "rtsp://" + addr + "/live/" + streamName
connectAndPlay := func(t *testing.T) (net.Conn, *bufio.Reader) {
t.Helper()
c, r := dialRTSP(t, addr)
for _, req := range []string{
fmt.Sprintf("OPTIONS %s RTSP/1.0\r\nCSeq: 1\r\n\r\n", base),
fmt.Sprintf("DESCRIBE %s RTSP/1.0\r\nCSeq: 2\r\nAccept: application/sdp\r\n\r\n", base),
fmt.Sprintf("SETUP %s/track0 RTSP/1.0\r\nCSeq: 3\r\nTransport: RTP/AVP/TCP;unicast;interleaved=0-1;mode=play\r\n\r\n", base),
fmt.Sprintf("PLAY %s RTSP/1.0\r\nCSeq: 4\r\nSession: ignored\r\n\r\n", base),
} {
status, _ := sendRTSP(t, c, r, req)
if !strings.Contains(status, "200") {
t.Fatalf("setup failed: %s", status)
}
}
return c, r
}
conn1, rdr1 := connectAndPlay(t)
defer conn1.Close()
conn2, rdr2 := connectAndPlay(t)
defer conn2.Close()
pkt := buildFakeRTP(96, 5000, 90000)
relay.WriteVideo(streamName, pkt)
ch1, p1 := readInterleaved(t, conn1, rdr1)
ch2, p2 := readInterleaved(t, conn2, rdr2)
if ch1 != 0 || ch2 != 0 {
t.Errorf("expected channel 0 for both clients, got %d and %d", ch1, ch2)
}
if len(p1) != len(pkt) || len(p2) != len(pkt) {
t.Errorf("packet length mismatch: sent %d, got %d and %d", len(pkt), len(p1), len(p2))
}
}
// TestRTSPRelayTEARDOWN verifies that TEARDOWN is handled gracefully.
func TestRTSPRelayTEARDOWN(t *testing.T) {
addr := freePort(t)
relay := newRTSPRelay(addr, nil)
go relay.ListenAndServe() //nolint:errcheck
defer relay.Close()
time.Sleep(30 * time.Millisecond)
base := "rtsp://" + addr + "/live/tdtest"
conn, rdr := dialRTSP(t, addr)
defer conn.Close()
for cseq, req := range []string{
fmt.Sprintf("OPTIONS %s RTSP/1.0\r\nCSeq: 1\r\n\r\n", base),
fmt.Sprintf("DESCRIBE %s RTSP/1.0\r\nCSeq: 2\r\nAccept: application/sdp\r\n\r\n", base),
fmt.Sprintf("SETUP %s/track0 RTSP/1.0\r\nCSeq: 3\r\nTransport: RTP/AVP/TCP;unicast;interleaved=0-1;mode=play\r\n\r\n", base),
fmt.Sprintf("PLAY %s RTSP/1.0\r\nCSeq: 4\r\nSession: ignored\r\n\r\n", base),
} {
status, _ := sendRTSP(t, conn, rdr, req)
if !strings.Contains(status, "200") {
t.Fatalf("step %d: %s", cseq, status)
}
}
// Send TEARDOWN in-band (as an RTSP request mixed with interleaved).
conn.SetWriteDeadline(time.Now().Add(3 * time.Second))
conn.Write([]byte(fmt.Sprintf("TEARDOWN %s RTSP/1.0\r\nCSeq: 5\r\n\r\n", base))) //nolint:errcheck
conn.SetWriteDeadline(time.Time{})
// Server should reply 200.
status, _ := sendRTSP(t, conn, rdr, "") // already written above, just drain response
_ = status // Server closes after TEARDOWN; a 200 or EOF is both acceptable
}
// buildFakeRTP constructs a minimal 12-byte RTP header + 4-byte payload.
func buildFakeRTP(pt uint8, seq uint16, ts uint32) []byte {
pkt := make([]byte, 16)
pkt[0] = 0x80 // V=2, P=0, X=0, CC=0
pkt[1] = pt & 0x7f // M=0, PT
pkt[2] = byte(seq >> 8) // Seq hi
pkt[3] = byte(seq) // Seq lo
pkt[4] = byte(ts >> 24) // TS
pkt[5] = byte(ts >> 16)
pkt[6] = byte(ts >> 8)
pkt[7] = byte(ts)
pkt[8], pkt[9], pkt[10], pkt[11] = 0, 0, 0, 1 // SSRC = 1
pkt[12] = 0xde // fake payload
pkt[13] = 0xad
pkt[14] = 0xbe
pkt[15] = 0xef
return pkt
}

1120
whip/whip.go Normal file

File diff suppressed because it is too large Load Diff