From 6657f8d723645604a20f643896dc1e24441fb5a5 Mon Sep 17 00:00:00 2001 From: Ingo Oppermann Date: Tue, 9 Dec 2025 13:39:19 +0100 Subject: [PATCH] Add ID and type to process progress events for each in/output --- Dockerfile | 2 +- cluster/node/core.go | 10 +++--- event/event.go | 4 +-- event/process.go | 14 ++++++++ ffmpeg/parse/parser.go | 9 +++++ http/api/event.go | 38 ++++++++++++++------ http/client/events.go | 25 ------------- http/handler/api/events.go | 27 ++++++++++++-- restream/app/process.go | 20 +++++++++++ restream/core.go | 12 +------ restream/task.go | 72 ++++++++++++++++++++++++++++---------- 11 files changed, 156 insertions(+), 77 deletions(-) diff --git a/Dockerfile b/Dockerfile index ce4a9577..562e204e 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ -ARG GOLANG_IMAGE=golang:1.24-alpine3.21 +ARG GOLANG_IMAGE=golang:1.25-alpine3.21 ARG BUILD_IMAGE=alpine:3.21 # Cross-Compilation diff --git a/cluster/node/core.go b/cluster/node/core.go index 76d1c006..7aad5447 100644 --- a/cluster/node/core.go +++ b/cluster/node/core.go @@ -343,7 +343,7 @@ func (n *Core) connect() error { func (n *Core) mediaEvents(ctx context.Context, storage string) { defer func() { - n.logger.Warn().WithField("storage", storage).Log("Disconnected from event source") + n.logger.Warn().WithField("source", storage).Log("Disconnected from event source") }() m := &Media{} @@ -360,7 +360,7 @@ func (n *Core) mediaEvents(ctx context.Context, storage string) { n.lock.RUnlock() if client == nil { - n.logger.Error().WithField("storage", storage).Log("Failed to connect to event source, client not connected") + n.logger.Error().WithField("source", storage).Log("Failed to connect to event source, client not connected") time.Sleep(5 * time.Second) continue } @@ -374,12 +374,12 @@ func (n *Core) mediaEvents(ctx context.Context, storage string) { n.media[storage] = m n.mediaLock.Unlock() - n.logger.Error().WithField("storage", storage).WithError(err).Log("Failed to connect to event source") + n.logger.Error().WithField("source", storage).WithError(err).Log("Failed to connect to event source") time.Sleep(5 * time.Second) continue } - n.logger.Info().WithField("storage", storage).Log("Connected to event source") + n.logger.Info().WithField("source", storage).Log("Connected to event source") m.available = true m.media = map[string]int64{} @@ -407,7 +407,7 @@ func (n *Core) mediaEvents(ctx context.Context, storage string) { } } - n.logger.Info().WithField("storage", storage).Log("Reconnecting to event source") + n.logger.Info().WithField("source", storage).Log("Reconnecting to event source") time.Sleep(5 * time.Second) } } diff --git a/event/event.go b/event/event.go index 34700726..ccde4ffe 100644 --- a/event/event.go +++ b/event/event.go @@ -158,10 +158,8 @@ func (w *PubSub) broadcast() { case e := <-w.publisher: w.subscriberLock.Lock() for _, c := range w.subscriber { - pp := e.Clone() - select { - case c <- pp: + case c <- e.Clone(): default: } } diff --git a/event/process.go b/event/process.go index 450d9bf0..0ff6ca98 100644 --- a/event/process.go +++ b/event/process.go @@ -52,6 +52,9 @@ func NewProcessProgressEvent(progress *ProcessProgress) *ProcessEvent { } type ProcessProgressInput struct { + ID string + URL string + Type string Bitrate float64 FPS float64 AVstream ProcessProgressInputAVstream @@ -59,6 +62,9 @@ type ProcessProgressInput struct { func (p *ProcessProgressInput) Clone() ProcessProgressInput { c := ProcessProgressInput{ + ID: p.ID, + URL: p.URL, + Type: p.Type, Bitrate: p.Bitrate, FPS: p.FPS, AVstream: p.AVstream.Clone(), @@ -68,6 +74,7 @@ func (p *ProcessProgressInput) Clone() ProcessProgressInput { } type ProcessProgressInputAVstream struct { + Enabled bool Looping bool Enc uint64 Drop uint64 @@ -77,6 +84,7 @@ type ProcessProgressInputAVstream struct { func (p *ProcessProgressInputAVstream) Clone() ProcessProgressInputAVstream { c := ProcessProgressInputAVstream{ + Enabled: p.Enabled, Looping: p.Looping, Enc: p.Enc, Drop: p.Drop, @@ -88,12 +96,18 @@ func (p *ProcessProgressInputAVstream) Clone() ProcessProgressInputAVstream { } type ProcessProgressOutput struct { + ID string + URL string + Type string Bitrate float64 FPS float64 } func (p *ProcessProgressOutput) Clone() ProcessProgressOutput { c := ProcessProgressOutput{ + ID: p.ID, + URL: p.URL, + Type: p.Type, Bitrate: p.Bitrate, FPS: p.FPS, } diff --git a/ffmpeg/parse/parser.go b/ffmpeg/parse/parser.go index 03d43992..3597a4df 100644 --- a/ffmpeg/parse/parser.go +++ b/ffmpeg/parse/parser.go @@ -474,18 +474,24 @@ func (p *parser) Parse(line []byte) uint64 { for _, io := range progress.Input { input := event.ProcessProgressInput{ + ID: "", + URL: io.URL, + Type: io.Type, Bitrate: io.Bitrate, FPS: io.FPS, } if io.AVstream != nil { input.AVstream = event.ProcessProgressInputAVstream{ + Enabled: true, Looping: io.AVstream.Looping, Enc: io.AVstream.Enc, Drop: io.AVstream.Drop, Dup: io.AVstream.Dup, Time: io.AVstream.Input.Time, } + } else { + input.AVstream = event.ProcessProgressInputAVstream{} } evt.Input = append(evt.Input, input) @@ -493,6 +499,9 @@ func (p *parser) Parse(line []byte) uint64 { for _, io := range progress.Output { evt.Output = append(evt.Output, event.ProcessProgressOutput{ + ID: "", + URL: io.URL, + Type: io.Type, Bitrate: io.Bitrate, FPS: io.FPS, }) diff --git a/http/api/event.go b/http/api/event.go index 393feda5..7794c95c 100644 --- a/http/api/event.go +++ b/http/api/event.go @@ -242,6 +242,8 @@ func (e *ProcessEventRaw) Clone() event.Event { } type ProcessProgressInput struct { + ID string `json:"id"` + Type string `json:"type"` Bitrate json.Number `json:"bitrate" swaggertype:"number" jsonschema:"type=number"` FPS json.Number `json:"fps" swaggertype:"number" jsonschema:"type=number"` AVstream ProcessProgressInputAVstream `json:"avstream"` @@ -264,6 +266,7 @@ func (p *ProcessProgressInput) Marshal() event.ProcessProgressInput { } type ProcessProgressInputAVstream struct { + Enabled bool `json:"enabled"` Looping bool `json:"looping"` Enc uint64 `json:"enc"` Drop uint64 `json:"drop"` @@ -273,6 +276,7 @@ type ProcessProgressInputAVstream struct { func (p *ProcessProgressInputAVstream) Marshal() event.ProcessProgressInputAVstream { o := event.ProcessProgressInputAVstream{ + Enabled: p.Enabled, Looping: p.Looping, Enc: p.Enc, Drop: p.Drop, @@ -283,13 +287,27 @@ func (p *ProcessProgressInputAVstream) Marshal() event.ProcessProgressInputAVstr return o } +func (p *ProcessProgressInputAVstream) Unmarshal(e event.ProcessProgressInputAVstream) { + p.Enabled = e.Enabled + p.Looping = e.Looping + p.Enc = e.Enc + p.Drop = e.Drop + p.Dup = e.Dup + p.Time = e.Time +} + type ProcessProgressOutput struct { + ID string `json:"id"` + Type string `json:"type"` Bitrate json.Number `json:"bitrate" swaggertype:"number" jsonschema:"type=number"` FPS json.Number `json:"fps" swaggertype:"number" jsonschema:"type=number"` } func (p *ProcessProgressOutput) Marshal() event.ProcessProgressOutput { - o := event.ProcessProgressOutput{} + o := event.ProcessProgressOutput{ + ID: p.ID, + Type: p.Type, + } if x, err := p.Bitrate.Float64(); err == nil { o.Bitrate = x @@ -310,21 +328,21 @@ type ProcessProgress struct { func (p *ProcessProgress) Unmarshal(e *event.ProcessProgress) { for _, io := range e.Input { - p.Input = append(p.Input, ProcessProgressInput{ + x := ProcessProgressInput{ + ID: io.ID, + Type: io.Type, Bitrate: json.ToNumber(io.Bitrate), FPS: json.ToNumber(io.FPS), - AVstream: ProcessProgressInputAVstream{ - Looping: io.AVstream.Looping, - Enc: io.AVstream.Enc, - Drop: io.AVstream.Drop, - Dup: io.AVstream.Dup, - Time: io.AVstream.Time, - }, - }) + } + x.AVstream.Unmarshal(io.AVstream) + + p.Input = append(p.Input, x) } for _, io := range e.Output { p.Output = append(p.Output, ProcessProgressOutput{ + ID: io.ID, + Type: io.Type, Bitrate: json.ToNumber(io.Bitrate), FPS: json.ToNumber(io.FPS), }) diff --git a/http/client/events.go b/http/client/events.go index 4ef660e8..c0d394c3 100644 --- a/http/client/events.go +++ b/http/client/events.go @@ -42,18 +42,6 @@ func (r *restclient) LogEvents(ctx context.Context, filters api.LogEventFilters) ch <- data } - /* - decoder := json.NewDecoder(stream) - - for decoder.More() { - var event api.LogEventRaw - if err := decoder.Decode(&event); err != nil { - return - } - - ch <- event - } - */ }(stream, channel) return channel, nil @@ -134,19 +122,6 @@ func (r *restclient) ProcessEvents(ctx context.Context, filters api.ProcessEvent ch <- data } - - /* - decoder := json.NewDecoder(io.TeeReader(stream, os.Stdout)) - - for decoder.More() { - var event api.ProcessEventRaw - if err := decoder.Decode(&event); err != nil { - return - } - - ch <- event - } - */ }(stream, channel) return channel, nil diff --git a/http/handler/api/events.go b/http/handler/api/events.go index 93b3bb88..28c28859 100644 --- a/http/handler/api/events.go +++ b/http/handler/api/events.go @@ -151,7 +151,10 @@ func (h *EventsHandler) LogEvents(c echo.Context) error { case <-reqctx.Done(): return nil case <-ticker.C: - res.Write([]byte(":keepalive\n\n")) + _, err := res.Write([]byte(":keepalive\n\n")) + if err != nil { + return err + } res.Flush() case e, ok := <-evts: if !ok { @@ -181,7 +184,10 @@ func (h *EventsHandler) LogEvents(c echo.Context) error { case <-reqctx.Done(): return nil case <-ticker.C: - res.Write([]byte("{\"event\": \"keepalive\"}\n")) + _, err := res.Write([]byte("{\"event\": \"keepalive\"}\n")) + if err != nil { + return err + } res.Flush() case e, ok := <-evts: if !ok { @@ -297,12 +303,21 @@ func (h *EventsHandler) MediaEvents(c echo.Context) error { event := api.MediaEvent{} + _, err = res.Write([]byte("{\"action\":\"keepalive\"}\n")) + if err != nil { + return err + } + res.Flush() + for { select { case <-reqctx.Done(): return nil case <-keepaliveTicker.C: - res.Write([]byte("{\"action\":\"keepalive\"}\n")) + _, err := res.Write([]byte("{\"action\":\"keepalive\"}\n")) + if err != nil { + return err + } res.Flush() case <-listTicker.C: if err := enc.Encode(createList()); err != nil { @@ -396,6 +411,12 @@ func (h *EventsHandler) ProcessEvents(c echo.Context) error { event := api.ProcessEvent{} + _, err = res.Write([]byte("{\"type\":\"keepalive\"}\n")) + if err != nil { + return err + } + res.Flush() + for { select { case <-reqctx.Done(): diff --git a/restream/app/process.go b/restream/app/process.go index 74f769f1..3f627c75 100644 --- a/restream/app/process.go +++ b/restream/app/process.go @@ -216,6 +216,26 @@ func (c *Config) ProcessID() ProcessID { } } +func (c *Config) InputIDFromAddress(address string) string { + for _, input := range c.Input { + if input.Address == address { + return input.ID + } + } + + return "" +} + +func (c *Config) OutputIDFromAddress(address string) string { + for _, output := range c.Output { + if output.Address == address { + return output.ID + } + } + + return "" +} + type order struct { order string lock sync.RWMutex diff --git a/restream/core.go b/restream/core.go index 257eceec..fa1656ec 100644 --- a/restream/core.go +++ b/restream/core.go @@ -675,17 +675,7 @@ func (r *restream) createTask(config *app.Config) (*task, error) { t.ffmpeg = ffmpeg - r.events.Consume(t.parser, func(e event.Event) event.Event { - pe, ok := e.(*event.ProcessEvent) - if !ok { - return e - } - - pe.ProcessID = t.process.ID - pe.Domain = t.process.Domain - - return pe - }) + r.events.Consume(t.parser, t.RewriteEvent) return t, nil } diff --git a/restream/task.go b/restream/task.go index cc98c5f0..7b4b4351 100644 --- a/restream/task.go +++ b/restream/task.go @@ -14,21 +14,21 @@ import ( ) type task struct { - readers *atomic.Int64 // Number of concurrent readers - id string // ID of the task/process - owner string // Owner of the process - domain string // Domain of the process - reference string // reference of the process - process *app.Process // The process definition - config *app.Config // Process config with replaced static placeholders - command []string // The actual command parameter for ffmpeg - ffmpeg process.Process // The OS process - parser parse.Parser // Parser for the OS process' output - playout map[string]int // Port mapping to access playout API - logger log.Logger // Logger - usesDisk bool // Whether this task uses the disk - hwdevice *atomic.Int32 // Index of the GPU this task uses - metadata map[string]interface{} // Metadata of the process + readers *atomic.Int64 // Number of concurrent readers + id string // ID of the task/process + owner string // Owner of the process + domain string // Domain of the process + reference string // reference of the process + process *app.Process // The process definition + config *app.Config // Process config with replaced static placeholders + command []string // The actual command parameter for ffmpeg + ffmpeg process.Process // The OS process + parser parse.Parser // Parser for the OS process' output + playout map[string]int // Port mapping to access playout API + logger log.Logger // Logger + usesDisk bool // Whether this task uses the disk + hwdevice *atomic.Int32 // Index of the GPU this task uses + metadata map[string]any // Metadata of the process } func NewTask(process *app.Process, logger log.Logger) *task { @@ -175,8 +175,13 @@ func (t *task) State() (*app.State, error) { progress := t.parser.Progress() state.Progress.UnmarshalParser(&progress) - state.Progress.Input = assignConfigID(state.Progress.Input, t.config.Input) - state.Progress.Output = assignConfigID(state.Progress.Output, t.config.Output) + for i, io := range state.Progress.Input { + state.Progress.Input[i].ID = t.config.InputIDFromAddress(io.URL) + } + + for i, io := range state.Progress.Output { + state.Progress.Output[i].ID = t.config.OutputIDFromAddress(io.URL) + } state.PID = status.PID @@ -214,8 +219,13 @@ func (t *task) Report() (*app.Report, error) { report.History[i].UnmarshalParser(&h) e := &report.History[i] - e.Progress.Input = assignConfigID(e.Progress.Input, t.config.Input) - e.Progress.Output = assignConfigID(e.Progress.Output, t.config.Input) + for i, io := range e.Progress.Input { + e.Progress.Input[i].ID = t.config.InputIDFromAddress(io.URL) + } + + for i, io := range e.Progress.Output { + e.Progress.Output[i].ID = t.config.OutputIDFromAddress(io.URL) + } } return report, nil @@ -385,3 +395,27 @@ func (t *task) ImportParserReportHistory(report []parse.ReportHistoryEntry) { func (t *task) Events() (<-chan event.Event, event.CancelFunc, error) { return t.parser.Events() } + +func (t *task) RewriteEvent(e event.Event) event.Event { + pe, ok := e.(*event.ProcessEvent) + if !ok { + return e + } + + pe.ProcessID = t.process.ID + pe.Domain = t.process.Domain + + if pe.Progress != nil { + for i, io := range pe.Progress.Input { + pe.Progress.Input[i].ID = t.config.InputIDFromAddress(io.URL) + pe.Progress.Input[i].URL = "" + } + + for i, io := range pe.Progress.Output { + pe.Progress.Output[i].ID = t.config.OutputIDFromAddress(io.URL) + pe.Progress.Output[i].URL = "" + } + } + + return pe +}