WIP: add resource manager

This commit is contained in:
Ingo Oppermann 2023-04-26 22:05:46 +02:00
parent 1e35d29371
commit 5e2060f785
No known key found for this signature in database
GPG Key ID: 2AB32426E9DD229E
2 changed files with 286 additions and 14 deletions

View File

@ -0,0 +1,160 @@
package resources
import (
"context"
"sync"
"time"
"github.com/datarhei/core/v16/psutil"
)
// resources tracks system-wide CPU/memory limits and the resources reserved
// by consumers (processes). An observer goroutine (see observe) samples the
// actual system load and publishes limiter state transitions.
type resources struct {
	ncpu float64 // Number of logical cores; falls back to 1 if detection fails in New.

	maxCPU    float64 // CPU limit in percent; NOTE(review): unclear whether the scale is 0-100 or 0-100*ncpu — confirm against psutil.CPUPercent.
	maxMemory uint64  // Memory limit in bytes, derived in New from a percentage of total system memory.

	consumerCPU    float64 // Sum of CPU reserved via Add, in the same unit as maxCPU.
	consumerMemory uint64  // Sum of memory reserved via Add, in bytes.

	limit      chan bool // Buffered channel carrying limiter state transitions; created in Start (nil before the first Start).
	isLimiting bool      // Last published limiter state; guarded by lock.

	cancelObserver context.CancelFunc // Stops the observe goroutine; set in Start.

	lock      sync.Mutex // Guards isLimiting, consumerCPU and consumerMemory.
	startOnce sync.Once  // Makes Start idempotent until Stop re-arms it.
	stopOnce  sync.Once  // Makes Stop idempotent until Start re-arms it; pre-consumed in New.
}
// Resources monitors system CPU and memory usage against configured limits
// and accounts for resources reserved by consumers.
type Resources interface {
	// Start launches the background observer. Idempotent until Stop is called.
	Start()

	// Stop stops the background observer. Idempotent until Start is called;
	// a Stop before the first Start is a no-op.
	Stop()

	// Limit returns a channel that receives the limiter state whenever it
	// changes (true = limits exceeded). NOTE(review): nil before the first
	// Start — confirm callers only read it after Start.
	Limit() <-chan bool

	// Add reserves cpu (percent) and memory (bytes) for a consumer. It
	// returns false if limiting is currently active or the reservation would
	// exceed the configured maximums.
	Add(cpu float64, memory uint64) bool

	// Remove releases a reservation previously made with Add.
	Remove(cpu float64, memory uint64)
}
// New creates a Resources with the given limits. maxCPU is a CPU usage limit
// in percent; maxMemory is a limit in percent of the total system memory and
// is converted to an absolute byte count. An error is returned only if the
// total system memory cannot be determined.
func New(maxCPU, maxMemory float64) (Resources, error) {
	r := &resources{
		maxCPU: maxCPU,
	}

	memStat, err := psutil.VirtualMemory()
	if err != nil {
		return nil, err
	}

	nCPU, err := psutil.CPUCounts(true)
	if err != nil {
		// Core count detection is best-effort; assume a single core.
		nCPU = 1
	}

	r.ncpu = nCPU
	r.maxMemory = uint64(float64(memStat.Total) * maxMemory / 100)

	// Pre-consume stopOnce so that a Stop() before the first Start() is a
	// no-op instead of calling the still-nil cancelObserver.
	r.stopOnce.Do(func() {})

	return r, nil
}
// Start launches the observer goroutine that samples system load once per
// second. It is idempotent until Stop is called, which re-arms it.
func (r *resources) Start() {
	r.startOnce.Do(func() {
		ctx, cancel := context.WithCancel(context.Background())
		r.cancelObserver = cancel

		// Buffered so transitions are not lost if the consumer lags briefly.
		r.limit = make(chan bool, 10)

		go r.observe(ctx, time.Second)

		// Re-arm Stop so it can take effect after this Start.
		r.stopOnce = sync.Once{}
	})
}
// Stop cancels the observer goroutine. It is idempotent until Start is
// called, which re-arms it.
func (r *resources) Stop() {
	r.stopOnce.Do(func() {
		r.cancelObserver()

		// Re-arm Start so the observer can be launched again.
		r.startOnce = sync.Once{}
	})
}
// Limit returns the channel on which limiter state transitions are published
// (true = limits exceeded, false = back to normal).
// NOTE(review): this returns nil before the first Start(); receiving from a
// nil channel blocks forever — confirm all callers read it only after Start.
func (r *resources) Limit() <-chan bool {
	return r.limit
}
// observe periodically samples system CPU and memory usage, compares them
// against the configured limits, and publishes a value on r.limit whenever
// the limiter state changes. It runs until ctx is cancelled.
func (r *resources) observe(ctx context.Context, interval time.Duration) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			cpu, err := psutil.CPUPercent()
			if err != nil {
				// No CPU sample available; skip this whole interval.
				continue
			}

			mem, err := psutil.VirtualMemory()
			if err != nil {
				// No memory sample available; skip this whole interval.
				continue
			}

			shouldLimit := cpu.User+cpu.System+cpu.Other > r.maxCPU || mem.Used > r.maxMemory

			r.lock.Lock()
			if r.isLimiting != shouldLimit {
				r.isLimiting = shouldLimit

				// Non-blocking send: if the buffered channel is full the
				// transition is dropped rather than stalling the observer.
				select {
				case r.limit <- shouldLimit:
				default:
				}
			}
			r.lock.Unlock()
		}
	}
}
// Add tries to reserve cpu (same unit as maxCPU) and memory (bytes) for a
// consumer. It reports whether the reservation succeeded: it fails while the
// limiter is active, or when the reservation would push the accounted totals
// past the configured maximums.
func (r *resources) Add(cpu float64, memory uint64) bool {
	r.lock.Lock()
	defer r.lock.Unlock()

	// Refuse new reservations while the system is over its limits, or when
	// the request itself would exceed the configured budget.
	if r.isLimiting || r.consumerCPU+cpu > r.maxCPU || r.consumerMemory+memory > r.maxMemory {
		return false
	}

	r.consumerCPU += cpu
	r.consumerMemory += memory

	return true
}
// Remove releases a reservation previously made with Add.
//
// The counters are clamped at zero: the original code did plain subtraction,
// so an unbalanced Remove (e.g. a release without a matching successful Add)
// would wrap the uint64 memory counter around to a huge value and permanently
// block all future Add calls, and let the CPU counter drift negative.
func (r *resources) Remove(cpu float64, memory uint64) {
	r.lock.Lock()
	defer r.lock.Unlock()

	r.consumerCPU -= cpu
	if r.consumerCPU < 0 {
		r.consumerCPU = 0
	}

	// Guard against uint64 underflow before subtracting.
	if memory > r.consumerMemory {
		r.consumerMemory = 0
	} else {
		r.consumerMemory -= memory
	}
}

View File

@ -24,6 +24,7 @@ import (
"github.com/datarhei/core/v16/restream/app"
rfs "github.com/datarhei/core/v16/restream/fs"
"github.com/datarhei/core/v16/restream/replace"
"github.com/datarhei/core/v16/restream/resources"
"github.com/datarhei/core/v16/restream/store"
"github.com/Masterminds/semver/v3"
@ -68,6 +69,8 @@ type Config struct {
Replace replace.Replacer
FFmpeg ffmpeg.FFmpeg
MaxProcesses int64
MaxCPU float64 // percent 0-100*ncpu
MaxMemory float64 // percent 0-100
Logger log.Logger
}
@ -95,16 +98,20 @@ type restream struct {
maxProc int64
nProc int64
fs struct {
list []rfs.Filesystem
stopObserver context.CancelFunc
list []rfs.Filesystem
}
replace replace.Replacer
tasks map[string]*task
logger log.Logger
metadata map[string]interface{}
resources resources.Resources
enableSoftLimit bool
lock sync.RWMutex
cancelObserver context.CancelFunc
startOnce sync.Once
stopOnce sync.Once
}
@ -159,6 +166,11 @@ func New(config Config) (Restreamer, error) {
r.maxProc = config.MaxProcesses
if config.MaxCPU > 0 || config.MaxMemory > 0 {
r.resources, _ = resources.New(config.MaxCPU, config.MaxMemory)
r.enableSoftLimit = true
}
if err := r.load(); err != nil {
return nil, fmt.Errorf("failed to load data from DB (%w)", err)
}
@ -175,6 +187,13 @@ func (r *restream) Start() {
r.lock.Lock()
defer r.lock.Unlock()
ctx, cancel := context.WithCancel(context.Background())
r.cancelObserver = cancel
if r.enableSoftLimit {
go r.resourceObserver(ctx, r.resources)
}
for id, t := range r.tasks {
if t.process.Order == "start" {
r.startProcess(id)
@ -184,14 +203,11 @@ func (r *restream) Start() {
r.setCleanup(id, t.config)
}
ctx, cancel := context.WithCancel(context.Background())
r.fs.stopObserver = cancel
for _, fs := range r.fs.list {
fs.Start()
if fs.Type() == "disk" {
go r.observe(ctx, fs, 10*time.Second)
go r.filesystemObserver(ctx, fs, 10*time.Second)
}
}
@ -215,7 +231,7 @@ func (r *restream) Stop() {
r.unsetCleanup(id)
}
r.fs.stopObserver()
r.cancelObserver()
// Stop the cleanup jobs
for _, fs := range r.fs.list {
@ -226,7 +242,7 @@ func (r *restream) Stop() {
})
}
func (r *restream) observe(ctx context.Context, fs fs.Filesystem, interval time.Duration) {
func (r *restream) filesystemObserver(ctx context.Context, fs fs.Filesystem, interval time.Duration) {
ticker := time.NewTicker(interval)
defer ticker.Stop()
@ -266,6 +282,36 @@ func (r *restream) observe(ctx context.Context, fs fs.Filesystem, interval time.
}
}
// resourceObserver runs the resource limiter for the lifetime of ctx: it
// starts rsc, forwards every limiter state transition to all valid tasks via
// their ffmpeg Limit() hook, and stops rsc when ctx is cancelled.
func (r *restream) resourceObserver(ctx context.Context, rsc resources.Resources) {
	rsc.Start()
	defer rsc.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case limit := <-rsc.Limit():
			// Only log loudly when limiting kicks in; the release back to
			// normal (limit == false) is applied silently below.
			if limit {
				r.logger.Warn().WithField("limit", limit).Log("limiter triggered")
			}
			// NOTE(review): this holds the write lock while calling Limit()
			// on every task's ffmpeg process — confirm Limit() is fast and
			// never re-enters restream, or this can stall all API access.
			r.lock.Lock()
			for id, t := range r.tasks {
				if !t.valid {
					continue
				}
				r.logger.Debug().WithFields(log.Fields{
					"limit": limit,
					"id":    id,
				}).Log("limiting process")
				t.ffmpeg.Limit(limit)
			}
			r.lock.Unlock()
		}
	}
}
func (r *restream) load() error {
data, err := r.store.Load()
if err != nil {
@ -360,6 +406,11 @@ func (r *restream) load() error {
t.command = t.config.CreateCommand()
t.parser = r.ffmpeg.NewProcessParser(t.logger, t.id, t.reference, t.config.LogPatterns)
limitMode := "hard"
if r.enableSoftLimit {
limitMode = "soft"
}
ffmpeg, err := r.ffmpeg.New(ffmpeg.ProcessConfig{
Reconnect: t.config.Reconnect,
ReconnectDelay: time.Duration(t.config.ReconnectDelay) * time.Second,
@ -368,13 +419,30 @@ func (r *restream) load() error {
LimitCPU: t.config.LimitCPU,
LimitMemory: t.config.LimitMemory,
LimitDuration: time.Duration(t.config.LimitWaitFor) * time.Second,
LimitMode: "hard",
LimitMode: limitMode,
Scheduler: t.config.Scheduler,
Args: t.command,
Parser: t.parser,
Logger: t.logger,
OnArgs: r.onArgs(t.config.Clone()),
OnBeforeStart: func() error { return nil },
OnBeforeStart: func() error {
if !r.enableSoftLimit {
return nil
}
if !r.resources.Add(t.config.LimitCPU, t.config.LimitMemory) {
return fmt.Errorf("not enough resources available")
}
return nil
},
OnExit: func(string) {
if !r.enableSoftLimit {
return
}
r.resources.Remove(t.config.LimitCPU, t.config.LimitMemory)
},
})
if err != nil {
return err
@ -519,6 +587,11 @@ func (r *restream) createTask(config *app.Config) (*task, error) {
t.command = t.config.CreateCommand()
t.parser = r.ffmpeg.NewProcessParser(t.logger, t.id, t.reference, t.config.LogPatterns)
limitMode := "hard"
if r.enableSoftLimit {
limitMode = "soft"
}
ffmpeg, err := r.ffmpeg.New(ffmpeg.ProcessConfig{
Reconnect: t.config.Reconnect,
ReconnectDelay: time.Duration(t.config.ReconnectDelay) * time.Second,
@ -527,13 +600,30 @@ func (r *restream) createTask(config *app.Config) (*task, error) {
LimitCPU: t.config.LimitCPU,
LimitMemory: t.config.LimitMemory,
LimitDuration: time.Duration(t.config.LimitWaitFor) * time.Second,
LimitMode: "hard",
LimitMode: limitMode,
Scheduler: t.config.Scheduler,
Args: t.command,
Parser: t.parser,
Logger: t.logger,
OnArgs: r.onArgs(t.config.Clone()),
OnBeforeStart: func() error { return nil },
OnBeforeStart: func() error {
if !r.enableSoftLimit {
return nil
}
if !r.resources.Add(t.config.LimitCPU, t.config.LimitMemory) {
return fmt.Errorf("not enough resources available")
}
return nil
},
OnExit: func(string) {
if !r.enableSoftLimit {
return
}
r.resources.Remove(t.config.LimitCPU, t.config.LimitMemory)
},
})
if err != nil {
return nil, err
@ -1245,6 +1335,11 @@ func (r *restream) reloadProcess(id string) error {
t.parser = r.ffmpeg.NewProcessParser(t.logger, t.id, t.reference, t.config.LogPatterns)
limitMode := "hard"
if r.enableSoftLimit {
limitMode = "soft"
}
ffmpeg, err := r.ffmpeg.New(ffmpeg.ProcessConfig{
Reconnect: t.config.Reconnect,
ReconnectDelay: time.Duration(t.config.ReconnectDelay) * time.Second,
@ -1253,13 +1348,30 @@ func (r *restream) reloadProcess(id string) error {
LimitCPU: t.config.LimitCPU,
LimitMemory: t.config.LimitMemory,
LimitDuration: time.Duration(t.config.LimitWaitFor) * time.Second,
LimitMode: "hard",
LimitMode: limitMode,
Scheduler: t.config.Scheduler,
Args: t.command,
Parser: t.parser,
Logger: t.logger,
OnArgs: r.onArgs(t.config.Clone()),
OnBeforeStart: func() error { return nil },
OnBeforeStart: func() error {
if !r.enableSoftLimit {
return nil
}
if !r.resources.Add(t.config.LimitCPU, t.config.LimitMemory) {
return fmt.Errorf("not enough resources available")
}
return nil
},
OnExit: func(string) {
if !r.enableSoftLimit {
return
}
r.resources.Remove(t.config.LimitCPU, t.config.LimitMemory)
},
})
if err != nil {
return err