Create directories for output files to be written on disk

This commit is contained in:
Ingo Oppermann 2023-03-01 11:08:20 +01:00
parent be718eac0a
commit 3cad139952
No known key found for this signature in database
GPG Key ID: 2AB32426E9DD229E
3 changed files with 108 additions and 57 deletions

View File

@ -53,8 +53,9 @@ func DummyRestreamer(pathPrefix string) (restream.Restreamer, error) {
}
rs, err := restream.New(restream.Config{
Store: store,
FFmpeg: ffmpeg,
Store: store,
FFmpeg: ffmpeg,
Filesystems: []fs.Filesystem{memfs},
})
if err != nil {
return nil, err

View File

@ -94,7 +94,6 @@ type restream struct {
nProc int64
fs struct {
list []rfs.Filesystem
diskfs []rfs.Filesystem
stopObserver context.CancelFunc
}
replace replace.Replacer
@ -134,6 +133,10 @@ func New(config Config) (Restreamer, error) {
r.store = s
}
if len(config.Filesystems) == 0 {
return nil, fmt.Errorf("at least one filesystem must be provided")
}
for _, fs := range config.Filesystems {
fs := rfs.New(rfs.Config{
FS: fs,
@ -141,11 +144,6 @@ func New(config Config) (Restreamer, error) {
})
r.fs.list = append(r.fs.list, fs)
// Add the diskfs filesystems also to a separate array. We need it later for input and output validation
if fs.Type() == "disk" {
r.fs.diskfs = append(r.fs.diskfs, fs)
}
}
if r.replace == nil {
@ -341,7 +339,7 @@ func (r *restream) load() error {
config := t.config.Clone()
resolveDynamicPlaceholder(config, r.replace)
t.usesDisk, err = validateConfig(config, r.fs.diskfs, r.ffmpeg)
t.usesDisk, err = validateConfig(config, r.fs.list, r.ffmpeg)
if err != nil {
r.logger.Warn().WithField("id", t.id).WithError(err).Log("Ignoring")
continue
@ -487,7 +485,7 @@ func (r *restream) createTask(config *app.Config) (*task, error) {
config := t.config.Clone()
resolveDynamicPlaceholder(config, r.replace)
t.usesDisk, err = validateConfig(config, r.fs.diskfs, r.ffmpeg)
t.usesDisk, err = validateConfig(config, r.fs.list, r.ffmpeg)
if err != nil {
return nil, err
}
@ -527,7 +525,7 @@ func (r *restream) onArgs(cfg *app.Config) func([]string) []string {
resolveDynamicPlaceholder(config, r.replace)
_, err := validateConfig(config, r.fs.diskfs, r.ffmpeg)
_, err := validateConfig(config, r.fs.list, r.ffmpeg)
if err != nil {
return []string{}
}
@ -641,6 +639,9 @@ func (r *restream) unsetPlayoutPorts(t *task) {
t.playout = nil
}
// validateConfig verifies a process config, whether the accessed files (read and write) can be accessed
// based on the provided filesystems and the ffmpeg validators. Returns an error if somethingis wrong,
// otherwise nil and whether there is a disk filesystem involved.
func validateConfig(config *app.Config, fss []rfs.Filesystem, ffmpeg ffmpeg.FFmpeg) (bool, error) {
if len(config.Input) == 0 {
return false, fmt.Errorf("at least one input must be defined for the process '%s'", config.ID)
@ -669,24 +670,22 @@ func validateConfig(config *app.Config, fss []rfs.Filesystem, ffmpeg ffmpeg.FFmp
return false, fmt.Errorf("the address for input '#%s:%s' must not be empty", config.ID, io.ID)
}
if len(fss) != 0 {
maxFails := 0
for _, fs := range fss {
io.Address, err = validateInputAddress(io.Address, fs.Metadata("base"), ffmpeg)
if err != nil {
maxFails++
}
maxFails := 0
for _, fs := range fss {
basedir := "/"
if fs.Type() == "disk" {
basedir = fs.Metadata("base")
}
if maxFails == len(fss) {
return false, fmt.Errorf("the address for input '#%s:%s' (%s) is invalid: %w", config.ID, io.ID, io.Address, err)
}
} else {
io.Address, err = validateInputAddress(io.Address, "/", ffmpeg)
io.Address, err = validateInputAddress(io.Address, basedir, ffmpeg)
if err != nil {
return false, fmt.Errorf("the address for input '#%s:%s' (%s) is invalid: %w", config.ID, io.ID, io.Address, err)
maxFails++
}
}
if maxFails == len(fss) {
return false, fmt.Errorf("the address for input '#%s:%s' (%s) is invalid: %w", config.ID, io.ID, io.Address, err)
}
}
if len(config.Output) == 0 {
@ -715,34 +714,33 @@ func validateConfig(config *app.Config, fss []rfs.Filesystem, ffmpeg ffmpeg.FFmp
return false, fmt.Errorf("the address for output '#%s:%s' must not be empty", config.ID, io.ID)
}
if len(fss) != 0 {
maxFails := 0
for _, fs := range fss {
isFile := false
io.Address, isFile, err = validateOutputAddress(io.Address, fs.Metadata("base"), ffmpeg)
if err != nil {
maxFails++
}
if isFile {
hasFiles = true
}
maxFails := 0
for _, fs := range fss {
basedir := "/"
if fs.Type() == "disk" {
basedir = fs.Metadata("base")
}
if maxFails == len(fss) {
return false, fmt.Errorf("the address for output '#%s:%s' is invalid: %w", config.ID, io.ID, err)
}
} else {
isFile := false
io.Address, isFile, err = validateOutputAddress(io.Address, "/", ffmpeg)
io.Address, isFile, err = validateOutputAddress(io.Address, basedir, ffmpeg)
if err != nil {
return false, fmt.Errorf("the address for output '#%s:%s' is invalid: %w", config.ID, io.ID, err)
maxFails++
}
if isFile {
hasFiles = true
if fs.Type() == "disk" {
hasFiles = true
}
dir := filepath.Dir(strings.TrimPrefix(io.Address, "file:"+basedir))
fs.MkdirAll(dir, 0744)
}
}
if maxFails == len(fss) {
return false, fmt.Errorf("the address for output '#%s:%s' is invalid: %w", config.ID, io.ID, err)
}
}
return hasFiles, nil
@ -1189,7 +1187,7 @@ func (r *restream) reloadProcess(id string) error {
config := t.config.Clone()
resolveDynamicPlaceholder(config, r.replace)
t.usesDisk, err = validateConfig(config, r.fs.diskfs, r.ffmpeg)
t.usesDisk, err = validateConfig(config, r.fs.list, r.ffmpeg)
if err != nil {
return err
}

View File

@ -2,13 +2,16 @@ package restream
import (
"fmt"
"os"
"testing"
"time"
"github.com/datarhei/core/v16/ffmpeg"
"github.com/datarhei/core/v16/internal/testhelper"
"github.com/datarhei/core/v16/io/fs"
"github.com/datarhei/core/v16/net"
"github.com/datarhei/core/v16/restream/app"
rfs "github.com/datarhei/core/v16/restream/fs"
"github.com/datarhei/core/v16/restream/replace"
"github.com/lestrrat-go/strftime"
@ -31,9 +34,15 @@ func getDummyRestreamer(portrange net.Portranger, validatorIn, validatorOut ffmp
return nil, err
}
memfs, err := fs.NewMemFilesystem(fs.MemConfig{})
if err != nil {
return nil, err
}
rs, err := New(Config{
FFmpeg: ffmpeg,
Replace: replacer,
FFmpeg: ffmpeg,
Replace: replacer,
Filesystems: []fs.Filesystem{memfs},
})
if err != nil {
return nil, err
@ -547,37 +556,80 @@ func TestConfigValidation(t *testing.T) {
config := getDummyProcess()
_, err = validateConfig(config, rs.fs.diskfs, rs.ffmpeg)
hasfiles, err := validateConfig(config, rs.fs.list, rs.ffmpeg)
require.NoError(t, err)
require.False(t, hasfiles)
config.Input = []app.ConfigIO{}
_, err = validateConfig(config, rs.fs.diskfs, rs.ffmpeg)
hasfiles, err = validateConfig(config, rs.fs.list, rs.ffmpeg)
require.Error(t, err)
require.False(t, hasfiles)
config = getDummyProcess()
config.Input[0].ID = ""
_, err = validateConfig(config, rs.fs.diskfs, rs.ffmpeg)
hasfiles, err = validateConfig(config, rs.fs.list, rs.ffmpeg)
require.Error(t, err)
require.False(t, hasfiles)
config = getDummyProcess()
config.Input[0].Address = ""
_, err = validateConfig(config, rs.fs.diskfs, rs.ffmpeg)
hasfiles, err = validateConfig(config, rs.fs.list, rs.ffmpeg)
require.Error(t, err)
require.False(t, hasfiles)
config = getDummyProcess()
config.Output = []app.ConfigIO{}
_, err = validateConfig(config, rs.fs.diskfs, rs.ffmpeg)
hasfiles, err = validateConfig(config, rs.fs.list, rs.ffmpeg)
require.Error(t, err)
require.False(t, hasfiles)
config = getDummyProcess()
config.Output[0].ID = ""
_, err = validateConfig(config, rs.fs.diskfs, rs.ffmpeg)
hasfiles, err = validateConfig(config, rs.fs.list, rs.ffmpeg)
require.Error(t, err)
require.False(t, hasfiles)
config = getDummyProcess()
config.Output[0].Address = ""
_, err = validateConfig(config, rs.fs.diskfs, rs.ffmpeg)
hasfiles, err = validateConfig(config, rs.fs.list, rs.ffmpeg)
require.Error(t, err)
require.False(t, hasfiles)
}
func TestConfigValidationWithMkdir(t *testing.T) {
rsi, err := getDummyRestreamer(nil, nil, nil, nil)
require.NoError(t, err)
rs := rsi.(*restream)
config := getDummyProcess()
config.Output[0].Address = "/path/to/a/file/image.jpg"
hasfiles, err := validateConfig(config, rs.fs.list, rs.ffmpeg)
require.NoError(t, err)
require.False(t, hasfiles)
info, err := rs.fs.list[0].Stat("/path/to/a/file")
require.NoError(t, err)
require.True(t, info.IsDir())
diskfs, err := fs.NewRootedDiskFilesystem(fs.RootedDiskConfig{
Root: "./testing",
})
require.NoError(t, err)
diskrfs := rfs.New(rfs.Config{
FS: diskfs,
})
hasfiles, err = validateConfig(config, []rfs.Filesystem{diskrfs}, rs.ffmpeg)
require.NoError(t, err)
require.True(t, hasfiles)
info, err = diskfs.Stat("/path/to/a/file")
require.NoError(t, err)
require.True(t, info.IsDir())
os.RemoveAll("./testing")
}
func TestConfigValidationFFmpeg(t *testing.T) {
@ -594,21 +646,21 @@ func TestConfigValidationFFmpeg(t *testing.T) {
config := getDummyProcess()
_, err = validateConfig(config, rs.fs.diskfs, rs.ffmpeg)
_, err = validateConfig(config, rs.fs.list, rs.ffmpeg)
require.Error(t, err)
config.Input[0].Address = "http://stream.example.com/master.m3u8"
config.Output[0].Address = "http://stream.example.com/master2.m3u8"
_, err = validateConfig(config, rs.fs.diskfs, rs.ffmpeg)
_, err = validateConfig(config, rs.fs.list, rs.ffmpeg)
require.NoError(t, err)
config.Output[0].Address = "[f=flv]http://stream.example.com/master2.m3u8"
_, err = validateConfig(config, rs.fs.diskfs, rs.ffmpeg)
_, err = validateConfig(config, rs.fs.list, rs.ffmpeg)
require.NoError(t, err)
config.Output[0].Address = "[f=hls]http://stream.example.com/master2.m3u8|[f=flv]rtmp://stream.example.com/stream"
_, err = validateConfig(config, rs.fs.diskfs, rs.ffmpeg)
_, err = validateConfig(config, rs.fs.list, rs.ffmpeg)
require.NoError(t, err)
}