diff --git a/process/process.go b/process/process.go index 1fc8ae13..0c9dad19 100644 --- a/process/process.go +++ b/process/process.go @@ -9,6 +9,7 @@ import ( "context" "fmt" "io" + "math/rand/v2" "os" "os/exec" "runtime" @@ -37,8 +38,11 @@ type Process interface { Stop(wait bool) error // Kill stops the process such that it will restart - // automatically if it is defined to do so. - Kill(wait bool, reason string) error + // automatically if it is defined to do so. The reason will be written + // to the report as the last line. The delayReconnect will add an + // additional duration to the reconnect timeout for the next reconnect, + // in case the process should reconnect. + Kill(wait bool, reason string, delayReconnect time.Duration) error // IsRunning returns whether the process is currently // running or not. @@ -231,16 +235,17 @@ type process struct { timer *time.Timer lock sync.Mutex } - timeout time.Duration - stopTimer *time.Timer - stopTimerLock sync.Mutex - stopReason string - stopReasonLock sync.Mutex - killTimer *time.Timer - killTimerLock sync.Mutex - logger log.Logger - debuglogger log.Logger - callbacks struct { + timeout time.Duration + stopTimer *time.Timer + stopTimerLock sync.Mutex + stopReason string + stopDelayReconnect time.Duration + stopReasonLock sync.Mutex + killTimer *time.Timer + killTimerLock sync.Mutex + logger log.Logger + debuglogger log.Logger + callbacks struct { onBeforeStart func(args []string) ([]string, error) onStart func() onExit func(state string) @@ -339,7 +344,9 @@ func New(config Config) (Process, error) { "gpudecoder": gpudecoder, "gpumemmory": gpumemory, }).Warn().Log("Killed because limits are exceeded") - p.Kill(false, fmt.Sprintf("Killed because limits are exceeded (mode: %s, tolerance: %s): %.2f (%.2f) CPU, %d (%d) bytes memory, %.2f/%.2f/%.2f (%.2f) GPU usage, %d (%d) bytes GPU memory", config.LimitMode.String(), config.LimitDuration.String(), cpu, config.LimitCPU, memory, config.LimitMemory, gpuusage, gpuencoder, gpudecoder, config.LimitGPUUsage, gpumemory, config.LimitGPUMemory)) + + reason := fmt.Sprintf("Killed because limits are exceeded (mode: %s, tolerance: %s): %.2f (%.2f) CPU, %d (%d) bytes memory, %.2f/%.2f/%.2f (%.2f) GPU usage, %d (%d) bytes GPU memory", config.LimitMode.String(), config.LimitDuration.String(), cpu, config.LimitCPU, memory, config.LimitMemory, gpuusage, gpuencoder, gpudecoder, config.LimitGPUUsage, gpumemory, config.LimitGPUMemory) + p.Kill(false, reason, time.Duration(rand.IntN(30))*time.Second) }, }) if err != nil { @@ -597,9 +604,9 @@ func (p *process) Limit(cpu, memory, gpu bool) error { } p.logger.Warn().WithFields(log.Fields{ - "limit_cpu": cpu, - "limit_memory": memory, - "limit_gpumemory": gpu, + "limit_cpu": cpu, + "limit_memory": memory, + "limit_gpu": gpu, }).Log("Limiter triggered") return p.limits.Limit(cpu, memory, gpu) @@ -718,7 +725,7 @@ func (p *process) start() error { if p.stopTimer == nil { // Only create a new timer if there isn't already one running p.stopTimer = time.AfterFunc(p.timeout, func() { - p.Kill(false, fmt.Sprintf("Killed because timeout triggered (%s)", p.timeout)) + p.Kill(false, fmt.Sprintf("Killed because timeout triggered (%s)", p.timeout), 0) p.stopTimerLock.Lock() p.stopTimer.Stop() @@ -765,7 +772,7 @@ func (p *process) Stop(wait bool) error { return nil } - err := p.stop(wait, "") + err := p.stop(wait, "", 0) if err != nil { p.debuglogger.WithFields(log.Fields{ "state": p.getStateString(), @@ -779,20 +786,20 @@ func (p *process) Stop(wait bool) error { // Kill will stop the process without changing the order such that it // will restart automatically if enabled. -func (p *process) Kill(wait bool, reason string) error { +func (p *process) Kill(wait bool, reason string, delayReconnect time.Duration) error { // If the process is currently not running, we don't need // to do anything. if !p.isRunning() { return nil } - err := p.stop(wait, reason) + err := p.stop(wait, reason, delayReconnect) return err } // stop will stop a process considering the current order and state. -func (p *process) stop(wait bool, reason string) error { +func (p *process) stop(wait bool, reason string, delayReconnect time.Duration) error { // Stop the restart timer p.unreconnect() @@ -801,7 +808,7 @@ func (p *process) stop(wait bool, reason string) error { return nil } - // If the process in starting state, wait until the process has been started + // If the process is in starting state, wait until the process has been started start := time.Now() for { if time.Since(start) > 5*time.Second { @@ -825,6 +832,7 @@ func (p *process) stop(wait bool, reason string) error { p.stopReasonLock.Lock() p.stopReason = reason + p.stopDelayReconnect = delayReconnect p.stopReasonLock.Unlock() p.logger.Info().Log("Stopping") @@ -956,7 +964,7 @@ func (p *process) staler(ctx context.Context) { d := t.Sub(last) if d.Seconds() > timeout.Seconds() { p.logger.Info().Log("Stale timeout after %s (%.2fs).", timeout, d.Seconds()) - p.stop(false, fmt.Sprintf("Stale timeout after %s, no output received from process", timeout)) + p.stop(false, fmt.Sprintf("Stale timeout after %s, no output received from process", timeout), 0) return } } @@ -1116,7 +1124,19 @@ func (p *process) waiter() { // Restart the process if p.getOrder() == "start" && enableReconnect { - p.reconnect(p.delay(state)) + delayReconnect := time.Duration(0) + + p.stopReasonLock.Lock() + delayReconnect = p.stopDelayReconnect + p.stopDelayReconnect = time.Duration(0) + p.stopReasonLock.Unlock() + + delay := p.delay(state) + if delay > time.Duration(0) { + delay += delayReconnect + } + + p.reconnect(delay) } // Call the onExit callback