diff --git a/config/config.go b/config/config.go index c5efd4ab..c6f3c114 100644 --- a/config/config.go +++ b/config/config.go @@ -105,6 +105,7 @@ func (d *Config) Clone() *Config { data.Sessions = d.Sessions data.Service = d.Service data.Router = d.Router + data.Resources = d.Resources data.Log.Topics = copy.Slice(d.Log.Topics) @@ -273,6 +274,10 @@ func (d *Config) init() { d.vars.Register(value.NewStringList(&d.Router.BlockedPrefixes, []string{"/api"}, ","), "router.blocked_prefixes", "CORE_ROUTER_BLOCKED_PREFIXES", nil, "List of path prefixes that can't be routed", false, false) d.vars.Register(value.NewStringMapString(&d.Router.Routes, nil), "router.routes", "CORE_ROUTER_ROUTES", nil, "List of route mappings", false, false) d.vars.Register(value.NewDir(&d.Router.UIPath, "", d.fs), "router.ui_path", "CORE_ROUTER_UI_PATH", nil, "Path to a directory holding UI files mounted as /ui", false, false) + + // Resources + d.vars.Register(value.NewFloat(&d.Resources.MaxCPUUsage, 0), "resources.max_cpu_usage", "CORE_RESOURCES_MAX_CPU_USAGE", nil, "Maximum system CPU usage in percent, from 0 (no limit) to 100*ncpu", false, false) + d.vars.Register(value.NewFloat(&d.Resources.MaxMemoryUsage, 0), "resources.max_memory_usage", "CORE_RESOURCES_MAX_MEMORY_USAGE", nil, "Maximum system usage in percent, from 0 (no limit) to 100", false, false) } // Validate validates the current state of the Config for completeness and sanity. Errors are diff --git a/config/data.go b/config/data.go index 4dfb4ec0..f4d1afa4 100644 --- a/config/data.go +++ b/config/data.go @@ -139,7 +139,7 @@ type Data struct { } `json:"playout"` Debug struct { Profiling bool `json:"profiling"` - ForceGC int `json:"force_gc" format:"int"` + ForceGC int `json:"force_gc" format:"int"` // deprecated, use MemoryLimit instead MemoryLimit int64 `json:"memory_limit_mbytes" format:"int64"` AutoMaxProcs bool `json:"auto_max_procs"` } `json:"debug"` @@ -168,6 +168,10 @@ type Data struct { Routes map[string]string `json:"routes"` UIPath string `json:"ui_path"` } `json:"router"` + Resources struct { + MaxCPUUsage float64 `json:"max_cpu_usage"` + MaxMemoryUsage float64 `json:"max_memory_usage"` + } `json:"resources"` } func UpgradeV2ToV3(d *v2.Data, fs fs.Filesystem) (*Data, error) { diff --git a/config/value/primitives.go b/config/value/primitives.go index 1dd52a94..4d1258fd 100644 --- a/config/value/primitives.go +++ b/config/value/primitives.go @@ -279,3 +279,34 @@ func (u *Uint64) Validate() error { func (u *Uint64) IsEmpty() bool { return uint64(*u) == 0 } + +// float64 + +type Float64 float64 + +func NewFloat(p *float64, val float64) *Float64 { + *p = val + + return (*Float64)(p) +} + +func (u *Float64) Set(val string) error { + v, err := strconv.ParseFloat(val, 64) + if err != nil { + return err + } + *u = Float64(v) + return nil +} + +func (u *Float64) String() string { + return strconv.FormatFloat(float64(*u), 'f', -1, 64) +} + +func (u *Float64) Validate() error { + return nil +} + +func (u *Float64) IsEmpty() bool { + return float64(*u) == 0 +} diff --git a/config/value/primitives_test.go b/config/value/primitives_test.go index 4b815b90..4406d8b0 100644 --- a/config/value/primitives_test.go +++ b/config/value/primitives_test.go @@ -145,3 +145,23 @@ func TestUint64Value(t *testing.T) { require.Equal(t, uint64(77), x) } + +func TestFloat64Value(t *testing.T) { + var x float64 + + val := NewFloat(&x, 11.1) + + require.Equal(t, "11.1", val.String()) + require.Equal(t, nil, val.Validate()) + require.Equal(t, false, val.IsEmpty()) + + x = 42.5 + + require.Equal(t, "42.5", val.String()) + require.Equal(t, nil, val.Validate()) + require.Equal(t, false, val.IsEmpty()) + + val.Set("77.7") + + require.Equal(t, float64(77.7), x) +} diff --git a/ffmpeg/ffmpeg.go b/ffmpeg/ffmpeg.go index 553df06c..030d5b5a 100644 --- a/ffmpeg/ffmpeg.go +++ b/ffmpeg/ffmpeg.go @@ -29,18 +29,21 @@ type FFmpeg interface { } type ProcessConfig struct { - Reconnect bool - ReconnectDelay time.Duration - StaleTimeout time.Duration - Timeout time.Duration - Scheduler string - Args []string - Parser process.Parser - Logger log.Logger - OnArgs func([]string) []string - OnExit func(state string) - OnStart func() - OnStateChange func(from, to string) + Reconnect bool // Whether to reconnect + ReconnectDelay time.Duration // Duration until next reconnect + StaleTimeout time.Duration // Duration to wait until killing the process if there is no progress in the process + Timeout time.Duration // Duration to wait until killing the process + LimitCPU float64 // Kill the process if the CPU usage in percent is above this value. + LimitMemory uint64 // Kill the process if the memory consumption in bytes is above this value. + LimitDuration time.Duration // Kill the process if the limits are exceeded for this duration. + Scheduler string // A scheduler for starting the process, either a concrete date (RFC3339) or in crontab syntax + Args []string // Arguments for the process + Parser process.Parser // Parser for the process output + Logger log.Logger // Logger + OnArgs func([]string) []string // Callback before starting the process to retrieve new arguments + OnExit func(state string) // Callback called after the process stopped with exit state as argument + OnStart func() // Callback called after process has been started + OnStateChange func(from, to string) // Callback called on state change } // Config is the configuration for ffmpeg that is part of the configuration @@ -134,6 +137,9 @@ func (f *ffmpeg) New(config ProcessConfig) (process.Process, error) { ReconnectDelay: config.ReconnectDelay, StaleTimeout: config.StaleTimeout, Timeout: config.Timeout, + LimitCPU: config.LimitCPU, + LimitMemory: config.LimitMemory, + LimitDuration: config.LimitDuration, Scheduler: scheduler, Parser: config.Parser, Logger: config.Logger, diff --git a/process/limits.go b/process/limits.go index 890e79d9..14652ae9 100644 --- a/process/limits.go +++ b/process/limits.go @@ -49,6 +49,10 @@ type Limiter interface { // Usage returns the current state of the limiter, such as current, average, max, and // limit values for CPU and memory. Usage() Usage + + // Limit enables or disables the throttling of the CPU or killing because of to much + // memory consumption. + Limit(enable bool) error } type limiter struct { @@ -257,3 +261,7 @@ func (l *limiter) Usage() Usage { func (l *limiter) Limits() (cpu float64, memory uint64) { return l.cpu, l.memory } + +func (l *limiter) Limit(enable bool) error { + return nil +} diff --git a/process/process.go b/process/process.go index 3fb8e56d..31df9387 100644 --- a/process/process.go +++ b/process/process.go @@ -42,25 +42,31 @@ type Process interface { // IsRunning returns whether the process is currently // running or not. IsRunning() bool + + // Limit enabled or disables CPU and memory limiting. CPU will be throttled + // into the configured limit. If memory consumption is above the configured + // limit, the process will be killed. + Limit(enable bool) error } // Config is the configuration of a process type Config struct { - Binary string // Path to the ffmpeg binary - Args []string // List of arguments for the binary - Reconnect bool // Whether to restart the process if it exited - ReconnectDelay time.Duration // Duration to wait before restarting the process - StaleTimeout time.Duration // Kill the process after this duration if it doesn't produce any output - Timeout time.Duration // Kill the process after this duration - LimitCPU float64 // Kill the process if the CPU usage in percent is above this value - LimitMemory uint64 // Kill the process if the memory consumption in bytes is above this value - LimitDuration time.Duration // Kill the process if the limits are exceeded for this duration - Scheduler Scheduler // A scheduler - Parser Parser // A parser for the output of the process - OnArgs func(args []string) []string // A callback which is called right before the process will start with the command args - OnStart func() // A callback which is called after the process started - OnExit func(state string) // A callback which is called after the process exited with the exit state - OnStateChange func(from, to string) // A callback which is called after a state changed + Binary string // Path to the ffmpeg binary. + Args []string // List of arguments for the binary. + Reconnect bool // Whether to restart the process if it exited. + ReconnectDelay time.Duration // Duration to wait before restarting the process. + StaleTimeout time.Duration // Kill the process after this duration if it doesn't produce any output. + Timeout time.Duration // Kill the process after this duration. + LimitCPU float64 // Kill the process if the CPU usage in percent is above this value. + LimitMemory uint64 // Kill the process if the memory consumption in bytes is above this value. + LimitDuration time.Duration // Kill the process if the limits are exceeded for this duration. + Scheduler Scheduler // A scheduler. + Parser Parser // A parser for the output of the process. + OnArgs func(args []string) []string // A callback which is called right before the process will start with the command args. + OnBeforeStart func() error // A callback which is called before the process will be started. If error is non-nil, the start will be refused. + OnStart func() // A callback which is called after the process started. + OnExit func(state string) // A callback which is called after the process exited with the exit state. + OnStateChange func(from, to string) // A callback which is called after a state changed. Logger log.Logger } @@ -74,16 +80,16 @@ type Status struct { Time time.Time // Time is the time of the last change of the state CommandArgs []string // Currently running command arguments CPU struct { - Current float64 - Average float64 - Max float64 - Limit float64 + Current float64 // Currently consumed CPU in percent + Average float64 // Average consumed CPU in percent + Max float64 // Max. consumed CPU in percent + Limit float64 // Usage limit in percent } // Used CPU in percent Memory struct { - Current uint64 - Average float64 - Max uint64 - Limit uint64 + Current uint64 // Currently consumed memory in bytes + Average float64 // Average consumed memory in bytes + Max uint64 // Max. consumed memory in bytes + Limit uint64 // Usage limit in bytes } // Used memory in bytes } @@ -196,6 +202,7 @@ type process struct { debuglogger log.Logger callbacks struct { onArgs func(args []string) []string + onBeforeStart func() error onStart func() onExit func(state string) onStateChange func(from, to string) @@ -252,6 +259,7 @@ func New(config Config) (Process, error) { p.stale.timeout = config.StaleTimeout p.callbacks.onArgs = config.OnArgs + p.callbacks.onBeforeStart = config.OnBeforeStart p.callbacks.onStart = config.OnStart p.callbacks.onExit = config.OnExit p.callbacks.onStateChange = config.OnStateChange @@ -445,6 +453,10 @@ func (p *process) IsRunning() bool { return p.isRunning() } +func (p *process) Limit(enable bool) error { + return p.limits.Limit(enable) +} + // Start will start the process and sets the order to "start". If the // process has alread the "start" order, nothing will be done. Returns // an error if start failed. @@ -511,6 +523,19 @@ func (p *process) start() error { args = p.callbacks.onArgs(args) } + + if p.callbacks.onBeforeStart != nil { + if err := p.callbacks.onBeforeStart(); err != nil { + p.setState(stateFailed) + + p.parser.Parse(err.Error()) + p.logger.WithError(err).Error().Log("Starting failed") + + p.reconnect(p.delay(stateFailed)) + + return err + } + } p.callbacks.lock.Unlock() // Start the stop timeout if enabled diff --git a/restream/restream.go b/restream/restream.go index c2606012..cd49e437 100644 --- a/restream/restream.go +++ b/restream/restream.go @@ -365,6 +365,9 @@ func (r *restream) load() error { ReconnectDelay: time.Duration(t.config.ReconnectDelay) * time.Second, StaleTimeout: time.Duration(t.config.StaleTimeout) * time.Second, Timeout: time.Duration(t.config.Timeout) * time.Second, + LimitCPU: t.config.LimitCPU, + LimitMemory: t.config.LimitMemory, + LimitDuration: time.Duration(t.config.LimitWaitFor) * time.Second, Scheduler: t.config.Scheduler, Args: t.command, Parser: t.parser, @@ -519,6 +522,9 @@ func (r *restream) createTask(config *app.Config) (*task, error) { ReconnectDelay: time.Duration(t.config.ReconnectDelay) * time.Second, StaleTimeout: time.Duration(t.config.StaleTimeout) * time.Second, Timeout: time.Duration(t.config.Timeout) * time.Second, + LimitCPU: t.config.LimitCPU, + LimitMemory: t.config.LimitMemory, + LimitDuration: time.Duration(t.config.LimitWaitFor) * time.Second, Scheduler: t.config.Scheduler, Args: t.command, Parser: t.parser, @@ -1240,6 +1246,9 @@ func (r *restream) reloadProcess(id string) error { ReconnectDelay: time.Duration(t.config.ReconnectDelay) * time.Second, StaleTimeout: time.Duration(t.config.StaleTimeout) * time.Second, Timeout: time.Duration(t.config.Timeout) * time.Second, + LimitCPU: t.config.LimitCPU, + LimitMemory: t.config.LimitMemory, + LimitDuration: time.Duration(t.config.LimitWaitFor) * time.Second, Scheduler: t.config.Scheduler, Args: t.command, Parser: t.parser,