Validate process config before adding/updating the cluster DB

This commit is contained in:
Ingo Oppermann 2025-05-15 11:10:10 +02:00
parent 29d0e753ae
commit d5c03932b5
No known key found for this signature in database
GPG Key ID: 2AB32426E9DD229E
9 changed files with 163 additions and 1 deletions

View File

@ -411,6 +411,18 @@ func (n *Core) ProcessProbeConfig(config *app.Config) (api.Probe, error) {
return probe, err
}
func (n *Core) ProcessValidateConfig(config *app.Config) error {
n.lock.RLock()
client := n.client
n.lock.RUnlock()
if client == nil {
return ErrNoPeer
}
return client.ProcessValidateConfig(config)
}
func (n *Core) ProcessList(options client.ProcessListOptions) ([]api.Process, error) {
n.lock.RLock()
client := n.client

View File

@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"io"
"math/rand/v2"
"net/url"
"sort"
"sync"
@ -493,6 +494,18 @@ func (p *Manager) FindNodeForResources(nodeid string, cpu float64, memory uint64
return ""
}
func (p *Manager) GetRandomNode() string {
p.lock.RLock()
defer p.lock.RUnlock()
nodes := []string{}
for nodeid := range p.nodes {
nodes = append(nodes, nodeid)
}
return nodes[rand.IntN(len(p.nodes))]
}
func (p *Manager) ProcessList(options client.ProcessListOptions) []api.Process {
processChan := make(chan []api.Process, 64)
processList := []api.Process{}
@ -627,6 +640,15 @@ func (p *Manager) ProcessProbeConfig(nodeid string, config *app.Config) (api.Pro
return node.Core().ProcessProbeConfig(config)
}
func (p *Manager) ProcessValidateConfig(nodeid string, config *app.Config) error {
node, err := p.NodeGet(nodeid)
if err != nil {
return err
}
return node.Core().ProcessValidateConfig(config)
}
func (p *Manager) Events(ctx context.Context, filters api.EventFilters) (<-chan api.Event, error) {
eventChan := make(chan api.Event, 128)

View File

@ -12,6 +12,12 @@ func (c *cluster) ProcessAdd(origin string, config *app.Config) error {
return c.forwarder.ProcessAdd(origin, config)
}
nodeid := c.manager.GetRandomNode()
err := c.manager.ProcessValidateConfig(nodeid, config)
if err != nil {
return err
}
cmd := &store.Command{
Operation: store.OpAddProcess,
Data: &store.CommandAddProcess{
@ -57,6 +63,12 @@ func (c *cluster) ProcessUpdate(origin string, id app.ProcessID, config *app.Con
return c.forwarder.ProcessUpdate(origin, id, config)
}
nodeid := c.manager.GetRandomNode()
err := c.manager.ProcessValidateConfig(nodeid, config)
if err != nil {
return err
}
cmd := &store.Command{
Operation: store.OpUpdateProcess,
Data: &store.CommandUpdateProcess{

View File

@ -67,6 +67,7 @@ type RestClient interface {
ProcessCommand(id app.ProcessID, command string) error // PUT /v3/process/{id}/command
ProcessProbe(id app.ProcessID) (api.Probe, error) // GET /v3/process/{id}/probe
ProcessProbeConfig(config *app.Config) (api.Probe, error) // POST /v3/process/probe
ProcessValidateConfig(p *app.Config) error // POST /v3/process/validate
ProcessConfig(id app.ProcessID) (api.ProcessConfig, error) // GET /v3/process/{id}/config
ProcessReport(id app.ProcessID) (api.ProcessReport, error) // GET /v3/process/{id}/report
ProcessReportSet(id app.ProcessID, report *app.Report) error // PUT /v3/process/{id}/report

View File

@ -226,6 +226,24 @@ func (r *restclient) ProcessProbeConfig(p *app.Config) (api.Probe, error) {
return probe, err
}
func (r *restclient) ProcessValidateConfig(p *app.Config) error {
buf := mem.Get()
defer mem.Put(buf)
config := api.ProcessConfig{}
config.Unmarshal(p, nil)
e := json.NewEncoder(buf)
e.Encode(config)
_, err := r.call("POST", "/v3/process/validate", nil, nil, "application/json", buf.Reader())
if err != nil {
return err
}
return nil
}
func (r *restclient) ProcessConfig(id app.ProcessID) (api.ProcessConfig, error) {
var p api.ProcessConfig

View File

@ -751,7 +751,7 @@ func (h *ProcessHandler) Probe(c echo.Context) error {
}
// ProbeConfig probes a process
// @Summary Add a new process
// @Summary Probe a process config
// @Description Probe a process to get a detailed stream information on the inputs.
// @Tags v16.?.?
// @ID process-3-probe-config
@ -797,6 +797,49 @@ func (h *ProcessHandler) ProbeConfig(c echo.Context) error {
return c.JSON(http.StatusOK, apiprobe)
}
// ValaidateConfig validates a config
// @Summary Validate a process config
// @Description Probe a process to get a detailed stream information on the inputs.
// @Tags v16.?.?
// @ID process-3-validate-config
// @Accept json
// @Produce json
// @Param config body api.ProcessConfig true "Process config"
// @Success 200 {object} api.Probe
// @Failure 400 {object} api.Error
// @Failure 403 {object} api.Error
// @Security ApiKeyAuth
// @Router /api/v3/process/validate [post]
func (h *ProcessHandler) ValidateConfig(c echo.Context) error {
ctxuser := util.DefaultContext(c, "user", "")
process := api.ProcessConfig{
Owner: ctxuser,
Type: "ffmpeg",
}
if err := util.ShouldBindJSON(c, &process); err != nil {
return api.Err(http.StatusBadRequest, "", "invalid JSON: %s", err.Error())
}
if !h.iam.Enforce(ctxuser, process.Domain, "process", process.ID, "write") {
return api.Err(http.StatusForbidden, "", "You are not allowed to validate this process, check the domain and process ID")
}
if process.Type != "ffmpeg" {
return api.Err(http.StatusBadRequest, "", "unsupported process type, supported process types are: ffmpeg")
}
config, _ := process.Marshal()
err := h.restream.Validate(config)
if err != nil {
return api.Err(http.StatusBadRequest, "", "invalid config: %s", err.Error())
}
return c.JSON(http.StatusOK, process)
}
// Skills returns the detected FFmpeg capabilities
// @Summary FFmpeg capabilities
// @Description List all detected FFmpeg capabilities.

View File

@ -641,6 +641,7 @@ func (s *server) setRoutesV3(v3 *echo.Group) {
if !s.readOnly {
v3.POST("/process/probe", s.v3handler.process.ProbeConfig)
v3.POST("/process/validate", s.v3handler.process.ValidateConfig)
v3.GET("/process/:id/probe", s.v3handler.process.Probe)
v3.POST("/process", s.v3handler.process.Add)
v3.PUT("/process/:id", s.v3handler.process.Update)

View File

@ -66,6 +66,34 @@ func TestConfigHash(t *testing.T) {
require.False(t, bytes.Equal(hash1, hash2))
}
func TestConfigIOHash(t *testing.T) {
io1 := ConfigIO{
ID: "0",
Address: "-",
Options: []string{
"-codec",
"copy",
"-f",
"null",
},
Cleanup: []ConfigIOCleanup{},
}
io2 := ConfigIO{
ID: "0",
Address: "-",
Options: []string{
"-codec",
"copy",
"-f",
"null",
},
Cleanup: nil,
}
require.Equal(t, io1.HashString(), io2.HashString())
}
func TestProcessUsageCPU(t *testing.T) {
original := parse.UsageCPU{
NCPU: 1.5,

View File

@ -58,6 +58,7 @@ type Restreamer interface {
GetProcessMetadata(id app.ProcessID, key string) (interface{}, error) // Get previously set metadata from a process
Probe(config *app.Config, timeout time.Duration) app.Probe // Probe a process with specific timeout
Validate(config *app.Config) error // Validate a process config
}
// Config is the required configuration for a new restreamer instance.
@ -1920,3 +1921,27 @@ func hasPlaceholder(config *app.Config, r replace.Replacer, placeholder string)
return false
}
func (r *restream) Validate(config *app.Config) error {
cfg := config.Clone()
resolveStaticPlaceholders(cfg, r.replace)
err := r.resolveAddresses(r.tasks, cfg)
if err != nil {
return err
}
resolveDynamicPlaceholder(cfg, r.replace, map[string]string{
"hwdevice": "0",
}, map[string]string{
"timestamp": time.Now().UTC().Format(time.RFC3339),
})
_, err = validateConfig(cfg, r.fs.list, r.ffmpeg)
if err != nil {
return err
}
return nil
}