From 688450f341a9354f91f2c0f8b729f7a0d2e374fd Mon Sep 17 00:00:00 2001 From: Ingo Oppermann Date: Fri, 19 Jul 2024 12:26:47 +0200 Subject: [PATCH] Add nil checks, add NewTask function --- restream/core.go | 52 +++++++-------------- restream/task.go | 119 +++++++++++++++++++++++++++++++++++++++++++++-- 2 files changed, 131 insertions(+), 40 deletions(-) diff --git a/restream/core.go b/restream/core.go index c8e49808..e9effb7a 100644 --- a/restream/core.go +++ b/restream/core.go @@ -26,7 +26,6 @@ import ( "github.com/datarhei/core/v16/restream/store" "github.com/Masterminds/semver/v3" - "github.com/puzpuzpuz/xsync/v3" ) // The Restreamer interface @@ -346,23 +345,14 @@ func (r *restream) load() error { p.Process.Config.FFVersion = "^" + ffversion } - t := &task{ - id: p.Process.ID, - owner: p.Process.Owner, - domain: p.Process.Domain, - reference: p.Process.Reference, - process: p.Process, - config: p.Process.Config.Clone(), - logger: r.logger.WithFields(log.Fields{ - "id": p.Process.ID, - "owner": p.Process.Owner, - "domain": p.Process.Domain, - "reference": p.Process.Reference, - }), - lock: xsync.NewRBMutex(), - } + t := NewTask(p.Process, r.logger.WithFields(log.Fields{ + "id": p.Process.ID, + "owner": p.Process.Owner, + "domain": p.Process.Domain, + "reference": p.Process.Reference, + })) - t.metadata = p.Metadata + t.ImportMetadata(p.Metadata) // Replace all placeholders in the config resolveStaticPlaceholders(t.config, r.replace) @@ -570,21 +560,12 @@ func (r *restream) createTask(config *app.Config) (*task, error) { process.Order.Set("start") } - t := &task{ - id: config.ID, - owner: config.Owner, - domain: config.Domain, - reference: process.Reference, - process: process, - config: process.Config.Clone(), - logger: r.logger.WithFields(log.Fields{ - "id": process.ID, - "owner": process.Owner, - "reference": process.Reference, - "domain": process.Domain, - }), - lock: xsync.NewRBMutex(), - } + t := NewTask(process, r.logger.WithFields(log.Fields{ + "id": process.ID, + "owner": process.Owner, + "reference": process.Reference, + "domain": process.Domain, + })) resolveStaticPlaceholders(t.config, r.replace) @@ -1173,11 +1154,12 @@ func (r *restream) updateProcess(id app.ProcessID, config *app.Config) error { //t.process.CreatedAt = task.process.CreatedAt // Transfer the report history to the new process - history := task.parser.ReportHistory() - t.parser.ImportReportHistory(history) + history := task.ExportParserReportHistory() + t.ImportParserReportHistory(history) // Transfer the metadata to the new process - t.metadata = task.metadata + metadata := task.ExportMetadata() + t.ImportMetadata(metadata) if err := r.deleteProcess(id); err != nil { return fmt.Errorf("delete process: %w", err) diff --git a/restream/task.go b/restream/task.go index 9120d00e..fd8a1830 100644 --- a/restream/task.go +++ b/restream/task.go @@ -36,6 +36,23 @@ type task struct { lock *xsync.RBMutex } +func NewTask(process *app.Process, logger log.Logger) *task { + t := &task{ + id: process.ID, + owner: process.Owner, + domain: process.Domain, + reference: process.Reference, + process: process, + config: process.Config.Clone(), + playout: map[string]int{}, + logger: logger, + metadata: nil, + lock: xsync.NewRBMutex(), + } + + return t +} + func (t *task) IsValid() bool { token := t.lock.RLock() defer t.lock.RUnlock(token) @@ -81,6 +98,10 @@ func (t *task) Restore() error { return ErrInvalidProcessConfig } + if t.process == nil { + return ErrInvalidProcessConfig + } + if t.process.Order.String() == "start" { err := t.ffmpeg.Start() if err != nil { @@ -103,6 +124,10 @@ func (t *task) Start() error { return nil } + if t.process == nil { + return nil + } + status := t.ffmpeg.Status() if t.process.Order.String() == "start" && status.Order == "start" { @@ -124,6 +149,10 @@ func (t *task) Stop() error { return nil } + if t.process == nil { + return nil + } + status := t.ffmpeg.Status() if t.process.Order.String() == "stop" && status.Order == "stop" { @@ -157,6 +186,10 @@ func (t *task) Restart() error { return ErrInvalidProcessConfig } + if t.process == nil { + return nil + } + if t.process.Order.String() == "stop" { return nil } @@ -179,6 +212,18 @@ func (t *task) State() (*app.State, error) { return state, nil } + if t.ffmpeg == nil { + return state, nil + } + + if t.parser == nil { + return state, nil + } + + if t.process == nil { + return state, nil + } + status := t.ffmpeg.Status() state.Order = t.process.Order.String() @@ -231,6 +276,10 @@ func (t *task) Report() (*app.Report, error) { return report, nil } + if t.parser == nil { + return report, nil + } + current := t.parser.Report() report.UnmarshalParser(¤t) @@ -271,6 +320,10 @@ func (t *task) SetReport(report *app.Report) error { return nil } + if t.parser == nil { + return nil + } + _, history := report.MarshalParser() t.parser.ImportReportHistory(history) @@ -282,6 +335,10 @@ func (t *task) SearchReportHistory(state string, from, to *time.Time) []app.Repo token := t.lock.RLock() defer t.lock.RUnlock(token) + if t.parser == nil { + return []app.ReportHistorySearchResult{} + } + result := []app.ReportHistorySearchResult{} presult := t.parser.SearchReportHistory(state, from, to) @@ -324,6 +381,13 @@ func (t *task) SetMetadata(key string, data interface{}) error { return nil } +func (t *task) ImportMetadata(m map[string]interface{}) { + t.lock.Lock() + defer t.lock.Unlock() + + t.metadata = m +} + func (t *task) GetMetadata(key string) (interface{}, error) { token := t.lock.RLock() defer t.lock.RUnlock(token) @@ -332,6 +396,10 @@ func (t *task) GetMetadata(key string) (interface{}, error) { return t.metadata, nil } + if t.metadata == nil { + return nil, ErrMetadataKeyNotFound + } + data, ok := t.metadata[key] if !ok { return nil, ErrMetadataKeyNotFound @@ -340,13 +408,16 @@ func (t *task) GetMetadata(key string) (interface{}, error) { return data, nil } -func (t *task) Limit(cpu, memory bool) bool { +func (t *task) ExportMetadata() map[string]interface{} { token := t.lock.RLock() defer t.lock.RUnlock(token) - if !t.valid { - return false - } + return t.metadata +} + +func (t *task) Limit(cpu, memory bool) bool { + token := t.lock.RLock() + defer t.lock.RUnlock(token) if t.ffmpeg == nil { return false @@ -361,6 +432,10 @@ func (t *task) Equal(config *app.Config) bool { token := t.lock.RLock() defer t.lock.RUnlock(token) + if t.process == nil { + return false + } + return t.process.Config.Equal(config) } @@ -368,6 +443,10 @@ func (t *task) Config() *app.Config { token := t.lock.RLock() defer t.lock.RUnlock(token) + if t.config == nil { + return nil + } + return t.config.Clone() } @@ -383,7 +462,7 @@ func (t *task) Destroy() { t.command = nil t.ffmpeg = nil t.parser = nil - t.metadata = nil + t.metadata = map[string]interface{}{} } func (t *task) Match(id, reference, owner, domain glob.Glob) bool { @@ -428,6 +507,10 @@ func (t *task) Process() *app.Process { token := t.lock.RLock() defer t.lock.RUnlock(token) + if t.process == nil { + return nil + } + return t.process.Clone() } @@ -435,5 +518,31 @@ func (t *task) Order() string { token := t.lock.RLock() defer t.lock.RUnlock(token) + if t.process == nil { + return "" + } + return t.process.Order.String() } + +func (t *task) ExportParserReportHistory() []parse.ReportHistoryEntry { + token := t.lock.RLock() + defer t.lock.RUnlock(token) + + if t.parser == nil { + return nil + } + + return t.parser.ReportHistory() +} + +func (t *task) ImportParserReportHistory(report []parse.ReportHistoryEntry) { + token := t.lock.RLock() + defer t.lock.RUnlock(token) + + if t.parser == nil { + return + } + + t.parser.ImportReportHistory(report) +}