Add process events

This commit is contained in:
Ingo Oppermann 2025-12-02 17:08:05 +01:00
parent 9af53917f4
commit 0431b6f8c4
No known key found for this signature in database
GPG Key ID: 2AB32426E9DD229E
16 changed files with 954 additions and 84 deletions

View File

@ -933,7 +933,7 @@ func (n *Core) ClusterProcessList() ([]Process, error) {
return processes, nil
}
func (n *Core) Events(ctx context.Context, filters api.EventFilters) (<-chan api.LogEvent, error) {
func (n *Core) Events(ctx context.Context, filters api.LogEventFilters) (<-chan api.LogEvent, error) {
n.lock.RLock()
client := n.client
n.lock.RUnlock()

View File

@ -655,7 +655,7 @@ func (p *Manager) ProcessValidateConfig(nodeid string, config *app.Config) error
return node.Core().ProcessValidateConfig(config)
}
func (p *Manager) Events(ctx context.Context, filters api.EventFilters) (<-chan api.LogEvent, error) {
func (p *Manager) LogEvents(ctx context.Context, filters api.LogEventFilters) (<-chan api.LogEvent, error) {
eventChan := make(chan api.LogEvent, 128)
p.lock.RLock()

View File

@ -577,7 +577,7 @@ const docTemplate = `{
"name": "filters",
"in": "body",
"schema": {
"$ref": "#/definitions/api.EventFilters"
"$ref": "#/definitions/api.LogEventFilters"
}
}
],
@ -2582,14 +2582,14 @@ const docTemplate = `{
"v16.?.?"
],
"summary": "Stream of log events",
"operationId": "events-3-media",
"operationId": "events-3-log",
"parameters": [
{
"description": "Event filters",
"name": "filters",
"in": "body",
"schema": {
"$ref": "#/definitions/api.EventFilters"
"$ref": "#/definitions/api.LogEventFilters"
}
}
],
@ -2597,7 +2597,7 @@ const docTemplate = `{
"200": {
"description": "OK",
"schema": {
"$ref": "#/definitions/api.MediaEvent"
"$ref": "#/definitions/api.LogEvent"
}
}
}
@ -2610,7 +2610,7 @@ const docTemplate = `{
"ApiKeyAuth": []
}
],
"description": "Stream of media event of whats happening in the core",
"description": "Stream of media event of whats happening in the filesystems",
"consumes": [
"application/json"
],
@ -2621,7 +2621,7 @@ const docTemplate = `{
"v16.?.?"
],
"summary": "Stream of media events",
"operationId": "events-3-log",
"operationId": "events-3-media",
"parameters": [
{
"type": "string",
@ -2634,7 +2634,46 @@ const docTemplate = `{
"200": {
"description": "OK",
"schema": {
"$ref": "#/definitions/api.LogEvent"
"$ref": "#/definitions/api.MediaEvent"
}
}
}
}
},
"/api/v3/events/process": {
"post": {
"security": [
{
"ApiKeyAuth": []
}
],
"description": "Stream of process event of whats happening in the processes",
"consumes": [
"application/json"
],
"produces": [
"application/x-json-stream"
],
"tags": [
"v16.?.?"
],
"summary": "Stream of process events",
"operationId": "events-3-process",
"parameters": [
{
"description": "Event filters",
"name": "filters",
"in": "body",
"schema": {
"$ref": "#/definitions/api.ProcessEventFilters"
}
}
],
"responses": {
"200": {
"description": "OK",
"schema": {
"$ref": "#/definitions/api.ProcessEvent"
}
}
}
@ -6775,17 +6814,6 @@ const docTemplate = `{
}
}
},
"api.EventFilters": {
"type": "object",
"properties": {
"filters": {
"type": "array",
"items": {
"$ref": "#/definitions/api.LogEventFilter"
}
}
}
},
"api.FileInfo": {
"type": "object",
"properties": {
@ -7170,6 +7198,17 @@ const docTemplate = `{
}
}
},
"api.LogEventFilters": {
"type": "object",
"properties": {
"filters": {
"type": "array",
"items": {
"$ref": "#/definitions/api.LogEventFilter"
}
}
}
},
"api.Login": {
"type": "object",
"required": [
@ -7690,6 +7729,54 @@ const docTemplate = `{
}
}
},
"api.ProcessEvent": {
"type": "object",
"properties": {
"domain": {
"type": "string"
},
"line": {
"type": "string"
},
"pid": {
"type": "string"
},
"progress": {
"$ref": "#/definitions/api.ProcessProgress"
},
"ts": {
"type": "integer"
},
"type": {
"type": "string"
}
}
},
"api.ProcessEventFilter": {
"type": "object",
"properties": {
"domain": {
"type": "string"
},
"pid": {
"type": "string"
},
"type": {
"type": "string"
}
}
},
"api.ProcessEventFilters": {
"type": "object",
"properties": {
"filters": {
"type": "array",
"items": {
"$ref": "#/definitions/api.ProcessEventFilter"
}
}
}
},
"api.ProcessID": {
"type": "object",
"properties": {
@ -7701,6 +7788,60 @@ const docTemplate = `{
}
}
},
"api.ProcessProgress": {
"type": "object",
"properties": {
"input": {
"type": "array",
"items": {
"$ref": "#/definitions/api.ProcessProgressInput"
}
},
"output": {
"type": "array",
"items": {
"$ref": "#/definitions/api.ProcessProgressOutput"
}
},
"time": {
"type": "number"
}
}
},
"api.ProcessProgressInput": {
"type": "object",
"properties": {
"bitrate": {
"type": "number"
},
"drop": {
"type": "integer"
},
"dup": {
"type": "integer"
},
"enc": {
"type": "integer"
},
"fps": {
"type": "number"
},
"looping": {
"type": "boolean"
}
}
},
"api.ProcessProgressOutput": {
"type": "object",
"properties": {
"bitrate": {
"type": "number"
},
"fps": {
"type": "number"
}
}
},
"api.ProcessReport": {
"type": "object",
"properties": {

View File

@ -570,7 +570,7 @@
"name": "filters",
"in": "body",
"schema": {
"$ref": "#/definitions/api.EventFilters"
"$ref": "#/definitions/api.LogEventFilters"
}
}
],
@ -2575,14 +2575,14 @@
"v16.?.?"
],
"summary": "Stream of log events",
"operationId": "events-3-media",
"operationId": "events-3-log",
"parameters": [
{
"description": "Event filters",
"name": "filters",
"in": "body",
"schema": {
"$ref": "#/definitions/api.EventFilters"
"$ref": "#/definitions/api.LogEventFilters"
}
}
],
@ -2590,7 +2590,7 @@
"200": {
"description": "OK",
"schema": {
"$ref": "#/definitions/api.MediaEvent"
"$ref": "#/definitions/api.LogEvent"
}
}
}
@ -2603,7 +2603,7 @@
"ApiKeyAuth": []
}
],
"description": "Stream of media event of whats happening in the core",
"description": "Stream of media event of whats happening in the filesystems",
"consumes": [
"application/json"
],
@ -2614,7 +2614,7 @@
"v16.?.?"
],
"summary": "Stream of media events",
"operationId": "events-3-log",
"operationId": "events-3-media",
"parameters": [
{
"type": "string",
@ -2627,7 +2627,46 @@
"200": {
"description": "OK",
"schema": {
"$ref": "#/definitions/api.LogEvent"
"$ref": "#/definitions/api.MediaEvent"
}
}
}
}
},
"/api/v3/events/process": {
"post": {
"security": [
{
"ApiKeyAuth": []
}
],
"description": "Stream of process event of whats happening in the processes",
"consumes": [
"application/json"
],
"produces": [
"application/x-json-stream"
],
"tags": [
"v16.?.?"
],
"summary": "Stream of process events",
"operationId": "events-3-process",
"parameters": [
{
"description": "Event filters",
"name": "filters",
"in": "body",
"schema": {
"$ref": "#/definitions/api.ProcessEventFilters"
}
}
],
"responses": {
"200": {
"description": "OK",
"schema": {
"$ref": "#/definitions/api.ProcessEvent"
}
}
}
@ -6768,17 +6807,6 @@
}
}
},
"api.EventFilters": {
"type": "object",
"properties": {
"filters": {
"type": "array",
"items": {
"$ref": "#/definitions/api.LogEventFilter"
}
}
}
},
"api.FileInfo": {
"type": "object",
"properties": {
@ -7163,6 +7191,17 @@
}
}
},
"api.LogEventFilters": {
"type": "object",
"properties": {
"filters": {
"type": "array",
"items": {
"$ref": "#/definitions/api.LogEventFilter"
}
}
}
},
"api.Login": {
"type": "object",
"required": [
@ -7683,6 +7722,54 @@
}
}
},
"api.ProcessEvent": {
"type": "object",
"properties": {
"domain": {
"type": "string"
},
"line": {
"type": "string"
},
"pid": {
"type": "string"
},
"progress": {
"$ref": "#/definitions/api.ProcessProgress"
},
"ts": {
"type": "integer"
},
"type": {
"type": "string"
}
}
},
"api.ProcessEventFilter": {
"type": "object",
"properties": {
"domain": {
"type": "string"
},
"pid": {
"type": "string"
},
"type": {
"type": "string"
}
}
},
"api.ProcessEventFilters": {
"type": "object",
"properties": {
"filters": {
"type": "array",
"items": {
"$ref": "#/definitions/api.ProcessEventFilter"
}
}
}
},
"api.ProcessID": {
"type": "object",
"properties": {
@ -7694,6 +7781,60 @@
}
}
},
"api.ProcessProgress": {
"type": "object",
"properties": {
"input": {
"type": "array",
"items": {
"$ref": "#/definitions/api.ProcessProgressInput"
}
},
"output": {
"type": "array",
"items": {
"$ref": "#/definitions/api.ProcessProgressOutput"
}
},
"time": {
"type": "number"
}
}
},
"api.ProcessProgressInput": {
"type": "object",
"properties": {
"bitrate": {
"type": "number"
},
"drop": {
"type": "integer"
},
"dup": {
"type": "integer"
},
"enc": {
"type": "integer"
},
"fps": {
"type": "number"
},
"looping": {
"type": "boolean"
}
}
},
"api.ProcessProgressOutput": {
"type": "object",
"properties": {
"bitrate": {
"type": "number"
},
"fps": {
"type": "number"
}
}
},
"api.ProcessReport": {
"type": "object",
"properties": {

View File

@ -911,13 +911,6 @@ definitions:
message:
type: string
type: object
api.EventFilters:
properties:
filters:
items:
$ref: '#/definitions/api.LogEventFilter'
type: array
type: object
api.FileInfo:
properties:
core_id:
@ -1172,6 +1165,13 @@ definitions:
message:
type: string
type: object
api.LogEventFilters:
properties:
filters:
items:
$ref: '#/definitions/api.LogEventFilter'
type: array
type: object
api.Login:
properties:
password:
@ -1528,6 +1528,37 @@ definitions:
format: uint64
type: integer
type: object
api.ProcessEvent:
properties:
domain:
type: string
line:
type: string
pid:
type: string
progress:
$ref: '#/definitions/api.ProcessProgress'
ts:
type: integer
type:
type: string
type: object
api.ProcessEventFilter:
properties:
domain:
type: string
pid:
type: string
type:
type: string
type: object
api.ProcessEventFilters:
properties:
filters:
items:
$ref: '#/definitions/api.ProcessEventFilter'
type: array
type: object
api.ProcessID:
properties:
domain:
@ -1535,6 +1566,41 @@ definitions:
id:
type: string
type: object
api.ProcessProgress:
properties:
input:
items:
$ref: '#/definitions/api.ProcessProgressInput'
type: array
output:
items:
$ref: '#/definitions/api.ProcessProgressOutput'
type: array
time:
type: number
type: object
api.ProcessProgressInput:
properties:
bitrate:
type: number
drop:
type: integer
dup:
type: integer
enc:
type: integer
fps:
type: number
looping:
type: boolean
type: object
api.ProcessProgressOutput:
properties:
bitrate:
type: number
fps:
type: number
type: object
api.ProcessReport:
properties:
created_at:
@ -3296,7 +3362,7 @@ paths:
in: body
name: filters
schema:
$ref: '#/definitions/api.EventFilters'
$ref: '#/definitions/api.LogEventFilters'
produces:
- text/event-stream
- application/x-json-stream
@ -4612,13 +4678,13 @@ paths:
consumes:
- application/json
description: Stream of log event of whats happening in the core
operationId: events-3-media
operationId: events-3-log
parameters:
- description: Event filters
in: body
name: filters
schema:
$ref: '#/definitions/api.EventFilters'
$ref: '#/definitions/api.LogEventFilters'
produces:
- text/event-stream
- application/x-json-stream
@ -4626,7 +4692,7 @@ paths:
"200":
description: OK
schema:
$ref: '#/definitions/api.MediaEvent'
$ref: '#/definitions/api.LogEvent'
security:
- ApiKeyAuth: []
summary: Stream of log events
@ -4636,8 +4702,8 @@ paths:
post:
consumes:
- application/json
description: Stream of media event of whats happening in the core
operationId: events-3-log
description: Stream of media event of whats happening in the filesystems
operationId: events-3-media
parameters:
- description: glob pattern for media names
in: query
@ -4649,12 +4715,36 @@ paths:
"200":
description: OK
schema:
$ref: '#/definitions/api.LogEvent'
$ref: '#/definitions/api.MediaEvent'
security:
- ApiKeyAuth: []
summary: Stream of media events
tags:
- v16.?.?
/api/v3/events/process:
post:
consumes:
- application/json
description: Stream of process event of whats happening in the processes
operationId: events-3-process
parameters:
- description: Event filters
in: body
name: filters
schema:
$ref: '#/definitions/api.ProcessEventFilters'
produces:
- application/x-json-stream
responses:
"200":
description: OK
schema:
$ref: '#/definitions/api.ProcessEvent'
security:
- ApiKeyAuth: []
summary: Stream of process events
tags:
- v16.?.?
/api/v3/fs:
get:
description: Listall registered filesystems

View File

@ -44,6 +44,32 @@ func NewPubSub() *PubSub {
return w
}
// Consume subscribes to the given event source and republishes every event
// it emits on this PubSub, until either the PubSub's context is done or the
// source's event channel is closed. The optional rewrite function is applied
// to each event before publishing; pass nil for an identity rewrite.
func (w *PubSub) Consume(s EventSource, rewrite func(e Event) Event) {
	ch, cancel, err := s.Events()
	if err != nil {
		// The source can't deliver events; nothing to consume.
		return
	}

	if rewrite == nil {
		rewrite = func(e Event) Event { return e }
	}

	go func(ch <-chan Event, cancel CancelFunc) {
		for {
			select {
			case <-w.ctx.Done():
				cancel()
				return
			case e, ok := <-ch:
				if !ok {
					// Source channel closed: release the subscription
					// instead of leaking it.
					cancel()
					return
				}
				// Best effort: a failed publish (e.g. closed PubSub)
				// is intentionally not treated as fatal here.
				w.Publish(rewrite(e))
			}
		}
	}(ch, cancel)
}
func (w *PubSub) Publish(e Event) error {
event := e.Clone()
@ -64,9 +90,14 @@ func (w *PubSub) Publish(e Event) error {
}
func (w *PubSub) Close() {
w.publisherLock.Lock()
if w.publisherClosed {
w.publisherLock.Unlock()
return
}
w.cancel()
w.publisherLock.Lock()
close(w.publisher)
w.publisherClosed = true
w.publisherLock.Unlock()
@ -98,6 +129,8 @@ func (w *PubSub) Subscribe() (<-chan Event, CancelFunc) {
w.subscriberLock.Lock()
delete(w.subscriber, id)
w.subscriberLock.Unlock()
close(l)
}
return l, unsubscribe

104
event/process.go Normal file
View File

@ -0,0 +1,104 @@
package event
import (
"time"
)
// ProcessEvent is an event emitted by a running process. Type is either
// "line" (Line carries one log line) or "progress" (Progress carries a
// progress snapshot). ProcessID and Domain identify the originating process.
type ProcessEvent struct {
	ProcessID string
	Domain    string
	Type      string
	Line      string
	Progress  *ProcessProgress
	Timestamp time.Time
}

// Clone returns a deep copy of the event.
func (e *ProcessEvent) Clone() Event {
	clone := *e

	if e.Progress != nil {
		clone.Progress = e.Progress.Clone()
	}

	return &clone
}

// NewProcessLogEvent creates a "line" event for a single log line,
// timestamped with the current time. ProcessID and Domain are left empty.
func NewProcessLogEvent(logline string) *ProcessEvent {
	e := &ProcessEvent{}
	e.Type = "line"
	e.Line = logline
	e.Timestamp = time.Now()

	return e
}

// NewProcessProgressEvent creates a "progress" event carrying the given
// progress snapshot, timestamped with the current time.
func NewProcessProgressEvent(progress *ProcessProgress) *ProcessEvent {
	e := &ProcessEvent{}
	e.Type = "progress"
	e.Progress = progress
	e.Timestamp = time.Now()

	return e
}
// ProcessProgressInput is a per-input snapshot of progress values.
type ProcessProgressInput struct {
	Bitrate float64
	FPS     float64
	Looping bool
	Enc     uint64
	Drop    uint64
	Dup     uint64
}

// Clone returns a copy of the input progress values. All fields are plain
// values, so a shallow copy is a full copy.
func (p *ProcessProgressInput) Clone() ProcessProgressInput {
	return *p
}

// ProcessProgressOutput is a per-output snapshot of progress values.
type ProcessProgressOutput struct {
	Bitrate float64
	FPS     float64
}

// Clone returns a copy of the output progress values.
func (p *ProcessProgressOutput) Clone() ProcessProgressOutput {
	return *p
}

// ProcessProgress is a snapshot of the progress of a process with
// per-input and per-output values.
type ProcessProgress struct {
	Input  []ProcessProgressInput
	Output []ProcessProgressOutput
	Time   float64
}

// Clone returns a deep copy of the progress snapshot. The slices of the
// copy do not share backing storage with the original.
func (p *ProcessProgress) Clone() *ProcessProgress {
	c := ProcessProgress{
		Time: p.Time,
	}

	// Pre-size the slices to avoid repeated growth while copying. An empty
	// source keeps the corresponding slice nil, as before.
	if len(p.Input) != 0 {
		c.Input = make([]ProcessProgressInput, 0, len(p.Input))
		for _, in := range p.Input {
			c.Input = append(c.Input, in.Clone())
		}
	}

	if len(p.Output) != 0 {
		c.Output = make([]ProcessProgressOutput, 0, len(p.Output))
		for _, out := range p.Output {
			c.Output = append(c.Output, out.Clone())
		}
	}

	return &c
}

View File

@ -10,6 +10,7 @@ import (
"time"
"github.com/datarhei/core/v16/encoding/json"
"github.com/datarhei/core/v16/event"
"github.com/datarhei/core/v16/ffmpeg/prelude"
"github.com/datarhei/core/v16/log"
"github.com/datarhei/core/v16/net/url"
@ -42,6 +43,10 @@ type Parser interface {
// ImportReportHistory imports a report history from another parser
ImportReportHistory([]ReportHistoryEntry)
Events() (<-chan event.Event, event.CancelFunc, error)
Destroy()
}
// Config is the config for the Parser implementation
@ -125,6 +130,8 @@ type parser struct {
log sync.RWMutex
logHistory sync.RWMutex
}
events *event.PubSub
}
// New returns a Parser that satisfies the Parser interface
@ -135,6 +142,7 @@ func New(config Config) Parser {
logMinimalHistoryLength: config.LogMinimalHistory,
logger: config.Logger,
collector: config.Collector,
events: event.NewPubSub(),
}
if p.logger == nil {
@ -319,6 +327,8 @@ func (p *parser) Parse(line []byte) uint64 {
// Write the current non-progress line to the log
p.addLog(stringLine)
p.events.Publish(event.NewProcessLogEvent(stringLine))
p.lock.prelude.Lock()
if !p.prelude.done {
if len(p.prelude.data) < p.prelude.headLines {
@ -457,6 +467,36 @@ func (p *parser) Parse(line []byte) uint64 {
}
}
progress := p.assembleProgress()
evt := &event.ProcessProgress{
Time: progress.Time,
}
for _, io := range progress.Input {
input := event.ProcessProgressInput{
Bitrate: io.Bitrate,
FPS: io.FPS,
}
if io.AVstream != nil {
input.Looping = io.AVstream.Looping
input.Enc = io.AVstream.Enc
input.Drop = io.AVstream.Drop
input.Dup = io.AVstream.Dup
}
evt.Input = append(evt.Input, input)
}
for _, io := range progress.Output {
evt.Output = append(evt.Output, event.ProcessProgressOutput{
Bitrate: io.Bitrate,
FPS: io.FPS,
})
}
p.events.Publish(event.NewProcessProgressEvent(evt))
// Calculate if any of the processed frames staled.
// If one number of frames in an output is the same as before, then pFrames becomes 0.
pFrames := p.stats.main.diff.frame
@ -539,9 +579,10 @@ func (p *parser) parseFFmpegIO(kind string, line []byte) error {
}
}
if kind == "input" {
switch kind {
case "input":
p.process.input = processIO
} else if kind == "output" {
case "output":
p.process.output = processIO
}
@ -644,10 +685,7 @@ func (p *parser) Stop(state string, pusage process.Usage) {
p.storeReportHistory(state, usage)
}
func (p *parser) Progress() Progress {
p.lock.progress.RLock()
defer p.lock.progress.RUnlock()
func (p *parser) assembleProgress() Progress {
progress := p.process.export()
p.progress.ffmpeg.exportTo(&progress)
@ -666,6 +704,13 @@ func (p *parser) Progress() Progress {
return progress
}
// Progress returns a snapshot of the current progress values. It takes the
// progress read-lock and delegates the assembly to assembleProgress.
func (p *parser) Progress() Progress {
	p.lock.progress.RLock()
	defer p.lock.progress.RUnlock()

	return p.assembleProgress()
}
func (p *parser) IsRunning() bool {
p.lock.progress.RLock()
defer p.lock.progress.RUnlock()
@ -1000,3 +1045,13 @@ func (p *parser) ImportReportHistory(history []ReportHistoryEntry) {
p.logHistory = p.logHistory.Next()
}
}
// Events returns a channel of parser events together with a function to
// cancel the subscription. The returned error is always nil.
func (p *parser) Events() (<-chan event.Event, event.CancelFunc, error) {
	events, unsubscribe := p.events.Subscribe()

	return events, unsubscribe, nil
}
// Destroy closes the parser's event pub/sub, releasing any remaining
// event subscribers.
func (p *parser) Destroy() {
	p.events.Close()
}

View File

@ -6,6 +6,7 @@ import (
"strings"
"github.com/datarhei/core/v16/encoding/json"
"github.com/datarhei/core/v16/event"
"github.com/datarhei/core/v16/log"
)
@ -108,7 +109,7 @@ type LogEventFilter struct {
reData map[string]*regexp.Regexp
}
type EventFilters struct {
type LogEventFilters struct {
Filters []LogEventFilter `json:"filters"`
}
@ -169,3 +170,156 @@ type MediaEvent struct {
Names []string `json:"names,omitempty"`
Timestamp int64 `json:"ts"`
}
// Unmarshal copies the fields of a *event.MediaEvent into p. It reports
// false for any other event type, leaving p untouched.
func (p *MediaEvent) Unmarshal(e event.Event) bool {
	switch evt := e.(type) {
	case *event.MediaEvent:
		p.Action = evt.Action
		p.Name = evt.Name
		p.Names = nil
		p.Timestamp = evt.Timestamp.UnixMilli()

		return true
	default:
		return false
	}
}
// ProcessEvent is the API representation of a process event. Type selects
// which payload is present: "line" events carry Line, "progress" events
// carry Progress. Timestamp is in milliseconds since the Unix epoch.
type ProcessEvent struct {
	ProcessID string           `json:"pid"`
	Domain    string           `json:"domain"`
	Type      string           `json:"type"`
	Line      string           `json:"line,omitempty"`
	Progress  *ProcessProgress `json:"progress,omitempty"`
	Timestamp int64            `json:"ts"`
}

// ProcessProgressInput is the API representation of per-input progress values.
type ProcessProgressInput struct {
	Bitrate json.Number `json:"bitrate" swaggertype:"number" jsonschema:"type=number"`
	FPS     json.Number `json:"fps" swaggertype:"number" jsonschema:"type=number"`
	Looping bool        `json:"looping"`
	Enc     uint64      `json:"enc"`
	Drop    uint64      `json:"drop"`
	Dup     uint64      `json:"dup"`
}

// ProcessProgressOutput is the API representation of per-output progress values.
type ProcessProgressOutput struct {
	Bitrate json.Number `json:"bitrate" swaggertype:"number" jsonschema:"type=number"`
	FPS     json.Number `json:"fps" swaggertype:"number" jsonschema:"type=number"`
}

// ProcessProgress is the API representation of a process progress snapshot.
type ProcessProgress struct {
	Input  []ProcessProgressInput  `json:"input"`
	Output []ProcessProgressOutput `json:"output"`
	Time   json.Number             `json:"time" swaggertype:"number" jsonschema:"type=number"`
}
// Unmarshal converts an event.ProcessProgress into its API representation.
// Any previous content of p is replaced, so the receiver can safely be
// reused across events without accumulating stale entries.
func (p *ProcessProgress) Unmarshal(e *event.ProcessProgress) {
	p.Input = nil
	p.Output = nil

	if len(e.Input) != 0 {
		// Pre-size to avoid repeated slice growth.
		p.Input = make([]ProcessProgressInput, 0, len(e.Input))
	}

	for _, io := range e.Input {
		p.Input = append(p.Input, ProcessProgressInput{
			Bitrate: json.ToNumber(io.Bitrate),
			FPS:     json.ToNumber(io.FPS),
			Looping: io.Looping,
			Enc:     io.Enc,
			Drop:    io.Drop,
			Dup:     io.Dup,
		})
	}

	if len(e.Output) != 0 {
		p.Output = make([]ProcessProgressOutput, 0, len(e.Output))
	}

	for _, io := range e.Output {
		p.Output = append(p.Output, ProcessProgressOutput{
			Bitrate: json.ToNumber(io.Bitrate),
			FPS:     json.ToNumber(io.FPS),
		})
	}

	p.Time = json.ToNumber(e.Time)
}
// Unmarshal copies the fields of a *event.ProcessEvent into p. It reports
// false for any other event type, leaving p untouched.
func (p *ProcessEvent) Unmarshal(e event.Event) bool {
	evt, ok := e.(*event.ProcessEvent)
	if !ok {
		return false
	}

	// Convert the progress payload first, if present.
	var progress *ProcessProgress
	if evt.Progress != nil {
		progress = &ProcessProgress{}
		progress.Unmarshal(evt.Progress)
	}

	p.ProcessID = evt.ProcessID
	p.Domain = evt.Domain
	p.Type = evt.Type
	p.Line = evt.Line
	p.Progress = progress
	p.Timestamp = evt.Timestamp.UnixMilli()

	return true
}
// Filter reports whether the event matches the given compiled filter.
// Only the filter's non-nil patterns are checked; an empty filter
// matches every event.
func (e *ProcessEvent) Filter(ef *ProcessEventFilter) bool {
	if ef.reProcessID != nil && !ef.reProcessID.MatchString(e.ProcessID) {
		return false
	}

	if ef.reDomain != nil && !ef.reDomain.MatchString(e.Domain) {
		return false
	}

	if ef.reType != nil && !ef.reType.MatchString(e.Type) {
		return false
	}

	return true
}
// ProcessEventFilter describes a filter for process events. Each non-empty
// field holds a regular expression that is matched case-insensitively
// against the corresponding event field once Compile has been called.
type ProcessEventFilter struct {
	ProcessID string `json:"pid"`
	Domain    string `json:"domain"`
	Type      string `json:"type"`

	reProcessID *regexp.Regexp
	reDomain    *regexp.Regexp
	reType      *regexp.Regexp
}

// ProcessEventFilters is a list of process event filters.
type ProcessEventFilters struct {
	Filters []ProcessEventFilter `json:"filters"`
}

// Compile builds the case-insensitive regular expressions for all non-empty
// filter fields. It returns the first compile error encountered; empty
// fields are skipped and leave their pattern nil.
func (ef *ProcessEventFilter) Compile() error {
	compile := func(expr string, target **regexp.Regexp) error {
		if len(expr) == 0 {
			return nil
		}

		re, err := regexp.Compile("(?i)" + expr)
		if err != nil {
			return err
		}

		*target = re

		return nil
	}

	if err := compile(ef.ProcessID, &ef.reProcessID); err != nil {
		return err
	}

	if err := compile(ef.Domain, &ef.reDomain); err != nil {
		return err
	}

	return compile(ef.Type, &ef.reType)
}

View File

@ -57,7 +57,7 @@ type RestClient interface {
FilesystemDeleteFile(storage, path string) error // DELETE /v3/fs/{storage}/{path}
FilesystemAddFile(storage, path string, data io.Reader) error // PUT /v3/fs/{storage}/{path}
Events(ctx context.Context, filters api.EventFilters) (<-chan api.LogEvent, error) // POST /v3/events
Events(ctx context.Context, filters api.LogEventFilters) (<-chan api.LogEvent, error) // POST /v3/events
MediaEvents(ctx context.Context, storage, pattern string) (<-chan api.MediaEvent, error) // GET /v3/events/media/{storage}
ProcessList(opts ProcessListOptions) ([]api.Process, error) // GET /v3/process

View File

@ -11,7 +11,7 @@ import (
"github.com/datarhei/core/v16/mem"
)
func (r *restclient) Events(ctx context.Context, filters api.EventFilters) (<-chan api.LogEvent, error) {
func (r *restclient) Events(ctx context.Context, filters api.LogEventFilters) (<-chan api.LogEvent, error) {
buf := mem.Get()
defer mem.Put(buf)

View File

@ -21,12 +21,12 @@ import (
// @Accept json
// @Produce text/event-stream
// @Produce json-stream
// @Param filters body api.EventFilters false "Event filters"
// @Param filters body api.LogEventFilters false "Event filters"
// @Success 200 {object} api.LogEvent
// @Security ApiKeyAuth
// @Router /api/v3/cluster/events [post]
func (h *ClusterHandler) Events(c echo.Context) error {
filters := api.EventFilters{}
filters := api.LogEventFilters{}
if err := util.ShouldBindJSON(c, &filters); err != nil {
return api.Err(http.StatusBadRequest, "", "invalid JSON: %s", err.Error())
@ -67,7 +67,7 @@ func (h *ClusterHandler) Events(c echo.Context) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
evts, err := h.proxy.Events(ctx, filters)
evts, err := h.proxy.LogEvents(ctx, filters)
if err != nil {
return api.Err(http.StatusInternalServerError, "", "%s", err.Error())
}

View File

@ -2,6 +2,7 @@ package api
import (
"net/http"
goslices "slices"
"strings"
"sync"
"time"
@ -19,9 +20,10 @@ import (
// The EventsHandler type provides handler functions for retrieving event.
type EventsHandler struct {
logs log.ChannelWriter
media map[string]event.MediaSource
lock sync.Mutex
logs log.ChannelWriter
media map[string]event.MediaSource
process event.EventSource
lock sync.Mutex
}
// NewEvents returns a new EventsHandler type
@ -43,20 +45,31 @@ func (h *EventsHandler) AddMediaSource(name string, source event.MediaSource) {
h.media[name] = source
}
// SetProcessSource sets the source for process events. A nil source
// is ignored and leaves any previously set source in place.
func (h *EventsHandler) SetProcessSource(source event.EventSource) {
	if source == nil {
		return
	}

	h.lock.Lock()
	h.process = source
	h.lock.Unlock()
}
// LogEvents returns a stream of event
// @Summary Stream of log events
// @Description Stream of log event of whats happening in the core
// @ID events-3-media
// @ID events-3-log
// @Tags v16.?.?
// @Accept json
// @Produce text/event-stream
// @Produce json-stream
// @Param filters body api.EventFilters false "Event filters"
// @Success 200 {object} api.MediaEvent
// @Param filters body api.LogEventFilters false "Event filters"
// @Success 200 {object} api.LogEvent
// @Security ApiKeyAuth
// @Router /api/v3/events [post]
func (h *EventsHandler) LogEvents(c echo.Context) error {
filters := api.EventFilters{}
filters := api.LogEventFilters{}
if err := util.ShouldBindJSON(c, &filters); err != nil {
return api.Err(http.StatusBadRequest, "", "invalid JSON: %s", err.Error())
@ -174,15 +187,15 @@ func (h *EventsHandler) LogEvents(c echo.Context) error {
}
}
// LogEvents returns a stream of media event
// MediaEvents returns a stream of media event
// @Summary Stream of media events
// @Description Stream of media event of whats happening in the core
// @ID events-3-log
// @Description Stream of media event of whats happening in the filesystems
// @ID events-3-media
// @Tags v16.?.?
// @Accept json
// @Param glob query string false "glob pattern for media names"
// @Produce json-stream
// @Success 200 {object} api.LogEvent
// @Success 200 {object} api.MediaEvent
// @Security ApiKeyAuth
// @Router /api/v3/events/media/{type} [post]
func (h *EventsHandler) MediaEvents(c echo.Context) error {
@ -268,6 +281,8 @@ func (h *EventsHandler) MediaEvents(c echo.Context) error {
}
res.Flush()
event := api.MediaEvent{}
for {
select {
case err := <-done:
@ -282,18 +297,110 @@ func (h *EventsHandler) MediaEvents(c echo.Context) error {
done <- err
}
res.Flush()
case evt := <-evts:
e := evt.(*event.MediaEvent)
case e := <-evts:
if !event.Unmarshal(e) {
continue
}
if compiledPattern != nil {
if !compiledPattern.Match(e.Name) {
if !compiledPattern.Match(event.Name) {
continue
}
}
if err := enc.Encode(api.MediaEvent{
Action: e.Action,
Name: e.Name,
Timestamp: e.Timestamp.UnixMilli(),
}); err != nil {
if err := enc.Encode(event); err != nil {
done <- err
}
res.Flush()
}
}
}
// ProcessEvents returns a stream of process event
// @Summary Stream of process events
// @Description Stream of process event of whats happening in the processes
// @ID events-3-process
// @Tags v16.?.?
// @Accept json
// @Produce json-stream
// @Param filters body api.ProcessEventFilters false "Event filters"
// @Success 200 {object} api.ProcessEvent
// @Security ApiKeyAuth
// @Router /api/v3/events/process [post]
func (h *EventsHandler) ProcessEvents(c echo.Context) error {
filters := api.ProcessEventFilters{}
if err := util.ShouldBindJSON(c, &filters); err != nil {
return api.Err(http.StatusBadRequest, "", "invalid JSON: %s", err.Error())
}
filter := []*api.ProcessEventFilter{}
for _, f := range filters.Filters {
f := f
if err := f.Compile(); err != nil {
return api.Err(http.StatusBadRequest, "", "invalid filter: %s", err.Error())
}
filter = append(filter, &f)
}
keepaliveTicker := time.NewTicker(5 * time.Second)
defer keepaliveTicker.Stop()
req := c.Request()
reqctx := req.Context()
contentType := "application/x-json-stream"
evts, cancel, err := h.process.Events()
if err != nil {
return api.Err(http.StatusNotImplemented, "", "events are not implemented for this server")
}
defer cancel()
res := c.Response()
res.Header().Set(echo.HeaderContentType, contentType+"; charset=UTF-8")
res.Header().Set(echo.HeaderCacheControl, "no-store")
res.Header().Set(echo.HeaderConnection, "close")
res.WriteHeader(http.StatusOK)
enc := json.NewEncoder(res)
enc.SetIndent("", "")
done := make(chan error, 1)
filterEvent := func(event *api.ProcessEvent) bool {
if len(filter) == 0 {
return true
}
return goslices.ContainsFunc(filter, event.Filter)
}
event := api.ProcessEvent{}
for {
select {
case err := <-done:
return err
case <-reqctx.Done():
done <- nil
case <-keepaliveTicker.C:
res.Write([]byte("{\"type\":\"keepalive\"}\n"))
res.Flush()
case e := <-evts:
if !event.Unmarshal(e) {
continue
}
if !filterEvent(&event) {
continue
}
if err := enc.Encode(event); err != nil {
done <- err
}
res.Flush()

View File

@ -288,6 +288,7 @@ func NewServer(config Config) (serverhandler.Server, error) {
}
s.v3handler.events.AddMediaSource("srt", config.SRT)
s.v3handler.events.AddMediaSource("rtmp", config.RTMP)
s.v3handler.events.SetProcessSource(config.Restream)
if config.Restream != nil {
s.v3handler.process = api.NewProcess(
@ -816,5 +817,6 @@ func (s *server) setRoutesV3(v3 *echo.Group) {
v3.POST("/events", s.v3handler.events.LogEvents)
v3.POST("/events/log", s.v3handler.events.LogEvents)
v3.POST("/events/media/:type", s.v3handler.events.MediaEvents)
v3.POST("/events/process", s.v3handler.events.ProcessEvents)
}
}

View File

@ -10,6 +10,7 @@ import (
"sync"
"time"
"github.com/datarhei/core/v16/event"
"github.com/datarhei/core/v16/ffmpeg"
"github.com/datarhei/core/v16/ffmpeg/skills"
"github.com/datarhei/core/v16/glob"
@ -59,6 +60,8 @@ type Restreamer interface {
Probe(config *app.Config, timeout time.Duration) app.Probe // Probe a process with specific timeout
Validate(config *app.Config) error // Validate a process config
Events() (<-chan event.Event, event.CancelFunc, error)
}
// Config is the required configuration for a new restreamer instance.
@ -101,6 +104,8 @@ type restream struct {
startOnce sync.Once
stopOnce sync.Once
events *event.PubSub
}
// New returns a new instance that implements the Restreamer interface
@ -115,6 +120,7 @@ func New(config Config) (Restreamer, error) {
logger: config.Logger,
tasks: NewStorage(),
metadata: map[string]interface{}{},
events: event.NewPubSub(),
}
if r.logger == nil {
@ -462,6 +468,18 @@ func (r *restream) load() error {
t.ffmpeg = ffmpeg
r.events.Consume(t.parser, func(e event.Event) event.Event {
pe, ok := e.(*event.ProcessEvent)
if !ok {
return e
}
pe.ProcessID = t.process.ID
pe.Domain = t.process.Domain
return pe
})
return true
})
@ -656,6 +674,18 @@ func (r *restream) createTask(config *app.Config) (*task, error) {
t.ffmpeg = ffmpeg
r.events.Consume(t.parser, func(e event.Event) event.Event {
pe, ok := e.(*event.ProcessEvent)
if !ok {
return e
}
pe.ProcessID = t.process.ID
pe.Domain = t.process.Domain
return pe
})
return t, nil
}
@ -1979,3 +2009,9 @@ func (r *restream) Validate(config *app.Config) error {
return nil
}
// Events returns a channel of process events together with a function to
// cancel the subscription. The returned error is always nil.
func (r *restream) Events() (<-chan event.Event, event.CancelFunc, error) {
	events, unsubscribe := r.events.Subscribe()

	return events, unsubscribe, nil
}

View File

@ -5,6 +5,7 @@ import (
"sync/atomic"
"time"
"github.com/datarhei/core/v16/event"
"github.com/datarhei/core/v16/ffmpeg/parse"
"github.com/datarhei/core/v16/glob"
"github.com/datarhei/core/v16/log"
@ -326,6 +327,8 @@ func (t *task) Config() *app.Config {
// Destroy stops the task and then destroys its parser, releasing the
// parser's event resources.
func (t *task) Destroy() {
	t.Stop()

	t.parser.Destroy()
}
func (t *task) Match(id, reference, owner, domain glob.Glob) bool {
@ -378,3 +381,7 @@ func (t *task) ExportParserReportHistory() []parse.ReportHistoryEntry {
// ImportParserReportHistory forwards the given report history entries
// to the task's parser.
func (t *task) ImportParserReportHistory(report []parse.ReportHistoryEntry) {
	t.parser.ImportReportHistory(report)
}
// Events exposes the event stream of the task's parser (log line and
// progress events) along with a cancel function for the subscription.
func (t *task) Events() (<-chan event.Event, event.CancelFunc, error) {
	return t.parser.Events()
}