Adapt cleanup patterns to generalized filesystems

This commit is contained in:
Ingo Oppermann 2022-08-24 16:06:22 +03:00
parent ea98205bd6
commit 6751346566
No known key found for this signature in database
GPG Key ID: 2AB32426E9DD229E
6 changed files with 69 additions and 49 deletions

View File

@ -541,8 +541,6 @@ func (a *api) start() error {
Name: cfg.Name,
Store: store,
Filesystems: filesystems,
DiskFS: a.diskfs,
MemFS: a.memfs,
Replace: a.replacer,
FFmpeg: a.ffmpeg,
MaxProcesses: cfg.FFmpeg.MaxProcesses,

View File

@ -140,7 +140,10 @@ func NewDiskFilesystem(config DiskConfig) (Filesystem, error) {
fs.logger = log.New("")
}
fs.logger = fs.logger.WithField("type", "disk")
fs.logger = fs.logger.WithFields(log.Fields{
"name": fs.name,
"type": "disk",
})
if err := fs.Rebase(config.Dir); err != nil {
return nil, err

View File

@ -162,6 +162,7 @@ func NewMemFilesystem(config MemConfig) Filesystem {
}
fs.logger.WithFields(log.Fields{
"name": fs.name,
"size_bytes": fs.maxSize,
"purge": fs.purge,
}).Debug().Log("Created")

View File

@ -70,6 +70,7 @@ func NewS3Filesystem(config S3Config) (Filesystem, error) {
}
fs.logger = fs.logger.WithFields(log.Fields{
"name": fs.name,
"type": "s3",
"bucket": fs.bucket,
"region": fs.region,

View File

@ -62,6 +62,11 @@ func New(config Config) Filesystem {
fs.logger = log.New("")
}
fs.logger = fs.logger.WithFields(log.Fields{
"name": config.FS.Name(),
"type": config.FS.Type(),
})
fs.cleanupPatterns = make(map[string][]Pattern)
// already drain the stop

View File

@ -60,8 +60,6 @@ type Config struct {
Name string
Store store.Store
Filesystems []fs.Filesystem
DiskFS fs.Filesystem
MemFS fs.Filesystem
Replace replace.Replacer
FFmpeg ffmpeg.FFmpeg
MaxProcesses int64
@ -92,8 +90,8 @@ type restream struct {
maxProc int64
nProc int64
fs struct {
list []rfs.Filesystem
diskfs rfs.Filesystem
memfs rfs.Filesystem
stopObserver context.CancelFunc
}
replace replace.Replacer
@ -126,26 +124,18 @@ func New(config Config) (Restreamer, error) {
r.store = store.NewDummyStore(store.DummyConfig{})
}
if config.DiskFS != nil {
r.fs.diskfs = rfs.New(rfs.Config{
FS: config.DiskFS,
Logger: r.logger.WithComponent("DiskFS"),
for _, fs := range config.Filesystems {
fs := rfs.New(rfs.Config{
FS: fs,
Logger: r.logger.WithComponent("Cleanup"),
})
} else {
r.fs.diskfs = rfs.New(rfs.Config{
FS: fs.NewDummyFilesystem(),
})
}
if config.MemFS != nil {
r.fs.memfs = rfs.New(rfs.Config{
FS: config.MemFS,
Logger: r.logger.WithComponent("MemFS"),
})
} else {
r.fs.memfs = rfs.New(rfs.Config{
FS: fs.NewDummyFilesystem(),
})
r.fs.list = append(r.fs.list, fs)
// TODO: This is just to make the input and output address validator happy for now. This needs to be replaced with a more general approach.
if fs.Type() == "diskfs" {
r.fs.diskfs = fs
}
}
if r.replace == nil {
@ -184,12 +174,16 @@ func (r *restream) Start() {
r.setCleanup(id, t.config)
}
r.fs.diskfs.Start()
r.fs.memfs.Start()
ctx, cancel := context.WithCancel(context.Background())
r.fs.stopObserver = cancel
go r.observe(ctx, r.fs.diskfs, 10*time.Second)
for _, fs := range r.fs.list {
fs.Start()
if fs.Type() == "diskfs" {
go r.observe(ctx, fs, 10*time.Second)
}
}
r.stopOnce = sync.Once{}
})
@ -213,8 +207,10 @@ func (r *restream) Stop() {
r.fs.stopObserver()
r.fs.diskfs.Stop()
r.fs.memfs.Stop()
// Stop the cleanup jobs
for _, fs := range r.fs.list {
fs.Stop()
}
r.startOnce = sync.Once{}
})
@ -467,34 +463,50 @@ func (r *restream) createTask(config *app.Config) (*task, error) {
}
func (r *restream) setCleanup(id string, config *app.Config) {
rePrefix := regexp.MustCompile(`^([a-z]+):`)
for _, output := range config.Output {
for _, c := range output.Cleanup {
if strings.HasPrefix(c.Pattern, "memfs:") {
r.fs.memfs.SetCleanup(id, []rfs.Pattern{
{
Pattern: strings.TrimPrefix(c.Pattern, "memfs:"),
MaxFiles: c.MaxFiles,
MaxFileAge: time.Duration(c.MaxFileAge) * time.Second,
PurgeOnDelete: c.PurgeOnDelete,
},
})
} else if strings.HasPrefix(c.Pattern, "diskfs:") {
r.fs.memfs.SetCleanup(id, []rfs.Pattern{
{
Pattern: strings.TrimPrefix(c.Pattern, "diskfs:"),
MaxFiles: c.MaxFiles,
MaxFileAge: time.Duration(c.MaxFileAge) * time.Second,
PurgeOnDelete: c.PurgeOnDelete,
},
matches := rePrefix.FindStringSubmatch(c.Pattern)
if matches == nil {
continue
}
name := matches[1]
// Support legacy names
if name == "diskfs" {
name = "disk"
} else if name == "memfs" {
name = "mem"
}
for _, fs := range r.fs.list {
if fs.Name() != name {
continue
}
pattern := rfs.Pattern{
Pattern: rePrefix.ReplaceAllString(c.Pattern, ""),
MaxFiles: c.MaxFiles,
MaxFileAge: time.Duration(c.MaxFileAge) * time.Second,
PurgeOnDelete: c.PurgeOnDelete,
}
fs.SetCleanup(id, []rfs.Pattern{
pattern,
})
break
}
}
}
}
func (r *restream) unsetCleanup(id string) {
r.fs.diskfs.UnsetCleanup(id)
r.fs.memfs.UnsetCleanup(id)
for _, fs := range r.fs.list {
fs.UnsetCleanup(id)
}
}
func (r *restream) setPlayoutPorts(t *task) error {