From d5c03932b5543ae3062e3a77c7f78f226b4d9496 Mon Sep 17 00:00:00 2001 From: Ingo Oppermann Date: Thu, 15 May 2025 11:10:10 +0200 Subject: [PATCH] Validate process config before adding/updating the cluster DB --- cluster/node/core.go | 12 ++++++++++ cluster/node/manager.go | 22 ++++++++++++++++++ cluster/process.go | 12 ++++++++++ http/client/client.go | 1 + http/client/process.go | 18 +++++++++++++++ http/handler/api/process.go | 45 +++++++++++++++++++++++++++++++++++- http/server.go | 1 + restream/app/process_test.go | 28 ++++++++++++++++++++++ restream/core.go | 25 ++++++++++++++++++++ 9 files changed, 163 insertions(+), 1 deletion(-) diff --git a/cluster/node/core.go b/cluster/node/core.go index 9dc87e87..bd5e4f84 100644 --- a/cluster/node/core.go +++ b/cluster/node/core.go @@ -411,6 +411,18 @@ func (n *Core) ProcessProbeConfig(config *app.Config) (api.Probe, error) { return probe, err } +func (n *Core) ProcessValidateConfig(config *app.Config) error { + n.lock.RLock() + client := n.client + n.lock.RUnlock() + + if client == nil { + return ErrNoPeer + } + + return client.ProcessValidateConfig(config) +} + func (n *Core) ProcessList(options client.ProcessListOptions) ([]api.Process, error) { n.lock.RLock() client := n.client diff --git a/cluster/node/manager.go b/cluster/node/manager.go index 53ad6428..c9f51b7d 100644 --- a/cluster/node/manager.go +++ b/cluster/node/manager.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "io" + "math/rand/v2" "net/url" "sort" "sync" @@ -493,6 +494,18 @@ func (p *Manager) FindNodeForResources(nodeid string, cpu float64, memory uint64 return "" } +func (p *Manager) GetRandomNode() string { + p.lock.RLock() + defer p.lock.RUnlock() + + nodes := []string{} + for nodeid := range p.nodes { + nodes = append(nodes, nodeid) + } + + return nodes[rand.IntN(len(p.nodes))] +} + func (p *Manager) ProcessList(options client.ProcessListOptions) []api.Process { processChan := make(chan []api.Process, 64) processList := []api.Process{} @@ -627,6 +640,15 @@ func (p *Manager) ProcessProbeConfig(nodeid string, config *app.Config) (api.Pro return node.Core().ProcessProbeConfig(config) } +func (p *Manager) ProcessValidateConfig(nodeid string, config *app.Config) error { + node, err := p.NodeGet(nodeid) + if err != nil { + return err + } + + return node.Core().ProcessValidateConfig(config) +} + func (p *Manager) Events(ctx context.Context, filters api.EventFilters) (<-chan api.Event, error) { eventChan := make(chan api.Event, 128) diff --git a/cluster/process.go b/cluster/process.go index c67f0de2..48da77a4 100644 --- a/cluster/process.go +++ b/cluster/process.go @@ -12,6 +12,12 @@ func (c *cluster) ProcessAdd(origin string, config *app.Config) error { return c.forwarder.ProcessAdd(origin, config) } + nodeid := c.manager.GetRandomNode() + err := c.manager.ProcessValidateConfig(nodeid, config) + if err != nil { + return err + } + cmd := &store.Command{ Operation: store.OpAddProcess, Data: &store.CommandAddProcess{ @@ -57,6 +63,12 @@ func (c *cluster) ProcessUpdate(origin string, id app.ProcessID, config *app.Con return c.forwarder.ProcessUpdate(origin, id, config) } + nodeid := c.manager.GetRandomNode() + err := c.manager.ProcessValidateConfig(nodeid, config) + if err != nil { + return err + } + cmd := &store.Command{ Operation: store.OpUpdateProcess, Data: &store.CommandUpdateProcess{ diff --git a/http/client/client.go b/http/client/client.go index 4cc9d5b7..0e1c56fd 100644 --- a/http/client/client.go +++ b/http/client/client.go @@ -67,6 +67,7 @@ type RestClient interface { ProcessCommand(id app.ProcessID, command string) error // PUT /v3/process/{id}/command ProcessProbe(id app.ProcessID) (api.Probe, error) // GET /v3/process/{id}/probe ProcessProbeConfig(config *app.Config) (api.Probe, error) // POST /v3/process/probe + ProcessValidateConfig(p *app.Config) error // POST /v3/process/validate ProcessConfig(id app.ProcessID) (api.ProcessConfig, error) // GET /v3/process/{id}/config ProcessReport(id app.ProcessID) (api.ProcessReport, error) // GET /v3/process/{id}/report ProcessReportSet(id app.ProcessID, report *app.Report) error // PUT /v3/process/{id}/report diff --git a/http/client/process.go b/http/client/process.go index 8de34aa8..1e5c9364 100644 --- a/http/client/process.go +++ b/http/client/process.go @@ -226,6 +226,24 @@ func (r *restclient) ProcessProbeConfig(p *app.Config) (api.Probe, error) { return probe, err } +func (r *restclient) ProcessValidateConfig(p *app.Config) error { + buf := mem.Get() + defer mem.Put(buf) + + config := api.ProcessConfig{} + config.Unmarshal(p, nil) + + e := json.NewEncoder(buf) + e.Encode(config) + + _, err := r.call("POST", "/v3/process/validate", nil, nil, "application/json", buf.Reader()) + if err != nil { + return err + } + + return nil +} + func (r *restclient) ProcessConfig(id app.ProcessID) (api.ProcessConfig, error) { var p api.ProcessConfig diff --git a/http/handler/api/process.go b/http/handler/api/process.go index d992ec96..6ea0b1f7 100644 --- a/http/handler/api/process.go +++ b/http/handler/api/process.go @@ -751,7 +751,7 @@ func (h *ProcessHandler) Probe(c echo.Context) error { } // ProbeConfig probes a process -// @Summary Add a new process +// @Summary Probe a process config // @Description Probe a process to get a detailed stream information on the inputs. // @Tags v16.?.? // @ID process-3-probe-config @@ -797,6 +797,49 @@ func (h *ProcessHandler) ProbeConfig(c echo.Context) error { return c.JSON(http.StatusOK, apiprobe) } +// ValaidateConfig validates a config +// @Summary Validate a process config +// @Description Probe a process to get a detailed stream information on the inputs. +// @Tags v16.?.? +// @ID process-3-validate-config +// @Accept json +// @Produce json +// @Param config body api.ProcessConfig true "Process config" +// @Success 200 {object} api.Probe +// @Failure 400 {object} api.Error +// @Failure 403 {object} api.Error +// @Security ApiKeyAuth +// @Router /api/v3/process/validate [post] +func (h *ProcessHandler) ValidateConfig(c echo.Context) error { + ctxuser := util.DefaultContext(c, "user", "") + + process := api.ProcessConfig{ + Owner: ctxuser, + Type: "ffmpeg", + } + + if err := util.ShouldBindJSON(c, &process); err != nil { + return api.Err(http.StatusBadRequest, "", "invalid JSON: %s", err.Error()) + } + + if !h.iam.Enforce(ctxuser, process.Domain, "process", process.ID, "write") { + return api.Err(http.StatusForbidden, "", "You are not allowed to validate this process, check the domain and process ID") + } + + if process.Type != "ffmpeg" { + return api.Err(http.StatusBadRequest, "", "unsupported process type, supported process types are: ffmpeg") + } + + config, _ := process.Marshal() + + err := h.restream.Validate(config) + if err != nil { + return api.Err(http.StatusBadRequest, "", "invalid config: %s", err.Error()) + } + + return c.JSON(http.StatusOK, process) +} + // Skills returns the detected FFmpeg capabilities // @Summary FFmpeg capabilities // @Description List all detected FFmpeg capabilities. diff --git a/http/server.go b/http/server.go index 0dd84797..1e545425 100644 --- a/http/server.go +++ b/http/server.go @@ -641,6 +641,7 @@ func (s *server) setRoutesV3(v3 *echo.Group) { if !s.readOnly { v3.POST("/process/probe", s.v3handler.process.ProbeConfig) + v3.POST("/process/validate", s.v3handler.process.ValidateConfig) v3.GET("/process/:id/probe", s.v3handler.process.Probe) v3.POST("/process", s.v3handler.process.Add) v3.PUT("/process/:id", s.v3handler.process.Update) diff --git a/restream/app/process_test.go b/restream/app/process_test.go index 2aa6168b..20794214 100644 --- a/restream/app/process_test.go +++ b/restream/app/process_test.go @@ -66,6 +66,34 @@ func TestConfigHash(t *testing.T) { require.False(t, bytes.Equal(hash1, hash2)) } +func TestConfigIOHash(t *testing.T) { + io1 := ConfigIO{ + ID: "0", + Address: "-", + Options: []string{ + "-codec", + "copy", + "-f", + "null", + }, + Cleanup: []ConfigIOCleanup{}, + } + + io2 := ConfigIO{ + ID: "0", + Address: "-", + Options: []string{ + "-codec", + "copy", + "-f", + "null", + }, + Cleanup: nil, + } + + require.Equal(t, io1.HashString(), io2.HashString()) +} + func TestProcessUsageCPU(t *testing.T) { original := parse.UsageCPU{ NCPU: 1.5, diff --git a/restream/core.go b/restream/core.go index 935565a2..f954739f 100644 --- a/restream/core.go +++ b/restream/core.go @@ -58,6 +58,7 @@ type Restreamer interface { GetProcessMetadata(id app.ProcessID, key string) (interface{}, error) // Get previously set metadata from a process Probe(config *app.Config, timeout time.Duration) app.Probe // Probe a process with specific timeout + Validate(config *app.Config) error // Validate a process config } // Config is the required configuration for a new restreamer instance. @@ -1920,3 +1921,27 @@ func hasPlaceholder(config *app.Config, r replace.Replacer, placeholder string) return false } + +func (r *restream) Validate(config *app.Config) error { + cfg := config.Clone() + + resolveStaticPlaceholders(cfg, r.replace) + + err := r.resolveAddresses(r.tasks, cfg) + if err != nil { + return err + } + + resolveDynamicPlaceholder(cfg, r.replace, map[string]string{ + "hwdevice": "0", + }, map[string]string{ + "timestamp": time.Now().UTC().Format(time.RFC3339), + }) + + _, err = validateConfig(cfg, r.fs.list, r.ffmpeg) + if err != nil { + return err + } + + return nil +}