From 6ddd58a1243c3a3898562008f07bb30e6762c40e Mon Sep 17 00:00:00 2001 From: Ingo Oppermann Date: Mon, 24 Apr 2023 11:59:09 +0200 Subject: [PATCH 1/3] Preserve process log history when updating a process --- ffmpeg/parse/parser.go | 21 ++++++++++++++++++++ restream/restream.go | 1 + restream/restream_test.go | 41 +++++++++++++++++++++++++++++++++------ 3 files changed, 57 insertions(+), 6 deletions(-) diff --git a/ffmpeg/parse/parser.go b/ffmpeg/parse/parser.go index edf0ca03..a3c60a2b 100644 --- a/ffmpeg/parse/parser.go +++ b/ffmpeg/parse/parser.go @@ -33,6 +33,9 @@ type Parser interface { // ReportHistory returns an array of previews logs ReportHistory() []Report + + // TransferReportHistory transfers the report history to another parser + TransferReportHistory(Parser) error } // Config is the config for the Parser implementation @@ -767,3 +770,21 @@ func (p *parser) ReportHistory() []Report { return history } + +func (p *parser) TransferReportHistory(dst Parser) error { + pp, ok := dst.(*parser) + if !ok { + return fmt.Errorf("the target parser is not of the required type") + } + + p.logHistory.Do(func(l interface{}) { + if l == nil { + return + } + + pp.logHistory.Value = l + pp.logHistory = pp.logHistory.Next() + }) + + return nil +} diff --git a/restream/restream.go b/restream/restream.go index fcf38999..597a2e0a 100644 --- a/restream/restream.go +++ b/restream/restream.go @@ -867,6 +867,7 @@ func (r *restream) UpdateProcess(id string, config *app.Config) error { return ErrUnknownProcess } + task.parser.TransferReportHistory(t.parser) t.process.Order = task.process.Order if id != t.id { diff --git a/restream/restream_test.go b/restream/restream_test.go index 11b08240..a0d782b0 100644 --- a/restream/restream_test.go +++ b/restream/restream_test.go @@ -21,10 +21,11 @@ func getDummyRestreamer(portrange net.Portranger, validatorIn, validatorOut ffmp } ffmpeg, err := ffmpeg.New(ffmpeg.Config{ - Binary: binary, - Portrange: portrange, - ValidatorInput: validatorIn, - ValidatorOutput: validatorOut, + Binary: binary, + LogHistoryLength: 3, + Portrange: portrange, + ValidatorInput: validatorIn, + ValidatorOutput: validatorOut, }) if err != nil { return nil, err @@ -437,10 +438,10 @@ func TestLog(t *testing.T) { rs.AddProcess(process) _, err = rs.GetProcessLog("foobar") - require.NotEqual(t, nil, err, "shouldn't be able to get log from non-existing process") + require.Error(t, err) log, err := rs.GetProcessLog(process.ID) - require.Equal(t, nil, err, "should be able to get log from existing process") + require.NoError(t, err) require.Equal(t, 0, len(log.Prelude)) require.Equal(t, 0, len(log.Log)) @@ -461,6 +462,34 @@ func TestLog(t *testing.T) { require.NotEqual(t, 0, len(log.Log)) } +func TestLogTransfer(t *testing.T) { + rs, err := getDummyRestreamer(nil, nil, nil, nil) + require.NoError(t, err) + + process := getDummyProcess() + + err = rs.AddProcess(process) + require.NoError(t, err) + + rs.StartProcess(process.ID) + time.Sleep(3 * time.Second) + rs.StopProcess(process.ID) + + rs.StartProcess(process.ID) + rs.StopProcess(process.ID) + + log, _ := rs.GetProcessLog(process.ID) + + require.Equal(t, 1, len(log.History)) + + err = rs.UpdateProcess(process.ID, process) + require.NoError(t, err) + + log, _ = rs.GetProcessLog(process.ID) + + require.Equal(t, 1, len(log.History)) +} + func TestPlayoutNoRange(t *testing.T) { rs, err := getDummyRestreamer(nil, nil, nil, nil) require.NoError(t, err) From 317d6eb4d91ad978e03c973e6a1fe65fd082daf1 Mon Sep 17 00:00:00 2001 From: Ingo Oppermann Date: Mon, 24 Apr 2023 12:05:01 +0200 Subject: [PATCH 2/3] Add updated_at field in process infos --- http/api/process.go | 1 + http/handler/api/restream.go | 1 + restream/app/process.go | 2 ++ restream/restream.go | 5 +++++ restream/restream_test.go | 13 ++++++++++++- 5 files changed, 21 insertions(+), 1 deletion(-) diff --git a/http/api/process.go b/http/api/process.go index e217b455..d52d4393 100644 --- a/http/api/process.go +++ b/http/api/process.go @@ -14,6 +14,7 @@ type Process struct { Type string `json:"type" jsonschema:"enum=ffmpeg"` Reference string `json:"reference"` CreatedAt int64 `json:"created_at" jsonschema:"minimum=0" format:"int64"` + UpdatedAt int64 `json:"updated_at" jsonschema:"minimum=0" format:"int64"` Config *ProcessConfig `json:"config,omitempty"` State *ProcessState `json:"state,omitempty"` Report *ProcessReport `json:"report,omitempty"` diff --git a/http/handler/api/restream.go b/http/handler/api/restream.go index c61f363a..96a4f75c 100644 --- a/http/handler/api/restream.go +++ b/http/handler/api/restream.go @@ -544,6 +544,7 @@ func (h *RestreamHandler) getProcess(id, filterString string) (api.Process, erro Reference: process.Reference, Type: "ffmpeg", CreatedAt: process.CreatedAt, + UpdatedAt: process.UpdatedAt, } if wants["config"] { diff --git a/restream/app/process.go b/restream/app/process.go index 4ec6036a..5e301c39 100644 --- a/restream/app/process.go +++ b/restream/app/process.go @@ -106,6 +106,7 @@ type Process struct { Reference string `json:"reference"` Config *Config `json:"config"` CreatedAt int64 `json:"created_at"` + UpdatedAt int64 `json:"updated_at"` Order string `json:"order"` } @@ -115,6 +116,7 @@ func (process *Process) Clone() *Process { Reference: process.Reference, Config: process.Config.Clone(), CreatedAt: process.CreatedAt, + UpdatedAt: process.UpdatedAt, Order: process.Order, } diff --git a/restream/restream.go b/restream/restream.go index 597a2e0a..bd5e1482 100644 --- a/restream/restream.go +++ b/restream/restream.go @@ -456,6 +456,8 @@ func (r *restream) createTask(config *app.Config) (*task, error) { CreatedAt: time.Now().Unix(), } + process.UpdatedAt = process.CreatedAt + if config.Autostart { process.Order = "start" } @@ -867,6 +869,9 @@ func (r *restream) UpdateProcess(id string, config *app.Config) error { return ErrUnknownProcess } + // This would require a major version jump + //t.process.CreatedAt = task.process.CreatedAt + t.process.UpdatedAt = time.Now().Unix() task.parser.TransferReportHistory(t.parser) t.process.Order = task.process.Order diff --git a/restream/restream_test.go b/restream/restream_test.go index a0d782b0..208e30b0 100644 --- a/restream/restream_test.go +++ b/restream/restream_test.go @@ -216,6 +216,14 @@ func TestUpdateProcess(t *testing.T) { err = rs.AddProcess(process2) require.Equal(t, nil, err) + process, err := rs.GetProcess(process2.ID) + require.NoError(t, err) + + //createdAt := process.CreatedAt + updatedAt := process.UpdatedAt + + time.Sleep(2 * time.Second) + process3 := getDummyProcess() require.NotNil(t, process3) process3.ID = "process2" @@ -230,8 +238,11 @@ func TestUpdateProcess(t *testing.T) { _, err = rs.GetProcess(process1.ID) require.Error(t, err) - _, err = rs.GetProcess(process3.ID) + process, err = rs.GetProcess(process3.ID) require.NoError(t, err) + + //require.Equal(t, createdAt, process.CreatedAt) + require.NotEqual(t, updatedAt, process.UpdatedAt) } func TestGetProcess(t *testing.T) { From b3696f492db7c8d96aa8f312e47046d9a79037ea Mon Sep 17 00:00:00 2001 From: Ingo Oppermann Date: Mon, 24 Apr 2023 12:10:40 +0200 Subject: [PATCH 3/3] Update changelog --- CHANGELOG.md | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index bdd254d2..5b82f401 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,10 +2,17 @@ ### Core v16.12.0 > v16.?.? +- Add updated_at field in process infos +- Add preserve process log history when updating a process +- Add support for input framerate data from jsonstats patch +- Add number of keyframes and extradata size to process progress data - Fix better naming for storage endpoint documentation - Fix freeing up S3 mounts - Fix URL validation if the path contains FFmpeg specific placeholders - Fix purging default file from HTTP cache +- Fix parsing S3 storage definition from environment variable +- Fix checking length of CPU time array ([#10](https://github.com/datarhei/core/issues/10)) +- Deprecate ENV names that do not correspond to JSON name ### Core v16.11.0 > v16.12.0