From 86b3c053f1eb0d681a42fe0aa3b701b6b129afd1 Mon Sep 17 00:00:00 2001 From: Ingo Oppermann Date: Wed, 1 Mar 2023 15:28:28 +0100 Subject: [PATCH] Add exit state and last progress data to process report history --- docs/docs.go | 9 ++- docs/swagger.json | 6 ++ docs/swagger.yaml | 4 + ffmpeg/ffmpeg.go | 2 +- ffmpeg/parse/parser.go | 52 +++++++++---- ffmpeg/parse/parser_test.go | 3 +- ffmpeg/parse/types.go | 90 +++++++++++++++++++--- ffmpeg/probe/prober.go | 9 ++- ffmpeg/probe/types.go | 39 ++++++++-- http/api/process.go | 28 +++++-- process/parser.go | 16 ++-- process/process.go | 35 +++++---- process/process_test.go | 2 +- restream/app/log.go | 15 +++- restream/restream.go | 150 +++++++++++++++++++++++++++++++++--- 15 files changed, 372 insertions(+), 88 deletions(-) diff --git a/docs/docs.go b/docs/docs.go index 353c8b58..2f0dacc9 100644 --- a/docs/docs.go +++ b/docs/docs.go @@ -1,5 +1,4 @@ -// Package docs GENERATED BY SWAG; DO NOT EDIT -// This file was generated by swaggo/swag +// Code generated by swaggo/swag. DO NOT EDIT package docs import "github.com/swaggo/swag" @@ -3191,6 +3190,9 @@ const docTemplate = `{ "type": "integer", "format": "int64" }, + "exit_state": { + "type": "string" + }, "log": { "type": "array", "items": { @@ -3205,6 +3207,9 @@ const docTemplate = `{ "items": { "type": "string" } + }, + "progress": { + "$ref": "#/definitions/api.Progress" } } }, diff --git a/docs/swagger.json b/docs/swagger.json index 75d15a44..e8e48fc9 100644 --- a/docs/swagger.json +++ b/docs/swagger.json @@ -3183,6 +3183,9 @@ "type": "integer", "format": "int64" }, + "exit_state": { + "type": "string" + }, "log": { "type": "array", "items": { @@ -3197,6 +3200,9 @@ "items": { "type": "string" } + }, + "progress": { + "$ref": "#/definitions/api.Progress" } } }, diff --git a/docs/swagger.yaml b/docs/swagger.yaml index d735b7c8..b648985e 100644 --- a/docs/swagger.yaml +++ b/docs/swagger.yaml @@ -809,6 +809,8 @@ definitions: created_at: format: int64 type: integer + exit_state: + type: string log: items: items: @@ -819,6 +821,8 @@ definitions: items: type: string type: array + progress: + $ref: '#/definitions/api.Progress' type: object api.ProcessState: properties: diff --git a/ffmpeg/ffmpeg.go b/ffmpeg/ffmpeg.go index ee85327a..345e4a46 100644 --- a/ffmpeg/ffmpeg.go +++ b/ffmpeg/ffmpeg.go @@ -36,7 +36,7 @@ type ProcessConfig struct { Parser process.Parser Logger log.Logger OnArgs func([]string) []string - OnExit func() + OnExit func(state string) OnStart func() OnStateChange func(from, to string) } diff --git a/ffmpeg/parse/parser.go b/ffmpeg/parse/parser.go index 0adb53d2..a3854856 100644 --- a/ffmpeg/parse/parser.go +++ b/ffmpeg/parse/parser.go @@ -14,7 +14,6 @@ import ( "github.com/datarhei/core/v16/log" "github.com/datarhei/core/v16/net/url" "github.com/datarhei/core/v16/process" - "github.com/datarhei/core/v16/restream/app" "github.com/datarhei/core/v16/session" ) @@ -23,7 +22,7 @@ type Parser interface { process.Parser // Progress returns the current progress information of the process - Progress() app.Progress + Progress() Progress // Prelude returns an array of the lines before the progress information started Prelude() []string @@ -32,7 +31,7 @@ type Parser interface { Report() Report // ReportHistory returns an array of previews logs - ReportHistory() []Report + ReportHistory() []ReportHistoryEntry } // Config is the config for the Parser implementation @@ -116,7 +115,7 @@ func New(config Config) Parser { } if p.logger == nil { - p.logger = log.New("Parser") + p.logger = log.New("") } if p.logLines <= 0 { @@ -503,7 +502,12 @@ func (p *parser) parseAVstreamProgress(line string) error { return nil } -func (p *parser) Progress() app.Progress { +func (p *parser) Stop(state string) { + // The process stopped. The right moment to store the current state to the log history + p.storeReportHistory(state) +} + +func (p *parser) Progress() Progress { p.lock.progress.RLock() defer p.lock.progress.RUnlock() @@ -685,8 +689,6 @@ func (p *parser) ResetStats() { } func (p *parser) ResetLog() { - p.storeLogHistory() - p.lock.prelude.Lock() p.prelude.data = []string{} p.prelude.tail = ring.New(p.prelude.tailLines) @@ -700,25 +702,41 @@ func (p *parser) ResetLog() { p.lock.log.Unlock() } -// Report represents a log report, including the prelude and the last log lines -// of the process. +// Report represents a log report, including the prelude and the last log lines of the process. type Report struct { CreatedAt time.Time Prelude []string Log []process.Line } -func (p *parser) storeLogHistory() { +// ReportHistoryEntry represents an historical log report, including the exit status of the +// process and the last progress data. +type ReportHistoryEntry struct { + Report + + ExitState string + Progress Progress +} + +func (p *parser) storeReportHistory(state string) { if p.logHistory == nil { return } - h := p.Report() + report := p.Report() - if len(h.Prelude) != 0 { - p.logHistory.Value = h - p.logHistory = p.logHistory.Next() + if len(report.Prelude) == 0 { + return } + + h := ReportHistoryEntry{ + Report: report, + ExitState: state, + Progress: p.Progress(), + } + + p.logHistory.Value = h + p.logHistory = p.logHistory.Next() } func (p *parser) Report() Report { @@ -734,15 +752,15 @@ func (p *parser) Report() Report { return h } -func (p *parser) ReportHistory() []Report { - var history = []Report{} +func (p *parser) ReportHistory() []ReportHistoryEntry { + var history = []ReportHistoryEntry{} p.logHistory.Do(func(l interface{}) { if l == nil { return } - history = append(history, l.(Report)) + history = append(history, l.(ReportHistoryEntry)) }) return history diff --git a/ffmpeg/parse/parser_test.go b/ffmpeg/parse/parser_test.go index 9a72ed40..eaa273ef 100644 --- a/ffmpeg/parse/parser_test.go +++ b/ffmpeg/parse/parser_test.go @@ -6,7 +6,6 @@ import ( "testing" "time" - "github.com/datarhei/core/v16/restream/app" "github.com/stretchr/testify/require" ) @@ -19,7 +18,7 @@ func TestParserProgress(t *testing.T) { parser.Parse("frame= 5968 fps= 25 q=19.4 size=443kB time=00:03:58.44 bitrate=5632kbits/s speed=0.999x skip=9733 drop=3522 dup=87463") d, _ := time.ParseDuration("3m58s440ms") - wantP := app.Progress{ + wantP := Progress{ Frame: 5968, FPS: 25, Quantizer: 19.4, diff --git a/ffmpeg/parse/types.go b/ffmpeg/parse/types.go index bf031fb0..96b88968 100644 --- a/ffmpeg/parse/types.go +++ b/ffmpeg/parse/types.go @@ -4,8 +4,6 @@ import ( "encoding/json" "errors" "time" - - "github.com/datarhei/core/v16/restream/app" ) // Duration represents a time.Duration @@ -49,8 +47,8 @@ type ffmpegAVstreamIO struct { Size uint64 `json:"size_kb"` } -func (avio *ffmpegAVstreamIO) export() app.AVstreamIO { - return app.AVstreamIO{ +func (avio *ffmpegAVstreamIO) export() AVstreamIO { + return AVstreamIO{ State: avio.State, Packet: avio.Packet, Time: avio.Time, @@ -74,8 +72,8 @@ type ffmpegAVstream struct { GOP string `json:"gop"` } -func (av *ffmpegAVstream) export() *app.AVstream { - return &app.AVstream{ +func (av *ffmpegAVstream) export() *AVstream { + return &AVstream{ Aqueue: av.Aqueue, Queue: av.Queue, Drop: av.Drop, @@ -104,7 +102,7 @@ type ffmpegProgressIO struct { Quantizer float64 `json:"q"` } -func (io *ffmpegProgressIO) exportTo(progress *app.ProgressIO) { +func (io *ffmpegProgressIO) exportTo(progress *ProgressIO) { progress.Index = io.Index progress.Stream = io.Stream progress.Frame = io.Frame @@ -132,7 +130,7 @@ type ffmpegProgress struct { Dup uint64 `json:"dup"` } -func (p *ffmpegProgress) exportTo(progress *app.Progress) { +func (p *ffmpegProgress) exportTo(progress *Progress) { progress.Frame = p.Frame progress.Packet = p.Packet progress.FPS = p.FPS @@ -184,8 +182,8 @@ type ffmpegProcessIO struct { Channels uint64 `json:"channels"` } -func (io *ffmpegProcessIO) export() app.ProgressIO { - return app.ProgressIO{ +func (io *ffmpegProcessIO) export() ProgressIO { + return ProgressIO{ Address: io.Address, Format: io.Format, Index: io.Index, @@ -207,8 +205,8 @@ type ffmpegProcess struct { output []ffmpegProcessIO } -func (p *ffmpegProcess) export() app.Progress { - progress := app.Progress{} +func (p *ffmpegProcess) export() Progress { + progress := Progress{} for _, io := range p.input { aio := io.export() @@ -224,3 +222,71 @@ func (p *ffmpegProcess) export() app.Progress { return progress } + +type ProgressIO struct { + Address string + + // General + Index uint64 + Stream uint64 + Format string + Type string + Codec string + Coder string + Frame uint64 + FPS float64 + Packet uint64 + PPS float64 + Size uint64 // bytes + Bitrate float64 // bit/s + + // Video + Pixfmt string + Quantizer float64 + Width uint64 + Height uint64 + + // Audio + Sampling uint64 + Layout string + Channels uint64 + + // avstream + AVstream *AVstream +} + +type Progress struct { + Input []ProgressIO + Output []ProgressIO + Frame uint64 + Packet uint64 + FPS float64 + PPS float64 + Quantizer float64 + Size uint64 // bytes + Time float64 + Bitrate float64 // bit/s + Speed float64 + Drop uint64 + Dup uint64 +} + +type AVstreamIO struct { + State string + Packet uint64 + Time uint64 + Size uint64 +} + +type AVstream struct { + Input AVstreamIO + Output AVstreamIO + Aqueue uint64 + Queue uint64 + Dup uint64 + Drop uint64 + Enc uint64 + Looping bool + Duplicating bool + GOP string +} diff --git a/ffmpeg/probe/prober.go b/ffmpeg/probe/prober.go index 19cd4c3d..d569bb58 100644 --- a/ffmpeg/probe/prober.go +++ b/ffmpeg/probe/prober.go @@ -8,13 +8,12 @@ import ( "github.com/datarhei/core/v16/ffmpeg/prelude" "github.com/datarhei/core/v16/log" "github.com/datarhei/core/v16/process" - "github.com/datarhei/core/v16/restream/app" ) type Parser interface { process.Parser - Probe() app.Probe + Probe() Probe } type Config struct { @@ -40,8 +39,8 @@ func New(config Config) Parser { return p } -func (p *prober) Probe() app.Probe { - probe := app.Probe{} +func (p *prober) Probe() Probe { + probe := Probe{} for _, io := range p.inputs { probe.Streams = append(probe.Streams, io.export()) @@ -112,6 +111,8 @@ func (p *prober) parseDefault() { } } +func (p *prober) Stop(state string) {} + func (p *prober) Log() []process.Line { return p.data } diff --git a/ffmpeg/probe/types.go b/ffmpeg/probe/types.go index e9bc9d6b..4e2697bd 100644 --- a/ffmpeg/probe/types.go +++ b/ffmpeg/probe/types.go @@ -1,9 +1,5 @@ package probe -import ( - "github.com/datarhei/core/v16/restream/app" -) - type probeIO struct { // common Address string `json:"url"` @@ -29,8 +25,8 @@ type probeIO struct { Channels uint64 `json:"channels"` } -func (io *probeIO) export() app.ProbeIO { - return app.ProbeIO{ +func (io *probeIO) export() ProbeIO { + return ProbeIO{ Address: io.Address, Format: io.Format, Index: io.Index, @@ -50,3 +46,34 @@ func (io *probeIO) export() app.ProbeIO { Channels: io.Channels, } } + +type ProbeIO struct { + Address string + + // General + Index uint64 + Stream uint64 + Language string + Format string + Type string + Codec string + Coder string + Bitrate float64 // kbit/s + Duration float64 + + // Video + Pixfmt string + Width uint64 + Height uint64 + FPS float64 + + // Audio + Sampling uint64 + Layout string + Channels uint64 +} + +type Probe struct { + Streams []ProbeIO + Log []string +} diff --git a/http/api/process.go b/http/api/process.go index e217b455..e564cc0b 100644 --- a/http/api/process.go +++ b/http/api/process.go @@ -186,16 +186,23 @@ func (cfg *ProcessConfig) Unmarshal(c *app.Config) { } } -// ProcessReportHistoryEntry represents the logs of a run of a restream process -type ProcessReportHistoryEntry struct { +// ProcessReportEntry represents the logs of a run of a restream process +type ProcessReportEntry struct { CreatedAt int64 `json:"created_at" format:"int64"` Prelude []string `json:"prelude"` Log [][2]string `json:"log"` } +type ProcessReportHistoryEntry struct { + ProcessReportEntry + + ExitState string `json:"exit_state"` + Progress Progress `json:"progress"` +} + // ProcessReport represents the current log and the logs of previous runs of a restream process type ProcessReport struct { - ProcessReportHistoryEntry + ProcessReportEntry History []ProcessReportHistoryEntry `json:"history"` } @@ -217,14 +224,19 @@ func (report *ProcessReport) Unmarshal(l *app.Log) { for _, h := range l.History { he := ProcessReportHistoryEntry{ - CreatedAt: h.CreatedAt.Unix(), - Prelude: h.Prelude, - Log: make([][2]string, len(h.Log)), + ProcessReportEntry: ProcessReportEntry{ + CreatedAt: h.CreatedAt.Unix(), + Prelude: h.Prelude, + Log: make([][2]string, len(h.Log)), + }, + ExitState: h.ExitState, } + he.Progress.Unmarshal(&h.Progress) + for i, line := range h.Log { - he.Log[i][0] = strconv.FormatInt(line.Timestamp.Unix(), 10) - he.Log[i][1] = line.Data + he.ProcessReportEntry.Log[i][0] = strconv.FormatInt(line.Timestamp.Unix(), 10) + he.ProcessReportEntry.Log[i][1] = line.Data } report.History = append(report.History, he) diff --git a/process/parser.go b/process/parser.go index ef53f4d6..2e7591c4 100644 --- a/process/parser.go +++ b/process/parser.go @@ -12,6 +12,10 @@ type Parser interface { // or previous line, ...) Parse(line string) uint64 + // Stop tells the parser that the process stopped and provides + // its exit state. + Stop(state string) + // Reset resets any collected statistics or temporary data. // This is called before the process starts and after the // process stopped. The stats are meant to be collected @@ -43,10 +47,8 @@ func NewNullParser() Parser { var _ Parser = &nullParser{} -func (p *nullParser) Parse(line string) uint64 { return 1 } - -func (p *nullParser) Log() []Line { return []Line{} } - -func (p *nullParser) ResetStats() {} - -func (p *nullParser) ResetLog() {} +func (p *nullParser) Parse(string) uint64 { return 1 } +func (p *nullParser) Stop(string) {} +func (p *nullParser) ResetStats() {} +func (p *nullParser) ResetLog() {} +func (p *nullParser) Log() []Line { return []Line{} } diff --git a/process/process.go b/process/process.go index 11c3bfe2..3e806b12 100644 --- a/process/process.go +++ b/process/process.go @@ -57,7 +57,7 @@ type Config struct { Parser Parser // A parser for the output of the process OnArgs func(args []string) []string // A callback which is called right before the process will start with the command args OnStart func() // A callback which is called after the process started - OnExit func() // A callback which is called after the process exited + OnExit func(state string) // A callback which is called after the process exited with the exit state OnStateChange func(from, to string) // A callback which is called after a state changed Logger log.Logger } @@ -192,7 +192,7 @@ type process struct { callbacks struct { onArgs func(args []string) []string onStart func() - onExit func() + onExit func(state string) onStateChange func(from, to string) lock sync.Mutex } @@ -602,14 +602,14 @@ func (p *process) stop(wait bool) error { p.callbacks.lock.Lock() if p.callbacks.onExit == nil { - p.callbacks.onExit = func() { + p.callbacks.onExit = func(string) { wg.Done() p.callbacks.onExit = nil } } else { cb := p.callbacks.onExit - p.callbacks.onExit = func() { - cb() + p.callbacks.onExit = func(state string) { + cb(state) wg.Done() p.callbacks.onExit = cb } @@ -770,6 +770,10 @@ func (p *process) waiter() { p.stop(false) } + // The process exited normally, i.e. the return code is zero and no signal + // has been raised + state := stateFinished + if err := p.cmd.Wait(); err != nil { // The process exited abnormally, i.e. the return code is non-zero or a signal // has been raised. @@ -791,34 +795,32 @@ func (p *process) waiter() { // If ffmpeg has been killed with a SIGINT, SIGTERM, etc., then it exited normally, // i.e. closing all stream properly such that all written data is sane. p.logger.Info().Log("Finished") - p.setState(stateFinished) + state = stateFinished } else { // The process exited by itself with a non-zero return code p.logger.Info().Log("Failed") - p.setState(stateFailed) + state = stateFailed } } else if status.Signaled() { // If ffmpeg has been killed the hard way, something went wrong and // it can be assumed that any written data is not sane. p.logger.Info().Log("Killed") - p.setState(stateKilled) + state = stateKilled } else { // The process exited because of something else (e.g. coredump, ...) p.logger.Info().Log("Killed") - p.setState(stateKilled) + state = stateKilled } } else { // Some other error regarding I/O triggered during Wait() p.logger.Info().Log("Killed") p.logger.WithError(err).Debug().Log("Killed") - p.setState(stateKilled) + state = stateKilled } - } else { - // The process exited normally, i.e. the return code is zero and no signal - // has been raised - p.setState(stateFinished) } + p.setState(state) + p.logger.Info().Log("Stopped") p.debuglogger.WithField("log", p.parser.Log()).Debug().Log("Stopped") @@ -840,13 +842,16 @@ func (p *process) waiter() { } p.stale.lock.Unlock() + // Send exit state to the parser + p.parser.Stop(state.String()) + // Reset the parser stats p.parser.ResetStats() // Call the onExit callback p.callbacks.lock.Lock() if p.callbacks.onExit != nil { - go p.callbacks.onExit() + go p.callbacks.onExit(state.String()) } p.callbacks.lock.Unlock() diff --git a/process/process_test.go b/process/process_test.go index 3a9a1fa3..aa6dd429 100644 --- a/process/process_test.go +++ b/process/process_test.go @@ -171,7 +171,7 @@ func TestFFmpegWaitStop(t *testing.T) { Args: []string{}, Reconnect: false, StaleTimeout: 0, - OnExit: func() { + OnExit: func(state string) { time.Sleep(2 * time.Second) }, }) diff --git a/restream/app/log.go b/restream/app/log.go index 0a3e8a2b..43640b6c 100644 --- a/restream/app/log.go +++ b/restream/app/log.go @@ -4,18 +4,25 @@ import ( "time" ) -type LogEntry struct { +type LogLine struct { Timestamp time.Time Data string } -type LogHistoryEntry struct { +type LogEntry struct { CreatedAt time.Time Prelude []string - Log []LogEntry + Log []LogLine +} + +type LogHistoryEntry struct { + LogEntry + + ExitState string + Progress Progress } type Log struct { - LogHistoryEntry + LogEntry History []LogHistoryEntry } diff --git a/restream/restream.go b/restream/restream.go index 299ac5a7..c3716adc 100644 --- a/restream/restream.go +++ b/restream/restream.go @@ -13,6 +13,7 @@ import ( "github.com/datarhei/core/v16/ffmpeg" "github.com/datarhei/core/v16/ffmpeg/parse" + "github.com/datarhei/core/v16/ffmpeg/probe" "github.com/datarhei/core/v16/ffmpeg/skills" "github.com/datarhei/core/v16/glob" "github.com/datarhei/core/v16/io/fs" @@ -1266,7 +1267,7 @@ func (r *restream) GetProcessState(id string) (*app.State, error) { } } - state.Progress = task.parser.Progress() + convertProgressFromParser(&state.Progress, task.parser.Progress()) for i, p := range state.Progress.Input { if int(p.Index) >= len(task.process.Config.Input) { @@ -1293,6 +1294,103 @@ func (r *restream) GetProcessState(id string) (*app.State, error) { return state, nil } +func convertProgressFromParser(progress *app.Progress, pprogress parse.Progress) { + progress.Frame = pprogress.Frame + progress.Packet = pprogress.Packet + progress.FPS = pprogress.FPS + progress.PPS = pprogress.PPS + progress.Quantizer = pprogress.Quantizer + progress.Size = pprogress.Size + progress.Time = pprogress.Time + progress.Bitrate = pprogress.Bitrate + progress.Speed = pprogress.Speed + progress.Drop = pprogress.Drop + progress.Dup = pprogress.Dup + + for _, pinput := range pprogress.Input { + input := app.ProgressIO{ + Address: pinput.Address, + Index: pinput.Index, + Stream: pinput.Stream, + Format: pinput.Format, + Type: pinput.Type, + Codec: pinput.Codec, + Coder: pinput.Coder, + Frame: pinput.Frame, + FPS: pinput.FPS, + Packet: pinput.Packet, + PPS: pinput.PPS, + Size: pinput.Size, + Bitrate: pinput.Bitrate, + Pixfmt: pinput.Pixfmt, + Quantizer: pinput.Quantizer, + Width: pinput.Width, + Height: pinput.Height, + Sampling: pinput.Sampling, + Layout: pinput.Layout, + Channels: pinput.Channels, + AVstream: nil, + } + + if pinput.AVstream != nil { + avstream := &app.AVstream{ + Input: app.AVstreamIO{ + State: pinput.AVstream.Input.State, + Packet: pinput.AVstream.Input.Packet, + Time: pinput.AVstream.Input.Time, + Size: pinput.AVstream.Input.Size, + }, + Output: app.AVstreamIO{ + State: pinput.AVstream.Output.State, + Packet: pinput.AVstream.Output.Packet, + Time: pinput.AVstream.Output.Time, + Size: pinput.AVstream.Output.Size, + }, + Aqueue: pinput.AVstream.Aqueue, + Queue: pinput.AVstream.Queue, + Dup: pinput.AVstream.Dup, + Drop: pinput.AVstream.Drop, + Enc: pinput.AVstream.Enc, + Looping: pinput.AVstream.Looping, + Duplicating: pinput.AVstream.Duplicating, + GOP: pinput.AVstream.GOP, + } + + input.AVstream = avstream + } + + progress.Input = append(progress.Input, input) + } + + for _, poutput := range pprogress.Output { + output := app.ProgressIO{ + Address: poutput.Address, + Index: poutput.Index, + Stream: poutput.Stream, + Format: poutput.Format, + Type: poutput.Type, + Codec: poutput.Codec, + Coder: poutput.Coder, + Frame: poutput.Frame, + FPS: poutput.FPS, + Packet: poutput.Packet, + PPS: poutput.PPS, + Size: poutput.Size, + Bitrate: poutput.Bitrate, + Pixfmt: poutput.Pixfmt, + Quantizer: poutput.Quantizer, + Width: poutput.Width, + Height: poutput.Height, + Sampling: poutput.Sampling, + Layout: poutput.Layout, + Channels: poutput.Channels, + AVstream: nil, + } + + progress.Output = append(progress.Output, output) + } +} + func (r *restream) GetProcessLog(id string) (*app.Log, error) { r.lock.RLock() defer r.lock.RUnlock() @@ -1312,9 +1410,9 @@ func (r *restream) GetProcessLog(id string) (*app.Log, error) { log.CreatedAt = current.CreatedAt log.Prelude = current.Prelude - log.Log = make([]app.LogEntry, len(current.Log)) + log.Log = make([]app.LogLine, len(current.Log)) for i, line := range current.Log { - log.Log[i] = app.LogEntry{ + log.Log[i] = app.LogLine{ Timestamp: line.Timestamp, Data: line.Data, } @@ -1324,13 +1422,18 @@ func (r *restream) GetProcessLog(id string) (*app.Log, error) { for _, h := range history { e := app.LogHistoryEntry{ - CreatedAt: h.CreatedAt, - Prelude: h.Prelude, + LogEntry: app.LogEntry{ + CreatedAt: h.CreatedAt, + Prelude: h.Prelude, + }, + ExitState: h.ExitState, } - e.Log = make([]app.LogEntry, len(h.Log)) + convertProgressFromParser(&e.Progress, h.Progress) + + e.LogEntry.Log = make([]app.LogLine, len(h.Log)) for i, line := range h.Log { - e.Log[i] = app.LogEntry{ + e.LogEntry.Log[i] = app.LogLine{ Timestamp: line.Timestamp, Data: line.Data, } @@ -1388,7 +1491,7 @@ func (r *restream) ProbeWithTimeout(id string, timeout time.Duration) app.Probe Args: command, Parser: prober, Logger: task.logger, - OnExit: func() { + OnExit: func(string) { wg.Done() }, }) @@ -1402,11 +1505,40 @@ func (r *restream) ProbeWithTimeout(id string, timeout time.Duration) app.Probe wg.Wait() - appprobe = prober.Probe() + convertProbeFromProber(&appprobe, prober.Probe()) return appprobe } +func convertProbeFromProber(appprobe *app.Probe, pprobe probe.Probe) { + appprobe.Log = make([]string, len(pprobe.Log)) + copy(appprobe.Log, pprobe.Log) + + for _, s := range pprobe.Streams { + stream := app.ProbeIO{ + Address: s.Address, + Index: s.Index, + Stream: s.Stream, + Language: s.Language, + Format: s.Format, + Type: s.Type, + Codec: s.Codec, + Coder: s.Coder, + Bitrate: s.Bitrate, + Duration: s.Duration, + Pixfmt: s.Pixfmt, + Width: s.Width, + Height: s.Height, + FPS: s.FPS, + Sampling: s.Sampling, + Layout: s.Layout, + Channels: s.Channels, + } + + appprobe.Streams = append(appprobe.Streams, stream) + } +} + func (r *restream) Skills() skills.Skills { return r.ffmpeg.Skills() }