diff --git a/process/limiter.go b/process/limiter.go index e692d8e7..6faa6198 100644 --- a/process/limiter.go +++ b/process/limiter.go @@ -42,6 +42,7 @@ type LimiterConfig struct { WaitFor time.Duration // Duration for one of the limits has to be above the limit until OnLimit gets triggered OnLimit LimitFunc // Function to be triggered if limits are exceeded Mode LimitMode // How to limit CPU usage + PSUtil psutil.Util Logger log.Logger } @@ -70,6 +71,8 @@ type Limiter interface { } type limiter struct { + psutil psutil.Util + ncpu float64 ncpuFactor float64 proc psutil.Process @@ -113,6 +116,7 @@ func NewLimiter(config LimiterConfig) Limiter { waitFor: config.WaitFor, onLimit: config.OnLimit, mode: config.Mode, + psutil: config.PSUtil, logger: config.Logger, } @@ -120,7 +124,11 @@ func NewLimiter(config LimiterConfig) Limiter { l.logger = log.New("") } - if ncpu, err := psutil.CPUCounts(true); err != nil { + if l.psutil == nil { + l.psutil = psutil.DefaultUtil + } + + if ncpu, err := l.psutil.CPUCounts(true); err != nil { l.ncpu = 1 } else { l.ncpu = ncpu diff --git a/process/limiter_test.go b/process/limiter_test.go index 2b9302e3..c9e31127 100644 --- a/process/limiter_test.go +++ b/process/limiter_test.go @@ -174,3 +174,42 @@ func TestMemoryLimitWaitFor(t *testing.T) { return done }, 10*time.Second, 1*time.Second) } + +func TestMemoryLimitSoftMode(t *testing.T) { + lock := sync.Mutex{} + + lock.Lock() + done := false + lock.Unlock() + + go func() { + wg := sync.WaitGroup{} + wg.Add(1) + + l := NewLimiter(LimiterConfig{ + Memory: 42, + Mode: LimitModeSoft, + OnLimit: func(float64, uint64) { + wg.Done() + }, + }) + + l.Start(&psproc{}) + defer l.Stop() + + l.Limit(false, true) + + wg.Wait() + + lock.Lock() + done = true + lock.Unlock() + }() + + assert.Eventually(t, func() bool { + lock.Lock() + defer lock.Unlock() + + return done + }, 2*time.Second, 100*time.Millisecond) +} diff --git a/restream/resources/resources.go b/restream/resources/resources.go index 4388cfd3..9a4e9ffc 100644 --- a/restream/resources/resources.go +++ b/restream/resources/resources.go @@ -11,6 +11,8 @@ import ( ) type resources struct { + psutil psutil.Util + ncpu float64 maxCPU float64 maxMemory uint64 @@ -43,21 +45,31 @@ type Resources interface { type Config struct { MaxCPU float64 MaxMemory float64 + PSUtil psutil.Util Logger log.Logger } func New(config Config) (Resources, error) { r := &resources{ maxCPU: config.MaxCPU, + psutil: config.PSUtil, logger: config.Logger, } - vmstat, err := psutil.VirtualMemory() + if r.logger == nil { + r.logger = log.New("") + } + + if r.psutil == nil { + r.psutil = psutil.DefaultUtil + } + + vmstat, err := r.psutil.VirtualMemory() if err != nil { return nil, fmt.Errorf("unable to determine available memory: %w", err) } - ncpu, err := psutil.CPUCounts(true) + ncpu, err := r.psutil.CPUCounts(true) if err != nil { return nil, fmt.Errorf("unable to determine number of logical CPUs: %w", err) } @@ -67,10 +79,6 @@ func New(config Config) (Resources, error) { r.maxCPU *= r.ncpu r.maxMemory = uint64(float64(vmstat.Total) * config.MaxMemory / 100) - if r.logger == nil { - r.logger = log.New("") - } - r.logger = r.logger.WithFields(log.Fields{ "ncpu": r.ncpu, "max_cpu": r.maxCPU, @@ -129,7 +137,7 @@ func (r *resources) observe(ctx context.Context, interval time.Duration) { case <-ctx.Done(): return case <-ticker.C: - cpustat, err := psutil.CPUPercent() + cpustat, err := r.psutil.CPUPercent() if err != nil { r.logger.Warn().WithError(err).Log("Failed to determine system CPU usage") continue @@ -137,7 +145,7 @@ func (r *resources) observe(ctx context.Context, interval time.Duration) { cpuload := (cpustat.User + cpustat.System + cpustat.Other) * r.ncpu - vmstat, err := psutil.VirtualMemory() + vmstat, err := r.psutil.VirtualMemory() if err != nil { r.logger.Warn().WithError(err).Log("Failed to determine system memory usage") continue @@ -228,7 +236,7 @@ func (r *resources) Request(cpu float64, memory uint64) error { return fmt.Errorf("the cpu and/or memory values are invalid: cpu=%f, memory=%d", cpu, memory) } - cpustat, err := psutil.CPUPercent() + cpustat, err := r.psutil.CPUPercent() if err != nil { r.logger.Warn().WithError(err).Log("Failed to determine system CPU usage") return fmt.Errorf("the system CPU usage couldn't be determined") @@ -236,7 +244,7 @@ func (r *resources) Request(cpu float64, memory uint64) error { cpuload := (cpustat.User + cpustat.System + cpustat.Other) * r.ncpu - vmstat, err := psutil.VirtualMemory() + vmstat, err := r.psutil.VirtualMemory() if err != nil { r.logger.Warn().WithError(err).Log("Failed to determine system memory usage") return fmt.Errorf("the system memory usage couldn't be determined") diff --git a/restream/resources/resources_test.go b/restream/resources/resources_test.go new file mode 100644 index 00000000..572b601c --- /dev/null +++ b/restream/resources/resources_test.go @@ -0,0 +1,147 @@ +package resources + +import ( + "sync" + "testing" + "time" + + "github.com/datarhei/core/v16/psutil" + "github.com/stretchr/testify/require" + + "github.com/shirou/gopsutil/v3/disk" + "github.com/shirou/gopsutil/v3/net" +) + +type util struct{} + +func (u *util) Start() {} +func (u *util) Stop() {} + +func (u *util) CPUCounts(logical bool) (float64, error) { + return 2, nil +} + +func (u *util) CPUPercent() (*psutil.CPUInfoStat, error) { + return &psutil.CPUInfoStat{ + System: 10, + User: 50, + Idle: 35, + Other: 5, + }, nil +} + +func (u *util) DiskUsage(path string) (*disk.UsageStat, error) { + return &disk.UsageStat{}, nil +} + +func (u *util) VirtualMemory() (*psutil.MemoryInfoStat, error) { + return &psutil.MemoryInfoStat{ + Total: 200, + Available: 40, + Used: 160, + }, nil +} + +func (u *util) NetIOCounters(pernic bool) ([]net.IOCountersStat, error) { + return nil, nil +} + +func (u *util) Process(pid int32) (psutil.Process, error) { + return nil, nil +} + +func TestMemoryLimit(t *testing.T) { + r, err := New(Config{ + MaxCPU: 0, + MaxMemory: 150. / 200. * 100, + PSUtil: &util{}, + Logger: nil, + }) + require.NoError(t, err) + + wg := sync.WaitGroup{} + wg.Add(1) + + limit := false + + go func() { + defer func() { + wg.Done() + }() + + timer := time.NewTimer(10 * time.Second) + defer timer.Stop() + + select { + case limit = <-r.LimitMemory(): + case <-timer.C: + } + }() + + r.Start() + + wg.Wait() + + require.True(t, limit) + + r.Stop() +} + +func TestCPULimit(t *testing.T) { + r, err := New(Config{ + MaxCPU: 50., + MaxMemory: 0, + PSUtil: &util{}, + Logger: nil, + }) + require.NoError(t, err) + + wg := sync.WaitGroup{} + wg.Add(1) + + limit := false + + go func() { + defer func() { + wg.Done() + }() + + timer := time.NewTimer(10 * time.Second) + defer timer.Stop() + + select { + case limit = <-r.LimitCPU(): + case <-timer.C: + } + }() + + r.Start() + + wg.Wait() + + require.True(t, limit) + + r.Stop() +} + +func TestRequest(t *testing.T) { + r, err := New(Config{ + MaxCPU: 70., + MaxMemory: 170. / 200. * 100, + PSUtil: &util{}, + Logger: nil, + }) + require.NoError(t, err) + + err = r.Request(-1, 0) + require.Error(t, err) + + err = r.Request(5, 10) + require.NoError(t, err) + + err = r.Request(5, 20) + require.Error(t, err) + + err = r.Request(10, 10) + require.NoError(t, err) +}