From 8d2f0b2c16c88ba884622ec1e98dd6a9979902ed Mon Sep 17 00:00:00 2001 From: Ingo Oppermann Date: Thu, 17 Jul 2025 16:43:37 +0200 Subject: [PATCH] Fix locking for process map --- restream/app/process.go | 7 ++ restream/core.go | 155 ++++++++++++++++------------------------ restream/core_test.go | 14 ++-- restream/manager.go | 144 ++++++++++++++++++++++++++----------- restream/task.go | 120 ------------------------------- 5 files changed, 180 insertions(+), 260 deletions(-) diff --git a/restream/app/process.go b/restream/app/process.go index d1b99555..74f769f1 100644 --- a/restream/app/process.go +++ b/restream/app/process.go @@ -482,6 +482,11 @@ func ParseProcessID(pid string) ProcessID { } func (p ProcessID) String() string { + // TODO: causes problems in the cluster with processes without a domain + //if len(p.Domain) == 0 { + // return p.ID + //} + return p.ID + "@" + p.Domain } @@ -498,6 +503,8 @@ func (p *ProcessID) Parse(pid string) { if i == -1 { p.ID = pid p.Domain = "" + + return } p.ID = pid[:i] diff --git a/restream/core.go b/restream/core.go index 73a8f282..cb9ad4c6 100644 --- a/restream/core.go +++ b/restream/core.go @@ -183,9 +183,7 @@ func (r *restream) Start() { go r.resourceObserver(ctx, r.resources, time.Second) } - r.tasks.Range(true, func(id app.ProcessID, t *task, token string) bool { - defer t.Release(token) - + r.tasks.Range(func(id app.ProcessID, t *task) bool { t.Restore() // The filesystem cleanup rules can be set @@ -216,10 +214,9 @@ func (r *restream) Stop() { // Stop the currently running processes without altering their order such that on a subsequent // Start() they will get restarted. - r.tasks.Range(true, func(_ app.ProcessID, t *task, token string) bool { + r.tasks.Range(func(_ app.ProcessID, t *task) bool { wg.Add(1) go func(t *task) { - defer t.Release(token) defer wg.Done() t.Kill() }(t) @@ -229,8 +226,7 @@ func (r *restream) Stop() { wg.Wait() - r.tasks.Range(true, func(id app.ProcessID, t *task, token string) bool { - defer t.Release(token) + r.tasks.Range(func(id app.ProcessID, t *task) bool { r.unsetCleanup(id) return true }) @@ -263,8 +259,7 @@ func (r *restream) filesystemObserver(ctx context.Context, fs fs.Filesystem, int if isFull { // Stop all tasks that write to this filesystem - r.tasks.Range(true, func(id app.ProcessID, t *task, token string) bool { - defer t.Release(token) + r.tasks.Range(func(id app.ProcessID, t *task) bool { if !t.UsesDisk() { return true } @@ -320,8 +315,7 @@ func (r *restream) resourceObserver(ctx context.Context, rsc resources.Resources break } - r.tasks.Range(true, func(id app.ProcessID, t *task, token string) bool { - defer t.Release(token) + r.tasks.Range(func(id app.ProcessID, t *task) bool { limitGPU := false gpuindex := t.GetHWDevice() if gpuindex >= 0 && len(limitGPUs) >= gpuindex+1 { @@ -381,7 +375,7 @@ func (r *restream) load() error { // Replace all placeholders in the config resolveStaticPlaceholders(t.config, r.replace) - tasks.Store(t.ID(), t) + tasks.LoadAndStore(t.ID(), t) } } @@ -389,9 +383,7 @@ func (r *restream) load() error { // replaced, we can resolve references and validate the // inputs and outputs. - tasks.Range(false, func(_ app.ProcessID, t *task, token string) bool { - defer t.Release(token) - + tasks.Range(func(_ app.ProcessID, t *task) bool { // Just warn if the ffmpeg version constraint doesn't match the available ffmpeg version if c, err := semver.NewConstraint(t.config.FFVersion); err == nil { if v, err := semver.NewVersion(skills.FFmpeg.Version); err == nil { @@ -468,12 +460,14 @@ func (r *restream) load() error { } t.ffmpeg = ffmpeg - t.SetValid(true) return true }) - r.tasks.Clear() + r.tasks.Clear(func(_ app.ProcessID, t *task) bool { + t.Destroy() + return true + }) r.tasks = tasks r.metadata = data.Metadata @@ -488,13 +482,7 @@ func (r *restream) save() { data := store.NewData() - r.tasks.Range(true, func(tid app.ProcessID, t *task, token string) bool { - defer t.Release(token) - - if !t.IsValid() { - return true - } - + r.tasks.Range(func(tid app.ProcessID, t *task) bool { domain := data.Process[tid.Domain] if domain == nil { domain = map[string]store.Process{} @@ -528,13 +516,20 @@ func (r *restream) CreatedAt() time.Time { } func (r *restream) AddProcess(config *app.Config) error { + tid := app.ProcessID{ + ID: config.ID, + Domain: config.Domain, + } + + if r.tasks.Has(tid) { + return ErrProcessExists + } + t, err := r.createTask(config) if err != nil { return err } - tid := t.ID() - _, ok := r.tasks.LoadOrStore(tid, t) if ok { t.Destroy() @@ -546,7 +541,7 @@ func (r *restream) AddProcess(config *app.Config) error { err = t.Restore() if err != nil { - r.tasks.Delete(tid) + r.tasks.LoadAndDelete(tid) t.Destroy() return err } @@ -655,8 +650,6 @@ func (r *restream) createTask(config *app.Config) (*task, error) { t.ffmpeg = ffmpeg - t.SetValid(true) - return t, nil } @@ -683,9 +676,8 @@ func (r *restream) onBeforeStart(cfg *app.Config) func([]string) ([]string, erro selectedGPU = 0 } - if t, token, hasTask := r.tasks.Load(cfg.ProcessID()); hasTask { + if t, hasTask := r.tasks.LoadUnsafe(cfg.ProcessID()); hasTask { t.SetHWDevice(selectedGPU) - t.Release(token) } else { return []string{}, fmt.Errorf("process with the ID '%s' not found", cfg.ProcessID()) } @@ -1046,16 +1038,12 @@ func (r *restream) resolveAddress(tasks *Storage, id, address string) (string, e } var t *task = nil - var ttoken string = "" - tasks.Range(true, func(_ app.ProcessID, task *task, token string) bool { + tasks.Range(func(_ app.ProcessID, task *task) bool { if task.id == matches["id"] && task.domain == matches["domain"] { t = task - ttoken = token return false } - - task.Release(token) return true }) @@ -1063,8 +1051,6 @@ func (r *restream) resolveAddress(tasks *Storage, id, address string) (string, e return address, fmt.Errorf("unknown process '%s' in domain '%s' (%s): %w", matches["id"], matches["domain"], address, ErrInvalidProcessConfig) } - defer t.Release(ttoken) - teeOptions := regexp.MustCompile(`^\[[^\]]*\]`) for _, x := range t.config.Output { @@ -1176,9 +1162,9 @@ func (r *restream) UpdateProcess(id app.ProcessID, config *app.Config) error { return ErrUnknownProcess } - err := r.updateProcess(task, config) + defer r.tasks.Unlock(id) - task.Unlock() + err := r.updateProcess(task, config) if err != nil { return err @@ -1217,7 +1203,7 @@ func (r *restream) updateProcess(task *task, config *app.Config) error { t.process.Order.Set(order) - if err := r.stopProcess(task); err != nil { + if err := task.Stop(); err != nil { t.Destroy() return fmt.Errorf("stop process: %w", err) } @@ -1231,21 +1217,22 @@ func (r *restream) updateProcess(task *task, config *app.Config) error { // Transfer the metadata to the new process t.ImportMetadata(task.ExportMetadata()) - if err := r.deleteProcess(task); err != nil { - t.Destroy() - return fmt.Errorf("delete process: %w", err) - } + r.unsetPlayoutPorts(task) + r.unsetCleanup(task.ID()) - t.Lock() - defer t.Unlock() - - r.tasks.Store(tid, t) + r.tasks.LoadAndStore(tid, t) // set filesystem cleanup rules r.setCleanup(tid, t.config) t.Restore() + if !tid.Equal(task.ID()) { + r.tasks.LoadAndDelete(task.ID()) + } + + task.Destroy() + return nil } @@ -1276,9 +1263,7 @@ func (r *restream) GetProcessIDs(idpattern, refpattern, ownerpattern, domainpatt if idglob == nil && refglob == nil && ownerglob == nil && domainglob == nil { ids = make([]app.ProcessID, 0, r.tasks.Size()) - r.tasks.Range(true, func(id app.ProcessID, t *task, token string) bool { - defer t.Release(token) - + r.tasks.Range(func(id app.ProcessID, t *task) bool { ids = append(ids, id) return true @@ -1286,9 +1271,7 @@ func (r *restream) GetProcessIDs(idpattern, refpattern, ownerpattern, domainpatt } else { ids = []app.ProcessID{} - r.tasks.Range(true, func(id app.ProcessID, t *task, token string) bool { - defer t.Release(token) - + r.tasks.Range(func(id app.ProcessID, t *task) bool { if !t.Match(idglob, refglob, ownerglob, domainglob) { return true } @@ -1303,11 +1286,10 @@ func (r *restream) GetProcessIDs(idpattern, refpattern, ownerpattern, domainpatt } func (r *restream) GetProcess(id app.ProcessID) (*app.Process, error) { - task, token, ok := r.tasks.Load(id) + task, ok := r.tasks.LoadUnsafe(id) if !ok { return &app.Process{}, ErrUnknownProcess } - defer task.Release(token) return task.Process(), nil } @@ -1320,7 +1302,7 @@ func (r *restream) DeleteProcess(id app.ProcessID) error { err := r.deleteProcess(task) - task.Unlock() + r.tasks.Unlock(id) if err != nil { return err @@ -1339,7 +1321,7 @@ func (r *restream) deleteProcess(task *task) error { r.unsetPlayoutPorts(task) r.unsetCleanup(task.ID()) - r.tasks.Delete(task.ID()) + r.tasks.LoadAndDelete(task.ID()) task.Destroy() @@ -1347,15 +1329,14 @@ func (r *restream) deleteProcess(task *task) error { } func (r *restream) StartProcess(id app.ProcessID) error { - task, token, ok := r.tasks.Load(id) + task, ok := r.tasks.LoadAndLock(id) if !ok { return ErrUnknownProcess } + defer r.tasks.Unlock(id) err := r.startProcess(task) - task.Release(token) - if err != nil { return err } @@ -1377,15 +1358,14 @@ func (r *restream) startProcess(task *task) error { } func (r *restream) StopProcess(id app.ProcessID) error { - task, token, ok := r.tasks.Load(id) + task, ok := r.tasks.LoadAndLock(id) if !ok { return ErrUnknownProcess } + defer r.tasks.Unlock(id) err := r.stopProcess(task) - task.Release(token) - if err != nil { return err } @@ -1408,11 +1388,11 @@ func (r *restream) stopProcess(task *task) error { } func (r *restream) RestartProcess(id app.ProcessID) error { - task, token, ok := r.tasks.Load(id) + task, ok := r.tasks.LoadAndLock(id) if !ok { return ErrUnknownProcess } - defer task.Release(token) + defer r.tasks.Unlock(id) return r.restartProcess(task) } @@ -1431,7 +1411,7 @@ func (r *restream) ReloadProcess(id app.ProcessID) error { err := r.reloadProcess(task) - task.Unlock() + r.tasks.Unlock(id) if err != nil { return err @@ -1464,34 +1444,33 @@ func (r *restream) reloadProcess(task *task) error { } // Transfer the report history to the new process - t.parser.ImportReportHistory(task.parser.ReportHistory()) + t.ImportParserReportHistory(task.ExportParserReportHistory()) // Transfer the metadata to the new process - t.metadata = task.metadata + t.ImportMetadata(task.ExportMetadata()) - if err := r.deleteProcess(task); err != nil { - t.Destroy() - return fmt.Errorf("delete process: %w", err) - } + r.unsetPlayoutPorts(task) + r.unsetCleanup(task.ID()) - r.tasks.Store(tid, t) + r.tasks.LoadAndStore(tid, t) // set filesystem cleanup rules r.setCleanup(tid, t.config) t.Restore() + task.Destroy() + return nil } func (r *restream) GetProcessState(id app.ProcessID) (*app.State, error) { state := &app.State{} - task, token, ok := r.tasks.Load(id) + task, ok := r.tasks.LoadUnsafe(id) if !ok { return state, ErrUnknownProcess } - defer task.Release(token) return task.State() } @@ -1499,11 +1478,10 @@ func (r *restream) GetProcessState(id app.ProcessID) (*app.State, error) { func (r *restream) GetProcessReport(id app.ProcessID) (*app.Report, error) { report := &app.Report{} - task, token, ok := r.tasks.Load(id) + task, ok := r.tasks.LoadUnsafe(id) if !ok { return report, ErrUnknownProcess } - defer task.Release(token) return task.Report() } @@ -1513,7 +1491,7 @@ func (r *restream) SetProcessReport(id app.ProcessID, report *app.Report) error if !ok { return ErrUnknownProcess } - defer task.Unlock() + defer r.tasks.Unlock(id) return task.SetReport(report) } @@ -1524,15 +1502,13 @@ func (r *restream) SearchProcessLogHistory(idpattern, refpattern, state string, ids := r.GetProcessIDs(idpattern, refpattern, "", "") for _, id := range ids { - task, token, ok := r.tasks.Load(id) + task, ok := r.tasks.LoadUnsafe(id) if !ok { continue } presult := task.SearchReportHistory(state, from, to) result = append(result, presult...) - - task.Release(token) } return result @@ -1626,15 +1602,11 @@ func (r *restream) ReloadSkills() error { } func (r *restream) GetPlayout(id app.ProcessID, inputid string) (string, error) { - task, token, ok := r.tasks.Load(id) + task, ok := r.tasks.LoadAndRLock(id) if !ok { return "", ErrUnknownProcess } - defer task.Release(token) - - if !task.IsValid() { - return "", ErrInvalidProcessConfig - } + defer r.tasks.RUnlock(id) port, ok := task.playout[inputid] if !ok { @@ -1652,7 +1624,7 @@ func (r *restream) SetProcessMetadata(id app.ProcessID, key string, data interfa err := task.SetMetadata(key, data) - task.Unlock() + r.tasks.Unlock(id) if err != nil { return err @@ -1664,11 +1636,10 @@ func (r *restream) SetProcessMetadata(id app.ProcessID, key string, data interfa } func (r *restream) GetProcessMetadata(id app.ProcessID, key string) (interface{}, error) { - task, token, ok := r.tasks.Load(id) + task, ok := r.tasks.LoadUnsafe(id) if !ok { return nil, ErrUnknownProcess } - defer task.Release(token) return task.GetMetadata(key) } diff --git a/restream/core_test.go b/restream/core_test.go index 4bf0de12..0d649515 100644 --- a/restream/core_test.go +++ b/restream/core_test.go @@ -988,15 +988,15 @@ func TestTeeAddressReference(t *testing.T) { r := rs.(*restream) - task, _, ok := r.tasks.Load(app.ProcessID{ID: "process2"}) + task, ok := r.tasks.LoadAndRLock(app.ProcessID{ID: "process2"}) require.True(t, ok) require.Equal(t, "http://example.com/live.m3u8", task.config.Input[0].Address) - task, _, ok = r.tasks.Load(app.ProcessID{ID: "process3"}) + task, ok = r.tasks.LoadAndRLock(app.ProcessID{ID: "process3"}) require.True(t, ok) require.Equal(t, "http://example.com/live.m3u8", task.config.Input[0].Address) - task, _, ok = r.tasks.Load(app.ProcessID{ID: "process4"}) + task, ok = r.tasks.LoadAndRLock(app.ProcessID{ID: "process4"}) require.True(t, ok) require.Equal(t, "rtmp://example.com/live.stream?token=123", task.config.Input[0].Address) } @@ -1598,9 +1598,8 @@ func TestProcessReplacer(t *testing.T) { LogPatterns: []string{}, } - task, token, ok := rs.tasks.Load(app.ProcessID{ID: "314159265359"}) + task, ok := rs.tasks.LoadAndRLock(app.ProcessID{ID: "314159265359"}) require.True(t, ok) - task.Release(token) require.Equal(t, process, task.config) @@ -1693,9 +1692,8 @@ func TestProcessLimit(t *testing.T) { rs := rsi.(*restream) - task, token, ok := rs.tasks.Load(app.ProcessID{ID: process.ID}) + task, ok := rs.tasks.LoadAndRLock(app.ProcessID{ID: process.ID}) require.True(t, ok) - task.Release(token) status := task.ffmpeg.Status() @@ -1850,7 +1848,7 @@ func TestProcessCleanup(t *testing.T) { rsi.Stop() - for i := 0; i < 10; i++ { + for i := range 10 { memfs.WriteFileReader(fmt.Sprintf("/foobar_%02d.dat", i), bytes.NewReader([]byte("hello")), -1) } diff --git a/restream/manager.go b/restream/manager.go index ff3549e4..736c0476 100644 --- a/restream/manager.go +++ b/restream/manager.go @@ -1,97 +1,161 @@ package restream import ( + "sync" + "github.com/datarhei/core/v16/restream/app" "github.com/puzpuzpuz/xsync/v3" ) +type metatask struct { + task *task + lock sync.RWMutex +} + type Storage struct { - tasks *xsync.MapOf[app.ProcessID, *task] + lock *xsync.RBMutex + tasks *xsync.MapOf[app.ProcessID, *metatask] } func NewStorage() *Storage { m := &Storage{ - tasks: xsync.NewMapOf[app.ProcessID, *task](), + lock: xsync.NewRBMutex(), + tasks: xsync.NewMapOf[app.ProcessID, *metatask](), } return m } -func (m *Storage) Range(onlyValid bool, f func(key app.ProcessID, value *task, token string) bool) { - m.tasks.Range(func(id app.ProcessID, task *task) bool { - token := task.RLock() - if onlyValid && !task.IsValid() { - task.Release(token) - return true - } - return f(id, task, token) +// Range calls f sequentially for each key and value present in the map. If f returns false, range stops the iteration. +// +// Range does not necessarily correspond to any consistent snapshot of the Map's contents: no key will be visited more than once, +// but if the value for any key is stored or deleted concurrently, Range may reflect any mapping for that key from any point during the Range call. +// +// It is safe to modify the map while iterating it, including entry creation, modification and deletion. However, +// the concurrent modification rule apply, i.e. the changes may be not reflected in the subsequently iterated entries. +func (m *Storage) Range(f func(key app.ProcessID, value *task) bool) { + m.tasks.Range(func(id app.ProcessID, mt *metatask) bool { + return f(id, mt.task) }) } -func (m *Storage) Store(id app.ProcessID, t *task) { - t, ok := m.tasks.LoadAndStore(id, t) +// Store sets the value for a key. +func (m *Storage) LoadAndStore(id app.ProcessID, t *task) (*task, bool) { + m.lock.Lock() + defer m.lock.Unlock() + + mt, ok := m.tasks.Load(id) if ok { - t.Destroy() + old := mt.task + mt.task = t + + return old, true } + mt = &metatask{ + task: t, + } + m.tasks.Store(id, mt) + + return t, false } +// LoadOrStore returns the existing value for the key if present. Otherwise, it stores and returns the given value. +// The loaded result is true if the value was loaded, false if stored. func (m *Storage) LoadOrStore(id app.ProcessID, t *task) (*task, bool) { - return m.tasks.LoadOrStore(id, t) + m.lock.Lock() + defer m.lock.Unlock() + + mt, ok := m.tasks.Load(id) + if ok { + return mt.task, true + } + mt = &metatask{ + task: t, + } + m.tasks.Store(id, mt) + + return t, false } +// Has returns whether a value is stored in the map. func (m *Storage) Has(id app.ProcessID) bool { _, hasTask := m.tasks.Load(id) return hasTask } -func (m *Storage) Load(id app.ProcessID) (*task, string, bool) { - task, ok := m.tasks.Load(id) +func (m *Storage) LoadUnsafe(id app.ProcessID) (*task, bool) { + mt, ok := m.tasks.Load(id) if !ok { - return nil, "", false + return nil, false } - token := task.RLock() - if !task.IsValid() { - task.Release(token) - return nil, "", false + return mt.task, true +} + +// LoadAndRLock returns the value stored in the map for a key, or zero value of type V if no value is present. +// The ok result indicates whether value was found in the map. +func (m *Storage) LoadAndRLock(id app.ProcessID) (*task, bool) { + mt, ok := m.tasks.Load(id) + if !ok { + return nil, false } - return task, token, true + + mt.lock.RLock() + + return mt.task, true +} + +func (m *Storage) RUnlock(id app.ProcessID) { + mt, ok := m.tasks.Load(id) + if !ok { + return + } + + mt.lock.RUnlock() } func (m *Storage) LoadAndLock(id app.ProcessID) (*task, bool) { - task, ok := m.tasks.Load(id) + mt, ok := m.tasks.Load(id) if !ok { return nil, false } - task.Lock() - if !task.IsValid() { - task.Unlock() - return nil, false - } - return task, true + mt.lock.Lock() + + return mt.task, true } -func (m *Storage) Delete(id app.ProcessID) bool { - if t, ok := m.tasks.Load(id); ok { - m.tasks.Delete(id) - t.Destroy() - return true +func (m *Storage) Unlock(id app.ProcessID) { + mt, ok := m.tasks.Load(id) + if !ok { + return } - return false + mt.lock.Unlock() } +// Delete deletes the value for a key. +func (m *Storage) LoadAndDelete(id app.ProcessID) (*task, bool) { + m.lock.Lock() + defer m.lock.Unlock() + + if mt, ok := m.tasks.LoadAndDelete(id); ok { + return mt.task, true + } + + return nil, false +} + +// Size returns current size of the map. func (m *Storage) Size() int { return m.tasks.Size() } -func (m *Storage) Clear() { - m.tasks.Range(func(_ app.ProcessID, t *task) bool { - t.Destroy() - - return true +// Clear deletes all keys and values currently stored in the map. +func (m *Storage) Clear(f func(key app.ProcessID, value *task) bool) { + m.tasks.Range(func(id app.ProcessID, mt *metatask) bool { + return f(id, mt.task) }) m.tasks.Clear() diff --git a/restream/task.go b/restream/task.go index 22d25b2b..4b171436 100644 --- a/restream/task.go +++ b/restream/task.go @@ -2,7 +2,6 @@ package restream import ( "maps" - "sync" "sync/atomic" "time" @@ -14,7 +13,6 @@ import ( ) type task struct { - valid *atomic.Bool // Whether the task is valid an can be used readers *atomic.Int64 // Number of concurrent readers id string // ID of the task/process owner string // Owner of the process @@ -30,13 +28,10 @@ type task struct { usesDisk bool // Whether this task uses the disk hwdevice *atomic.Int32 // Index of the GPU this task uses metadata map[string]interface{} // Metadata of the process - - lock sync.RWMutex } func NewTask(process *app.Process, logger log.Logger) *task { t := &task{ - valid: &atomic.Bool{}, readers: &atomic.Int64{}, id: process.ID, owner: process.Owner, @@ -53,31 +48,6 @@ func NewTask(process *app.Process, logger log.Logger) *task { return t } -func (t *task) Lock() { - t.lock.Lock() -} - -func (t *task) Unlock() { - t.lock.Unlock() -} - -func (t *task) RLock() string { - t.readers.Add(1) - return "" -} - -func (t *task) Release(token string) { - t.readers.Add(-1) -} - -func (t *task) IsValid() bool { - return t.valid.Load() -} - -func (t *task) SetValid(valid bool) { - t.valid.Store(valid) -} - func (t *task) UsesDisk() bool { return t.usesDisk } @@ -95,10 +65,6 @@ func (t *task) String() string { // Restore restores the task's order func (t *task) Restore() error { - if !t.valid.Load() { - return ErrInvalidProcessConfig - } - if t.process.Order.String() == "start" { err := t.ffmpeg.Start() if err != nil { @@ -110,16 +76,6 @@ func (t *task) Restore() error { } func (t *task) Start() error { - if !t.valid.Load() { - return ErrInvalidProcessConfig - } - - status := t.ffmpeg.Status() - - if t.process.Order.String() == "start" && status.Order == "start" { - return nil - } - t.process.Order.Set("start") t.ffmpeg.Start() @@ -128,16 +84,6 @@ func (t *task) Start() error { } func (t *task) Stop() error { - if !t.valid.Load() { - return ErrInvalidProcessConfig - } - - status := t.ffmpeg.Status() - - if t.process.Order.String() == "stop" && status.Order == "stop" { - return nil - } - t.process.Order.Set("stop") t.ffmpeg.Stop(true) @@ -147,18 +93,10 @@ func (t *task) Stop() error { // Kill stops a process without changing the tasks order func (t *task) Kill() { - if !t.valid.Load() { - return - } - t.ffmpeg.Stop(true) } func (t *task) Restart() error { - if !t.valid.Load() { - return ErrInvalidProcessConfig - } - if t.process.Order.String() == "stop" { return nil } @@ -174,10 +112,6 @@ func (t *task) Restart() error { func (t *task) State() (*app.State, error) { state := &app.State{} - if !t.valid.Load() { - return state, nil - } - status := t.ffmpeg.Status() state.Order = t.process.Order.String() @@ -267,10 +201,6 @@ func assignConfigID(progress []app.ProgressIO, config []app.ConfigIO) []app.Prog func (t *task) Report() (*app.Report, error) { report := &app.Report{} - if !t.valid.Load() { - return report, nil - } - current := t.parser.Report() report.UnmarshalParser(¤t) @@ -291,10 +221,6 @@ func (t *task) Report() (*app.Report, error) { } func (t *task) SetReport(report *app.Report) error { - if !t.valid.Load() { - return nil - } - _, history := report.MarshalParser() t.parser.ImportReportHistory(history) @@ -305,10 +231,6 @@ func (t *task) SetReport(report *app.Report) error { func (t *task) SearchReportHistory(state string, from, to *time.Time) []app.ReportHistorySearchResult { result := []app.ReportHistorySearchResult{} - if !t.valid.Load() { - return result - } - presult := t.parser.SearchReportHistory(state, from, to) for _, f := range presult { @@ -377,10 +299,6 @@ func (t *task) ExportMetadata() map[string]interface{} { } func (t *task) Limit(cpu, memory, gpu bool) bool { - if !t.valid.Load() { - return false - } - t.ffmpeg.Limit(cpu, memory, gpu) return true @@ -395,41 +313,19 @@ func (t *task) GetHWDevice() int { } func (t *task) Equal(config *app.Config) bool { - if !t.valid.Load() { - return false - } - return t.process.Config.Equal(config) } func (t *task) ResolvedConfig() *app.Config { - if !t.valid.Load() { - return nil - } - return t.config.Clone() } func (t *task) Config() *app.Config { - if !t.valid.Load() { - return nil - } - return t.process.Config.Clone() } func (t *task) Destroy() { t.Stop() - - t.valid.Store(false) - /* - t.process = nil - t.config = nil - t.command = nil - t.ffmpeg = nil - t.parser = nil - t.metadata = map[string]interface{}{} - */ } func (t *task) Match(id, reference, owner, domain glob.Glob) bool { @@ -468,33 +364,17 @@ func (t *task) Match(id, reference, owner, domain glob.Glob) bool { } func (t *task) Process() *app.Process { - if !t.valid.Load() { - return nil - } - return t.process.Clone() } func (t *task) Order() string { - if !t.valid.Load() { - return "" - } - return t.process.Order.String() } func (t *task) ExportParserReportHistory() []parse.ReportHistoryEntry { - if !t.valid.Load() { - return nil - } - return t.parser.ReportHistory() } func (t *task) ImportParserReportHistory(report []parse.ReportHistoryEntry) { - if !t.valid.Load() { - return - } - t.parser.ImportReportHistory(report) }