From 3bca02f2793f1be8ad48af1a7575b9084664328f Mon Sep 17 00:00:00 2001 From: Ingo Oppermann Date: Fri, 12 Dec 2025 13:40:49 +0100 Subject: [PATCH] Add support for FLV over HTTP streaming --- app/api/api.go | 21 ++-- cluster/docs/ClusterAPI_docs.go | 11 ++ cluster/docs/ClusterAPI_swagger.json | 11 ++ cluster/docs/ClusterAPI_swagger.yaml | 7 ++ config/config.go | 10 ++ config/data.go | 22 +++- docs/docs.go | 62 ++++++++++ docs/swagger.json | 62 ++++++++++ docs/swagger.yaml | 41 +++++++ http/handler/api/rtmp.go | 51 ++++++++ http/server.go | 59 ++++++---- rtmp/channel.go | 4 +- rtmp/connection.go | 26 ++++- rtmp/rtmp.go | 167 +++++++++++++++++++-------- 14 files changed, 469 insertions(+), 85 deletions(-) diff --git a/app/api/api.go b/app/api/api.go index d5f4a713..747fbf7a 100644 --- a/app/api/api.go +++ b/app/api/api.go @@ -1488,14 +1488,15 @@ func (a *api) start(ctx context.Context) error { Cors: http.CorsConfig{ Origins: cfg.Storage.CORS.Origins, }, - RTMP: a.rtmpserver, - SRT: a.srtserver, - Config: a.config.store, - Sessions: a.sessions, - Router: router, - ReadOnly: cfg.API.ReadOnly, - Cluster: a.cluster, - IAM: a.iam, + RTMP: a.rtmpserver, + RTMPHTTPFLVMount: "", + SRT: a.srtserver, + Config: a.config.store, + Sessions: a.sessions, + Router: router, + ReadOnly: cfg.API.ReadOnly, + Cluster: a.cluster, + IAM: a.iam, IAMSkipper: func(ip string) bool { if !cfg.API.Auth.Enable { return true @@ -1520,6 +1521,10 @@ func (a *api) start(ctx context.Context) error { }, } + if cfg.RTMP.HTTPFLV.Enable { + serverConfig.RTMPHTTPFLVMount = cfg.RTMP.HTTPFLV.Mount + } + mainserverhandler, err := http.NewServer(serverConfig) if err != nil { diff --git a/cluster/docs/ClusterAPI_docs.go b/cluster/docs/ClusterAPI_docs.go index b63ac085..f620aa8a 100644 --- a/cluster/docs/ClusterAPI_docs.go +++ b/cluster/docs/ClusterAPI_docs.go @@ -2092,6 +2092,17 @@ const docTemplateClusterAPI = `{ "enable_tls": { "type": "boolean" }, + "httpflv": { + "type": "object", + "properties": { + "enable": { + "type": "boolean" + }, + "mount": { + "type": "string" + } + } + }, "token": { "description": "Deprecated, use IAM", "type": "string" diff --git a/cluster/docs/ClusterAPI_swagger.json b/cluster/docs/ClusterAPI_swagger.json index 6fa6a20b..e0a1ec64 100644 --- a/cluster/docs/ClusterAPI_swagger.json +++ b/cluster/docs/ClusterAPI_swagger.json @@ -2085,6 +2085,17 @@ "enable_tls": { "type": "boolean" }, + "httpflv": { + "type": "object", + "properties": { + "enable": { + "type": "boolean" + }, + "mount": { + "type": "string" + } + } + }, "token": { "description": "Deprecated, use IAM", "type": "string" diff --git a/cluster/docs/ClusterAPI_swagger.yaml b/cluster/docs/ClusterAPI_swagger.yaml index 45e391bf..06802a6f 100644 --- a/cluster/docs/ClusterAPI_swagger.yaml +++ b/cluster/docs/ClusterAPI_swagger.yaml @@ -546,6 +546,13 @@ definitions: type: boolean enable_tls: type: boolean + httpflv: + properties: + enable: + type: boolean + mount: + type: string + type: object token: description: Deprecated, use IAM type: string diff --git a/config/config.go b/config/config.go index d4947865..98d2fe49 100644 --- a/config/config.go +++ b/config/config.go @@ -271,6 +271,8 @@ func (d *Config) init() { d.vars.Register(value.NewMustAddress(&d.RTMP.AddressTLS, ":1936"), "rtmp.address_tls", "CORE_RTMP_ADDRESS_TLS", nil, "RTMPS server listen address", false, false) d.vars.Register(value.NewAbsolutePath(&d.RTMP.App, "/"), "rtmp.app", "CORE_RTMP_APP", nil, "RTMP app for publishing", false, false) d.vars.Register(value.NewString(&d.RTMP.Token, ""), "rtmp.token", "CORE_RTMP_TOKEN", nil, "RTMP token for publishing and playing", false, true) + d.vars.Register(value.NewBool(&d.RTMP.HTTPFLV.Enable, false), "rtmp.httpflv_enable", "CORE_RTMP_HTTPFLV_ENABLE", nil, "Enable FLV over HTTP for RTMP streams", false, false) + d.vars.Register(value.NewAbsolutePath(&d.RTMP.HTTPFLV.Mount, "/rtmp"), "rtmp.httpflv_mount", "CORE_RTMP_HTTPFLV_MOUNT", nil, "Mountpoint for FLV over HTTP", false, true) // SRT d.vars.Register(value.NewBool(&d.SRT.Enable, false), "srt.enable", "CORE_SRT_ENABLE", nil, "Enable SRT server", false, false) @@ -449,6 +451,14 @@ func (d *Config) Validate(resetLogs bool) { } } + if d.RTMP.Enable { + if d.RTMP.HTTPFLV.Enable { + if d.RTMP.HTTPFLV.Mount == "/" { + d.vars.Log("error", "rtmp.httpflv.mount", "The FLV over HTTP mount point cannot be root") + } + } + } + // If CORE_MEMFS_USERNAME and CORE_MEMFS_PASSWORD are set, automatically active/deactivate Basic-Auth for memfs if d.vars.IsMerged("storage.memory.auth.username") && d.vars.IsMerged("storage.memory.auth.password") { d.Storage.Memory.Auth.Enable = true diff --git a/config/data.go b/config/data.go index 39d0231b..af68a071 100644 --- a/config/data.go +++ b/config/data.go @@ -115,6 +115,10 @@ type Data struct { AddressTLS string `json:"address_tls"` App string `json:"app"` Token string `json:"token"` // Deprecated, use IAM + HTTPFLV struct { + Enable bool `json:"enable"` + Mount string `json:"mount"` + } `json:"httpflv"` } `json:"rtmp"` SRT struct { Enable bool `json:"enable"` @@ -227,7 +231,6 @@ func MergeV2toV3(data *Data, d *v2.Data) (*Data, error) { data.DB = d.DB data.Host = d.Host data.API = d.API - data.RTMP = d.RTMP data.SRT = d.SRT data.Playout = d.Playout data.Metrics = d.Metrics @@ -264,6 +267,15 @@ func MergeV2toV3(data *Data, d *v2.Data) (*Data, error) { data.Sessions.MaxBitrate = d.Sessions.MaxBitrate data.Sessions.MaxSessions = d.Sessions.MaxSessions + data.RTMP.Enable = d.RTMP.Enable + data.RTMP.EnableTLS = d.RTMP.EnableTLS + data.RTMP.Address = d.RTMP.Address + data.RTMP.AddressTLS = d.RTMP.AddressTLS + data.RTMP.App = d.RTMP.App + data.RTMP.Token = d.RTMP.Token + data.RTMP.HTTPFLV.Enable = false + data.RTMP.HTTPFLV.Mount = "/rtmp" + data.SRT.Log.Topics = slices.Copy(d.SRT.Log.Topics) data.Router.BlockedPrefixes = slices.Copy(d.Router.BlockedPrefixes) @@ -325,7 +337,6 @@ func DowngradeV3toV2(d *Data) (*v2.Data, error) { data.DB = d.DB data.Host = d.Host data.API = d.API - data.RTMP = d.RTMP data.SRT = d.SRT data.Playout = d.Playout data.Metrics = d.Metrics @@ -362,6 +373,13 @@ func DowngradeV3toV2(d *Data) (*v2.Data, error) { data.Sessions.MaxBitrate = d.Sessions.MaxBitrate data.Sessions.MaxSessions = d.Sessions.MaxSessions + data.RTMP.Enable = d.RTMP.Enable + data.RTMP.EnableTLS = d.RTMP.EnableTLS + data.RTMP.Address = d.RTMP.Address + data.RTMP.AddressTLS = d.RTMP.AddressTLS + data.RTMP.App = d.RTMP.App + data.RTMP.Token = d.RTMP.Token + data.SRT.Log.Topics = slices.Copy(d.SRT.Log.Topics) data.Router.BlockedPrefixes = slices.Copy(d.Router.BlockedPrefixes) diff --git a/docs/docs.go b/docs/docs.go index 2ca9629f..dcf1bff7 100644 --- a/docs/docs.go +++ b/docs/docs.go @@ -5509,6 +5509,46 @@ const docTemplate = `{ } } } + }, + "/rtmp/{path}": { + "get": { + "description": "Plays a RTMP stream over HTTP", + "produces": [ + "video/x-flv", + "application/json" + ], + "tags": [ + "v16.?.?" + ], + "summary": "Plays a RTMP stream over HTTP", + "operationId": "rtmp-3-play", + "responses": { + "200": { + "description": "OK", + "schema": { + "type": "file" + } + }, + "403": { + "description": "Forbidden", + "schema": { + "$ref": "#/definitions/api.Error" + } + }, + "404": { + "description": "Not Found", + "schema": { + "$ref": "#/definitions/api.Error" + } + }, + "500": { + "description": "Internal Server Error", + "schema": { + "$ref": "#/definitions/api.Error" + } + } + } + } } }, "definitions": { @@ -6572,6 +6612,17 @@ const docTemplate = `{ "enable_tls": { "type": "boolean" }, + "httpflv": { + "type": "object", + "properties": { + "enable": { + "type": "boolean" + }, + "mount": { + "type": "string" + } + } + }, "token": { "description": "Deprecated, use IAM", "type": "string" @@ -9321,6 +9372,17 @@ const docTemplate = `{ "enable_tls": { "type": "boolean" }, + "httpflv": { + "type": "object", + "properties": { + "enable": { + "type": "boolean" + }, + "mount": { + "type": "string" + } + } + }, "token": { "description": "Deprecated, use IAM", "type": "string" diff --git a/docs/swagger.json b/docs/swagger.json index bedfbeaf..712a41ae 100644 --- a/docs/swagger.json +++ b/docs/swagger.json @@ -5502,6 +5502,46 @@ } } } + }, + "/rtmp/{path}": { + "get": { + "description": "Plays a RTMP stream over HTTP", + "produces": [ + "video/x-flv", + "application/json" + ], + "tags": [ + "v16.?.?" + ], + "summary": "Plays a RTMP stream over HTTP", + "operationId": "rtmp-3-play", + "responses": { + "200": { + "description": "OK", + "schema": { + "type": "file" + } + }, + "403": { + "description": "Forbidden", + "schema": { + "$ref": "#/definitions/api.Error" + } + }, + "404": { + "description": "Not Found", + "schema": { + "$ref": "#/definitions/api.Error" + } + }, + "500": { + "description": "Internal Server Error", + "schema": { + "$ref": "#/definitions/api.Error" + } + } + } + } } }, "definitions": { @@ -6565,6 +6605,17 @@ "enable_tls": { "type": "boolean" }, + "httpflv": { + "type": "object", + "properties": { + "enable": { + "type": "boolean" + }, + "mount": { + "type": "string" + } + } + }, "token": { "description": "Deprecated, use IAM", "type": "string" @@ -9314,6 +9365,17 @@ "enable_tls": { "type": "boolean" }, + "httpflv": { + "type": "object", + "properties": { + "enable": { + "type": "boolean" + }, + "mount": { + "type": "string" + } + } + }, "token": { "description": "Deprecated, use IAM", "type": "string" diff --git a/docs/swagger.yaml b/docs/swagger.yaml index b6562d1e..32d228e6 100644 --- a/docs/swagger.yaml +++ b/docs/swagger.yaml @@ -723,6 +723,13 @@ definitions: type: boolean enable_tls: type: boolean + httpflv: + properties: + enable: + type: boolean + mount: + type: string + type: object token: description: Deprecated, use IAM type: string @@ -2643,6 +2650,13 @@ definitions: type: boolean enable_tls: type: boolean + httpflv: + properties: + enable: + type: boolean + mount: + type: string + type: object token: description: Deprecated, use IAM type: string @@ -6635,6 +6649,33 @@ paths: schema: type: string summary: Retrieve profiling data from the application + /rtmp/{path}: + get: + description: Plays a RTMP stream over HTTP + operationId: rtmp-3-play + produces: + - video/x-flv + - application/json + responses: + "200": + description: OK + schema: + type: file + "403": + description: Forbidden + schema: + $ref: '#/definitions/api.Error' + "404": + description: Not Found + schema: + $ref: '#/definitions/api.Error' + "500": + description: Internal Server Error + schema: + $ref: '#/definitions/api.Error' + summary: Plays a RTMP stream over HTTP + tags: + - v16.?.? securityDefinitions: ApiKeyAuth: in: header diff --git a/http/handler/api/rtmp.go b/http/handler/api/rtmp.go index 3f2dbbb5..ccc49b8c 100644 --- a/http/handler/api/rtmp.go +++ b/http/handler/api/rtmp.go @@ -1,9 +1,12 @@ package api import ( + "errors" + "net" "net/http" "github.com/datarhei/core/v16/http/api" + "github.com/datarhei/core/v16/http/handler/util" "github.com/datarhei/core/v16/rtmp" "github.com/labstack/echo/v4" @@ -43,3 +46,51 @@ func (rtmph *RTMPHandler) ListChannels(c echo.Context) error { return c.JSON(http.StatusOK, list) } + +// Play plays a RTMP stream over HTTP +// @Summary Plays a RTMP stream over HTTP +// @Description Plays a RTMP stream over HTTP +// @Tags v16.?.? +// @ID rtmp-3-play +// @Produce video/x-flv +// @Produce json +// @Success 200 {file} byte +// @Success 403 {object} api.Error +// @Success 404 {object} api.Error +// @Success 500 {object} api.Error +// @Router /rtmp/{path} [get] +func (rtmph *RTMPHandler) Play(c echo.Context) error { + path := util.PathWildcardParam(c) + addr, err := net.ResolveIPAddr("ip", c.RealIP()) + if err != nil { + return api.Err(http.StatusBadRequest, "", "%s", err.Error()) + } + + u := c.Request().URL + u.Path = path + + r, err := rtmph.rtmp.PlayFLV(addr, u) + if err != nil { + var rtmperr rtmp.PlayError + if errors.As(err, &rtmperr) { + status := http.StatusInternalServerError + switch rtmperr.Message { + case "FORBIDDEN": + status = http.StatusForbidden + case "NOTFOUND": + status = http.StatusNotFound + } + + return api.Err(status, "", "%s", err.Error()) + } else { + return err + } + } + + defer r.Close() + + c.Response().Header().Set("Transfer-Encoding", "chunked") + c.Response().Header().Set("Access-Control-Allow-Origin", "*") // important for browser player + + return c.Stream(200, "video/x-flv", r) +} diff --git a/http/server.go b/http/server.go index 6831a8cd..2fff5d4d 100644 --- a/http/server.go +++ b/http/server.go @@ -83,30 +83,31 @@ import ( var ListenAndServe = http.ListenAndServe type Config struct { - Logger log.Logger - LogBuffer log.BufferWriter - LogEvents event.EventSource - Restream restream.Restreamer - Metrics monitor.HistoryReader - Prometheus prometheus.Reader - MimeTypesFile string - MimeTypes map[string]string - Filesystems []fs.FS - IPLimiter net.IPLimitValidator - Profiling bool - Cors CorsConfig - RTMP rtmp.Server - SRT srt.Server - Config cfgstore.Store - Cache cache.Cacher - Sessions session.RegistryReader - Router router.Router - ReadOnly bool - Cluster cluster.Cluster - IAM iam.IAM - IAMSkipper func(ip string) bool - Resources resources.Resources - Compress CompressConfig + Logger log.Logger + LogBuffer log.BufferWriter + LogEvents event.EventSource + Restream restream.Restreamer + Metrics monitor.HistoryReader + Prometheus prometheus.Reader + MimeTypesFile string + MimeTypes map[string]string + Filesystems []fs.FS + IPLimiter net.IPLimitValidator + Profiling bool + Cors CorsConfig + RTMP rtmp.Server + RTMPHTTPFLVMount string + SRT srt.Server + Config cfgstore.Store + Cache cache.Cacher + Sessions session.RegistryReader + Router router.Router + ReadOnly bool + Cluster cluster.Cluster + IAM iam.IAM + IAMSkipper func(ip string) bool + Resources resources.Resources + Compress CompressConfig } type CorsConfig struct { @@ -166,6 +167,7 @@ type server struct { mimeTypesFile string mimeTypes map[string]string profiling bool + httpFLVMount string readOnly bool @@ -319,6 +321,10 @@ func NewServer(config Config) (serverhandler.Server, error) { s.v3handler.rtmp = api.NewRTMP( config.RTMP, ) + + if len(config.RTMPHTTPFLVMount) != 0 { + s.httpFLVMount = config.RTMPHTTPFLVMount + } } if config.SRT != nil { @@ -594,6 +600,11 @@ func (s *server) setRoutes() { s.handler.profiling.Register(prof) } + // RTMP over HTTP + if s.v3handler.rtmp != nil && len(s.httpFLVMount) != 0 { + s.router.GET(s.httpFLVMount+"/*", s.v3handler.rtmp.Play) + } + // GraphQL graphql := api.Group("/graph") //graphql.Use(gzipMiddleware) diff --git a/rtmp/channel.go b/rtmp/channel.go index 7a1123b3..2ec8ce69 100644 --- a/rtmp/channel.go +++ b/rtmp/channel.go @@ -9,7 +9,6 @@ import ( "github.com/datarhei/core/v16/session" "github.com/datarhei/joy4/av" "github.com/datarhei/joy4/av/pubsub" - "github.com/datarhei/joy4/format/rtmp" ) type client struct { @@ -134,8 +133,7 @@ func (ch *channel) Close() { ch.queue.Close() } -func (ch *channel) AddSubscriber(conn *rtmp.Conn, playPath, identity string) string { - addr := conn.NetConn().RemoteAddr().String() +func (ch *channel) AddSubscriber(conn connection, addr string, playPath, identity string) string { ip, _, _ := net.SplitHostPort(addr) client := newClient(conn, addr, ch.collector) diff --git a/rtmp/connection.go b/rtmp/connection.go index 43cee496..a785bc56 100644 --- a/rtmp/connection.go +++ b/rtmp/connection.go @@ -25,7 +25,7 @@ type conn struct { // Make sure that conn implements the connection interface var _ connection = &conn{} -func newConnectionFromDemuxer(m av.DemuxCloser) connection { +func newConnectionFromDemuxCloser(m av.DemuxCloser) connection { c := &conn{ demuxer: m, } @@ -33,6 +33,30 @@ func newConnectionFromDemuxer(m av.DemuxCloser) connection { return c } +func newConnectionFromMuxCloser(m av.MuxCloser) connection { + c := &conn{ + muxer: m, + } + + return c +} + +func newConnectionFromMuxer(m av.Muxer) connection { + c := &conn{ + muxer: &fakeMuxCloser{m}, + } + + return c +} + +type fakeMuxCloser struct { + av.Muxer +} + +func (f *fakeMuxCloser) Close() error { + return nil +} + func (c *conn) TxBytes() uint64 { return c.txbytes } diff --git a/rtmp/rtmp.go b/rtmp/rtmp.go index 1df7476f..e86de53b 100644 --- a/rtmp/rtmp.go +++ b/rtmp/rtmp.go @@ -3,8 +3,11 @@ package rtmp import ( "crypto/tls" + "errors" "fmt" + "io" "net" + "net/url" "path/filepath" "strings" "sync" @@ -22,6 +25,7 @@ import ( "github.com/datarhei/joy4/av/avutil" "github.com/datarhei/joy4/av/pktque" "github.com/datarhei/joy4/format" + "github.com/datarhei/joy4/format/flv" "github.com/datarhei/joy4/format/rtmp" ) @@ -81,6 +85,8 @@ type Server interface { // Channels return a list of currently publishing streams Channels() []string + PlayFLV(remote net.Addr, u *url.URL) (io.ReadCloser, error) + event.MediaSource } @@ -212,20 +218,27 @@ func (s *server) log(who, handler, action, resource, message string, client net. }).Log(message) } -// handlePlay is called when a RTMP client wants to play a stream -func (s *server) handlePlay(conn *rtmp.Conn) { - defer conn.Close() +type PlayError struct { + Message string + Identity string + Playpath string + Details string + Err error +} - remote := conn.NetConn().RemoteAddr() - playpath, token, isStreamkey := rtmpurl.GetToken(conn.URL) +func (e PlayError) Error() string { + return fmt.Sprintf("%s %s %s %s", e.Identity, e.Message, e.Playpath, e.Details) +} + +func (s *server) play(remote net.Addr, u *url.URL) (*channel, string, string, error) { + playpath, token, isStreamkey := rtmpurl.GetToken(u) playpath, _ = rtmpurl.RemovePathPrefix(playpath, s.app) identity, err := s.findIdentityFromStreamKey(token) if err != nil { s.logger.Debug().WithError(err).Log("invalid streamkey") - s.log("", "PLAY", "FORBIDDEN", playpath, "invalid streamkey ("+token+")", remote) - return + return nil, "", playpath, &PlayError{"FORBIDDEN", "", playpath, "invalid streamkey (" + token + ")", err} } if identity == "$anon" && isStreamkey { @@ -237,8 +250,7 @@ func (s *server) handlePlay(conn *rtmp.Conn) { resource := playpath if !s.iam.Enforce(identity, domain, "rtmp", resource, "PLAY") { - s.log(identity, "PLAY", "FORBIDDEN", playpath, "access denied", remote) - return + return nil, identity, playpath, &PlayError{"FORBIDDEN", "", playpath, "access denies", nil} } // Look for the stream @@ -250,8 +262,7 @@ func (s *server) handlePlay(conn *rtmp.Conn) { // Check in the cluster for that stream url, err := s.proxy.MediaGetURL("rtmp", playpath) if err != nil { - s.log(identity, "PLAY", "NOTFOUND", playpath, "", remote) - return + return nil, identity, playpath, &PlayError{"NOTFOUND", identity, playpath, "", err} } url = url.JoinPath(token) @@ -260,11 +271,10 @@ func (s *server) handlePlay(conn *rtmp.Conn) { src, err := avutil.Open(peerurl) if err != nil { s.logger.Error().WithField("address", url).WithError(err).Log("Proxying address failed") - s.log(identity, "PLAY", "NOTFOUND", playpath, "", remote) - return + return nil, identity, playpath, &PlayError{"NOTFOUND", identity, playpath, "", err} } - c := newConnectionFromDemuxer(src) + c := newConnectionFromDemuxCloser(src) wg := sync.WaitGroup{} wg.Add(1) @@ -303,41 +313,56 @@ func (s *server) handlePlay(conn *rtmp.Conn) { ticker.Stop() } - if ch != nil { - // Send the metadata to the client - conn.WriteHeader(ch.streams) - - s.log(identity, "PLAY", "START", playpath, "", remote) - - // Get a cursor and apply filters - cursor := ch.queue.Oldest() - - filters := pktque.Filters{} - - if ch.hasVideo { - // The first packet has to be a key frame - filters = append(filters, &pktque.WaitKeyFrame{}) - } - - // Adjust the timestamp such that the stream starts from 0 - filters = append(filters, &pktque.FixTime{StartFromZero: true, MakeIncrement: false}) - - demuxer := &pktque.FilterDemuxer{ - Filter: filters, - Demuxer: cursor, - } - - id := ch.AddSubscriber(conn, playpath, identity) - - // Transfer the data, blocks until done - avutil.CopyFile(conn, demuxer) - - ch.RemoveSubscriber(id) - - s.log(identity, "PLAY", "STOP", playpath, "", remote) - } else { + if ch == nil { s.log(identity, "PLAY", "NOTFOUND", playpath, "", remote) } + + return ch, identity, playpath, nil +} + +// handlePlay is called when a RTMP client wants to play a stream +func (s *server) handlePlay(conn *rtmp.Conn) { + defer conn.Close() + + remote := conn.NetConn().RemoteAddr() + + ch, identity, playpath, err := s.play(remote, conn.URL) + if err != nil { + var rtmperr PlayError + if errors.As(err, &rtmperr) { + s.log(rtmperr.Identity, "PLAY", rtmperr.Message, rtmperr.Playpath, rtmperr.Details, remote) + } + return + } + + s.log(identity, "PLAY", "START", playpath, "", remote) + + // Get a cursor and apply filters + cursor := ch.queue.Oldest() + + filters := pktque.Filters{} + + if ch.hasVideo { + // The first packet has to be a key frame + filters = append(filters, &pktque.WaitKeyFrame{}) + } + + // Adjust the timestamp such that the stream starts from 0 + filters = append(filters, &pktque.FixTime{StartFromZero: true, MakeIncrement: false}) + + demuxer := &pktque.FilterDemuxer{ + Filter: filters, + Demuxer: cursor, + } + + id := ch.AddSubscriber(conn, remote.String(), playpath, identity) + + // Transfer the data, blocks until done + avutil.CopyFile(conn, demuxer) + + ch.RemoveSubscriber(id) + + s.log(identity, "PLAY", "STOP", playpath, "", remote) } // handlePublish is called when a RTMP client wants to publish a stream @@ -529,3 +554,51 @@ func (s *server) Events() (<-chan event.Event, event.CancelFunc, error) { func (s *server) MediaList() []string { return s.Channels() } + +func (s *server) PlayFLV(remote net.Addr, u *url.URL) (io.ReadCloser, error) { + ch, identity, playpath, err := s.play(remote, u) + if err != nil { + return nil, err + } + + s.log(identity, "FLVPLAY", "START", playpath, "", remote) + + // Get a cursor and apply filters + cursor := ch.queue.Oldest() + + filters := pktque.Filters{} + + if ch.hasVideo { + // The first packet has to be a key frame + filters = append(filters, &pktque.WaitKeyFrame{}) + } + + // Adjust the timestamp such that the stream starts from 0 + filters = append(filters, &pktque.FixTime{StartFromZero: true, MakeIncrement: false}) + + demuxer := &pktque.FilterDemuxer{ + Filter: filters, + Demuxer: cursor, + } + + r, w := io.Pipe() + + muxer := flv.NewMuxer(w) + + conn := newConnectionFromMuxer(muxer) + + id := ch.AddSubscriber(conn, remote.String(), playpath, identity) + + go func() { + defer w.Close() + + // Transfer the data, blocks until done + avutil.CopyFile(muxer, demuxer) + + ch.RemoveSubscriber(id) + + s.log(identity, "FLVPLAY", "STOP", playpath, "", remote) + }() + + return r, err +}