diff --git a/app/api/api.go b/app/api/api.go index 6b35eb79..b7d9c679 100644 --- a/app/api/api.go +++ b/app/api/api.go @@ -106,7 +106,7 @@ type api struct { log struct { writer io.Writer buffer log.BufferWriter - events log.ChannelWriter + events *log.ChannelWriter logger struct { core log.Logger main log.Logger diff --git a/cluster/node/core.go b/cluster/node/core.go index 22721166..7f643a93 100644 --- a/cluster/node/core.go +++ b/cluster/node/core.go @@ -13,6 +13,7 @@ import ( "time" "github.com/datarhei/core/v16/config" + "github.com/datarhei/core/v16/event" "github.com/datarhei/core/v16/http/api" "github.com/datarhei/core/v16/http/client" "github.com/datarhei/core/v16/log" @@ -99,6 +100,11 @@ type Core struct { media map[string]*Media mediaLock sync.RWMutex + events struct { + log *event.PubSub + process *event.PubSub + } + logger log.Logger } @@ -111,6 +117,9 @@ func NewCore(id string, logger log.Logger) *Core { media: map[string]*Media{}, } + core.events.log = event.NewPubSub() + core.events.process = event.NewPubSub() + if core.logger == nil { core.logger = log.New("") } @@ -174,6 +183,9 @@ func (n *Core) Stop() { n.cancel() n.cancel = nil + n.events.process.Close() + n.events.log.Close() + n.disconnect() } @@ -321,6 +333,9 @@ func (n *Core) connect() error { go n.mediaEvents(ctx, "rtmp") go n.mediaEvents(ctx, "srt") + go n.logEvents(ctx) + go n.processEvents(ctx) + n.lock.Unlock() return nil @@ -933,14 +948,114 @@ func (n *Core) ClusterProcessList() ([]Process, error) { return processes, nil } -func (n *Core) Events(ctx context.Context, filters api.LogEventFilters) (<-chan api.LogEvent, error) { - n.lock.RLock() - client := n.client - n.lock.RUnlock() +func (n *Core) logEvents(ctx context.Context) { + defer func() { + n.logger.Warn().WithField("source", "log").Log("Disconnected from event source") + }() - if client == nil { - return nil, ErrNoPeer + for { + select { + case <-ctx.Done(): + return + default: + } + + n.lock.RLock() + client := n.client + n.lock.RUnlock() + + if client == nil { + n.logger.Error().WithField("source", "log").Log("Failed to connect to event source, client not connected") + time.Sleep(5 * time.Second) + continue + } + + ch, err := client.LogEvents(ctx, api.LogEventFilters{}) + if err != nil { + n.logger.Error().WithField("source", "log").WithError(err).Log("Failed to connect to event source") + time.Sleep(5 * time.Second) + continue + } + + n.logger.Info().WithField("source", "log").Log("Connected to event source") + + innerloop: + for { + select { + case <-ctx.Done(): + return + case e, ok := <-ch: + if !ok { + break innerloop + } + + e.CoreID = n.id + + n.events.log.Publish(e.Marshal()) + } + } + + n.logger.Info().WithField("source", "process").Log("Reconnecting to event source") + time.Sleep(5 * time.Second) } - - return client.Events(ctx, filters) +} + +func (n *Core) LogEventsSource() event.EventSource { + return n.events.log.EventSource() +} + +func (n *Core) processEvents(ctx context.Context) { + defer func() { + n.logger.Warn().WithField("source", "process").Log("Disconnected from event source") + }() + + for { + select { + case <-ctx.Done(): + return + default: + } + + n.lock.RLock() + client := n.client + n.lock.RUnlock() + + if client == nil { + n.logger.Error().WithField("source", "process").Log("Failed to connect to event source, client not connected") + time.Sleep(5 * time.Second) + continue + } + + ch, err := client.ProcessEvents(ctx, api.ProcessEventFilters{}) + if err != nil { + n.logger.Error().WithField("source", "process").WithError(err).Log("Failed to connect to event source") + time.Sleep(5 * time.Second) + continue + } + + n.logger.Info().WithField("source", "process").Log("Connected to event source") + + innerloop: + for { + select { + case <-ctx.Done(): + return + case e, ok := <-ch: + if !ok { + break innerloop + } + + e.CoreID = n.id + + n.events.process.Publish(e.Marshal()) + } + } + + n.logger.Info().WithField("source", "process").Log("Reconnecting to event source") + time.Sleep(5 * time.Second) + } +} + +func (n *Core) ProcessEventsSource() event.EventSource { + return n.events.process.EventSource() } diff --git a/cluster/node/manager.go b/cluster/node/manager.go index 3cc151bb..3d2417f3 100644 --- a/cluster/node/manager.go +++ b/cluster/node/manager.go @@ -1,7 +1,6 @@ package node import ( - "context" "errors" "fmt" "io" @@ -11,6 +10,7 @@ import ( "sync" "time" + "github.com/datarhei/core/v16/event" "github.com/datarhei/core/v16/http/api" "github.com/datarhei/core/v16/http/client" "github.com/datarhei/core/v16/log" @@ -34,6 +34,11 @@ type Manager struct { cache *Cache[string] logger log.Logger + + events struct { + log *event.PubSub + process *event.PubSub + } } var ErrNodeNotFound = errors.New("node not found") @@ -50,6 +55,9 @@ func NewManager(config ManagerConfig) (*Manager, error) { p.logger = log.New("") } + p.events.log = event.NewPubSub() + p.events.process = event.NewPubSub() + return p, nil } @@ -66,6 +74,9 @@ func (p *Manager) NodeAdd(id string, node *Node) (string, error) { p.nodes[id] = node + p.events.log.Consume(node.Core().LogEventsSource(), nil) + p.events.process.Consume(node.Core().ProcessEventsSource(), nil) + p.logger.Info().WithFields(log.Fields{ "address": about.Address, "name": about.Name, @@ -655,24 +666,14 @@ func (p *Manager) ProcessValidateConfig(nodeid string, config *app.Config) error return node.Core().ProcessValidateConfig(config) } -func (p *Manager) LogEvents(ctx context.Context, filters api.LogEventFilters) (<-chan api.LogEvent, error) { - eventChan := make(chan api.LogEvent, 128) +func (p *Manager) LogEvents() (<-chan event.Event, event.CancelFunc, error) { + ch, cancel := p.events.log.Subscribe() - p.lock.RLock() - for _, n := range p.nodes { - go func(node *Node, e chan<- api.LogEvent) { - eventChan, err := node.Core().Events(ctx, filters) - if err != nil { - return - } - - for event := range eventChan { - event.CoreID = node.id - e <- event - } - }(n, eventChan) - } - p.lock.RUnlock() - - return eventChan, nil + return ch, cancel, nil +} + +func (p *Manager) ProcessEvents() (<-chan event.Event, event.CancelFunc, error) { + ch, cancel := p.events.process.Subscribe() + + return ch, cancel, nil } diff --git a/event/event.go b/event/event.go index de811132..34700726 100644 --- a/event/event.go +++ b/event/event.go @@ -18,6 +18,16 @@ type EventSource interface { Events() (<-chan Event, CancelFunc, error) } +type eventSource struct { + pubsub *PubSub +} + +func (s *eventSource) Events() (<-chan Event, CancelFunc, error) { + ch, cancel := s.pubsub.Subscribe() + + return ch, cancel, nil +} + type PubSub struct { publisher chan Event publisherClosed bool @@ -44,6 +54,10 @@ func NewPubSub() *PubSub { return w } +func (w *PubSub) EventSource() EventSource { + return &eventSource{pubsub: w} +} + func (w *PubSub) Consume(s EventSource, rewrite func(e Event) Event) { ch, cancel, err := s.Events() if err != nil { @@ -111,7 +125,7 @@ func (w *PubSub) Close() { } func (w *PubSub) Subscribe() (<-chan Event, CancelFunc) { - l := make(chan Event, 1024) + l := make(chan Event, 128) var id string = "" diff --git a/event/log.go b/event/log.go new file mode 100644 index 00000000..c513cd02 --- /dev/null +++ b/event/log.go @@ -0,0 +1,42 @@ +package event + +import ( + "maps" + "time" +) + +type LogEvent struct { + Time time.Time + Level string + Component string + Caller string + Message string + CoreID string + + Data map[string]any +} + +func (e *LogEvent) Clone() Event { + evt := &LogEvent{ + Time: e.Time, + Level: e.Level, + Component: e.Component, + Caller: e.Caller, + Message: e.Message, + CoreID: e.CoreID, + Data: maps.Clone(e.Data), + } + + return evt +} + +func NewLogEvent(ts time.Time, level, component, caller, message string, data map[string]any) *LogEvent { + return &LogEvent{ + Time: ts, + Level: level, + Component: component, + Caller: caller, + Message: message, + Data: maps.Clone(data), + } +} diff --git a/event/process.go b/event/process.go index 75a2f801..9a96d66f 100644 --- a/event/process.go +++ b/event/process.go @@ -11,6 +11,7 @@ type ProcessEvent struct { Line string Progress *ProcessProgress Timestamp time.Time + CoreID string } func (e *ProcessEvent) Clone() Event { @@ -20,6 +21,7 @@ func (e *ProcessEvent) Clone() Event { Type: e.Type, Line: e.Line, Timestamp: e.Timestamp, + CoreID: e.CoreID, } if e.Progress != nil { diff --git a/http/api/event.go b/http/api/event.go index ac628536..ca1cfcb3 100644 --- a/http/api/event.go +++ b/http/api/event.go @@ -4,15 +4,15 @@ import ( "fmt" "regexp" "strings" + "time" "github.com/datarhei/core/v16/encoding/json" "github.com/datarhei/core/v16/event" - "github.com/datarhei/core/v16/log" ) type LogEvent struct { Timestamp int64 `json:"ts" format:"int64"` - Level int `json:"level"` + Level string `json:"level"` Component string `json:"event"` Message string `json:"message"` Caller string `json:"caller"` @@ -21,16 +21,22 @@ type LogEvent struct { Data map[string]string `json:"data"` } -func (e *LogEvent) Unmarshal(le *log.Event) { - e.Timestamp = le.Time.Unix() - e.Level = int(le.Level) - e.Component = strings.ToLower(le.Component) - e.Message = le.Message - e.Caller = le.Caller +func (e *LogEvent) Unmarshal(le event.Event) bool { + evt, ok := le.(*event.LogEvent) + if !ok { + return false + } + + e.Timestamp = evt.Time.Unix() + e.Level = evt.Level + e.Component = strings.ToLower(evt.Component) + e.Message = evt.Message + e.Caller = evt.Caller + e.CoreID = evt.CoreID e.Data = make(map[string]string) - for k, v := range le.Data { + for k, v := range evt.Data { var value string switch val := v.(type) { @@ -52,6 +58,26 @@ func (e *LogEvent) Unmarshal(le *log.Event) { e.Data[k] = value } + + return true +} + +func (e *LogEvent) Marshal() event.Event { + evt := &event.LogEvent{ + Time: time.UnixMilli(e.Timestamp), + Level: e.Level, + Component: e.Component, + Caller: e.Caller, + Message: e.Message, + CoreID: e.CoreID, + Data: map[string]any{}, + } + + for k, v := range e.Data { + evt.Data[k] = v + } + + return evt } func (e *LogEvent) Filter(ef *LogEventFilter) bool { @@ -62,8 +88,7 @@ func (e *LogEvent) Filter(ef *LogEventFilter) bool { } if ef.reLevel != nil { - level := log.Level(e.Level).String() - if !ef.reLevel.MatchString(level) { + if !ef.reLevel.MatchString(e.Level) { return false } } @@ -191,6 +216,7 @@ type ProcessEvent struct { Type string `json:"type"` Line string `json:"line,omitempty"` Progress *ProcessProgress `json:"progress,omitempty"` + CoreID string `json:"core_id,omitempty"` Timestamp int64 `json:"ts"` } @@ -200,6 +226,22 @@ type ProcessProgressInput struct { AVstream ProcessProgressInputAVstream `json:"avstream"` } +func (p *ProcessProgressInput) Marshal() event.ProcessProgressInput { + o := event.ProcessProgressInput{} + + if x, err := p.Bitrate.Float64(); err == nil { + o.Bitrate = x + } + + if x, err := p.FPS.Float64(); err == nil { + o.FPS = x + } + + o.AVstream = p.AVstream.Marshal() + + return o +} + type ProcessProgressInputAVstream struct { Looping bool `json:"looping"` Enc uint64 `json:"enc"` @@ -208,11 +250,37 @@ type ProcessProgressInputAVstream struct { Time uint64 `json:"time"` } +func (p *ProcessProgressInputAVstream) Marshal() event.ProcessProgressInputAVstream { + o := event.ProcessProgressInputAVstream{ + Looping: p.Looping, + Enc: p.Enc, + Drop: p.Drop, + Dup: p.Dup, + Time: p.Time, + } + + return o +} + type ProcessProgressOutput struct { Bitrate json.Number `json:"bitrate" swaggertype:"number" jsonschema:"type=number"` FPS json.Number `json:"fps" swaggertype:"number" jsonschema:"type=number"` } +func (p *ProcessProgressOutput) Marshal() event.ProcessProgressOutput { + o := event.ProcessProgressOutput{} + + if x, err := p.Bitrate.Float64(); err == nil { + o.Bitrate = x + } + + if x, err := p.FPS.Float64(); err == nil { + o.FPS = x + } + + return o +} + type ProcessProgress struct { Input []ProcessProgressInput `json:"input"` Output []ProcessProgressOutput `json:"output"` @@ -244,6 +312,24 @@ func (p *ProcessProgress) Unmarshal(e *event.ProcessProgress) { p.Time = json.ToNumber(e.Time) } +func (p *ProcessProgress) Marshal() *event.ProcessProgress { + e := &event.ProcessProgress{} + + if x, err := p.Time.Float64(); err == nil { + e.Time = x + } + + for _, input := range p.Input { + e.Input = append(e.Input, input.Marshal()) + } + + for _, output := range p.Output { + e.Output = append(e.Output, output.Marshal()) + } + + return e +} + func (p *ProcessEvent) Unmarshal(e event.Event) bool { evt, ok := e.(*event.ProcessEvent) if !ok { @@ -261,10 +347,29 @@ func (p *ProcessEvent) Unmarshal(e event.Event) bool { p.Progress.Unmarshal(evt.Progress) } p.Timestamp = evt.Timestamp.UnixMilli() + p.CoreID = evt.CoreID return true } +func (p *ProcessEvent) Marshal() event.Event { + evt := &event.ProcessEvent{ + ProcessID: p.ProcessID, + Domain: p.Domain, + Type: p.Type, + Line: p.Line, + Progress: nil, + Timestamp: time.UnixMilli(p.Timestamp), + CoreID: p.CoreID, + } + + if p.Progress != nil { + evt.Progress = p.Progress.Marshal() + } + + return evt +} + func (e *ProcessEvent) Filter(ef *ProcessEventFilter) bool { if ef.reProcessID != nil { if !ef.reProcessID.MatchString(e.ProcessID) { @@ -284,6 +389,12 @@ func (e *ProcessEvent) Filter(ef *ProcessEventFilter) bool { } } + if ef.reCoreID != nil { + if !ef.reCoreID.MatchString(e.CoreID) { + return false + } + } + return true } @@ -291,10 +402,12 @@ type ProcessEventFilter struct { ProcessID string `json:"pid"` Domain string `json:"domain"` Type string `json:"type"` + CoreID string `json:"core_id"` reProcessID *regexp.Regexp reDomain *regexp.Regexp reType *regexp.Regexp + reCoreID *regexp.Regexp } type ProcessEventFilters struct { @@ -329,5 +442,14 @@ func (ef *ProcessEventFilter) Compile() error { ef.reType = r } + if len(ef.CoreID) != 0 { + r, err := regexp.Compile("(?i)" + ef.CoreID) + if err != nil { + return err + } + + ef.reCoreID = r + } + return nil } diff --git a/http/api/event_test.go b/http/api/event_test.go index e169d184..44e9c8e8 100644 --- a/http/api/event_test.go +++ b/http/api/event_test.go @@ -9,7 +9,7 @@ import ( func TestEventFilter(t *testing.T) { event := LogEvent{ Timestamp: 1234, - Level: 3, + Level: "info", Component: "foobar", Message: "none", Data: map[string]string{ @@ -83,7 +83,7 @@ func TestEventFilter(t *testing.T) { func TestEventFilterDataKey(t *testing.T) { event := LogEvent{ Timestamp: 1234, - Level: 3, + Level: "info", Component: "foobar", Message: "none", Data: map[string]string{ @@ -137,7 +137,7 @@ func TestEventFilterDataKey(t *testing.T) { func BenchmarkEventFilters(b *testing.B) { event := LogEvent{ Timestamp: 1234, - Level: 3, + Level: "info", Component: "foobar", Message: "none", Data: map[string]string{ @@ -159,7 +159,7 @@ func BenchmarkEventFilters(b *testing.B) { res := event.Filter(&levelfilter) require.True(b, res) - for i := 0; i < b.N; i++ { + for b.Loop() { event.Filter(&levelfilter) } } diff --git a/http/client/client.go b/http/client/client.go index 542c85e3..1963befb 100644 --- a/http/client/client.go +++ b/http/client/client.go @@ -57,8 +57,9 @@ type RestClient interface { FilesystemDeleteFile(storage, path string) error // DELETE /v3/fs/{storage}/{path} FilesystemAddFile(storage, path string, data io.Reader) error // PUT /v3/fs/{storage}/{path} - Events(ctx context.Context, filters api.LogEventFilters) (<-chan api.LogEvent, error) // POST /v3/events - MediaEvents(ctx context.Context, storage, pattern string) (<-chan api.MediaEvent, error) // GET /v3/events/media/{storage} + LogEvents(ctx context.Context, filters api.LogEventFilters) (<-chan api.LogEvent, error) // POST /v3/events + MediaEvents(ctx context.Context, storage, pattern string) (<-chan api.MediaEvent, error) // GET /v3/events/media/{storage} + ProcessEvents(ctx context.Context, filters api.ProcessEventFilters) (<-chan api.ProcessEvent, error) // POST /v3/events/process ProcessList(opts ProcessListOptions) ([]api.Process, error) // GET /v3/process ProcessAdd(p *app.Config, metadata map[string]any) error // POST /v3/process diff --git a/http/client/events.go b/http/client/events.go index f5fd7d41..0f33c0f2 100644 --- a/http/client/events.go +++ b/http/client/events.go @@ -11,7 +11,7 @@ import ( "github.com/datarhei/core/v16/mem" ) -func (r *restclient) Events(ctx context.Context, filters api.LogEventFilters) (<-chan api.LogEvent, error) { +func (r *restclient) LogEvents(ctx context.Context, filters api.LogEventFilters) (<-chan api.LogEvent, error) { buf := mem.Get() defer mem.Put(buf) @@ -104,3 +104,51 @@ func (r *restclient) MediaEvents(ctx context.Context, storage, pattern string) ( return channel, nil } + +func (r *restclient) ProcessEvents(ctx context.Context, filters api.ProcessEventFilters) (<-chan api.ProcessEvent, error) { + buf := mem.Get() + defer mem.Put(buf) + + e := json.NewEncoder(buf) + e.Encode(filters) + + header := make(http.Header) + header.Set("Accept", "application/x-json-stream") + + stream, err := r.stream(ctx, "POST", "/v3/events/process", nil, header, "application/json", buf.Reader()) + if err != nil { + return nil, err + } + + channel := make(chan api.ProcessEvent, 128) + + go func(stream io.ReadCloser, ch chan<- api.ProcessEvent) { + defer stream.Close() + defer close(channel) + + decoder := json.NewDecoder(stream) + + for decoder.More() { + var event api.ProcessEvent + if err := decoder.Decode(&event); err == io.EOF { + return + } else if err != nil { + event.Type = "error" + event.Line = err.Error() + } + + // Don't emit keepalives + if event.Type == "keepalive" { + continue + } + + ch <- event + + if event.Type == "" || event.Type == "error" { + return + } + } + }(stream, channel) + + return channel, nil +} diff --git a/http/handler/api/cluster_events.go b/http/handler/api/cluster_events.go index a659b534..a3f3f892 100644 --- a/http/handler/api/cluster_events.go +++ b/http/handler/api/cluster_events.go @@ -1,8 +1,9 @@ package api import ( - "context" + "fmt" "net/http" + goslices "slices" "strings" "time" @@ -13,10 +14,10 @@ import ( "github.com/labstack/echo/v4" ) -// Events returns a stream of log event +// LogEvents returns a stream of log event // @Summary Stream of log events // @Description Stream of log events of whats happening on each node in the cluster -// @ID cluster-3-events +// @ID cluster-3-events-log // @Tags v16.?.? // @Accept json // @Produce text/event-stream @@ -25,7 +26,7 @@ import ( // @Success 200 {object} api.LogEvent // @Security ApiKeyAuth // @Router /api/v3/cluster/events [post] -func (h *ClusterHandler) Events(c echo.Context) error { +func (h *ClusterHandler) LogEvents(c echo.Context) error { filters := api.LogEventFilters{} if err := util.ShouldBindJSON(c, &filters); err != nil { @@ -64,19 +65,15 @@ func (h *ClusterHandler) Events(c echo.Context) error { res.Header().Set(echo.HeaderConnection, "close") res.WriteHeader(http.StatusOK) - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - evts, err := h.proxy.LogEvents(ctx, filters) + evts, cancel, err := h.proxy.LogEvents() if err != nil { return api.Err(http.StatusInternalServerError, "", "%s", err.Error()) } + defer cancel() enc := json.NewEncoder(res) enc.SetIndent("", "") - done := make(chan error, 1) - filterEvent := func(event *api.LogEvent) bool { if len(filter) == 0 { return true @@ -90,27 +87,33 @@ func (h *ClusterHandler) Events(c echo.Context) error { return event.Filter(f) } + event := api.LogEvent{} + if contentType == "text/event-stream" { res.Write([]byte(":keepalive\n\n")) res.Flush() for { select { - case err := <-done: - return err case <-reqctx.Done(): - done <- nil + return nil case <-ticker.C: res.Write([]byte(":keepalive\n\n")) res.Flush() - case event := <-evts: + case e, ok := <-evts: + if !ok { + return fmt.Errorf("channel closed") + } + + event.Unmarshal(e) + if !filterEvent(&event) { continue } res.Write([]byte("event: " + event.Component + "\ndata: ")) if err := enc.Encode(event); err != nil { - done <- err + return err } res.Write([]byte("\n")) res.Flush() @@ -122,23 +125,122 @@ func (h *ClusterHandler) Events(c echo.Context) error { for { select { - case err := <-done: - return err case <-reqctx.Done(): - done <- nil + return nil case <-ticker.C: res.Write([]byte("{\"event\": \"keepalive\"}\n")) res.Flush() - case event := <-evts: + case e, ok := <-evts: + if !ok { + return fmt.Errorf("channel closed") + } + + event.Unmarshal(e) + if !filterEvent(&event) { continue } if err := enc.Encode(event); err != nil { - done <- err + return err } res.Flush() } } } } + +// ProcessEvents returns a stream of process event +// @Summary Stream of process events +// @Description Stream of process events of whats happening on each node in the cluster +// @ID cluster-3-events-process +// @Tags v16.?.? +// @Accept json +// @Produce json-stream +// @Param filters body api.ProcessEventFilters false "Event filters" +// @Success 200 {object} api.ProcessEvent +// @Security ApiKeyAuth +// @Router /api/v3/cluster/events/process [post] +func (h *ClusterHandler) ProcessEvents(c echo.Context) error { + filters := api.ProcessEventFilters{} + + if err := util.ShouldBindJSON(c, &filters); err != nil { + return api.Err(http.StatusBadRequest, "", "invalid JSON: %s", err.Error()) + } + + filter := []*api.ProcessEventFilter{} + + for _, f := range filters.Filters { + f := f + + if err := f.Compile(); err != nil { + return api.Err(http.StatusBadRequest, "", "invalid filter: %s", err.Error()) + } + + filter = append(filter, &f) + } + + keepaliveTicker := time.NewTicker(5 * time.Second) + defer keepaliveTicker.Stop() + + req := c.Request() + reqctx := req.Context() + + contentType := "application/x-json-stream" + + evts, cancel, err := h.proxy.ProcessEvents() + if err != nil { + return api.Err(http.StatusNotImplemented, "", "events are not implemented for this server") + } + defer cancel() + + res := c.Response() + + res.Header().Set(echo.HeaderContentType, contentType+"; charset=UTF-8") + res.Header().Set(echo.HeaderCacheControl, "no-store") + res.Header().Set(echo.HeaderConnection, "close") + res.WriteHeader(http.StatusOK) + + enc := json.NewEncoder(res) + enc.SetIndent("", "") + + filterEvent := func(event *api.ProcessEvent) bool { + if len(filter) == 0 { + return true + } + + return goslices.ContainsFunc(filter, event.Filter) + } + + event := api.ProcessEvent{} + + for { + select { + case <-reqctx.Done(): + return nil + case <-keepaliveTicker.C: + _, err := res.Write([]byte("{\"type\":\"keepalive\"}\n")) + if err != nil { + return err + } + res.Flush() + case e, ok := <-evts: + if !ok { + return fmt.Errorf("channel closed") + } + + if !event.Unmarshal(e) { + continue + } + + if !filterEvent(&event) { + continue + } + + if err := enc.Encode(event); err != nil { + return err + } + res.Flush() + } + } +} diff --git a/http/handler/api/events.go b/http/handler/api/events.go index 249c49f5..2a35020c 100644 --- a/http/handler/api/events.go +++ b/http/handler/api/events.go @@ -1,6 +1,7 @@ package api import ( + "fmt" "net/http" goslices "slices" "strings" @@ -12,7 +13,6 @@ import ( "github.com/datarhei/core/v16/glob" "github.com/datarhei/core/v16/http/api" "github.com/datarhei/core/v16/http/handler/util" - "github.com/datarhei/core/v16/log" "github.com/datarhei/core/v16/slices" "github.com/labstack/echo/v4" @@ -20,16 +20,15 @@ import ( // The EventsHandler type provides handler functions for retrieving event. type EventsHandler struct { - logs log.ChannelWriter + logs event.EventSource media map[string]event.MediaSource process event.EventSource lock sync.Mutex } // NewEvents returns a new EventsHandler type -func NewEvents(logs log.ChannelWriter) *EventsHandler { +func NewEvents() *EventsHandler { return &EventsHandler{ - logs: logs, media: map[string]event.MediaSource{}, } } @@ -56,6 +55,17 @@ func (h *EventsHandler) SetProcessSource(source event.EventSource) { h.process = source } +func (h *EventsHandler) SetLogSource(source event.EventSource) { + if source == nil { + return + } + + h.lock.Lock() + defer h.lock.Unlock() + + h.logs = source +} + // LogEvents returns a stream of event // @Summary Stream of log events // @Description Stream of log event of whats happening in the core @@ -107,14 +117,15 @@ func (h *EventsHandler) LogEvents(c echo.Context) error { res.Header().Set(echo.HeaderConnection, "close") res.WriteHeader(http.StatusOK) - evts, cancel := h.logs.Subscribe() + evts, cancel, err := h.logs.Events() + if err != nil { + return api.Err(http.StatusNotImplemented, "", "events are not implemented for this server") + } defer cancel() enc := json.NewEncoder(res) enc.SetIndent("", "") - done := make(chan error, 1) - filterEvent := func(event *api.LogEvent) bool { if len(filter) == 0 { return true @@ -136,15 +147,17 @@ func (h *EventsHandler) LogEvents(c echo.Context) error { for { select { - case err := <-done: - return err case <-reqctx.Done(): - done <- nil + return nil case <-ticker.C: res.Write([]byte(":keepalive\n\n")) res.Flush() - case e := <-evts: - event.Unmarshal(&e) + case e, ok := <-evts: + if !ok { + return fmt.Errorf("channel closed") + } + + event.Unmarshal(e) if !filterEvent(&event) { continue @@ -152,7 +165,7 @@ func (h *EventsHandler) LogEvents(c echo.Context) error { res.Write([]byte("event: " + event.Component + "\ndata: ")) if err := enc.Encode(event); err != nil { - done <- err + return err } res.Write([]byte("\n")) res.Flush() @@ -164,22 +177,24 @@ func (h *EventsHandler) LogEvents(c echo.Context) error { for { select { - case err := <-done: - return err case <-reqctx.Done(): - done <- nil + return nil case <-ticker.C: res.Write([]byte("{\"event\": \"keepalive\"}\n")) res.Flush() - case e := <-evts: - event.Unmarshal(&e) + case e, ok := <-evts: + if !ok { + return fmt.Errorf("channel closed") + } + + event.Unmarshal(e) if !filterEvent(&event) { continue } if err := enc.Encode(event); err != nil { - done <- err + return err } res.Flush() } @@ -248,8 +263,6 @@ func (h *EventsHandler) MediaEvents(c echo.Context) error { enc := json.NewEncoder(res) enc.SetIndent("", "") - done := make(chan error, 1) - createList := func() api.MediaEvent { list := mediaSource.MediaList() @@ -277,7 +290,7 @@ func (h *EventsHandler) MediaEvents(c echo.Context) error { } if err := enc.Encode(createList()); err != nil { - done <- err + return err } res.Flush() @@ -285,19 +298,21 @@ func (h *EventsHandler) MediaEvents(c echo.Context) error { for { select { - case err := <-done: - return err case <-reqctx.Done(): - done <- nil + return nil case <-keepaliveTicker.C: res.Write([]byte("{\"action\":\"keepalive\"}\n")) res.Flush() case <-listTicker.C: if err := enc.Encode(createList()); err != nil { - done <- err + return err } res.Flush() - case e := <-evts: + case e, ok := <-evts: + if !ok { + return fmt.Errorf("channel closed") + } + if !event.Unmarshal(e) { continue } @@ -309,7 +324,7 @@ func (h *EventsHandler) MediaEvents(c echo.Context) error { } if err := enc.Encode(event); err != nil { - done <- err + return err } res.Flush() } @@ -370,8 +385,6 @@ func (h *EventsHandler) ProcessEvents(c echo.Context) error { enc := json.NewEncoder(res) enc.SetIndent("", "") - done := make(chan error, 1) - filterEvent := func(event *api.ProcessEvent) bool { if len(filter) == 0 { return true @@ -384,14 +397,19 @@ func (h *EventsHandler) ProcessEvents(c echo.Context) error { for { select { - case err := <-done: - return err case <-reqctx.Done(): - done <- nil + return nil case <-keepaliveTicker.C: - res.Write([]byte("{\"type\":\"keepalive\"}\n")) + _, err := res.Write([]byte("{\"type\":\"keepalive\"}\n")) + if err != nil { + return err + } res.Flush() - case e := <-evts: + case e, ok := <-evts: + if !ok { + return fmt.Errorf("channel closed") + } + if !event.Unmarshal(e) { continue } @@ -401,7 +419,7 @@ func (h *EventsHandler) ProcessEvents(c echo.Context) error { } if err := enc.Encode(event); err != nil { - done <- err + return err } res.Flush() } diff --git a/http/server.go b/http/server.go index c92b2a11..6831a8cd 100644 --- a/http/server.go +++ b/http/server.go @@ -38,6 +38,7 @@ import ( "github.com/datarhei/core/v16/cluster" cfgstore "github.com/datarhei/core/v16/config/store" + "github.com/datarhei/core/v16/event" "github.com/datarhei/core/v16/http/cache" "github.com/datarhei/core/v16/http/errorhandler" "github.com/datarhei/core/v16/http/fs" @@ -84,7 +85,7 @@ var ListenAndServe = http.ListenAndServe type Config struct { Logger log.Logger LogBuffer log.BufferWriter - LogEvents log.ChannelWriter + LogEvents event.EventSource Restream restream.Restreamer Metrics monitor.HistoryReader Prometheus prometheus.Reader @@ -279,10 +280,9 @@ func NewServer(config Config) (serverhandler.Server, error) { config.LogBuffer, ) - s.v3handler.events = api.NewEvents( - config.LogEvents, - ) + s.v3handler.events = api.NewEvents() + s.v3handler.events.SetLogSource(config.LogEvents) for name, fs := range s.filesystems { s.v3handler.events.AddMediaSource(name, fs.Filesystem) } @@ -773,7 +773,9 @@ func (s *server) setRoutesV3(v3 *echo.Group) { v3.GET("/cluster/fs/:storage", s.v3handler.cluster.FilesystemListFiles) - v3.POST("/cluster/events", s.v3handler.cluster.Events) + v3.POST("/cluster/events", s.v3handler.cluster.LogEvents) + v3.POST("/cluster/events/log", s.v3handler.cluster.LogEvents) + v3.POST("/cluster/events/process", s.v3handler.cluster.ProcessEvents) if !s.readOnly { v3.PUT("/cluster/transfer/:id", s.v3handler.cluster.TransferLeadership) diff --git a/log/log.go b/log/log.go index bc06e057..9484c663 100644 --- a/log/log.go +++ b/log/log.go @@ -3,6 +3,7 @@ package log import ( "fmt" + "maps" "reflect" "runtime" "runtime/debug" @@ -285,11 +286,6 @@ func (e *Event) Log(format string, args ...interface{}) { } func (e *Event) clone() *Event { - data := make(Fields, len(e.Data)) - for k, v := range e.Data { - data[k] = v - } - return &Event{ Time: e.Time, Caller: e.Caller, @@ -298,7 +294,7 @@ func (e *Event) clone() *Event { Component: e.Component, Message: e.Message, err: e.err, - Data: data, + Data: maps.Clone(e.Data), } } diff --git a/log/writer.go b/log/writer.go index 8063dac1..fad96804 100644 --- a/log/writer.go +++ b/log/writer.go @@ -2,15 +2,13 @@ package log import ( "container/ring" - "context" - "fmt" "io" "os" "regexp" "strings" "sync" - "github.com/lithammer/shortuuid/v4" + "github.com/datarhei/core/v16/event" "github.com/mattn/go-isatty" ) @@ -318,114 +316,28 @@ func (w *bufferWriter) Events() []*Event { return lines } -type ChannelWriter interface { - Writer - - Subscribe() (<-chan Event, func()) +type ChannelWriter struct { + pubsub *event.PubSub } -type channelWriter struct { - publisher chan Event - publisherClosed bool - publisherLock sync.Mutex - - ctx context.Context - cancel context.CancelFunc - - subscriber map[string]chan Event - subscriberLock sync.Mutex -} - -func NewChannelWriter() ChannelWriter { - w := &channelWriter{ - publisher: make(chan Event, 1024), - publisherClosed: false, - subscriber: make(map[string]chan Event), +func NewChannelWriter() *ChannelWriter { + w := &ChannelWriter{ + pubsub: event.NewPubSub(), } - w.ctx, w.cancel = context.WithCancel(context.Background()) - - go w.broadcast() - return w } -func (w *channelWriter) Write(e *Event) error { - event := e.clone() - event.logger = nil - - w.publisherLock.Lock() - defer w.publisherLock.Unlock() - - if w.publisherClosed { - return fmt.Errorf("writer is closed") - } - - select { - case w.publisher <- *e: - default: - return fmt.Errorf("publisher queue full") - } - - return nil +func (w *ChannelWriter) Write(e *Event) error { + return w.pubsub.Publish(event.NewLogEvent(e.Time, e.Level.String(), e.Component, e.Caller, e.Message, e.Data)) } -func (w *channelWriter) Close() { - w.cancel() - - w.publisherLock.Lock() - close(w.publisher) - w.publisherClosed = true - w.publisherLock.Unlock() - - w.subscriberLock.Lock() - for _, c := range w.subscriber { - close(c) - } - w.subscriber = make(map[string]chan Event) - w.subscriberLock.Unlock() +func (w *ChannelWriter) Close() { + w.pubsub.Close() } -func (w *channelWriter) Subscribe() (<-chan Event, func()) { - l := make(chan Event, 1024) +func (w *ChannelWriter) Events() (<-chan event.Event, event.CancelFunc, error) { + ch, cancel := w.pubsub.Subscribe() - var id string = "" - - w.subscriberLock.Lock() - for { - id = shortuuid.New() - if _, ok := w.subscriber[id]; !ok { - w.subscriber[id] = l - break - } - } - w.subscriberLock.Unlock() - - unsubscribe := func() { - w.subscriberLock.Lock() - delete(w.subscriber, id) - w.subscriberLock.Unlock() - } - - return l, unsubscribe -} - -func (w *channelWriter) broadcast() { - for { - select { - case <-w.ctx.Done(): - return - case e := <-w.publisher: - w.subscriberLock.Lock() - for _, c := range w.subscriber { - pp := e.clone() - - select { - case c <- *pp: - default: - } - } - w.subscriberLock.Unlock() - } - } + return ch, cancel, nil } diff --git a/restream/core.go b/restream/core.go index 81167ef9..257eceec 100644 --- a/restream/core.go +++ b/restream/core.go @@ -226,6 +226,7 @@ func (r *restream) Stop() { go func(t *task) { defer wg.Done() t.Kill() + t.parser.Destroy() }(t) return true