From 981fcd4dd38daa99dc4783c8ed06177959a4edf1 Mon Sep 17 00:00:00 2001 From: Ingo Oppermann Date: Mon, 21 Jul 2025 16:18:56 +0200 Subject: [PATCH] Add option to prevent purging on delete --- cluster/leader.go | 4 ++-- cluster/node/core.go | 4 ++-- cluster/node/manager.go | 4 ++-- http/client/client.go | 2 +- http/client/process.go | 4 +++- http/handler/api/process.go | 9 ++++++++- restream/core.go | 20 ++++++++++---------- restream/core_test.go | 4 ++-- restream/fs/fs.go | 8 +++++--- restream/fs/fs_test.go | 26 +++++++++++++------------- 10 files changed, 48 insertions(+), 37 deletions(-) diff --git a/cluster/leader.go b/cluster/leader.go index 7900c289..dc063dce 100644 --- a/cluster/leader.go +++ b/cluster/leader.go @@ -618,7 +618,7 @@ func (c *cluster) applyOp(op interface{}, logger log.Logger) processOpError { "nodeid": v.nodeid, }).Log("Updating process") case processOpDelete: - err := c.manager.ProcessDelete(v.nodeid, v.processid) + err := c.manager.ProcessDelete(v.nodeid, v.processid, true) if err != nil { opErr = processOpError{ processid: v.processid, @@ -701,7 +701,7 @@ func (c *cluster) applyOp(op interface{}, logger log.Logger) processOpError { } } - err = c.manager.ProcessDelete(v.fromNodeid, v.config.ProcessID()) + err = c.manager.ProcessDelete(v.fromNodeid, v.config.ProcessID(), false) if err != nil { //opErr = processOpError{ // processid: v.config.ProcessID(), diff --git a/cluster/node/core.go b/cluster/node/core.go index 95ac8115..c10ea752 100644 --- a/cluster/node/core.go +++ b/cluster/node/core.go @@ -337,7 +337,7 @@ func (n *Core) ProcessCommand(id app.ProcessID, command string) error { return client.ProcessCommand(id, command) } -func (n *Core) ProcessDelete(id app.ProcessID) error { +func (n *Core) ProcessDelete(id app.ProcessID, purge bool) error { n.lock.RLock() client := n.client n.lock.RUnlock() @@ -346,7 +346,7 @@ func (n *Core) ProcessDelete(id app.ProcessID) error { return ErrNoPeer } - return client.ProcessDelete(id) + return client.ProcessDelete(id, purge) } func (n *Core) ProcessUpdate(id app.ProcessID, config *app.Config, metadata map[string]any, force bool) error { diff --git a/cluster/node/manager.go b/cluster/node/manager.go index 74b74cf9..b211baa0 100644 --- a/cluster/node/manager.go +++ b/cluster/node/manager.go @@ -572,13 +572,13 @@ func (p *Manager) ProcessAdd(nodeid string, config *app.Config, metadata map[str return node.Core().ProcessAdd(config, metadata) } -func (p *Manager) ProcessDelete(nodeid string, id app.ProcessID) error { +func (p *Manager) ProcessDelete(nodeid string, id app.ProcessID, purge bool) error { node, err := p.NodeGet(nodeid) if err != nil { return err } - return node.Core().ProcessDelete(id) + return node.Core().ProcessDelete(id, purge) } func (p *Manager) ProcessUpdate(nodeid string, id app.ProcessID, config *app.Config, metadata map[string]any, force bool) error { diff --git a/http/client/client.go b/http/client/client.go index 5e59836c..64d990c7 100644 --- a/http/client/client.go +++ b/http/client/client.go @@ -63,7 +63,7 @@ type RestClient interface { ProcessAdd(p *app.Config, metadata map[string]any) error // POST /v3/process Process(id app.ProcessID, filter []string) (api.Process, error) // GET /v3/process/{id} ProcessUpdate(id app.ProcessID, p *app.Config, metadata map[string]any, force bool) error // PUT /v3/process/{id} - ProcessDelete(id app.ProcessID) error // DELETE /v3/process/{id} + ProcessDelete(id app.ProcessID, purge bool) error // DELETE /v3/process/{id} ProcessCommand(id app.ProcessID, command string) error // PUT /v3/process/{id}/command ProcessProbe(id app.ProcessID) (api.Probe, error) // GET /v3/process/{id}/probe ProcessProbeConfig(config *app.Config) (api.Probe, error) // POST /v3/process/probe diff --git a/http/client/process.go b/http/client/process.go index 647819f1..478e41cf 100644 --- a/http/client/process.go +++ b/http/client/process.go @@ -2,6 +2,7 @@ package client import ( "net/url" + "strconv" "strings" "github.com/datarhei/core/v16/encoding/json" @@ -127,9 +128,10 @@ func (r *restclient) ProcessReportSet(id app.ProcessID, report *app.Report) erro return nil } -func (r *restclient) ProcessDelete(id app.ProcessID) error { +func (r *restclient) ProcessDelete(id app.ProcessID, purge bool) error { query := &url.Values{} query.Set("domain", id.Domain) + query.Set("purge", strconv.FormatBool(purge)) _, err := r.call("DELETE", "/v3/process/"+url.PathEscape(id.ID), query, nil, "", nil) diff --git a/http/handler/api/process.go b/http/handler/api/process.go index e63520f8..4ee006a9 100644 --- a/http/handler/api/process.go +++ b/http/handler/api/process.go @@ -233,6 +233,7 @@ func (h *ProcessHandler) Get(c echo.Context) error { // @Produce json // @Param id path string true "Process ID" // @Param domain query string false "Process domain" +// @Param purge query string false "Whether to purge files" // @Success 200 {string} string // @Failure 400 {object} api.Error // @Failure 403 {object} api.Error @@ -244,6 +245,12 @@ func (h *ProcessHandler) Delete(c echo.Context) error { ctxuser := util.DefaultContext(c, "user", "") id := util.PathParam(c, "id") domain := util.DefaultQuery(c, "domain", "") + purgeq := util.DefaultQuery(c, "purge", "true") + + purge := false + if x, err := strconv.ParseBool(purgeq); err == nil { + purge = x + } tid := app.ProcessID{ ID: id, @@ -258,7 +265,7 @@ func (h *ProcessHandler) Delete(c echo.Context) error { return h.apiErrorFromError(err) } - if err := h.restream.DeleteProcess(tid); err != nil { + if err := h.restream.DeleteProcess(tid, purge); err != nil { return h.apiErrorFromError(err) } diff --git a/restream/core.go b/restream/core.go index d8895c2e..3943e6ba 100644 --- a/restream/core.go +++ b/restream/core.go @@ -42,7 +42,7 @@ type Restreamer interface { AddProcess(config *app.Config) error // Add a new process GetProcessIDs(idpattern, refpattern, ownerpattern, domainpattern string) []app.ProcessID // Get a list of process IDs based on patterns for ID and reference - DeleteProcess(id app.ProcessID) error // Delete a process + DeleteProcess(id app.ProcessID, purge bool) error // Delete a process UpdateProcess(id app.ProcessID, config *app.Config, force bool) error // Update a process StartProcess(id app.ProcessID) error // Start a process StopProcess(id app.ProcessID) error // Stop a process @@ -227,7 +227,7 @@ func (r *restream) Stop() { wg.Wait() r.tasks.Range(func(id app.ProcessID, t *task) bool { - r.unsetCleanup(id) + r.unsetCleanup(id, false) return true }) @@ -743,16 +743,16 @@ func (r *restream) setCleanup(id app.ProcessID, config *app.Config) { continue } - fs.UpdateCleanup(id.String(), p) + fs.UpdateCleanup(id.String(), p, true) break } } } -func (r *restream) unsetCleanup(id app.ProcessID) { +func (r *restream) unsetCleanup(id app.ProcessID, purge bool) { for _, fs := range r.fs.list { - fs.UpdateCleanup(id.String(), nil) + fs.UpdateCleanup(id.String(), nil, purge) } } @@ -1232,7 +1232,7 @@ func (r *restream) updateProcess(task *task, config *app.Config, force bool) err t.Restore() if !tid.Equal(task.ID()) { - r.unsetCleanup(task.ID()) + r.unsetCleanup(task.ID(), true) r.tasks.LoadAndDelete(task.ID()) } @@ -1299,14 +1299,14 @@ func (r *restream) GetProcess(id app.ProcessID) (*app.Process, error) { return task.Process(), nil } -func (r *restream) DeleteProcess(id app.ProcessID) error { +func (r *restream) DeleteProcess(id app.ProcessID, purge bool) error { task, ok := r.tasks.LoadAndLock(id) if !ok { return ErrUnknownProcess } defer r.tasks.Unlock(id) - err := r.deleteProcess(task) + err := r.deleteProcess(task, purge) if err != nil { return err @@ -1317,13 +1317,13 @@ func (r *restream) DeleteProcess(id app.ProcessID) error { return nil } -func (r *restream) deleteProcess(task *task) error { +func (r *restream) deleteProcess(task *task, purge bool) error { if task.Order() != "stop" { return fmt.Errorf("the process with the ID '%s' is still running", task.String()) } r.unsetPlayoutPorts(task) - r.unsetCleanup(task.ID()) + r.unsetCleanup(task.ID(), purge) r.tasks.LoadAndDelete(task.ID()) diff --git a/restream/core_test.go b/restream/core_test.go index 5f75b6c6..513e5afe 100644 --- a/restream/core_test.go +++ b/restream/core_test.go @@ -258,7 +258,7 @@ func TestRemoveProcess(t *testing.T) { err = rs.AddProcess(process) require.Equal(t, nil, err, "Failed to add process (%s)", err) - err = rs.DeleteProcess(tid) + err = rs.DeleteProcess(tid, true) require.Equal(t, nil, err, "Set process not found (%s)", process.ID) _, err = rs.GetProcess(tid) @@ -1777,7 +1777,7 @@ func BenchmarkGetProcessState(b *testing.B) { } for i := 0; i < n; i++ { - rs.DeleteProcess(app.NewProcessID("test_"+strconv.Itoa(n), "")) + rs.DeleteProcess(app.NewProcessID("test_"+strconv.Itoa(n), ""), true) } } diff --git a/restream/fs/fs.go b/restream/fs/fs.go index 984fdd5b..8bb4bd02 100644 --- a/restream/fs/fs.go +++ b/restream/fs/fs.go @@ -40,7 +40,7 @@ type Filesystem interface { fs.Filesystem // UpdateCleanup - UpdateCleanup(id string, patterns []Pattern) + UpdateCleanup(id string, patterns []Pattern, purge bool) // Start Start() @@ -128,7 +128,7 @@ func (rfs *filesystem) compilePatterns(patterns []Pattern) []Pattern { return patterns } -func (rfs *filesystem) UpdateCleanup(id string, newPatterns []Pattern) { +func (rfs *filesystem) UpdateCleanup(id string, newPatterns []Pattern, purge bool) { newPatterns = rfs.compilePatterns(newPatterns) rfs.cleanupLock.Lock() @@ -175,7 +175,9 @@ func (rfs *filesystem) UpdateCleanup(id string, newPatterns []Pattern) { }).Log("Remove pattern") } - rfs.purge(onlyCurrent) + if purge { + rfs.purge(onlyCurrent) + } } func (rfs *filesystem) cleanup() { diff --git a/restream/fs/fs_test.go b/restream/fs/fs_test.go index 7f6c67d9..55e9938e 100644 --- a/restream/fs/fs_test.go +++ b/restream/fs/fs_test.go @@ -34,7 +34,7 @@ func TestUpdateCleanup(t *testing.T) { }, } - cleanfs.UpdateCleanup("foobar", patterns) + cleanfs.UpdateCleanup("foobar", patterns, false) require.Equal(t, cleanfs.cleanupPatterns["foobar"], patterns) @@ -44,21 +44,21 @@ func TestUpdateCleanup(t *testing.T) { MaxFileAge: 0, }) - cleanfs.UpdateCleanup("foobar", patterns) + cleanfs.UpdateCleanup("foobar", patterns, false) require.Equal(t, cleanfs.cleanupPatterns["foobar"], patterns) patterns[0].MaxFiles = 42 - cleanfs.UpdateCleanup("foobar", patterns) + cleanfs.UpdateCleanup("foobar", patterns, false) require.Equal(t, cleanfs.cleanupPatterns["foobar"], patterns) - cleanfs.UpdateCleanup("foobar", patterns[1:]) + cleanfs.UpdateCleanup("foobar", patterns[1:], false) require.Equal(t, cleanfs.cleanupPatterns["foobar"], patterns[1:]) - cleanfs.UpdateCleanup("foobar", nil) + cleanfs.UpdateCleanup("foobar", nil, false) require.Empty(t, cleanfs.cleanupPatterns["foobar"]) } @@ -81,7 +81,7 @@ func TestMaxFiles(t *testing.T) { MaxFiles: 3, MaxFileAge: 0, }, - }) + }, false) cleanfs.WriteFileReader("/chunk_0.ts", strings.NewReader("chunk_0"), -1) cleanfs.WriteFileReader("/chunk_1.ts", strings.NewReader("chunk_1"), -1) @@ -130,7 +130,7 @@ func TestMaxAge(t *testing.T) { MaxFiles: 0, MaxFileAge: 3 * time.Second, }, - }) + }, false) cleanfs.WriteFileReader("/chunk_0.ts", strings.NewReader("chunk_0"), -1) cleanfs.WriteFileReader("/chunk_1.ts", strings.NewReader("chunk_1"), -1) @@ -179,7 +179,7 @@ func TestUnsetCleanup(t *testing.T) { MaxFiles: 3, MaxFileAge: 0, }, - }) + }, false) cleanfs.WriteFileReader("/chunk_0.ts", strings.NewReader("chunk_0"), -1) cleanfs.WriteFileReader("/chunk_1.ts", strings.NewReader("chunk_1"), -1) @@ -207,7 +207,7 @@ func TestUnsetCleanup(t *testing.T) { return true }, 3*time.Second, time.Second) - cleanfs.UpdateCleanup("foobar", nil) + cleanfs.UpdateCleanup("foobar", nil, false) cleanfs.WriteFileReader("/chunk_4.ts", strings.NewReader("chunk_4"), -1) @@ -249,7 +249,7 @@ func TestPurge(t *testing.T) { MaxFileAge: 0, PurgeOnDelete: true, }, - }) + }, false) cleanfs.WriteFileReader("/chunk_0.ts", strings.NewReader("chunk_0"), -1) cleanfs.WriteFileReader("/chunk_1.ts", strings.NewReader("chunk_1"), -1) @@ -277,7 +277,7 @@ func TestPurge(t *testing.T) { return true }, 3*time.Second, time.Second) - cleanfs.UpdateCleanup("foobar", nil) + cleanfs.UpdateCleanup("foobar", nil, false) cleanfs.WriteFileReader("/chunk_4.ts", strings.NewReader("chunk_4"), -1) @@ -350,7 +350,7 @@ func BenchmarkCleanup(b *testing.B) { }, } - cleanfs.UpdateCleanup(id, patterns) + cleanfs.UpdateCleanup(id, patterns, false) ids[i] = id } @@ -429,7 +429,7 @@ func BenchmarkPurge(b *testing.B) { }, } - cleanfs.UpdateCleanup(id, patterns) + cleanfs.UpdateCleanup(id, patterns, false) ids[i] = id }