diff --git a/process/limiter.go b/process/limiter.go index 10f2436e..e692d8e7 100644 --- a/process/limiter.go +++ b/process/limiter.go @@ -66,7 +66,7 @@ type Limiter interface { // Limit enables or disables the throttling of the CPU or killing because of to much // memory consumption. - Limit(limit int) error + Limit(cpu, memory bool) error } type limiter struct { @@ -77,29 +77,30 @@ type limiter struct { cancel context.CancelFunc onLimit LimitFunc - cpu float64 - cpuCurrent float64 - cpuMax float64 - cpuTop float64 - cpuAvg float64 - cpuAvgCounter uint64 - cpuLast float64 - cpuLimitSince time.Time + cpu float64 // CPU limit + cpuCurrent float64 // Current CPU load of this process + cpuLast float64 // Last CPU load of this process + cpuMax float64 // Max. CPU load of this process + cpuTop float64 // Decaying max. CPU load of this process + cpuAvg float64 // Average CPU load of this process + cpuAvgCounter uint64 // Counter for average calculation + cpuLimitSince time.Time // Time when the CPU limit has been reached (hard limiter mode) + cpuLimitEnable bool // Whether CPU limiting is enabled (soft limiter mode) - memory uint64 - memoryCurrent uint64 - memoryMax uint64 - memoryTop uint64 - memoryAvg float64 - memoryAvgCounter uint64 - memoryLast uint64 - memoryLimitSince time.Time + memory uint64 // Memory limit (bytes) + memoryCurrent uint64 // Current memory usage + memoryLast uint64 // Last memory usage + memoryMax uint64 // Max. memory usage + memoryTop uint64 // Decaying max. 
memory usage + memoryAvg float64 // Average memory usage + memoryAvgCounter uint64 // Counter for average memory calculation + memoryLimitSince time.Time // Time when the memory limit has been reached (hard limiter mode) + memoryLimitEnable bool // Whether memory limiting is enabled (soft limiter mode) + + waitFor time.Duration + mode LimitMode - waitFor time.Duration - mode LimitMode - enableLimit bool cancelLimit context.CancelFunc - factorLimit int logger log.Logger } @@ -134,6 +135,8 @@ func NewLimiter(config LimiterConfig) Limiter { l.ncpuFactor = l.ncpu } + l.cpu /= 100 + if l.onLimit == nil { l.onLimit = func(float64, uint64) {} } @@ -148,14 +151,13 @@ func NewLimiter(config LimiterConfig) Limiter { } func (l *limiter) reset() { - l.enableLimit = false - l.cpuCurrent = 0 l.cpuLast = 0 l.cpuAvg = 0 l.cpuAvgCounter = 0 l.cpuMax = 0 l.cpuTop = 0 + l.cpuLimitEnable = false l.memoryCurrent = 0 l.memoryLast = 0 @@ -163,6 +165,7 @@ func (l *limiter) reset() { l.memoryAvgCounter = 0 l.memoryMax = 0 l.memoryTop = 0 + l.memoryLimitEnable = false } func (l *limiter) Start(process psutil.Process) error { @@ -182,6 +185,13 @@ func (l *limiter) Start(process psutil.Process) error { go l.ticker(ctx, 1000*time.Millisecond) + if l.mode == LimitModeSoft { + ctx, cancel = context.WithCancel(context.Background()) + l.cancelLimit = cancel + + go l.limitCPU(ctx, l.cpu, time.Second) + } + return nil } @@ -200,8 +210,6 @@ func (l *limiter) Stop() { l.cancelLimit = nil } - l.enableLimit = false - l.proc.Stop() l.proc = nil @@ -249,7 +257,7 @@ func (l *limiter) collect(t time.Time) { } if cpustat, err := l.proc.CPUPercent(); err == nil { - l.cpuLast, l.cpuCurrent = l.cpuCurrent, cpustat.System+cpustat.User+cpustat.Other + l.cpuLast, l.cpuCurrent = l.cpuCurrent, (cpustat.System+cpustat.User+cpustat.Other)/100 if l.cpuCurrent > l.cpuMax { l.cpuMax = l.cpuCurrent @@ -300,8 +308,8 @@ func (l *limiter) collect(t time.Time) { } } } - } else if l.mode == LimitModeSoft && l.enableLimit { - 
if l.memory > 0 { + } else { + if l.memory > 0 && l.memoryLimitEnable { if l.memoryCurrent > l.memory { // Current value is higher than the limit l.logger.Warn().Log("Memory limit exceeded") @@ -319,11 +327,11 @@ func (l *limiter) collect(t time.Time) { }).Log("Observation") if isLimitExceeded { - go l.onLimit(l.cpuCurrent*l.ncpuFactor, l.memoryCurrent) + go l.onLimit(l.cpuCurrent*l.ncpuFactor*100, l.memoryCurrent) } } -func (l *limiter) Limit(limit int) error { +func (l *limiter) Limit(cpu, memory bool) error { l.lock.Lock() defer l.lock.Unlock() @@ -331,50 +339,42 @@ func (l *limiter) Limit(limit int) error { return nil } - if limit > 0 { - l.factorLimit = limit + if memory { + if !l.memoryLimitEnable { + l.memoryLimitEnable = true - if l.enableLimit { - return nil + l.logger.Debug().Log("Memory limiter enabled") } - - if l.cancelLimit != nil { - l.cancelLimit() - } - - ctx, cancel := context.WithCancel(context.Background()) - l.cancelLimit = cancel - - l.enableLimit = true - - l.logger.Debug().Log("Limiter enabled") - - go l.limit(ctx, l.cpu/100, time.Second) } else { - if !l.enableLimit { - return nil + if l.memoryLimitEnable { + l.memoryLimitEnable = false + + l.logger.Debug().Log("Memory limiter disabled") } + } - if l.cancelLimit == nil { - return nil + if cpu { + if !l.cpuLimitEnable { + l.cpuLimitEnable = true + + l.logger.Debug().Log("CPU limiter enabled") } + } else { + if l.cpuLimitEnable { + l.cpuLimitEnable = false - l.enableLimit = false - - l.cancelLimit() - l.cancelLimit = nil - - l.logger.Debug().Log("Limiter disabled") + l.logger.Debug().Log("CPU limiter disabled") + } } return nil } -// limit will limit the CPU usage of this process. The limit is the max. CPU usage +// limitCPU will limit the CPU usage of this process. The limit is the max. CPU usage // normed to 0-1. The interval defines how long a time slot is that will be splitted // into sleeping and working. 
-func (l *limiter) limit(ctx context.Context, limit float64, interval time.Duration) { +func (l *limiter) limitCPU(ctx context.Context, limit float64, interval time.Duration) { defer func() { l.lock.Lock() if l.proc != nil { @@ -386,8 +386,10 @@ func (l *limiter) limit(ctx context.Context, limit float64, interval time.Durati }() var workingrate float64 = -1 + var factorTopLimit float64 = 0 + var topLimit float64 = 0 - l.logger.Debug().WithField("limit", limit).Log("CPU throttler enabled") + l.logger.Debug().WithField("limit", limit*l.ncpu).Log("CPU throttler enabled") for { select { @@ -396,11 +398,35 @@ func (l *limiter) limit(ctx context.Context, limit float64, interval time.Durati default: } + l.lock.Lock() + + if !l.cpuLimitEnable { + if factorTopLimit > 0 { + factorTopLimit -= 10 + } else { + if l.proc != nil { + l.proc.Resume() + } + l.lock.Unlock() + time.Sleep(100 * time.Millisecond) + continue + } + } else { + factorTopLimit = 100 + topLimit = l.cpuTop - limit + } + lim := limit - l.lock.Lock() - pcpu := l.cpuCurrent / 100 - lim += (100 - float64(l.factorLimit)) / 100 * ((l.cpuTop / 100) - limit) + if topLimit > 0 { + // After releasing the limiter, the process will not get the full CPU capacity back. + // Instead the limit will be gradually lifted by increments until it reaches the + // CPU top value. The CPU top value has to be larger than the actual limit. 
+ lim += (100 - factorTopLimit) / 100 * topLimit + } + + pcpu := l.cpuCurrent + l.lock.Unlock() if workingrate < 0 { @@ -415,8 +441,9 @@ func (l *limiter) limit(ctx context.Context, limit float64, interval time.Durati sleeptime := float64(interval.Nanoseconds()) - worktime l.logger.Debug().WithFields(log.Fields{ - "limit": lim, + "limit": lim * l.ncpu, "pcpu": pcpu, + "factor": factorTopLimit, "worktime": (time.Duration(worktime) * time.Nanosecond).String(), "sleeptime": (time.Duration(sleeptime) * time.Nanosecond).String(), }).Log("Throttler") @@ -445,8 +472,8 @@ func (l *limiter) Current() (cpu float64, memory uint64) { l.lock.Lock() defer l.lock.Unlock() - cpu = l.cpuCurrent + cpu = l.cpuCurrent * 100 memory = l.memoryCurrent return } @@ -458,10 +485,10 @@ func (l *limiter) Usage() Usage { usage := Usage{} usage.CPU.NCPU = l.ncpu - usage.CPU.Limit = l.cpu * l.ncpu - usage.CPU.Current = l.cpuCurrent * l.ncpu - usage.CPU.Average = l.cpuAvg * l.ncpu - usage.CPU.Max = l.cpuMax * l.ncpu + usage.CPU.Limit = l.cpu * l.ncpu * 100 + usage.CPU.Current = l.cpuCurrent * l.ncpu * 100 + usage.CPU.Average = l.cpuAvg * l.ncpu * 100 + usage.CPU.Max = l.cpuMax * l.ncpu * 100 usage.Memory.Limit = l.memory usage.Memory.Current = l.memoryCurrent @@ -472,5 +499,5 @@ func (l *limiter) Usage() Usage { } func (l *limiter) Limits() (cpu float64, memory uint64) { - return l.cpu, l.memory + return l.cpu * 100, l.memory } diff --git a/process/process.go b/process/process.go index 0031b90c..a43f9268 100644 --- a/process/process.go +++ b/process/process.go @@ -46,7 +46,7 @@ type Process interface { // Limit enabled or disables CPU and memory limiting. CPU will be throttled // into the configured limit. If memory consumption is above the configured // limit, the process will be killed. 
- Limit(limit int) error + Limit(cpu, memory bool) error } // Config is the configuration of a process @@ -459,7 +459,7 @@ func (p *process) IsRunning() bool { return p.isRunning() } -func (p *process) Limit(limit int) error { +func (p *process) Limit(cpu, memory bool) error { if !p.isRunning() { return nil } @@ -468,9 +468,12 @@ func (p *process) Limit(limit int) error { return nil } - p.logger.Warn().WithField("limit", limit).Log("Limiter triggered") + p.logger.Warn().WithFields(log.Fields{ + "limit_cpu": cpu, + "limit_memory": memory, + }).Log("Limiter triggered") - return p.limits.Limit(limit) + return p.limits.Limit(cpu, memory) } // Start will start the process and sets the order to "start". If the diff --git a/restream/resources/resources.go b/restream/resources/resources.go index 86a6625a..4388cfd3 100644 --- a/restream/resources/resources.go +++ b/restream/resources/resources.go @@ -15,9 +15,11 @@ type resources struct { maxCPU float64 maxMemory uint64 - limitCh chan int - limitRate int - isLimiting bool + limitCPUCh chan bool + isCPULimiting bool + + limitMemoryCh chan bool + isMemoryLimiting bool cancelObserver context.CancelFunc @@ -32,7 +34,8 @@ type Resources interface { Start() Stop() - Limit() <-chan int + LimitCPU() <-chan bool + LimitMemory() <-chan bool Request(cpu float64, memory uint64) error } @@ -83,7 +86,8 @@ func New(config Config) (Resources, error) { func (r *resources) Start() { r.startOnce.Do(func() { - r.limitCh = make(chan int, 10) + r.limitCPUCh = make(chan bool, 10) + r.limitMemoryCh = make(chan bool, 10) ctx, cancel := context.WithCancel(context.Background()) r.cancelObserver = cancel @@ -106,8 +110,12 @@ func (r *resources) Stop() { }) } -func (r *resources) Limit() <-chan int { - return r.limitCh +func (r *resources) LimitCPU() <-chan bool { + return r.limitCPUCh +} + +func (r *resources) LimitMemory() <-chan bool { + return r.limitMemoryCh } func (r *resources) observe(ctx context.Context, interval time.Duration) { @@ -140,52 
+148,57 @@ func (r *resources) observe(ctx context.Context, interval time.Duration) { "cur_memory": vmstat.Used, }).Log("Observation") - doLimit := false + doCPULimit := false - if !r.isLimiting { + if !r.isCPULimiting { if cpuload > r.maxCPU { r.logger.Debug().WithField("cpu", cpuload).Log("CPU limit reached") - doLimit = true - } - - if vmstat.Used > r.maxMemory { - r.logger.Debug().WithField("memory", vmstat.Used).Log("Memory limit reached") - doLimit = true + doCPULimit = true } } else { - doLimit = true - if cpuload <= r.maxCPU && vmstat.Used <= r.maxMemory { - doLimit = false + doCPULimit = true + if cpuload <= r.maxCPU { + r.logger.Debug().WithField("cpu", cpuload).Log("CPU limit released") + doCPULimit = false + } + } + + doMemoryLimit := false + + if !r.isMemoryLimiting { + if vmstat.Used > r.maxMemory { + r.logger.Debug().WithField("memory", vmstat.Used).Log("Memory limit reached") + doMemoryLimit = true + } + } else { + doMemoryLimit = true + if vmstat.Used <= r.maxMemory { + r.logger.Debug().WithField("memory", vmstat.Used).Log("Memory limit released") + doMemoryLimit = false } } r.lock.Lock() - if r.isLimiting != doLimit { - if !r.isLimiting { - r.limitRate = 100 - } else { - if r.limitRate > 0 { - r.limitRate -= 10 - doLimit = true - - if r.limitRate == 0 { - r.logger.Debug().WithFields(log.Fields{ - "cpu": cpuload, - "memory": vmstat.Used, - }).Log("CPU and memory limit released") - doLimit = false - } - } - } - + if r.isCPULimiting != doCPULimit { r.logger.Debug().WithFields(log.Fields{ - "enabled": doLimit, - "rate": r.limitRate, - }).Log("Limiting") + "enabled": doCPULimit, + }).Log("Limiting CPU") - r.isLimiting = doLimit + r.isCPULimiting = doCPULimit select { - case r.limitCh <- r.limitRate: + case r.limitCPUCh <- doCPULimit: + default: + } + } + + if r.isMemoryLimiting != doMemoryLimit { + r.logger.Debug().WithFields(log.Fields{ + "enabled": doMemoryLimit, + }).Log("Limiting memory") + + r.isMemoryLimiting = doMemoryLimit + select { + case 
r.limitMemoryCh <- doMemoryLimit: default: } } @@ -205,7 +218,7 @@ func (r *resources) Request(cpu float64, memory uint64) error { logger.Debug().Log("Request for acquiring resources") - if r.isLimiting { + if r.isCPULimiting || r.isMemoryLimiting { logger.Debug().Log("Rejected, currently limiting") return fmt.Errorf("resources are currenlty actively limited") } diff --git a/restream/restream.go b/restream/restream.go index b226e1d9..3b19a8f5 100644 --- a/restream/restream.go +++ b/restream/restream.go @@ -300,11 +300,13 @@ func (r *restream) resourceObserver(ctx context.Context, rsc resources.Resources rsc.Start() defer rsc.Stop() + limitCPU, limitMemory := false, false + for { select { case <-ctx.Done(): return - case limit := <-rsc.Limit(): + case limitCPU = <-rsc.LimitCPU(): r.lock.Lock() for id, t := range r.tasks { if !t.valid { @@ -312,10 +314,24 @@ func (r *restream) resourceObserver(ctx context.Context, rsc resources.Resources } r.logger.Debug().WithFields(log.Fields{ - "limit": limit, - "id": id, - }).Log("limiting process") - t.ffmpeg.Limit(limit) + "limit_cpu": limitCPU, + "id": id, + }).Log("Limiting process CPU consumption") + t.ffmpeg.Limit(limitCPU, limitMemory) + } + r.lock.Unlock() + case limitMemory = <-rsc.LimitMemory(): + r.lock.Lock() + for id, t := range r.tasks { + if !t.valid { + continue + } + + r.logger.Debug().WithFields(log.Fields{ + "limit_memory": limitMemory, + "id": id, + }).Log("Limiting process memory consumption") + t.ffmpeg.Limit(limitCPU, limitMemory) } r.lock.Unlock() }