From 46810bf64dfeab562a701daea3e73080734157f8 Mon Sep 17 00:00:00 2001 From: Ingo Oppermann Date: Wed, 23 Jul 2025 13:51:41 +0200 Subject: [PATCH] Limit s3 filesystem to put, get, and delete, no more listing. reject s3 cleanup rules with wildcards --- glob/glob.go | 5 + glob/glob_test.go | 13 ++ io/fs/fs_test.go | 121 ++++++++++++-- io/fs/s3.go | 379 ++++++++++++++++++------------------------ restream/core.go | 71 +++++++- restream/core_test.go | 4 +- restream/fs/fs.go | 32 +++- 7 files changed, 387 insertions(+), 238 deletions(-) diff --git a/glob/glob.go b/glob/glob.go index ffcb5379..bb5b4461 100644 --- a/glob/glob.go +++ b/glob/glob.go @@ -48,6 +48,11 @@ func Prefix(pattern string) string { return strings.Clone(pattern[:index]) } +func IsPattern(pattern string) bool { + index := strings.IndexAny(pattern, "*[{") + return index != -1 +} + // Match returns whether the name matches the glob pattern, also considering // one or several optionnal separator. An error is only returned if the pattern // is invalid. diff --git a/glob/glob_test.go b/glob/glob_test.go index 9166f1f9..cbe0bc40 100644 --- a/glob/glob_test.go +++ b/glob/glob_test.go @@ -22,3 +22,16 @@ func TestPatterns(t *testing.T) { require.NoError(t, err) require.True(t, ok) } + +func TestPrefix(t *testing.T) { + prefix := Prefix("/a/b/c/d") + require.Equal(t, "/a/b/c/d", prefix) + + prefix = Prefix("/a/b/*/d") + require.Equal(t, "/a/b/", prefix) +} + +func TestIsPattern(t *testing.T) { + require.False(t, IsPattern("/a/b/c/d")) + require.True(t, IsPattern("/a/b/*/d")) +} diff --git a/io/fs/fs_test.go b/io/fs/fs_test.go index 2855e20f..47af52ea 100644 --- a/io/fs/fs_test.go +++ b/io/fs/fs_test.go @@ -171,12 +171,25 @@ func testWriteFile(t *testing.T, fs Filesystem) { cur, max := fs.Size() - require.Equal(t, int64(5), cur) - require.Equal(t, int64(-1), max) + if _, ok := fs.(*s3Filesystem); ok { + require.Equal(t, int64(0), cur) + require.Equal(t, int64(-1), max) + + data, err := fs.ReadFile("/foobar") + require.NoError(t, err) + require.Equal(t, []byte("xxxxx"), data) + } else { + require.Equal(t, int64(5), cur) + require.Equal(t, int64(-1), max) + } cur = fs.Files() - require.Equal(t, int64(1), cur) + if _, ok := fs.(*s3Filesystem); ok { + require.Equal(t, int64(0), cur) + } else { + require.Equal(t, int64(1), cur) + } } func testWriteFileSafe(t *testing.T, fs Filesystem) { @@ -186,6 +199,10 @@ func testWriteFileSafe(t *testing.T, fs Filesystem) { require.Equal(t, int64(5), size) require.Equal(t, true, created) + if _, ok := fs.(*s3Filesystem); ok { + return + } + cur, max := fs.Size() require.Equal(t, int64(5), cur) @@ -207,6 +224,17 @@ func testWriteFileReader(t *testing.T, fs Filesystem) { cur, max := fs.Size() + if _, ok := fs.(*s3Filesystem); ok { + require.Equal(t, int64(0), cur) + require.Equal(t, int64(-1), max) + + data, err := fs.ReadFile("/foobar") + require.NoError(t, err) + require.Equal(t, []byte("xxxxx"), data) + + return + } + require.Equal(t, int64(5), cur) require.Equal(t, int64(-1), max) @@ -266,6 +294,11 @@ func testFiles(t *testing.T, fs Filesystem) { fs.WriteFileReader("/foobar.txt", strings.NewReader("bar"), -1) + if _, ok := fs.(*s3Filesystem); ok { + require.Equal(t, int64(0), fs.Files()) + return + } + require.Equal(t, int64(1), fs.Files()) fs.MkdirAll("/path/to/foo", 0755) @@ -288,12 +321,21 @@ func testReplace(t *testing.T, fs Filesystem) { cur, max := fs.Size() - require.Equal(t, int64(5), cur) - require.Equal(t, int64(-1), max) + if _, ok := fs.(*s3Filesystem); ok { + require.Equal(t, int64(0), cur) + require.Equal(t, int64(-1), max) + } else { + require.Equal(t, int64(5), cur) + require.Equal(t, int64(-1), max) + } cur = fs.Files() - require.Equal(t, int64(1), cur) + if _, ok := fs.(*s3Filesystem); ok { + require.Equal(t, int64(0), cur) + } else { + require.Equal(t, int64(1), cur) + } data = strings.NewReader("yyy") @@ -303,6 +345,13 @@ func testReplace(t *testing.T, fs Filesystem) { require.Equal(t, int64(3), size) require.Equal(t, false, created) + if _, ok := fs.(*s3Filesystem); ok { + data, err := fs.ReadFile("/foobar") + require.NoError(t, err) + require.Equal(t, []byte("yyy"), data) + return + } + cur, max = fs.Size() require.Equal(t, int64(3), cur) @@ -323,6 +372,12 @@ func testList(t *testing.T, fs Filesystem) { cur, max := fs.Size() + if _, ok := fs.(*s3Filesystem); ok { + require.Equal(t, int64(0), cur) + require.Equal(t, int64(-1), max) + return + } + require.Equal(t, int64(17), cur) require.Equal(t, int64(-1), max) @@ -357,7 +412,11 @@ func testListGlob(t *testing.T, fs Filesystem) { cur := fs.Files() - require.Equal(t, int64(4), cur) + if _, ok := fs.(*s3Filesystem); ok { + require.Equal(t, int64(0), cur) + } else { + require.Equal(t, int64(4), cur) + } getNames := func(files []FileInfo) []string { names := []string{} @@ -367,6 +426,13 @@ func testListGlob(t *testing.T, fs Filesystem) { return names } + if _, ok := fs.(*s3Filesystem); ok { + files := getNames(fs.List("/", ListOptions{Pattern: "/foo*"})) + require.Equal(t, 0, len(files)) + require.ElementsMatch(t, []string{}, files) + return + } + files := getNames(fs.List("/", ListOptions{Pattern: "/foo*"})) require.Equal(t, 2, len(files)) require.ElementsMatch(t, []string{"/foobar1", "/foobar4"}, files) @@ -389,6 +455,10 @@ func testListGlob(t *testing.T, fs Filesystem) { } func testListSize(t *testing.T, fs Filesystem) { + if _, ok := fs.(*s3Filesystem); ok { + return + } + fs.WriteFileReader("/a", strings.NewReader("a"), -1) fs.WriteFileReader("/aa", strings.NewReader("aa"), -1) fs.WriteFileReader("/aaa", strings.NewReader("aaa"), -1) @@ -434,6 +504,11 @@ func testListModified(t *testing.T, fs Filesystem) { cur := fs.Files() + if _, ok := fs.(*s3Filesystem); ok { + require.Equal(t, int64(0), cur) + return + } + require.Equal(t, int64(4), cur) getNames := func(files []FileInfo) []string { @@ -483,16 +558,29 @@ func testRemoveAll(t *testing.T, fs Filesystem) { cur := fs.Files() - require.Equal(t, int64(4), cur) + if _, ok := fs.(*s3Filesystem); ok { + require.Equal(t, int64(0), cur) + } else { + require.Equal(t, int64(4), cur) + } _, size := fs.RemoveList("/", ListOptions{ Pattern: "", }) - require.Equal(t, int64(12), size) + + if _, ok := fs.(*s3Filesystem); ok { + require.Equal(t, int64(0), size) + } else { + require.Equal(t, int64(12), size) + } cur = fs.Files() - require.Equal(t, int64(0), cur) + if _, ok := fs.(*s3Filesystem); ok { + require.Equal(t, int64(0), cur) + } else { + require.Equal(t, int64(0), cur) + } } func testRemoveList(t *testing.T, fs Filesystem) { @@ -503,6 +591,11 @@ func testRemoveList(t *testing.T, fs Filesystem) { cur := fs.Files() + if _, ok := fs.(*s3Filesystem); ok { + require.Equal(t, int64(0), cur) + return + } + require.Equal(t, int64(4), cur) _, size := fs.RemoveList("/", ListOptions{ @@ -555,6 +648,10 @@ func testStatDir(t *testing.T, fs Filesystem) { require.NotNil(t, info) require.Equal(t, true, info.IsDir()) + if _, ok := fs.(*s3Filesystem); ok { + return + } + fs.WriteFileReader("/these/are/some/directories/foobar", strings.NewReader("gduwotoxqb"), -1) info, err = fs.Stat("/foobar") @@ -593,6 +690,10 @@ func testStatDir(t *testing.T, fs Filesystem) { } func testMkdirAll(t *testing.T, fs Filesystem) { + if _, ok := fs.(*s3Filesystem); ok { + return + } + info, err := fs.Stat("/foo/bar/dir") require.Error(t, err) require.Nil(t, info) diff --git a/io/fs/s3.go b/io/fs/s3.go index 1d66e3ed..e70b1079 100644 --- a/io/fs/s3.go +++ b/io/fs/s3.go @@ -12,7 +12,6 @@ import ( "sync" "time" - "github.com/datarhei/core/v16/glob" "github.com/datarhei/core/v16/log" "github.com/datarhei/core/v16/mem" "github.com/minio/minio-go/v7" @@ -139,47 +138,11 @@ func (fs *s3Filesystem) SetMetadata(key, data string) { } func (fs *s3Filesystem) Size() (int64, int64) { - size := int64(0) - - files := fs.List("/", ListOptions{}) - - for _, file := range files { - size += file.Size() - } - - return size, -1 + return 0, -1 } func (fs *s3Filesystem) Files() int64 { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - ch := fs.client.ListObjects(ctx, fs.bucket, minio.ListObjectsOptions{ - WithVersions: false, - WithMetadata: false, - Prefix: "", - Recursive: true, - MaxKeys: 0, - StartAfter: "", - UseV1: false, - }) - - nfiles := int64(0) - - for object := range ch { - if object.Err != nil { - fs.logger.WithError(object.Err).Log("Listing object failed") - } - - if strings.HasSuffix("/"+object.Key, "/"+fakeDirEntry) { - // Skip fake entries (see MkdirAll) - continue - } - - nfiles++ - } - - return nfiles + return 0 } func (fs *s3Filesystem) Symlink(oldname, newname string) error { @@ -403,8 +366,7 @@ func (fs *s3Filesystem) Copy(src, dst string) error { src = fs.cleanPath(src) dst = fs.cleanPath(dst) - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() + ctx := context.Background() _, err := fs.client.CopyObject(ctx, minio.CopyDestOptions{ Bucket: fs.bucket, @@ -444,8 +406,7 @@ func (fs *s3Filesystem) MkdirAll(path string, perm os.FileMode) error { func (fs *s3Filesystem) Remove(path string) int64 { path = fs.cleanPath(path) - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() + ctx := context.Background() stat, err := fs.client.StatObject(ctx, fs.bucket, path, minio.StatObjectOptions{}) if err != nil { @@ -467,191 +428,203 @@ func (fs *s3Filesystem) Remove(path string) int64 { } func (fs *s3Filesystem) RemoveList(path string, options ListOptions) ([]string, int64) { - path = fs.cleanPath(path) + fs.logger.Warn().Log("Removing files with pattern is not supported") + return nil, 0 - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() + /* + path = fs.cleanPath(path) - var totalSize int64 = 0 - files := []string{} - recursive := false + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() - var compiledPattern glob.Glob - var err error + var totalSize int64 = 0 + files := []string{} + recursive := false - if len(options.Pattern) != 0 { - compiledPattern, err = glob.Compile(options.Pattern, '/') - if err != nil { - return nil, 0 - } + var compiledPattern glob.Glob + var err error - if strings.Contains(options.Pattern, "**") { - recursive = true - } - } + if len(options.Pattern) != 0 { + compiledPattern, err = glob.Compile(options.Pattern, '/') + if err != nil { + return nil, 0 + } - recursive = true + if strings.Contains(options.Pattern, "**") { + recursive = true + } + } - objectsCh := make(chan minio.ObjectInfo) + recursive = true - // Send object names that are needed to be removed to objectsCh - go func() { - defer close(objectsCh) + objectsCh := make(chan minio.ObjectInfo) - for object := range fs.client.ListObjects(ctx, fs.bucket, minio.ListObjectsOptions{ - WithVersions: false, - WithMetadata: false, - Prefix: path, - Recursive: recursive, - MaxKeys: 0, - StartAfter: "", - UseV1: false, - }) { - if object.Err != nil { - fs.logger.WithError(object.Err).Log("Listing object failed") - continue - } - key := "/" + object.Key - if strings.HasSuffix(key, "/"+fakeDirEntry) { - // filter out fake directory entries (see MkdirAll) - continue - } + // Send object names that are needed to be removed to objectsCh - if compiledPattern != nil { - if !compiledPattern.Match(key) { - continue - } - } + go func() { + defer close(objectsCh) - if options.ModifiedStart != nil { - if object.LastModified.Before(*options.ModifiedStart) { - continue - } - } + for object := range fs.client.ListObjects(ctx, fs.bucket, minio.ListObjectsOptions{ + WithVersions: false, + WithMetadata: false, + Prefix: path, + Recursive: recursive, + MaxKeys: 0, + StartAfter: "", + UseV1: false, + }) { + if object.Err != nil { + fs.logger.WithError(object.Err).Log("Listing object failed") + continue + } + key := "/" + object.Key + if strings.HasSuffix(key, "/"+fakeDirEntry) { + // filter out fake directory entries (see MkdirAll) + continue + } - if options.ModifiedEnd != nil { - if object.LastModified.After(*options.ModifiedEnd) { - continue - } - } + if compiledPattern != nil { + if !compiledPattern.Match(key) { + continue + } + } - if options.SizeMin > 0 { - if object.Size < options.SizeMin { - continue - } - } + if options.ModifiedStart != nil { + if object.LastModified.Before(*options.ModifiedStart) { + continue + } + } - if options.SizeMax > 0 { - if object.Size > options.SizeMax { - continue - } - } + if options.ModifiedEnd != nil { + if object.LastModified.After(*options.ModifiedEnd) { + continue + } + } - totalSize += object.Size - objectsCh <- object + if options.SizeMin > 0 { + if object.Size < options.SizeMin { + continue + } + } - files = append(files, key) - } - }() + if options.SizeMax > 0 { + if object.Size > options.SizeMax { + continue + } + } - for err := range fs.client.RemoveObjects(context.Background(), fs.bucket, objectsCh, minio.RemoveObjectsOptions{ - GovernanceBypass: true, - }) { - fs.logger.WithError(err.Err).WithField("key", err.ObjectName).Log("Deleting object failed") - } + totalSize += object.Size + objectsCh <- object - fs.logger.Debug().Log("Deleted all files") + files = append(files, key) + } + }() - return files, totalSize + for err := range fs.client.RemoveObjects(context.Background(), fs.bucket, objectsCh, minio.RemoveObjectsOptions{ + GovernanceBypass: true, + }) { + + fs.logger.WithError(err.Err).WithField("key", err.ObjectName).Log("Deleting object failed") + } + + fs.logger.Debug().Log("Deleted all files") + + return files, totalSize + */ } func (fs *s3Filesystem) List(path string, options ListOptions) []FileInfo { - path = fs.cleanPath(path) + fs.logger.Warn().Log("Listing files is not supported") + return nil - var compiledPattern glob.Glob - var err error - recursive := false + /* + path = fs.cleanPath(path) - if len(options.Pattern) != 0 { - compiledPattern, err = glob.Compile(options.Pattern, '/') - if err != nil { - return nil - } + var compiledPattern glob.Glob + var err error + recursive := false - if strings.Contains(options.Pattern, "**") { - recursive = true - } - } + if len(options.Pattern) != 0 { + compiledPattern, err = glob.Compile(options.Pattern, '/') + if err != nil { + return nil + } - recursive = true + if strings.Contains(options.Pattern, "**") { + recursive = true + } + } - files := []FileInfo{} + recursive = true - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() + files := []FileInfo{} - ch := fs.client.ListObjects(ctx, fs.bucket, minio.ListObjectsOptions{ - WithVersions: false, - WithMetadata: false, - Prefix: path, - Recursive: recursive, - MaxKeys: 0, - StartAfter: "", - UseV1: false, - }) + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() - for object := range ch { - if object.Err != nil { - fs.logger.WithError(object.Err).Log("Listing object failed") - continue - } + ch := fs.client.ListObjects(ctx, fs.bucket, minio.ListObjectsOptions{ + WithVersions: false, + WithMetadata: false, + Prefix: path, + Recursive: recursive, + MaxKeys: 0, + StartAfter: "", + UseV1: false, + }) - key := "/" + object.Key - if strings.HasSuffix(key, "/"+fakeDirEntry) { - // filter out fake directory entries (see MkdirAll) - continue - } + for object := range ch { + if object.Err != nil { + fs.logger.WithError(object.Err).Log("Listing object failed") + continue + } - if compiledPattern != nil { - if !compiledPattern.Match(key) { - continue - } - } + key := "/" + object.Key + if strings.HasSuffix(key, "/"+fakeDirEntry) { + // filter out fake directory entries (see MkdirAll) + continue + } - if options.ModifiedStart != nil { - if object.LastModified.Before(*options.ModifiedStart) { - continue - } - } + if compiledPattern != nil { + if !compiledPattern.Match(key) { + continue + } + } - if options.ModifiedEnd != nil { - if object.LastModified.After(*options.ModifiedEnd) { - continue - } - } + if options.ModifiedStart != nil { + if object.LastModified.Before(*options.ModifiedStart) { + continue + } + } - if options.SizeMin > 0 { - if object.Size < options.SizeMin { - continue - } - } + if options.ModifiedEnd != nil { + if object.LastModified.After(*options.ModifiedEnd) { + continue + } + } - if options.SizeMax > 0 { - if object.Size > options.SizeMax { - continue - } - } + if options.SizeMin > 0 { + if object.Size < options.SizeMin { + continue + } + } - f := &s3FileInfo{ - name: key, - size: object.Size, - lastModified: object.LastModified, - } + if options.SizeMax > 0 { + if object.Size > options.SizeMax { + continue + } + } - files = append(files, f) - } + f := &s3FileInfo{ + name: key, + size: object.Size, + lastModified: object.LastModified, + } - return files + files = append(files, f) + } + + return files + */ } func (fs *s3Filesystem) LookPath(file string) (string, error) { @@ -689,38 +662,18 @@ func (fs *s3Filesystem) LookPath(file string) (string, error) { func (fs *s3Filesystem) isDir(path string) bool { if !strings.HasSuffix(path, "/") { - path = path + "/" + path = path + "/" + fakeDirEntry } - if path == "/" { + if path == "/"+fakeDirEntry { return true } ctx, cancel := context.WithCancel(context.Background()) defer cancel() - ch := fs.client.ListObjects(ctx, fs.bucket, minio.ListObjectsOptions{ - WithVersions: false, - WithMetadata: false, - Prefix: path, - Recursive: true, - MaxKeys: 1, - StartAfter: "", - UseV1: false, - }) - - files := uint64(0) - - for object := range ch { - if object.Err != nil { - fs.logger.WithError(object.Err).Log("Listing object failed") - continue - } - - files++ - } - - return files > 0 + _, err := fs.client.StatObject(ctx, fs.bucket, path, minio.StatObjectOptions{}) + return err == nil } func (fs *s3Filesystem) cleanPath(path string) string { diff --git a/restream/core.go b/restream/core.go index 3943e6ba..eb4327cb 100644 --- a/restream/core.go +++ b/restream/core.go @@ -187,7 +187,8 @@ func (r *restream) Start() { t.Restore() // The filesystem cleanup rules can be set - r.setCleanup(id, t.config) + patterns, _ := r.compileCleanup(t.ResolvedConfig()) + r.setCleanup(id, patterns) return true }) @@ -537,7 +538,12 @@ func (r *restream) AddProcess(config *app.Config) error { } // set filesystem cleanup rules - r.setCleanup(tid, t.config) + patterns, err := r.compileCleanup(t.ResolvedConfig()) + if err != nil { + t.Destroy() + return err + } + r.setCleanup(tid, patterns) err = t.Restore() if err != nil { @@ -703,14 +709,21 @@ func (r *restream) onBeforeStart(cfg *app.Config) func([]string) ([]string, erro } } -func (r *restream) setCleanup(id app.ProcessID, config *app.Config) { +func (r *restream) compileCleanup(config *app.Config) (map[string][]rfs.Pattern, error) { patterns := map[string][]rfs.Pattern{} + var err error = nil + + logger := r.logger.WithFields(log.Fields{ + "id": config.ID, + "domain": config.Domain, + }) for _, output := range config.Output { for _, c := range output.Cleanup { name, path, found := strings.Cut(c.Pattern, ":") if !found { - r.logger.Warn().WithField("pattern", c.Pattern).Log("invalid pattern, no prefix") + logger.Warn().WithField("pattern", c.Pattern).Log("invalid pattern, no prefix") + err = fmt.Errorf("invalid pattern, no prefix: %s", c.Pattern) continue } @@ -726,6 +739,38 @@ func (r *restream) setCleanup(id app.ProcessID, config *app.Config) { name = "mem" } + fstype := "" + + for _, fs := range r.fs.list { + if fs.Name() != name { + continue + } + + fstype = fs.Type() + + break + } + + if len(fstype) == 0 { + logger.Warn().WithField("pattern", c.Pattern).Log("no filesystem with the name '%s' found", name) + err = fmt.Errorf("no filesystem with the name '%s' found: %s", name, c.Pattern) + continue + } + + if fstype == "s3" { + if glob.IsPattern(path) { + logger.Warn().WithField("pattern", c.Pattern).Log("wildcards are not allowed for s3 filesystems") + err = fmt.Errorf("wildcards are not allowed for s3 filesystems: %s", c.Pattern) + continue + } + + if c.MaxFiles != 0 || c.MaxFileAge != 0 { + logger.Warn().WithField("pattern", c.Pattern).Log("cleanup filter rule are not allowed for s3 filesystems") + err = fmt.Errorf("cleanup filter rules are not allowed for s3 filesystems: %s", c.Pattern) + continue + } + } + p := patterns[name] p = append(p, rfs.Pattern{ Pattern: path, @@ -737,6 +782,10 @@ func (r *restream) setCleanup(id app.ProcessID, config *app.Config) { } } + return patterns, err +} + +func (r *restream) setCleanup(id app.ProcessID, patterns map[string][]rfs.Pattern) { for name, p := range patterns { for _, fs := range r.fs.list { if fs.Name() != name { @@ -1191,6 +1240,11 @@ func (r *restream) updateProcess(task *task, config *app.Config, force bool) err return err } + cleanupPatterns, err := r.compileCleanup(t.ResolvedConfig()) + if err != nil { + return err + } + tid := t.ID() if !tid.Equal(task.ID()) { @@ -1227,7 +1281,7 @@ func (r *restream) updateProcess(task *task, config *app.Config, force bool) err r.tasks.LoadAndStore(tid, t) // Set the filesystem cleanup rules - r.setCleanup(tid, t.config) + r.setCleanup(tid, cleanupPatterns) t.Restore() @@ -1426,6 +1480,11 @@ func (r *restream) ReloadProcess(id app.ProcessID) error { } func (r *restream) reloadProcess(task *task) error { + cleanupPatterns, err := r.compileCleanup(task.ResolvedConfig()) + if err != nil { + return err + } + t, err := r.createTask(task.Config()) if err != nil { return err @@ -1457,7 +1516,7 @@ func (r *restream) reloadProcess(task *task) error { r.tasks.LoadAndStore(tid, t) // Set the filesystem cleanup rules - r.setCleanup(tid, t.config) + r.setCleanup(tid, cleanupPatterns) t.Restore() diff --git a/restream/core_test.go b/restream/core_test.go index a1587018..6da4b522 100644 --- a/restream/core_test.go +++ b/restream/core_test.go @@ -1503,7 +1503,7 @@ func TestProcessReplacer(t *testing.T) { }, Cleanup: []app.ConfigIOCleanup{ { - Pattern: "pattern_{outputid}_{processid}_{reference}_{rtmp,name=$outputid}", + Pattern: "foo:pattern_{outputid}_{processid}_{reference}_{rtmp,name=$outputid}", MaxFiles: 0, MaxFileAge: 0, PurgeOnDelete: false, @@ -1574,7 +1574,7 @@ func TestProcessReplacer(t *testing.T) { }, Cleanup: []app.ConfigIOCleanup{ { - Pattern: "pattern_out_314159265359_refref_314159265359_refref_{rtmp,name=$outputid}", + Pattern: "foo:pattern_out_314159265359_refref_314159265359_refref_{rtmp,name=$outputid}", MaxFiles: 0, MaxFileAge: 0, PurgeOnDelete: false, diff --git a/restream/fs/fs.go b/restream/fs/fs.go index 8bb4bd02..c8d37577 100644 --- a/restream/fs/fs.go +++ b/restream/fs/fs.go @@ -5,6 +5,7 @@ import ( "context" "fmt" "sort" + "strings" "sync" "time" @@ -181,6 +182,10 @@ func (rfs *filesystem) UpdateCleanup(id string, newPatterns []Pattern, purge boo } func (rfs *filesystem) cleanup() { + if rfs.Filesystem.Type() == "s3" { + return + } + rfs.cleanupLock.RLock() nPatterns := len(rfs.cleanupPatterns) rfs.cleanupLock.RUnlock() @@ -251,15 +256,28 @@ func (rfs *filesystem) purge(patterns []Pattern) int64 { continue } - files, nfiles := rfs.Filesystem.RemoveList("/", fs.ListOptions{ - Pattern: pattern.Pattern, - }) - - for _, file := range files { - rfs.logger.Debug().WithField("path", file).Log("Purged file") + if len(pattern.Pattern) == 0 { + continue } - nfilesTotal += nfiles + if rfs.Filesystem.Type() == "s3" { + rfs.Filesystem.Remove(pattern.Pattern) + nfilesTotal++ + } else { + prefix := glob.Prefix(pattern.Pattern) + index := strings.LastIndex(prefix, "/") + path := prefix[:index+1] + + files, nfiles := rfs.Filesystem.RemoveList(path, fs.ListOptions{ + Pattern: pattern.Pattern, + }) + + for _, file := range files { + rfs.logger.Debug().WithField("path", file).Log("Purged file") + } + + nfilesTotal += nfiles + } } return nfilesTotal