Add support for FLV over HTTP streaming

This commit is contained in:
Ingo Oppermann 2025-12-12 13:40:49 +01:00
parent 64dfd1c314
commit 3bca02f279
No known key found for this signature in database
GPG Key ID: 2AB32426E9DD229E
14 changed files with 469 additions and 85 deletions

View File

@ -1488,14 +1488,15 @@ func (a *api) start(ctx context.Context) error {
Cors: http.CorsConfig{
Origins: cfg.Storage.CORS.Origins,
},
RTMP: a.rtmpserver,
SRT: a.srtserver,
Config: a.config.store,
Sessions: a.sessions,
Router: router,
ReadOnly: cfg.API.ReadOnly,
Cluster: a.cluster,
IAM: a.iam,
RTMP: a.rtmpserver,
RTMPHTTPFLVMount: "",
SRT: a.srtserver,
Config: a.config.store,
Sessions: a.sessions,
Router: router,
ReadOnly: cfg.API.ReadOnly,
Cluster: a.cluster,
IAM: a.iam,
IAMSkipper: func(ip string) bool {
if !cfg.API.Auth.Enable {
return true
@ -1520,6 +1521,10 @@ func (a *api) start(ctx context.Context) error {
},
}
if cfg.RTMP.HTTPFLV.Enable {
serverConfig.RTMPHTTPFLVMount = cfg.RTMP.HTTPFLV.Mount
}
mainserverhandler, err := http.NewServer(serverConfig)
if err != nil {

View File

@ -2092,6 +2092,17 @@ const docTemplateClusterAPI = `{
"enable_tls": {
"type": "boolean"
},
"httpflv": {
"type": "object",
"properties": {
"enable": {
"type": "boolean"
},
"mount": {
"type": "string"
}
}
},
"token": {
"description": "Deprecated, use IAM",
"type": "string"

View File

@ -2085,6 +2085,17 @@
"enable_tls": {
"type": "boolean"
},
"httpflv": {
"type": "object",
"properties": {
"enable": {
"type": "boolean"
},
"mount": {
"type": "string"
}
}
},
"token": {
"description": "Deprecated, use IAM",
"type": "string"

View File

@ -546,6 +546,13 @@ definitions:
type: boolean
enable_tls:
type: boolean
httpflv:
properties:
enable:
type: boolean
mount:
type: string
type: object
token:
description: Deprecated, use IAM
type: string

View File

@ -271,6 +271,8 @@ func (d *Config) init() {
d.vars.Register(value.NewMustAddress(&d.RTMP.AddressTLS, ":1936"), "rtmp.address_tls", "CORE_RTMP_ADDRESS_TLS", nil, "RTMPS server listen address", false, false)
d.vars.Register(value.NewAbsolutePath(&d.RTMP.App, "/"), "rtmp.app", "CORE_RTMP_APP", nil, "RTMP app for publishing", false, false)
d.vars.Register(value.NewString(&d.RTMP.Token, ""), "rtmp.token", "CORE_RTMP_TOKEN", nil, "RTMP token for publishing and playing", false, true)
d.vars.Register(value.NewBool(&d.RTMP.HTTPFLV.Enable, false), "rtmp.httpflv_enable", "CORE_RTMP_HTTPFLV_ENABLE", nil, "Enable FLV over HTTP for RTMP streams", false, false)
d.vars.Register(value.NewAbsolutePath(&d.RTMP.HTTPFLV.Mount, "/rtmp"), "rtmp.httpflv_mount", "CORE_RTMP_HTTPFLV_MOUNT", nil, "Mountpoint for FLV over HTTP", false, true)
// SRT
d.vars.Register(value.NewBool(&d.SRT.Enable, false), "srt.enable", "CORE_SRT_ENABLE", nil, "Enable SRT server", false, false)
@ -449,6 +451,14 @@ func (d *Config) Validate(resetLogs bool) {
}
}
if d.RTMP.Enable {
if d.RTMP.HTTPFLV.Enable {
if d.RTMP.HTTPFLV.Mount == "/" {
d.vars.Log("error", "rtmp.httpflv.mount", "The FLV over HTTP mount point cannot be root")
}
}
}
// If CORE_MEMFS_USERNAME and CORE_MEMFS_PASSWORD are set, automatically active/deactivate Basic-Auth for memfs
if d.vars.IsMerged("storage.memory.auth.username") && d.vars.IsMerged("storage.memory.auth.password") {
d.Storage.Memory.Auth.Enable = true

View File

@ -115,6 +115,10 @@ type Data struct {
AddressTLS string `json:"address_tls"`
App string `json:"app"`
Token string `json:"token"` // Deprecated, use IAM
HTTPFLV struct {
Enable bool `json:"enable"`
Mount string `json:"mount"`
} `json:"httpflv"`
} `json:"rtmp"`
SRT struct {
Enable bool `json:"enable"`
@ -227,7 +231,6 @@ func MergeV2toV3(data *Data, d *v2.Data) (*Data, error) {
data.DB = d.DB
data.Host = d.Host
data.API = d.API
data.RTMP = d.RTMP
data.SRT = d.SRT
data.Playout = d.Playout
data.Metrics = d.Metrics
@ -264,6 +267,15 @@ func MergeV2toV3(data *Data, d *v2.Data) (*Data, error) {
data.Sessions.MaxBitrate = d.Sessions.MaxBitrate
data.Sessions.MaxSessions = d.Sessions.MaxSessions
data.RTMP.Enable = d.RTMP.Enable
data.RTMP.EnableTLS = d.RTMP.EnableTLS
data.RTMP.Address = d.RTMP.Address
data.RTMP.AddressTLS = d.RTMP.AddressTLS
data.RTMP.App = d.RTMP.App
data.RTMP.Token = d.RTMP.Token
data.RTMP.HTTPFLV.Enable = false
data.RTMP.HTTPFLV.Mount = "/rtmp"
data.SRT.Log.Topics = slices.Copy(d.SRT.Log.Topics)
data.Router.BlockedPrefixes = slices.Copy(d.Router.BlockedPrefixes)
@ -325,7 +337,6 @@ func DowngradeV3toV2(d *Data) (*v2.Data, error) {
data.DB = d.DB
data.Host = d.Host
data.API = d.API
data.RTMP = d.RTMP
data.SRT = d.SRT
data.Playout = d.Playout
data.Metrics = d.Metrics
@ -362,6 +373,13 @@ func DowngradeV3toV2(d *Data) (*v2.Data, error) {
data.Sessions.MaxBitrate = d.Sessions.MaxBitrate
data.Sessions.MaxSessions = d.Sessions.MaxSessions
data.RTMP.Enable = d.RTMP.Enable
data.RTMP.EnableTLS = d.RTMP.EnableTLS
data.RTMP.Address = d.RTMP.Address
data.RTMP.AddressTLS = d.RTMP.AddressTLS
data.RTMP.App = d.RTMP.App
data.RTMP.Token = d.RTMP.Token
data.SRT.Log.Topics = slices.Copy(d.SRT.Log.Topics)
data.Router.BlockedPrefixes = slices.Copy(d.Router.BlockedPrefixes)

View File

@ -5509,6 +5509,46 @@ const docTemplate = `{
}
}
}
},
"/rtmp/{path}": {
"get": {
"description": "Plays a RTMP stream over HTTP",
"produces": [
"video/x-flv",
"application/json"
],
"tags": [
"v16.?.?"
],
"summary": "Plays a RTMP stream over HTTP",
"operationId": "rtmp-3-play",
"responses": {
"200": {
"description": "OK",
"schema": {
"type": "file"
}
},
"403": {
"description": "Forbidden",
"schema": {
"$ref": "#/definitions/api.Error"
}
},
"404": {
"description": "Not Found",
"schema": {
"$ref": "#/definitions/api.Error"
}
},
"500": {
"description": "Internal Server Error",
"schema": {
"$ref": "#/definitions/api.Error"
}
}
}
}
}
},
"definitions": {
@ -6572,6 +6612,17 @@ const docTemplate = `{
"enable_tls": {
"type": "boolean"
},
"httpflv": {
"type": "object",
"properties": {
"enable": {
"type": "boolean"
},
"mount": {
"type": "string"
}
}
},
"token": {
"description": "Deprecated, use IAM",
"type": "string"
@ -9321,6 +9372,17 @@ const docTemplate = `{
"enable_tls": {
"type": "boolean"
},
"httpflv": {
"type": "object",
"properties": {
"enable": {
"type": "boolean"
},
"mount": {
"type": "string"
}
}
},
"token": {
"description": "Deprecated, use IAM",
"type": "string"

View File

@ -5502,6 +5502,46 @@
}
}
}
},
"/rtmp/{path}": {
"get": {
"description": "Plays a RTMP stream over HTTP",
"produces": [
"video/x-flv",
"application/json"
],
"tags": [
"v16.?.?"
],
"summary": "Plays a RTMP stream over HTTP",
"operationId": "rtmp-3-play",
"responses": {
"200": {
"description": "OK",
"schema": {
"type": "file"
}
},
"403": {
"description": "Forbidden",
"schema": {
"$ref": "#/definitions/api.Error"
}
},
"404": {
"description": "Not Found",
"schema": {
"$ref": "#/definitions/api.Error"
}
},
"500": {
"description": "Internal Server Error",
"schema": {
"$ref": "#/definitions/api.Error"
}
}
}
}
}
},
"definitions": {
@ -6565,6 +6605,17 @@
"enable_tls": {
"type": "boolean"
},
"httpflv": {
"type": "object",
"properties": {
"enable": {
"type": "boolean"
},
"mount": {
"type": "string"
}
}
},
"token": {
"description": "Deprecated, use IAM",
"type": "string"
@ -9314,6 +9365,17 @@
"enable_tls": {
"type": "boolean"
},
"httpflv": {
"type": "object",
"properties": {
"enable": {
"type": "boolean"
},
"mount": {
"type": "string"
}
}
},
"token": {
"description": "Deprecated, use IAM",
"type": "string"

View File

@ -723,6 +723,13 @@ definitions:
type: boolean
enable_tls:
type: boolean
httpflv:
properties:
enable:
type: boolean
mount:
type: string
type: object
token:
description: Deprecated, use IAM
type: string
@ -2643,6 +2650,13 @@ definitions:
type: boolean
enable_tls:
type: boolean
httpflv:
properties:
enable:
type: boolean
mount:
type: string
type: object
token:
description: Deprecated, use IAM
type: string
@ -6635,6 +6649,33 @@ paths:
schema:
type: string
summary: Retrieve profiling data from the application
/rtmp/{path}:
get:
description: Plays a RTMP stream over HTTP
operationId: rtmp-3-play
produces:
- video/x-flv
- application/json
responses:
"200":
description: OK
schema:
type: file
"403":
description: Forbidden
schema:
$ref: '#/definitions/api.Error'
"404":
description: Not Found
schema:
$ref: '#/definitions/api.Error'
"500":
description: Internal Server Error
schema:
$ref: '#/definitions/api.Error'
summary: Plays a RTMP stream over HTTP
tags:
- v16.?.?
securityDefinitions:
ApiKeyAuth:
in: header

View File

@ -1,9 +1,12 @@
package api
import (
"errors"
"net"
"net/http"
"github.com/datarhei/core/v16/http/api"
"github.com/datarhei/core/v16/http/handler/util"
"github.com/datarhei/core/v16/rtmp"
"github.com/labstack/echo/v4"
@ -43,3 +46,51 @@ func (rtmph *RTMPHandler) ListChannels(c echo.Context) error {
return c.JSON(http.StatusOK, list)
}
// Play plays a RTMP stream over HTTP
// @Summary Plays a RTMP stream over HTTP
// @Description Plays a RTMP stream over HTTP
// @Tags v16.?.?
// @ID rtmp-3-play
// @Produce video/x-flv
// @Produce json
// @Success 200 {file} byte
// @Success 403 {object} api.Error
// @Success 404 {object} api.Error
// @Success 500 {object} api.Error
// @Router /rtmp/{path} [get]
func (rtmph *RTMPHandler) Play(c echo.Context) error {
path := util.PathWildcardParam(c)
addr, err := net.ResolveIPAddr("ip", c.RealIP())
if err != nil {
return api.Err(http.StatusBadRequest, "", "%s", err.Error())
}
u := c.Request().URL
u.Path = path
r, err := rtmph.rtmp.PlayFLV(addr, u)
if err != nil {
var rtmperr rtmp.PlayError
if errors.As(err, &rtmperr) {
status := http.StatusInternalServerError
switch rtmperr.Message {
case "FORBIDDEN":
status = http.StatusForbidden
case "NOTFOUND":
status = http.StatusNotFound
}
return api.Err(status, "", "%s", err.Error())
} else {
return err
}
}
defer r.Close()
c.Response().Header().Set("Transfer-Encoding", "chunked")
c.Response().Header().Set("Access-Control-Allow-Origin", "*") // important for browser player
return c.Stream(200, "video/x-flv", r)
}

View File

@ -83,30 +83,31 @@ import (
var ListenAndServe = http.ListenAndServe
type Config struct {
Logger log.Logger
LogBuffer log.BufferWriter
LogEvents event.EventSource
Restream restream.Restreamer
Metrics monitor.HistoryReader
Prometheus prometheus.Reader
MimeTypesFile string
MimeTypes map[string]string
Filesystems []fs.FS
IPLimiter net.IPLimitValidator
Profiling bool
Cors CorsConfig
RTMP rtmp.Server
SRT srt.Server
Config cfgstore.Store
Cache cache.Cacher
Sessions session.RegistryReader
Router router.Router
ReadOnly bool
Cluster cluster.Cluster
IAM iam.IAM
IAMSkipper func(ip string) bool
Resources resources.Resources
Compress CompressConfig
Logger log.Logger
LogBuffer log.BufferWriter
LogEvents event.EventSource
Restream restream.Restreamer
Metrics monitor.HistoryReader
Prometheus prometheus.Reader
MimeTypesFile string
MimeTypes map[string]string
Filesystems []fs.FS
IPLimiter net.IPLimitValidator
Profiling bool
Cors CorsConfig
RTMP rtmp.Server
RTMPHTTPFLVMount string
SRT srt.Server
Config cfgstore.Store
Cache cache.Cacher
Sessions session.RegistryReader
Router router.Router
ReadOnly bool
Cluster cluster.Cluster
IAM iam.IAM
IAMSkipper func(ip string) bool
Resources resources.Resources
Compress CompressConfig
}
type CorsConfig struct {
@ -166,6 +167,7 @@ type server struct {
mimeTypesFile string
mimeTypes map[string]string
profiling bool
httpFLVMount string
readOnly bool
@ -319,6 +321,10 @@ func NewServer(config Config) (serverhandler.Server, error) {
s.v3handler.rtmp = api.NewRTMP(
config.RTMP,
)
if len(config.RTMPHTTPFLVMount) != 0 {
s.httpFLVMount = config.RTMPHTTPFLVMount
}
}
if config.SRT != nil {
@ -594,6 +600,11 @@ func (s *server) setRoutes() {
s.handler.profiling.Register(prof)
}
// RTMP over HTTP
if s.v3handler.rtmp != nil && len(s.httpFLVMount) != 0 {
s.router.GET(s.httpFLVMount+"/*", s.v3handler.rtmp.Play)
}
// GraphQL
graphql := api.Group("/graph")
//graphql.Use(gzipMiddleware)

View File

@ -9,7 +9,6 @@ import (
"github.com/datarhei/core/v16/session"
"github.com/datarhei/joy4/av"
"github.com/datarhei/joy4/av/pubsub"
"github.com/datarhei/joy4/format/rtmp"
)
type client struct {
@ -134,8 +133,7 @@ func (ch *channel) Close() {
ch.queue.Close()
}
func (ch *channel) AddSubscriber(conn *rtmp.Conn, playPath, identity string) string {
addr := conn.NetConn().RemoteAddr().String()
func (ch *channel) AddSubscriber(conn connection, addr string, playPath, identity string) string {
ip, _, _ := net.SplitHostPort(addr)
client := newClient(conn, addr, ch.collector)

View File

@ -25,7 +25,7 @@ type conn struct {
// Make sure that conn implements the connection interface
var _ connection = &conn{}
func newConnectionFromDemuxer(m av.DemuxCloser) connection {
func newConnectionFromDemuxCloser(m av.DemuxCloser) connection {
c := &conn{
demuxer: m,
}
@ -33,6 +33,30 @@ func newConnectionFromDemuxer(m av.DemuxCloser) connection {
return c
}
func newConnectionFromMuxCloser(m av.MuxCloser) connection {
c := &conn{
muxer: m,
}
return c
}
func newConnectionFromMuxer(m av.Muxer) connection {
c := &conn{
muxer: &fakeMuxCloser{m},
}
return c
}
type fakeMuxCloser struct {
av.Muxer
}
func (f *fakeMuxCloser) Close() error {
return nil
}
func (c *conn) TxBytes() uint64 {
return c.txbytes
}

View File

@ -3,8 +3,11 @@ package rtmp
import (
"crypto/tls"
"errors"
"fmt"
"io"
"net"
"net/url"
"path/filepath"
"strings"
"sync"
@ -22,6 +25,7 @@ import (
"github.com/datarhei/joy4/av/avutil"
"github.com/datarhei/joy4/av/pktque"
"github.com/datarhei/joy4/format"
"github.com/datarhei/joy4/format/flv"
"github.com/datarhei/joy4/format/rtmp"
)
@ -81,6 +85,8 @@ type Server interface {
// Channels return a list of currently publishing streams
Channels() []string
PlayFLV(remote net.Addr, u *url.URL) (io.ReadCloser, error)
event.MediaSource
}
@ -212,20 +218,27 @@ func (s *server) log(who, handler, action, resource, message string, client net.
}).Log(message)
}
// handlePlay is called when a RTMP client wants to play a stream
func (s *server) handlePlay(conn *rtmp.Conn) {
defer conn.Close()
type PlayError struct {
Message string
Identity string
Playpath string
Details string
Err error
}
remote := conn.NetConn().RemoteAddr()
playpath, token, isStreamkey := rtmpurl.GetToken(conn.URL)
func (e PlayError) Error() string {
return fmt.Sprintf("%s %s %s %s", e.Identity, e.Message, e.Playpath, e.Details)
}
func (s *server) play(remote net.Addr, u *url.URL) (*channel, string, string, error) {
playpath, token, isStreamkey := rtmpurl.GetToken(u)
playpath, _ = rtmpurl.RemovePathPrefix(playpath, s.app)
identity, err := s.findIdentityFromStreamKey(token)
if err != nil {
s.logger.Debug().WithError(err).Log("invalid streamkey")
s.log("", "PLAY", "FORBIDDEN", playpath, "invalid streamkey ("+token+")", remote)
return
return nil, "", playpath, &PlayError{"FORBIDDEN", "", playpath, "invalid streamkey (" + token + ")", err}
}
if identity == "$anon" && isStreamkey {
@ -237,8 +250,7 @@ func (s *server) handlePlay(conn *rtmp.Conn) {
resource := playpath
if !s.iam.Enforce(identity, domain, "rtmp", resource, "PLAY") {
s.log(identity, "PLAY", "FORBIDDEN", playpath, "access denied", remote)
return
return nil, identity, playpath, &PlayError{"FORBIDDEN", "", playpath, "access denies", nil}
}
// Look for the stream
@ -250,8 +262,7 @@ func (s *server) handlePlay(conn *rtmp.Conn) {
// Check in the cluster for that stream
url, err := s.proxy.MediaGetURL("rtmp", playpath)
if err != nil {
s.log(identity, "PLAY", "NOTFOUND", playpath, "", remote)
return
return nil, identity, playpath, &PlayError{"NOTFOUND", identity, playpath, "", err}
}
url = url.JoinPath(token)
@ -260,11 +271,10 @@ func (s *server) handlePlay(conn *rtmp.Conn) {
src, err := avutil.Open(peerurl)
if err != nil {
s.logger.Error().WithField("address", url).WithError(err).Log("Proxying address failed")
s.log(identity, "PLAY", "NOTFOUND", playpath, "", remote)
return
return nil, identity, playpath, &PlayError{"NOTFOUND", identity, playpath, "", err}
}
c := newConnectionFromDemuxer(src)
c := newConnectionFromDemuxCloser(src)
wg := sync.WaitGroup{}
wg.Add(1)
@ -303,41 +313,56 @@ func (s *server) handlePlay(conn *rtmp.Conn) {
ticker.Stop()
}
if ch != nil {
// Send the metadata to the client
conn.WriteHeader(ch.streams)
s.log(identity, "PLAY", "START", playpath, "", remote)
// Get a cursor and apply filters
cursor := ch.queue.Oldest()
filters := pktque.Filters{}
if ch.hasVideo {
// The first packet has to be a key frame
filters = append(filters, &pktque.WaitKeyFrame{})
}
// Adjust the timestamp such that the stream starts from 0
filters = append(filters, &pktque.FixTime{StartFromZero: true, MakeIncrement: false})
demuxer := &pktque.FilterDemuxer{
Filter: filters,
Demuxer: cursor,
}
id := ch.AddSubscriber(conn, playpath, identity)
// Transfer the data, blocks until done
avutil.CopyFile(conn, demuxer)
ch.RemoveSubscriber(id)
s.log(identity, "PLAY", "STOP", playpath, "", remote)
} else {
if ch == nil {
s.log(identity, "PLAY", "NOTFOUND", playpath, "", remote)
}
return ch, identity, playpath, nil
}
// handlePlay is called when a RTMP client wants to play a stream
func (s *server) handlePlay(conn *rtmp.Conn) {
defer conn.Close()
remote := conn.NetConn().RemoteAddr()
ch, identity, playpath, err := s.play(remote, conn.URL)
if err != nil {
var rtmperr PlayError
if errors.As(err, &rtmperr) {
s.log(rtmperr.Identity, "PLAY", rtmperr.Message, rtmperr.Playpath, rtmperr.Details, remote)
}
return
}
s.log(identity, "PLAY", "START", playpath, "", remote)
// Get a cursor and apply filters
cursor := ch.queue.Oldest()
filters := pktque.Filters{}
if ch.hasVideo {
// The first packet has to be a key frame
filters = append(filters, &pktque.WaitKeyFrame{})
}
// Adjust the timestamp such that the stream starts from 0
filters = append(filters, &pktque.FixTime{StartFromZero: true, MakeIncrement: false})
demuxer := &pktque.FilterDemuxer{
Filter: filters,
Demuxer: cursor,
}
id := ch.AddSubscriber(conn, remote.String(), playpath, identity)
// Transfer the data, blocks until done
avutil.CopyFile(conn, demuxer)
ch.RemoveSubscriber(id)
s.log(identity, "PLAY", "STOP", playpath, "", remote)
}
// handlePublish is called when a RTMP client wants to publish a stream
@ -529,3 +554,51 @@ func (s *server) Events() (<-chan event.Event, event.CancelFunc, error) {
func (s *server) MediaList() []string {
return s.Channels()
}
func (s *server) PlayFLV(remote net.Addr, u *url.URL) (io.ReadCloser, error) {
ch, identity, playpath, err := s.play(remote, u)
if err != nil {
return nil, err
}
s.log(identity, "FLVPLAY", "START", playpath, "", remote)
// Get a cursor and apply filters
cursor := ch.queue.Oldest()
filters := pktque.Filters{}
if ch.hasVideo {
// The first packet has to be a key frame
filters = append(filters, &pktque.WaitKeyFrame{})
}
// Adjust the timestamp such that the stream starts from 0
filters = append(filters, &pktque.FixTime{StartFromZero: true, MakeIncrement: false})
demuxer := &pktque.FilterDemuxer{
Filter: filters,
Demuxer: cursor,
}
r, w := io.Pipe()
muxer := flv.NewMuxer(w)
conn := newConnectionFromMuxer(muxer)
id := ch.AddSubscriber(conn, remote.String(), playpath, identity)
go func() {
defer w.Close()
// Transfer the data, blocks until done
avutil.CopyFile(muxer, demuxer)
ch.RemoveSubscriber(id)
s.log(identity, "FLVPLAY", "STOP", playpath, "", remote)
}()
return r, err
}