diff --git a/app/api/api.go b/app/api/api.go index b7d9c679..d5f4a713 100644 --- a/app/api/api.go +++ b/app/api/api.go @@ -23,6 +23,7 @@ import ( configstore "github.com/datarhei/core/v16/config/store" configvars "github.com/datarhei/core/v16/config/vars" "github.com/datarhei/core/v16/ffmpeg" + "github.com/datarhei/core/v16/global" "github.com/datarhei/core/v16/http" "github.com/datarhei/core/v16/http/cache" httpfs "github.com/datarhei/core/v16/http/fs" @@ -292,6 +293,8 @@ func (a *api) Reload() error { a.log.buffer = buffer a.log.events = events + global.SetCoreID(cfg.ID) + return nil } diff --git a/cluster/node/core.go b/cluster/node/core.go index 7f643a93..76d1c006 100644 --- a/cluster/node/core.go +++ b/cluster/node/core.go @@ -989,13 +989,11 @@ func (n *Core) logEvents(ctx context.Context) { break innerloop } - e.CoreID = n.id - - n.events.log.Publish(e.Marshal()) + n.events.log.Publish(&e) } } - n.logger.Info().WithField("source", "process").Log("Reconnecting to event source") + n.logger.Info().WithField("source", "log").Log("Reconnecting to event source") time.Sleep(5 * time.Second) } } @@ -1045,9 +1043,7 @@ func (n *Core) processEvents(ctx context.Context) { break innerloop } - e.CoreID = n.id - - n.events.process.Publish(e.Marshal()) + n.events.process.Publish(&e) } } diff --git a/encoding/json/json.go b/encoding/json/json.go index 51d0f0e8..3b13dde2 100644 --- a/encoding/json/json.go +++ b/encoding/json/json.go @@ -10,6 +10,7 @@ import ( type UnmarshalTypeError = json.UnmarshalTypeError type SyntaxError = json.SyntaxError type Number = json.Number +type RawMessage = json.RawMessage // Unmarshal is a wrapper for json.Unmarshal func Unmarshal(data []byte, v interface{}) error { diff --git a/event/log.go b/event/log.go index c513cd02..10f3d1cc 100644 --- a/event/log.go +++ b/event/log.go @@ -3,6 +3,8 @@ package event import ( "maps" "time" + + "github.com/datarhei/core/v16/global" ) type LogEvent struct { @@ -38,5 +40,6 @@ func NewLogEvent(ts time.Time, level, component, caller, message string, data ma Caller: caller, Message: message, Data: maps.Clone(data), + CoreID: global.GetCoreID(), } } diff --git a/event/process.go b/event/process.go index 9a96d66f..450d9bf0 100644 --- a/event/process.go +++ b/event/process.go @@ -2,6 +2,8 @@ package event import ( "time" + + "github.com/datarhei/core/v16/global" ) type ProcessEvent struct { @@ -36,6 +38,7 @@ func NewProcessLogEvent(logline string) *ProcessEvent { Type: "line", Line: logline, Timestamp: time.Now(), + CoreID: global.GetCoreID(), } } @@ -44,6 +47,7 @@ func NewProcessProgressEvent(progress *ProcessProgress) *ProcessEvent { Type: "progress", Progress: progress, Timestamp: time.Now(), + CoreID: global.GetCoreID(), } } diff --git a/global/global.go b/global/global.go new file mode 100644 index 00000000..e9f43e2c --- /dev/null +++ b/global/global.go @@ -0,0 +1,11 @@ +package global + +var coreid string = "" + +func SetCoreID(id string) { + coreid = id +} + +func GetCoreID() string { + return coreid +} diff --git a/http/api/event.go b/http/api/event.go index ca1cfcb3..393feda5 100644 --- a/http/api/event.go +++ b/http/api/event.go @@ -1,6 +1,7 @@ package api import ( + "bytes" "fmt" "regexp" "strings" @@ -119,6 +120,16 @@ func (e *LogEvent) Filter(ef *LogEventFilter) bool { return true } +type LogEventRaw json.RawMessage + +func (e *LogEventRaw) Clone() event.Event { + p := bytes.Clone([]byte(*e)) + + x := LogEventRaw(p) + + return &x +} + type LogEventFilter struct { Component string `json:"event"` Message string `json:"message"` @@ -220,6 +231,16 @@ type ProcessEvent struct { Timestamp int64 `json:"ts"` } +type ProcessEventRaw json.RawMessage + +func (e *ProcessEventRaw) Clone() event.Event { + p := bytes.Clone([]byte(*e)) + + x := ProcessEventRaw(p) + + return &x +} + type ProcessProgressInput struct { Bitrate json.Number `json:"bitrate" swaggertype:"number" jsonschema:"type=number"` FPS json.Number `json:"fps" swaggertype:"number" jsonschema:"type=number"` diff --git a/http/client/client.go b/http/client/client.go index 1963befb..fc3b08a2 100644 --- a/http/client/client.go +++ b/http/client/client.go @@ -57,9 +57,9 @@ type RestClient interface { FilesystemDeleteFile(storage, path string) error // DELETE /v3/fs/{storage}/{path} FilesystemAddFile(storage, path string, data io.Reader) error // PUT /v3/fs/{storage}/{path} - LogEvents(ctx context.Context, filters api.LogEventFilters) (<-chan api.LogEvent, error) // POST /v3/events - MediaEvents(ctx context.Context, storage, pattern string) (<-chan api.MediaEvent, error) // GET /v3/events/media/{storage} - ProcessEvents(ctx context.Context, filters api.ProcessEventFilters) (<-chan api.ProcessEvent, error) // POST /v3/events/process + LogEvents(ctx context.Context, filters api.LogEventFilters) (<-chan api.LogEventRaw, error) // POST /v3/events + MediaEvents(ctx context.Context, storage, pattern string) (<-chan api.MediaEvent, error) // GET /v3/events/media/{storage} + ProcessEvents(ctx context.Context, filters api.ProcessEventFilters) (<-chan api.ProcessEventRaw, error) // POST /v3/events/process ProcessList(opts ProcessListOptions) ([]api.Process, error) // GET /v3/process ProcessAdd(p *app.Config, metadata map[string]any) error // POST /v3/process diff --git a/http/client/events.go b/http/client/events.go index 0f33c0f2..4ef660e8 100644 --- a/http/client/events.go +++ b/http/client/events.go @@ -1,6 +1,8 @@ package client import ( + "bufio" + "bytes" "context" "io" "net/http" @@ -11,7 +13,7 @@ import ( "github.com/datarhei/core/v16/mem" ) -func (r *restclient) LogEvents(ctx context.Context, filters api.LogEventFilters) (<-chan api.LogEvent, error) { +func (r *restclient) LogEvents(ctx context.Context, filters api.LogEventFilters) (<-chan api.LogEventRaw, error) { buf := mem.Get() defer mem.Put(buf) @@ -26,34 +28,32 @@ func (r *restclient) LogEvents(ctx context.Context, filters api.LogEventFilters) return nil, err } - channel := make(chan api.LogEvent, 128) + channel := make(chan api.LogEventRaw, 128) - go func(stream io.ReadCloser, ch chan<- api.LogEvent) { + go func(stream io.ReadCloser, ch chan<- api.LogEventRaw) { defer stream.Close() defer close(channel) - decoder := json.NewDecoder(stream) + scanner := bufio.NewScanner(stream) + scanner.Split(bufio.ScanLines) - for decoder.More() { - var event api.LogEvent - if err := decoder.Decode(&event); err == io.EOF { - return - } else if err != nil { - event.Component = "error" - event.Message = err.Error() - } + for scanner.Scan() { + data := bytes.Clone(scanner.Bytes()) - // Don't emit keepalives - if event.Component == "keepalive" { - continue - } - - ch <- event - - if event.Component == "" || event.Component == "error" { - return - } + ch <- data } + /* + decoder := json.NewDecoder(stream) + + for decoder.More() { + var event api.LogEventRaw + if err := decoder.Decode(&event); err != nil { + return + } + + ch <- event + } + */ }(stream, channel) return channel, nil @@ -105,7 +105,7 @@ func (r *restclient) MediaEvents(ctx context.Context, storage, pattern string) ( return channel, nil } -func (r *restclient) ProcessEvents(ctx context.Context, filters api.ProcessEventFilters) (<-chan api.ProcessEvent, error) { +func (r *restclient) ProcessEvents(ctx context.Context, filters api.ProcessEventFilters) (<-chan api.ProcessEventRaw, error) { buf := mem.Get() defer mem.Put(buf) @@ -120,34 +120,33 @@ func (r *restclient) ProcessEvents(ctx context.Context, filters api.ProcessEvent return nil, err } - channel := make(chan api.ProcessEvent, 128) + channel := make(chan api.ProcessEventRaw, 128) - go func(stream io.ReadCloser, ch chan<- api.ProcessEvent) { + go func(stream io.ReadCloser, ch chan<- api.ProcessEventRaw) { defer stream.Close() defer close(channel) - decoder := json.NewDecoder(stream) + scanner := bufio.NewScanner(stream) + scanner.Split(bufio.ScanLines) - for decoder.More() { - var event api.ProcessEvent - if err := decoder.Decode(&event); err == io.EOF { - return - } else if err != nil { - event.Type = "error" - event.Line = err.Error() - } + for scanner.Scan() { + data := bytes.Clone(scanner.Bytes()) - // Don't emit keepalives - if event.Type == "keepalive" { - continue - } - - ch <- event - - if event.Type == "" || event.Type == "error" { - return - } + ch <- data } + + /* + decoder := json.NewDecoder(io.TeeReader(stream, os.Stdout)) + + for decoder.More() { + var event api.ProcessEventRaw + if err := decoder.Decode(&event); err != nil { + return + } + + ch <- event + } + */ }(stream, channel) return channel, nil diff --git a/http/handler/api/cluster_events.go b/http/handler/api/cluster_events.go index a3f3f892..9b5bd577 100644 --- a/http/handler/api/cluster_events.go +++ b/http/handler/api/cluster_events.go @@ -135,7 +135,25 @@ func (h *ClusterHandler) LogEvents(c echo.Context) error { return fmt.Errorf("channel closed") } - event.Unmarshal(e) + ev, ok := e.(*api.LogEventRaw) + if !ok { + continue + } + + var event api.LogEvent + + err := json.Unmarshal([]byte(*ev), &event) + if err != nil { + continue + } + + //if !event.Unmarshal(e) { + // continue + //} + + if event.Component == "keepalive" { + continue + } if !filterEvent(&event) { continue @@ -212,7 +230,7 @@ func (h *ClusterHandler) ProcessEvents(c echo.Context) error { return goslices.ContainsFunc(filter, event.Filter) } - event := api.ProcessEvent{} + //event := api.ProcessEvent{} for { select { @@ -229,7 +247,24 @@ func (h *ClusterHandler) ProcessEvents(c echo.Context) error { return fmt.Errorf("channel closed") } - if !event.Unmarshal(e) { + ev, ok := e.(*api.ProcessEventRaw) + if !ok { + continue + } + + var event api.ProcessEvent + + err := json.Unmarshal([]byte(*ev), &event) + if err != nil { + continue + } + //json.RawMessage(ev) + + //if !event.Unmarshal(e) { + // continue + //} + + if event.Type == "keepalive" { continue } diff --git a/http/handler/api/events.go b/http/handler/api/events.go index 2a35020c..93b3bb88 100644 --- a/http/handler/api/events.go +++ b/http/handler/api/events.go @@ -2,6 +2,7 @@ package api import ( "fmt" + "io" "net/http" goslices "slices" "strings" @@ -123,7 +124,7 @@ func (h *EventsHandler) LogEvents(c echo.Context) error { } defer cancel() - enc := json.NewEncoder(res) + enc := json.NewEncoder(io.MultiWriter(res)) enc.SetIndent("", "") filterEvent := func(event *api.LogEvent) bool {