Merge branch 'dev' into cluster

This commit is contained in:
Ingo Oppermann 2022-08-17 15:37:14 +03:00
commit cd565deb51
No known key found for this signature in database
GPG Key ID: 2AB32426E9DD229E
5 changed files with 92 additions and 26 deletions

Binary file not shown.

View File

@ -0,0 +1,18 @@
package main
import (
"os"
"os/signal"
"time"
)
func main() {
// Wait for interrupt signal to gracefully shutdown the app
quit := make(chan os.Signal, 1)
signal.Notify(quit, os.Interrupt)
<-quit
time.Sleep(3 * time.Second)
os.Exit(255)
}

View File

@ -33,11 +33,11 @@ type Process interface {
// Stop stops the process and will not let it restart
// automatically.
Stop() error
Stop(wait bool) error
// Kill stops the process such that it will restart
// automatically if it is defined to do so.
Kill() error
Kill(wait bool) error
// IsRunning returns whether the process is currently
// running or not.
@ -190,7 +190,7 @@ type process struct {
debuglogger log.Logger
callbacks struct {
onStart func()
onStop func()
onExit func()
onStateChange func(from, to string)
}
limits Limiter
@ -239,7 +239,7 @@ func New(config Config) (Process, error) {
p.stale.timeout = config.StaleTimeout
p.callbacks.onStart = config.OnStart
p.callbacks.onStop = config.OnExit
p.callbacks.onExit = config.OnExit
p.callbacks.onStateChange = config.OnStateChange
p.limits = NewLimiter(LimiterConfig{
@ -251,7 +251,7 @@ func New(config Config) (Process, error) {
"cpu": cpu,
"memory": memory,
}).Warn().Log("Stopping because limits are exceeded")
p.Kill()
p.Kill(false)
},
})
@ -523,7 +523,7 @@ func (p *process) start() error {
}
// Stop will stop the process and set the order to "stop"
func (p *process) Stop() error {
func (p *process) Stop(wait bool) error {
p.order.lock.Lock()
defer p.order.lock.Unlock()
@ -533,7 +533,7 @@ func (p *process) Stop() error {
p.order.order = "stop"
err := p.stop()
err := p.stop(wait)
if err != nil {
p.debuglogger.WithFields(log.Fields{
"state": p.getStateString(),
@ -547,7 +547,7 @@ func (p *process) Stop() error {
// Kill will stop the process without changing the order such that it
// will restart automatically if enabled.
func (p *process) Kill() error {
func (p *process) Kill(wait bool) error {
// If the process is currently not running, we don't need
// to do anything.
if !p.isRunning() {
@ -557,13 +557,13 @@ func (p *process) Kill() error {
p.order.lock.Lock()
defer p.order.lock.Unlock()
err := p.stop()
err := p.stop(wait)
return err
}
// stop will stop a process considering the current order and state.
func (p *process) stop() error {
func (p *process) stop(wait bool) error {
// If the process is currently not running, stop the restart timer
if !p.isRunning() {
p.unreconnect()
@ -583,6 +583,26 @@ func (p *process) stop() error {
"order": p.order.order,
}).Debug().Log("Stopping")
wg := sync.WaitGroup{}
if wait {
wg.Add(1)
if p.callbacks.onExit == nil {
p.callbacks.onExit = func() {
wg.Done()
p.callbacks.onExit = nil
}
} else {
cb := p.callbacks.onExit
p.callbacks.onExit = func() {
cb()
wg.Done()
p.callbacks.onExit = cb
}
}
}
var err error
if runtime.GOOS == "windows" {
// Windows doesn't know the SIGINT
@ -606,6 +626,10 @@ func (p *process) stop() error {
}
}
if err == nil && wait {
wg.Wait()
}
if err != nil {
p.parser.Parse(err.Error())
p.debuglogger.WithFields(log.Fields{
@ -683,7 +707,7 @@ func (p *process) staler(ctx context.Context) {
d := t.Sub(last)
if d.Seconds() > timeout.Seconds() {
p.logger.Info().Log("Stale timeout after %s (%.2f).", timeout, d.Seconds())
p.stop()
p.stop(false)
return
}
}
@ -729,7 +753,7 @@ func (p *process) reader() {
// be scheduled for a restart.
func (p *process) waiter() {
if p.getState() == stateFinishing {
p.stop()
p.stop(false)
}
if err := p.cmd.Wait(); err != nil {
@ -806,8 +830,8 @@ func (p *process) waiter() {
p.parser.ResetStats()
// Call the onStop callback
if p.callbacks.onStop != nil {
go p.callbacks.onStop()
if p.callbacks.onExit != nil {
go p.callbacks.onExit()
}
p.order.lock.Lock()

View File

@ -28,7 +28,7 @@ func TestProcess(t *testing.T) {
require.Equal(t, "running", p.Status().State)
p.Stop()
p.Stop(false)
time.Sleep(2 * time.Second)
@ -52,7 +52,7 @@ func TestReconnectProcess(t *testing.T) {
require.Equal(t, "finished", p.Status().State)
p.Stop()
p.Stop(false)
require.Equal(t, "finished", p.Status().State)
}
@ -73,7 +73,7 @@ func TestStaleProcess(t *testing.T) {
require.Equal(t, "killed", p.Status().State)
p.Stop()
p.Stop(false)
require.Equal(t, "killed", p.Status().State)
}
@ -94,7 +94,7 @@ func TestStaleReconnectProcess(t *testing.T) {
require.Equal(t, "killed", p.Status().State)
p.Stop()
p.Stop(false)
require.Equal(t, "killed", p.Status().State)
}
@ -116,7 +116,7 @@ func TestNonExistingProcess(t *testing.T) {
require.Equal(t, "failed", p.Status().State)
p.Stop()
p.Stop(false)
require.Equal(t, "failed", p.Status().State)
}
@ -138,7 +138,7 @@ func TestNonExistingReconnectProcess(t *testing.T) {
require.Equal(t, "failed", p.Status().State)
p.Stop()
p.Stop(false)
require.Equal(t, "failed", p.Status().State)
}
@ -157,11 +157,35 @@ func TestProcessFailed(t *testing.T) {
time.Sleep(5 * time.Second)
p.Stop()
p.Stop(false)
require.Equal(t, "failed", p.Status().State)
}
func TestFFmpegWaitStop(t *testing.T) {
binary, err := testhelper.BuildBinary("sigintwait", "../internal/testhelper")
require.NoError(t, err, "Failed to build helper program")
p, _ := New(Config{
Binary: binary,
Args: []string{},
Reconnect: false,
StaleTimeout: 0,
OnExit: func() {
time.Sleep(2 * time.Second)
},
})
err = p.Start()
require.NoError(t, err)
time.Sleep(4 * time.Second)
p.Stop(true)
require.Equal(t, "finished", p.Status().State)
}
func TestFFmpegKill(t *testing.T) {
binary, err := testhelper.BuildBinary("sigint", "../internal/testhelper")
require.NoError(t, err, "Failed to build helper program")
@ -178,7 +202,7 @@ func TestFFmpegKill(t *testing.T) {
time.Sleep(5 * time.Second)
p.Stop()
p.Stop(false)
time.Sleep(3 * time.Second)
@ -201,7 +225,7 @@ func TestProcessForceKill(t *testing.T) {
time.Sleep(3 * time.Second)
p.Stop()
p.Stop(false)
time.Sleep(1 * time.Second)

View File

@ -204,7 +204,7 @@ func (r *restream) Stop() {
// Start() they will get restarted.
for id, t := range r.tasks {
if t.ffmpeg != nil {
t.ffmpeg.Stop()
t.ffmpeg.Stop(true)
}
r.unsetCleanup(id)
@ -996,7 +996,7 @@ func (r *restream) stopProcess(id string) error {
task.process.Order = "stop"
task.ffmpeg.Stop()
task.ffmpeg.Stop(true)
r.nProc--
@ -1024,7 +1024,7 @@ func (r *restream) restartProcess(id string) error {
return nil
}
task.ffmpeg.Kill()
task.ffmpeg.Kill(true)
return nil
}