diff --git a/restream/restream.go b/restream/core.go similarity index 83% rename from restream/restream.go rename to restream/core.go index 8825a12c..3e5e5dbc 100644 --- a/restream/restream.go +++ b/restream/core.go @@ -12,14 +12,12 @@ import ( "time" "github.com/datarhei/core/v16/ffmpeg" - "github.com/datarhei/core/v16/ffmpeg/parse" "github.com/datarhei/core/v16/ffmpeg/skills" "github.com/datarhei/core/v16/glob" "github.com/datarhei/core/v16/io/fs" "github.com/datarhei/core/v16/log" "github.com/datarhei/core/v16/net" "github.com/datarhei/core/v16/net/url" - "github.com/datarhei/core/v16/process" "github.com/datarhei/core/v16/resources" "github.com/datarhei/core/v16/restream/app" rfs "github.com/datarhei/core/v16/restream/fs" @@ -77,34 +75,6 @@ type Config struct { Logger log.Logger } -type task struct { - valid bool - id string // ID of the task/process - owner string - domain string - reference string - process *app.Process - config *app.Config // Process config with replaced static placeholders - command []string // The actual command parameter for ffmpeg - ffmpeg process.Process - parser parse.Parser - playout map[string]int - logger log.Logger - usesDisk bool // Whether this task uses the disk - metadata map[string]interface{} -} - -func (t *task) ID() app.ProcessID { - return app.ProcessID{ - ID: t.id, - Domain: t.domain, - } -} - -func (t *task) String() string { - return t.ID().String() -} - type restream struct { id string name string @@ -118,8 +88,8 @@ type restream struct { } replace replace.Replacer rewrite rewrite.Rewriter - tasks map[app.ProcessID]*task // domain:ProcessID - metadata map[string]interface{} // global metadata + tasks *Storage // domain:ProcessID + metadata map[string]interface{} // global metadata logger log.Logger resources resources.Resources @@ -143,7 +113,7 @@ func New(config Config) (Restreamer, error) { replace: config.Replace, rewrite: config.Rewrite, logger: config.Logger, - tasks: map[app.ProcessID]*task{}, + tasks: NewStorage(), metadata: map[string]interface{}{}, } @@ -151,19 +121,6 @@ func New(config Config) (Restreamer, error) { r.logger = log.New("") } - /* - if r.store == nil { - dummyfs, _ := fs.NewMemFilesystem(fs.MemConfig{}) - s, err := jsonstore.New(jsonstore.Config{ - Filesystem: dummyfs, - }) - if err != nil { - return nil, err - } - r.store = s - } - */ - if len(config.Filesystems) == 0 { return nil, fmt.Errorf("at least one filesystem must be provided") } @@ -226,14 +183,14 @@ func (r *restream) Start() { go r.resourceObserver(ctx, r.resources, time.Second) } - for id, t := range r.tasks { - if t.process.Order == "start" { - r.startProcess(id) - } + r.tasks.Range(func(id app.ProcessID, t *task) bool { + t.Restore() // The filesystem cleanup rules can be set - r.setCleanup(id, t.config) - } + r.setCleanup(id, t.Config()) + + return true + }) for _, fs := range r.fs.list { fs.Start() @@ -252,28 +209,27 @@ func (r *restream) Stop() { r.lock.Lock() defer r.lock.Unlock() + wg := sync.WaitGroup{} + // Stop the currently running processes without altering their order such that on a subsequent // Start() they will get restarted. - wg := sync.WaitGroup{} - - for _, t := range r.tasks { - if t.ffmpeg == nil { - continue - } - + r.tasks.Range(func(_ app.ProcessID, t *task) bool { wg.Add(1) - go func(p process.Process) { + go func(t *task) { defer wg.Done() - p.Stop(true) - }(t.ffmpeg) - } + t.Kill() + }(t) + + return true + }) wg.Wait() - for id := range r.tasks { + r.tasks.Range(func(id app.ProcessID, _ *task) bool { r.unsetCleanup(id) - } + return true + }) r.cancelObserver() @@ -303,24 +259,16 @@ func (r *restream) filesystemObserver(ctx context.Context, fs fs.Filesystem, int if isFull { // Stop all tasks that write to this filesystem - r.lock.Lock() - for id, t := range r.tasks { - if !t.valid { - continue + r.tasks.Range(func(id app.ProcessID, t *task) bool { + if !t.UsesDisk() { + return true } - if !t.usesDisk { - continue - } + r.logger.Warn().WithField("id", id).Log("Shutting down because filesystem is full") + t.Stop() - if t.process.Order != "start" { - continue - } - - r.logger.Warn().Log("Shutting down because filesystem is full") - r.stopProcess(id) - } - r.lock.Unlock() + return true + }) } } } @@ -355,20 +303,17 @@ func (r *restream) resourceObserver(ctx context.Context, rsc resources.Resources break } - r.lock.RLock() - for id, t := range r.tasks { - if !t.valid { - continue + r.tasks.Range(func(id app.ProcessID, t *task) bool { + if t.Limit(limitCPU, limitMemory) { + r.logger.Debug().WithFields(log.Fields{ + "limit_cpu": limitCPU, + "limit_memory": limitMemory, + "id": id, + }).Log("Limiting process CPU and memory consumption") } - r.logger.Debug().WithFields(log.Fields{ - "limit_cpu": limitCPU, - "limit_memory": limitMemory, - "id": id, - }).Log("Limiting process CPU and memory consumption") - t.ffmpeg.Limit(limitCPU, limitMemory) - } - r.lock.RUnlock() + return true + }) } } } @@ -384,7 +329,7 @@ func (r *restream) load() error { return err } - tasks := make(map[app.ProcessID]*task) + tasks := NewStorage() skills := r.ffmpeg.Skills() ffversion := skills.FFmpeg.Version @@ -420,7 +365,7 @@ func (r *restream) load() error { // Replace all placeholders in the config resolveStaticPlaceholders(t.config, r.replace) - tasks[t.ID()] = t + tasks.LoadOrStore(t.ID(), t) } } @@ -428,9 +373,7 @@ func (r *restream) load() error { // replaced, we can resolve references and validate the // inputs and outputs. - for _, t := range tasks { - t := t - + tasks.Range(func(_ app.ProcessID, t *task) bool { // Just warn if the ffmpeg version constraint doesn't match the available ffmpeg version if c, err := semver.NewConstraint(t.config.FFVersion); err == nil { if v, err := semver.NewVersion(skills.FFmpeg.Version); err == nil { @@ -450,7 +393,7 @@ func (r *restream) load() error { err := r.resolveAddresses(tasks, t.config) if err != nil { t.logger.Warn().WithError(err).Log("Ignoring") - continue + return true } // Validate config with all placeholders replaced. However, we need to take care @@ -461,13 +404,13 @@ func (r *restream) load() error { t.usesDisk, err = validateConfig(config, r.fs.list, r.ffmpeg) if err != nil { t.logger.Warn().WithError(err).Log("Ignoring") - continue + return true } err = r.setPlayoutPorts(t) if err != nil { t.logger.Warn().WithError(err).Log("Ignoring") - continue + return true } t.command = t.config.CreateCommand() @@ -505,14 +448,18 @@ func (r *restream) load() error { }, }) if err != nil { - return err + return true } t.ffmpeg = ffmpeg t.valid = true - } + return true + }) + + r.tasks.Clear() r.tasks = tasks + r.metadata = data.Metadata return nil @@ -525,7 +472,7 @@ func (r *restream) save() { data := store.NewData() - for tid, t := range r.tasks { + r.tasks.Range(func(tid app.ProcessID, t *task) bool { domain := data.Process[tid.Domain] if domain == nil { domain = map[string]store.Process{} @@ -537,7 +484,9 @@ func (r *restream) save() { } data.Process[tid.Domain] = domain - } + + return true + }) data.Metadata = r.metadata @@ -562,35 +511,26 @@ var ErrProcessExists = errors.New("process already exists") var ErrForbidden = errors.New("forbidden") func (r *restream) AddProcess(config *app.Config) error { - r.lock.RLock() t, err := r.createTask(config) - r.lock.RUnlock() if err != nil { return err } - r.lock.Lock() - defer r.lock.Unlock() - tid := t.ID() - _, ok := r.tasks[tid] + _, ok := r.tasks.LoadOrStore(tid, t) if ok { return ErrProcessExists } - r.tasks[tid] = t - // set filesystem cleanup rules r.setCleanup(tid, t.config) - if t.process.Order == "start" { - err := r.startProcess(tid) - if err != nil { - delete(r.tasks, tid) - return err - } + err = t.Restore() + if err != nil { + r.tasks.Delete(tid) + return err } r.save() @@ -1032,7 +972,7 @@ func validateOutputAddress(address, basedir string, ffmpeg ffmpeg.FFmpeg) (strin } // resolveAddresses replaces the addresse reference from each input in a config with the actual address. -func (r *restream) resolveAddresses(tasks map[app.ProcessID]*task, config *app.Config) error { +func (r *restream) resolveAddresses(tasks *Storage, config *app.Config) error { for i, input := range config.Input { // Resolve any references address, err := r.resolveAddress(tasks, config.ID, input.Address) @@ -1049,7 +989,7 @@ func (r *restream) resolveAddresses(tasks map[app.ProcessID]*task, config *app.C } // resolveAddress replaces the address reference with the actual address. -func (r *restream) resolveAddress(tasks map[app.ProcessID]*task, id, address string) (string, error) { +func (r *restream) resolveAddress(tasks *Storage, id, address string) (string, error) { matches, err := parseAddressReference(address) if err != nil { return address, err @@ -1066,12 +1006,14 @@ func (r *restream) resolveAddress(tasks map[app.ProcessID]*task, id, address str var t *task = nil - for _, tsk := range tasks { + tasks.Range(func(_ app.ProcessID, tsk *task) bool { if tsk.id == matches["id"] && tsk.domain == matches["domain"] { t = tsk - break + return false } - } + + return true + }) if t == nil { return address, fmt.Errorf("unknown process '%s' in domain '%s' (%s)", matches["id"], matches["domain"], address) @@ -1186,13 +1128,13 @@ func (r *restream) UpdateProcess(id app.ProcessID, config *app.Config) error { r.lock.Lock() defer r.lock.Unlock() - task, ok := r.tasks[id] + task, ok := r.tasks.Load(id) if !ok { return ErrUnknownProcess } // If the new config has the same hash as the current config, do nothing. - if task.process.Config.Equal(config) { + if task.Equal(config) { return nil } @@ -1204,22 +1146,18 @@ func (r *restream) UpdateProcess(id app.ProcessID, config *app.Config) error { tid := t.ID() if !tid.Equal(id) { - _, ok := r.tasks[tid] + _, ok := r.tasks.Load(tid) if ok { return ErrProcessExists } } - t.process.Order = task.process.Order + t.process.Order = task.Order() if err := r.stopProcess(id); err != nil { return fmt.Errorf("stop process: %w", err) } - if err := r.deleteProcess(id); err != nil { - return fmt.Errorf("delete process: %w", err) - } - // This would require a major version jump //t.process.CreatedAt = task.process.CreatedAt t.process.UpdatedAt = time.Now().Unix() @@ -1231,14 +1169,16 @@ func (r *restream) UpdateProcess(id app.ProcessID, config *app.Config) error { // Transfer the metadata to the new process t.metadata = task.metadata - r.tasks[tid] = t + if err := r.deleteProcess(id); err != nil { + return fmt.Errorf("delete process: %w", err) + } + + r.tasks.Store(tid, t) // set filesystem cleanup rules - r.setCleanup(tid, t.config) + r.setCleanup(tid, t.Config()) - if t.process.Order == "start" { - r.startProcess(tid) - } + t.Restore() r.save() @@ -1246,112 +1186,64 @@ func (r *restream) UpdateProcess(id app.ProcessID, config *app.Config) error { } func (r *restream) GetProcessIDs(idpattern, refpattern, ownerpattern, domainpattern string) []app.ProcessID { - count := 0 - - var idglob glob.Glob - var refglob glob.Glob - var ownerglob glob.Glob - var domainglob glob.Glob + var idglob glob.Glob = nil + var refglob glob.Glob = nil + var ownerglob glob.Glob = nil + var domainglob glob.Glob = nil if len(idpattern) != 0 { - count++ idglob, _ = glob.Compile(idpattern) } if len(refpattern) != 0 { - count++ refglob, _ = glob.Compile(refpattern) } if len(ownerpattern) != 0 { - count++ ownerglob, _ = glob.Compile(ownerpattern) } if len(domainpattern) != 0 { - count++ domainglob, _ = glob.Compile(domainpattern) } var ids []app.ProcessID - r.lock.RLock() - defer r.lock.RUnlock() + if idglob == nil && refglob == nil && ownerglob == nil && domainglob == nil { + ids = make([]app.ProcessID, 0, r.tasks.Size()) - if count == 0 { - ids = make([]app.ProcessID, 0, len(r.tasks)) + r.tasks.Range(func(id app.ProcessID, t *task) bool { + ids = append(ids, id) - for _, t := range r.tasks { - tid := app.ProcessID{ - ID: t.id, - Domain: t.domain, - } - - ids = append(ids, tid) - } + return true + }) } else { ids = []app.ProcessID{} - for _, t := range r.tasks { - matches := 0 - if idglob != nil { - if match := idglob.Match(t.id); match { - matches++ - } + r.tasks.Range(func(id app.ProcessID, t *task) bool { + if !t.Match(idglob, refglob, ownerglob, domainglob) { + return true } - if refglob != nil { - if match := refglob.Match(t.reference); match { - matches++ - } - } + ids = append(ids, id) - if ownerglob != nil { - if match := ownerglob.Match(t.owner); match { - matches++ - } - } - - if domainglob != nil { - if match := domainglob.Match(t.domain); match { - matches++ - } - } - - if count != matches { - continue - } - - tid := app.ProcessID{ - ID: t.id, - Domain: t.domain, - } - - ids = append(ids, tid) - } + return true + }) } return ids } func (r *restream) GetProcess(id app.ProcessID) (*app.Process, error) { - r.lock.RLock() - defer r.lock.RUnlock() - - task, ok := r.tasks[id] + task, ok := r.tasks.Load(id) if !ok { return &app.Process{}, ErrUnknownProcess } - process := task.process.Clone() - - return process, nil + return task.Process(), nil } func (r *restream) DeleteProcess(id app.ProcessID) error { - r.lock.Lock() - defer r.lock.Unlock() - err := r.deleteProcess(id) if err != nil { return err @@ -1363,27 +1255,24 @@ func (r *restream) DeleteProcess(id app.ProcessID) error { } func (r *restream) deleteProcess(tid app.ProcessID) error { - task, ok := r.tasks[tid] + task, ok := r.tasks.Load(tid) if !ok { return ErrUnknownProcess } - if task.process.Order != "stop" { + if task.Order() != "stop" { return fmt.Errorf("the process with the ID '%s' is still running", tid) } r.unsetPlayoutPorts(task) r.unsetCleanup(tid) - delete(r.tasks, tid) + r.tasks.Delete(tid) return nil } func (r *restream) StartProcess(id app.ProcessID) error { - r.lock.Lock() - defer r.lock.Unlock() - err := r.startProcess(id) if err != nil { return err @@ -1395,40 +1284,22 @@ func (r *restream) StartProcess(id app.ProcessID) error { } func (r *restream) startProcess(tid app.ProcessID) error { - task, ok := r.tasks[tid] + task, ok := r.tasks.Load(tid) if !ok { return ErrUnknownProcess } - if !task.valid { - return fmt.Errorf("invalid process definition") + err := task.Start() + if err != nil { + return err } - if task.ffmpeg != nil { - status := task.ffmpeg.Status() - - if task.process.Order == "start" && status.Order == "start" { - return nil - } - } - - if r.maxProc > 0 && r.nProc >= r.maxProc { - return fmt.Errorf("max. number of running processes (%d) reached", r.maxProc) - } - - task.process.Order = "start" - - task.ffmpeg.Start() - r.nProc++ return nil } func (r *restream) StopProcess(id app.ProcessID) error { - r.lock.Lock() - defer r.lock.Unlock() - err := r.stopProcess(id) if err != nil { return err @@ -1440,63 +1311,38 @@ func (r *restream) StopProcess(id app.ProcessID) error { } func (r *restream) stopProcess(tid app.ProcessID) error { - task, ok := r.tasks[tid] + task, ok := r.tasks.Load(tid) if !ok { return ErrUnknownProcess } - if task.ffmpeg == nil { - return nil + // TODO: aufpassen mit nProc und nil error. In task.Stop() noch einen error einführen, falls der process nicht läuft. + err := task.Stop() + if err != nil { + return err } - status := task.ffmpeg.Status() - - if task.process.Order == "stop" && status.Order == "stop" { - return nil - } - - task.process.Order = "stop" - - task.ffmpeg.Stop(true) - r.nProc-- return nil } func (r *restream) RestartProcess(id app.ProcessID) error { - r.lock.RLock() - defer r.lock.RUnlock() - return r.restartProcess(id) } func (r *restream) restartProcess(tid app.ProcessID) error { - task, ok := r.tasks[tid] + task, ok := r.tasks.Load(tid) if !ok { return ErrUnknownProcess } - if !task.valid { - return fmt.Errorf("invalid process definition") - } - - if task.process.Order == "stop" { - return nil - } - - if task.ffmpeg != nil { - task.ffmpeg.Stop(true) - task.ffmpeg.Start() - } + task.Restart() return nil } func (r *restream) ReloadProcess(id app.ProcessID) error { - r.lock.Lock() - defer r.lock.Unlock() - err := r.reloadProcess(id) if err != nil { return err @@ -1508,7 +1354,7 @@ func (r *restream) ReloadProcess(id app.ProcessID) error { } func (r *restream) reloadProcess(tid app.ProcessID) error { - t, ok := r.tasks[tid] + t, ok := r.tasks.Load(tid) if !ok { return ErrUnknownProcess } @@ -1601,125 +1447,32 @@ func (r *restream) reloadProcess(tid app.ProcessID) error { func (r *restream) GetProcessState(id app.ProcessID) (*app.State, error) { state := &app.State{} - r.lock.RLock() - defer r.lock.RUnlock() - - task, ok := r.tasks[id] + task, ok := r.tasks.Load(id) if !ok { return state, ErrUnknownProcess } - if !task.valid { - return state, nil - } - - status := task.ffmpeg.Status() - - state.Order = task.process.Order - state.State = status.State - state.States.Marshal(status.States) - state.Time = status.Time.Unix() - state.Memory = status.Memory.Current - state.CPU = status.CPU.Current / status.CPU.NCPU - state.LimitMode = status.LimitMode - state.Resources.CPU = status.CPU - state.Resources.Memory = status.Memory - state.Duration = status.Duration.Round(10 * time.Millisecond).Seconds() - state.Reconnect = -1 - state.Command = status.CommandArgs - state.LastLog = task.parser.LastLogline() - - if status.Reconnect >= time.Duration(0) { - state.Reconnect = status.Reconnect.Round(10 * time.Millisecond).Seconds() - } - - progress := task.parser.Progress() - state.Progress.UnmarshalParser(&progress) - - for i, p := range state.Progress.Input { - if int(p.Index) >= len(task.process.Config.Input) { - continue - } - - state.Progress.Input[i].ID = task.process.Config.Input[p.Index].ID - } - - for i, p := range state.Progress.Output { - if int(p.Index) >= len(task.process.Config.Output) { - continue - } - - state.Progress.Output[i].ID = task.process.Config.Output[p.Index].ID - } - - return state, nil + return task.State() } func (r *restream) GetProcessReport(id app.ProcessID) (*app.Report, error) { report := &app.Report{} - r.lock.RLock() - defer r.lock.RUnlock() - - task, ok := r.tasks[id] + task, ok := r.tasks.Load(id) if !ok { return report, ErrUnknownProcess } - if !task.valid { - return report, nil - } - - current := task.parser.Report() - - report.UnmarshalParser(¤t) - - history := task.parser.ReportHistory() - - report.History = make([]app.ReportHistoryEntry, len(history)) - - for i, h := range history { - report.History[i].UnmarshalParser(&h) - e := &report.History[i] - - for i, p := range e.Progress.Input { - if int(p.Index) >= len(task.process.Config.Input) { - continue - } - - e.Progress.Input[i].ID = task.process.Config.Input[p.Index].ID - } - - for i, p := range e.Progress.Output { - if int(p.Index) >= len(task.process.Config.Output) { - continue - } - - e.Progress.Output[i].ID = task.process.Config.Output[p.Index].ID - } - } - - return report, nil + return task.Report() } func (r *restream) SetProcessReport(id app.ProcessID, report *app.Report) error { - r.lock.RLock() - defer r.lock.RUnlock() - - task, ok := r.tasks[id] + task, ok := r.tasks.Load(id) if !ok { return ErrUnknownProcess } - if !task.valid { - return nil - } - - _, history := report.MarshalParser() - - task.parser.ImportReportHistory(history) - - return nil + return task.SetReport(report) } func (r *restream) SearchProcessLogHistory(idpattern, refpattern, state string, from, to *time.Time) []app.ReportHistorySearchResult { @@ -1727,26 +1480,15 @@ func (r *restream) SearchProcessLogHistory(idpattern, refpattern, state string, ids := r.GetProcessIDs(idpattern, refpattern, "", "") - r.lock.RLock() - defer r.lock.RUnlock() - for _, id := range ids { - task, ok := r.tasks[id] + task, ok := r.tasks.Load(id) if !ok { continue } - presult := task.parser.SearchReportHistory(state, from, to) + presult := task.SearchReportHistory(state, from, to) - for _, f := range presult { - result = append(result, app.ReportHistorySearchResult{ - ProcessID: task.id, - Reference: task.reference, - ExitState: f.ExitState, - CreatedAt: f.CreatedAt, - ExitedAt: f.ExitedAt, - }) - } + result = append(result, presult...) } return result @@ -1836,10 +1578,7 @@ func (r *restream) ReloadSkills() error { } func (r *restream) GetPlayout(id app.ProcessID, inputid string) (string, error) { - r.lock.RLock() - defer r.lock.RUnlock() - - task, ok := r.tasks[id] + task, ok := r.tasks.Load(id) if !ok { return "", ErrUnknownProcess } @@ -1856,33 +1595,15 @@ func (r *restream) GetPlayout(id app.ProcessID, inputid string) (string, error) return "127.0.0.1:" + strconv.Itoa(port), nil } -var ErrMetadataKeyNotFound = errors.New("unknown key") - func (r *restream) SetProcessMetadata(id app.ProcessID, key string, data interface{}) error { - if len(key) == 0 { - return fmt.Errorf("a key for storing the data has to be provided") - } - - r.lock.Lock() - defer r.lock.Unlock() - - task, ok := r.tasks[id] + task, ok := r.tasks.Load(id) if !ok { return ErrUnknownProcess } - if task.metadata == nil { - task.metadata = make(map[string]interface{}) - } - - if data == nil { - delete(task.metadata, key) - } else { - task.metadata[key] = data - } - - if len(task.metadata) == 0 { - task.metadata = nil + err := task.SetMetadata(key, data) + if err != nil { + return err } r.save() @@ -1891,24 +1612,12 @@ func (r *restream) SetProcessMetadata(id app.ProcessID, key string, data interfa } func (r *restream) GetProcessMetadata(id app.ProcessID, key string) (interface{}, error) { - r.lock.RLock() - defer r.lock.RUnlock() - - task, ok := r.tasks[id] + task, ok := r.tasks.Load(id) if !ok { return nil, ErrUnknownProcess } - if len(key) == 0 { - return task.metadata, nil - } - - data, ok := task.metadata[key] - if !ok { - return nil, ErrMetadataKeyNotFound - } - - return data, nil + return task.GetMetadata(key) } func (r *restream) SetMetadata(key string, data interface{}) error { diff --git a/restream/restream_test.go b/restream/core_test.go similarity index 98% rename from restream/restream_test.go rename to restream/core_test.go index a4b9da65..7b68f5b5 100644 --- a/restream/restream_test.go +++ b/restream/core_test.go @@ -884,9 +884,17 @@ func TestTeeAddressReference(t *testing.T) { r := rs.(*restream) - require.Equal(t, "http://example.com/live.m3u8", r.tasks[app.ProcessID{ID: "process2"}].config.Input[0].Address) - require.Equal(t, "http://example.com/live.m3u8", r.tasks[app.ProcessID{ID: "process3"}].config.Input[0].Address) - require.Equal(t, "rtmp://example.com/live.stream?token=123", r.tasks[app.ProcessID{ID: "process4"}].config.Input[0].Address) + task, ok := r.tasks.Load(app.ProcessID{ID: "process2"}) + require.True(t, ok) + require.Equal(t, "http://example.com/live.m3u8", task.config.Input[0].Address) + + task, ok = r.tasks.Load(app.ProcessID{ID: "process3"}) + require.True(t, ok) + require.Equal(t, "http://example.com/live.m3u8", task.config.Input[0].Address) + + task, ok = r.tasks.Load(app.ProcessID{ID: "process4"}) + require.True(t, ok) + require.Equal(t, "rtmp://example.com/live.stream?token=123", task.config.Input[0].Address) } func TestConfigValidation(t *testing.T) { @@ -1466,7 +1474,7 @@ func TestProcessReplacer(t *testing.T) { LogPatterns: []string{}, } - task, ok := rs.tasks[app.ProcessID{ID: "314159265359"}] + task, ok := rs.tasks.Load(app.ProcessID{ID: "314159265359"}) require.True(t, ok) require.Equal(t, process, task.config) @@ -1517,7 +1525,7 @@ func TestProcessLimit(t *testing.T) { rs := rsi.(*restream) - task, ok := rs.tasks[app.ProcessID{ID: process.ID}] + task, ok := rs.tasks.Load(app.ProcessID{ID: process.ID}) require.True(t, ok) status := task.ffmpeg.Status() diff --git a/restream/manager.go b/restream/manager.go new file mode 100644 index 00000000..c2ce403d --- /dev/null +++ b/restream/manager.go @@ -0,0 +1,64 @@ +package restream + +import ( + "github.com/datarhei/core/v16/restream/app" + "github.com/puzpuzpuz/xsync/v3" +) + +type Storage struct { + tasks *xsync.MapOf[app.ProcessID, *task] +} + +func NewStorage() *Storage { + m := &Storage{ + tasks: xsync.NewMapOf[app.ProcessID, *task](), + } + + return m +} + +func (m *Storage) Range(f func(key app.ProcessID, value *task) bool) { + m.tasks.Range(f) +} + +func (m *Storage) Store(id app.ProcessID, t *task) { + m.tasks.Store(id, t) +} + +func (m *Storage) LoadOrStore(id app.ProcessID, t *task) (*task, bool) { + return m.tasks.LoadOrStore(id, t) +} + +func (m *Storage) Has(id app.ProcessID) bool { + _, hasTask := m.Load(id) + + return hasTask +} + +func (m *Storage) Load(id app.ProcessID) (*task, bool) { + return m.tasks.Load(id) +} + +func (m *Storage) Delete(id app.ProcessID) bool { + if t, ok := m.Load(id); ok { + m.tasks.Delete(id) + t.Destroy() + return true + } + + return false +} + +func (m *Storage) Size() int { + return m.tasks.Size() +} + +func (m *Storage) Clear() { + m.tasks.Range(func(_ app.ProcessID, t *task) bool { + t.Destroy() + + return true + }) + + m.tasks.Clear() +} diff --git a/restream/task.go b/restream/task.go new file mode 100644 index 00000000..6928385c --- /dev/null +++ b/restream/task.go @@ -0,0 +1,430 @@ +package restream + +import ( + "errors" + "sync" + "time" + + "github.com/datarhei/core/v16/ffmpeg/parse" + "github.com/datarhei/core/v16/glob" + "github.com/datarhei/core/v16/log" + "github.com/datarhei/core/v16/process" + "github.com/datarhei/core/v16/restream/app" +) + +var ErrInvalidProcessConfig = errors.New("invalid process config") +var ErrMetadataKeyNotFound = errors.New("unknown metadata key") +var ErrMetadataKeyRequired = errors.New("a key for storing metadata is required") + +type task struct { + valid bool + id string // ID of the task/process + owner string + domain string + reference string + process *app.Process + config *app.Config // Process config with replaced static placeholders + command []string // The actual command parameter for ffmpeg + ffmpeg process.Process + parser parse.Parser + playout map[string]int + logger log.Logger + usesDisk bool // Whether this task uses the disk + metadata map[string]interface{} + + lock sync.RWMutex +} + +func (t *task) IsValid() bool { + t.lock.RLock() + defer t.lock.RUnlock() + + return t.valid +} + +func (t *task) UsesDisk() bool { + t.lock.RLock() + defer t.lock.RUnlock() + + return t.usesDisk +} + +func (t *task) ID() app.ProcessID { + return app.ProcessID{ + ID: t.id, + Domain: t.domain, + } +} + +func (t *task) String() string { + return t.ID().String() +} + +// Restore restores the task's order +func (t *task) Restore() error { + t.lock.RLock() + defer t.lock.RUnlock() + + if !t.valid { + return ErrInvalidProcessConfig + } + + if t.ffmpeg == nil { + return ErrInvalidProcessConfig + } + + if t.process.Order == "start" { + err := t.ffmpeg.Start() + if err != nil { + return err + } + } + + return nil +} + +func (t *task) Start() error { + t.lock.Lock() + defer t.lock.Unlock() + + if !t.valid { + return ErrInvalidProcessConfig + } + + if t.ffmpeg == nil { + return nil + } + + status := t.ffmpeg.Status() + + if t.process.Order == "start" && status.Order == "start" { + return nil + } + + t.process.Order = "start" + t.ffmpeg.Start() + + return nil +} + +func (t *task) Stop() error { + t.lock.Lock() + defer t.lock.Unlock() + + if t.ffmpeg == nil { + return nil + } + + status := t.ffmpeg.Status() + + if t.process.Order == "stop" && status.Order == "stop" { + return nil + } + + t.process.Order = "stop" + + t.ffmpeg.Stop(true) + + return nil +} + +// Kill stops a process without changing the tasks order +func (t *task) Kill() { + t.lock.RLock() + defer t.lock.RUnlock() + + if t.ffmpeg == nil { + return + } + + t.ffmpeg.Stop(true) +} + +func (t *task) Restart() error { + t.lock.RLock() + defer t.lock.RUnlock() + + if !t.valid { + return ErrInvalidProcessConfig + } + + if t.process.Order == "stop" { + return nil + } + + if t.ffmpeg != nil { + t.ffmpeg.Stop(true) + t.ffmpeg.Start() + } + + return nil +} + +func (t *task) State() (*app.State, error) { + t.lock.RLock() + defer t.lock.RUnlock() + + state := &app.State{} + + if !t.valid { + return state, nil + } + + status := t.ffmpeg.Status() + + state.Order = t.process.Order + state.State = status.State + state.States.Marshal(status.States) + state.Time = status.Time.Unix() + state.Memory = status.Memory.Current + state.CPU = status.CPU.Current / status.CPU.NCPU + state.LimitMode = status.LimitMode + state.Resources.CPU = status.CPU + state.Resources.Memory = status.Memory + state.Duration = status.Duration.Round(10 * time.Millisecond).Seconds() + state.Reconnect = -1 + state.Command = status.CommandArgs + state.LastLog = t.parser.LastLogline() + + if status.Reconnect >= time.Duration(0) { + state.Reconnect = status.Reconnect.Round(10 * time.Millisecond).Seconds() + } + + progress := t.parser.Progress() + state.Progress.UnmarshalParser(&progress) + + for i, p := range state.Progress.Input { + if int(p.Index) >= len(t.process.Config.Input) { + continue + } + + state.Progress.Input[i].ID = t.process.Config.Input[p.Index].ID + } + + for i, p := range state.Progress.Output { + if int(p.Index) >= len(t.process.Config.Output) { + continue + } + + state.Progress.Output[i].ID = t.process.Config.Output[p.Index].ID + } + + return state, nil +} + +func (t *task) Report() (*app.Report, error) { + t.lock.RLock() + defer t.lock.RUnlock() + + report := &app.Report{} + + if !t.valid { + return report, nil + } + + current := t.parser.Report() + + report.UnmarshalParser(¤t) + + history := t.parser.ReportHistory() + + report.History = make([]app.ReportHistoryEntry, len(history)) + + for i, h := range history { + report.History[i].UnmarshalParser(&h) + e := &report.History[i] + + for i, p := range e.Progress.Input { + if int(p.Index) >= len(t.process.Config.Input) { + continue + } + + e.Progress.Input[i].ID = t.process.Config.Input[p.Index].ID + } + + for i, p := range e.Progress.Output { + if int(p.Index) >= len(t.process.Config.Output) { + continue + } + + e.Progress.Output[i].ID = t.process.Config.Output[p.Index].ID + } + } + + return report, nil +} + +func (t *task) SetReport(report *app.Report) error { + t.lock.RLock() + defer t.lock.RUnlock() + + if !t.valid { + return nil + } + + _, history := report.MarshalParser() + + t.parser.ImportReportHistory(history) + + return nil +} + +func (t *task) SearchReportHistory(state string, from, to *time.Time) []app.ReportHistorySearchResult { + t.lock.RLock() + defer t.lock.RUnlock() + + result := []app.ReportHistorySearchResult{} + + presult := t.parser.SearchReportHistory(state, from, to) + + for _, f := range presult { + result = append(result, app.ReportHistorySearchResult{ + ProcessID: t.id, + Reference: t.reference, + ExitState: f.ExitState, + CreatedAt: f.CreatedAt, + ExitedAt: f.ExitedAt, + }) + } + + return result +} + +func (t *task) SetMetadata(key string, data interface{}) error { + t.lock.Lock() + defer t.lock.Unlock() + + if len(key) == 0 { + return ErrMetadataKeyRequired + } + + if t.metadata == nil { + t.metadata = make(map[string]interface{}) + } + + if data == nil { + delete(t.metadata, key) + } else { + t.metadata[key] = data + } + + if len(t.metadata) == 0 { + t.metadata = nil + } + + return nil +} + +func (t *task) GetMetadata(key string) (interface{}, error) { + t.lock.RLock() + defer t.lock.RUnlock() + + if len(key) == 0 { + return t.metadata, nil + } + + data, ok := t.metadata[key] + if !ok { + return nil, ErrMetadataKeyNotFound + } + + return data, nil +} + +func (t *task) Limit(cpu, memory bool) bool { + t.lock.RLock() + defer t.lock.RUnlock() + + if !t.valid { + return false + } + + if t.ffmpeg == nil { + return false + } + + t.ffmpeg.Limit(cpu, memory) + + return true +} + +func (t *task) Equal(config *app.Config) bool { + t.lock.RLock() + defer t.lock.RUnlock() + + return t.process.Config.Equal(config) +} + +func (t *task) Config() *app.Config { + t.lock.RLock() + defer t.lock.RUnlock() + + return t.config.Clone() +} + +func (t *task) Destroy() { + t.Stop() + + t.lock.Lock() + defer t.lock.Unlock() + + t.valid = false + t.process = nil + t.config = nil + t.command = nil + t.ffmpeg = nil + t.parser = nil + t.metadata = nil +} + +func (t *task) Match(id, reference, owner, domain glob.Glob) bool { + t.lock.RLock() + defer t.lock.RUnlock() + + count := 0 + matches := 0 + + if id != nil { + count++ + if match := id.Match(t.id); match { + matches++ + } + } + + if reference != nil { + count++ + if match := reference.Match(t.reference); match { + matches++ + } + } + + if owner != nil { + count++ + if match := owner.Match(t.owner); match { + matches++ + } + } + + if domain != nil { + count++ + if match := domain.Match(t.domain); match { + matches++ + } + } + + return count == matches +} + +func (t *task) Process() *app.Process { + t.lock.RLock() + defer t.lock.RUnlock() + + return t.process.Clone() +} + +func (t *task) Order() string { + t.lock.RLock() + defer t.lock.RUnlock() + + return t.process.Order +}