Limit CPU and memory independently, release CPU throttling incremently

This commit is contained in:
Ingo Oppermann 2023-05-01 16:29:18 +02:00
parent 2376e43f96
commit ef138fb90f
No known key found for this signature in database
GPG Key ID: 2AB32426E9DD229E
4 changed files with 181 additions and 122 deletions

View File

@ -66,7 +66,7 @@ type Limiter interface {
// Limit enables or disables the throttling of the CPU or killing because of to much
// memory consumption.
Limit(limit int) error
Limit(cpu, memory bool) error
}
type limiter struct {
@ -77,29 +77,30 @@ type limiter struct {
cancel context.CancelFunc
onLimit LimitFunc
cpu float64
cpuCurrent float64
cpuMax float64
cpuTop float64
cpuAvg float64
cpuAvgCounter uint64
cpuLast float64
cpuLimitSince time.Time
cpu float64 // CPU limit
cpuCurrent float64 // Current CPU load of this process
cpuLast float64 // Last CPU load of this process
cpuMax float64 // Max. CPU load of this process
cpuTop float64 // Decaying max. CPU load of this process
cpuAvg float64 // Average CPU load of this process
cpuAvgCounter uint64 // Counter for average calculation
cpuLimitSince time.Time // Time when the CPU limit has been reached (hard limiter mode)
cpuLimitEnable bool // Whether CPU limiting is enabled (soft limiter mode)
memory uint64
memoryCurrent uint64
memoryMax uint64
memoryTop uint64
memoryAvg float64
memoryAvgCounter uint64
memoryLast uint64
memoryLimitSince time.Time
memory uint64 // Memory limit (bytes)
memoryCurrent uint64 // Current memory usage
memoryLast uint64 // Last memory usage
memoryMax uint64 // Max. memory usage
memoryTop uint64 // Decaying max. memory usage
memoryAvg float64 // Average memory usage
memoryAvgCounter uint64 // Counter for average memory calculation
memoryLimitSince time.Time // Time when the memory limit has been reached (hard limiter mode)
memoryLimitEnable bool // Whether memory limiting is enabled (soft limiter mode)
waitFor time.Duration
mode LimitMode
waitFor time.Duration
mode LimitMode
enableLimit bool
cancelLimit context.CancelFunc
factorLimit int
logger log.Logger
}
@ -134,6 +135,8 @@ func NewLimiter(config LimiterConfig) Limiter {
l.ncpuFactor = l.ncpu
}
l.cpu /= 100
if l.onLimit == nil {
l.onLimit = func(float64, uint64) {}
}
@ -148,14 +151,13 @@ func NewLimiter(config LimiterConfig) Limiter {
}
func (l *limiter) reset() {
l.enableLimit = false
l.cpuCurrent = 0
l.cpuLast = 0
l.cpuAvg = 0
l.cpuAvgCounter = 0
l.cpuMax = 0
l.cpuTop = 0
l.cpuLimitEnable = false
l.memoryCurrent = 0
l.memoryLast = 0
@ -163,6 +165,7 @@ func (l *limiter) reset() {
l.memoryAvgCounter = 0
l.memoryMax = 0
l.memoryTop = 0
l.memoryLimitEnable = false
}
func (l *limiter) Start(process psutil.Process) error {
@ -182,6 +185,13 @@ func (l *limiter) Start(process psutil.Process) error {
go l.ticker(ctx, 1000*time.Millisecond)
if l.mode == LimitModeSoft {
ctx, cancel = context.WithCancel(context.Background())
l.cancelLimit = cancel
go l.limitCPU(ctx, l.cpu, time.Second)
}
return nil
}
@ -200,8 +210,6 @@ func (l *limiter) Stop() {
l.cancelLimit = nil
}
l.enableLimit = false
l.proc.Stop()
l.proc = nil
@ -249,7 +257,7 @@ func (l *limiter) collect(t time.Time) {
}
if cpustat, err := l.proc.CPUPercent(); err == nil {
l.cpuLast, l.cpuCurrent = l.cpuCurrent, cpustat.System+cpustat.User+cpustat.Other
l.cpuLast, l.cpuCurrent = l.cpuCurrent, (cpustat.System+cpustat.User+cpustat.Other)/100
if l.cpuCurrent > l.cpuMax {
l.cpuMax = l.cpuCurrent
@ -300,8 +308,8 @@ func (l *limiter) collect(t time.Time) {
}
}
}
} else if l.mode == LimitModeSoft && l.enableLimit {
if l.memory > 0 {
} else {
if l.memory > 0 && l.memoryLimitEnable {
if l.memoryCurrent > l.memory {
// Current value is higher than the limit
l.logger.Warn().Log("Memory limit exceeded")
@ -319,11 +327,11 @@ func (l *limiter) collect(t time.Time) {
}).Log("Observation")
if isLimitExceeded {
go l.onLimit(l.cpuCurrent*l.ncpuFactor, l.memoryCurrent)
go l.onLimit(l.cpuCurrent*l.ncpuFactor*100, l.memoryCurrent)
}
}
func (l *limiter) Limit(limit int) error {
func (l *limiter) Limit(cpu, memory bool) error {
l.lock.Lock()
defer l.lock.Unlock()
@ -331,50 +339,42 @@ func (l *limiter) Limit(limit int) error {
return nil
}
if limit > 0 {
l.factorLimit = limit
if memory {
if !l.memoryLimitEnable {
l.memoryLimitEnable = true
if l.enableLimit {
return nil
l.logger.Debug().Log("Memory limiter enabled")
}
if l.cancelLimit != nil {
l.cancelLimit()
}
ctx, cancel := context.WithCancel(context.Background())
l.cancelLimit = cancel
l.enableLimit = true
l.logger.Debug().Log("Limiter enabled")
go l.limit(ctx, l.cpu/100, time.Second)
} else {
if !l.enableLimit {
return nil
if l.memoryLimitEnable {
l.memoryLimitEnable = false
l.logger.Debug().Log("Memory limiter disabled")
}
}
if l.cancelLimit == nil {
return nil
if cpu {
if !l.cpuLimitEnable {
l.cpuLimitEnable = true
l.logger.Debug().Log("CPU limiter enabled")
}
} else {
if l.cpuLimitEnable {
l.cpuLimitEnable = false
l.enableLimit = false
l.cancelLimit()
l.cancelLimit = nil
l.logger.Debug().Log("Limiter disabled")
l.logger.Debug().Log("CPU limiter disabled")
}
}
return nil
}
// limit will limit the CPU usage of this process. The limit is the max. CPU usage
// limitCPU will limit the CPU usage of this process. The limit is the max. CPU usage
// normed to 0-1. The interval defines how long a time slot is that will be splitted
// into sleeping and working.
func (l *limiter) limit(ctx context.Context, limit float64, interval time.Duration) {
func (l *limiter) limitCPU(ctx context.Context, limit float64, interval time.Duration) {
defer func() {
l.lock.Lock()
if l.proc != nil {
@ -386,8 +386,10 @@ func (l *limiter) limit(ctx context.Context, limit float64, interval time.Durati
}()
var workingrate float64 = -1
var factorTopLimit float64 = 0
var topLimit float64 = 0
l.logger.Debug().WithField("limit", limit).Log("CPU throttler enabled")
l.logger.Debug().WithField("limit", limit*l.ncpu).Log("CPU throttler enabled")
for {
select {
@ -396,11 +398,35 @@ func (l *limiter) limit(ctx context.Context, limit float64, interval time.Durati
default:
}
l.lock.Lock()
if !l.cpuLimitEnable {
if factorTopLimit > 0 {
factorTopLimit -= 10
} else {
if l.proc != nil {
l.proc.Resume()
}
l.lock.Unlock()
time.Sleep(100 * time.Millisecond)
continue
}
} else {
factorTopLimit = 100
topLimit = l.cpuTop - limit
}
lim := limit
l.lock.Lock()
pcpu := l.cpuCurrent / 100
lim += (100 - float64(l.factorLimit)) / 100 * ((l.cpuTop / 100) - limit)
if topLimit > 0 {
// After releasing the limiter, the process will not get the full CPU capacity back.
// Instead the limit will be gradually lifted by increments until it reaches the
// CPU top value. The CPU top value has to be larger than the actual limit.
lim += (100 - factorTopLimit) / 100 * topLimit
}
pcpu := l.cpuCurrent
l.lock.Unlock()
if workingrate < 0 {
@ -415,8 +441,9 @@ func (l *limiter) limit(ctx context.Context, limit float64, interval time.Durati
sleeptime := float64(interval.Nanoseconds()) - worktime
l.logger.Debug().WithFields(log.Fields{
"limit": lim,
"limit": lim * l.ncpu,
"pcpu": pcpu,
"factor": factorTopLimit,
"worktime": (time.Duration(worktime) * time.Nanosecond).String(),
"sleeptime": (time.Duration(sleeptime) * time.Nanosecond).String(),
}).Log("Throttler")
@ -445,8 +472,8 @@ func (l *limiter) Current() (cpu float64, memory uint64) {
l.lock.Lock()
defer l.lock.Unlock()
cpu = l.cpuCurrent
memory = l.memoryCurrent
cpu = l.cpuCurrent * 100
memory = l.memoryCurrent * 100
return
}
@ -458,10 +485,10 @@ func (l *limiter) Usage() Usage {
usage := Usage{}
usage.CPU.NCPU = l.ncpu
usage.CPU.Limit = l.cpu * l.ncpu
usage.CPU.Current = l.cpuCurrent * l.ncpu
usage.CPU.Average = l.cpuAvg * l.ncpu
usage.CPU.Max = l.cpuMax * l.ncpu
usage.CPU.Limit = l.cpu * l.ncpu * 100
usage.CPU.Current = l.cpuCurrent * l.ncpu * 100
usage.CPU.Average = l.cpuAvg * l.ncpu * 100
usage.CPU.Max = l.cpuMax * l.ncpu * 100
usage.Memory.Limit = l.memory
usage.Memory.Current = l.memoryCurrent
@ -472,5 +499,5 @@ func (l *limiter) Usage() Usage {
}
func (l *limiter) Limits() (cpu float64, memory uint64) {
return l.cpu, l.memory
return l.cpu * 100, l.memory
}

View File

@ -46,7 +46,7 @@ type Process interface {
// Limit enabled or disables CPU and memory limiting. CPU will be throttled
// into the configured limit. If memory consumption is above the configured
// limit, the process will be killed.
Limit(limit int) error
Limit(cpu, memory bool) error
}
// Config is the configuration of a process
@ -459,7 +459,7 @@ func (p *process) IsRunning() bool {
return p.isRunning()
}
func (p *process) Limit(limit int) error {
func (p *process) Limit(cpu, memory bool) error {
if !p.isRunning() {
return nil
}
@ -468,9 +468,12 @@ func (p *process) Limit(limit int) error {
return nil
}
p.logger.Warn().WithField("limit", limit).Log("Limiter triggered")
p.logger.Warn().WithFields(log.Fields{
"limit_cpu": cpu,
"limit_memory": memory,
}).Log("Limiter triggered")
return p.limits.Limit(limit)
return p.limits.Limit(cpu, memory)
}
// Start will start the process and sets the order to "start". If the

View File

@ -15,9 +15,11 @@ type resources struct {
maxCPU float64
maxMemory uint64
limitCh chan int
limitRate int
isLimiting bool
limitCPUCh chan bool
isCPULimiting bool
limitMemoryCh chan bool
isMemoryLimiting bool
cancelObserver context.CancelFunc
@ -32,7 +34,8 @@ type Resources interface {
Start()
Stop()
Limit() <-chan int
LimitCPU() <-chan bool
LimitMemory() <-chan bool
Request(cpu float64, memory uint64) error
}
@ -83,7 +86,8 @@ func New(config Config) (Resources, error) {
func (r *resources) Start() {
r.startOnce.Do(func() {
r.limitCh = make(chan int, 10)
r.limitCPUCh = make(chan bool, 10)
r.limitMemoryCh = make(chan bool, 10)
ctx, cancel := context.WithCancel(context.Background())
r.cancelObserver = cancel
@ -106,8 +110,12 @@ func (r *resources) Stop() {
})
}
func (r *resources) Limit() <-chan int {
return r.limitCh
func (r *resources) LimitCPU() <-chan bool {
return r.limitCPUCh
}
func (r *resources) LimitMemory() <-chan bool {
return r.limitMemoryCh
}
func (r *resources) observe(ctx context.Context, interval time.Duration) {
@ -140,52 +148,57 @@ func (r *resources) observe(ctx context.Context, interval time.Duration) {
"cur_memory": vmstat.Used,
}).Log("Observation")
doLimit := false
doCPULimit := false
if !r.isLimiting {
if !r.isCPULimiting {
if cpuload > r.maxCPU {
r.logger.Debug().WithField("cpu", cpuload).Log("CPU limit reached")
doLimit = true
}
if vmstat.Used > r.maxMemory {
r.logger.Debug().WithField("memory", vmstat.Used).Log("Memory limit reached")
doLimit = true
doCPULimit = true
}
} else {
doLimit = true
if cpuload <= r.maxCPU && vmstat.Used <= r.maxMemory {
doLimit = false
doCPULimit = true
if cpuload <= r.maxCPU {
r.logger.Debug().WithField("cpu", cpuload).Log("CPU limit released")
doCPULimit = false
}
}
doMemoryLimit := false
if !r.isMemoryLimiting {
if vmstat.Used > r.maxMemory {
r.logger.Debug().WithField("memory", vmstat.Used).Log("Memory limit reached")
doMemoryLimit = true
}
} else {
doMemoryLimit = true
if vmstat.Used <= r.maxMemory {
r.logger.Debug().WithField("memory", vmstat.Used).Log("Memory limit released")
doMemoryLimit = false
}
}
r.lock.Lock()
if r.isLimiting != doLimit {
if !r.isLimiting {
r.limitRate = 100
} else {
if r.limitRate > 0 {
r.limitRate -= 10
doLimit = true
if r.limitRate == 0 {
r.logger.Debug().WithFields(log.Fields{
"cpu": cpuload,
"memory": vmstat.Used,
}).Log("CPU and memory limit released")
doLimit = false
}
}
}
if r.isCPULimiting != doCPULimit {
r.logger.Debug().WithFields(log.Fields{
"enabled": doLimit,
"rate": r.limitRate,
}).Log("Limiting")
"enabled": doCPULimit,
}).Log("Limiting CPU")
r.isLimiting = doLimit
r.isCPULimiting = doCPULimit
select {
case r.limitCh <- r.limitRate:
case r.limitCPUCh <- doCPULimit:
default:
}
}
if r.isMemoryLimiting != doMemoryLimit {
r.logger.Debug().WithFields(log.Fields{
"enabled": doMemoryLimit,
}).Log("Limiting memory")
r.isMemoryLimiting = doMemoryLimit
select {
case r.limitMemoryCh <- doMemoryLimit:
default:
}
}
@ -205,7 +218,7 @@ func (r *resources) Request(cpu float64, memory uint64) error {
logger.Debug().Log("Request for acquiring resources")
if r.isLimiting {
if r.isCPULimiting || r.isMemoryLimiting {
logger.Debug().Log("Rejected, currently limiting")
return fmt.Errorf("resources are currenlty actively limited")
}

View File

@ -300,11 +300,13 @@ func (r *restream) resourceObserver(ctx context.Context, rsc resources.Resources
rsc.Start()
defer rsc.Stop()
limitCPU, limitMemory := false, false
for {
select {
case <-ctx.Done():
return
case limit := <-rsc.Limit():
case limitCPU = <-rsc.LimitCPU():
r.lock.Lock()
for id, t := range r.tasks {
if !t.valid {
@ -312,10 +314,24 @@ func (r *restream) resourceObserver(ctx context.Context, rsc resources.Resources
}
r.logger.Debug().WithFields(log.Fields{
"limit": limit,
"id": id,
}).Log("limiting process")
t.ffmpeg.Limit(limit)
"limit_cpu": limitCPU,
"id": id,
}).Log("Limiting process CPU consumption")
t.ffmpeg.Limit(limitCPU, limitMemory)
}
r.lock.Unlock()
case limitMemory = <-rsc.LimitMemory():
r.lock.Lock()
for id, t := range r.tasks {
if !t.valid {
continue
}
r.logger.Debug().WithFields(log.Fields{
"limit_memory": limitMemory,
"id": id,
}).Log("Limiting process memory consumption")
t.ffmpeg.Limit(limitCPU, limitMemory)
}
r.lock.Unlock()
}