diff --git a/CHANGELOG.md b/CHANGELOG.md index d308a9da..072667f7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,7 @@ #### Core v16.8.0 > ? +- Add new placeholders and parameters for placeholder - Allow RTMP server if RTMPS server is enabled - Add optional escape character to process placeholder - Fix output address validation for tee outputs diff --git a/README.md b/README.md index c22593ee..eb7b1d02 100644 --- a/README.md +++ b/README.md @@ -644,14 +644,16 @@ A command is defined as: Currently supported placeholders are: -| Placeholder | Description | Location | -| ------------- | --------------------------------------------------------- | ----------------------------------------------------------------------------------------------------------------------- | -| `{diskfs}` | Will be replaced by the provided `CORE_STORAGE_DISK_DIR`. | `options`, `input.address`, `input.options`, `output.address`, `output.options` | -| `{memfs}` | Will be replace by the base URL of the MemFS. | `input.address`, `input.options`, `output.address`, `output.options` | -| `{processid}` | Will be replaced by the ID of the process. | `input.id`, `input.address`, `input.options`, `output.id`, `output.address`, `output.options`, `output.cleanup.pattern` | -| `{reference}` | Will be replaced by the reference of the process | `input.id`, `input.address`, `input.options`, `output.id`, `output.address`, `output.options`, `output.cleanup.pattern` | -| `{inputid}` | Will be replaced by the ID of the input. | `input.address`, `input.options` | -| `{outputid}` | Will be replaced by the ID of the output. | `output.address`, `output.options`, `output.cleanup.pattern` | +| Placeholder | Description | Location | +| ------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------ | ----------------------------------------------------------------------------------------------------------------------- | +| `{diskfs}` | Will be replaced by the provided `CORE_STORAGE_DISK_DIR`. | `options`, `input.address`, `input.options`, `output.address`, `output.options` | +| `{memfs}` | Will be replace by the base URL of the MemFS. | `input.address`, `input.options`, `output.address`, `output.options` | +| `{processid}` | Will be replaced by the ID of the process. | `input.id`, `input.address`, `input.options`, `output.id`, `output.address`, `output.options`, `output.cleanup.pattern` | +| `{reference}` | Will be replaced by the reference of the process | `input.id`, `input.address`, `input.options`, `output.id`, `output.address`, `output.options`, `output.cleanup.pattern` | +| `{inputid}` | Will be replaced by the ID of the input. | `input.address`, `input.options` | +| `{outputid}` | Will be replaced by the ID of the output. | `output.address`, `output.options`, `output.cleanup.pattern` | +| `{rtmp}` | Will be replaced by the internal address of the RTMP server. Requires parameter `name` (name of the stream). | `input.address`, `output.address` | +| `{srt}` | Will be replaced by the internal address of the SRT server. Requires parameter `name` (name of the stream) and `mode` (either `publish` or `request`). | `input.address`, `output.address` | Before replacing the placeholders in the process config, all references (see below) will be resolved. @@ -659,6 +661,8 @@ If the value that gets filled in on the place of the placeholder needs escaping, E.g. escape all `:` in the value (`http://example.com:8080`) for `{memfs}` placeholder, write `{memfs^:}`. It will then be replaced by `http\://example.com\:8080`. The escape character is always `\`. In case there are `\` in the value, they will also get escaped. If the placeholder doesn't imply escaping, the value will be uses as-is. +Add parameters to a placeholder by appending a comma separated list of key/values, e.g. `{placeholder,key1=value1,key2=value2}`. This can be combined with escaping. + ### References The input address of a process may contain a reference to the output of another process. It has the form `#[processid]:output=[id]`. diff --git a/app/api/api.go b/app/api/api.go index 255c700a..6c5c0764 100644 --- a/app/api/api.go +++ b/app/api/api.go @@ -29,6 +29,7 @@ import ( "github.com/datarhei/core/v16/net" "github.com/datarhei/core/v16/prometheus" "github.com/datarhei/core/v16/restream" + "github.com/datarhei/core/v16/restream/replace" "github.com/datarhei/core/v16/restream/store" "github.com/datarhei/core/v16/rtmp" "github.com/datarhei/core/v16/service" @@ -75,6 +76,7 @@ type api struct { sidecarserver *gohttp.Server httpjwt jwt.JWT update update.Checker + replacer replace.Replacer errorChan chan error @@ -439,12 +441,50 @@ func (a *api) start() error { a.ffmpeg = ffmpeg + a.replacer = replace.New() + + { + a.replacer.RegisterTemplate("diskfs", a.diskfs.Base()) + a.replacer.RegisterTemplate("memfs", a.memfs.Base()) + + if cfg.RTMP.Enable { + host, port, _ := gonet.SplitHostPort(cfg.RTMP.Address) + if len(host) == 0 { + host = "localhost" + } + + template := "rtmp://" + host + ":" + port + cfg.RTMP.App + "/{name}" + if len(cfg.RTMP.Token) != 0 { + template += "?token=" + cfg.RTMP.Token + } + + a.replacer.RegisterTemplate("rtmp", template) + } + + if cfg.SRT.Enable { + host, port, _ = gonet.SplitHostPort(cfg.SRT.Address) + if len(host) == 0 { + host = "localhost" + } + + template := "srt://" + host + ":" + port + "?mode=caller&transtype=live&streamid=#!:m={mode},r={name}" + if len(cfg.SRT.Token) != 0 { + template += ",token=" + cfg.SRT.Token + } + if len(cfg.SRT.Passphrase) != 0 { + template += "&passphrase=" + cfg.SRT.Passphrase + } + a.replacer.RegisterTemplate("srt", template) + } + } + restream, err := restream.New(restream.Config{ ID: cfg.ID, Name: cfg.Name, Store: store, DiskFS: a.diskfs, MemFS: a.memfs, + Replace: a.replacer, FFmpeg: a.ffmpeg, MaxProcesses: cfg.FFmpeg.MaxProcesses, Logger: a.log.logger.core.WithComponent("Process"), @@ -940,6 +980,8 @@ func (a *api) start() error { } else { err = nil } + + sendError(err) }() } } diff --git a/config/config.go b/config/config.go index c36ad943..9d8e8bd3 100644 --- a/config/config.go +++ b/config/config.go @@ -351,7 +351,7 @@ func (d *Config) init() { d.val(newBoolValue(&d.RTMP.EnableTLS, false), "rtmp.enable_tls", "CORE_RTMP_ENABLE_TLS", nil, "Enable RTMPS server instead of RTMP", false, false) d.val(newAddressValue(&d.RTMP.Address, ":1935"), "rtmp.address", "CORE_RTMP_ADDRESS", nil, "RTMP server listen address", false, false) d.val(newAddressValue(&d.RTMP.AddressTLS, ":1936"), "rtmp.address_tls", "CORE_RTMP_ADDRESS_TLS", nil, "RTMPS server listen address", false, false) - d.val(newStringValue(&d.RTMP.App, "/"), "rtmp.app", "CORE_RTMP_APP", nil, "RTMP app for publishing", false, false) + d.val(newAbsolutePathValue(&d.RTMP.App, "/"), "rtmp.app", "CORE_RTMP_APP", nil, "RTMP app for publishing", false, false) d.val(newStringValue(&d.RTMP.Token, ""), "rtmp.token", "CORE_RTMP_TOKEN", nil, "RTMP token for publishing and playing", false, true) // SRT diff --git a/config/types.go b/config/types.go index 767fb847..3b5532ec 100644 --- a/config/types.go +++ b/config/types.go @@ -8,6 +8,7 @@ import ( "net/url" "os" "os/exec" + "path/filepath" "regexp" "strconv" "strings" @@ -772,3 +773,35 @@ func (u *urlValue) Validate() error { func (u *urlValue) IsEmpty() bool { return len(string(*u)) == 0 } + +// absolute path + +type absolutePathValue string + +func newAbsolutePathValue(p *string, val string) *absolutePathValue { + *p = filepath.Clean(val) + return (*absolutePathValue)(p) +} + +func (s *absolutePathValue) Set(val string) error { + *s = absolutePathValue(filepath.Clean(val)) + return nil +} + +func (s *absolutePathValue) String() string { + return string(*s) +} + +func (s *absolutePathValue) Validate() error { + path := string(*s) + + if !filepath.IsAbs(path) { + return fmt.Errorf("%s is not an absolute path", path) + } + + return nil +} + +func (s *absolutePathValue) IsEmpty() bool { + return len(string(*s)) == 0 +} diff --git a/restream/app/process.go b/restream/app/process.go index cf7769a8..b8fb75d8 100644 --- a/restream/app/process.go +++ b/restream/app/process.go @@ -1,10 +1,8 @@ package app import ( - "regexp" - "strings" - "github.com/datarhei/core/v16/process" + "github.com/datarhei/core/v16/restream/replace" ) type ConfigIOCleanup struct { @@ -80,35 +78,12 @@ func (config *Config) Clone() *Config { return clone } -func replace(what, placeholder, value string) string { - re, err := regexp.Compile(`{` + regexp.QuoteMeta(placeholder) + `(\^(.))?}`) - if err != nil { - return what - } - - what = re.ReplaceAllStringFunc(what, func(match string) string { - matches := re.FindStringSubmatch(match) - v := value - - if matches[2] != "" { - if matches[2] != `\` { - v = strings.ReplaceAll(v, `\`, `\\`) - } - v = strings.ReplaceAll(v, matches[2], `\\`+matches[2]) - } - - return strings.Replace(match, match, v, 1) - }) - - return what -} - // ReplacePlaceholders replaces all placeholders in the config. The config // will be modified in place. -func (config *Config) ResolvePlaceholders(basediskfs, basememfs string) { +func (config *Config) ResolvePlaceholders(r replace.Replacer) { for i, option := range config.Options { // Replace any known placeholders - option = replace(option, "diskfs", basediskfs) + option = r.Replace(option, "diskfs", "") config.Options[i] = option } @@ -116,21 +91,23 @@ func (config *Config) ResolvePlaceholders(basediskfs, basememfs string) { // Resolving the given inputs for i, input := range config.Input { // Replace any known placeholders - input.ID = replace(input.ID, "processid", config.ID) - input.ID = replace(input.ID, "reference", config.Reference) - input.Address = replace(input.Address, "inputid", input.ID) - input.Address = replace(input.Address, "processid", config.ID) - input.Address = replace(input.Address, "reference", config.Reference) - input.Address = replace(input.Address, "diskfs", basediskfs) - input.Address = replace(input.Address, "memfs", basememfs) + input.ID = r.Replace(input.ID, "processid", config.ID) + input.ID = r.Replace(input.ID, "reference", config.Reference) + input.Address = r.Replace(input.Address, "inputid", input.ID) + input.Address = r.Replace(input.Address, "processid", config.ID) + input.Address = r.Replace(input.Address, "reference", config.Reference) + input.Address = r.Replace(input.Address, "diskfs", "") + input.Address = r.Replace(input.Address, "memfs", "") + input.Address = r.Replace(input.Address, "rtmp", "") + input.Address = r.Replace(input.Address, "srt", "") for j, option := range input.Options { // Replace any known placeholders - option = replace(option, "inputid", input.ID) - option = replace(option, "processid", config.ID) - option = replace(option, "reference", config.Reference) - option = replace(option, "diskfs", basediskfs) - option = replace(option, "memfs", basememfs) + option = r.Replace(option, "inputid", input.ID) + option = r.Replace(option, "processid", config.ID) + option = r.Replace(option, "reference", config.Reference) + option = r.Replace(option, "diskfs", "") + option = r.Replace(option, "memfs", "") input.Options[j] = option } @@ -141,29 +118,31 @@ func (config *Config) ResolvePlaceholders(basediskfs, basememfs string) { // Resolving the given outputs for i, output := range config.Output { // Replace any known placeholders - output.ID = replace(output.ID, "processid", config.ID) - output.Address = replace(output.Address, "outputid", output.ID) - output.Address = replace(output.Address, "processid", config.ID) - output.Address = replace(output.Address, "reference", config.Reference) - output.Address = replace(output.Address, "diskfs", basediskfs) - output.Address = replace(output.Address, "memfs", basememfs) + output.ID = r.Replace(output.ID, "processid", config.ID) + output.Address = r.Replace(output.Address, "outputid", output.ID) + output.Address = r.Replace(output.Address, "processid", config.ID) + output.Address = r.Replace(output.Address, "reference", config.Reference) + output.Address = r.Replace(output.Address, "diskfs", "") + output.Address = r.Replace(output.Address, "memfs", "") + output.Address = r.Replace(output.Address, "rtmp", "") + output.Address = r.Replace(output.Address, "srt", "") for j, option := range output.Options { // Replace any known placeholders - option = replace(option, "outputid", output.ID) - option = replace(option, "processid", config.ID) - option = replace(option, "reference", config.Reference) - option = replace(option, "diskfs", basediskfs) - option = replace(option, "memfs", basememfs) + option = r.Replace(option, "outputid", output.ID) + option = r.Replace(option, "processid", config.ID) + option = r.Replace(option, "reference", config.Reference) + option = r.Replace(option, "diskfs", "") + option = r.Replace(option, "memfs", "") output.Options[j] = option } for j, cleanup := range output.Cleanup { // Replace any known placeholders - cleanup.Pattern = replace(cleanup.Pattern, "outputid", output.ID) - cleanup.Pattern = replace(cleanup.Pattern, "processid", config.ID) - cleanup.Pattern = replace(cleanup.Pattern, "reference", config.Reference) + cleanup.Pattern = r.Replace(cleanup.Pattern, "outputid", output.ID) + cleanup.Pattern = r.Replace(cleanup.Pattern, "processid", config.ID) + cleanup.Pattern = r.Replace(cleanup.Pattern, "reference", config.Reference) output.Cleanup[j] = cleanup } diff --git a/restream/app/process_test.go b/restream/app/process_test.go index b7f296da..ad933a79 100644 --- a/restream/app/process_test.go +++ b/restream/app/process_test.go @@ -6,26 +6,6 @@ import ( "github.com/stretchr/testify/require" ) -func TestReplace(t *testing.T) { - foobar := `;:.,-_$\£!^` - - samples := [][2]string{ - {"{foobar}", foobar}, - {"{foobar^:}", `;\:.,-_$\\£!^`}, - {"{foobar^:}barfoo{foobar^:}", `;\:.,-_$\\£!^barfoo;\:.,-_$\\£!^`}, - {"{foobar^:.}", "{foobar^:.}"}, - {"{foobar^}", "{foobar^}"}, - {"{barfoo^:}", "{barfoo^:}"}, - {"{foobar^^}", `;:.,-_$\\£!\^`}, - {`{foobar^\}`, `;:.,-_$\\£!^`}, - } - - for _, e := range samples { - replaced := replace(e[0], "foobar", foobar) - require.Equal(t, e[1], replaced, e[0]) - } -} - func TestCreateCommand(t *testing.T) { config := &Config{ Options: []string{"-global", "global"}, diff --git a/restream/replace/replace.go b/restream/replace/replace.go new file mode 100644 index 00000000..47885a38 --- /dev/null +++ b/restream/replace/replace.go @@ -0,0 +1,138 @@ +package replace + +import ( + "net/url" + "regexp" + "strings" +) + +type Replacer interface { + // RegisterTemplate registers a template for a specific placeholder. Template + // may contain placeholders as well of the form {name}. They will be replaced + // by the parameters of the placeholder (see Replace). + RegisterTemplate(placeholder, template string) + + // RegisterTemplateFunc does the same as RegisterTemplate, but the template + // is returned by the template function. + RegisterTemplateFunc(placeholder string, template func() string) + + // Replace replaces all occurences of placeholder in str with value. The placeholder is of the + // form {placeholder}. It is possible to escape a characters in value with \\ by appending a ^ + // and the character to escape to the placeholder name, e.g. {placeholder^:} to escape ":". + // A placeholder may also have parameters of the form {placeholder,key1=value1,key2=value2}. + // If the value has placeholders itself (see RegisterTemplate), they will be replaced by + // the value of the corresponding key in the parameters. + // If the value is an empty string, the registered templates will be searched for that + // placeholder. If no template is found, the placeholder will be replaced by the empty string. + // A placeholder name may consist on of the letters a-z. + Replace(str, placeholder, value string) string +} + +type replacer struct { + templates map[string]func() string + + re *regexp.Regexp + templateRe *regexp.Regexp +} + +// New returns a Replacer +func New() Replacer { + r := &replacer{ + templates: make(map[string]func() string), + re: regexp.MustCompile(`{([a-z]+)(?:\^(.))?(?:,(.*?))?}`), + templateRe: regexp.MustCompile(`{([a-z]+)}`), + } + + return r +} + +func (r *replacer) RegisterTemplate(placeholder, template string) { + r.templates[placeholder] = func() string { return template } +} + +func (r *replacer) RegisterTemplateFunc(placeholder string, template func() string) { + r.templates[placeholder] = template +} + +func (r *replacer) Replace(str, placeholder, value string) string { + str = r.re.ReplaceAllStringFunc(str, func(match string) string { + matches := r.re.FindStringSubmatch(match) + if matches[1] != placeholder { + return match + } + + // We need a copy from the value + v := value + + // Check for a registered template + if len(v) == 0 { + tmplFunc, ok := r.templates[placeholder] + if ok { + v = tmplFunc() + } + } + + v = r.compileTemplate(v, matches[3]) + + if len(matches[2]) != 0 { + // If there's a character to escape, we also have to escape the + // escape character, but only if it is different from the character + // to escape. + if matches[2] != "\\" { + v = strings.ReplaceAll(v, "\\", "\\\\\\") + } + v = strings.ReplaceAll(v, matches[2], "\\\\"+matches[2]) + } + + return strings.Replace(match, match, v, 1) + }) + + return str +} + +// compileTemplate fills in the placeholder in the template with the values from the params +// string. The placeholders in the template are delimited by {} and their name may only +// contain the letters a-z. The params string is a comma-separated string of key=value pairs. +// Example: the template is "Hello {who}!", the params string is "who=World". The key is the +// placeholder name and will be replaced with the value. The resulting string is "Hello World!". +// If a placeholder name is not present in the params string, it will not be replaced. The key +// and values can be escaped as in net/url.QueryEscape. +func (r *replacer) compileTemplate(str, params string) string { + if len(params) == 0 { + return str + } + + p := make(map[string]string) + + // taken from net/url.ParseQuery + for params != "" { + var key string + key, params, _ = strings.Cut(params, ",") + if key == "" { + continue + } + key, value, _ := strings.Cut(key, "=") + key, err := url.QueryUnescape(key) + if err != nil { + continue + } + value, err = url.QueryUnescape(value) + if err != nil { + continue + } + p[key] = value + } + + str = r.templateRe.ReplaceAllStringFunc(str, func(match string) string { + matches := r.templateRe.FindStringSubmatch(match) + + value, ok := p[matches[1]] + if !ok { + return match + } + + return strings.Replace(match, matches[0], value, 1) + }) + + return str +} diff --git a/restream/replace/replace_test.go b/restream/replace/replace_test.go new file mode 100644 index 00000000..7474775d --- /dev/null +++ b/restream/replace/replace_test.go @@ -0,0 +1,64 @@ +package replace + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestReplace(t *testing.T) { + foobar := ";:.,-_$\\£!^" + + samples := [][2]string{ + {"{foobar}", foobar}, + {"{foobar^:}", ";\\\\:.,-_$\\\\\\£!^"}, + {"{foobar^:}barfoo{foobar^:}", ";\\\\:.,-_$\\\\\\£!^barfoo;\\\\:.,-_$\\\\\\£!^"}, + {"{foobar^:.}", "{foobar^:.}"}, + {"{foobar^}", "{foobar^}"}, + {"{barfoo^:}", "{barfoo^:}"}, + {"{foobar^^}", ";:.,-_$\\\\\\£!\\\\^"}, + {`{foobar^\}`, ";:.,-_$\\\\\\£!^"}, + {`{barfoo}`, "{barfoo}"}, + } + + r := New() + + for _, e := range samples { + replaced := r.Replace(e[0], "foobar", foobar) + require.Equal(t, e[1], replaced, e[0]) + } + + replaced := r.Replace("{foobar}", "foobar", "") + require.Equal(t, "", replaced) +} + +func TestReplaceTemplate(t *testing.T) { + r := New() + r.RegisterTemplate("foobar", "Hello {who}! {what}?") + + replaced := r.Replace("{foobar,who=World}", "foobar", "") + require.Equal(t, "Hello World! {what}?", replaced) + + replaced = r.Replace("{foobar,who=World,what=E%3dmc^2}", "foobar", "") + require.Equal(t, "Hello World! E=mc^2?", replaced) + + replaced = r.Replace("{foobar^:,who=World,what=E%3dmc:2}", "foobar", "") + require.Equal(t, "Hello World! E=mc\\\\:2?", replaced) +} + +func TestReplaceCompileTemplate(t *testing.T) { + samples := [][3]string{ + {"Hello {who}!", "who=World", "Hello World!"}, + {"Hello {who}! {what}?", "who=World", "Hello World! {what}?"}, + {"Hello {who}! {what}?", "who=World,what=Yeah", "Hello World! Yeah?"}, + {"Hello {who}! {what}?", "who=World,what=", "Hello World! ?"}, + {"Hello {who}!", "who=E%3dmc^2", "Hello E=mc^2!"}, + } + + r := New().(*replacer) + + for _, e := range samples { + replaced := r.compileTemplate(e[0], e[1]) + require.Equal(t, e[2], replaced, e[0]) + } +} diff --git a/restream/restream.go b/restream/restream.go index bcd89314..c1a9426b 100644 --- a/restream/restream.go +++ b/restream/restream.go @@ -21,6 +21,7 @@ import ( "github.com/datarhei/core/v16/process" "github.com/datarhei/core/v16/restream/app" rfs "github.com/datarhei/core/v16/restream/fs" + "github.com/datarhei/core/v16/restream/replace" "github.com/datarhei/core/v16/restream/store" ) @@ -59,6 +60,7 @@ type Config struct { Store store.Store DiskFS fs.Filesystem MemFS fs.Filesystem + Replace replace.Replacer FFmpeg ffmpeg.FFmpeg MaxProcesses int64 Logger log.Logger @@ -92,6 +94,7 @@ type restream struct { memfs rfs.Filesystem stopObserver context.CancelFunc } + replace replace.Replacer tasks map[string]*task logger log.Logger metadata map[string]interface{} @@ -109,6 +112,7 @@ func New(config Config) (Restreamer, error) { name: config.Name, createdAt: time.Now(), store: config.Store, + replace: config.Replace, logger: config.Logger, } @@ -142,6 +146,10 @@ func New(config Config) (Restreamer, error) { }) } + if r.replace == nil { + r.replace = replace.New() + } + r.ffmpeg = config.FFmpeg if r.ffmpeg == nil { return nil, fmt.Errorf("ffmpeg must be provided") @@ -268,7 +276,7 @@ func (r *restream) load() error { } // Replace all placeholders in the config - t.config.ResolvePlaceholders(r.fs.diskfs.Base(), r.fs.memfs.Base()) + t.config.ResolvePlaceholders(r.replace) tasks[id] = t } @@ -418,7 +426,7 @@ func (r *restream) createTask(config *app.Config) (*task, error) { logger: r.logger.WithField("id", process.ID), } - t.config.ResolvePlaceholders(r.fs.diskfs.Base(), r.fs.memfs.Base()) + t.config.ResolvePlaceholders(r.replace) err := r.resolveAddresses(r.tasks, t.config) if err != nil { @@ -983,7 +991,7 @@ func (r *restream) reloadProcess(id string) error { t.config = t.process.Config.Clone() - t.config.ResolvePlaceholders(r.fs.diskfs.Base(), r.fs.memfs.Base()) + t.config.ResolvePlaceholders(r.replace) err := r.resolveAddresses(r.tasks, t.config) if err != nil {