From e6af09b9829b8b559c1530cd9ea0c28a5860a9f4 Mon Sep 17 00:00:00 2001 From: Ingo Oppermann Date: Tue, 1 Oct 2024 16:11:38 +0200 Subject: [PATCH] Add test for fs cleanup --- restream/core_test.go | 95 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 95 insertions(+) diff --git a/restream/core_test.go b/restream/core_test.go index aad54bec..3d9e1a68 100644 --- a/restream/core_test.go +++ b/restream/core_test.go @@ -1,6 +1,7 @@ package restream import ( + "bytes" "fmt" "math/rand" "os" @@ -1614,3 +1615,97 @@ func BenchmarkGetProcessState(b *testing.B) { rs.DeleteProcess(app.NewProcessID("test_"+strconv.Itoa(n), "")) } } + +func TestProcessCleanup(t *testing.T) { + rsi, err := getDummyRestreamer(nil, nil, nil, nil) + require.NoError(t, err) + + rsi.Start() + + rs := rsi.(*restream) + + memfs, ok := rs.fs.list[0].(fs.Filesystem) + require.True(t, ok) + + for i := 0; i < 10; i++ { + memfs.WriteFileReader(fmt.Sprintf("/foobar_%02d.dat", i), bytes.NewReader([]byte("hello")), -1) + } + + files := memfs.List("/", fs.ListOptions{ + Pattern: "/foobar_*", + }) + require.Equal(t, 10, len(files)) + + process := getDummyProcess() + process.ID = "foobar" + output := process.Output[0] + output.Cleanup = append(output.Cleanup, app.ConfigIOCleanup{ + Pattern: "mem:/{processid}_*", + MaxFiles: 5, + MaxFileAge: 0, + PurgeOnDelete: true, + }) + process.Output[0] = output + + err = rsi.AddProcess(process) + require.NoError(t, err) + + require.Eventually(t, func() bool { + files := memfs.List("/", fs.ListOptions{ + Pattern: "/foobar_*", + }) + + return len(files) == 5 + }, 15*time.Second, time.Second) + + rsi.Stop() + + for i := 0; i < 10; i++ { + memfs.WriteFileReader(fmt.Sprintf("/foobar_%02d.dat", i), bytes.NewReader([]byte("hello")), -1) + } + + files = memfs.List("/", fs.ListOptions{ + Pattern: "/foobar_*", + }) + require.Equal(t, 10, len(files)) + + rsi.ReloadProcess(app.ProcessID{ID: process.ID}) + + rsi.Start() + + require.Eventually(t, func() bool { + files := memfs.List("/", fs.ListOptions{ + Pattern: "/foobar_*", + }) + + return len(files) == 5 + }, 15*time.Second, time.Second) + + rsi.Stop() + + for i := 0; i < 10; i++ { + memfs.WriteFileReader(fmt.Sprintf("/foobar_%02d.dat", i), bytes.NewReader([]byte("hello")), -1) + } + + files = memfs.List("/", fs.ListOptions{ + Pattern: "/foobar_*", + }) + require.Equal(t, 10, len(files)) + + process.Reference = "foobar" + rsi.UpdateProcess(app.ProcessID{ID: process.ID}, process) + + rsi.Start() + + require.Eventually(t, func() bool { + files := memfs.List("/", fs.ListOptions{ + Pattern: "/foobar_*", + }) + + return len(files) == 5 + }, 15*time.Second, time.Second) + + rsi.Stop() + + //task, ok := rs.tasks.Load(app.ProcessID{ID: process.ID}) +}