From 3cad139952b039cbf565f49060dfd84c5e2a3203 Mon Sep 17 00:00:00 2001 From: Ingo Oppermann Date: Wed, 1 Mar 2023 11:08:20 +0100 Subject: [PATCH] Create directories for output files to be written on disk --- http/mock/mock.go | 5 ++- restream/restream.go | 82 +++++++++++++++++++-------------------- restream/restream_test.go | 78 ++++++++++++++++++++++++++++++------- 3 files changed, 108 insertions(+), 57 deletions(-) diff --git a/http/mock/mock.go b/http/mock/mock.go index 621204a7..4285e5ef 100644 --- a/http/mock/mock.go +++ b/http/mock/mock.go @@ -53,8 +53,9 @@ func DummyRestreamer(pathPrefix string) (restream.Restreamer, error) { } rs, err := restream.New(restream.Config{ - Store: store, - FFmpeg: ffmpeg, + Store: store, + FFmpeg: ffmpeg, + Filesystems: []fs.Filesystem{memfs}, }) if err != nil { return nil, err diff --git a/restream/restream.go b/restream/restream.go index 3c6104e7..299ac5a7 100644 --- a/restream/restream.go +++ b/restream/restream.go @@ -94,7 +94,6 @@ type restream struct { nProc int64 fs struct { list []rfs.Filesystem - diskfs []rfs.Filesystem stopObserver context.CancelFunc } replace replace.Replacer @@ -134,6 +133,10 @@ func New(config Config) (Restreamer, error) { r.store = s } + if len(config.Filesystems) == 0 { + return nil, fmt.Errorf("at least one filesystem must be provided") + } + for _, fs := range config.Filesystems { fs := rfs.New(rfs.Config{ FS: fs, @@ -141,11 +144,6 @@ func New(config Config) (Restreamer, error) { }) r.fs.list = append(r.fs.list, fs) - - // Add the diskfs filesystems also to a separate array. We need it later for input and output validation - if fs.Type() == "disk" { - r.fs.diskfs = append(r.fs.diskfs, fs) - } } if r.replace == nil { @@ -341,7 +339,7 @@ func (r *restream) load() error { config := t.config.Clone() resolveDynamicPlaceholder(config, r.replace) - t.usesDisk, err = validateConfig(config, r.fs.diskfs, r.ffmpeg) + t.usesDisk, err = validateConfig(config, r.fs.list, r.ffmpeg) if err != nil { r.logger.Warn().WithField("id", t.id).WithError(err).Log("Ignoring") continue @@ -487,7 +485,7 @@ func (r *restream) createTask(config *app.Config) (*task, error) { config := t.config.Clone() resolveDynamicPlaceholder(config, r.replace) - t.usesDisk, err = validateConfig(config, r.fs.diskfs, r.ffmpeg) + t.usesDisk, err = validateConfig(config, r.fs.list, r.ffmpeg) if err != nil { return nil, err } @@ -527,7 +525,7 @@ func (r *restream) onArgs(cfg *app.Config) func([]string) []string { resolveDynamicPlaceholder(config, r.replace) - _, err := validateConfig(config, r.fs.diskfs, r.ffmpeg) + _, err := validateConfig(config, r.fs.list, r.ffmpeg) if err != nil { return []string{} } @@ -641,6 +639,9 @@ func (r *restream) unsetPlayoutPorts(t *task) { t.playout = nil } +// validateConfig verifies a process config, whether the accessed files (read and write) can be accessed +// based on the provided filesystems and the ffmpeg validators. Returns an error if somethingis wrong, +// otherwise nil and whether there is a disk filesystem involved. func validateConfig(config *app.Config, fss []rfs.Filesystem, ffmpeg ffmpeg.FFmpeg) (bool, error) { if len(config.Input) == 0 { return false, fmt.Errorf("at least one input must be defined for the process '%s'", config.ID) @@ -669,24 +670,22 @@ func validateConfig(config *app.Config, fss []rfs.Filesystem, ffmpeg ffmpeg.FFmp return false, fmt.Errorf("the address for input '#%s:%s' must not be empty", config.ID, io.ID) } - if len(fss) != 0 { - maxFails := 0 - for _, fs := range fss { - io.Address, err = validateInputAddress(io.Address, fs.Metadata("base"), ffmpeg) - if err != nil { - maxFails++ - } + maxFails := 0 + for _, fs := range fss { + basedir := "/" + if fs.Type() == "disk" { + basedir = fs.Metadata("base") } - if maxFails == len(fss) { - return false, fmt.Errorf("the address for input '#%s:%s' (%s) is invalid: %w", config.ID, io.ID, io.Address, err) - } - } else { - io.Address, err = validateInputAddress(io.Address, "/", ffmpeg) + io.Address, err = validateInputAddress(io.Address, basedir, ffmpeg) if err != nil { - return false, fmt.Errorf("the address for input '#%s:%s' (%s) is invalid: %w", config.ID, io.ID, io.Address, err) + maxFails++ } } + + if maxFails == len(fss) { + return false, fmt.Errorf("the address for input '#%s:%s' (%s) is invalid: %w", config.ID, io.ID, io.Address, err) + } } if len(config.Output) == 0 { @@ -715,34 +714,33 @@ func validateConfig(config *app.Config, fss []rfs.Filesystem, ffmpeg ffmpeg.FFmp return false, fmt.Errorf("the address for output '#%s:%s' must not be empty", config.ID, io.ID) } - if len(fss) != 0 { - maxFails := 0 - for _, fs := range fss { - isFile := false - io.Address, isFile, err = validateOutputAddress(io.Address, fs.Metadata("base"), ffmpeg) - if err != nil { - maxFails++ - } - - if isFile { - hasFiles = true - } + maxFails := 0 + for _, fs := range fss { + basedir := "/" + if fs.Type() == "disk" { + basedir = fs.Metadata("base") } - if maxFails == len(fss) { - return false, fmt.Errorf("the address for output '#%s:%s' is invalid: %w", config.ID, io.ID, err) - } - } else { isFile := false - io.Address, isFile, err = validateOutputAddress(io.Address, "/", ffmpeg) + io.Address, isFile, err = validateOutputAddress(io.Address, basedir, ffmpeg) if err != nil { - return false, fmt.Errorf("the address for output '#%s:%s' is invalid: %w", config.ID, io.ID, err) + maxFails++ } if isFile { - hasFiles = true + if fs.Type() == "disk" { + hasFiles = true + } + + dir := filepath.Dir(strings.TrimPrefix(io.Address, "file:"+basedir)) + fs.MkdirAll(dir, 0744) } } + + if maxFails == len(fss) { + return false, fmt.Errorf("the address for output '#%s:%s' is invalid: %w", config.ID, io.ID, err) + } + } return hasFiles, nil @@ -1189,7 +1187,7 @@ func (r *restream) reloadProcess(id string) error { config := t.config.Clone() resolveDynamicPlaceholder(config, r.replace) - t.usesDisk, err = validateConfig(config, r.fs.diskfs, r.ffmpeg) + t.usesDisk, err = validateConfig(config, r.fs.list, r.ffmpeg) if err != nil { return err } diff --git a/restream/restream_test.go b/restream/restream_test.go index 82e11322..79d96e8f 100644 --- a/restream/restream_test.go +++ b/restream/restream_test.go @@ -2,13 +2,16 @@ package restream import ( "fmt" + "os" "testing" "time" "github.com/datarhei/core/v16/ffmpeg" "github.com/datarhei/core/v16/internal/testhelper" + "github.com/datarhei/core/v16/io/fs" "github.com/datarhei/core/v16/net" "github.com/datarhei/core/v16/restream/app" + rfs "github.com/datarhei/core/v16/restream/fs" "github.com/datarhei/core/v16/restream/replace" "github.com/lestrrat-go/strftime" @@ -31,9 +34,15 @@ func getDummyRestreamer(portrange net.Portranger, validatorIn, validatorOut ffmp return nil, err } + memfs, err := fs.NewMemFilesystem(fs.MemConfig{}) + if err != nil { + return nil, err + } + rs, err := New(Config{ - FFmpeg: ffmpeg, - Replace: replacer, + FFmpeg: ffmpeg, + Replace: replacer, + Filesystems: []fs.Filesystem{memfs}, }) if err != nil { return nil, err @@ -547,37 +556,80 @@ func TestConfigValidation(t *testing.T) { config := getDummyProcess() - _, err = validateConfig(config, rs.fs.diskfs, rs.ffmpeg) + hasfiles, err := validateConfig(config, rs.fs.list, rs.ffmpeg) require.NoError(t, err) + require.False(t, hasfiles) config.Input = []app.ConfigIO{} - _, err = validateConfig(config, rs.fs.diskfs, rs.ffmpeg) + hasfiles, err = validateConfig(config, rs.fs.list, rs.ffmpeg) require.Error(t, err) + require.False(t, hasfiles) config = getDummyProcess() config.Input[0].ID = "" - _, err = validateConfig(config, rs.fs.diskfs, rs.ffmpeg) + hasfiles, err = validateConfig(config, rs.fs.list, rs.ffmpeg) require.Error(t, err) + require.False(t, hasfiles) config = getDummyProcess() config.Input[0].Address = "" - _, err = validateConfig(config, rs.fs.diskfs, rs.ffmpeg) + hasfiles, err = validateConfig(config, rs.fs.list, rs.ffmpeg) require.Error(t, err) + require.False(t, hasfiles) config = getDummyProcess() config.Output = []app.ConfigIO{} - _, err = validateConfig(config, rs.fs.diskfs, rs.ffmpeg) + hasfiles, err = validateConfig(config, rs.fs.list, rs.ffmpeg) require.Error(t, err) + require.False(t, hasfiles) config = getDummyProcess() config.Output[0].ID = "" - _, err = validateConfig(config, rs.fs.diskfs, rs.ffmpeg) + hasfiles, err = validateConfig(config, rs.fs.list, rs.ffmpeg) require.Error(t, err) + require.False(t, hasfiles) config = getDummyProcess() config.Output[0].Address = "" - _, err = validateConfig(config, rs.fs.diskfs, rs.ffmpeg) + hasfiles, err = validateConfig(config, rs.fs.list, rs.ffmpeg) require.Error(t, err) + require.False(t, hasfiles) +} + +func TestConfigValidationWithMkdir(t *testing.T) { + rsi, err := getDummyRestreamer(nil, nil, nil, nil) + require.NoError(t, err) + + rs := rsi.(*restream) + + config := getDummyProcess() + config.Output[0].Address = "/path/to/a/file/image.jpg" + hasfiles, err := validateConfig(config, rs.fs.list, rs.ffmpeg) + require.NoError(t, err) + require.False(t, hasfiles) + + info, err := rs.fs.list[0].Stat("/path/to/a/file") + require.NoError(t, err) + require.True(t, info.IsDir()) + + diskfs, err := fs.NewRootedDiskFilesystem(fs.RootedDiskConfig{ + Root: "./testing", + }) + require.NoError(t, err) + + diskrfs := rfs.New(rfs.Config{ + FS: diskfs, + }) + + hasfiles, err = validateConfig(config, []rfs.Filesystem{diskrfs}, rs.ffmpeg) + require.NoError(t, err) + require.True(t, hasfiles) + + info, err = diskfs.Stat("/path/to/a/file") + require.NoError(t, err) + require.True(t, info.IsDir()) + + os.RemoveAll("./testing") } func TestConfigValidationFFmpeg(t *testing.T) { @@ -594,21 +646,21 @@ func TestConfigValidationFFmpeg(t *testing.T) { config := getDummyProcess() - _, err = validateConfig(config, rs.fs.diskfs, rs.ffmpeg) + _, err = validateConfig(config, rs.fs.list, rs.ffmpeg) require.Error(t, err) config.Input[0].Address = "http://stream.example.com/master.m3u8" config.Output[0].Address = "http://stream.example.com/master2.m3u8" - _, err = validateConfig(config, rs.fs.diskfs, rs.ffmpeg) + _, err = validateConfig(config, rs.fs.list, rs.ffmpeg) require.NoError(t, err) config.Output[0].Address = "[f=flv]http://stream.example.com/master2.m3u8" - _, err = validateConfig(config, rs.fs.diskfs, rs.ffmpeg) + _, err = validateConfig(config, rs.fs.list, rs.ffmpeg) require.NoError(t, err) config.Output[0].Address = "[f=hls]http://stream.example.com/master2.m3u8|[f=flv]rtmp://stream.example.com/stream" - _, err = validateConfig(config, rs.fs.diskfs, rs.ffmpeg) + _, err = validateConfig(config, rs.fs.list, rs.ffmpeg) require.NoError(t, err) }