From a2dab2682f079d0401d18ad933feebaa845110a7 Mon Sep 17 00:00:00 2001 From: Ingo Oppermann Date: Wed, 26 Apr 2023 09:49:28 +0200 Subject: [PATCH] Fix not propagating process limits --- ffmpeg/ffmpeg.go | 6 ++++++ process/process.go | 42 +++++++++++++++++++-------------------- restream/restream.go | 13 ++++++++++-- restream/restream_test.go | 23 +++++++++++++++++++++ 4 files changed, 60 insertions(+), 24 deletions(-) diff --git a/ffmpeg/ffmpeg.go b/ffmpeg/ffmpeg.go index d5f79184..f58d87c1 100644 --- a/ffmpeg/ffmpeg.go +++ b/ffmpeg/ffmpeg.go @@ -32,6 +32,9 @@ type ProcessConfig struct { Reconnect bool ReconnectDelay time.Duration StaleTimeout time.Duration + LimitCPU float64 + LimitMemory uint64 + LimitDuration time.Duration Command []string Parser process.Parser Logger log.Logger @@ -117,6 +120,9 @@ func (f *ffmpeg) New(config ProcessConfig) (process.Process, error) { Reconnect: config.Reconnect, ReconnectDelay: config.ReconnectDelay, StaleTimeout: config.StaleTimeout, + LimitCPU: config.LimitCPU, + LimitMemory: config.LimitMemory, + LimitDuration: config.LimitDuration, Parser: config.Parser, Logger: config.Logger, OnStart: config.OnStart, diff --git a/process/process.go b/process/process.go index 9a8637a2..32b28ae2 100644 --- a/process/process.go +++ b/process/process.go @@ -63,26 +63,19 @@ type Config struct { // Status represents the current status of a process type Status struct { - // State is the current state of the process. See stateType for the known states. - State string - - // States is the cumulative history of states the process had. - States States - - // Order is the wanted condition of process, either "start" or "stop" - Order string - - // Duration is the time since the last change of the state - Duration time.Duration - - // Time is the time of the last change of the state - Time time.Time - - // Used CPU in percent - CPU float64 - - // Used memory in bytes - Memory uint64 + State string // State is the current state of the process. See stateType for the known states. + States States // States is the cumulative history of states the process had. + Order string // Order is the wanted condition of process, either "start" or "stop" + Duration time.Duration // Duration is the time since the last change of the state + Time time.Time // Time is the time of the last change of the state + CPU struct { + Current float64 // Used CPU in percent + Limit float64 // Limit in percent + } + Memory struct { + Current uint64 // Used memory in bytes + Limit uint64 // Limit in bytes + } } // States @@ -390,6 +383,7 @@ func (p *process) getStateString() string { // Status returns the current status of the process func (p *process) Status() Status { cpu, memory := p.limits.Current() + cpuLimit, memoryLimit := p.limits.Limits() p.state.lock.Lock() stateTime := p.state.time @@ -407,10 +401,14 @@ func (p *process) Status() Status { Order: order, Duration: time.Since(stateTime), Time: stateTime, - CPU: cpu, - Memory: memory, } + s.CPU.Current = cpu + s.CPU.Limit = cpuLimit + + s.Memory.Current = memory + s.Memory.Limit = memoryLimit + return s } diff --git a/restream/restream.go b/restream/restream.go index bd5e1482..eb014297 100644 --- a/restream/restream.go +++ b/restream/restream.go @@ -355,6 +355,9 @@ func (r *restream) load() error { Reconnect: t.config.Reconnect, ReconnectDelay: time.Duration(t.config.ReconnectDelay) * time.Second, StaleTimeout: time.Duration(t.config.StaleTimeout) * time.Second, + LimitCPU: t.config.LimitCPU, + LimitMemory: t.config.LimitMemory, + LimitDuration: time.Duration(t.config.LimitWaitFor) * time.Second, Command: t.command, Parser: t.parser, Logger: t.logger, @@ -494,6 +497,9 @@ func (r *restream) createTask(config *app.Config) (*task, error) { Reconnect: t.config.Reconnect, ReconnectDelay: time.Duration(t.config.ReconnectDelay) * time.Second, StaleTimeout: time.Duration(t.config.StaleTimeout) * time.Second, + LimitCPU: t.config.LimitCPU, + LimitMemory: t.config.LimitMemory, + LimitDuration: time.Duration(t.config.LimitWaitFor) * time.Second, Command: t.command, Parser: t.parser, Logger: t.logger, @@ -1179,6 +1185,9 @@ func (r *restream) reloadProcess(id string) error { Reconnect: t.config.Reconnect, ReconnectDelay: time.Duration(t.config.ReconnectDelay) * time.Second, StaleTimeout: time.Duration(t.config.StaleTimeout) * time.Second, + LimitCPU: t.config.LimitCPU, + LimitMemory: t.config.LimitMemory, + LimitDuration: time.Duration(t.config.LimitWaitFor) * time.Second, Command: t.command, Parser: t.parser, Logger: t.logger, @@ -1218,8 +1227,8 @@ func (r *restream) GetProcessState(id string) (*app.State, error) { state.State = status.State state.States.Marshal(status.States) state.Time = status.Time.Unix() - state.Memory = status.Memory - state.CPU = status.CPU + state.Memory = status.Memory.Current + state.CPU = status.CPU.Current state.Duration = status.Duration.Round(10 * time.Millisecond).Seconds() state.Reconnect = -1 state.Command = make([]string, len(task.command)) diff --git a/restream/restream_test.go b/restream/restream_test.go index c142c7fd..64a536a5 100644 --- a/restream/restream_test.go +++ b/restream/restream_test.go @@ -883,3 +883,26 @@ func TestReplacer(t *testing.T) { require.Equal(t, process, rs.tasks["314159265359"].config) } + +func TestProcessLimit(t *testing.T) { + rsi, err := getDummyRestreamer(nil, nil, nil, nil) + require.NoError(t, err) + + process := getDummyProcess() + process.LimitCPU = 61 + process.LimitMemory = 42 + process.Autostart = false + + err = rsi.AddProcess(process) + require.NoError(t, err) + + rs := rsi.(*restream) + + task, ok := rs.tasks[process.ID] + require.True(t, ok) + + status := task.ffmpeg.Status() + + require.Equal(t, float64(61), status.CPU.Limit) + require.Equal(t, uint64(42), status.Memory.Limit) +}