diff --git a/app/api/api.go b/app/api/api.go index 6394eebb..fdcd5a14 100644 --- a/app/api/api.go +++ b/app/api/api.go @@ -541,8 +541,6 @@ func (a *api) start() error { Name: cfg.Name, Store: store, Filesystems: filesystems, - DiskFS: a.diskfs, - MemFS: a.memfs, Replace: a.replacer, FFmpeg: a.ffmpeg, MaxProcesses: cfg.FFmpeg.MaxProcesses, diff --git a/io/fs/disk.go b/io/fs/disk.go index 9cdf3fc2..8ae29977 100644 --- a/io/fs/disk.go +++ b/io/fs/disk.go @@ -140,7 +140,10 @@ func NewDiskFilesystem(config DiskConfig) (Filesystem, error) { fs.logger = log.New("") } - fs.logger = fs.logger.WithField("type", "disk") + fs.logger = fs.logger.WithFields(log.Fields{ + "name": fs.name, + "type": "disk", + }) if err := fs.Rebase(config.Dir); err != nil { return nil, err diff --git a/io/fs/mem.go b/io/fs/mem.go index 745a0adf..1b8ca87e 100644 --- a/io/fs/mem.go +++ b/io/fs/mem.go @@ -162,6 +162,7 @@ func NewMemFilesystem(config MemConfig) Filesystem { } fs.logger.WithFields(log.Fields{ + "name": fs.name, "size_bytes": fs.maxSize, "purge": fs.purge, }).Debug().Log("Created") diff --git a/io/fs/s3.go b/io/fs/s3.go index cb9b6897..dff8c738 100644 --- a/io/fs/s3.go +++ b/io/fs/s3.go @@ -70,6 +70,7 @@ func NewS3Filesystem(config S3Config) (Filesystem, error) { } fs.logger = fs.logger.WithFields(log.Fields{ + "name": fs.name, "type": "s3", "bucket": fs.bucket, "region": fs.region, diff --git a/restream/fs/fs.go b/restream/fs/fs.go index 4769597f..c470aa22 100644 --- a/restream/fs/fs.go +++ b/restream/fs/fs.go @@ -62,6 +62,11 @@ func New(config Config) Filesystem { fs.logger = log.New("") } + fs.logger = fs.logger.WithFields(log.Fields{ + "name": config.FS.Name(), + "type": config.FS.Type(), + }) + fs.cleanupPatterns = make(map[string][]Pattern) // already drain the stop diff --git a/restream/restream.go b/restream/restream.go index 606960cd..03a37574 100644 --- a/restream/restream.go +++ b/restream/restream.go @@ -60,8 +60,6 @@ type Config struct { Name string Store store.Store Filesystems []fs.Filesystem - DiskFS fs.Filesystem - MemFS fs.Filesystem Replace replace.Replacer FFmpeg ffmpeg.FFmpeg MaxProcesses int64 @@ -92,8 +90,8 @@ type restream struct { maxProc int64 nProc int64 fs struct { + list []rfs.Filesystem diskfs rfs.Filesystem - memfs rfs.Filesystem stopObserver context.CancelFunc } replace replace.Replacer @@ -126,26 +124,18 @@ func New(config Config) (Restreamer, error) { r.store = store.NewDummyStore(store.DummyConfig{}) } - if config.DiskFS != nil { - r.fs.diskfs = rfs.New(rfs.Config{ - FS: config.DiskFS, - Logger: r.logger.WithComponent("DiskFS"), + for _, fs := range config.Filesystems { + fs := rfs.New(rfs.Config{ + FS: fs, + Logger: r.logger.WithComponent("Cleanup"), }) - } else { - r.fs.diskfs = rfs.New(rfs.Config{ - FS: fs.NewDummyFilesystem(), - }) - } - if config.MemFS != nil { - r.fs.memfs = rfs.New(rfs.Config{ - FS: config.MemFS, - Logger: r.logger.WithComponent("MemFS"), - }) - } else { - r.fs.memfs = rfs.New(rfs.Config{ - FS: fs.NewDummyFilesystem(), - }) + r.fs.list = append(r.fs.list, fs) + + // TODO: This is just to make the input and output address validator happy for now. This needs to be replaced with a more general approach. + if fs.Type() == "diskfs" { + r.fs.diskfs = fs + } } if r.replace == nil { @@ -184,12 +174,16 @@ func (r *restream) Start() { r.setCleanup(id, t.config) } - r.fs.diskfs.Start() - r.fs.memfs.Start() - ctx, cancel := context.WithCancel(context.Background()) r.fs.stopObserver = cancel - go r.observe(ctx, r.fs.diskfs, 10*time.Second) + + for _, fs := range r.fs.list { + fs.Start() + + if fs.Type() == "diskfs" { + go r.observe(ctx, fs, 10*time.Second) + } + } r.stopOnce = sync.Once{} }) @@ -213,8 +207,10 @@ func (r *restream) Stop() { r.fs.stopObserver() - r.fs.diskfs.Stop() - r.fs.memfs.Stop() + // Stop the cleanup jobs + for _, fs := range r.fs.list { + fs.Stop() + } r.startOnce = sync.Once{} }) @@ -467,34 +463,50 @@ func (r *restream) createTask(config *app.Config) (*task, error) { } func (r *restream) setCleanup(id string, config *app.Config) { + rePrefix := regexp.MustCompile(`^([a-z]+):`) + for _, output := range config.Output { for _, c := range output.Cleanup { - if strings.HasPrefix(c.Pattern, "memfs:") { - r.fs.memfs.SetCleanup(id, []rfs.Pattern{ - { - Pattern: strings.TrimPrefix(c.Pattern, "memfs:"), - MaxFiles: c.MaxFiles, - MaxFileAge: time.Duration(c.MaxFileAge) * time.Second, - PurgeOnDelete: c.PurgeOnDelete, - }, - }) - } else if strings.HasPrefix(c.Pattern, "diskfs:") { - r.fs.memfs.SetCleanup(id, []rfs.Pattern{ - { - Pattern: strings.TrimPrefix(c.Pattern, "diskfs:"), - MaxFiles: c.MaxFiles, - MaxFileAge: time.Duration(c.MaxFileAge) * time.Second, - PurgeOnDelete: c.PurgeOnDelete, - }, + matches := rePrefix.FindStringSubmatch(c.Pattern) + if matches == nil { + continue + } + + name := matches[1] + + // Support legacy names + if name == "diskfs" { + name = "disk" + } else if name == "memfs" { + name = "mem" + } + + for _, fs := range r.fs.list { + if fs.Name() != name { + continue + } + + pattern := rfs.Pattern{ + Pattern: rePrefix.ReplaceAllString(c.Pattern, ""), + MaxFiles: c.MaxFiles, + MaxFileAge: time.Duration(c.MaxFileAge) * time.Second, + PurgeOnDelete: c.PurgeOnDelete, + } + + fs.SetCleanup(id, []rfs.Pattern{ + pattern, }) + + break } } } } func (r *restream) unsetCleanup(id string) { - r.fs.diskfs.UnsetCleanup(id) - r.fs.memfs.UnsetCleanup(id) + for _, fs := range r.fs.list { + fs.UnsetCleanup(id) + } } func (r *restream) setPlayoutPorts(t *task) error {