From 0431b6f8c43a8277b624de023ccc81c7a8cedb7b Mon Sep 17 00:00:00 2001 From: Ingo Oppermann Date: Tue, 2 Dec 2025 17:08:05 +0100 Subject: [PATCH] Add process events --- cluster/node/core.go | 2 +- cluster/node/manager.go | 2 +- docs/docs.go | 177 ++++++++++++++++++++++++++--- docs/swagger.json | 177 ++++++++++++++++++++++++++--- docs/swagger.yaml | 118 ++++++++++++++++--- event/event.go | 35 +++++- event/process.go | 104 +++++++++++++++++ ffmpeg/parse/parser.go | 67 ++++++++++- http/api/event.go | 156 ++++++++++++++++++++++++- http/client/client.go | 2 +- http/client/events.go | 2 +- http/handler/api/cluster_events.go | 6 +- http/handler/api/events.go | 145 +++++++++++++++++++---- http/server.go | 2 + restream/core.go | 36 ++++++ restream/task.go | 7 ++ 16 files changed, 954 insertions(+), 84 deletions(-) create mode 100644 event/process.go diff --git a/cluster/node/core.go b/cluster/node/core.go index 42088ff4..22721166 100644 --- a/cluster/node/core.go +++ b/cluster/node/core.go @@ -933,7 +933,7 @@ func (n *Core) ClusterProcessList() ([]Process, error) { return processes, nil } -func (n *Core) Events(ctx context.Context, filters api.EventFilters) (<-chan api.LogEvent, error) { +func (n *Core) Events(ctx context.Context, filters api.LogEventFilters) (<-chan api.LogEvent, error) { n.lock.RLock() client := n.client n.lock.RUnlock() diff --git a/cluster/node/manager.go b/cluster/node/manager.go index 4d8bb1cf..3cc151bb 100644 --- a/cluster/node/manager.go +++ b/cluster/node/manager.go @@ -655,7 +655,7 @@ func (p *Manager) ProcessValidateConfig(nodeid string, config *app.Config) error return node.Core().ProcessValidateConfig(config) } -func (p *Manager) Events(ctx context.Context, filters api.EventFilters) (<-chan api.LogEvent, error) { +func (p *Manager) LogEvents(ctx context.Context, filters api.LogEventFilters) (<-chan api.LogEvent, error) { eventChan := make(chan api.LogEvent, 128) p.lock.RLock() diff --git a/docs/docs.go b/docs/docs.go index 3a016149..090d608d 100644 --- a/docs/docs.go +++ b/docs/docs.go @@ -577,7 +577,7 @@ const docTemplate = `{ "name": "filters", "in": "body", "schema": { - "$ref": "#/definitions/api.EventFilters" + "$ref": "#/definitions/api.LogEventFilters" } } ], @@ -2582,14 +2582,14 @@ const docTemplate = `{ "v16.?.?" ], "summary": "Stream of log events", - "operationId": "events-3-media", + "operationId": "events-3-log", "parameters": [ { "description": "Event filters", "name": "filters", "in": "body", "schema": { - "$ref": "#/definitions/api.EventFilters" + "$ref": "#/definitions/api.LogEventFilters" } } ], @@ -2597,7 +2597,7 @@ const docTemplate = `{ "200": { "description": "OK", "schema": { - "$ref": "#/definitions/api.MediaEvent" + "$ref": "#/definitions/api.LogEvent" } } } @@ -2610,7 +2610,7 @@ const docTemplate = `{ "ApiKeyAuth": [] } ], - "description": "Stream of media event of whats happening in the core", + "description": "Stream of media event of whats happening in the filesystems", "consumes": [ "application/json" ], @@ -2621,7 +2621,7 @@ const docTemplate = `{ "v16.?.?" ], "summary": "Stream of media events", - "operationId": "events-3-log", + "operationId": "events-3-media", "parameters": [ { "type": "string", @@ -2634,7 +2634,46 @@ const docTemplate = `{ "200": { "description": "OK", "schema": { - "$ref": "#/definitions/api.LogEvent" + "$ref": "#/definitions/api.MediaEvent" + } + } + } + } + }, + "/api/v3/events/process": { + "post": { + "security": [ + { + "ApiKeyAuth": [] + } + ], + "description": "Stream of process event of whats happening in the processes", + "consumes": [ + "application/json" + ], + "produces": [ + "application/x-json-stream" + ], + "tags": [ + "v16.?.?" + ], + "summary": "Stream of process events", + "operationId": "events-3-process", + "parameters": [ + { + "description": "Event filters", + "name": "filters", + "in": "body", + "schema": { + "$ref": "#/definitions/api.ProcessEventFilters" + } + } + ], + "responses": { + "200": { + "description": "OK", + "schema": { + "$ref": "#/definitions/api.ProcessEvent" } } } @@ -6775,17 +6814,6 @@ const docTemplate = `{ } } }, - "api.EventFilters": { - "type": "object", - "properties": { - "filters": { - "type": "array", - "items": { - "$ref": "#/definitions/api.LogEventFilter" - } - } - } - }, "api.FileInfo": { "type": "object", "properties": { @@ -7170,6 +7198,17 @@ const docTemplate = `{ } } }, + "api.LogEventFilters": { + "type": "object", + "properties": { + "filters": { + "type": "array", + "items": { + "$ref": "#/definitions/api.LogEventFilter" + } + } + } + }, "api.Login": { "type": "object", "required": [ @@ -7690,6 +7729,54 @@ const docTemplate = `{ } } }, + "api.ProcessEvent": { + "type": "object", + "properties": { + "domain": { + "type": "string" + }, + "line": { + "type": "string" + }, + "pid": { + "type": "string" + }, + "progress": { + "$ref": "#/definitions/api.ProcessProgress" + }, + "ts": { + "type": "integer" + }, + "type": { + "type": "string" + } + } + }, + "api.ProcessEventFilter": { + "type": "object", + "properties": { + "domain": { + "type": "string" + }, + "pid": { + "type": "string" + }, + "type": { + "type": "string" + } + } + }, + "api.ProcessEventFilters": { + "type": "object", + "properties": { + "filters": { + "type": "array", + "items": { + "$ref": "#/definitions/api.ProcessEventFilter" + } + } + } + }, "api.ProcessID": { "type": "object", "properties": { @@ -7701,6 +7788,60 @@ const docTemplate = `{ } } }, + "api.ProcessProgress": { + "type": "object", + "properties": { + "input": { + "type": "array", + "items": { + "$ref": "#/definitions/api.ProcessProgressInput" + } + }, + "output": { + "type": "array", + "items": { + "$ref": "#/definitions/api.ProcessProgressOutput" + } + }, + "time": { + "type": "number" + } + } + }, + "api.ProcessProgressInput": { + "type": "object", + "properties": { + "bitrate": { + "type": "number" + }, + "drop": { + "type": "integer" + }, + "dup": { + "type": "integer" + }, + "enc": { + "type": "integer" + }, + "fps": { + "type": "number" + }, + "looping": { + "type": "boolean" + } + } + }, + "api.ProcessProgressOutput": { + "type": "object", + "properties": { + "bitrate": { + "type": "number" + }, + "fps": { + "type": "number" + } + } + }, "api.ProcessReport": { "type": "object", "properties": { diff --git a/docs/swagger.json b/docs/swagger.json index 4a4dd00e..8068f4ff 100644 --- a/docs/swagger.json +++ b/docs/swagger.json @@ -570,7 +570,7 @@ "name": "filters", "in": "body", "schema": { - "$ref": "#/definitions/api.EventFilters" + "$ref": "#/definitions/api.LogEventFilters" } } ], @@ -2575,14 +2575,14 @@ "v16.?.?" ], "summary": "Stream of log events", - "operationId": "events-3-media", + "operationId": "events-3-log", "parameters": [ { "description": "Event filters", "name": "filters", "in": "body", "schema": { - "$ref": "#/definitions/api.EventFilters" + "$ref": "#/definitions/api.LogEventFilters" } } ], @@ -2590,7 +2590,7 @@ "200": { "description": "OK", "schema": { - "$ref": "#/definitions/api.MediaEvent" + "$ref": "#/definitions/api.LogEvent" } } } @@ -2603,7 +2603,7 @@ "ApiKeyAuth": [] } ], - "description": "Stream of media event of whats happening in the core", + "description": "Stream of media event of whats happening in the filesystems", "consumes": [ "application/json" ], @@ -2614,7 +2614,7 @@ "v16.?.?" ], "summary": "Stream of media events", - "operationId": "events-3-log", + "operationId": "events-3-media", "parameters": [ { "type": "string", @@ -2627,7 +2627,46 @@ "200": { "description": "OK", "schema": { - "$ref": "#/definitions/api.LogEvent" + "$ref": "#/definitions/api.MediaEvent" + } + } + } + } + }, + "/api/v3/events/process": { + "post": { + "security": [ + { + "ApiKeyAuth": [] + } + ], + "description": "Stream of process event of whats happening in the processes", + "consumes": [ + "application/json" + ], + "produces": [ + "application/x-json-stream" + ], + "tags": [ + "v16.?.?" + ], + "summary": "Stream of process events", + "operationId": "events-3-process", + "parameters": [ + { + "description": "Event filters", + "name": "filters", + "in": "body", + "schema": { + "$ref": "#/definitions/api.ProcessEventFilters" + } + } + ], + "responses": { + "200": { + "description": "OK", + "schema": { + "$ref": "#/definitions/api.ProcessEvent" } } } @@ -6768,17 +6807,6 @@ } } }, - "api.EventFilters": { - "type": "object", - "properties": { - "filters": { - "type": "array", - "items": { - "$ref": "#/definitions/api.LogEventFilter" - } - } - } - }, "api.FileInfo": { "type": "object", "properties": { @@ -7163,6 +7191,17 @@ } } }, + "api.LogEventFilters": { + "type": "object", + "properties": { + "filters": { + "type": "array", + "items": { + "$ref": "#/definitions/api.LogEventFilter" + } + } + } + }, "api.Login": { "type": "object", "required": [ @@ -7683,6 +7722,54 @@ } } }, + "api.ProcessEvent": { + "type": "object", + "properties": { + "domain": { + "type": "string" + }, + "line": { + "type": "string" + }, + "pid": { + "type": "string" + }, + "progress": { + "$ref": "#/definitions/api.ProcessProgress" + }, + "ts": { + "type": "integer" + }, + "type": { + "type": "string" + } + } + }, + "api.ProcessEventFilter": { + "type": "object", + "properties": { + "domain": { + "type": "string" + }, + "pid": { + "type": "string" + }, + "type": { + "type": "string" + } + } + }, + "api.ProcessEventFilters": { + "type": "object", + "properties": { + "filters": { + "type": "array", + "items": { + "$ref": "#/definitions/api.ProcessEventFilter" + } + } + } + }, "api.ProcessID": { "type": "object", "properties": { @@ -7694,6 +7781,60 @@ } } }, + "api.ProcessProgress": { + "type": "object", + "properties": { + "input": { + "type": "array", + "items": { + "$ref": "#/definitions/api.ProcessProgressInput" + } + }, + "output": { + "type": "array", + "items": { + "$ref": "#/definitions/api.ProcessProgressOutput" + } + }, + "time": { + "type": "number" + } + } + }, + "api.ProcessProgressInput": { + "type": "object", + "properties": { + "bitrate": { + "type": "number" + }, + "drop": { + "type": "integer" + }, + "dup": { + "type": "integer" + }, + "enc": { + "type": "integer" + }, + "fps": { + "type": "number" + }, + "looping": { + "type": "boolean" + } + } + }, + "api.ProcessProgressOutput": { + "type": "object", + "properties": { + "bitrate": { + "type": "number" + }, + "fps": { + "type": "number" + } + } + }, "api.ProcessReport": { "type": "object", "properties": { diff --git a/docs/swagger.yaml b/docs/swagger.yaml index fe57bb18..c2020172 100644 --- a/docs/swagger.yaml +++ b/docs/swagger.yaml @@ -911,13 +911,6 @@ definitions: message: type: string type: object - api.EventFilters: - properties: - filters: - items: - $ref: '#/definitions/api.LogEventFilter' - type: array - type: object api.FileInfo: properties: core_id: @@ -1172,6 +1165,13 @@ definitions: message: type: string type: object + api.LogEventFilters: + properties: + filters: + items: + $ref: '#/definitions/api.LogEventFilter' + type: array + type: object api.Login: properties: password: @@ -1528,6 +1528,37 @@ definitions: format: uint64 type: integer type: object + api.ProcessEvent: + properties: + domain: + type: string + line: + type: string + pid: + type: string + progress: + $ref: '#/definitions/api.ProcessProgress' + ts: + type: integer + type: + type: string + type: object + api.ProcessEventFilter: + properties: + domain: + type: string + pid: + type: string + type: + type: string + type: object + api.ProcessEventFilters: + properties: + filters: + items: + $ref: '#/definitions/api.ProcessEventFilter' + type: array + type: object api.ProcessID: properties: domain: @@ -1535,6 +1566,41 @@ definitions: id: type: string type: object + api.ProcessProgress: + properties: + input: + items: + $ref: '#/definitions/api.ProcessProgressInput' + type: array + output: + items: + $ref: '#/definitions/api.ProcessProgressOutput' + type: array + time: + type: number + type: object + api.ProcessProgressInput: + properties: + bitrate: + type: number + drop: + type: integer + dup: + type: integer + enc: + type: integer + fps: + type: number + looping: + type: boolean + type: object + api.ProcessProgressOutput: + properties: + bitrate: + type: number + fps: + type: number + type: object api.ProcessReport: properties: created_at: @@ -3296,7 +3362,7 @@ paths: in: body name: filters schema: - $ref: '#/definitions/api.EventFilters' + $ref: '#/definitions/api.LogEventFilters' produces: - text/event-stream - application/x-json-stream @@ -4612,13 +4678,13 @@ paths: consumes: - application/json description: Stream of log event of whats happening in the core - operationId: events-3-media + operationId: events-3-log parameters: - description: Event filters in: body name: filters schema: - $ref: '#/definitions/api.EventFilters' + $ref: '#/definitions/api.LogEventFilters' produces: - text/event-stream - application/x-json-stream @@ -4626,7 +4692,7 @@ paths: "200": description: OK schema: - $ref: '#/definitions/api.MediaEvent' + $ref: '#/definitions/api.LogEvent' security: - ApiKeyAuth: [] summary: Stream of log events @@ -4636,8 +4702,8 @@ paths: post: consumes: - application/json - description: Stream of media event of whats happening in the core - operationId: events-3-log + description: Stream of media event of whats happening in the filesystems + operationId: events-3-media parameters: - description: glob pattern for media names in: query @@ -4649,12 +4715,36 @@ paths: "200": description: OK schema: - $ref: '#/definitions/api.LogEvent' + $ref: '#/definitions/api.MediaEvent' security: - ApiKeyAuth: [] summary: Stream of media events tags: - v16.?.? + /api/v3/events/process: + post: + consumes: + - application/json + description: Stream of process event of whats happening in the processes + operationId: events-3-process + parameters: + - description: Event filters + in: body + name: filters + schema: + $ref: '#/definitions/api.ProcessEventFilters' + produces: + - application/x-json-stream + responses: + "200": + description: OK + schema: + $ref: '#/definitions/api.ProcessEvent' + security: + - ApiKeyAuth: [] + summary: Stream of process events + tags: + - v16.?.? /api/v3/fs: get: description: Listall registered filesystems diff --git a/event/event.go b/event/event.go index c331e683..de811132 100644 --- a/event/event.go +++ b/event/event.go @@ -44,6 +44,32 @@ func NewPubSub() *PubSub { return w } +func (w *PubSub) Consume(s EventSource, rewrite func(e Event) Event) { + ch, cancel, err := s.Events() + if err != nil { + return + } + + if rewrite == nil { + rewrite = func(e Event) Event { return e } + } + + go func(ch <-chan Event, cancel CancelFunc) { + for { + select { + case <-w.ctx.Done(): + cancel() + return + case e, ok := <-ch: + if !ok { + return + } + w.Publish(rewrite(e)) + } + } + }(ch, cancel) +} + func (w *PubSub) Publish(e Event) error { event := e.Clone() @@ -64,9 +90,14 @@ func (w *PubSub) Publish(e Event) error { } func (w *PubSub) Close() { + w.publisherLock.Lock() + if w.publisherClosed { + w.publisherLock.Unlock() + return + } + w.cancel() - w.publisherLock.Lock() close(w.publisher) w.publisherClosed = true w.publisherLock.Unlock() @@ -98,6 +129,8 @@ func (w *PubSub) Subscribe() (<-chan Event, CancelFunc) { w.subscriberLock.Lock() delete(w.subscriber, id) w.subscriberLock.Unlock() + + close(l) } return l, unsubscribe diff --git a/event/process.go b/event/process.go new file mode 100644 index 00000000..3710f907 --- /dev/null +++ b/event/process.go @@ -0,0 +1,104 @@ +package event + +import ( + "time" +) + +type ProcessEvent struct { + ProcessID string + Domain string + Type string + Line string + Progress *ProcessProgress + Timestamp time.Time +} + +func (e *ProcessEvent) Clone() Event { + evt := &ProcessEvent{ + ProcessID: e.ProcessID, + Domain: e.Domain, + Type: e.Type, + Line: e.Line, + Timestamp: e.Timestamp, + } + + if e.Progress != nil { + evt.Progress = e.Progress.Clone() + } + + return evt +} + +func NewProcessLogEvent(logline string) *ProcessEvent { + return &ProcessEvent{ + Type: "line", + Line: logline, + Timestamp: time.Now(), + } +} + +func NewProcessProgressEvent(progress *ProcessProgress) *ProcessEvent { + return &ProcessEvent{ + Type: "progress", + Progress: progress, + Timestamp: time.Now(), + } +} + +type ProcessProgressInput struct { + Bitrate float64 + FPS float64 + Looping bool + Enc uint64 + Drop uint64 + Dup uint64 +} + +func (p *ProcessProgressInput) Clone() ProcessProgressInput { + c := ProcessProgressInput{ + Bitrate: p.Bitrate, + FPS: p.FPS, + Looping: p.Looping, + Enc: p.Enc, + Drop: p.Drop, + Dup: p.Dup, + } + + return c +} + +type ProcessProgressOutput struct { + Bitrate float64 + FPS float64 +} + +func (p *ProcessProgressOutput) Clone() ProcessProgressOutput { + c := ProcessProgressOutput{ + Bitrate: p.Bitrate, + FPS: p.FPS, + } + + return c +} + +type ProcessProgress struct { + Input []ProcessProgressInput + Output []ProcessProgressOutput + Time float64 +} + +func (p *ProcessProgress) Clone() *ProcessProgress { + c := ProcessProgress{} + + for _, io := range p.Input { + c.Input = append(c.Input, io.Clone()) + } + + for _, io := range p.Output { + c.Output = append(c.Output, io.Clone()) + } + + c.Time = p.Time + + return &c +} diff --git a/ffmpeg/parse/parser.go b/ffmpeg/parse/parser.go index 45350964..f8a073dc 100644 --- a/ffmpeg/parse/parser.go +++ b/ffmpeg/parse/parser.go @@ -10,6 +10,7 @@ import ( "time" "github.com/datarhei/core/v16/encoding/json" + "github.com/datarhei/core/v16/event" "github.com/datarhei/core/v16/ffmpeg/prelude" "github.com/datarhei/core/v16/log" "github.com/datarhei/core/v16/net/url" @@ -42,6 +43,10 @@ type Parser interface { // ImportReportHistory imports a report history from another parser ImportReportHistory([]ReportHistoryEntry) + + Events() (<-chan event.Event, event.CancelFunc, error) + + Destroy() } // Config is the config for the Parser implementation @@ -125,6 +130,8 @@ type parser struct { log sync.RWMutex logHistory sync.RWMutex } + + events *event.PubSub } // New returns a Parser that satisfies the Parser interface @@ -135,6 +142,7 @@ func New(config Config) Parser { logMinimalHistoryLength: config.LogMinimalHistory, logger: config.Logger, collector: config.Collector, + events: event.NewPubSub(), } if p.logger == nil { @@ -319,6 +327,8 @@ func (p *parser) Parse(line []byte) uint64 { // Write the current non-progress line to the log p.addLog(stringLine) + p.events.Publish(event.NewProcessLogEvent(stringLine)) + p.lock.prelude.Lock() if !p.prelude.done { if len(p.prelude.data) < p.prelude.headLines { @@ -457,6 +467,36 @@ func (p *parser) Parse(line []byte) uint64 { } } + progress := p.assembleProgress() + evt := &event.ProcessProgress{ + Time: progress.Time, + } + + for _, io := range progress.Input { + input := event.ProcessProgressInput{ + Bitrate: io.Bitrate, + FPS: io.FPS, + } + + if io.AVstream != nil { + input.Looping = io.AVstream.Looping + input.Enc = io.AVstream.Enc + input.Drop = io.AVstream.Drop + input.Dup = io.AVstream.Dup + } + + evt.Input = append(evt.Input, input) + } + + for _, io := range progress.Output { + evt.Output = append(evt.Output, event.ProcessProgressOutput{ + Bitrate: io.Bitrate, + FPS: io.FPS, + }) + } + + p.events.Publish(event.NewProcessProgressEvent(evt)) + // Calculate if any of the processed frames staled. // If one number of frames in an output is the same as before, then pFrames becomes 0. pFrames := p.stats.main.diff.frame @@ -539,9 +579,10 @@ func (p *parser) parseFFmpegIO(kind string, line []byte) error { } } - if kind == "input" { + switch kind { + case "input": p.process.input = processIO - } else if kind == "output" { + case "output": p.process.output = processIO } @@ -644,10 +685,7 @@ func (p *parser) Stop(state string, pusage process.Usage) { p.storeReportHistory(state, usage) } -func (p *parser) Progress() Progress { - p.lock.progress.RLock() - defer p.lock.progress.RUnlock() - +func (p *parser) assembleProgress() Progress { progress := p.process.export() p.progress.ffmpeg.exportTo(&progress) @@ -666,6 +704,13 @@ func (p *parser) Progress() Progress { return progress } +func (p *parser) Progress() Progress { + p.lock.progress.RLock() + defer p.lock.progress.RUnlock() + + return p.assembleProgress() +} + func (p *parser) IsRunning() bool { p.lock.progress.RLock() defer p.lock.progress.RUnlock() @@ -1000,3 +1045,13 @@ func (p *parser) ImportReportHistory(history []ReportHistoryEntry) { p.logHistory = p.logHistory.Next() } } + +func (p *parser) Events() (<-chan event.Event, event.CancelFunc, error) { + ch, cancel := p.events.Subscribe() + + return ch, cancel, nil +} + +func (p *parser) Destroy() { + p.events.Close() +} diff --git a/http/api/event.go b/http/api/event.go index 49b2f494..2e783510 100644 --- a/http/api/event.go +++ b/http/api/event.go @@ -6,6 +6,7 @@ import ( "strings" "github.com/datarhei/core/v16/encoding/json" + "github.com/datarhei/core/v16/event" "github.com/datarhei/core/v16/log" ) @@ -108,7 +109,7 @@ type LogEventFilter struct { reData map[string]*regexp.Regexp } -type EventFilters struct { +type LogEventFilters struct { Filters []LogEventFilter `json:"filters"` } @@ -169,3 +170,156 @@ type MediaEvent struct { Names []string `json:"names,omitempty"` Timestamp int64 `json:"ts"` } + +func (p *MediaEvent) Unmarshal(e event.Event) bool { + evt, ok := e.(*event.MediaEvent) + if !ok { + return false + } + + p.Action = evt.Action + p.Name = evt.Name + p.Names = nil + p.Timestamp = evt.Timestamp.UnixMilli() + + return true +} + +type ProcessEvent struct { + ProcessID string `json:"pid"` + Domain string `json:"domain"` + Type string `json:"type"` + Line string `json:"line,omitempty"` + Progress *ProcessProgress `json:"progress,omitempty"` + Timestamp int64 `json:"ts"` +} + +type ProcessProgressInput struct { + Bitrate json.Number `json:"bitrate" swaggertype:"number" jsonschema:"type=number"` + FPS json.Number `json:"fps" swaggertype:"number" jsonschema:"type=number"` + Looping bool `json:"looping"` + Enc uint64 `json:"enc"` + Drop uint64 `json:"drop"` + Dup uint64 `json:"dup"` +} + +type ProcessProgressOutput struct { + Bitrate json.Number `json:"bitrate" swaggertype:"number" jsonschema:"type=number"` + FPS json.Number `json:"fps" swaggertype:"number" jsonschema:"type=number"` +} + +type ProcessProgress struct { + Input []ProcessProgressInput `json:"input"` + Output []ProcessProgressOutput `json:"output"` + Time json.Number `json:"time" swaggertype:"number" jsonschema:"type=number"` +} + +func (p *ProcessProgress) Unmarshal(e *event.ProcessProgress) { + for _, io := range e.Input { + p.Input = append(p.Input, ProcessProgressInput{ + Bitrate: json.ToNumber(io.Bitrate), + FPS: json.ToNumber(io.FPS), + Looping: io.Looping, + Enc: io.Enc, + Drop: io.Drop, + Dup: io.Dup, + }) + } + + for _, io := range e.Output { + p.Output = append(p.Output, ProcessProgressOutput{ + Bitrate: json.ToNumber(io.Bitrate), + FPS: json.ToNumber(io.FPS), + }) + } + + p.Time = json.ToNumber(e.Time) +} + +func (p *ProcessEvent) Unmarshal(e event.Event) bool { + evt, ok := e.(*event.ProcessEvent) + if !ok { + return false + } + + p.ProcessID = evt.ProcessID + p.Domain = evt.Domain + p.Type = evt.Type + p.Line = evt.Line + if evt.Progress == nil { + p.Progress = nil + } else { + p.Progress = &ProcessProgress{} + p.Progress.Unmarshal(evt.Progress) + } + p.Timestamp = evt.Timestamp.UnixMilli() + + return true +} + +func (e *ProcessEvent) Filter(ef *ProcessEventFilter) bool { + if ef.reProcessID != nil { + if !ef.reProcessID.MatchString(e.ProcessID) { + return false + } + } + + if ef.reDomain != nil { + if !ef.reDomain.MatchString(e.Domain) { + return false + } + } + + if ef.reType != nil { + if !ef.reType.MatchString(e.Type) { + return false + } + } + + return true +} + +type ProcessEventFilter struct { + ProcessID string `json:"pid"` + Domain string `json:"domain"` + Type string `json:"type"` + + reProcessID *regexp.Regexp + reDomain *regexp.Regexp + reType *regexp.Regexp +} + +type ProcessEventFilters struct { + Filters []ProcessEventFilter `json:"filters"` +} + +func (ef *ProcessEventFilter) Compile() error { + if len(ef.ProcessID) != 0 { + r, err := regexp.Compile("(?i)" + ef.ProcessID) + if err != nil { + return err + } + + ef.reProcessID = r + } + + if len(ef.Domain) != 0 { + r, err := regexp.Compile("(?i)" + ef.Domain) + if err != nil { + return err + } + + ef.reDomain = r + } + + if len(ef.Type) != 0 { + r, err := regexp.Compile("(?i)" + ef.Type) + if err != nil { + return err + } + + ef.reType = r + } + + return nil +} diff --git a/http/client/client.go b/http/client/client.go index 0016260e..542c85e3 100644 --- a/http/client/client.go +++ b/http/client/client.go @@ -57,7 +57,7 @@ type RestClient interface { FilesystemDeleteFile(storage, path string) error // DELETE /v3/fs/{storage}/{path} FilesystemAddFile(storage, path string, data io.Reader) error // PUT /v3/fs/{storage}/{path} - Events(ctx context.Context, filters api.EventFilters) (<-chan api.LogEvent, error) // POST /v3/events + Events(ctx context.Context, filters api.LogEventFilters) (<-chan api.LogEvent, error) // POST /v3/events MediaEvents(ctx context.Context, storage, pattern string) (<-chan api.MediaEvent, error) // GET /v3/events/media/{storage} ProcessList(opts ProcessListOptions) ([]api.Process, error) // GET /v3/process diff --git a/http/client/events.go b/http/client/events.go index 9ac5ef82..f5fd7d41 100644 --- a/http/client/events.go +++ b/http/client/events.go @@ -11,7 +11,7 @@ import ( "github.com/datarhei/core/v16/mem" ) -func (r *restclient) Events(ctx context.Context, filters api.EventFilters) (<-chan api.LogEvent, error) { +func (r *restclient) Events(ctx context.Context, filters api.LogEventFilters) (<-chan api.LogEvent, error) { buf := mem.Get() defer mem.Put(buf) diff --git a/http/handler/api/cluster_events.go b/http/handler/api/cluster_events.go index 471ae5ac..a659b534 100644 --- a/http/handler/api/cluster_events.go +++ b/http/handler/api/cluster_events.go @@ -21,12 +21,12 @@ import ( // @Accept json // @Produce text/event-stream // @Produce json-stream -// @Param filters body api.EventFilters false "Event filters" +// @Param filters body api.LogEventFilters false "Event filters" // @Success 200 {object} api.LogEvent // @Security ApiKeyAuth // @Router /api/v3/cluster/events [post] func (h *ClusterHandler) Events(c echo.Context) error { - filters := api.EventFilters{} + filters := api.LogEventFilters{} if err := util.ShouldBindJSON(c, &filters); err != nil { return api.Err(http.StatusBadRequest, "", "invalid JSON: %s", err.Error()) @@ -67,7 +67,7 @@ func (h *ClusterHandler) Events(c echo.Context) error { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - evts, err := h.proxy.Events(ctx, filters) + evts, err := h.proxy.LogEvents(ctx, filters) if err != nil { return api.Err(http.StatusInternalServerError, "", "%s", err.Error()) } diff --git a/http/handler/api/events.go b/http/handler/api/events.go index 15e7cd22..249c49f5 100644 --- a/http/handler/api/events.go +++ b/http/handler/api/events.go @@ -2,6 +2,7 @@ package api import ( "net/http" + goslices "slices" "strings" "sync" "time" @@ -19,9 +20,10 @@ import ( // The EventsHandler type provides handler functions for retrieving event. type EventsHandler struct { - logs log.ChannelWriter - media map[string]event.MediaSource - lock sync.Mutex + logs log.ChannelWriter + media map[string]event.MediaSource + process event.EventSource + lock sync.Mutex } // NewEvents returns a new EventsHandler type @@ -43,20 +45,31 @@ func (h *EventsHandler) AddMediaSource(name string, source event.MediaSource) { h.media[name] = source } +func (h *EventsHandler) SetProcessSource(source event.EventSource) { + if source == nil { + return + } + + h.lock.Lock() + defer h.lock.Unlock() + + h.process = source +} + // LogEvents returns a stream of event // @Summary Stream of log events // @Description Stream of log event of whats happening in the core -// @ID events-3-media +// @ID events-3-log // @Tags v16.?.? // @Accept json // @Produce text/event-stream // @Produce json-stream -// @Param filters body api.EventFilters false "Event filters" -// @Success 200 {object} api.MediaEvent +// @Param filters body api.LogEventFilters false "Event filters" +// @Success 200 {object} api.LogEvent // @Security ApiKeyAuth // @Router /api/v3/events [post] func (h *EventsHandler) LogEvents(c echo.Context) error { - filters := api.EventFilters{} + filters := api.LogEventFilters{} if err := util.ShouldBindJSON(c, &filters); err != nil { return api.Err(http.StatusBadRequest, "", "invalid JSON: %s", err.Error()) @@ -174,15 +187,15 @@ func (h *EventsHandler) LogEvents(c echo.Context) error { } } -// LogEvents returns a stream of media event +// MediaEvents returns a stream of media event // @Summary Stream of media events -// @Description Stream of media event of whats happening in the core -// @ID events-3-log +// @Description Stream of media event of whats happening in the filesystems +// @ID events-3-media // @Tags v16.?.? // @Accept json // @Param glob query string false "glob pattern for media names" // @Produce json-stream -// @Success 200 {object} api.LogEvent +// @Success 200 {object} api.MediaEvent // @Security ApiKeyAuth // @Router /api/v3/events/media/{type} [post] func (h *EventsHandler) MediaEvents(c echo.Context) error { @@ -268,6 +281,8 @@ func (h *EventsHandler) MediaEvents(c echo.Context) error { } res.Flush() + event := api.MediaEvent{} + for { select { case err := <-done: @@ -282,18 +297,110 @@ func (h *EventsHandler) MediaEvents(c echo.Context) error { done <- err } res.Flush() - case evt := <-evts: - e := evt.(*event.MediaEvent) + case e := <-evts: + if !event.Unmarshal(e) { + continue + } + if compiledPattern != nil { - if !compiledPattern.Match(e.Name) { + if !compiledPattern.Match(event.Name) { continue } } - if err := enc.Encode(api.MediaEvent{ - Action: e.Action, - Name: e.Name, - Timestamp: e.Timestamp.UnixMilli(), - }); err != nil { + + if err := enc.Encode(event); err != nil { + done <- err + } + res.Flush() + } + } +} + +// ProcessEvents returns a stream of process event +// @Summary Stream of process events +// @Description Stream of process event of whats happening in the processes +// @ID events-3-process +// @Tags v16.?.? +// @Accept json +// @Produce json-stream +// @Param filters body api.ProcessEventFilters false "Event filters" +// @Success 200 {object} api.ProcessEvent +// @Security ApiKeyAuth +// @Router /api/v3/events/process [post] +func (h *EventsHandler) ProcessEvents(c echo.Context) error { + filters := api.ProcessEventFilters{} + + if err := util.ShouldBindJSON(c, &filters); err != nil { + return api.Err(http.StatusBadRequest, "", "invalid JSON: %s", err.Error()) + } + + filter := []*api.ProcessEventFilter{} + + for _, f := range filters.Filters { + f := f + + if err := f.Compile(); err != nil { + return api.Err(http.StatusBadRequest, "", "invalid filter: %s", err.Error()) + } + + filter = append(filter, &f) + } + + keepaliveTicker := time.NewTicker(5 * time.Second) + defer keepaliveTicker.Stop() + + req := c.Request() + reqctx := req.Context() + + contentType := "application/x-json-stream" + + evts, cancel, err := h.process.Events() + if err != nil { + return api.Err(http.StatusNotImplemented, "", "events are not implemented for this server") + } + defer cancel() + + res := c.Response() + + res.Header().Set(echo.HeaderContentType, contentType+"; charset=UTF-8") + res.Header().Set(echo.HeaderCacheControl, "no-store") + res.Header().Set(echo.HeaderConnection, "close") + res.WriteHeader(http.StatusOK) + + enc := json.NewEncoder(res) + enc.SetIndent("", "") + + done := make(chan error, 1) + + filterEvent := func(event *api.ProcessEvent) bool { + if len(filter) == 0 { + return true + } + + return goslices.ContainsFunc(filter, event.Filter) + } + + event := api.ProcessEvent{} + + for { + select { + case err := <-done: + return err + case <-reqctx.Done(): + done <- nil + case <-keepaliveTicker.C: + res.Write([]byte("{\"type\":\"keepalive\"}\n")) + res.Flush() + case e := <-evts: + if !event.Unmarshal(e) { + continue + } + + if !filterEvent(&event) { + continue + } + + if err := enc.Encode(event); err != nil { done <- err } res.Flush() diff --git a/http/server.go b/http/server.go index c0fa2efd..c92b2a11 100644 --- a/http/server.go +++ b/http/server.go @@ -288,6 +288,7 @@ func NewServer(config Config) (serverhandler.Server, error) { } s.v3handler.events.AddMediaSource("srt", config.SRT) s.v3handler.events.AddMediaSource("rtmp", config.RTMP) + s.v3handler.events.SetProcessSource(config.Restream) if config.Restream != nil { s.v3handler.process = api.NewProcess( @@ -816,5 +817,6 @@ func (s *server) setRoutesV3(v3 *echo.Group) { v3.POST("/events", s.v3handler.events.LogEvents) v3.POST("/events/log", s.v3handler.events.LogEvents) v3.POST("/events/media/:type", s.v3handler.events.MediaEvents) + v3.POST("/events/process", s.v3handler.events.ProcessEvents) } } diff --git a/restream/core.go b/restream/core.go index eb4327cb..81167ef9 100644 --- a/restream/core.go +++ b/restream/core.go @@ -10,6 +10,7 @@ import ( "sync" "time" + "github.com/datarhei/core/v16/event" "github.com/datarhei/core/v16/ffmpeg" "github.com/datarhei/core/v16/ffmpeg/skills" "github.com/datarhei/core/v16/glob" @@ -59,6 +60,8 @@ type Restreamer interface { Probe(config *app.Config, timeout time.Duration) app.Probe // Probe a process with specific timeout Validate(config *app.Config) error // Validate a process config + + Events() (<-chan event.Event, event.CancelFunc, error) } // Config is the required configuration for a new restreamer instance. @@ -101,6 +104,8 @@ type restream struct { startOnce sync.Once stopOnce sync.Once + + events *event.PubSub } // New returns a new instance that implements the Restreamer interface @@ -115,6 +120,7 @@ func New(config Config) (Restreamer, error) { logger: config.Logger, tasks: NewStorage(), metadata: map[string]interface{}{}, + events: event.NewPubSub(), } if r.logger == nil { @@ -462,6 +468,18 @@ func (r *restream) load() error { t.ffmpeg = ffmpeg + r.events.Consume(t.parser, func(e event.Event) event.Event { + pe, ok := e.(*event.ProcessEvent) + if !ok { + return e + } + + pe.ProcessID = t.process.ID + pe.Domain = t.process.Domain + + return pe + }) + return true }) @@ -656,6 +674,18 @@ func (r *restream) createTask(config *app.Config) (*task, error) { t.ffmpeg = ffmpeg + r.events.Consume(t.parser, func(e event.Event) event.Event { + pe, ok := e.(*event.ProcessEvent) + if !ok { + return e + } + + pe.ProcessID = t.process.ID + pe.Domain = t.process.Domain + + return pe + }) + return t, nil } @@ -1979,3 +2009,9 @@ func (r *restream) Validate(config *app.Config) error { return nil } + +func (r *restream) Events() (<-chan event.Event, event.CancelFunc, error) { + ch, cancel := r.events.Subscribe() + + return ch, cancel, nil +} diff --git a/restream/task.go b/restream/task.go index 4b171436..cc98c5f0 100644 --- a/restream/task.go +++ b/restream/task.go @@ -5,6 +5,7 @@ import ( "sync/atomic" "time" + "github.com/datarhei/core/v16/event" "github.com/datarhei/core/v16/ffmpeg/parse" "github.com/datarhei/core/v16/glob" "github.com/datarhei/core/v16/log" @@ -326,6 +327,8 @@ func (t *task) Config() *app.Config { func (t *task) Destroy() { t.Stop() + + t.parser.Destroy() } func (t *task) Match(id, reference, owner, domain glob.Glob) bool { @@ -378,3 +381,7 @@ func (t *task) ExportParserReportHistory() []parse.ReportHistoryEntry { func (t *task) ImportParserReportHistory(report []parse.ReportHistoryEntry) { t.parser.ImportReportHistory(report) } + +func (t *task) Events() (<-chan event.Event, event.CancelFunc, error) { + return t.parser.Events() +}