Add nil checks, add NewTask function

This commit is contained in:
Ingo Oppermann 2024-07-19 12:26:47 +02:00
parent 72883d18d4
commit 688450f341
No known key found for this signature in database
GPG Key ID: 2AB32426E9DD229E
2 changed files with 131 additions and 40 deletions

View File

@ -26,7 +26,6 @@ import (
"github.com/datarhei/core/v16/restream/store"
"github.com/Masterminds/semver/v3"
"github.com/puzpuzpuz/xsync/v3"
)
// The Restreamer interface
@ -346,23 +345,14 @@ func (r *restream) load() error {
p.Process.Config.FFVersion = "^" + ffversion
}
t := &task{
id: p.Process.ID,
owner: p.Process.Owner,
domain: p.Process.Domain,
reference: p.Process.Reference,
process: p.Process,
config: p.Process.Config.Clone(),
logger: r.logger.WithFields(log.Fields{
"id": p.Process.ID,
"owner": p.Process.Owner,
"domain": p.Process.Domain,
"reference": p.Process.Reference,
}),
lock: xsync.NewRBMutex(),
}
t := NewTask(p.Process, r.logger.WithFields(log.Fields{
"id": p.Process.ID,
"owner": p.Process.Owner,
"domain": p.Process.Domain,
"reference": p.Process.Reference,
}))
t.metadata = p.Metadata
t.ImportMetadata(p.Metadata)
// Replace all placeholders in the config
resolveStaticPlaceholders(t.config, r.replace)
@ -570,21 +560,12 @@ func (r *restream) createTask(config *app.Config) (*task, error) {
process.Order.Set("start")
}
t := &task{
id: config.ID,
owner: config.Owner,
domain: config.Domain,
reference: process.Reference,
process: process,
config: process.Config.Clone(),
logger: r.logger.WithFields(log.Fields{
"id": process.ID,
"owner": process.Owner,
"reference": process.Reference,
"domain": process.Domain,
}),
lock: xsync.NewRBMutex(),
}
t := NewTask(process, r.logger.WithFields(log.Fields{
"id": process.ID,
"owner": process.Owner,
"reference": process.Reference,
"domain": process.Domain,
}))
resolveStaticPlaceholders(t.config, r.replace)
@ -1173,11 +1154,12 @@ func (r *restream) updateProcess(id app.ProcessID, config *app.Config) error {
//t.process.CreatedAt = task.process.CreatedAt
// Transfer the report history to the new process
history := task.parser.ReportHistory()
t.parser.ImportReportHistory(history)
history := task.ExportParserReportHistory()
t.ImportParserReportHistory(history)
// Transfer the metadata to the new process
t.metadata = task.metadata
metadata := task.ExportMetadata()
t.ImportMetadata(metadata)
if err := r.deleteProcess(id); err != nil {
return fmt.Errorf("delete process: %w", err)

View File

@ -36,6 +36,23 @@ type task struct {
lock *xsync.RBMutex
}
func NewTask(process *app.Process, logger log.Logger) *task {
t := &task{
id: process.ID,
owner: process.Owner,
domain: process.Domain,
reference: process.Reference,
process: process,
config: process.Config.Clone(),
playout: map[string]int{},
logger: logger,
metadata: nil,
lock: xsync.NewRBMutex(),
}
return t
}
func (t *task) IsValid() bool {
token := t.lock.RLock()
defer t.lock.RUnlock(token)
@ -81,6 +98,10 @@ func (t *task) Restore() error {
return ErrInvalidProcessConfig
}
if t.process == nil {
return ErrInvalidProcessConfig
}
if t.process.Order.String() == "start" {
err := t.ffmpeg.Start()
if err != nil {
@ -103,6 +124,10 @@ func (t *task) Start() error {
return nil
}
if t.process == nil {
return nil
}
status := t.ffmpeg.Status()
if t.process.Order.String() == "start" && status.Order == "start" {
@ -124,6 +149,10 @@ func (t *task) Stop() error {
return nil
}
if t.process == nil {
return nil
}
status := t.ffmpeg.Status()
if t.process.Order.String() == "stop" && status.Order == "stop" {
@ -157,6 +186,10 @@ func (t *task) Restart() error {
return ErrInvalidProcessConfig
}
if t.process == nil {
return nil
}
if t.process.Order.String() == "stop" {
return nil
}
@ -179,6 +212,18 @@ func (t *task) State() (*app.State, error) {
return state, nil
}
if t.ffmpeg == nil {
return state, nil
}
if t.parser == nil {
return state, nil
}
if t.process == nil {
return state, nil
}
status := t.ffmpeg.Status()
state.Order = t.process.Order.String()
@ -231,6 +276,10 @@ func (t *task) Report() (*app.Report, error) {
return report, nil
}
if t.parser == nil {
return report, nil
}
current := t.parser.Report()
report.UnmarshalParser(&current)
@ -271,6 +320,10 @@ func (t *task) SetReport(report *app.Report) error {
return nil
}
if t.parser == nil {
return nil
}
_, history := report.MarshalParser()
t.parser.ImportReportHistory(history)
@ -282,6 +335,10 @@ func (t *task) SearchReportHistory(state string, from, to *time.Time) []app.Repo
token := t.lock.RLock()
defer t.lock.RUnlock(token)
if t.parser == nil {
return []app.ReportHistorySearchResult{}
}
result := []app.ReportHistorySearchResult{}
presult := t.parser.SearchReportHistory(state, from, to)
@ -324,6 +381,13 @@ func (t *task) SetMetadata(key string, data interface{}) error {
return nil
}
func (t *task) ImportMetadata(m map[string]interface{}) {
t.lock.Lock()
defer t.lock.Unlock()
t.metadata = m
}
func (t *task) GetMetadata(key string) (interface{}, error) {
token := t.lock.RLock()
defer t.lock.RUnlock(token)
@ -332,6 +396,10 @@ func (t *task) GetMetadata(key string) (interface{}, error) {
return t.metadata, nil
}
if t.metadata == nil {
return nil, ErrMetadataKeyNotFound
}
data, ok := t.metadata[key]
if !ok {
return nil, ErrMetadataKeyNotFound
@ -340,13 +408,16 @@ func (t *task) GetMetadata(key string) (interface{}, error) {
return data, nil
}
func (t *task) Limit(cpu, memory bool) bool {
func (t *task) ExportMetadata() map[string]interface{} {
token := t.lock.RLock()
defer t.lock.RUnlock(token)
if !t.valid {
return false
}
return t.metadata
}
func (t *task) Limit(cpu, memory bool) bool {
token := t.lock.RLock()
defer t.lock.RUnlock(token)
if t.ffmpeg == nil {
return false
@ -361,6 +432,10 @@ func (t *task) Equal(config *app.Config) bool {
token := t.lock.RLock()
defer t.lock.RUnlock(token)
if t.process == nil {
return false
}
return t.process.Config.Equal(config)
}
@ -368,6 +443,10 @@ func (t *task) Config() *app.Config {
token := t.lock.RLock()
defer t.lock.RUnlock(token)
if t.config == nil {
return nil
}
return t.config.Clone()
}
@ -383,7 +462,7 @@ func (t *task) Destroy() {
t.command = nil
t.ffmpeg = nil
t.parser = nil
t.metadata = nil
t.metadata = map[string]interface{}{}
}
func (t *task) Match(id, reference, owner, domain glob.Glob) bool {
@ -428,6 +507,10 @@ func (t *task) Process() *app.Process {
token := t.lock.RLock()
defer t.lock.RUnlock(token)
if t.process == nil {
return nil
}
return t.process.Clone()
}
@ -435,5 +518,31 @@ func (t *task) Order() string {
token := t.lock.RLock()
defer t.lock.RUnlock(token)
if t.process == nil {
return ""
}
return t.process.Order.String()
}
func (t *task) ExportParserReportHistory() []parse.ReportHistoryEntry {
token := t.lock.RLock()
defer t.lock.RUnlock(token)
if t.parser == nil {
return nil
}
return t.parser.ReportHistory()
}
func (t *task) ImportParserReportHistory(report []parse.ReportHistoryEntry) {
token := t.lock.RLock()
defer t.lock.RUnlock(token)
if t.parser == nil {
return
}
t.parser.ImportReportHistory(report)
}