Integrate media events into cluster

This commit is contained in:
Ingo Oppermann 2025-10-06 17:36:23 +02:00
parent 86437171f3
commit e077cd48a6
No known key found for this signature in database
GPG Key ID: 2AB32426E9DD229E
27 changed files with 746 additions and 473 deletions

View File

@ -19,11 +19,68 @@ import (
"github.com/datarhei/core/v16/restream/app"
)
// Media tracks the files known to exist on one storage backend of a node,
// as reported by its media event stream. The zero value is usable: update
// lazily allocates the map, and get/list operate safely on a nil map.
type Media struct {
	available bool             // Whether filesystem events are available; NOTE(review): read/written without holding lock — confirm single-writer access
	media     map[string]int64 // List of files and timestamp of when they have been last seen
	lock      sync.RWMutex     // Lock for the map
}

// update records the timestamp at which the named file was last seen.
func (m *Media) update(name string, timestamp int64) {
	m.lock.Lock()
	defer m.lock.Unlock()

	// Lazily allocate the map so that update on a zero-value Media
	// doesn't panic with a write to a nil map.
	if m.media == nil {
		m.media = map[string]int64{}
	}

	m.media[name] = timestamp
}

// remove forgets the named file. Removing an unknown name is a no-op.
func (m *Media) remove(name string) {
	m.lock.Lock()
	defer m.lock.Unlock()

	delete(m.media, name)
}

// set replaces the whole file list, stamping every entry with timestamp.
func (m *Media) set(name []string, timestamp int64) {
	media := map[string]int64{}

	for _, n := range name {
		media[n] = timestamp
	}

	m.lock.Lock()
	defer m.lock.Unlock()

	m.media = media
}

// get returns the last-seen timestamp for the named file and whether the
// file is known.
func (m *Media) get(name string) (int64, bool) {
	m.lock.RLock()
	defer m.lock.RUnlock()

	ts, ok := m.media[name]

	return ts, ok
}

// list returns the names of all known files, in no particular order.
func (m *Media) list() []string {
	m.lock.RLock()
	defer m.lock.RUnlock()

	names := make([]string, 0, len(m.media))
	for name := range m.media {
		names = append(names, name)
	}

	return names
}
type Core struct {
id string
client client.RestClient
clientErr error
client client.RestClient
clientErr error
clientCtx context.Context
clientCancel context.CancelFunc
lock sync.RWMutex
@ -39,6 +96,9 @@ type Core struct {
hasSRT bool
srtAddress *url.URL
media map[string]*Media // map[storage]map[path]lastchange
mediaLock sync.RWMutex
logger log.Logger
}
@ -48,6 +108,7 @@ func NewCore(id string, logger log.Logger) *Core {
core := &Core{
id: id,
logger: logger,
media: map[string]*Media{},
}
if core.logger == nil {
@ -68,18 +129,18 @@ func (n *Core) SetEssentials(address string, config *config.Config) {
if n.address != address {
n.address = address
n.client = nil // force reconnet
n.disconnect() // force reconnet
}
if config != nil {
if n.config == nil {
n.config = config
n.client = nil // force reconnect
n.disconnect() // force reconnect
}
if n.config != nil && n.config.UpdatedAt != config.UpdatedAt {
n.config = config
n.client = nil // force reconnect
n.disconnect() // force reconnect
}
}
}
@ -112,12 +173,23 @@ func (n *Core) Stop() {
n.cancel()
n.cancel = nil
n.disconnect()
}
// Reconnect drops the current connection to the node so that a subsequent
// connect attempt establishes a fresh client and media event streams.
func (n *Core) Reconnect() {
	n.lock.Lock()
	defer n.lock.Unlock()

	n.disconnect()
}
// disconnect tears down the current client connection: it cancels the client
// context (which stops the mediaEvents goroutines started in connect) and
// clears the client reference so the next use forces a reconnect.
// NOTE(review): visible callers (Reconnect, Stop) hold n.lock when calling
// this — confirm all call sites do.
func (n *Core) disconnect() {
	if n.clientCancel != nil {
		n.clientCancel()
		n.clientCancel = nil
	}

	n.client = nil
}
@ -240,11 +312,71 @@ func (n *Core) connect() error {
n.srtAddress = srtAddress
n.client = client
ctx, cancel := context.WithCancel(context.Background())
n.clientCtx = ctx
n.clientCancel = cancel
go n.mediaEvents(ctx, "mem")
go n.mediaEvents(ctx, "disk")
go n.mediaEvents(ctx, "rtmp")
go n.mediaEvents(ctx, "srt")
n.lock.Unlock()
return nil
}
// mediaEvents connects to the media event stream of the given storage on the
// node and mirrors the reported file list into n.media[storage]. When the
// event channel closes it reconnects after a 5 second pause. It exits when
// ctx is canceled or when the stream cannot be established at all.
func (n *Core) mediaEvents(ctx context.Context, storage string) {
	m := &Media{}

	for {
		// Snapshot the client under the lock, consistent with how the rest
		// of this type accesses n.client; the original unguarded read raced
		// with disconnect() and would nil-deref on a dropped client.
		n.lock.RLock()
		client := n.client
		n.lock.RUnlock()

		if client == nil {
			m.available = false
			m.media = nil

			n.mediaLock.Lock()
			n.media[storage] = m
			n.mediaLock.Unlock()

			return
		}

		ch, err := client.MediaEvents(ctx, storage, "/**")
		if err != nil {
			// Mark this storage as unavailable so MediaGetInfo falls back
			// to polling the node directly.
			m.available = false
			m.media = nil

			n.mediaLock.Lock()
			n.media[storage] = m
			n.mediaLock.Unlock()

			n.logger.Error().WithField("storage", storage).WithError(err).Log("Failed to connect to event source")

			return
		}

		n.logger.Info().WithField("storage", storage).Log("Connected to event source")

		m.available = true
		m.media = map[string]int64{}

		n.mediaLock.Lock()
		n.media[storage] = m
		n.mediaLock.Unlock()

	innerloop:
		for {
			select {
			case <-ctx.Done():
				return
			case e, ok := <-ch:
				if !ok {
					// Stream ended; leave the inner loop and reconnect.
					break innerloop
				}

				switch e.Action {
				case "update", "create":
					m.update(e.Name, e.Timestamp)
				case "remove":
					m.remove(e.Name)
				case "list":
					m.set(e.Names, e.Timestamp)
				}
			}
		}

		n.logger.Info().WithField("storage", storage).Log("Reconnecting to event source")

		// Pause before reconnecting, but abort immediately on cancellation;
		// a plain time.Sleep would delay shutdown by up to 5 seconds.
		select {
		case <-ctx.Done():
			return
		case <-time.After(5 * time.Second):
		}
	}
}
type CoreAbout struct {
ID string
Name string
@ -538,135 +670,29 @@ func (n *Core) MediaList() NodeFiles {
LastUpdate: time.Now(),
}
errorsChan := make(chan error, 8)
filesChan := make(chan string, 1024)
errorList := []error{}
prefixes := []string{}
wgList := sync.WaitGroup{}
wgList.Add(1)
go func() {
defer wgList.Done()
for file := range filesChan {
files.Files = append(files.Files, file)
}
for err := range errorsChan {
errorList = append(errorList, err)
}
}()
wg := sync.WaitGroup{}
wg.Add(2)
go func(f chan<- string, e chan<- error) {
defer wg.Done()
n.lock.RLock()
client := n.client
n.lock.RUnlock()
if client == nil {
e <- ErrNoPeer
return
}
files, err := client.FilesystemList("mem", "/*", "name", "asc")
if err != nil {
e <- err
return
}
for _, file := range files {
f <- "mem:" + file.Name
}
}(filesChan, errorsChan)
go func(f chan<- string, e chan<- error) {
defer wg.Done()
n.lock.RLock()
client := n.client
n.lock.RUnlock()
if client == nil {
e <- ErrNoPeer
return
}
files, err := client.FilesystemList("disk", "/*", "name", "asc")
if err != nil {
e <- err
return
}
for _, file := range files {
f <- "disk:" + file.Name
}
}(filesChan, errorsChan)
if n.hasRTMP {
wg.Add(1)
go func(f chan<- string, e chan<- error) {
defer wg.Done()
n.lock.RLock()
client := n.client
n.lock.RUnlock()
if client == nil {
e <- ErrNoPeer
return
}
files, err := client.RTMPChannels()
if err != nil {
e <- err
return
}
for _, file := range files {
f <- "rtmp:" + file.Name
}
}(filesChan, errorsChan)
n.mediaLock.RLock()
for prefix := range n.media {
prefixes = append(prefixes, prefix)
}
n.mediaLock.RUnlock()
if n.hasSRT {
wg.Add(1)
for _, prefix := range prefixes {
n.mediaLock.RLock()
m, ok := n.media[prefix]
n.mediaLock.RUnlock()
go func(f chan<- string, e chan<- error) {
defer wg.Done()
if !ok {
continue
}
n.lock.RLock()
client := n.client
n.lock.RUnlock()
if client == nil {
e <- ErrNoPeer
return
}
files, err := client.SRTChannels()
if err != nil {
e <- err
return
}
for _, file := range files {
f <- "srt:" + file.Name
}
}(filesChan, errorsChan)
list := m.list()
for _, name := range list {
files.Files = append(files.Files, prefix+":"+name)
}
}
wg.Wait()
close(filesChan)
close(errorsChan)
wgList.Wait()
return files
}
@ -702,18 +728,19 @@ func cloneURL(src *url.URL) *url.URL {
func (n *Core) MediaGetURL(prefix, path string) (*url.URL, error) {
var u *url.URL
if prefix == "mem" {
switch prefix {
case "mem":
u = cloneURL(n.httpAddress)
u = u.JoinPath("memfs", path)
} else if prefix == "disk" {
case "disk":
u = cloneURL(n.httpAddress)
u = u.JoinPath(path)
} else if prefix == "rtmp" {
case "rtmp":
u = cloneURL(n.rtmpAddress)
u = u.JoinPath(path)
} else if prefix == "srt" {
case "srt":
u = cloneURL(n.srtAddress)
} else {
default:
return nil, fmt.Errorf("unknown prefix")
}
@ -721,6 +748,19 @@ func (n *Core) MediaGetURL(prefix, path string) (*url.URL, error) {
}
func (n *Core) MediaGetInfo(prefix, path string) (int64, time.Time, error) {
n.mediaLock.RLock()
m, ok := n.media[prefix]
n.mediaLock.RUnlock()
if ok && m.available {
lastmod, ok := m.get(path)
if !ok {
return 0, time.Time{}, fmt.Errorf("media not found")
}
return 0, time.UnixMilli(lastmod), nil
}
if prefix == "disk" || prefix == "mem" {
return n.FilesystemGetFileInfo(prefix, path)
}
@ -873,7 +913,7 @@ func (n *Core) ClusterProcessList() ([]Process, error) {
return processes, nil
}
func (n *Core) Events(ctx context.Context, filters api.EventFilters) (<-chan api.Event, error) {
func (n *Core) Events(ctx context.Context, filters api.EventFilters) (<-chan api.LogEvent, error) {
n.lock.RLock()
client := n.client
n.lock.RUnlock()

View File

@ -655,12 +655,12 @@ func (p *Manager) ProcessValidateConfig(nodeid string, config *app.Config) error
return node.Core().ProcessValidateConfig(config)
}
func (p *Manager) Events(ctx context.Context, filters api.EventFilters) (<-chan api.Event, error) {
eventChan := make(chan api.Event, 128)
func (p *Manager) Events(ctx context.Context, filters api.EventFilters) (<-chan api.LogEvent, error) {
eventChan := make(chan api.LogEvent, 128)
p.lock.RLock()
for _, n := range p.nodes {
go func(node *Node, e chan<- api.Event) {
go func(node *Node, e chan<- api.LogEvent) {
eventChan, err := node.Core().Events(ctx, filters)
if err != nil {
return

View File

@ -4986,7 +4986,8 @@ const docTemplate = `{
],
"description": "List all currently publishing RTMP streams.",
"produces": [
"application/json"
"application/json",
"application/x-json-stream"
],
"tags": [
"v16.7.2"
@ -5206,7 +5207,8 @@ const docTemplate = `{
],
"description": "List all currently publishing SRT streams. This endpoint is EXPERIMENTAL and may change in future.",
"produces": [
"application/json"
"application/json",
"application/x-json-stream"
],
"tags": [
"v16.9.0"

View File

@ -4979,7 +4979,8 @@
],
"description": "List all currently publishing RTMP streams.",
"produces": [
"application/json"
"application/json",
"application/x-json-stream"
],
"tags": [
"v16.7.2"
@ -5199,7 +5200,8 @@
],
"description": "List all currently publishing SRT streams. This endpoint is EXPERIMENTAL and may change in future.",
"produces": [
"application/json"
"application/json",
"application/x-json-stream"
],
"tags": [
"v16.9.0"

View File

@ -6177,6 +6177,7 @@ paths:
operationId: rtmp-3-list-channels
produces:
- application/json
- application/x-json-stream
responses:
"200":
description: OK
@ -6317,6 +6318,7 @@ paths:
operationId: srt-3-list-channels
produces:
- application/json
- application/x-json-stream
responses:
"200":
description: OK

124
event/event.go Normal file
View File

@ -0,0 +1,124 @@
package event
import (
"context"
"fmt"
"sync"
"github.com/lithammer/shortuuid/v4"
)
// Event is a broadcastable event. Clone must return a copy such that each
// subscriber can safely receive its own instance.
type Event interface {
	Clone() Event
}

// CancelFunc unsubscribes the associated subscriber when called.
type CancelFunc func()

// EventSource is implemented by anything that can hand out a stream of events.
type EventSource interface {
	Events() (<-chan Event, CancelFunc, error)
}

// PubSub broadcasts published events to all current subscribers.
type PubSub struct {
	publisher       chan Event // Incoming events, drained by broadcast()
	publisherClosed bool       // Set by Close(); guards against publishing to a closed channel
	publisherLock   sync.Mutex

	ctx    context.Context    // Lifetime of the broadcast goroutine
	cancel context.CancelFunc

	subscriber     map[string]chan Event // Subscriber channels keyed by random id
	subscriberLock sync.Mutex
}
// NewPubSub creates a PubSub and starts its broadcast loop.
func NewPubSub() *PubSub {
	ps := &PubSub{
		publisher:  make(chan Event, 1024),
		subscriber: make(map[string]chan Event),
	}

	ps.ctx, ps.cancel = context.WithCancel(context.Background())

	go ps.broadcast()

	return ps
}
// Publish hands a clone of the event to the broadcast loop. It returns an
// error if the PubSub has been closed or if the publish queue is full.
func (w *PubSub) Publish(e Event) error {
	clone := e.Clone()

	w.publisherLock.Lock()
	defer w.publisherLock.Unlock()

	if w.publisherClosed {
		return fmt.Errorf("writer is closed")
	}

	select {
	case w.publisher <- clone:
		return nil
	default:
		return fmt.Errorf("publisher queue full")
	}
}
// Close shuts the PubSub down: it stops the broadcast loop, closes the
// publisher channel (further Publish calls return an error), closes all
// subscriber channels and resets the subscriber map.
// NOTE(review): a Subscribe racing with Close may register a channel after
// the map is reset; that channel is never closed — confirm callers don't
// subscribe during shutdown.
func (w *PubSub) Close() {
	w.cancel()

	w.publisherLock.Lock()
	close(w.publisher)
	w.publisherClosed = true
	w.publisherLock.Unlock()

	w.subscriberLock.Lock()
	for _, c := range w.subscriber {
		close(c)
	}
	w.subscriber = make(map[string]chan Event)
	w.subscriberLock.Unlock()
}
// Subscribe registers a new subscriber and returns its event channel together
// with a function that removes the subscription again.
func (w *PubSub) Subscribe() (<-chan Event, CancelFunc) {
	ch := make(chan Event, 1024)

	w.subscriberLock.Lock()
	var id string
	for {
		// Draw random ids until one is free.
		id = shortuuid.New()
		if _, exists := w.subscriber[id]; !exists {
			break
		}
	}
	w.subscriber[id] = ch
	w.subscriberLock.Unlock()

	unsubscribe := func() {
		w.subscriberLock.Lock()
		delete(w.subscriber, id)
		w.subscriberLock.Unlock()
	}

	return ch, unsubscribe
}
// broadcast fans published events out to every subscriber. Each subscriber
// receives its own clone; subscribers whose queue is full silently miss the
// event. The loop ends when the context is canceled or the publisher channel
// is closed.
func (w *PubSub) broadcast() {
	for {
		select {
		case <-w.ctx.Done():
			return
		case e, ok := <-w.publisher:
			if !ok {
				// Close() has closed the publisher channel. Without this
				// check the closed channel is always ready and yields nil
				// events, busy-spinning until the ctx.Done case happens to
				// be selected.
				return
			}

			w.subscriberLock.Lock()
			for _, c := range w.subscriber {
				pp := e.Clone()

				select {
				case c <- pp:
				default:
				}
			}
			w.subscriberLock.Unlock()
		}
	}
}

33
event/media.go Normal file
View File

@ -0,0 +1,33 @@
package event
import (
"time"
)
// MediaSource is an EventSource that can additionally enumerate the media
// items it currently knows about.
type MediaSource interface {
	EventSource

	// MediaList returns the names of all currently known media items.
	MediaList() []string
}
// MediaEvent describes a change to a single media item.
type MediaEvent struct {
	Action    string    // What happened, e.g. "create", "update", "remove"
	Name      string    // Name of the affected media item
	Timestamp time.Time // When the event was created
}
// NewMediaEvent returns a MediaEvent for the given action and name, stamped
// with the current time.
func NewMediaEvent(action string, name string) *MediaEvent {
	e := &MediaEvent{
		Timestamp: time.Now(),
	}
	e.Action = action
	e.Name = name

	return e
}
// Clone returns a copy of the event so that each receiver gets its own
// instance.
func (e *MediaEvent) Clone() Event {
	clone := *e
	return &clone
}

View File

@ -9,7 +9,7 @@ import (
"github.com/datarhei/core/v16/log"
)
type Event struct {
type LogEvent struct {
Timestamp int64 `json:"ts" format:"int64"`
Level int `json:"level"`
Component string `json:"event"`
@ -20,7 +20,7 @@ type Event struct {
Data map[string]string `json:"data"`
}
func (e *Event) Unmarshal(le *log.Event) {
func (e *LogEvent) Unmarshal(le *log.Event) {
e.Timestamp = le.Time.Unix()
e.Level = int(le.Level)
e.Component = strings.ToLower(le.Component)
@ -53,7 +53,7 @@ func (e *Event) Unmarshal(le *log.Event) {
}
}
func (e *Event) Filter(ef *EventFilter) bool {
func (e *LogEvent) Filter(ef *LogEventFilter) bool {
if ef.reMessage != nil {
if !ef.reMessage.MatchString(e.Message) {
return false
@ -93,7 +93,7 @@ func (e *Event) Filter(ef *EventFilter) bool {
return true
}
type EventFilter struct {
type LogEventFilter struct {
Component string `json:"event"`
Message string `json:"message"`
Level string `json:"level"`
@ -109,10 +109,10 @@ type EventFilter struct {
}
type EventFilters struct {
Filters []EventFilter `json:"filters"`
Filters []LogEventFilter `json:"filters"`
}
func (ef *EventFilter) Compile() error {
func (ef *LogEventFilter) Compile() error {
if len(ef.Message) != 0 {
r, err := regexp.Compile("(?i)" + ef.Message)
if err != nil {
@ -162,3 +162,10 @@ func (ef *EventFilter) Compile() error {
return nil
}
// MediaEvent is the wire format for one entry of the media event stream.
// Single-item actions ("create", "update", "remove") set Name; the "list"
// action sets Names instead. Timestamp is milliseconds since the Unix epoch.
type MediaEvent struct {
	Action    string   `json:"action"`
	Name      string   `json:"name,omitempty"`
	Names     []string `json:"names,omitempty"`
	Timestamp int64    `json:"ts"`
}

View File

@ -7,7 +7,7 @@ import (
)
func TestEventFilter(t *testing.T) {
event := Event{
event := LogEvent{
Timestamp: 1234,
Level: 3,
Component: "foobar",
@ -17,7 +17,7 @@ func TestEventFilter(t *testing.T) {
},
}
filter := EventFilter{
filter := LogEventFilter{
Component: "foobar",
Level: "info",
Message: "none",
@ -29,7 +29,7 @@ func TestEventFilter(t *testing.T) {
res := event.Filter(&filter)
require.True(t, res)
filter = EventFilter{
filter = LogEventFilter{
Component: "foobar",
Level: "warn",
Message: "none",
@ -41,7 +41,7 @@ func TestEventFilter(t *testing.T) {
res = event.Filter(&filter)
require.False(t, res)
filter = EventFilter{
filter = LogEventFilter{
Component: "foobar",
Level: "info",
Message: "done",
@ -53,7 +53,7 @@ func TestEventFilter(t *testing.T) {
res = event.Filter(&filter)
require.False(t, res)
foobarfilter := EventFilter{
foobarfilter := LogEventFilter{
Component: "foobar",
Data: map[string]string{
"foo": "^b.*$",
@ -66,7 +66,7 @@ func TestEventFilter(t *testing.T) {
res = event.Filter(&foobarfilter)
require.True(t, res)
foobazfilter := EventFilter{
foobazfilter := LogEventFilter{
Component: "foobaz",
Data: map[string]string{
"foo": "baz",
@ -81,7 +81,7 @@ func TestEventFilter(t *testing.T) {
}
func TestEventFilterDataKey(t *testing.T) {
event := Event{
event := LogEvent{
Timestamp: 1234,
Level: 3,
Component: "foobar",
@ -91,7 +91,7 @@ func TestEventFilterDataKey(t *testing.T) {
},
}
filter := EventFilter{
filter := LogEventFilter{
Component: "foobar",
Level: "info",
Message: "none",
@ -103,7 +103,7 @@ func TestEventFilterDataKey(t *testing.T) {
res := event.Filter(&filter)
require.True(t, res)
filter = EventFilter{
filter = LogEventFilter{
Component: "foobar",
Level: "info",
Message: "none",
@ -118,7 +118,7 @@ func TestEventFilterDataKey(t *testing.T) {
res = event.Filter(&filter)
require.False(t, res)
filter = EventFilter{
filter = LogEventFilter{
Component: "foobar",
Level: "info",
Message: "none",
@ -135,7 +135,7 @@ func TestEventFilterDataKey(t *testing.T) {
}
func BenchmarkEventFilters(b *testing.B) {
event := Event{
event := LogEvent{
Timestamp: 1234,
Level: 3,
Component: "foobar",
@ -145,7 +145,7 @@ func BenchmarkEventFilters(b *testing.B) {
},
}
levelfilter := EventFilter{
levelfilter := LogEventFilter{
Component: "foobar",
Level: "info",
Data: map[string]string{

View File

@ -22,10 +22,3 @@ type FilesystemOperation struct {
Target string `json:"target"`
RateLimit uint64 `json:"bandwidth_limit_kbit"` // kbit/s
}
type FilesystemEvent struct {
Action string `json:"action"`
Name string `json:"name,omitempty"`
Names []string `json:"names,omitempty"`
Timestamp int64 `json:"ts"`
}

View File

@ -1,4 +1,4 @@
package api
// LogEvent represents a log event from the app
type LogEvent map[string]interface{}
// LogEntries represents a log event from the app
type LogEntries map[string]interface{}

View File

@ -57,7 +57,8 @@ type RestClient interface {
FilesystemDeleteFile(storage, path string) error // DELETE /v3/fs/{storage}/{path}
FilesystemAddFile(storage, path string, data io.Reader) error // PUT /v3/fs/{storage}/{path}
Events(ctx context.Context, filters api.EventFilters) (<-chan api.Event, error) // POST /v3/events
Events(ctx context.Context, filters api.EventFilters) (<-chan api.LogEvent, error) // POST /v3/events
MediaEvents(ctx context.Context, storage, pattern string) (<-chan api.MediaEvent, error) // GET /v3/fs/{storage}, GET /v3/(rtmp|srt)
ProcessList(opts ProcessListOptions) ([]api.Process, error) // GET /v3/process
ProcessAdd(p *app.Config, metadata map[string]any) error // POST /v3/process

View File

@ -4,13 +4,14 @@ import (
"context"
"io"
"net/http"
"net/url"
"github.com/datarhei/core/v16/encoding/json"
"github.com/datarhei/core/v16/http/api"
"github.com/datarhei/core/v16/mem"
)
func (r *restclient) Events(ctx context.Context, filters api.EventFilters) (<-chan api.Event, error) {
func (r *restclient) Events(ctx context.Context, filters api.EventFilters) (<-chan api.LogEvent, error) {
buf := mem.Get()
defer mem.Put(buf)
@ -25,16 +26,16 @@ func (r *restclient) Events(ctx context.Context, filters api.EventFilters) (<-ch
return nil, err
}
channel := make(chan api.Event, 128)
channel := make(chan api.LogEvent, 128)
go func(stream io.ReadCloser, ch chan<- api.Event) {
go func(stream io.ReadCloser, ch chan<- api.LogEvent) {
defer stream.Close()
defer close(channel)
decoder := json.NewDecoder(stream)
for decoder.More() {
var event api.Event
var event api.LogEvent
if err := decoder.Decode(&event); err == io.EOF {
return
} else if err != nil {
@ -57,3 +58,49 @@ func (r *restclient) Events(ctx context.Context, filters api.EventFilters) (<-ch
return channel, nil
}
// MediaEvents subscribes to the media event stream of the given storage on
// the remote core. Events matching the glob pattern are decoded from the
// x-json-stream response and forwarded on the returned channel; keepalive
// events are filtered out. The channel is closed when the stream ends, a
// terminal decode error occurs, or ctx is canceled.
func (r *restclient) MediaEvents(ctx context.Context, storage, pattern string) (<-chan api.MediaEvent, error) {
	header := make(http.Header)
	header.Set("Accept", "application/x-json-stream")
	header.Set("Connection", "close")

	query := &url.Values{}
	query.Set("glob", pattern)

	stream, err := r.stream(ctx, "POST", "/v3/events/media/"+url.PathEscape(storage), query, header, "", nil)
	if err != nil {
		return nil, err
	}

	channel := make(chan api.MediaEvent, 128)

	go func(stream io.ReadCloser, ch chan<- api.MediaEvent) {
		defer stream.Close()
		defer close(channel)

		decoder := json.NewDecoder(stream)

		for decoder.More() {
			var event api.MediaEvent
			if err := decoder.Decode(&event); err == io.EOF {
				return
			} else if err != nil {
				// Surface the decode error to the consumer as a synthetic
				// "error" event; the loop exits right after sending it.
				event.Action = "error"
				event.Name = err.Error()
			}

			// Don't emit keepalives
			if event.Action == "keepalive" {
				continue
			}

			// Don't block forever if the consumer stopped reading: exit on
			// context cancellation instead of leaking this goroutine and
			// the underlying connection.
			select {
			case ch <- event:
			case <-ctx.Done():
				return
			}

			if event.Action == "" || event.Action == "error" {
				return
			}
		}
	}(stream, channel)

	return channel, nil
}

View File

@ -32,7 +32,7 @@ func (h *ClusterHandler) Events(c echo.Context) error {
return api.Err(http.StatusBadRequest, "", "invalid JSON: %s", err.Error())
}
filter := map[string]*api.EventFilter{}
filter := map[string]*api.LogEventFilter{}
for _, f := range filters.Filters {
f := f
@ -77,7 +77,7 @@ func (h *ClusterHandler) Events(c echo.Context) error {
done := make(chan error, 1)
filterEvent := func(event *api.Event) bool {
filterEvent := func(event *api.LogEvent) bool {
if len(filter) == 0 {
return true
}

View File

@ -3,31 +3,49 @@ package api
import (
"net/http"
"strings"
"sync"
"time"
"github.com/datarhei/core/v16/encoding/json"
"github.com/datarhei/core/v16/event"
"github.com/datarhei/core/v16/glob"
"github.com/datarhei/core/v16/http/api"
"github.com/datarhei/core/v16/http/handler/util"
"github.com/datarhei/core/v16/log"
"github.com/datarhei/core/v16/slices"
"github.com/labstack/echo/v4"
)
// The EventsHandler type provides handler functions for retrieving event.
type EventsHandler struct {
events log.ChannelWriter
logs log.ChannelWriter
media map[string]event.MediaSource
lock sync.Mutex
}
// NewEvents returns a new EventsHandler type
func NewEvents(events log.ChannelWriter) *EventsHandler {
func NewEvents(logs log.ChannelWriter) *EventsHandler {
return &EventsHandler{
events: events,
logs: logs,
media: map[string]event.MediaSource{},
}
}
// Events returns a stream of event
// @Summary Stream of events
// @Description Stream of event of whats happening in the core
func (h *EventsHandler) AddMediaSource(name string, source event.MediaSource) {
if source == nil {
return
}
h.lock.Lock()
defer h.lock.Unlock()
h.media[name] = source
}
// LogEvents returns a stream of event
// @Summary Stream of log events
// @Description Stream of log event of whats happening in the core
// @ID events
// @Tags v16.?.?
// @Accept json
@ -37,14 +55,14 @@ func NewEvents(events log.ChannelWriter) *EventsHandler {
// @Success 200 {object} api.Event
// @Security ApiKeyAuth
// @Router /api/v3/events [post]
func (h *EventsHandler) Events(c echo.Context) error {
func (h *EventsHandler) LogEvents(c echo.Context) error {
filters := api.EventFilters{}
if err := util.ShouldBindJSON(c, &filters); err != nil {
return api.Err(http.StatusBadRequest, "", "invalid JSON: %s", err.Error())
}
filter := map[string]*api.EventFilter{}
filter := map[string]*api.LogEventFilter{}
for _, f := range filters.Filters {
f := f
@ -76,7 +94,7 @@ func (h *EventsHandler) Events(c echo.Context) error {
res.Header().Set(echo.HeaderConnection, "close")
res.WriteHeader(http.StatusOK)
evts, cancel := h.events.Subscribe()
evts, cancel := h.logs.Subscribe()
defer cancel()
enc := json.NewEncoder(res)
@ -84,7 +102,7 @@ func (h *EventsHandler) Events(c echo.Context) error {
done := make(chan error, 1)
filterEvent := func(event *api.Event) bool {
filterEvent := func(event *api.LogEvent) bool {
if len(filter) == 0 {
return true
}
@ -97,7 +115,7 @@ func (h *EventsHandler) Events(c echo.Context) error {
return event.Filter(f)
}
event := api.Event{}
event := api.LogEvent{}
if contentType == "text/event-stream" {
res.Write([]byte(":keepalive\n\n"))
@ -155,3 +173,130 @@ func (h *EventsHandler) Events(c echo.Context) error {
}
}
}
// MediaEvents returns a stream of media events
// @Summary Stream of media events
// @Description Stream of media events of whats happening in the core
// @ID events-media
// @Tags v16.?.?
// @Accept json
// @Param glob query string false "glob pattern for media names"
// @Produce json-stream
// @Success 200 {object} api.MediaEvent
// @Security ApiKeyAuth
// @Router /api/v3/events/media/{type} [post]
func (h *EventsHandler) MediaEvents(c echo.Context) error {
	pattern := util.DefaultQuery(c, "glob", "")

	var compiledPattern glob.Glob = nil

	if len(pattern) != 0 {
		var err error
		compiledPattern, err = glob.Compile(pattern, '/')
		if err != nil {
			// %s, not %w: this is a plain format string for the API error
			// message, not error wrapping.
			return api.Err(http.StatusBadRequest, "", "invalid pattern: %s", err)
		}
	}

	mediaType := util.PathParam(c, "type")

	keepaliveTicker := time.NewTicker(5 * time.Second)
	defer keepaliveTicker.Stop()

	listTicker := time.NewTicker(30 * time.Second)
	defer listTicker.Stop()

	req := c.Request()
	reqctx := req.Context()

	contentType := "application/x-json-stream"

	h.lock.Lock()
	mediaSource, ok := h.media[mediaType]
	h.lock.Unlock()

	if !ok {
		return api.Err(http.StatusNotFound, "", "media source not found")
	}

	evts, cancel, err := mediaSource.Events()
	if err != nil {
		return api.Err(http.StatusNotImplemented, "", "events are not implemented for this server")
	}
	defer cancel()

	res := c.Response()

	res.Header().Set(echo.HeaderContentType, contentType+"; charset=UTF-8")
	res.Header().Set(echo.HeaderCacheControl, "no-store")
	res.Header().Set(echo.HeaderConnection, "close")
	res.WriteHeader(http.StatusOK)

	enc := json.NewEncoder(res)
	enc.SetIndent("", "")

	done := make(chan error, 1)

	// createList builds a full "list" snapshot of the media source, filtered
	// by the glob pattern if one was given.
	createList := func() api.MediaEvent {
		list := mediaSource.MediaList()

		if compiledPattern != nil {
			names := []string{}
			for _, l := range list {
				if !compiledPattern.Match(l) {
					continue
				}

				names = append(names, l)
			}

			list = names
		}

		event := api.MediaEvent{
			Action:    "list",
			Names:     slices.Copy(list),
			Timestamp: time.Now().UnixMilli(),
		}

		return event
	}

	// Send an initial full listing, then keepalives, periodic re-listings,
	// and individual change events.
	if err := enc.Encode(createList()); err != nil {
		done <- err
	}
	res.Flush()

	for {
		select {
		case err := <-done:
			return err
		case <-reqctx.Done():
			done <- nil
		case <-keepaliveTicker.C:
			res.Write([]byte("{\"action\":\"keepalive\"}\n"))
			res.Flush()
		case <-listTicker.C:
			if err := enc.Encode(createList()); err != nil {
				done <- err
			}
			res.Flush()
		case evt, ok := <-evts:
			if !ok {
				// The event source has been closed; end the stream instead
				// of receiving nil events, which would panic on the type
				// assertion below.
				return nil
			}

			e, ok := evt.(*event.MediaEvent)
			if !ok {
				// Ignore event types this endpoint doesn't know about.
				continue
			}

			if compiledPattern != nil {
				if !compiledPattern.Match(e.Name) {
					continue
				}
			}

			if err := enc.Encode(api.MediaEvent{
				Action:    e.Action,
				Name:      e.Name,
				Timestamp: e.Timestamp.UnixMilli(),
			}); err != nil {
				done <- err
			}
			res.Flush()
		}
	}
}

View File

@ -35,7 +35,7 @@ func NewLog(buffer log.BufferWriter) *LogHandler {
// @ID log-3
// @Param format query string false "Format of the list of log events (*console, raw)"
// @Produce json
// @Success 200 {array} api.LogEvent "application log"
// @Success 200 {array} api.LogEntries "application log"
// @Success 200 {array} string "application log"
// @Security ApiKeyAuth
// @Router /api/v3/log [get]

View File

@ -24,5 +24,5 @@ func TestLog(t *testing.T) {
response := mock.Request(t, http.StatusOK, router, "GET", "/", nil)
mock.Validate(t, []api.LogEvent{}, response.Data)
mock.Validate(t, []api.LogEntries{}, response.Data)
}

View File

@ -1,7 +1,6 @@
package handler
import (
"encoding/json"
"errors"
"fmt"
"io"
@ -258,11 +257,6 @@ func (h *FSHandler) DeleteFiles(c echo.Context) error {
}
func (h *FSHandler) ListFiles(c echo.Context) error {
accept := c.Request().Header.Get(echo.HeaderAccept)
if strings.Contains(accept, "application/x-json-stream") || strings.Contains(accept, "text/event-stream") {
return h.ListFilesEvent(c)
}
pattern := util.DefaultQuery(c, "glob", "")
sizeMin := util.DefaultQuery(c, "size_min", "0")
sizeMax := util.DefaultQuery(c, "size_max", "0")
@ -353,115 +347,6 @@ func (h *FSHandler) ListFiles(c echo.Context) error {
return c.JSON(http.StatusOK, fileinfos)
}
func (h *FSHandler) ListFilesEvent(c echo.Context) error {
pattern := util.DefaultQuery(c, "glob", "")
path := "/"
if len(pattern) != 0 {
prefix := glob.Prefix(pattern)
index := strings.LastIndex(prefix, "/")
path = prefix[:index+1]
}
var compiledPattern glob.Glob = nil
if len(pattern) != 0 {
var err error
compiledPattern, err = glob.Compile(pattern, '/')
if err != nil {
return api.Err(http.StatusBadRequest, "", "invalid pattern: %w", err)
}
}
options := fs.ListOptions{
Pattern: pattern,
}
keepaliveTicker := time.NewTicker(5 * time.Second)
defer keepaliveTicker.Stop()
listTicker := time.NewTicker(30 * time.Second)
defer listTicker.Stop()
req := c.Request()
reqctx := req.Context()
contentType := "text/event-stream"
accept := req.Header.Get(echo.HeaderAccept)
if strings.Contains(accept, "application/x-json-stream") {
contentType = "application/x-json-stream"
}
evts, cancel, err := h.FS.Filesystem.Events()
if err != nil {
return api.Err(http.StatusNotImplemented, "", "events are not implemented for this filesystem")
}
defer cancel()
res := c.Response()
res.Header().Set(echo.HeaderContentType, contentType+"; charset=UTF-8")
res.Header().Set(echo.HeaderCacheControl, "no-store")
res.Header().Set(echo.HeaderConnection, "close")
res.WriteHeader(http.StatusOK)
enc := json.NewEncoder(res)
enc.SetIndent("", "")
done := make(chan error, 1)
createList := func() api.FilesystemEvent {
files := h.FS.Filesystem.List(path, options)
event := api.FilesystemEvent{
Action: "list",
Names: make([]string, 0, len(files)),
Timestamp: time.Now().UnixMilli(),
}
for _, file := range files {
event.Names = append(event.Names, file.Name())
}
return event
}
if err := enc.Encode(createList()); err != nil {
done <- err
}
res.Flush()
for {
select {
case err := <-done:
return err
case <-reqctx.Done():
done <- nil
case <-keepaliveTicker.C:
res.Write([]byte("{\"action\": \"keepalive\"}\n"))
res.Flush()
case <-listTicker.C:
if err := enc.Encode(createList()); err != nil {
done <- err
}
res.Flush()
case e := <-evts:
if compiledPattern != nil {
if !compiledPattern.Match(e.Name) {
continue
}
}
if err := enc.Encode(api.FilesystemEvent{
Action: e.Action,
Name: e.Name,
Timestamp: e.Timestamp.UnixMilli(),
}); err != nil {
done <- err
}
res.Flush()
}
}
}
// From: github.com/golang/go/net/http/fs.go@7dc9fcb
// errNoOverlap is returned by serveContent's parseRange if first-byte-pos of

View File

@ -283,6 +283,12 @@ func NewServer(config Config) (serverhandler.Server, error) {
config.LogEvents,
)
for name, fs := range s.filesystems {
s.v3handler.events.AddMediaSource(name, fs.Filesystem)
}
s.v3handler.events.AddMediaSource("srt", config.SRT)
s.v3handler.events.AddMediaSource("rtmp", config.RTMP)
if config.Restream != nil {
s.v3handler.process = api.NewProcess(
config.Restream,
@ -807,6 +813,8 @@ func (s *server) setRoutesV3(v3 *echo.Group) {
// v3 Events
if s.v3handler.events != nil {
v3.POST("/events", s.v3handler.events.Events)
v3.POST("/events", s.v3handler.events.LogEvents)
v3.POST("/events/log", s.v3handler.events.LogEvents)
v3.POST("/events/media/:type", s.v3handler.events.MediaEvents)
}
}

View File

@ -11,6 +11,7 @@ import (
"sync"
"time"
"github.com/datarhei/core/v16/event"
"github.com/datarhei/core/v16/glob"
"github.com/datarhei/core/v16/log"
)
@ -140,7 +141,7 @@ type diskFilesystem struct {
// Logger from the config
logger log.Logger
events *EventWriter
events *event.PubSub
}
// NewDiskFilesystem returns a new filesystem that is backed by the disk filesystem.
@ -174,7 +175,7 @@ func NewDiskFilesystem(config DiskConfig) (Filesystem, error) {
fs.logger = log.New("")
}
fs.events = NewEventWriter()
fs.events = event.NewPubSub()
return fs, nil
}
@ -218,7 +219,7 @@ func NewRootedDiskFilesystem(config RootedDiskConfig) (Filesystem, error) {
fs.logger = log.New("")
}
fs.events = NewEventWriter()
fs.events = event.NewPubSub()
return fs, nil
}
@ -369,9 +370,9 @@ func (fs *diskFilesystem) WriteFileReader(path string, r io.Reader, sizeHint int
fs.lastSizeCheck = time.Time{}
if replace {
fs.events.Publish(NewEvent("update", path))
fs.events.Publish(event.NewMediaEvent("update", path))
} else {
fs.events.Publish(NewEvent("create", path))
fs.events.Publish(event.NewMediaEvent("create", path))
}
return size, !replace, nil
@ -437,7 +438,7 @@ func (fs *diskFilesystem) AppendFileReader(path string, r io.Reader, sizeHint in
fs.lastSizeCheck = time.Time{}
fs.events.Publish(NewEvent("update", path))
fs.events.Publish(event.NewMediaEvent("update", path))
return size, nil
}
@ -461,8 +462,8 @@ func (fs *diskFilesystem) rename(src, dst string) error {
// First try to rename the file
if err := os.Rename(src, dst); err == nil {
fs.events.Publish(NewEvent("remove", src))
fs.events.Publish(NewEvent("create", dst))
fs.events.Publish(event.NewMediaEvent("create", dst))
fs.events.Publish(event.NewMediaEvent("remove", src))
return nil
}
@ -472,14 +473,14 @@ func (fs *diskFilesystem) rename(src, dst string) error {
return fmt.Errorf("failed to copy files: %w", err)
}
fs.events.Publish(NewEvent("create", dst))
fs.events.Publish(event.NewMediaEvent("create", dst))
if err := os.Remove(src); err != nil {
os.Remove(dst)
return fmt.Errorf("failed to remove source file: %w", err)
}
fs.events.Publish(NewEvent("remove", src))
fs.events.Publish(event.NewMediaEvent("remove", src))
return nil
}
@ -520,7 +521,7 @@ func (fs *diskFilesystem) copy(src, dst string) error {
fs.lastSizeCheck = time.Time{}
fs.events.Publish(NewEvent("create", dst))
fs.events.Publish(event.NewMediaEvent("create", dst))
return nil
}
@ -574,7 +575,7 @@ func (fs *diskFilesystem) Remove(path string) int64 {
fs.lastSizeCheck = time.Time{}
fs.events.Publish(NewEvent("remove", path))
fs.events.Publish(event.NewMediaEvent("remove", path))
return size
}
@ -642,7 +643,7 @@ func (fs *diskFilesystem) RemoveList(path string, options ListOptions) ([]string
if err := os.Remove(path); err == nil {
files = append(files, name)
size += info.Size()
fs.events.Publish(NewEvent("remove", path))
fs.events.Publish(event.NewMediaEvent("remove", path))
}
})
@ -792,8 +793,19 @@ func (fs *diskFilesystem) cleanPath(path string) string {
return filepath.Join(fs.root, filepath.Clean(path))
}
func (fs *diskFilesystem) Events() (<-chan Event, EventsCancelFunc, error) {
// Events returns a channel of media events (create/update/remove) emitted by
// this filesystem, together with a cancel function that unsubscribes the
// returned channel. It never returns an error for the disk filesystem.
func (fs *diskFilesystem) Events() (<-chan event.Event, event.CancelFunc, error) {
	ch, cancel := fs.events.Subscribe()

	return ch, cancel, nil
}
// MediaList returns the names of all files currently present on this
// filesystem, obtained from a full listing of the root directory.
func (fs *diskFilesystem) MediaList() []string {
	entries := fs.List("/", ListOptions{})

	names := make([]string, 0, len(entries))
	for _, entry := range entries {
		names = append(names, entry.Name())
	}

	return names
}

View File

@ -1,139 +1,14 @@
package fs
import (
"context"
"fmt"
"sync"
"time"
type Action string
"github.com/lithammer/shortuuid/v4"
const (
ActionCreate Action = "create"
ActionUpdate Action = "update"
ActionRemove Action = "remove"
ActionList Action = "list"
)
// Event describes a single filesystem change: which action happened,
// the name of the affected file, and when the event was created.
type Event struct {
	Action    string    // one of "create", "update", "remove", "list"
	Name      string    // path of the affected file
	Timestamp time.Time // creation time of the event
}

// NewEvent builds an Event for the given action and file name,
// stamped with the current time.
func NewEvent(action, name string) Event {
	e := Event{
		Action: action,
		Name:   name,
	}
	e.Timestamp = time.Now()

	return e
}
// clone returns a copy of the event. Event consists only of value
// types, so a plain value copy is a complete, independent copy.
func (e Event) clone() Event {
	c := e

	return c
}
// EventsCancelFunc removes the subscription it was returned for.
type EventsCancelFunc func()

// EventWriter fans out published filesystem events to any number of
// subscriber channels via a dedicated broadcast goroutine.
type EventWriter struct {
	publisher       chan Event // incoming events, drained by broadcast()
	publisherClosed bool       // set once Close has been called
	publisherLock   sync.Mutex // guards publisher and publisherClosed

	ctx    context.Context // cancelled to stop broadcast()
	cancel context.CancelFunc

	subscriber     map[string]chan Event // per-subscriber delivery channels, keyed by random id
	subscriberLock sync.Mutex            // guards the subscriber map
}
// NewEventWriter creates an EventWriter and starts its broadcast
// goroutine. Call Close to stop the goroutine and release subscribers.
func NewEventWriter() *EventWriter {
	writer := &EventWriter{
		publisher:  make(chan Event, 1024),
		subscriber: map[string]chan Event{},
	}
	writer.ctx, writer.cancel = context.WithCancel(context.Background())

	go writer.broadcast()

	return writer
}
// Publish enqueues a copy of e for delivery to all subscribers. It
// returns an error — and drops the event — if the writer has already
// been closed or if the publisher queue is full.
func (w *EventWriter) Publish(e Event) error {
	w.publisherLock.Lock()
	defer w.publisherLock.Unlock()

	if w.publisherClosed {
		return fmt.Errorf("writer is closed")
	}

	select {
	case w.publisher <- e.clone():
	default:
		return fmt.Errorf("publisher queue full")
	}

	return nil
}
// Close shuts the writer down: it stops the broadcast goroutine, closes
// and marks the publisher channel so further Publish calls fail, and
// closes and drops all subscriber channels.
func (w *EventWriter) Close() {
	w.cancel()

	w.publisherLock.Lock()
	close(w.publisher)
	w.publisherClosed = true
	w.publisherLock.Unlock()

	// NOTE(review): broadcast() may still select on the now-closed publisher
	// channel and momentarily fan out zero-value events before it observes
	// ctx.Done() — confirm that subscribers tolerate this.
	w.subscriberLock.Lock()
	for _, c := range w.subscriber {
		close(c)
	}
	w.subscriber = make(map[string]chan Event)
	w.subscriberLock.Unlock()
}
// Subscribe registers a new subscriber and returns its event channel
// together with a function that removes the subscription again.
// Delivery is best-effort: events are dropped for a subscriber whose
// channel buffer is full.
func (w *EventWriter) Subscribe() (<-chan Event, EventsCancelFunc) {
	events := make(chan Event, 1024)

	w.subscriberLock.Lock()
	id := shortuuid.New()
	for {
		if _, taken := w.subscriber[id]; !taken {
			break
		}
		id = shortuuid.New()
	}
	w.subscriber[id] = events
	w.subscriberLock.Unlock()

	return events, func() {
		w.subscriberLock.Lock()
		delete(w.subscriber, id)
		w.subscriberLock.Unlock()
	}
}
func (w *EventWriter) broadcast() {
for {
select {
case <-w.ctx.Done():
return
case e := <-w.publisher:
w.subscriberLock.Lock()
for _, c := range w.subscriber {
pp := e.clone()
select {
case c <- pp:
default:
}
}
w.subscriberLock.Unlock()
}
}
// String returns the action as its plain string value, making Action
// satisfy fmt.Stringer.
func (a Action) String() string {
	return string(a)
}

View File

@ -7,6 +7,8 @@ import (
"io/fs"
"os"
"time"
"github.com/datarhei/core/v16/event"
)
var ErrExist = errors.New("file or directory already exists")
@ -85,7 +87,7 @@ type ReadFilesystem interface {
// of that file is verified. In case the file is not found, the error ErrNotExist will be returned.
LookPath(file string) (string, error)
Events() (<-chan Event, EventsCancelFunc, error)
event.MediaSource
}
type WriteFilesystem interface {

View File

@ -12,6 +12,7 @@ import (
"sync"
"time"
"github.com/datarhei/core/v16/event"
"github.com/datarhei/core/v16/glob"
"github.com/datarhei/core/v16/log"
"github.com/datarhei/core/v16/mem"
@ -136,7 +137,7 @@ type memFilesystem struct {
storage memStorage
dirs *dirStorage
events *EventWriter
events *event.PubSub
}
type dirStorage struct {
@ -224,7 +225,7 @@ func NewMemFilesystem(config MemConfig) (Filesystem, error) {
fs.logger.Debug().Log("Created")
fs.events = NewEventWriter()
fs.events = event.NewPubSub()
return fs, nil
}
@ -466,10 +467,10 @@ func (fs *memFilesystem) WriteFileReader(path string, r io.Reader, sizeHint int)
})
if replace {
fs.events.Publish(NewEvent("update", newFile.name))
fs.events.Publish(event.NewMediaEvent("update", newFile.name))
logger.Debug().Log("Replaced file")
} else {
fs.events.Publish(NewEvent("create", newFile.name))
fs.events.Publish(event.NewMediaEvent("create", newFile.name))
logger.Debug().Log("Added file")
}
@ -526,7 +527,7 @@ func (fs *memFilesystem) AppendFileReader(path string, r io.Reader, sizeHint int
"size_bytes": fs.currentSize,
}).Log("Appended to file")
fs.events.Publish(NewEvent("update", file.name))
fs.events.Publish(event.NewMediaEvent("update", file.name))
return size, nil
}
@ -564,7 +565,7 @@ func (fs *memFilesystem) Purge(size int64) int64 {
"size_bytes": fs.currentSize,
}).Debug().Log("Purged file")
fs.events.Publish(NewEvent("remove", f.name))
fs.events.Publish(event.NewMediaEvent("remove", f.name))
if size <= 0 {
break
@ -607,23 +608,24 @@ func (fs *memFilesystem) Rename(src, dst string) error {
}
dstFile, replace := fs.storage.Store(dst, srcFile)
if replace {
fs.events.Publish(event.NewMediaEvent("update", dst))
} else {
fs.events.Publish(event.NewMediaEvent("create", dst))
}
fs.storage.Delete(src)
fs.events.Publish(NewEvent("remove", src))
fs.events.Publish(event.NewMediaEvent("remove", src))
fs.dirs.Remove(src)
if !replace {
fs.dirs.Add(dst)
fs.events.Publish(NewEvent("create", dst))
}
fs.sizeLock.Lock()
defer fs.sizeLock.Unlock()
if replace {
fs.events.Publish(NewEvent("update", dst))
dstFile.Close()
fs.currentSize -= dstFile.size
@ -660,14 +662,14 @@ func (fs *memFilesystem) Copy(src, dst string) error {
if !replace {
fs.dirs.Add(dst)
fs.events.Publish(NewEvent("create", dst))
fs.events.Publish(event.NewMediaEvent("create", dst))
}
fs.sizeLock.Lock()
defer fs.sizeLock.Unlock()
if replace {
fs.events.Publish(NewEvent("update", dst))
fs.events.Publish(event.NewMediaEvent("update", dst))
replacedFile.Close()
fs.currentSize -= replacedFile.size
}
@ -754,7 +756,7 @@ func (fs *memFilesystem) remove(path string) int64 {
"size_bytes": fs.currentSize,
}).Debug().Log("Removed file")
fs.events.Publish(NewEvent("remove", file.name))
fs.events.Publish(event.NewMediaEvent("remove", file.name))
return file.size
}
@ -830,7 +832,7 @@ func (fs *memFilesystem) RemoveList(path string, options ListOptions) ([]string,
file.Close()
fs.events.Publish(NewEvent("remove", file.name))
fs.events.Publish(event.NewMediaEvent("remove", file.name))
}
fs.sizeLock.Lock()
@ -948,8 +950,19 @@ func (fs *memFilesystem) cleanPath(path string) string {
return filepath.Join("/", filepath.Clean(path))
}
func (fs *memFilesystem) Events() (<-chan Event, EventsCancelFunc, error) {
// Events returns a channel of media events (create/update/remove) emitted by
// this filesystem, together with a cancel function that unsubscribes the
// returned channel. It never returns an error for the in-memory filesystem.
func (fs *memFilesystem) Events() (<-chan event.Event, event.CancelFunc, error) {
	ch, cancel := fs.events.Subscribe()

	return ch, cancel, nil
}
// MediaList returns the names of all files currently stored on this
// filesystem, collected from a listing of the root directory.
func (fs *memFilesystem) MediaList() []string {
	entries := fs.List("/", ListOptions{})

	names := make([]string, 0, len(entries))
	for _, e := range entries {
		names = append(names, e.Name())
	}

	return names
}

View File

@ -12,6 +12,7 @@ import (
"sync"
"time"
"github.com/datarhei/core/v16/event"
"github.com/datarhei/core/v16/log"
"github.com/datarhei/core/v16/mem"
"github.com/minio/minio-go/v7"
@ -686,10 +687,14 @@ func (fs *s3Filesystem) cleanPath(path string) string {
return filepath.Join("/", filepath.Clean(path))[1:]
}
func (fs *s3Filesystem) Events() (<-chan Event, EventsCancelFunc, error) {
// Events always fails: the S3 filesystem does not emit media events.
// The returned cancel function is a harmless no-op.
func (fs *s3Filesystem) Events() (<-chan event.Event, event.CancelFunc, error) {
	return nil, func() {}, fmt.Errorf("events are not implemented for this filesystem")
}
// MediaList returns nil: the S3 filesystem does not support media
// events (see Events) and therefore provides no media listing.
func (fs *s3Filesystem) MediaList() []string {
	return nil
}
type s3FileInfo struct {
name string
size int64

View File

@ -356,8 +356,8 @@ func (e *Event) Error() Logger {
return clone
}
func (l *Event) Write(p []byte) (int, error) {
l.Log("%s", strings.TrimSpace(string(p)))
// Write implements io.Writer by emitting the given bytes, with
// surrounding whitespace trimmed, as a single log line. It always
// reports the full input length as written and never fails.
func (e *Event) Write(p []byte) (int, error) {
	msg := strings.TrimSpace(string(p))
	e.Log("%s", msg)

	return len(p), nil
}

View File

@ -12,6 +12,7 @@ import (
"github.com/datarhei/core/v16/cluster/node"
enctoken "github.com/datarhei/core/v16/encoding/token"
"github.com/datarhei/core/v16/event"
"github.com/datarhei/core/v16/iam"
iamidentity "github.com/datarhei/core/v16/iam/identity"
"github.com/datarhei/core/v16/log"
@ -79,6 +80,8 @@ type Server interface {
// Channels return a list of currently publishing streams
Channels() []string
event.MediaSource
}
// server is an implementation of the Server interface
@ -101,6 +104,8 @@ type server struct {
proxy *node.Manager
iam iam.IAM
events *event.PubSub
}
// New creates a new RTMP server according to the given config
@ -120,6 +125,7 @@ func New(config Config) (Server, error) {
collector: config.Collector,
proxy: config.Proxy,
iam: config.IAM,
events: event.NewPubSub(),
}
if s.collector == nil {
@ -178,15 +184,18 @@ func (s *server) Close() {
}
}
// Channels returns the list of streams that are
// publishing currently
// Channels returns the list of streams that are publishing currently, excluding proxied channels
func (s *server) Channels() []string {
channels := []string{}
s.lock.RLock()
defer s.lock.RUnlock()
for key := range s.channels {
for key, channel := range s.channels {
if channel.isProxy {
continue
}
channels = append(channels, key)
}
@ -366,6 +375,15 @@ func (s *server) handlePublish(conn *rtmp.Conn) {
return
}
// Check if this stream is already published on the cluster
if s.proxy != nil {
_, err = s.proxy.MediaGetURL("rtmp", playpath)
if err == nil {
s.log(identity, "PUBLISH", "CONFLICT", playpath, "already publishing", remote)
return
}
}
err = s.publish(conn, playpath, remote, identity, false)
if err != nil {
s.logger.WithField("path", conn.URL.Path).WithError(err).Log("")
@ -419,6 +437,10 @@ func (s *server) publish(src connection, playpath string, remote net.Addr, ident
s.log(identity, "PUBLISH", "STREAM", playpath, stream.Type().String(), remote)
}
if !isProxy {
s.events.Publish(event.NewMediaEvent("create", playpath))
}
// Ingest the data, blocks until done
avutil.CopyPackets(ch.queue, src)
@ -428,6 +450,10 @@ func (s *server) publish(src connection, playpath string, remote net.Addr, ident
ch.Close()
if !isProxy {
s.events.Publish(event.NewMediaEvent("remove", playpath))
}
s.log(identity, "PUBLISH", "STOP", playpath, "", remote)
return nil
@ -493,3 +519,13 @@ func (s *server) findDomainFromPlaypath(path string) string {
return "$none"
}
// Events returns a channel of media events (create/remove of published
// streams), together with a cancel function that unsubscribes the
// returned channel. It never returns an error.
func (s *server) Events() (<-chan event.Event, event.CancelFunc, error) {
	ch, cancel := s.events.Subscribe()

	return ch, cancel, nil
}
// MediaList returns the names of the streams currently being published,
// excluding proxied channels (see Channels).
func (s *server) MediaList() []string {
	return s.Channels()
}

View File

@ -13,6 +13,7 @@ import (
"github.com/datarhei/core/v16/cluster/node"
enctoken "github.com/datarhei/core/v16/encoding/token"
"github.com/datarhei/core/v16/event"
"github.com/datarhei/core/v16/iam"
iamidentity "github.com/datarhei/core/v16/iam/identity"
"github.com/datarhei/core/v16/log"
@ -60,6 +61,8 @@ type Server interface {
// Channels return a list of currently publishing streams
Channels() []Channel
event.MediaSource
}
// server implements the Server interface
@ -87,6 +90,8 @@ type server struct {
proxy *node.Manager
iam iam.IAM
events *event.PubSub
}
func New(config Config) (Server, error) {
@ -98,6 +103,7 @@ func New(config Config) (Server, error) {
iam: config.IAM,
logger: config.Logger,
proxy: config.Proxy,
events: event.NewPubSub(),
}
if s.collector == nil {
@ -369,6 +375,16 @@ func (s *server) publish(conn srt.Conn, isProxy bool) error {
si, _ := url.ParseStreamId(streamId)
identity, _ := s.findIdentityFromToken(si.Token)
// Check if this stream is already published on the cluster
if s.proxy != nil {
_, err := s.proxy.MediaGetURL("rtmp", si.Resource)
if err == nil {
s.log(identity, "PUBLISH", "CONFLICT", si.Resource, "already publishing", client)
conn.Close()
return fmt.Errorf("already publishing this resource")
}
}
// Look for the stream
s.lock.Lock()
ch := s.channels[si.Resource]
@ -388,6 +404,10 @@ func (s *server) publish(conn srt.Conn, isProxy bool) error {
s.log(identity, "PUBLISH", "START", si.Resource, "", client)
if !isProxy {
s.events.Publish(event.NewMediaEvent("create", si.Resource))
}
// Blocks until connection closes
err := ch.pubsub.Publish(conn)
@ -397,6 +417,10 @@ func (s *server) publish(conn srt.Conn, isProxy bool) error {
ch.Close()
if !isProxy {
s.events.Publish(event.NewMediaEvent("remove", si.Resource))
}
s.log(identity, "PUBLISH", "STOP", si.Resource, err.Error(), client)
conn.Close()
@ -561,3 +585,20 @@ func (s *server) findDomainFromPlaypath(path string) string {
return "$none"
}
// Events returns a channel of media events (create/remove of published
// streams), together with a cancel function that unsubscribes the
// returned channel. It never returns an error.
func (s *server) Events() (<-chan event.Event, event.CancelFunc, error) {
	ch, cancel := s.events.Subscribe()

	return ch, cancel, nil
}
// MediaList returns the resource names of all currently published channels.
func (s *server) MediaList() []string {
	chs := s.Channels()

	names := make([]string, 0, len(chs))
	for _, ch := range chs {
		names = append(names, ch.Name)
	}

	return names
}