From 72883d18d4818648d80e6578d018f038fae30eaf Mon Sep 17 00:00:00 2001 From: Ingo Oppermann Date: Thu, 18 Jul 2024 17:16:49 +0200 Subject: [PATCH] Remove bottlenecks in process handling, still some rough edges --- app/import/import.go | 12 +-- cluster/cluster.go | 4 +- cluster/leader_relocate.go | 3 + cluster/leader_synchronize.go | 2 + cluster/node/core.go | 6 +- http/api/process.go | 4 +- http/api/process_test.go | 2 +- http/client/process.go | 8 +- http/handler/api/cluster_process.go | 4 +- http/handler/api/process.go | 9 +-- process/limiter.go | 68 +++++++++------- process/process.go | 18 ++--- psutil/process.go | 45 ++++++++--- psutil/psutil.go | 60 ++++++++++++-- restream/app/process.go | 36 ++++++++- restream/core.go | 117 +++++++++------------------- restream/store/json/data.go | 4 +- restream/store/json/json_test.go | 4 +- restream/task.go | 101 +++++++++++++----------- 19 files changed, 302 insertions(+), 205 deletions(-) diff --git a/app/import/import.go b/app/import/import.go index 0694fe28..f8d1daf2 100644 --- a/app/import/import.go +++ b/app/import/import.go @@ -575,11 +575,11 @@ func importV1(fs fs.Filesystem, path string, cfg importConfig) (store.Data, erro ID: "restreamer-ui:ingest:" + cfg.id, Reference: cfg.id, CreatedAt: time.Now().Unix(), - Order: "stop", + Order: app.NewOrder("stop"), } if v1data.Actions.Ingest == "start" { - process.Order = "start" + process.Order = app.NewOrder("start") } config := &app.Config{ @@ -1211,11 +1211,11 @@ func importV1(fs fs.Filesystem, path string, cfg importConfig) (store.Data, erro ID: "restreamer-ui:ingest:" + cfg.id + "_snapshot", Reference: cfg.id, CreatedAt: time.Now().Unix(), - Order: "stop", + Order: app.NewOrder("stop"), } if v1data.Actions.Ingest == "start" { - process.Order = "start" + process.Order = app.NewOrder("start") } snapshotConfig := &app.Config{ @@ -1292,11 +1292,11 @@ func importV1(fs fs.Filesystem, path string, cfg importConfig) (store.Data, erro ID: egressId, Reference: cfg.id, CreatedAt: time.Now().Unix(), - Order: "stop", + Order: app.NewOrder("stop"), } if v1data.Actions.Egress == "start" { - process.Order = "start" + process.Order = app.NewOrder("start") } egress := restreamerUIEgress{ diff --git a/cluster/cluster.go b/cluster/cluster.go index d1bf5dd2..b0b11fdd 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -634,7 +634,6 @@ func (c *cluster) Shutdown() error { if c.manager != nil { c.manager.NodesClear() - c.manager = nil } if c.api != nil { @@ -648,6 +647,9 @@ func (c *cluster) Shutdown() error { c.raft.Shutdown() } + c.manager = nil + c.raft = nil + return nil } diff --git a/cluster/leader_relocate.go b/cluster/leader_relocate.go index b28e5696..99493213 100644 --- a/cluster/leader_relocate.go +++ b/cluster/leader_relocate.go @@ -197,6 +197,9 @@ func relocate(have []node.Process, nodes map[string]node.About, relocateMap map[ haveReferenceAffinity.Move(process.Config.Reference, process.Config.Domain, sourceNodeid, targetNodeid) relocatedProcessIDs = append(relocatedProcessIDs, processid) + + // Move only one process at a time. + break } return opStack, resources.Map(), relocatedProcessIDs diff --git a/cluster/leader_synchronize.go b/cluster/leader_synchronize.go index cab2755e..f618f769 100644 --- a/cluster/leader_synchronize.go +++ b/cluster/leader_synchronize.go @@ -368,6 +368,8 @@ func synchronize(wish map[string]string, want []store.Process, have []node.Proce err: errNotEnoughResourcesForDeployment, }) } + + //break } return opStack, resources.Map(), reality diff --git a/cluster/node/core.go b/cluster/node/core.go index 3b57f913..2af52762 100644 --- a/cluster/node/core.go +++ b/cluster/node/core.go @@ -796,10 +796,12 @@ func (n *Core) ClusterProcessList() ([]Process, error) { UpdatedAt: time.Unix(p.UpdatedAt, 0), } - config, metadata := p.Config.Marshal() + config, _ := p.Config.Marshal() process.Config = config - process.Metadata = metadata + if p.Metadata != nil { + process.Metadata = p.Metadata.(map[string]interface{}) + } processes = append(processes, process) } diff --git a/http/api/process.go b/http/api/process.go index 29fa57dd..265789a6 100644 --- a/http/api/process.go +++ b/http/api/process.go @@ -154,7 +154,7 @@ func (cfg *ProcessConfig) generateInputOutputIDs(ioconfig []ProcessConfigIO) { } // Unmarshal converts a core process config to a process config in API representation -func (cfg *ProcessConfig) Unmarshal(c *app.Config) { +func (cfg *ProcessConfig) Unmarshal(c *app.Config, metadata map[string]interface{}) { if c == nil { return } @@ -212,6 +212,8 @@ func (cfg *ProcessConfig) Unmarshal(c *app.Config) { cfg.LogPatterns = make([]string, len(c.LogPatterns)) copy(cfg.LogPatterns, c.LogPatterns) + + cfg.Metadata = metadata } func (p *ProcessConfig) ProcessID() app.ProcessID { diff --git a/http/api/process_test.go b/http/api/process_test.go index 4ddc1dc9..6dddce39 100644 --- a/http/api/process_test.go +++ b/http/api/process_test.go @@ -107,7 +107,7 @@ func TestProcessConfig(t *testing.T) { } p := ProcessConfig{} - p.Unmarshal(&original) + p.Unmarshal(&original, nil) restored, _ := p.Marshal() require.Equal(t, &original, restored) diff --git a/http/client/process.go b/http/client/process.go index b3e976d3..38a41d18 100644 --- a/http/client/process.go +++ b/http/client/process.go @@ -69,8 +69,7 @@ func (r *restclient) ProcessAdd(p *app.Config, metadata map[string]interface{}) var buf bytes.Buffer config := api.ProcessConfig{} - config.Unmarshal(p) - config.Metadata = metadata + config.Unmarshal(p, metadata) e := json.NewEncoder(&buf) e.Encode(config) @@ -87,8 +86,7 @@ func (r *restclient) ProcessUpdate(id app.ProcessID, p *app.Config, metadata map var buf bytes.Buffer config := api.ProcessConfig{} - config.Unmarshal(p) - config.Metadata = metadata + config.Unmarshal(p, metadata) e := json.NewEncoder(&buf) e.Encode(config) @@ -206,7 +204,7 @@ func (r *restclient) ProcessProbeConfig(p *app.Config) (api.Probe, error) { var buf bytes.Buffer config := api.ProcessConfig{} - config.Unmarshal(p) + config.Unmarshal(p, nil) e := json.NewEncoder(&buf) e.Encode(config) diff --git a/http/handler/api/cluster_process.go b/http/handler/api/cluster_process.go index 7db33417..cc18d693 100644 --- a/http/handler/api/cluster_process.go +++ b/http/handler/api/cluster_process.go @@ -241,7 +241,7 @@ func (h *ClusterHandler) convertStoreProcessToAPIProcess(p store.Process, filter if filter.config { config := &api.ProcessConfig{} - config.Unmarshal(p.Config) + config.Unmarshal(p.Config, p.Metadata) process.Config = config } @@ -442,7 +442,7 @@ func (h *ClusterHandler) ProcessUpdate(c echo.Context) error { } // Prefill the config with the current values - process.Unmarshal(current.Config) + process.Unmarshal(current.Config, current.Metadata) if err := util.ShouldBindJSON(c, &process); err != nil { return api.Err(http.StatusBadRequest, "", "invalid JSON: %s", err.Error()) diff --git a/http/handler/api/process.go b/http/handler/api/process.go index f362bda4..1f83b236 100644 --- a/http/handler/api/process.go +++ b/http/handler/api/process.go @@ -3,7 +3,6 @@ package api import ( "fmt" "net/http" - "runtime" "sort" "strconv" "strings" @@ -156,7 +155,7 @@ func (h *ProcessHandler) GetAll(c echo.Context) error { wg := sync.WaitGroup{} - for i := 0; i < runtime.NumCPU(); i++ { + for i := 0; i < 8; /*runtime.NumCPU()*/ i++ { wg.Add(1) go func(idChan <-chan app.ProcessID) { @@ -316,7 +315,7 @@ func (h *ProcessHandler) Update(c echo.Context) error { } // Prefill the config with the current values - process.Unmarshal(current.Config) + process.Unmarshal(current.Config, nil) if err := util.ShouldBindJSON(c, &process); err != nil { return api.Err(http.StatusBadRequest, "", "invalid JSON: %s", err.Error()) @@ -446,7 +445,7 @@ func (h *ProcessHandler) GetConfig(c echo.Context) error { } config := api.ProcessConfig{} - config.Unmarshal(p.Config) + config.Unmarshal(p.Config, nil) return c.JSON(http.StatusOK, config) } @@ -1062,7 +1061,7 @@ func (h *ProcessHandler) getProcess(id app.ProcessID, filter filter) (api.Proces if filter.config { info.Config = &api.ProcessConfig{} - info.Config.Unmarshal(process.Config) + info.Config.Unmarshal(process.Config, nil) } if filter.state { diff --git a/process/limiter.go b/process/limiter.go index 5a11869c..3d5f85e5 100644 --- a/process/limiter.go +++ b/process/limiter.go @@ -91,10 +91,13 @@ type limiter struct { ncpu float64 ncpuFactor float64 proc psutil.Process - lock sync.Mutex + lock sync.RWMutex cancel context.CancelFunc onLimit LimitFunc + lastUsage Usage + lastUsageLock sync.RWMutex + cpu float64 // CPU limit cpuCurrent float64 // Current CPU load of this process cpuLast float64 // Last CPU load of this process @@ -150,6 +153,10 @@ func NewLimiter(config LimiterConfig) Limiter { l.ncpu = ncpu } + l.lastUsage.CPU.NCPU = l.ncpu + l.lastUsage.CPU.Limit = l.cpu * l.ncpu + l.lastUsage.Memory.Limit = l.memory + l.ncpuFactor = 1 mode := "hard" @@ -208,7 +215,7 @@ func (l *limiter) Start(process psutil.Process) error { ctx, cancel := context.WithCancel(context.Background()) l.cancel = cancel - go l.ticker(ctx, 1000*time.Millisecond) + go l.ticker(ctx, time.Second) if l.mode == LimitModeSoft { ctx, cancel = context.WithCancel(context.Background()) @@ -255,15 +262,21 @@ func (l *limiter) ticker(ctx context.Context, interval time.Duration) { } } -func (l *limiter) collect(t time.Time) { +func (l *limiter) collect(_ time.Time) { l.lock.Lock() - defer l.lock.Unlock() + proc := l.proc + l.lock.Unlock() - if l.proc == nil { + if proc == nil { return } - if mstat, err := l.proc.VirtualMemory(); err == nil { + mstat, merr := proc.VirtualMemory() + cpustat, cerr := proc.CPUPercent() + + l.lock.Lock() + + if merr == nil { l.memoryLast, l.memoryCurrent = l.memoryCurrent, mstat if l.memoryCurrent > l.memoryMax { @@ -281,7 +294,7 @@ func (l *limiter) collect(t time.Time) { l.memoryAvg = ((l.memoryAvg * float64(l.memoryAvgCounter-1)) + float64(l.memoryCurrent)) / float64(l.memoryAvgCounter) } - if cpustat, err := l.proc.CPUPercent(); err == nil { + if cerr == nil { l.cpuLast, l.cpuCurrent = l.cpuCurrent, (cpustat.System+cpustat.User+cpustat.Other)/100 if l.cpuCurrent > l.cpuMax { @@ -354,6 +367,19 @@ func (l *limiter) collect(t time.Time) { if isLimitExceeded { go l.onLimit(l.cpuCurrent*l.ncpuFactor*100, l.memoryCurrent) } + + l.lastUsageLock.Lock() + l.lastUsage.CPU.Current = l.cpuCurrent * l.ncpu * 100 + l.lastUsage.CPU.Average = l.cpuAvg * l.ncpu * 100 + l.lastUsage.CPU.Max = l.cpuMax * l.ncpu * 100 + l.lastUsage.CPU.IsThrottling = l.cpuThrottling + + l.lastUsage.Memory.Current = l.memoryCurrent + l.lastUsage.Memory.Average = l.memoryAvg + l.lastUsage.Memory.Max = l.memoryMax + l.lastUsageLock.Unlock() + + l.lock.Unlock() } func (l *limiter) Limit(cpu, memory bool) error { @@ -498,34 +524,20 @@ func (l *limiter) limitCPU(ctx context.Context, limit float64, interval time.Dur } func (l *limiter) Current() (cpu float64, memory uint64) { - l.lock.Lock() - defer l.lock.Unlock() + l.lastUsageLock.RLock() + defer l.lastUsageLock.RUnlock() - cpu = l.cpuCurrent * 100 - memory = l.memoryCurrent * 100 + cpu = l.lastUsage.CPU.Current / l.ncpu + memory = l.lastUsage.Memory.Current return } func (l *limiter) Usage() Usage { - l.lock.Lock() - defer l.lock.Unlock() + l.lastUsageLock.RLock() + defer l.lastUsageLock.RUnlock() - usage := Usage{} - - usage.CPU.NCPU = l.ncpu - usage.CPU.Limit = l.cpu * l.ncpu * 100 - usage.CPU.Current = l.cpuCurrent * l.ncpu * 100 - usage.CPU.Average = l.cpuAvg * l.ncpu * 100 - usage.CPU.Max = l.cpuMax * l.ncpu * 100 - usage.CPU.IsThrottling = l.cpuThrottling - - usage.Memory.Limit = l.memory - usage.Memory.Current = l.memoryCurrent - usage.Memory.Average = l.memoryAvg - usage.Memory.Max = l.memoryMax - - return usage + return l.lastUsage } func (l *limiter) Limits() (cpu float64, memory uint64) { diff --git a/process/process.go b/process/process.go index 44e66516..4da10426 100644 --- a/process/process.go +++ b/process/process.go @@ -177,7 +177,7 @@ type process struct { state stateType time time.Time states States - lock sync.Mutex + lock sync.RWMutex } order struct { order string @@ -401,8 +401,8 @@ func (p *process) setState(state stateType) (stateType, error) { } func (p *process) getState() stateType { - p.state.lock.Lock() - defer p.state.lock.Unlock() + p.state.lock.RLock() + defer p.state.lock.RUnlock() return p.state.state } @@ -431,15 +431,15 @@ func (p *process) setOrder(order string) bool { } func (p *process) isRunning() bool { - p.state.lock.Lock() - defer p.state.lock.Unlock() + p.state.lock.RLock() + defer p.state.lock.RUnlock() return p.state.state.IsRunning() } func (p *process) getStateString() string { - p.state.lock.Lock() - defer p.state.lock.Unlock() + p.state.lock.RLock() + defer p.state.lock.RUnlock() return p.state.state.String() } @@ -448,11 +448,11 @@ func (p *process) getStateString() string { func (p *process) Status() Status { usage := p.limits.Usage() - p.state.lock.Lock() + p.state.lock.RLock() stateTime := p.state.time state := p.state.state states := p.state.states - p.state.lock.Unlock() + p.state.lock.RUnlock() if state == stateRunning && !p.parser.IsRunning() { state = stateStarting diff --git a/psutil/process.go b/psutil/process.go index 7580d3bf..dc190ef3 100644 --- a/psutil/process.go +++ b/psutil/process.go @@ -41,6 +41,7 @@ type process struct { statPrevious cpuTimesStat statPreviousTime time.Time nTicks uint64 + memRSS uint64 } func (u *util) Process(pid int32) (Process, error) { @@ -60,7 +61,8 @@ func (u *util) Process(pid int32) (Process, error) { ctx, cancel := context.WithCancel(context.Background()) p.stopTicker = cancel - go p.tick(ctx, 1000*time.Millisecond) + go p.tickCPU(ctx, time.Second) + go p.tickMemory(ctx, time.Second) return p, nil } @@ -69,7 +71,7 @@ func NewProcess(pid int32, limit bool) (Process, error) { return DefaultUtil.Process(pid) } -func (p *process) tick(ctx context.Context, interval time.Duration) { +func (p *process) tickCPU(ctx context.Context, interval time.Duration) { ticker := time.NewTicker(interval) defer ticker.Stop() @@ -78,7 +80,7 @@ func (p *process) tick(ctx context.Context, interval time.Duration) { case <-ctx.Done(): return case t := <-ticker.C: - stat := p.collect() + stat := p.collectCPU() p.lock.Lock() p.statPrevious, p.statCurrent = p.statCurrent, stat @@ -89,7 +91,7 @@ func (p *process) tick(ctx context.Context, interval time.Duration) { } } -func (p *process) collect() cpuTimesStat { +func (p *process) collectCPU() cpuTimesStat { stat, err := p.cpuTimes() if err != nil { return cpuTimesStat{ @@ -101,6 +103,33 @@ func (p *process) collect() cpuTimesStat { return *stat } +func (p *process) tickMemory(ctx context.Context, interval time.Duration) { + ticker := time.NewTicker(interval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + rss := p.collectMemory() + + p.lock.Lock() + p.memRSS = rss + p.lock.Unlock() + } + } +} + +func (p *process) collectMemory() uint64 { + info, err := p.proc.MemoryInfo() + if err != nil { + return 0 + } + + return info.RSS +} + func (p *process) Stop() { p.stopTicker() } @@ -178,10 +207,8 @@ func (p *process) CPUPercent() (*CPUInfoStat, error) { } func (p *process) VirtualMemory() (uint64, error) { - info, err := p.proc.MemoryInfo() - if err != nil { - return 0, err - } + p.lock.RLock() + defer p.lock.RUnlock() - return info.RSS, nil + return p.memRSS, nil } diff --git a/psutil/psutil.go b/psutil/psutil.go index f6b95934..0af65387 100644 --- a/psutil/psutil.go +++ b/psutil/psutil.go @@ -102,6 +102,7 @@ type util struct { statPrevious cpuTimesStat statPreviousTime time.Time nTicks uint64 + mem MemoryInfoStat } // New returns a new util, it will be started automatically @@ -127,6 +128,13 @@ func New(root string) (Util, error) { } } + mem, err := u.virtualMemory() + if err != nil { + return nil, fmt.Errorf("unable to determine system memory: %w", err) + } + + u.mem = *mem + u.stopOnce.Do(func() {}) u.Start() @@ -139,7 +147,8 @@ func (u *util) Start() { ctx, cancel := context.WithCancel(context.Background()) u.stopTicker = cancel - go u.tick(ctx, 1000*time.Millisecond) + go u.tickCPU(ctx, time.Second) + go u.tickMemory(ctx, time.Second) }) } @@ -233,7 +242,7 @@ func (u *util) cgroupCPULimit(version int) (uint64, float64) { return 0, 0 } -func (u *util) tick(ctx context.Context, interval time.Duration) { +func (u *util) tickCPU(ctx context.Context, interval time.Duration) { ticker := time.NewTicker(interval) defer ticker.Stop() @@ -242,7 +251,7 @@ func (u *util) tick(ctx context.Context, interval time.Duration) { case <-ctx.Done(): return case t := <-ticker.C: - stat := u.collect() + stat := u.collectCPU() u.lock.Lock() u.statPrevious, u.statCurrent = u.statCurrent, stat @@ -253,7 +262,7 @@ func (u *util) tick(ctx context.Context, interval time.Duration) { } } -func (u *util) collect() cpuTimesStat { +func (u *util) collectCPU() cpuTimesStat { stat, err := u.cpuTimes() if err != nil { return cpuTimesStat{ @@ -265,6 +274,34 @@ func (u *util) collect() cpuTimesStat { return *stat } +func (u *util) tickMemory(ctx context.Context, interval time.Duration) { + ticker := time.NewTicker(interval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + stat := u.collectMemory() + if stat != nil { + u.lock.Lock() + u.mem = *stat + u.lock.Unlock() + } + } + } +} + +func (u *util) collectMemory() *MemoryInfoStat { + stat, err := u.virtualMemory() + if err != nil { + return nil + } + + return stat +} + func (u *util) CPUCounts(logical bool) (float64, error) { if u.hasCgroup && u.ncpu > 0 { return u.ncpu, nil @@ -409,7 +446,7 @@ func DiskUsage(path string) (*disk.UsageStat, error) { return DefaultUtil.DiskUsage(path) } -func (u *util) VirtualMemory() (*MemoryInfoStat, error) { +func (u *util) virtualMemory() (*MemoryInfoStat, error) { info, err := mem.VirtualMemory() if err != nil { return nil, err @@ -431,6 +468,19 @@ func (u *util) VirtualMemory() (*MemoryInfoStat, error) { }, nil } +func (u *util) VirtualMemory() (*MemoryInfoStat, error) { + u.lock.RLock() + defer u.lock.RUnlock() + + stat := &MemoryInfoStat{ + Total: u.mem.Total, + Available: u.mem.Available, + Used: u.mem.Used, + } + + return stat, nil +} + func VirtualMemory() (*MemoryInfoStat, error) { return DefaultUtil.VirtualMemory() } diff --git a/restream/app/process.go b/restream/app/process.go index 9db4083e..6dfebab7 100644 --- a/restream/app/process.go +++ b/restream/app/process.go @@ -6,6 +6,7 @@ import ( "encoding/json" "strconv" "strings" + "sync" "github.com/datarhei/core/v16/ffmpeg/parse" "github.com/datarhei/core/v16/process" @@ -197,6 +198,37 @@ func (c *Config) ProcessID() ProcessID { } } +type order struct { + order string + lock sync.RWMutex +} + +func NewOrder(o string) order { + return order{ + order: o, + } +} + +func (o *order) Clone() order { + return order{ + order: o.order, + } +} + +func (o *order) String() string { + o.lock.RLock() + defer o.lock.RUnlock() + + return o.order +} + +func (o *order) Set(order string) { + o.lock.Lock() + defer o.lock.Unlock() + + o.order = order +} + type Process struct { ID string Owner string @@ -205,7 +237,7 @@ type Process struct { Config *Config CreatedAt int64 UpdatedAt int64 - Order string + Order order } func (process *Process) Clone() *Process { @@ -217,7 +249,7 @@ func (process *Process) Clone() *Process { Config: process.Config.Clone(), CreatedAt: process.CreatedAt, UpdatedAt: process.UpdatedAt, - Order: process.Order, + Order: process.Order.Clone(), } return clone diff --git a/restream/core.go b/restream/core.go index 3e5e5dbc..c8e49808 100644 --- a/restream/core.go +++ b/restream/core.go @@ -26,6 +26,7 @@ import ( "github.com/datarhei/core/v16/restream/store" "github.com/Masterminds/semver/v3" + "github.com/puzpuzpuz/xsync/v3" ) // The Restreamer interface @@ -358,6 +359,7 @@ func (r *restream) load() error { "domain": p.Process.Domain, "reference": p.Process.Reference, }), + lock: xsync.NewRBMutex(), } t.metadata = p.Metadata @@ -452,7 +454,7 @@ func (r *restream) load() error { } t.ffmpeg = ffmpeg - t.valid = true + t.Valid(true) return true }) @@ -558,14 +560,14 @@ func (r *restream) createTask(config *app.Config) (*task, error) { Domain: config.Domain, Reference: config.Reference, Config: config.Clone(), - Order: "stop", + Order: app.NewOrder("stop"), CreatedAt: time.Now().Unix(), } process.UpdatedAt = process.CreatedAt if config.Autostart { - process.Order = "start" + process.Order.Set("start") } t := &task{ @@ -581,6 +583,7 @@ func (r *restream) createTask(config *app.Config) (*task, error) { "reference": process.Reference, "domain": process.Domain, }), + lock: xsync.NewRBMutex(), } resolveStaticPlaceholders(t.config, r.replace) @@ -647,7 +650,7 @@ func (r *restream) createTask(config *app.Config) (*task, error) { t.ffmpeg = ffmpeg - t.valid = true + t.Valid(true) return t, nil } @@ -1125,9 +1128,17 @@ func parseAddressReference(address string) (map[string]string, error) { } func (r *restream) UpdateProcess(id app.ProcessID, config *app.Config) error { - r.lock.Lock() - defer r.lock.Unlock() + err := r.updateProcess(id, config) + if err != nil { + return err + } + r.save() + + return nil +} + +func (r *restream) updateProcess(id app.ProcessID, config *app.Config) error { task, ok := r.tasks.Load(id) if !ok { return ErrUnknownProcess @@ -1152,7 +1163,7 @@ func (r *restream) UpdateProcess(id app.ProcessID, config *app.Config) error { } } - t.process.Order = task.Order() + t.process.Order.Set(task.Order()) if err := r.stopProcess(id); err != nil { return fmt.Errorf("stop process: %w", err) @@ -1160,7 +1171,6 @@ func (r *restream) UpdateProcess(id app.ProcessID, config *app.Config) error { // This would require a major version jump //t.process.CreatedAt = task.process.CreatedAt - t.process.UpdatedAt = time.Now().Unix() // Transfer the report history to the new process history := task.parser.ReportHistory() @@ -1180,8 +1190,6 @@ func (r *restream) UpdateProcess(id app.ProcessID, config *app.Config) error { t.Restore() - r.save() - return nil } @@ -1353,93 +1361,44 @@ func (r *restream) ReloadProcess(id app.ProcessID) error { return nil } -func (r *restream) reloadProcess(tid app.ProcessID) error { - t, ok := r.tasks.Load(tid) +func (r *restream) reloadProcess(id app.ProcessID) error { + task, ok := r.tasks.Load(id) if !ok { return ErrUnknownProcess } - t.valid = false - - t.config = t.process.Config.Clone() - - resolveStaticPlaceholders(t.config, r.replace) - - err := r.resolveAddresses(r.tasks, t.config) + t, err := r.createTask(task.Config()) if err != nil { return err } - // Validate config with all placeholders replaced. However, we need to take care - // that the config with the task keeps its dynamic placeholders for process starts. - config := t.config.Clone() - resolveDynamicPlaceholder(config, r.replace) + tid := t.ID() - t.usesDisk, err = validateConfig(config, r.fs.list, r.ffmpeg) - if err != nil { - return err + t.process.Order.Set(task.Order()) + + if err := task.Stop(); err != nil { + return fmt.Errorf("stop process: %w", err) } - err = r.setPlayoutPorts(t) - if err != nil { - return err - } - - t.command = t.config.CreateCommand() - - order := "stop" - if t.process.Order == "start" { - order = "start" - r.stopProcess(tid) - } - - history := t.parser.ReportHistory() - - parser := r.ffmpeg.NewProcessParser(t.logger, t.String(), t.reference, t.config.LogPatterns) + // Transfer the report history to the new process + history := task.parser.ReportHistory() t.parser.ImportReportHistory(history) - t.parser = parser - limitMode := "hard" - if r.enableSoftLimit { - limitMode = "soft" + // Transfer the metadata to the new process + t.metadata = task.metadata + + if err := r.deleteProcess(id); err != nil { + return fmt.Errorf("delete process: %w", err) } - ffmpeg, err := r.ffmpeg.New(ffmpeg.ProcessConfig{ - Reconnect: t.config.Reconnect, - ReconnectDelay: time.Duration(t.config.ReconnectDelay) * time.Second, - StaleTimeout: time.Duration(t.config.StaleTimeout) * time.Second, - Timeout: time.Duration(t.config.Timeout) * time.Second, - LimitCPU: t.config.LimitCPU, - LimitMemory: t.config.LimitMemory, - LimitDuration: time.Duration(t.config.LimitWaitFor) * time.Second, - LimitMode: limitMode, - Scheduler: t.config.Scheduler, - Args: t.command, - Parser: t.parser, - Logger: t.logger, - OnArgs: r.onArgs(t.config.Clone()), - OnBeforeStart: func() error { - if !r.enableSoftLimit { - return nil - } + r.tasks.Store(tid, t) - if err := r.resources.Request(t.config.LimitCPU, t.config.LimitMemory); err != nil { - return err - } + // set filesystem cleanup rules + r.setCleanup(tid, t.Config()) - return nil - }, - }) - if err != nil { - return err - } + t.Restore() - t.ffmpeg = ffmpeg - t.valid = true - - if order == "start" { - r.startProcess(tid) - } + r.save() return nil } diff --git a/restream/store/json/data.go b/restream/store/json/data.go index b8e7a932..c9e3d0fd 100644 --- a/restream/store/json/data.go +++ b/restream/store/json/data.go @@ -186,7 +186,7 @@ func MarshalProcess(a *app.Process) Process { Config: ProcessConfig{}, CreatedAt: a.CreatedAt, UpdatedAt: a.UpdatedAt, - Order: a.Order, + Order: a.Order.String(), } p.Config.Marshal(a.Config) @@ -203,7 +203,7 @@ func UnmarshalProcess(p Process) *app.Process { Config: &app.Config{}, CreatedAt: p.CreatedAt, UpdatedAt: p.UpdatedAt, - Order: p.Order, + Order: app.NewOrder(p.Order), } a.Config = p.Config.Unmarshal() diff --git a/restream/store/json/json_test.go b/restream/store/json/json_test.go index 2771c8fd..94680b9d 100644 --- a/restream/store/json/json_test.go +++ b/restream/store/json/json_test.go @@ -73,7 +73,7 @@ func TestStoreLoad(t *testing.T) { }, CreatedAt: 0, UpdatedAt: 0, - Order: "stop", + Order: app.NewOrder("stop"), }, Metadata: map[string]interface{}{ "some": "data", @@ -112,7 +112,7 @@ func TestStoreLoad(t *testing.T) { }, CreatedAt: 0, UpdatedAt: 0, - Order: "stop", + Order: app.NewOrder("stop"), }, Metadata: map[string]interface{}{ "some-more": "data", diff --git a/restream/task.go b/restream/task.go index 6928385c..9120d00e 100644 --- a/restream/task.go +++ b/restream/task.go @@ -2,7 +2,6 @@ package restream import ( "errors" - "sync" "time" "github.com/datarhei/core/v16/ffmpeg/parse" @@ -10,6 +9,8 @@ import ( "github.com/datarhei/core/v16/log" "github.com/datarhei/core/v16/process" "github.com/datarhei/core/v16/restream/app" + + "github.com/puzpuzpuz/xsync/v3" ) var ErrInvalidProcessConfig = errors.New("invalid process config") @@ -32,19 +33,26 @@ type task struct { usesDisk bool // Whether this task uses the disk metadata map[string]interface{} - lock sync.RWMutex + lock *xsync.RBMutex } func (t *task) IsValid() bool { - t.lock.RLock() - defer t.lock.RUnlock() + token := t.lock.RLock() + defer t.lock.RUnlock(token) return t.valid } +func (t *task) Valid(valid bool) { + t.lock.Lock() + defer t.lock.Unlock() + + t.valid = valid +} + func (t *task) UsesDisk() bool { - t.lock.RLock() - defer t.lock.RUnlock() + token := t.lock.RLock() + defer t.lock.RUnlock(token) return t.usesDisk } @@ -62,8 +70,8 @@ func (t *task) String() string { // Restore restores the task's order func (t *task) Restore() error { - t.lock.RLock() - defer t.lock.RUnlock() + token := t.lock.RLock() + defer t.lock.RUnlock(token) if !t.valid { return ErrInvalidProcessConfig @@ -73,7 +81,7 @@ func (t *task) Restore() error { return ErrInvalidProcessConfig } - if t.process.Order == "start" { + if t.process.Order.String() == "start" { err := t.ffmpeg.Start() if err != nil { return err @@ -84,8 +92,8 @@ func (t *task) Restore() error { } func (t *task) Start() error { - t.lock.Lock() - defer t.lock.Unlock() + token := t.lock.RLock() + defer t.lock.RUnlock(token) if !t.valid { return ErrInvalidProcessConfig @@ -97,19 +105,20 @@ func (t *task) Start() error { status := t.ffmpeg.Status() - if t.process.Order == "start" && status.Order == "start" { + if t.process.Order.String() == "start" && status.Order == "start" { return nil } - t.process.Order = "start" + t.process.Order.Set("start") + t.ffmpeg.Start() return nil } func (t *task) Stop() error { - t.lock.Lock() - defer t.lock.Unlock() + token := t.lock.RLock() + defer t.lock.RUnlock(token) if t.ffmpeg == nil { return nil @@ -117,11 +126,11 @@ func (t *task) Stop() error { status := t.ffmpeg.Status() - if t.process.Order == "stop" && status.Order == "stop" { + if t.process.Order.String() == "stop" && status.Order == "stop" { return nil } - t.process.Order = "stop" + t.process.Order.Set("stop") t.ffmpeg.Stop(true) @@ -130,8 +139,8 @@ func (t *task) Stop() error { // Kill stops a process without changing the tasks order func (t *task) Kill() { - t.lock.RLock() - defer t.lock.RUnlock() + token := t.lock.RLock() + defer t.lock.RUnlock(token) if t.ffmpeg == nil { return @@ -141,14 +150,14 @@ func (t *task) Kill() { } func (t *task) Restart() error { - t.lock.RLock() - defer t.lock.RUnlock() + token := t.lock.RLock() + defer t.lock.RUnlock(token) if !t.valid { return ErrInvalidProcessConfig } - if t.process.Order == "stop" { + if t.process.Order.String() == "stop" { return nil } @@ -161,8 +170,8 @@ func (t *task) Restart() error { } func (t *task) State() (*app.State, error) { - t.lock.RLock() - defer t.lock.RUnlock() + token := t.lock.RLock() + defer t.lock.RUnlock(token) state := &app.State{} @@ -172,7 +181,7 @@ func (t *task) State() (*app.State, error) { status := t.ffmpeg.Status() - state.Order = t.process.Order + state.Order = t.process.Order.String() state.State = status.State state.States.Marshal(status.States) state.Time = status.Time.Unix() @@ -213,8 +222,8 @@ func (t *task) State() (*app.State, error) { } func (t *task) Report() (*app.Report, error) { - t.lock.RLock() - defer t.lock.RUnlock() + token := t.lock.RLock() + defer t.lock.RUnlock(token) report := &app.Report{} @@ -255,8 +264,8 @@ func (t *task) Report() (*app.Report, error) { } func (t *task) SetReport(report *app.Report) error { - t.lock.RLock() - defer t.lock.RUnlock() + token := t.lock.RLock() + defer t.lock.RUnlock(token) if !t.valid { return nil @@ -270,8 +279,8 @@ func (t *task) SetReport(report *app.Report) error { } func (t *task) SearchReportHistory(state string, from, to *time.Time) []app.ReportHistorySearchResult { - t.lock.RLock() - defer t.lock.RUnlock() + token := t.lock.RLock() + defer t.lock.RUnlock(token) result := []app.ReportHistorySearchResult{} @@ -316,8 +325,8 @@ func (t *task) SetMetadata(key string, data interface{}) error { } func (t *task) GetMetadata(key string) (interface{}, error) { - t.lock.RLock() - defer t.lock.RUnlock() + token := t.lock.RLock() + defer t.lock.RUnlock(token) if len(key) == 0 { return t.metadata, nil @@ -332,8 +341,8 @@ func (t *task) GetMetadata(key string) (interface{}, error) { } func (t *task) Limit(cpu, memory bool) bool { - t.lock.RLock() - defer t.lock.RUnlock() + token := t.lock.RLock() + defer t.lock.RUnlock(token) if !t.valid { return false @@ -349,15 +358,15 @@ func (t *task) Limit(cpu, memory bool) bool { } func (t *task) Equal(config *app.Config) bool { - t.lock.RLock() - defer t.lock.RUnlock() + token := t.lock.RLock() + defer t.lock.RUnlock(token) return t.process.Config.Equal(config) } func (t *task) Config() *app.Config { - t.lock.RLock() - defer t.lock.RUnlock() + token := t.lock.RLock() + defer t.lock.RUnlock(token) return t.config.Clone() } @@ -378,8 +387,8 @@ func (t *task) Destroy() { } func (t *task) Match(id, reference, owner, domain glob.Glob) bool { - t.lock.RLock() - defer t.lock.RUnlock() + token := t.lock.RLock() + defer t.lock.RUnlock(token) count := 0 matches := 0 @@ -416,15 +425,15 @@ func (t *task) Match(id, reference, owner, domain glob.Glob) bool { } func (t *task) Process() *app.Process { - t.lock.RLock() - defer t.lock.RUnlock() + token := t.lock.RLock() + defer t.lock.RUnlock(token) return t.process.Clone() } func (t *task) Order() string { - t.lock.RLock() - defer t.lock.RUnlock() + token := t.lock.RLock() + defer t.lock.RUnlock(token) - return t.process.Order + return t.process.Order.String() }