From 5e2060f7852905c37708eb1875ebb12791b8ab46 Mon Sep 17 00:00:00 2001 From: Ingo Oppermann Date: Wed, 26 Apr 2023 22:05:46 +0200 Subject: [PATCH] WIP: add resource manager --- restream/resources/resources.go | 160 ++++++++++++++++++++++++++++++++ restream/restream.go | 140 +++++++++++++++++++++++++--- 2 files changed, 286 insertions(+), 14 deletions(-) create mode 100644 restream/resources/resources.go diff --git a/restream/resources/resources.go b/restream/resources/resources.go new file mode 100644 index 00000000..1ccf526a --- /dev/null +++ b/restream/resources/resources.go @@ -0,0 +1,160 @@ +package resources + +import ( + "context" + "sync" + "time" + + "github.com/datarhei/core/v16/psutil" +) + +type resources struct { + ncpu float64 + maxCPU float64 + maxMemory uint64 + + consumerCPU float64 + consumerMemory uint64 + + limit chan bool + isLimiting bool + + cancelObserver context.CancelFunc + + lock sync.Mutex + startOnce sync.Once + stopOnce sync.Once +} + +type Resources interface { + Start() + Stop() + + Limit() <-chan bool + + Add(cpu float64, memory uint64) bool + Remove(cpu float64, memory uint64) +} + +func New(maxCPU, maxMemory float64) (Resources, error) { + r := &resources{ + maxCPU: maxCPU, + } + + vmstat, err := psutil.VirtualMemory() + if err != nil { + return nil, err + } + + ncpu, err := psutil.CPUCounts(true) + if err != nil { + ncpu = 1 + } + + r.ncpu = ncpu + + r.maxMemory = uint64(float64(vmstat.Total) * maxMemory / 100) + + r.stopOnce.Do(func() {}) + + return r, nil +} + +func (r *resources) Start() { + r.startOnce.Do(func() { + r.limit = make(chan bool, 10) + + ctx, cancel := context.WithCancel(context.Background()) + r.cancelObserver = cancel + + go r.observe(ctx, time.Second) + + r.stopOnce = sync.Once{} + }) +} + +func (r *resources) Stop() { + r.stopOnce.Do(func() { + r.cancelObserver() + + r.startOnce = sync.Once{} + }) +} + +func (r *resources) Limit() <-chan bool { + return r.limit +} + +func (r *resources) observe(ctx context.Context, interval time.Duration) { + ticker := time.NewTicker(interval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + limit := false + + cpustat, err := psutil.CPUPercent() + if err != nil { + continue + } + + cpuload := cpustat.User + cpustat.System + cpustat.Other + + if cpuload > r.maxCPU { + limit = true + } + + vmstat, err := psutil.VirtualMemory() + if err != nil { + continue + } + + if vmstat.Used > r.maxMemory { + limit = true + } + + r.lock.Lock() + if r.isLimiting != limit { + r.isLimiting = limit + select { + case r.limit <- limit: + default: + } + } + r.lock.Unlock() + } + } +} + +func (r *resources) Add(cpu float64, memory uint64) bool { + r.lock.Lock() + defer r.lock.Unlock() + + if r.isLimiting { + return false + } + + if r.consumerCPU+cpu > r.maxCPU { + return false + } + + if r.consumerMemory+memory > r.maxMemory { + return false + } + + r.consumerCPU += cpu + r.consumerMemory += memory + + return true +} + +func (r *resources) Remove(cpu float64, memory uint64) { + r.lock.Lock() + defer r.lock.Unlock() + + r.consumerCPU -= cpu + r.consumerMemory -= memory +} diff --git a/restream/restream.go b/restream/restream.go index b67bf479..719aab1e 100644 --- a/restream/restream.go +++ b/restream/restream.go @@ -24,6 +24,7 @@ import ( "github.com/datarhei/core/v16/restream/app" rfs "github.com/datarhei/core/v16/restream/fs" "github.com/datarhei/core/v16/restream/replace" + "github.com/datarhei/core/v16/restream/resources" "github.com/datarhei/core/v16/restream/store" "github.com/Masterminds/semver/v3" @@ -68,6 +69,8 @@ type Config struct { Replace replace.Replacer FFmpeg ffmpeg.FFmpeg MaxProcesses int64 + MaxCPU float64 // percent 0-100*ncpu + MaxMemory float64 // percent 0-100 Logger log.Logger } @@ -95,16 +98,20 @@ type restream struct { maxProc int64 nProc int64 fs struct { - list []rfs.Filesystem - stopObserver context.CancelFunc + list []rfs.Filesystem } replace replace.Replacer tasks map[string]*task logger log.Logger metadata map[string]interface{} + resources resources.Resources + enableSoftLimit bool + lock sync.RWMutex + cancelObserver context.CancelFunc + startOnce sync.Once stopOnce sync.Once } @@ -159,6 +166,11 @@ func New(config Config) (Restreamer, error) { r.maxProc = config.MaxProcesses + if config.MaxCPU > 0 || config.MaxMemory > 0 { + r.resources, _ = resources.New(config.MaxCPU, config.MaxMemory) + r.enableSoftLimit = true + } + if err := r.load(); err != nil { return nil, fmt.Errorf("failed to load data from DB (%w)", err) } @@ -175,6 +187,13 @@ func (r *restream) Start() { r.lock.Lock() defer r.lock.Unlock() + ctx, cancel := context.WithCancel(context.Background()) + r.cancelObserver = cancel + + if r.enableSoftLimit { + go r.resourceObserver(ctx, r.resources) + } + for id, t := range r.tasks { if t.process.Order == "start" { r.startProcess(id) @@ -184,14 +203,11 @@ func (r *restream) Start() { r.setCleanup(id, t.config) } - ctx, cancel := context.WithCancel(context.Background()) - r.fs.stopObserver = cancel - for _, fs := range r.fs.list { fs.Start() if fs.Type() == "disk" { - go r.observe(ctx, fs, 10*time.Second) + go r.filesystemObserver(ctx, fs, 10*time.Second) } } @@ -215,7 +231,7 @@ func (r *restream) Stop() { r.unsetCleanup(id) } - r.fs.stopObserver() + r.cancelObserver() // Stop the cleanup jobs for _, fs := range r.fs.list { @@ -226,7 +242,7 @@ func (r *restream) Stop() { }) } -func (r *restream) observe(ctx context.Context, fs fs.Filesystem, interval time.Duration) { +func (r *restream) filesystemObserver(ctx context.Context, fs fs.Filesystem, interval time.Duration) { ticker := time.NewTicker(interval) defer ticker.Stop() @@ -266,6 +282,36 @@ func (r *restream) observe(ctx context.Context, fs fs.Filesystem, interval time. } } +func (r *restream) resourceObserver(ctx context.Context, rsc resources.Resources) { + rsc.Start() + defer rsc.Stop() + + for { + select { + case <-ctx.Done(): + return + case limit := <-rsc.Limit(): + if limit { + r.logger.Warn().WithField("limit", limit).Log("limiter triggered") + } + + r.lock.Lock() + for id, t := range r.tasks { + if !t.valid { + continue + } + + r.logger.Debug().WithFields(log.Fields{ + "limit": limit, + "id": id, + }).Log("limiting process") + t.ffmpeg.Limit(limit) + } + r.lock.Unlock() + } + } +} + func (r *restream) load() error { data, err := r.store.Load() if err != nil { @@ -360,6 +406,11 @@ func (r *restream) load() error { t.command = t.config.CreateCommand() t.parser = r.ffmpeg.NewProcessParser(t.logger, t.id, t.reference, t.config.LogPatterns) + limitMode := "hard" + if r.enableSoftLimit { + limitMode = "soft" + } + ffmpeg, err := r.ffmpeg.New(ffmpeg.ProcessConfig{ Reconnect: t.config.Reconnect, ReconnectDelay: time.Duration(t.config.ReconnectDelay) * time.Second, @@ -368,13 +419,30 @@ func (r *restream) load() error { LimitCPU: t.config.LimitCPU, LimitMemory: t.config.LimitMemory, LimitDuration: time.Duration(t.config.LimitWaitFor) * time.Second, - LimitMode: "hard", + LimitMode: limitMode, Scheduler: t.config.Scheduler, Args: t.command, Parser: t.parser, Logger: t.logger, OnArgs: r.onArgs(t.config.Clone()), - OnBeforeStart: func() error { return nil }, + OnBeforeStart: func() error { + if !r.enableSoftLimit { + return nil + } + + if !r.resources.Add(t.config.LimitCPU, t.config.LimitMemory) { + return fmt.Errorf("not enough resources available") + } + + return nil + }, + OnExit: func(string) { + if !r.enableSoftLimit { + return + } + + r.resources.Remove(t.config.LimitCPU, t.config.LimitMemory) + }, }) if err != nil { return err @@ -519,6 +587,11 @@ func (r *restream) createTask(config *app.Config) (*task, error) { t.command = t.config.CreateCommand() t.parser = r.ffmpeg.NewProcessParser(t.logger, t.id, t.reference, t.config.LogPatterns) + limitMode := "hard" + if r.enableSoftLimit { + limitMode = "soft" + } + ffmpeg, err := r.ffmpeg.New(ffmpeg.ProcessConfig{ Reconnect: t.config.Reconnect, ReconnectDelay: time.Duration(t.config.ReconnectDelay) * time.Second, @@ -527,13 +600,30 @@ func (r *restream) createTask(config *app.Config) (*task, error) { LimitCPU: t.config.LimitCPU, LimitMemory: t.config.LimitMemory, LimitDuration: time.Duration(t.config.LimitWaitFor) * time.Second, - LimitMode: "hard", + LimitMode: limitMode, Scheduler: t.config.Scheduler, Args: t.command, Parser: t.parser, Logger: t.logger, OnArgs: r.onArgs(t.config.Clone()), - OnBeforeStart: func() error { return nil }, + OnBeforeStart: func() error { + if !r.enableSoftLimit { + return nil + } + + if !r.resources.Add(t.config.LimitCPU, t.config.LimitMemory) { + return fmt.Errorf("not enough resources available") + } + + return nil + }, + OnExit: func(string) { + if !r.enableSoftLimit { + return + } + + r.resources.Remove(t.config.LimitCPU, t.config.LimitMemory) + }, }) if err != nil { return nil, err @@ -1245,6 +1335,11 @@ func (r *restream) reloadProcess(id string) error { t.parser = r.ffmpeg.NewProcessParser(t.logger, t.id, t.reference, t.config.LogPatterns) + limitMode := "hard" + if r.enableSoftLimit { + limitMode = "soft" + } + ffmpeg, err := r.ffmpeg.New(ffmpeg.ProcessConfig{ Reconnect: t.config.Reconnect, ReconnectDelay: time.Duration(t.config.ReconnectDelay) * time.Second, @@ -1253,13 +1348,30 @@ func (r *restream) reloadProcess(id string) error { LimitCPU: t.config.LimitCPU, LimitMemory: t.config.LimitMemory, LimitDuration: time.Duration(t.config.LimitWaitFor) * time.Second, - LimitMode: "hard", + LimitMode: limitMode, Scheduler: t.config.Scheduler, Args: t.command, Parser: t.parser, Logger: t.logger, OnArgs: r.onArgs(t.config.Clone()), - OnBeforeStart: func() error { return nil }, + OnBeforeStart: func() error { + if !r.enableSoftLimit { + return nil + } + + if !r.resources.Add(t.config.LimitCPU, t.config.LimitMemory) { + return fmt.Errorf("not enough resources available") + } + + return nil + }, + OnExit: func(string) { + if !r.enableSoftLimit { + return + } + + r.resources.Remove(t.config.LimitCPU, t.config.LimitMemory) + }, }) if err != nil { return err