Allow to additionally delay a reconnect after kill
This commit is contained in:
parent
73e5f45a23
commit
0a6d772b98
@ -9,6 +9,7 @@ import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"math/rand/v2"
|
||||
"os"
|
||||
"os/exec"
|
||||
"runtime"
|
||||
@ -37,8 +38,11 @@ type Process interface {
|
||||
Stop(wait bool) error
|
||||
|
||||
// Kill stops the process such that it will restart
|
||||
// automatically if it is defined to do so.
|
||||
Kill(wait bool, reason string) error
|
||||
// automatically if it is defined to do so. The reason will be written
|
||||
// to the report as the last line. The delayReconnect will add an
|
||||
// additional duration to the reconnect timeout for the next reconnect,
|
||||
// in case the process should reconnect.
|
||||
Kill(wait bool, reason string, delayReconnect time.Duration) error
|
||||
|
||||
// IsRunning returns whether the process is currently
|
||||
// running or not.
|
||||
@ -231,16 +235,17 @@ type process struct {
|
||||
timer *time.Timer
|
||||
lock sync.Mutex
|
||||
}
|
||||
timeout time.Duration
|
||||
stopTimer *time.Timer
|
||||
stopTimerLock sync.Mutex
|
||||
stopReason string
|
||||
stopReasonLock sync.Mutex
|
||||
killTimer *time.Timer
|
||||
killTimerLock sync.Mutex
|
||||
logger log.Logger
|
||||
debuglogger log.Logger
|
||||
callbacks struct {
|
||||
timeout time.Duration
|
||||
stopTimer *time.Timer
|
||||
stopTimerLock sync.Mutex
|
||||
stopReason string
|
||||
stopDelayReconnect time.Duration
|
||||
stopReasonLock sync.Mutex
|
||||
killTimer *time.Timer
|
||||
killTimerLock sync.Mutex
|
||||
logger log.Logger
|
||||
debuglogger log.Logger
|
||||
callbacks struct {
|
||||
onBeforeStart func(args []string) ([]string, error)
|
||||
onStart func()
|
||||
onExit func(state string)
|
||||
@ -339,7 +344,9 @@ func New(config Config) (Process, error) {
|
||||
"gpudecoder": gpudecoder,
|
||||
"gpumemmory": gpumemory,
|
||||
}).Warn().Log("Killed because limits are exceeded")
|
||||
p.Kill(false, fmt.Sprintf("Killed because limits are exceeded (mode: %s, tolerance: %s): %.2f (%.2f) CPU, %d (%d) bytes memory, %.2f/%.2f/%.2f (%.2f) GPU usage, %d (%d) bytes GPU memory", config.LimitMode.String(), config.LimitDuration.String(), cpu, config.LimitCPU, memory, config.LimitMemory, gpuusage, gpuencoder, gpudecoder, config.LimitGPUUsage, gpumemory, config.LimitGPUMemory))
|
||||
|
||||
reason := fmt.Sprintf("Killed because limits are exceeded (mode: %s, tolerance: %s): %.2f (%.2f) CPU, %d (%d) bytes memory, %.2f/%.2f/%.2f (%.2f) GPU usage, %d (%d) bytes GPU memory", config.LimitMode.String(), config.LimitDuration.String(), cpu, config.LimitCPU, memory, config.LimitMemory, gpuusage, gpuencoder, gpudecoder, config.LimitGPUUsage, gpumemory, config.LimitGPUMemory)
|
||||
p.Kill(false, reason, time.Duration(rand.IntN(30))*time.Second)
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
@ -597,9 +604,9 @@ func (p *process) Limit(cpu, memory, gpu bool) error {
|
||||
}
|
||||
|
||||
p.logger.Warn().WithFields(log.Fields{
|
||||
"limit_cpu": cpu,
|
||||
"limit_memory": memory,
|
||||
"limit_gpumemory": gpu,
|
||||
"limit_cpu": cpu,
|
||||
"limit_memory": memory,
|
||||
"limit_gpu": gpu,
|
||||
}).Log("Limiter triggered")
|
||||
|
||||
return p.limits.Limit(cpu, memory, gpu)
|
||||
@ -718,7 +725,7 @@ func (p *process) start() error {
|
||||
if p.stopTimer == nil {
|
||||
// Only create a new timer if there isn't already one running
|
||||
p.stopTimer = time.AfterFunc(p.timeout, func() {
|
||||
p.Kill(false, fmt.Sprintf("Killed because timeout triggered (%s)", p.timeout))
|
||||
p.Kill(false, fmt.Sprintf("Killed because timeout triggered (%s)", p.timeout), 0)
|
||||
|
||||
p.stopTimerLock.Lock()
|
||||
p.stopTimer.Stop()
|
||||
@ -765,7 +772,7 @@ func (p *process) Stop(wait bool) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
err := p.stop(wait, "")
|
||||
err := p.stop(wait, "", 0)
|
||||
if err != nil {
|
||||
p.debuglogger.WithFields(log.Fields{
|
||||
"state": p.getStateString(),
|
||||
@ -779,20 +786,20 @@ func (p *process) Stop(wait bool) error {
|
||||
|
||||
// Kill will stop the process without changing the order such that it
|
||||
// will restart automatically if enabled.
|
||||
func (p *process) Kill(wait bool, reason string) error {
|
||||
func (p *process) Kill(wait bool, reason string, delayReconnect time.Duration) error {
|
||||
// If the process is currently not running, we don't need
|
||||
// to do anything.
|
||||
if !p.isRunning() {
|
||||
return nil
|
||||
}
|
||||
|
||||
err := p.stop(wait, reason)
|
||||
err := p.stop(wait, reason, delayReconnect)
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
// stop will stop a process considering the current order and state.
|
||||
func (p *process) stop(wait bool, reason string) error {
|
||||
func (p *process) stop(wait bool, reason string, delayReconnect time.Duration) error {
|
||||
// Stop the restart timer
|
||||
p.unreconnect()
|
||||
|
||||
@ -801,7 +808,7 @@ func (p *process) stop(wait bool, reason string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// If the process in starting state, wait until the process has been started
|
||||
// If the process is in starting state, wait until the process has been started
|
||||
start := time.Now()
|
||||
for {
|
||||
if time.Since(start) > 5*time.Second {
|
||||
@ -825,6 +832,7 @@ func (p *process) stop(wait bool, reason string) error {
|
||||
|
||||
p.stopReasonLock.Lock()
|
||||
p.stopReason = reason
|
||||
p.stopDelayReconnect = delayReconnect
|
||||
p.stopReasonLock.Unlock()
|
||||
|
||||
p.logger.Info().Log("Stopping")
|
||||
@ -956,7 +964,7 @@ func (p *process) staler(ctx context.Context) {
|
||||
d := t.Sub(last)
|
||||
if d.Seconds() > timeout.Seconds() {
|
||||
p.logger.Info().Log("Stale timeout after %s (%.2fs).", timeout, d.Seconds())
|
||||
p.stop(false, fmt.Sprintf("Stale timeout after %s, no output received from process", timeout))
|
||||
p.stop(false, fmt.Sprintf("Stale timeout after %s, no output received from process", timeout), 0)
|
||||
return
|
||||
}
|
||||
}
|
||||
@ -1116,7 +1124,19 @@ func (p *process) waiter() {
|
||||
|
||||
// Restart the process
|
||||
if p.getOrder() == "start" && enableReconnect {
|
||||
p.reconnect(p.delay(state))
|
||||
delayReconnect := time.Duration(0)
|
||||
|
||||
p.stopReasonLock.Lock()
|
||||
delayReconnect = p.stopDelayReconnect
|
||||
p.stopDelayReconnect = time.Duration(0)
|
||||
p.stopReasonLock.Unlock()
|
||||
|
||||
delay := p.delay(state)
|
||||
if delay > time.Duration(0) {
|
||||
delay += delayReconnect
|
||||
}
|
||||
|
||||
p.reconnect(delay)
|
||||
}
|
||||
|
||||
// Call the onExit callback
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user