diff --git a/cluster/node/core.go b/cluster/node/core.go index c10ea752..600a9b1e 100644 --- a/cluster/node/core.go +++ b/cluster/node/core.go @@ -19,11 +19,68 @@ import ( "github.com/datarhei/core/v16/restream/app" ) +type Media struct { + available bool // Whether filesystem events are available + media map[string]int64 // List of files and timestamp of when they have been last seen + lock sync.RWMutex // Lock for the map +} + +func (m *Media) update(name string, timestamp int64) { + m.lock.Lock() + defer m.lock.Unlock() + + m.media[name] = timestamp +} + +func (m *Media) remove(name string) { + m.lock.Lock() + defer m.lock.Unlock() + + delete(m.media, name) +} + +func (m *Media) set(name []string, timestamp int64) { + media := map[string]int64{} + + for _, n := range name { + media[n] = timestamp + } + + m.lock.Lock() + defer m.lock.Unlock() + + m.media = media +} + +func (m *Media) get(name string) (int64, bool) { + m.lock.RLock() + defer m.lock.RUnlock() + + ts, ok := m.media[name] + + return ts, ok +} + +func (m *Media) list() []string { + m.lock.RLock() + defer m.lock.RUnlock() + + names := make([]string, 0, len(m.media)) + + for name := range m.media { + names = append(names, name) + } + + return names +} + type Core struct { id string - client client.RestClient - clientErr error + client client.RestClient + clientErr error + clientCtx context.Context + clientCancel context.CancelFunc lock sync.RWMutex @@ -39,6 +96,9 @@ type Core struct { hasSRT bool srtAddress *url.URL + media map[string]*Media // map[storage]map[path]lastchange + mediaLock sync.RWMutex + logger log.Logger } @@ -48,6 +108,7 @@ func NewCore(id string, logger log.Logger) *Core { core := &Core{ id: id, logger: logger, + media: map[string]*Media{}, } if core.logger == nil { @@ -68,18 +129,18 @@ func (n *Core) SetEssentials(address string, config *config.Config) { if n.address != address { n.address = address - n.client = nil // force reconnet + n.disconnect() // force reconnet } if config != nil { if n.config == nil { n.config = config - n.client = nil // force reconnect + n.disconnect() // force reconnect } if n.config != nil && n.config.UpdatedAt != config.UpdatedAt { n.config = config - n.client = nil // force reconnect + n.disconnect() // force reconnect } } } @@ -112,12 +173,23 @@ func (n *Core) Stop() { n.cancel() n.cancel = nil + + n.disconnect() } func (n *Core) Reconnect() { n.lock.Lock() defer n.lock.Unlock() + n.disconnect() +} + +func (n *Core) disconnect() { + if n.clientCancel != nil { + n.clientCancel() + n.clientCancel = nil + } + n.client = nil } @@ -240,11 +312,71 @@ func (n *Core) connect() error { n.srtAddress = srtAddress n.client = client + ctx, cancel := context.WithCancel(context.Background()) + n.clientCtx = ctx + n.clientCancel = cancel + + go n.mediaEvents(ctx, "mem") + go n.mediaEvents(ctx, "disk") + go n.mediaEvents(ctx, "rtmp") + go n.mediaEvents(ctx, "srt") + n.lock.Unlock() return nil } +func (n *Core) mediaEvents(ctx context.Context, storage string) { + m := &Media{} + + for { + ch, err := n.client.MediaEvents(ctx, storage, "/**") + if err != nil { + m.available = false + m.media = nil + + n.mediaLock.Lock() + n.media[storage] = m + n.mediaLock.Unlock() + + n.logger.Error().WithField("storage", storage).WithError(err).Log("Failed to connect to event source") + + return + } + + n.logger.Info().WithField("storage", storage).Log("Connected to event source") + + m.available = true + m.media = map[string]int64{} + n.mediaLock.Lock() + n.media[storage] = m + n.mediaLock.Unlock() + + innerloop: + for { + select { + case <-ctx.Done(): + return + case e, ok := <-ch: + if !ok { + break innerloop + } + switch e.Action { + case "update", "create": + m.update(e.Name, e.Timestamp) + case "remove": + m.remove(e.Name) + case "list": + m.set(e.Names, e.Timestamp) + } + } + } + + n.logger.Info().WithField("storage", storage).Log("Reconnecting to event source") + time.Sleep(5 * time.Second) + } +} + type CoreAbout struct { ID string Name string @@ -538,135 +670,29 @@ func (n *Core) MediaList() NodeFiles { LastUpdate: time.Now(), } - errorsChan := make(chan error, 8) - filesChan := make(chan string, 1024) - errorList := []error{} + prefixes := []string{} - wgList := sync.WaitGroup{} - wgList.Add(1) - - go func() { - defer wgList.Done() - - for file := range filesChan { - files.Files = append(files.Files, file) - } - - for err := range errorsChan { - errorList = append(errorList, err) - } - }() - - wg := sync.WaitGroup{} - wg.Add(2) - - go func(f chan<- string, e chan<- error) { - defer wg.Done() - - n.lock.RLock() - client := n.client - n.lock.RUnlock() - - if client == nil { - e <- ErrNoPeer - return - } - - files, err := client.FilesystemList("mem", "/*", "name", "asc") - if err != nil { - e <- err - return - } - - for _, file := range files { - f <- "mem:" + file.Name - } - }(filesChan, errorsChan) - - go func(f chan<- string, e chan<- error) { - defer wg.Done() - - n.lock.RLock() - client := n.client - n.lock.RUnlock() - - if client == nil { - e <- ErrNoPeer - return - } - - files, err := client.FilesystemList("disk", "/*", "name", "asc") - if err != nil { - e <- err - return - } - - for _, file := range files { - f <- "disk:" + file.Name - } - }(filesChan, errorsChan) - - if n.hasRTMP { - wg.Add(1) - - go func(f chan<- string, e chan<- error) { - defer wg.Done() - - n.lock.RLock() - client := n.client - n.lock.RUnlock() - - if client == nil { - e <- ErrNoPeer - return - } - - files, err := client.RTMPChannels() - if err != nil { - e <- err - return - } - - for _, file := range files { - f <- "rtmp:" + file.Name - } - }(filesChan, errorsChan) + n.mediaLock.RLock() + for prefix := range n.media { + prefixes = append(prefixes, prefix) } + n.mediaLock.RUnlock() - if n.hasSRT { - wg.Add(1) + for _, prefix := range prefixes { + n.mediaLock.RLock() + m, ok := n.media[prefix] + n.mediaLock.RUnlock() - go func(f chan<- string, e chan<- error) { - defer wg.Done() + if !ok { + continue + } - n.lock.RLock() - client := n.client - n.lock.RUnlock() - - if client == nil { - e <- ErrNoPeer - return - } - - files, err := client.SRTChannels() - if err != nil { - e <- err - return - } - - for _, file := range files { - f <- "srt:" + file.Name - } - }(filesChan, errorsChan) + list := m.list() + for _, name := range list { + files.Files = append(files.Files, prefix+":"+name) + } } - wg.Wait() - - close(filesChan) - close(errorsChan) - - wgList.Wait() - return files } @@ -702,18 +728,19 @@ func cloneURL(src *url.URL) *url.URL { func (n *Core) MediaGetURL(prefix, path string) (*url.URL, error) { var u *url.URL - if prefix == "mem" { + switch prefix { + case "mem": u = cloneURL(n.httpAddress) u = u.JoinPath("memfs", path) - } else if prefix == "disk" { + case "disk": u = cloneURL(n.httpAddress) u = u.JoinPath(path) - } else if prefix == "rtmp" { + case "rtmp": u = cloneURL(n.rtmpAddress) u = u.JoinPath(path) - } else if prefix == "srt" { + case "srt": u = cloneURL(n.srtAddress) - } else { + default: return nil, fmt.Errorf("unknown prefix") } @@ -721,6 +748,19 @@ func (n *Core) MediaGetURL(prefix, path string) (*url.URL, error) { } func (n *Core) MediaGetInfo(prefix, path string) (int64, time.Time, error) { + n.mediaLock.RLock() + m, ok := n.media[prefix] + n.mediaLock.RUnlock() + + if ok && m.available { + lastmod, ok := m.get(path) + if !ok { + return 0, time.Time{}, fmt.Errorf("media not found") + } + + return 0, time.UnixMilli(lastmod), nil + } + if prefix == "disk" || prefix == "mem" { return n.FilesystemGetFileInfo(prefix, path) } @@ -873,7 +913,7 @@ func (n *Core) ClusterProcessList() ([]Process, error) { return processes, nil } -func (n *Core) Events(ctx context.Context, filters api.EventFilters) (<-chan api.Event, error) { +func (n *Core) Events(ctx context.Context, filters api.EventFilters) (<-chan api.LogEvent, error) { n.lock.RLock() client := n.client n.lock.RUnlock() diff --git a/cluster/node/manager.go b/cluster/node/manager.go index 926d480c..4d8bb1cf 100644 --- a/cluster/node/manager.go +++ b/cluster/node/manager.go @@ -655,12 +655,12 @@ func (p *Manager) ProcessValidateConfig(nodeid string, config *app.Config) error return node.Core().ProcessValidateConfig(config) } -func (p *Manager) Events(ctx context.Context, filters api.EventFilters) (<-chan api.Event, error) { - eventChan := make(chan api.Event, 128) +func (p *Manager) Events(ctx context.Context, filters api.EventFilters) (<-chan api.LogEvent, error) { + eventChan := make(chan api.LogEvent, 128) p.lock.RLock() for _, n := range p.nodes { - go func(node *Node, e chan<- api.Event) { + go func(node *Node, e chan<- api.LogEvent) { eventChan, err := node.Core().Events(ctx, filters) if err != nil { return diff --git a/docs/docs.go b/docs/docs.go index d0027206..f14469e8 100644 --- a/docs/docs.go +++ b/docs/docs.go @@ -4986,7 +4986,8 @@ const docTemplate = `{ ], "description": "List all currently publishing RTMP streams.", "produces": [ - "application/json" + "application/json", + "application/x-json-stream" ], "tags": [ "v16.7.2" @@ -5206,7 +5207,8 @@ const docTemplate = `{ ], "description": "List all currently publishing SRT streams. This endpoint is EXPERIMENTAL and may change in future.", "produces": [ - "application/json" + "application/json", + "application/x-json-stream" ], "tags": [ "v16.9.0" diff --git a/docs/swagger.json b/docs/swagger.json index 229dc1b5..dd940a30 100644 --- a/docs/swagger.json +++ b/docs/swagger.json @@ -4979,7 +4979,8 @@ ], "description": "List all currently publishing RTMP streams.", "produces": [ - "application/json" + "application/json", + "application/x-json-stream" ], "tags": [ "v16.7.2" @@ -5199,7 +5200,8 @@ ], "description": "List all currently publishing SRT streams. This endpoint is EXPERIMENTAL and may change in future.", "produces": [ - "application/json" + "application/json", + "application/x-json-stream" ], "tags": [ "v16.9.0" diff --git a/docs/swagger.yaml b/docs/swagger.yaml index 7e1ae986..aec0ad13 100644 --- a/docs/swagger.yaml +++ b/docs/swagger.yaml @@ -6177,6 +6177,7 @@ paths: operationId: rtmp-3-list-channels produces: - application/json + - application/x-json-stream responses: "200": description: OK @@ -6317,6 +6318,7 @@ paths: operationId: srt-3-list-channels produces: - application/json + - application/x-json-stream responses: "200": description: OK diff --git a/event/event.go b/event/event.go new file mode 100644 index 00000000..c331e683 --- /dev/null +++ b/event/event.go @@ -0,0 +1,124 @@ +package event + +import ( + "context" + "fmt" + "sync" + + "github.com/lithammer/shortuuid/v4" +) + +type Event interface { + Clone() Event +} + +type CancelFunc func() + +type EventSource interface { + Events() (<-chan Event, CancelFunc, error) +} + +type PubSub struct { + publisher chan Event + publisherClosed bool + publisherLock sync.Mutex + + ctx context.Context + cancel context.CancelFunc + + subscriber map[string]chan Event + subscriberLock sync.Mutex +} + +func NewPubSub() *PubSub { + w := &PubSub{ + publisher: make(chan Event, 1024), + publisherClosed: false, + subscriber: make(map[string]chan Event), + } + + w.ctx, w.cancel = context.WithCancel(context.Background()) + + go w.broadcast() + + return w +} + +func (w *PubSub) Publish(e Event) error { + event := e.Clone() + + w.publisherLock.Lock() + defer w.publisherLock.Unlock() + + if w.publisherClosed { + return fmt.Errorf("writer is closed") + } + + select { + case w.publisher <- event: + default: + return fmt.Errorf("publisher queue full") + } + + return nil +} + +func (w *PubSub) Close() { + w.cancel() + + w.publisherLock.Lock() + close(w.publisher) + w.publisherClosed = true + w.publisherLock.Unlock() + + w.subscriberLock.Lock() + for _, c := range w.subscriber { + close(c) + } + w.subscriber = make(map[string]chan Event) + w.subscriberLock.Unlock() +} + +func (w *PubSub) Subscribe() (<-chan Event, CancelFunc) { + l := make(chan Event, 1024) + + var id string = "" + + w.subscriberLock.Lock() + for { + id = shortuuid.New() + if _, ok := w.subscriber[id]; !ok { + w.subscriber[id] = l + break + } + } + w.subscriberLock.Unlock() + + unsubscribe := func() { + w.subscriberLock.Lock() + delete(w.subscriber, id) + w.subscriberLock.Unlock() + } + + return l, unsubscribe +} + +func (w *PubSub) broadcast() { + for { + select { + case <-w.ctx.Done(): + return + case e := <-w.publisher: + w.subscriberLock.Lock() + for _, c := range w.subscriber { + pp := e.Clone() + + select { + case c <- pp: + default: + } + } + w.subscriberLock.Unlock() + } + } +} diff --git a/event/media.go b/event/media.go new file mode 100644 index 00000000..a11e60df --- /dev/null +++ b/event/media.go @@ -0,0 +1,33 @@ +package event + +import ( + "time" +) + +type MediaSource interface { + EventSource + + MediaList() []string +} + +type MediaEvent struct { + Action string + Name string + Timestamp time.Time +} + +func NewMediaEvent(action string, name string) *MediaEvent { + return &MediaEvent{ + Action: action, + Name: name, + Timestamp: time.Now(), + } +} + +func (e *MediaEvent) Clone() Event { + return &MediaEvent{ + Action: e.Action, + Name: e.Name, + Timestamp: e.Timestamp, + } +} diff --git a/http/api/event.go b/http/api/event.go index 0685cb82..49b2f494 100644 --- a/http/api/event.go +++ b/http/api/event.go @@ -9,7 +9,7 @@ import ( "github.com/datarhei/core/v16/log" ) -type Event struct { +type LogEvent struct { Timestamp int64 `json:"ts" format:"int64"` Level int `json:"level"` Component string `json:"event"` @@ -20,7 +20,7 @@ type Event struct { Data map[string]string `json:"data"` } -func (e *Event) Unmarshal(le *log.Event) { +func (e *LogEvent) Unmarshal(le *log.Event) { e.Timestamp = le.Time.Unix() e.Level = int(le.Level) e.Component = strings.ToLower(le.Component) @@ -53,7 +53,7 @@ func (e *Event) Unmarshal(le *log.Event) { } } -func (e *Event) Filter(ef *EventFilter) bool { +func (e *LogEvent) Filter(ef *LogEventFilter) bool { if ef.reMessage != nil { if !ef.reMessage.MatchString(e.Message) { return false @@ -93,7 +93,7 @@ func (e *Event) Filter(ef *EventFilter) bool { return true } -type EventFilter struct { +type LogEventFilter struct { Component string `json:"event"` Message string `json:"message"` Level string `json:"level"` @@ -109,10 +109,10 @@ type EventFilter struct { } type EventFilters struct { - Filters []EventFilter `json:"filters"` + Filters []LogEventFilter `json:"filters"` } -func (ef *EventFilter) Compile() error { +func (ef *LogEventFilter) Compile() error { if len(ef.Message) != 0 { r, err := regexp.Compile("(?i)" + ef.Message) if err != nil { @@ -162,3 +162,10 @@ func (ef *EventFilter) Compile() error { return nil } + +type MediaEvent struct { + Action string `json:"action"` + Name string `json:"name,omitempty"` + Names []string `json:"names,omitempty"` + Timestamp int64 `json:"ts"` +} diff --git a/http/api/event_test.go b/http/api/event_test.go index 985967a2..e169d184 100644 --- a/http/api/event_test.go +++ b/http/api/event_test.go @@ -7,7 +7,7 @@ import ( ) func TestEventFilter(t *testing.T) { - event := Event{ + event := LogEvent{ Timestamp: 1234, Level: 3, Component: "foobar", @@ -17,7 +17,7 @@ func TestEventFilter(t *testing.T) { }, } - filter := EventFilter{ + filter := LogEventFilter{ Component: "foobar", Level: "info", Message: "none", @@ -29,7 +29,7 @@ func TestEventFilter(t *testing.T) { res := event.Filter(&filter) require.True(t, res) - filter = EventFilter{ + filter = LogEventFilter{ Component: "foobar", Level: "warn", Message: "none", @@ -41,7 +41,7 @@ func TestEventFilter(t *testing.T) { res = event.Filter(&filter) require.False(t, res) - filter = EventFilter{ + filter = LogEventFilter{ Component: "foobar", Level: "info", Message: "done", @@ -53,7 +53,7 @@ func TestEventFilter(t *testing.T) { res = event.Filter(&filter) require.False(t, res) - foobarfilter := EventFilter{ + foobarfilter := LogEventFilter{ Component: "foobar", Data: map[string]string{ "foo": "^b.*$", @@ -66,7 +66,7 @@ func TestEventFilter(t *testing.T) { res = event.Filter(&foobarfilter) require.True(t, res) - foobazfilter := EventFilter{ + foobazfilter := LogEventFilter{ Component: "foobaz", Data: map[string]string{ "foo": "baz", @@ -81,7 +81,7 @@ func TestEventFilter(t *testing.T) { } func TestEventFilterDataKey(t *testing.T) { - event := Event{ + event := LogEvent{ Timestamp: 1234, Level: 3, Component: "foobar", @@ -91,7 +91,7 @@ func TestEventFilterDataKey(t *testing.T) { }, } - filter := EventFilter{ + filter := LogEventFilter{ Component: "foobar", Level: "info", Message: "none", @@ -103,7 +103,7 @@ func TestEventFilterDataKey(t *testing.T) { res := event.Filter(&filter) require.True(t, res) - filter = EventFilter{ + filter = LogEventFilter{ Component: "foobar", Level: "info", Message: "none", @@ -118,7 +118,7 @@ func TestEventFilterDataKey(t *testing.T) { res = event.Filter(&filter) require.False(t, res) - filter = EventFilter{ + filter = LogEventFilter{ Component: "foobar", Level: "info", Message: "none", @@ -135,7 +135,7 @@ func TestEventFilterDataKey(t *testing.T) { } func BenchmarkEventFilters(b *testing.B) { - event := Event{ + event := LogEvent{ Timestamp: 1234, Level: 3, Component: "foobar", @@ -145,7 +145,7 @@ func BenchmarkEventFilters(b *testing.B) { }, } - levelfilter := EventFilter{ + levelfilter := LogEventFilter{ Component: "foobar", Level: "info", Data: map[string]string{ diff --git a/http/api/filesystems.go b/http/api/filesystems.go index 1b470b84..da2395d4 100644 --- a/http/api/filesystems.go +++ b/http/api/filesystems.go @@ -22,10 +22,3 @@ type FilesystemOperation struct { Target string `json:"target"` RateLimit uint64 `json:"bandwidth_limit_kbit"` // kbit/s } - -type FilesystemEvent struct { - Action string `json:"action"` - Name string `json:"name,omitempty"` - Names []string `json:"names,omitempty"` - Timestamp int64 `json:"ts"` -} diff --git a/http/api/log.go b/http/api/log.go index 137f2abc..cf4773ac 100644 --- a/http/api/log.go +++ b/http/api/log.go @@ -1,4 +1,4 @@ package api -// LogEvent represents a log event from the app -type LogEvent map[string]interface{} +// LogEntries represents a log event from the app +type LogEntries map[string]interface{} diff --git a/http/client/client.go b/http/client/client.go index 64d990c7..b77b46a9 100644 --- a/http/client/client.go +++ b/http/client/client.go @@ -57,7 +57,8 @@ type RestClient interface { FilesystemDeleteFile(storage, path string) error // DELETE /v3/fs/{storage}/{path} FilesystemAddFile(storage, path string, data io.Reader) error // PUT /v3/fs/{storage}/{path} - Events(ctx context.Context, filters api.EventFilters) (<-chan api.Event, error) // POST /v3/events + Events(ctx context.Context, filters api.EventFilters) (<-chan api.LogEvent, error) // POST /v3/events + MediaEvents(ctx context.Context, storage, pattern string) (<-chan api.MediaEvent, error) // GET /v3/fs/{storage}, GET /v3/(rtmp|srt) ProcessList(opts ProcessListOptions) ([]api.Process, error) // GET /v3/process ProcessAdd(p *app.Config, metadata map[string]any) error // POST /v3/process diff --git a/http/client/events.go b/http/client/events.go index 97909edb..9ac5ef82 100644 --- a/http/client/events.go +++ b/http/client/events.go @@ -4,13 +4,14 @@ import ( "context" "io" "net/http" + "net/url" "github.com/datarhei/core/v16/encoding/json" "github.com/datarhei/core/v16/http/api" "github.com/datarhei/core/v16/mem" ) -func (r *restclient) Events(ctx context.Context, filters api.EventFilters) (<-chan api.Event, error) { +func (r *restclient) Events(ctx context.Context, filters api.EventFilters) (<-chan api.LogEvent, error) { buf := mem.Get() defer mem.Put(buf) @@ -25,16 +26,16 @@ func (r *restclient) Events(ctx context.Context, filters api.EventFilters) (<-ch return nil, err } - channel := make(chan api.Event, 128) + channel := make(chan api.LogEvent, 128) - go func(stream io.ReadCloser, ch chan<- api.Event) { + go func(stream io.ReadCloser, ch chan<- api.LogEvent) { defer stream.Close() defer close(channel) decoder := json.NewDecoder(stream) for decoder.More() { - var event api.Event + var event api.LogEvent if err := decoder.Decode(&event); err == io.EOF { return } else if err != nil { @@ -57,3 +58,49 @@ func (r *restclient) Events(ctx context.Context, filters api.EventFilters) (<-ch return channel, nil } + +func (r *restclient) MediaEvents(ctx context.Context, storage, pattern string) (<-chan api.MediaEvent, error) { + header := make(http.Header) + header.Set("Accept", "application/x-json-stream") + header.Set("Connection", "close") + + query := &url.Values{} + query.Set("glob", pattern) + + stream, err := r.stream(ctx, "POST", "/v3/events/media/"+url.PathEscape(storage), query, header, "", nil) + if err != nil { + return nil, err + } + + channel := make(chan api.MediaEvent, 128) + + go func(stream io.ReadCloser, ch chan<- api.MediaEvent) { + defer stream.Close() + defer close(channel) + + decoder := json.NewDecoder(stream) + + for decoder.More() { + var event api.MediaEvent + if err := decoder.Decode(&event); err == io.EOF { + return + } else if err != nil { + event.Action = "error" + event.Name = err.Error() + } + + // Don't emit keepalives + if event.Action == "keepalive" { + continue + } + + ch <- event + + if event.Action == "" || event.Action == "error" { + return + } + } + }(stream, channel) + + return channel, nil +} diff --git a/http/handler/api/cluster_events.go b/http/handler/api/cluster_events.go index 7072a44c..ae34899c 100644 --- a/http/handler/api/cluster_events.go +++ b/http/handler/api/cluster_events.go @@ -32,7 +32,7 @@ func (h *ClusterHandler) Events(c echo.Context) error { return api.Err(http.StatusBadRequest, "", "invalid JSON: %s", err.Error()) } - filter := map[string]*api.EventFilter{} + filter := map[string]*api.LogEventFilter{} for _, f := range filters.Filters { f := f @@ -77,7 +77,7 @@ func (h *ClusterHandler) Events(c echo.Context) error { done := make(chan error, 1) - filterEvent := func(event *api.Event) bool { + filterEvent := func(event *api.LogEvent) bool { if len(filter) == 0 { return true } diff --git a/http/handler/api/events.go b/http/handler/api/events.go index bec1b2e2..f0036d34 100644 --- a/http/handler/api/events.go +++ b/http/handler/api/events.go @@ -3,31 +3,49 @@ package api import ( "net/http" "strings" + "sync" "time" "github.com/datarhei/core/v16/encoding/json" + "github.com/datarhei/core/v16/event" + "github.com/datarhei/core/v16/glob" "github.com/datarhei/core/v16/http/api" "github.com/datarhei/core/v16/http/handler/util" "github.com/datarhei/core/v16/log" + "github.com/datarhei/core/v16/slices" "github.com/labstack/echo/v4" ) // The EventsHandler type provides handler functions for retrieving event. type EventsHandler struct { - events log.ChannelWriter + logs log.ChannelWriter + media map[string]event.MediaSource + lock sync.Mutex } // NewEvents returns a new EventsHandler type -func NewEvents(events log.ChannelWriter) *EventsHandler { +func NewEvents(logs log.ChannelWriter) *EventsHandler { return &EventsHandler{ - events: events, + logs: logs, + media: map[string]event.MediaSource{}, } } -// Events returns a stream of event -// @Summary Stream of events -// @Description Stream of event of whats happening in the core +func (h *EventsHandler) AddMediaSource(name string, source event.MediaSource) { + if source == nil { + return + } + + h.lock.Lock() + defer h.lock.Unlock() + + h.media[name] = source +} + +// LogEvents returns a stream of event +// @Summary Stream of log events +// @Description Stream of log event of whats happening in the core // @ID events // @Tags v16.?.? // @Accept json @@ -37,14 +55,14 @@ func NewEvents(events log.ChannelWriter) *EventsHandler { // @Success 200 {object} api.Event // @Security ApiKeyAuth // @Router /api/v3/events [post] -func (h *EventsHandler) Events(c echo.Context) error { +func (h *EventsHandler) LogEvents(c echo.Context) error { filters := api.EventFilters{} if err := util.ShouldBindJSON(c, &filters); err != nil { return api.Err(http.StatusBadRequest, "", "invalid JSON: %s", err.Error()) } - filter := map[string]*api.EventFilter{} + filter := map[string]*api.LogEventFilter{} for _, f := range filters.Filters { f := f @@ -76,7 +94,7 @@ func (h *EventsHandler) Events(c echo.Context) error { res.Header().Set(echo.HeaderConnection, "close") res.WriteHeader(http.StatusOK) - evts, cancel := h.events.Subscribe() + evts, cancel := h.logs.Subscribe() defer cancel() enc := json.NewEncoder(res) @@ -84,7 +102,7 @@ func (h *EventsHandler) Events(c echo.Context) error { done := make(chan error, 1) - filterEvent := func(event *api.Event) bool { + filterEvent := func(event *api.LogEvent) bool { if len(filter) == 0 { return true } @@ -97,7 +115,7 @@ func (h *EventsHandler) Events(c echo.Context) error { return event.Filter(f) } - event := api.Event{} + event := api.LogEvent{} if contentType == "text/event-stream" { res.Write([]byte(":keepalive\n\n")) @@ -155,3 +173,130 @@ func (h *EventsHandler) Events(c echo.Context) error { } } } + +// LogEvents returns a stream of media event +// @Summary Stream of media events +// @Description Stream of media event of whats happening in the core +// @ID events +// @Tags v16.?.? +// @Accept json +// @Param glob query string false "glob pattern for media names" +// @Produce json-stream +// @Success 200 {object} api.Event +// @Security ApiKeyAuth +// @Router /api/v3/events/media/{type} [post] +func (h *EventsHandler) MediaEvents(c echo.Context) error { + pattern := util.DefaultQuery(c, "glob", "") + + var compiledPattern glob.Glob = nil + + if len(pattern) != 0 { + var err error + compiledPattern, err = glob.Compile(pattern, '/') + if err != nil { + return api.Err(http.StatusBadRequest, "", "invalid pattern: %w", err) + } + } + + mediaType := util.PathParam(c, "type") + + keepaliveTicker := time.NewTicker(5 * time.Second) + defer keepaliveTicker.Stop() + + listTicker := time.NewTicker(30 * time.Second) + defer listTicker.Stop() + + req := c.Request() + reqctx := req.Context() + + contentType := "application/x-json-stream" + + h.lock.Lock() + mediaSource, ok := h.media[mediaType] + h.lock.Unlock() + + if !ok { + return api.Err(http.StatusNotFound, "", "media source not found") + } + + evts, cancel, err := mediaSource.Events() + if err != nil { + return api.Err(http.StatusNotImplemented, "", "events are not implemented for this server") + } + defer cancel() + + res := c.Response() + + res.Header().Set(echo.HeaderContentType, contentType+"; charset=UTF-8") + res.Header().Set(echo.HeaderCacheControl, "no-store") + res.Header().Set(echo.HeaderConnection, "close") + res.WriteHeader(http.StatusOK) + + enc := json.NewEncoder(res) + enc.SetIndent("", "") + + done := make(chan error, 1) + + createList := func() api.MediaEvent { + list := mediaSource.MediaList() + + if compiledPattern != nil { + names := []string{} + + for _, l := range list { + if !compiledPattern.Match(l) { + continue + } + + names = append(names, l) + } + + list = names + } + + event := api.MediaEvent{ + Action: "list", + Names: slices.Copy(list), + Timestamp: time.Now().UnixMilli(), + } + + return event + } + + if err := enc.Encode(createList()); err != nil { + done <- err + } + res.Flush() + + for { + select { + case err := <-done: + return err + case <-reqctx.Done(): + done <- nil + case <-keepaliveTicker.C: + res.Write([]byte("{\"action\":\"keepalive\"}\n")) + res.Flush() + case <-listTicker.C: + if err := enc.Encode(createList()); err != nil { + done <- err + } + res.Flush() + case evt := <-evts: + e := evt.(*event.MediaEvent) + if compiledPattern != nil { + if !compiledPattern.Match(e.Name) { + continue + } + } + if err := enc.Encode(api.MediaEvent{ + Action: e.Action, + Name: e.Name, + Timestamp: e.Timestamp.UnixMilli(), + }); err != nil { + done <- err + } + res.Flush() + } + } +} diff --git a/http/handler/api/log.go b/http/handler/api/log.go index 8ef4dbed..4399236f 100644 --- a/http/handler/api/log.go +++ b/http/handler/api/log.go @@ -35,7 +35,7 @@ func NewLog(buffer log.BufferWriter) *LogHandler { // @ID log-3 // @Param format query string false "Format of the list of log events (*console, raw)" // @Produce json -// @Success 200 {array} api.LogEvent "application log" +// @Success 200 {array} api.LogEntries "application log" // @Success 200 {array} string "application log" // @Security ApiKeyAuth // @Router /api/v3/log [get] diff --git a/http/handler/api/log_test.go b/http/handler/api/log_test.go index f9747433..70f8f3aa 100644 --- a/http/handler/api/log_test.go +++ b/http/handler/api/log_test.go @@ -24,5 +24,5 @@ func TestLog(t *testing.T) { response := mock.Request(t, http.StatusOK, router, "GET", "/", nil) - mock.Validate(t, []api.LogEvent{}, response.Data) + mock.Validate(t, []api.LogEntries{}, response.Data) } diff --git a/http/handler/filesystem.go b/http/handler/filesystem.go index 3b7496e3..cac0ccb1 100644 --- a/http/handler/filesystem.go +++ b/http/handler/filesystem.go @@ -1,7 +1,6 @@ package handler import ( - "encoding/json" "errors" "fmt" "io" @@ -258,11 +257,6 @@ func (h *FSHandler) DeleteFiles(c echo.Context) error { } func (h *FSHandler) ListFiles(c echo.Context) error { - accept := c.Request().Header.Get(echo.HeaderAccept) - if strings.Contains(accept, "application/x-json-stream") || strings.Contains(accept, "text/event-stream") { - return h.ListFilesEvent(c) - } - pattern := util.DefaultQuery(c, "glob", "") sizeMin := util.DefaultQuery(c, "size_min", "0") sizeMax := util.DefaultQuery(c, "size_max", "0") @@ -353,115 +347,6 @@ func (h *FSHandler) ListFiles(c echo.Context) error { return c.JSON(http.StatusOK, fileinfos) } -func (h *FSHandler) ListFilesEvent(c echo.Context) error { - pattern := util.DefaultQuery(c, "glob", "") - - path := "/" - - if len(pattern) != 0 { - prefix := glob.Prefix(pattern) - index := strings.LastIndex(prefix, "/") - path = prefix[:index+1] - } - - var compiledPattern glob.Glob = nil - - if len(pattern) != 0 { - var err error - compiledPattern, err = glob.Compile(pattern, '/') - if err != nil { - return api.Err(http.StatusBadRequest, "", "invalid pattern: %w", err) - } - } - - options := fs.ListOptions{ - Pattern: pattern, - } - - keepaliveTicker := time.NewTicker(5 * time.Second) - defer keepaliveTicker.Stop() - - listTicker := time.NewTicker(30 * time.Second) - defer listTicker.Stop() - - req := c.Request() - reqctx := req.Context() - - contentType := "text/event-stream" - accept := req.Header.Get(echo.HeaderAccept) - if strings.Contains(accept, "application/x-json-stream") { - contentType = "application/x-json-stream" - } - - evts, cancel, err := h.FS.Filesystem.Events() - if err != nil { - return api.Err(http.StatusNotImplemented, "", "events are not implemented for this filesystem") - } - defer cancel() - - res := c.Response() - - res.Header().Set(echo.HeaderContentType, contentType+"; charset=UTF-8") - res.Header().Set(echo.HeaderCacheControl, "no-store") - res.Header().Set(echo.HeaderConnection, "close") - res.WriteHeader(http.StatusOK) - - enc := json.NewEncoder(res) - enc.SetIndent("", "") - - done := make(chan error, 1) - - createList := func() api.FilesystemEvent { - files := h.FS.Filesystem.List(path, options) - event := api.FilesystemEvent{ - Action: "list", - Names: make([]string, 0, len(files)), - Timestamp: time.Now().UnixMilli(), - } - for _, file := range files { - event.Names = append(event.Names, file.Name()) - } - - return event - } - - if err := enc.Encode(createList()); err != nil { - done <- err - } - res.Flush() - - for { - select { - case err := <-done: - return err - case <-reqctx.Done(): - done <- nil - case <-keepaliveTicker.C: - res.Write([]byte("{\"action\": \"keepalive\"}\n")) - res.Flush() - case <-listTicker.C: - if err := enc.Encode(createList()); err != nil { - done <- err - } - res.Flush() - case e := <-evts: - if compiledPattern != nil { - if !compiledPattern.Match(e.Name) { - continue - } - } - if err := enc.Encode(api.FilesystemEvent{ - Action: e.Action, - Name: e.Name, - Timestamp: e.Timestamp.UnixMilli(), - }); err != nil { - done <- err - } - res.Flush() - } - } -} - // From: github.com/golang/go/net/http/fs.go@7dc9fcb // errNoOverlap is returned by serveContent's parseRange if first-byte-pos of diff --git a/http/server.go b/http/server.go index 603e5b41..c0fa2efd 100644 --- a/http/server.go +++ b/http/server.go @@ -283,6 +283,12 @@ func NewServer(config Config) (serverhandler.Server, error) { config.LogEvents, ) + for name, fs := range s.filesystems { + s.v3handler.events.AddMediaSource(name, fs.Filesystem) + } + s.v3handler.events.AddMediaSource("srt", config.SRT) + s.v3handler.events.AddMediaSource("rtmp", config.RTMP) + if config.Restream != nil { s.v3handler.process = api.NewProcess( config.Restream, @@ -807,6 +813,8 @@ func (s *server) setRoutesV3(v3 *echo.Group) { // v3 Events if s.v3handler.events != nil { - v3.POST("/events", s.v3handler.events.Events) + v3.POST("/events", s.v3handler.events.LogEvents) + v3.POST("/events/log", s.v3handler.events.LogEvents) + v3.POST("/events/media/:type", s.v3handler.events.MediaEvents) } } diff --git a/io/fs/disk.go b/io/fs/disk.go index 12f60433..710172d5 100644 --- a/io/fs/disk.go +++ b/io/fs/disk.go @@ -11,6 +11,7 @@ import ( "sync" "time" + "github.com/datarhei/core/v16/event" "github.com/datarhei/core/v16/glob" "github.com/datarhei/core/v16/log" ) @@ -140,7 +141,7 @@ type diskFilesystem struct { // Logger from the config logger log.Logger - events *EventWriter + events *event.PubSub } // NewDiskFilesystem returns a new filesystem that is backed by the disk filesystem. @@ -174,7 +175,7 @@ func NewDiskFilesystem(config DiskConfig) (Filesystem, error) { fs.logger = log.New("") } - fs.events = NewEventWriter() + fs.events = event.NewPubSub() return fs, nil } @@ -218,7 +219,7 @@ func NewRootedDiskFilesystem(config RootedDiskConfig) (Filesystem, error) { fs.logger = log.New("") } - fs.events = NewEventWriter() + fs.events = event.NewPubSub() return fs, nil } @@ -369,9 +370,9 @@ func (fs *diskFilesystem) WriteFileReader(path string, r io.Reader, sizeHint int fs.lastSizeCheck = time.Time{} if replace { - fs.events.Publish(NewEvent("update", path)) + fs.events.Publish(event.NewMediaEvent("update", path)) } else { - fs.events.Publish(NewEvent("create", path)) + fs.events.Publish(event.NewMediaEvent("create", path)) } return size, !replace, nil @@ -437,7 +438,7 @@ func (fs *diskFilesystem) AppendFileReader(path string, r io.Reader, sizeHint in fs.lastSizeCheck = time.Time{} - fs.events.Publish(NewEvent("update", path)) + fs.events.Publish(event.NewMediaEvent("update", path)) return size, nil } @@ -461,8 +462,8 @@ func (fs *diskFilesystem) rename(src, dst string) error { // First try to rename the file if err := os.Rename(src, dst); err == nil { - fs.events.Publish(NewEvent("remove", src)) - fs.events.Publish(NewEvent("create", dst)) + fs.events.Publish(event.NewMediaEvent("create", dst)) + fs.events.Publish(event.NewMediaEvent("remove", src)) return nil } @@ -472,14 +473,14 @@ func (fs *diskFilesystem) rename(src, dst string) error { return fmt.Errorf("failed to copy files: %w", err) } - fs.events.Publish(NewEvent("create", dst)) + fs.events.Publish(event.NewMediaEvent("create", dst)) if err := os.Remove(src); err != nil { os.Remove(dst) return fmt.Errorf("failed to remove source file: %w", err) } - fs.events.Publish(NewEvent("remove", src)) + fs.events.Publish(event.NewMediaEvent("remove", src)) return nil } @@ -520,7 +521,7 @@ func (fs *diskFilesystem) copy(src, dst string) error { fs.lastSizeCheck = time.Time{} - fs.events.Publish(NewEvent("create", dst)) + fs.events.Publish(event.NewMediaEvent("create", dst)) return nil } @@ -574,7 +575,7 @@ func (fs *diskFilesystem) Remove(path string) int64 { fs.lastSizeCheck = time.Time{} - fs.events.Publish(NewEvent("remove", path)) + fs.events.Publish(event.NewMediaEvent("remove", path)) return size } @@ -642,7 +643,7 @@ func (fs *diskFilesystem) RemoveList(path string, options ListOptions) ([]string if err := os.Remove(path); err == nil { files = append(files, name) size += info.Size() - fs.events.Publish(NewEvent("remove", path)) + fs.events.Publish(event.NewMediaEvent("remove", path)) } }) @@ -792,8 +793,19 @@ func (fs *diskFilesystem) cleanPath(path string) string { return filepath.Join(fs.root, filepath.Clean(path)) } -func (fs *diskFilesystem) Events() (<-chan Event, EventsCancelFunc, error) { +func (fs *diskFilesystem) Events() (<-chan event.Event, event.CancelFunc, error) { ch, cancel := fs.events.Subscribe() return ch, cancel, nil } + +func (fs *diskFilesystem) MediaList() []string { + files := fs.List("/", ListOptions{}) + list := make([]string, 0, len(files)) + + for _, file := range files { + list = append(list, file.Name()) + } + + return list +} diff --git a/io/fs/event.go b/io/fs/event.go index 0956d4de..7b3251cf 100644 --- a/io/fs/event.go +++ b/io/fs/event.go @@ -1,139 +1,14 @@ package fs -import ( - "context" - "fmt" - "sync" - "time" +type Action string - "github.com/lithammer/shortuuid/v4" +const ( + ActionCreate Action = "create" + ActionUpdate Action = "update" + ActionRemove Action = "remove" + ActionList Action = "list" ) -type Event struct { - Action string - Name string - Timestamp time.Time -} - -func NewEvent(action, name string) Event { - return Event{ - Action: action, - Name: name, - Timestamp: time.Now(), - } -} - -func (e Event) clone() Event { - return Event{ - Action: e.Action, - Name: e.Name, - Timestamp: e.Timestamp, - } -} - -type EventsCancelFunc func() - -type EventWriter struct { - publisher chan Event - publisherClosed bool - publisherLock sync.Mutex - - ctx context.Context - cancel context.CancelFunc - - subscriber map[string]chan Event - subscriberLock sync.Mutex -} - -func NewEventWriter() *EventWriter { - w := &EventWriter{ - publisher: make(chan Event, 1024), - publisherClosed: false, - subscriber: make(map[string]chan Event), - } - - w.ctx, w.cancel = context.WithCancel(context.Background()) - - go w.broadcast() - - return w -} - -func (w *EventWriter) Publish(e Event) error { - event := e.clone() - - w.publisherLock.Lock() - defer w.publisherLock.Unlock() - - if w.publisherClosed { - return fmt.Errorf("writer is closed") - } - - select { - case w.publisher <- event: - default: - return fmt.Errorf("publisher queue full") - } - - return nil -} - -func (w *EventWriter) Close() { - w.cancel() - - w.publisherLock.Lock() - close(w.publisher) - w.publisherClosed = true - w.publisherLock.Unlock() - - w.subscriberLock.Lock() - for _, c := range w.subscriber { - close(c) - } - w.subscriber = make(map[string]chan Event) - w.subscriberLock.Unlock() -} - -func (w *EventWriter) Subscribe() (<-chan Event, EventsCancelFunc) { - l := make(chan Event, 1024) - - var id string = "" - - w.subscriberLock.Lock() - for { - id = shortuuid.New() - if _, ok := w.subscriber[id]; !ok { - w.subscriber[id] = l - break - } - } - w.subscriberLock.Unlock() - - unsubscribe := func() { - w.subscriberLock.Lock() - delete(w.subscriber, id) - w.subscriberLock.Unlock() - } - - return l, unsubscribe -} - -func (w *EventWriter) broadcast() { - for { - select { - case <-w.ctx.Done(): - return - case e := <-w.publisher: - w.subscriberLock.Lock() - for _, c := range w.subscriber { - pp := e.clone() - - select { - case c <- pp: - default: - } - } - w.subscriberLock.Unlock() - } - } +func (a Action) String() string { + return string(a) } diff --git a/io/fs/fs.go b/io/fs/fs.go index ed10a4bd..e3b15db5 100644 --- a/io/fs/fs.go +++ b/io/fs/fs.go @@ -7,6 +7,8 @@ import ( "io/fs" "os" "time" + + "github.com/datarhei/core/v16/event" ) var ErrExist = errors.New("file or directory already exists") @@ -85,7 +87,7 @@ type ReadFilesystem interface { // of that file is verfied. In case the file is not found, the error ErrNotExist will be returned. LookPath(file string) (string, error) - Events() (<-chan Event, EventsCancelFunc, error) + event.MediaSource } type WriteFilesystem interface { diff --git a/io/fs/mem.go b/io/fs/mem.go index fa40aea0..c89ad6ad 100644 --- a/io/fs/mem.go +++ b/io/fs/mem.go @@ -12,6 +12,7 @@ import ( "sync" "time" + "github.com/datarhei/core/v16/event" "github.com/datarhei/core/v16/glob" "github.com/datarhei/core/v16/log" "github.com/datarhei/core/v16/mem" @@ -136,7 +137,7 @@ type memFilesystem struct { storage memStorage dirs *dirStorage - events *EventWriter + events *event.PubSub } type dirStorage struct { @@ -224,7 +225,7 @@ func NewMemFilesystem(config MemConfig) (Filesystem, error) { fs.logger.Debug().Log("Created") - fs.events = NewEventWriter() + fs.events = event.NewPubSub() return fs, nil } @@ -466,10 +467,10 @@ func (fs *memFilesystem) WriteFileReader(path string, r io.Reader, sizeHint int) }) if replace { - fs.events.Publish(NewEvent("update", newFile.name)) + fs.events.Publish(event.NewMediaEvent("update", newFile.name)) logger.Debug().Log("Replaced file") } else { - fs.events.Publish(NewEvent("create", newFile.name)) + fs.events.Publish(event.NewMediaEvent("create", newFile.name)) logger.Debug().Log("Added file") } @@ -526,7 +527,7 @@ func (fs *memFilesystem) AppendFileReader(path string, r io.Reader, sizeHint int "size_bytes": fs.currentSize, }).Log("Appended to file") - fs.events.Publish(NewEvent("update", file.name)) + fs.events.Publish(event.NewMediaEvent("update", file.name)) return size, nil } @@ -564,7 +565,7 @@ func (fs *memFilesystem) Purge(size int64) int64 { "size_bytes": fs.currentSize, }).Debug().Log("Purged file") - fs.events.Publish(NewEvent("remove", f.name)) + fs.events.Publish(event.NewMediaEvent("remove", f.name)) if size <= 0 { break @@ -607,23 +608,24 @@ func (fs *memFilesystem) Rename(src, dst string) error { } dstFile, replace := fs.storage.Store(dst, srcFile) + if replace { + fs.events.Publish(event.NewMediaEvent("update", dst)) + } else { + fs.events.Publish(event.NewMediaEvent("create", dst)) + } fs.storage.Delete(src) - fs.events.Publish(NewEvent("remove", src)) + fs.events.Publish(event.NewMediaEvent("remove", src)) fs.dirs.Remove(src) if !replace { fs.dirs.Add(dst) - - fs.events.Publish(NewEvent("create", dst)) } fs.sizeLock.Lock() defer fs.sizeLock.Unlock() if replace { - fs.events.Publish(NewEvent("update", dst)) - dstFile.Close() fs.currentSize -= dstFile.size @@ -660,14 +662,14 @@ func (fs *memFilesystem) Copy(src, dst string) error { if !replace { fs.dirs.Add(dst) - fs.events.Publish(NewEvent("create", dst)) + fs.events.Publish(event.NewMediaEvent("create", dst)) } fs.sizeLock.Lock() defer fs.sizeLock.Unlock() if replace { - fs.events.Publish(NewEvent("update", dst)) + fs.events.Publish(event.NewMediaEvent("update", dst)) replacedFile.Close() fs.currentSize -= replacedFile.size } @@ -754,7 +756,7 @@ func (fs *memFilesystem) remove(path string) int64 { "size_bytes": fs.currentSize, }).Debug().Log("Removed file") - fs.events.Publish(NewEvent("remove", file.name)) + fs.events.Publish(event.NewMediaEvent("remove", file.name)) return file.size } @@ -830,7 +832,7 @@ func (fs *memFilesystem) RemoveList(path string, options ListOptions) ([]string, file.Close() - fs.events.Publish(NewEvent("remove", file.name)) + fs.events.Publish(event.NewMediaEvent("remove", file.name)) } fs.sizeLock.Lock() @@ -948,8 +950,19 @@ func (fs *memFilesystem) cleanPath(path string) string { return filepath.Join("/", filepath.Clean(path)) } -func (fs *memFilesystem) Events() (<-chan Event, EventsCancelFunc, error) { +func (fs *memFilesystem) Events() (<-chan event.Event, event.CancelFunc, error) { ch, cancel := fs.events.Subscribe() return ch, cancel, nil } + +func (fs *memFilesystem) MediaList() []string { + files := fs.List("/", ListOptions{}) + list := make([]string, 0, len(files)) + + for _, file := range files { + list = append(list, file.Name()) + } + + return list +} diff --git a/io/fs/s3.go b/io/fs/s3.go index 3585deab..2affea7a 100644 --- a/io/fs/s3.go +++ b/io/fs/s3.go @@ -12,6 +12,7 @@ import ( "sync" "time" + "github.com/datarhei/core/v16/event" "github.com/datarhei/core/v16/log" "github.com/datarhei/core/v16/mem" "github.com/minio/minio-go/v7" @@ -686,10 +687,14 @@ func (fs *s3Filesystem) cleanPath(path string) string { return filepath.Join("/", filepath.Clean(path))[1:] } -func (fs *s3Filesystem) Events() (<-chan Event, EventsCancelFunc, error) { +func (fs *s3Filesystem) Events() (<-chan event.Event, event.CancelFunc, error) { return nil, func() {}, fmt.Errorf("events are not implemented for this filesystem") } +func (fs *s3Filesystem) MediaList() []string { + return nil +} + type s3FileInfo struct { name string size int64 diff --git a/log/log.go b/log/log.go index 3ad1a822..a0965eb9 100644 --- a/log/log.go +++ b/log/log.go @@ -356,8 +356,8 @@ func (e *Event) Error() Logger { return clone } -func (l *Event) Write(p []byte) (int, error) { - l.Log("%s", strings.TrimSpace(string(p))) +func (e *Event) Write(p []byte) (int, error) { + e.Log("%s", strings.TrimSpace(string(p))) return len(p), nil } diff --git a/rtmp/rtmp.go b/rtmp/rtmp.go index ec842f23..1df7476f 100644 --- a/rtmp/rtmp.go +++ b/rtmp/rtmp.go @@ -12,6 +12,7 @@ import ( "github.com/datarhei/core/v16/cluster/node" enctoken "github.com/datarhei/core/v16/encoding/token" + "github.com/datarhei/core/v16/event" "github.com/datarhei/core/v16/iam" iamidentity "github.com/datarhei/core/v16/iam/identity" "github.com/datarhei/core/v16/log" @@ -79,6 +80,8 @@ type Server interface { // Channels return a list of currently publishing streams Channels() []string + + event.MediaSource } // server is an implementation of the Server interface @@ -101,6 +104,8 @@ type server struct { proxy *node.Manager iam iam.IAM + + events *event.PubSub } // New creates a new RTMP server according to the given config @@ -120,6 +125,7 @@ func New(config Config) (Server, error) { collector: config.Collector, proxy: config.Proxy, iam: config.IAM, + events: event.NewPubSub(), } if s.collector == nil { @@ -178,15 +184,18 @@ func (s *server) Close() { } } -// Channels returns the list of streams that are -// publishing currently +// Channels returns the list of streams that are publishing currently, excluding proxied channels func (s *server) Channels() []string { channels := []string{} s.lock.RLock() defer s.lock.RUnlock() - for key := range s.channels { + for key, channel := range s.channels { + if channel.isProxy { + continue + } + channels = append(channels, key) } @@ -366,6 +375,15 @@ func (s *server) handlePublish(conn *rtmp.Conn) { return } + // Check if this stream is already published on the cluster + if s.proxy != nil { + _, err = s.proxy.MediaGetURL("rtmp", playpath) + if err == nil { + s.log(identity, "PUBLISH", "CONFLICT", playpath, "already publishing", remote) + return + } + } + err = s.publish(conn, playpath, remote, identity, false) if err != nil { s.logger.WithField("path", conn.URL.Path).WithError(err).Log("") @@ -419,6 +437,10 @@ func (s *server) publish(src connection, playpath string, remote net.Addr, ident s.log(identity, "PUBLISH", "STREAM", playpath, stream.Type().String(), remote) } + if !isProxy { + s.events.Publish(event.NewMediaEvent("create", playpath)) + } + // Ingest the data, blocks until done avutil.CopyPackets(ch.queue, src) @@ -428,6 +450,10 @@ func (s *server) publish(src connection, playpath string, remote net.Addr, ident ch.Close() + if !isProxy { + s.events.Publish(event.NewMediaEvent("remove", playpath)) + } + s.log(identity, "PUBLISH", "STOP", playpath, "", remote) return nil @@ -493,3 +519,13 @@ func (s *server) findDomainFromPlaypath(path string) string { return "$none" } + +func (s *server) Events() (<-chan event.Event, event.CancelFunc, error) { + ch, cancel := s.events.Subscribe() + + return ch, cancel, nil +} + +func (s *server) MediaList() []string { + return s.Channels() +} diff --git a/srt/srt.go b/srt/srt.go index ba435f48..abd5ca1a 100644 --- a/srt/srt.go +++ b/srt/srt.go @@ -13,6 +13,7 @@ import ( "github.com/datarhei/core/v16/cluster/node" enctoken "github.com/datarhei/core/v16/encoding/token" + "github.com/datarhei/core/v16/event" "github.com/datarhei/core/v16/iam" iamidentity "github.com/datarhei/core/v16/iam/identity" "github.com/datarhei/core/v16/log" @@ -60,6 +61,8 @@ type Server interface { // Channels return a list of currently publishing streams Channels() []Channel + + event.MediaSource } // server implements the Server interface @@ -87,6 +90,8 @@ type server struct { proxy *node.Manager iam iam.IAM + + events *event.PubSub } func New(config Config) (Server, error) { @@ -98,6 +103,7 @@ func New(config Config) (Server, error) { iam: config.IAM, logger: config.Logger, proxy: config.Proxy, + events: event.NewPubSub(), } if s.collector == nil { @@ -369,6 +375,16 @@ func (s *server) publish(conn srt.Conn, isProxy bool) error { si, _ := url.ParseStreamId(streamId) identity, _ := s.findIdentityFromToken(si.Token) + // Check if this stream is already published on the cluster + if s.proxy != nil { + _, err := s.proxy.MediaGetURL("rtmp", si.Resource) + if err == nil { + s.log(identity, "PUBLISH", "CONFLICT", si.Resource, "already publishing", client) + conn.Close() + return fmt.Errorf("already publishing this resource") + } + } + // Look for the stream s.lock.Lock() ch := s.channels[si.Resource] @@ -388,6 +404,10 @@ func (s *server) publish(conn srt.Conn, isProxy bool) error { s.log(identity, "PUBLISH", "START", si.Resource, "", client) + if !isProxy { + s.events.Publish(event.NewMediaEvent("create", si.Resource)) + } + // Blocks until connection closes err := ch.pubsub.Publish(conn) @@ -397,6 +417,10 @@ func (s *server) publish(conn srt.Conn, isProxy bool) error { ch.Close() + if !isProxy { + s.events.Publish(event.NewMediaEvent("remove", si.Resource)) + } + s.log(identity, "PUBLISH", "STOP", si.Resource, err.Error(), client) conn.Close() @@ -561,3 +585,20 @@ func (s *server) findDomainFromPlaypath(path string) string { return "$none" } + +func (s *server) Events() (<-chan event.Event, event.CancelFunc, error) { + ch, cancel := s.events.Subscribe() + + return ch, cancel, nil +} + +func (s *server) MediaList() []string { + channels := s.Channels() + list := make([]string, 0, len(channels)) + + for _, channel := range channels { + list = append(list, channel.Name) + } + + return list +}