From 505fbff03f924be8ec73f00c75847438ab7eaa48 Mon Sep 17 00:00:00 2001 From: Ingo Oppermann Date: Mon, 23 Jan 2023 11:42:17 +0100 Subject: [PATCH] Add tests --- restream/replace/replace.go | 4 +- restream/restream.go | 11 +- restream/restream_test.go | 323 +++++++++++++++++++++++++++++++++--- 3 files changed, 305 insertions(+), 33 deletions(-) diff --git a/restream/replace/replace.go b/restream/replace/replace.go index 83202ff1..e9b45adc 100644 --- a/restream/replace/replace.go +++ b/restream/replace/replace.go @@ -69,7 +69,7 @@ func (r *replacer) RegisterTemplateFunc(placeholder string, templateFn TemplateF } } -func (r *replacer) Replace(str, placeholder, value string, vars map[string]string, config *app.Config, kind string) string { +func (r *replacer) Replace(str, placeholder, value string, vars map[string]string, config *app.Config, section string) string { str = r.re.ReplaceAllStringFunc(str, func(match string) string { matches := r.re.FindStringSubmatch(match) @@ -93,7 +93,7 @@ func (r *replacer) Replace(str, placeholder, value string, vars map[string]strin } } - v = tmpl.fn(config, kind) + v = tmpl.fn(config, section) v = r.compileTemplate(v, matches[3], vars, tmpl.defaults) if len(matches[2]) != 0 { diff --git a/restream/restream.go b/restream/restream.go index 66f71b1b..9f7c6ee1 100644 --- a/restream/restream.go +++ b/restream/restream.go @@ -1456,11 +1456,12 @@ func resolvePlaceholders(config *app.Config, r replace.Replacer) { // Resolving the given inputs for i, input := range config.Input { - vars["inputid"] = input.ID - // Replace any known placeholders input.ID = r.Replace(input.ID, "processid", config.ID, nil, nil, "input") input.ID = r.Replace(input.ID, "reference", config.Reference, nil, nil, "input") + + vars["inputid"] = input.ID + input.Address = r.Replace(input.Address, "inputid", input.ID, nil, nil, "input") input.Address = r.Replace(input.Address, "processid", config.ID, nil, nil, "input") input.Address = r.Replace(input.Address, "reference", config.Reference, nil, nil, "input") @@ -1489,10 +1490,12 @@ func resolvePlaceholders(config *app.Config, r replace.Replacer) { // Resolving the given outputs for i, output := range config.Output { - vars["outputid"] = output.ID - // Replace any known placeholders output.ID = r.Replace(output.ID, "processid", config.ID, nil, nil, "output") + output.ID = r.Replace(output.ID, "reference", config.Reference, nil, nil, "output") + + vars["outputid"] = output.ID + output.Address = r.Replace(output.Address, "outputid", output.ID, nil, nil, "output") output.Address = r.Replace(output.Address, "processid", config.ID, nil, nil, "output") output.Address = r.Replace(output.Address, "reference", config.Reference, nil, nil, "output") diff --git a/restream/restream_test.go b/restream/restream_test.go index 18c53bf5..e4b0510d 100644 --- a/restream/restream_test.go +++ b/restream/restream_test.go @@ -9,11 +9,12 @@ import ( "github.com/datarhei/core/v16/internal/testhelper" "github.com/datarhei/core/v16/net" "github.com/datarhei/core/v16/restream/app" + "github.com/datarhei/core/v16/restream/replace" "github.com/stretchr/testify/require" ) -func getDummyRestreamer(portrange net.Portranger, validatorIn, validatorOut ffmpeg.Validator) (Restreamer, error) { +func getDummyRestreamer(portrange net.Portranger, validatorIn, validatorOut ffmpeg.Validator, replacer replace.Replacer) (Restreamer, error) { binary, err := testhelper.BuildBinary("ffmpeg", "../internal/testhelper") if err != nil { return nil, fmt.Errorf("failed to build helper program: %w", err) @@ -30,7 +31,8 @@ func getDummyRestreamer(portrange net.Portranger, validatorIn, validatorOut ffmp } rs, err := New(Config{ - FFmpeg: ffmpeg, + FFmpeg: ffmpeg, + Replace: replacer, }) if err != nil { return nil, err @@ -77,7 +79,7 @@ func getDummyProcess() *app.Config { } func TestAddProcess(t *testing.T) { - rs, err := getDummyRestreamer(nil, nil, nil) + rs, err := getDummyRestreamer(nil, nil, nil, nil) require.NoError(t, err) process := getDummyProcess() @@ -97,7 +99,7 @@ func TestAddProcess(t *testing.T) { } func TestAutostartProcess(t *testing.T) { - rs, err := getDummyRestreamer(nil, nil, nil) + rs, err := getDummyRestreamer(nil, nil, nil, nil) require.NoError(t, err) process := getDummyProcess() @@ -112,7 +114,7 @@ func TestAutostartProcess(t *testing.T) { } func TestAddInvalidProcess(t *testing.T) { - rs, err := getDummyRestreamer(nil, nil, nil) + rs, err := getDummyRestreamer(nil, nil, nil, nil) require.NoError(t, err) // Invalid process ID @@ -180,7 +182,7 @@ func TestAddInvalidProcess(t *testing.T) { } func TestRemoveProcess(t *testing.T) { - rs, err := getDummyRestreamer(nil, nil, nil) + rs, err := getDummyRestreamer(nil, nil, nil, nil) require.NoError(t, err) process := getDummyProcess() @@ -195,24 +197,98 @@ func TestRemoveProcess(t *testing.T) { require.NotEqual(t, nil, err, "Unset process found (%s)", process.ID) } -func TestGetProcess(t *testing.T) { - rs, err := getDummyRestreamer(nil, nil, nil) +func TestUpdateProcess(t *testing.T) { + rs, err := getDummyRestreamer(nil, nil, nil, nil) require.NoError(t, err) - process := getDummyProcess() + process1 := getDummyProcess() + require.NotNil(t, process1) + process1.ID = "process1" - rs.AddProcess(process) + process2 := getDummyProcess() + require.NotNil(t, process2) + process2.ID = "process2" - _, err = rs.GetProcess(process.ID) - require.Equal(t, nil, err, "Process not found (%s)", process.ID) + err = rs.AddProcess(process1) + require.Equal(t, nil, err) + + err = rs.AddProcess(process2) + require.Equal(t, nil, err) + + process3 := getDummyProcess() + require.NotNil(t, process3) + process3.ID = "process2" + + err = rs.UpdateProcess("process1", process3) + require.Error(t, err) + + process3.ID = "process3" + err = rs.UpdateProcess("process1", process3) + require.NoError(t, err) + + _, err = rs.GetProcess(process1.ID) + require.Error(t, err) + + _, err = rs.GetProcess(process3.ID) + require.NoError(t, err) +} + +func TestGetProcess(t *testing.T) { + rs, err := getDummyRestreamer(nil, nil, nil, nil) + require.NoError(t, err) + + process1 := getDummyProcess() + process1.ID = "foo_aaa_1" + process1.Reference = "foo_aaa_1" + process2 := getDummyProcess() + process2.ID = "bar_bbb_2" + process2.Reference = "bar_bbb_2" + process3 := getDummyProcess() + process3.ID = "foo_ccc_3" + process3.Reference = "foo_ccc_3" + process4 := getDummyProcess() + process4.ID = "bar_ddd_4" + process4.Reference = "bar_ddd_4" + + rs.AddProcess(process1) + rs.AddProcess(process2) + rs.AddProcess(process3) + rs.AddProcess(process4) + + _, err = rs.GetProcess(process1.ID) + require.Equal(t, nil, err) list := rs.GetProcessIDs("", "") - require.Len(t, list, 1, "expected 1 process") - require.Equal(t, process.ID, list[0], "expected same process ID") + require.Len(t, list, 4) + require.ElementsMatch(t, []string{"foo_aaa_1", "bar_bbb_2", "foo_ccc_3", "bar_ddd_4"}, list) + + list = rs.GetProcessIDs("foo_*", "") + require.Len(t, list, 2) + require.ElementsMatch(t, []string{"foo_aaa_1", "foo_ccc_3"}, list) + + list = rs.GetProcessIDs("bar_*", "") + require.Len(t, list, 2) + require.ElementsMatch(t, []string{"bar_bbb_2", "bar_ddd_4"}, list) + + list = rs.GetProcessIDs("*_bbb_*", "") + require.Len(t, list, 1) + require.ElementsMatch(t, []string{"bar_bbb_2"}, list) + + list = rs.GetProcessIDs("", "foo_*") + require.Len(t, list, 2) + require.ElementsMatch(t, []string{"foo_aaa_1", "foo_ccc_3"}, list) + + list = rs.GetProcessIDs("", "bar_*") + require.Len(t, list, 2) + require.ElementsMatch(t, []string{"bar_bbb_2", "bar_ddd_4"}, list) + + list = rs.GetProcessIDs("", "*_bbb_*") + require.Len(t, list, 1) + require.ElementsMatch(t, []string{"bar_bbb_2"}, list) } func TestStartProcess(t *testing.T) { - rs, err := getDummyRestreamer(nil, nil, nil) + rs, err := getDummyRestreamer(nil, nil, nil, nil) require.NoError(t, err) process := getDummyProcess() @@ -238,7 +314,7 @@ func TestStartProcess(t *testing.T) { } func TestStopProcess(t *testing.T) { - rs, err := getDummyRestreamer(nil, nil, nil) + rs, err := getDummyRestreamer(nil, nil, nil, nil) require.NoError(t, err) process := getDummyProcess() @@ -263,7 +339,7 @@ func TestStopProcess(t *testing.T) { } func TestRestartProcess(t *testing.T) { - rs, err := getDummyRestreamer(nil, nil, nil) + rs, err := getDummyRestreamer(nil, nil, nil, nil) require.NoError(t, err) process := getDummyProcess() @@ -288,7 +364,7 @@ func TestRestartProcess(t *testing.T) { } func TestReloadProcess(t *testing.T) { - rs, err := getDummyRestreamer(nil, nil, nil) + rs, err := getDummyRestreamer(nil, nil, nil, nil) require.NoError(t, err) process := getDummyProcess() @@ -318,8 +394,8 @@ func TestReloadProcess(t *testing.T) { rs.StopProcess(process.ID) } -func TestProcessData(t *testing.T) { - rs, err := getDummyRestreamer(nil, nil, nil) +func TestProcessMetadata(t *testing.T) { + rs, err := getDummyRestreamer(nil, nil, nil, nil) require.NoError(t, err) process := getDummyProcess() @@ -340,7 +416,7 @@ func TestProcessData(t *testing.T) { } func TestLog(t *testing.T) { - rs, err := getDummyRestreamer(nil, nil, nil) + rs, err := getDummyRestreamer(nil, nil, nil, nil) require.NoError(t, err) process := getDummyProcess() @@ -373,7 +449,7 @@ func TestLog(t *testing.T) { } func TestPlayoutNoRange(t *testing.T) { - rs, err := getDummyRestreamer(nil, nil, nil) + rs, err := getDummyRestreamer(nil, nil, nil, nil) require.NoError(t, err) process := getDummyProcess() @@ -396,7 +472,7 @@ func TestPlayoutRange(t *testing.T) { portrange, err := net.NewPortrange(3000, 3001) require.NoError(t, err) - rs, err := getDummyRestreamer(portrange, nil, nil) + rs, err := getDummyRestreamer(portrange, nil, nil, nil) require.NoError(t, err) process := getDummyProcess() @@ -417,7 +493,7 @@ func TestPlayoutRange(t *testing.T) { } func TestAddressReference(t *testing.T) { - rs, err := getDummyRestreamer(nil, nil, nil) + rs, err := getDummyRestreamer(nil, nil, nil, nil) require.NoError(t, err) process1 := getDummyProcess() @@ -449,7 +525,7 @@ func TestAddressReference(t *testing.T) { } func TestConfigValidation(t *testing.T) { - rsi, err := getDummyRestreamer(nil, nil, nil) + rsi, err := getDummyRestreamer(nil, nil, nil, nil) require.NoError(t, err) rs := rsi.(*restream) @@ -496,7 +572,7 @@ func TestConfigValidationFFmpeg(t *testing.T) { valOut, err := ffmpeg.NewValidator([]string{"^https?://", "^rtmp://"}, nil) require.NoError(t, err) - rsi, err := getDummyRestreamer(nil, valIn, valOut) + rsi, err := getDummyRestreamer(nil, valIn, valOut, nil) require.NoError(t, err) rs := rsi.(*restream) @@ -522,7 +598,7 @@ func TestConfigValidationFFmpeg(t *testing.T) { } func TestOutputAddressValidation(t *testing.T) { - rsi, err := getDummyRestreamer(nil, nil, nil) + rsi, err := getDummyRestreamer(nil, nil, nil, nil) require.NoError(t, err) rs := rsi.(*restream) @@ -561,3 +637,196 @@ func TestOutputAddressValidation(t *testing.T) { require.Equal(t, r.path, path) } } + +func TestMetadata(t *testing.T) { + rs, err := getDummyRestreamer(nil, nil, nil, nil) + require.NoError(t, err) + + process := getDummyProcess() + + data, _ := rs.GetMetadata("foobar") + require.Equal(t, nil, data, "nothing should be stored under the key") + + rs.SetMetadata("foobar", process) + + data, _ = rs.GetMetadata("foobar") + require.NotEqual(t, nil, data, "there should be something stored under the key") + + p := data.(*app.Config) + + require.Equal(t, process.ID, p.ID, "failed to retrieve stored data") +} + +func TestReplacer(t *testing.T) { + replacer := replace.New() + + replacer.RegisterTemplateFunc("diskfs", func(config *app.Config, section string) string { + return "/mnt/diskfs" + }, nil) + + replacer.RegisterTemplateFunc("fs:disk", func(config *app.Config, section string) string { + return "/mnt/diskfs" + }, nil) + + replacer.RegisterTemplateFunc("memfs", func(config *app.Config, section string) string { + return "http://localhost/mnt/memfs" + }, nil) + + replacer.RegisterTemplateFunc("fs:mem", func(config *app.Config, section string) string { + return "http://localhost/mnt/memfs" + }, nil) + + replacer.RegisterTemplateFunc("rtmp", func(config *app.Config, section string) string { + return "rtmp://localhost/app/{name}?token=foobar" + }, nil) + + replacer.RegisterTemplateFunc("srt", func(config *app.Config, section string) string { + template := "srt://localhost:6000?mode=caller&transtype=live&latency={latency}&streamid={name}" + if section == "output" { + template += ",mode:publish" + } else { + template += ",mode:request" + } + template += ",token:abcfoobar&passphrase=secret" + + return template + }, map[string]string{ + "latency": "20000", // 20 milliseconds, FFmpeg requires microseconds + }) + + rsi, err := getDummyRestreamer(nil, nil, nil, replacer) + require.NoError(t, err) + + process := &app.Config{ + ID: "314159265359", + Reference: "refref", + Input: []app.ConfigIO{ + { + ID: "in_{processid}_{reference}", + Address: "input:{inputid}_process:{processid}_reference:{reference}_diskfs:{diskfs}/disk.txt_memfs:{memfs}/mem.txt_fsdisk:{fs:disk}/fsdisk.txt_fsmem:{fs:mem}/fsmem.txt_rtmp:{rtmp,name=pmtr}_srt:{srt,name=trs}_rtmp:{rtmp,name=$inputid}", + Options: []string{ + "-f", + "lavfi", + "-re", + "input:{inputid}", + "process:{processid}", + "reference:{reference}", + "diskfs:{diskfs}/disk.txt", + "memfs:{memfs}/mem.txt", + "fsdisk:{fs:disk}/fsdisk.txt", + "fsmem:{fs:mem}/$inputid.txt", + }, + }, + }, + Output: []app.ConfigIO{ + { + ID: "out_{processid}_{reference}", + Address: "output:{outputid}_process:{processid}_reference:{reference}_diskfs:{diskfs}/disk.txt_memfs:{memfs}/mem.txt_fsdisk:{fs:disk}/fsdisk.txt_fsmem:{fs:mem}/fsmem.txt_rtmp:{rtmp,name=$processid}_srt:{srt,name=$reference,latency=42}_rtmp:{rtmp,name=$outputid}", + Options: []string{ + "-codec", + "copy", + "-f", + "null", + "output:{outputid}", + "process:{processid}", + "reference:{reference}", + "diskfs:{diskfs}/disk.txt", + "memfs:{memfs}/mem.txt", + "fsdisk:{fs:disk}/fsdisk.txt", + "fsmem:{fs:mem}/$outputid.txt", + }, + Cleanup: []app.ConfigIOCleanup{ + { + Pattern: "pattern_{outputid}_{processid}_{reference}_{rtmp,name=$outputid}", + MaxFiles: 0, + MaxFileAge: 0, + PurgeOnDelete: false, + }, + }, + }, + }, + Options: []string{ + "-loglevel", + "info", + "{diskfs}/foobar_on_disk.txt", + "{memfs}/foobar_in_mem.txt", + "{fs:disk}/foobar_on_disk_aswell.txt", + "{fs:mem}/foobar_in_mem_aswell.txt", + }, + Reconnect: true, + ReconnectDelay: 10, + Autostart: false, + StaleTimeout: 0, + } + + err = rsi.AddProcess(process) + require.NoError(t, err) + + rs := rsi.(*restream) + + process = &app.Config{ + ID: "314159265359", + Reference: "refref", + FFVersion: "^4.0.2", + Input: []app.ConfigIO{ + { + ID: "in_314159265359_refref", + Address: "input:in_314159265359_refref_process:314159265359_reference:refref_diskfs:/mnt/diskfs/disk.txt_memfs:http://localhost/mnt/memfs/mem.txt_fsdisk:/mnt/diskfs/fsdisk.txt_fsmem:http://localhost/mnt/memfs/fsmem.txt_rtmp:rtmp://localhost/app/pmtr?token=foobar_srt:srt://localhost:6000?mode=caller&transtype=live&latency=20000&streamid=trs,mode:request,token:abcfoobar&passphrase=secret_rtmp:rtmp://localhost/app/in_314159265359_refref?token=foobar", + Options: []string{ + "-f", + "lavfi", + "-re", + "input:in_314159265359_refref", + "process:314159265359", + "reference:refref", + "diskfs:/mnt/diskfs/disk.txt", + "memfs:http://localhost/mnt/memfs/mem.txt", + "fsdisk:/mnt/diskfs/fsdisk.txt", + "fsmem:http://localhost/mnt/memfs/$inputid.txt", + }, + Cleanup: []app.ConfigIOCleanup{}, + }, + }, + Output: []app.ConfigIO{ + { + ID: "out_314159265359_refref", + Address: "output:out_314159265359_refref_process:314159265359_reference:refref_diskfs:/mnt/diskfs/disk.txt_memfs:http://localhost/mnt/memfs/mem.txt_fsdisk:/mnt/diskfs/fsdisk.txt_fsmem:http://localhost/mnt/memfs/fsmem.txt_rtmp:rtmp://localhost/app/314159265359?token=foobar_srt:srt://localhost:6000?mode=caller&transtype=live&latency=42&streamid=refref,mode:publish,token:abcfoobar&passphrase=secret_rtmp:rtmp://localhost/app/out_314159265359_refref?token=foobar", + Options: []string{ + "-codec", + "copy", + "-f", + "null", + "output:out_314159265359_refref", + "process:314159265359", + "reference:refref", + "diskfs:/mnt/diskfs/disk.txt", + "memfs:http://localhost/mnt/memfs/mem.txt", + "fsdisk:/mnt/diskfs/fsdisk.txt", + "fsmem:http://localhost/mnt/memfs/$outputid.txt", + }, + Cleanup: []app.ConfigIOCleanup{ + { + Pattern: "pattern_out_314159265359_refref_314159265359_refref_{rtmp,name=$outputid}", + MaxFiles: 0, + MaxFileAge: 0, + PurgeOnDelete: false, + }, + }, + }, + }, + Options: []string{ + "-loglevel", + "info", + "/mnt/diskfs/foobar_on_disk.txt", + "{memfs}/foobar_in_mem.txt", + "/mnt/diskfs/foobar_on_disk_aswell.txt", + "http://localhost/mnt/memfs/foobar_in_mem_aswell.txt", + }, + Reconnect: true, + ReconnectDelay: 10, + Autostart: false, + StaleTimeout: 0, + } + + require.Equal(t, process, rs.tasks["314159265359"].config) +}