diff --git a/glob/glob.go b/glob/glob.go index 89b57f00..ea8b5762 100644 --- a/glob/glob.go +++ b/glob/glob.go @@ -1,14 +1,33 @@ package glob -import ( - "github.com/gobwas/glob" -) +import "github.com/gobwas/glob" + +type Glob interface { + Match(name string) bool +} + +type globber struct { + glob glob.Glob +} + +func Compile(pattern string, separators ...rune) (Glob, error) { + g, err := glob.Compile(pattern, separators...) + if err != nil { + return nil, err + } + + return &globber{glob: g}, nil +} + +func (g *globber) Match(name string) bool { + return g.glob.Match(name) +} // Match returns whether the name matches the glob pattern, also considering // one or several optionnal separator. An error is only returned if the pattern // is invalid. func Match(pattern, name string, separators ...rune) (bool, error) { - g, err := glob.Compile(pattern, separators...) + g, err := Compile(pattern, separators...) if err != nil { return false, err } diff --git a/io/fs/disk.go b/io/fs/disk.go index 3ecc5dd5..e714d0ed 100644 --- a/io/fs/disk.go +++ b/io/fs/disk.go @@ -525,6 +525,16 @@ func (fs *diskFilesystem) RemoveList(path string, options ListOptions) ([]string var size int64 = 0 files := []string{} + var compiledPattern glob.Glob + var err error + + if len(options.Pattern) != 0 { + compiledPattern, err = glob.Compile(options.Pattern, '/') + if err != nil { + return nil, 0 + } + } + fs.walk(path, func(path string, info os.FileInfo) { if path == fs.root { return @@ -539,8 +549,8 @@ func (fs *diskFilesystem) RemoveList(path string, options ListOptions) ([]string return } - if len(options.Pattern) != 0 { - if ok, _ := glob.Match(options.Pattern, name, '/'); !ok { + if compiledPattern != nil { + if !compiledPattern.Match(name) { return } } @@ -582,6 +592,16 @@ func (fs *diskFilesystem) List(path string, options ListOptions) []FileInfo { path = fs.cleanPath(path) files := []FileInfo{} + var compiledPattern glob.Glob + var err error + + if len(options.Pattern) != 0 { + 
compiledPattern, err = glob.Compile(options.Pattern, '/') + if err != nil { + return nil + } + } + fs.walk(path, func(path string, info os.FileInfo) { if path == fs.root { return } @@ -596,8 +616,8 @@ func (fs *diskFilesystem) List(path string, options ListOptions) []FileInfo { return } - if len(options.Pattern) != 0 { - if ok, _ := glob.Match(options.Pattern, name, '/'); !ok { + if compiledPattern != nil { + if !compiledPattern.Match(name) { return } } diff --git a/io/fs/fs.go b/io/fs/fs.go index 82ed5117..72f36c53 100644 --- a/io/fs/fs.go +++ b/io/fs/fs.go @@ -120,8 +120,8 @@ type WriteFilesystem interface { // the removed file in bytes. The size is negative if the file doesn't exist. Remove(path string) int64 - // RemoveList removes all files from the filesystem. Returns the size of the - // removed files in bytes. + // RemoveList removes all files from the filesystem. Returns a list of the names of + // the removed files and the total size of all removed files in bytes. RemoveList(path string, options ListOptions) ([]string, int64) } diff --git a/io/fs/mem.go b/io/fs/mem.go index a837ae1d..baaee301 100644 --- a/io/fs/mem.go +++ b/io/fs/mem.go @@ -663,6 +663,16 @@ func (fs *memFilesystem) remove(path string) int64 { func (fs *memFilesystem) RemoveList(path string, options ListOptions) ([]string, int64) { path = fs.cleanPath(path) + var compiledPattern glob.Glob + var err error + + if len(options.Pattern) != 0 { + compiledPattern, err = glob.Compile(options.Pattern, '/') + if err != nil { + return nil, 0 + } + } + fs.filesLock.Lock() defer fs.filesLock.Unlock() @@ -674,8 +684,8 @@ func (fs *memFilesystem) RemoveList(path string, options ListOptions) ([]string, continue } - if len(options.Pattern) != 0 { - if ok, _ := glob.Match(options.Pattern, file.name, '/'); !ok { + if compiledPattern != nil { + if !compiledPattern.Match(file.name) { continue } } @@ -720,6 +730,16 @@ func (fs *memFilesystem) List(path string, options ListOptions) []FileInfo { path = 
fs.cleanPath(path) files := []FileInfo{} + var compiledPattern glob.Glob + var err error + + if len(options.Pattern) != 0 { + compiledPattern, err = glob.Compile(options.Pattern, '/') + if err != nil { + return nil + } + } + fs.filesLock.RLock() defer fs.filesLock.RUnlock() @@ -728,8 +748,8 @@ func (fs *memFilesystem) List(path string, options ListOptions) []FileInfo { continue } - if len(options.Pattern) != 0 { - if ok, _ := glob.Match(options.Pattern, file.name, '/'); !ok { + if compiledPattern != nil { + if !compiledPattern.Match(file.name) { continue } } diff --git a/io/fs/mem_test.go b/io/fs/mem_test.go index 3e81f512..c532307e 100644 --- a/io/fs/mem_test.go +++ b/io/fs/mem_test.go @@ -1,8 +1,10 @@ package fs import ( + "fmt" "testing" + "github.com/datarhei/core/v16/math/rand" "github.com/stretchr/testify/require" ) @@ -20,3 +22,41 @@ func TestMemFromDir(t *testing.T) { "/b.txt", }, names) } + +func BenchmarkMemList(b *testing.B) { + mem, err := NewMemFilesystem(MemConfig{}) + require.NoError(b, err) + + for i := 0; i < 1000; i++ { + id := rand.StringAlphanumeric(8) + path := fmt.Sprintf("/%d/%s.dat", i, id) + mem.WriteFile(path, []byte("foobar")) + } + + b.ResetTimer() + + for i := 0; i < b.N; i++ { + mem.List("/", ListOptions{ + Pattern: "/5/**", + }) + } +} + +func BenchmarkMemRemoveList(b *testing.B) { + mem, err := NewMemFilesystem(MemConfig{}) + require.NoError(b, err) + + for i := 0; i < 1000; i++ { + id := rand.StringAlphanumeric(8) + path := fmt.Sprintf("/%d/%s.dat", i, id) + mem.WriteFile(path, []byte("foobar")) + } + + b.ResetTimer() + + for i := 0; i < b.N; i++ { + mem.RemoveList("/", ListOptions{ + Pattern: "/5/**", + }) + } +} diff --git a/io/fs/s3.go b/io/fs/s3.go index 2707522b..78fc1411 100644 --- a/io/fs/s3.go +++ b/io/fs/s3.go @@ -453,6 +453,16 @@ func (fs *s3Filesystem) RemoveList(path string, options ListOptions) ([]string, var totalSize int64 = 0 files := []string{} + var compiledPattern glob.Glob + var err error + + if 
len(options.Pattern) != 0 { + compiledPattern, err = glob.Compile(options.Pattern, '/') + if err != nil { + return nil, 0 + } + } + objectsCh := make(chan minio.ObjectInfo) // Send object names that are needed to be removed to objectsCh @@ -478,8 +488,8 @@ func (fs *s3Filesystem) RemoveList(path string, options ListOptions) ([]string, continue } - if len(options.Pattern) != 0 { - if ok, _ := glob.Match(options.Pattern, key, '/'); !ok { + if compiledPattern != nil { + if !compiledPattern.Match(key) { continue } } @@ -529,6 +539,18 @@ func (fs *s3Filesystem) RemoveList(path string, options ListOptions) ([]string, func (fs *s3Filesystem) List(path string, options ListOptions) []FileInfo { path = fs.cleanPath(path) + var compiledPattern glob.Glob + var err error + + if len(options.Pattern) != 0 { + compiledPattern, err = glob.Compile(options.Pattern, '/') + if err != nil { + return nil + } + } + + files := []FileInfo{} + ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -542,8 +564,6 @@ func (fs *s3Filesystem) List(path string, options ListOptions) []FileInfo { UseV1: false, }) - files := []FileInfo{} - for object := range ch { if object.Err != nil { fs.logger.WithError(object.Err).Log("Listing object failed") @@ -556,8 +576,8 @@ func (fs *s3Filesystem) List(path string, options ListOptions) []FileInfo { continue } - if len(options.Pattern) != 0 { - if ok, _ := glob.Match(options.Pattern, key, '/'); !ok { + if compiledPattern != nil { + if !compiledPattern.Match(key) { continue } } diff --git a/restream/fs/fs.go b/restream/fs/fs.go index 6ee7285b..43ea5b62 100644 --- a/restream/fs/fs.go +++ b/restream/fs/fs.go @@ -1,25 +1,30 @@ +// Package FS implements a FS that supports cleanup rules for removing files. 
package fs import ( "context" + "fmt" "sort" "sync" "time" + "github.com/datarhei/core/v16/glob" "github.com/datarhei/core/v16/io/fs" "github.com/datarhei/core/v16/log" ) type Config struct { - FS fs.Filesystem - Logger log.Logger + FS fs.Filesystem + Interval time.Duration + Logger log.Logger } type Pattern struct { - Pattern string - MaxFiles uint - MaxFileAge time.Duration - PurgeOnDelete bool + Pattern string + compiledPattern glob.Glob + MaxFiles uint + MaxFileAge time.Duration + PurgeOnDelete bool } type Filesystem interface { @@ -44,6 +49,7 @@ type filesystem struct { cleanupPatterns map[string][]Pattern cleanupLock sync.RWMutex + interval time.Duration stopTicker context.CancelFunc startOnce sync.Once @@ -52,12 +58,17 @@ type filesystem struct { logger log.Logger } -func New(config Config) Filesystem { +func New(config Config) (Filesystem, error) { rfs := &filesystem{ Filesystem: config.FS, + interval: config.Interval, logger: config.Logger, } + if rfs.interval <= time.Duration(0) { + return nil, fmt.Errorf("interval must be greater than 0") + } + if rfs.logger == nil { rfs.logger = log.New("") } @@ -72,14 +83,14 @@ func New(config Config) Filesystem { // already drain the stop rfs.stopOnce.Do(func() {}) - return rfs + return rfs, nil } func (rfs *filesystem) Start() { rfs.startOnce.Do(func() { ctx, cancel := context.WithCancel(context.Background()) rfs.stopTicker = cancel - go rfs.cleanupTicker(ctx, time.Second) + go rfs.cleanupTicker(ctx, rfs.interval) rfs.stopOnce = sync.Once{} @@ -102,7 +113,15 @@ func (rfs *filesystem) SetCleanup(id string, patterns []Pattern) { return } - for _, p := range patterns { + for i, p := range patterns { + g, err := glob.Compile(p.Pattern, '/') + if err != nil { + continue + } + + p.compiledPattern = g + patterns[i] = p + rfs.logger.Debug().WithFields(log.Fields{ "id": id, "pattern": p.Pattern, @@ -130,35 +149,50 @@ func (rfs *filesystem) UnsetCleanup(id string) { } func (rfs *filesystem) cleanup() { + filesAndDirs := 
rfs.Filesystem.List("/", fs.ListOptions{}) + sort.SliceStable(filesAndDirs, func(i, j int) bool { return filesAndDirs[i].ModTime().Before(filesAndDirs[j].ModTime()) }) + rfs.cleanupLock.RLock() defer rfs.cleanupLock.RUnlock() for _, patterns := range rfs.cleanupPatterns { for _, pattern := range patterns { - filesAndDirs := rfs.Filesystem.List("/", fs.ListOptions{Pattern: pattern.Pattern}) - - files := []fs.FileInfo{} + matchFiles := []fs.FileInfo{} for _, f := range filesAndDirs { - if f.IsDir() { + if pattern.compiledPattern == nil || !pattern.compiledPattern.Match(f.Name()) { continue } - files = append(files, f) + matchFiles = append(matchFiles, f) } - sort.Slice(files, func(i, j int) bool { return files[i].ModTime().Before(files[j].ModTime()) }) + if pattern.MaxFiles > 0 && uint(len(matchFiles)) > pattern.MaxFiles { + nFiles := uint(len(matchFiles)) - pattern.MaxFiles + if nFiles > 0 { + for _, f := range matchFiles { + if f.IsDir() { + continue + } - if pattern.MaxFiles > 0 && uint(len(files)) > pattern.MaxFiles { - for i := uint(0); i < uint(len(files))-pattern.MaxFiles; i++ { - rfs.logger.Debug().WithField("path", files[i].Name()).Log("Remove file because MaxFiles is exceeded") - rfs.Filesystem.Remove(files[i].Name()) + rfs.logger.Debug().WithField("path", f.Name()).Log("Remove file because MaxFiles is exceeded") + rfs.Filesystem.Remove(f.Name()) + + nFiles-- + if nFiles == 0 { + break + } + } } } if pattern.MaxFileAge > 0 { bestBefore := time.Now().Add(-pattern.MaxFileAge) - for _, f := range files { + for _, f := range matchFiles { + if f.IsDir() { + continue + } + if f.ModTime().Before(bestBefore) { rfs.logger.Debug().WithField("path", f.Name()).Log("Remove file because MaxFileAge is exceeded") rfs.Filesystem.Remove(f.Name()) @@ -169,22 +203,22 @@ func (rfs *filesystem) cleanup() { } } -func (rfs *filesystem) purge(patterns []Pattern) (nfiles uint64) { +func (rfs *filesystem) purge(patterns []Pattern) int64 { + nfilesTotal := int64(0) + for _, pattern := range patterns { if 
!pattern.PurgeOnDelete { continue } - files := rfs.Filesystem.List("/", fs.ListOptions{Pattern: pattern.Pattern}) - sort.Slice(files, func(i, j int) bool { return len(files[i].Name()) > len(files[j].Name()) }) - for _, f := range files { - rfs.logger.Debug().WithField("path", f.Name()).Log("Purging file") - rfs.Filesystem.Remove(f.Name()) - nfiles++ - } + _, nfiles := rfs.Filesystem.RemoveList("/", fs.ListOptions{ + Pattern: pattern.Pattern, + }) + + nfilesTotal += nfiles } - return + return nfilesTotal } func (rfs *filesystem) cleanupTicker(ctx context.Context, interval time.Duration) { diff --git a/restream/fs/fs_test.go b/restream/fs/fs_test.go index d34872ff..24d11773 100644 --- a/restream/fs/fs_test.go +++ b/restream/fs/fs_test.go @@ -1,20 +1,26 @@ package fs import ( + "fmt" "strings" "testing" "time" "github.com/datarhei/core/v16/io/fs" + "github.com/datarhei/core/v16/math/rand" + "github.com/stretchr/testify/require" ) func TestMaxFiles(t *testing.T) { - memfs, _ := fs.NewMemFilesystem(fs.MemConfig{}) + memfs, err := fs.NewMemFilesystem(fs.MemConfig{}) + require.NoError(t, err) - cleanfs := New(Config{ - FS: memfs, + cleanfs, err := New(Config{ + FS: memfs, + Interval: time.Second, }) + require.NoError(t, err) cleanfs.Start() @@ -56,11 +62,14 @@ func TestMaxFiles(t *testing.T) { } func TestMaxAge(t *testing.T) { - memfs, _ := fs.NewMemFilesystem(fs.MemConfig{}) + memfs, err := fs.NewMemFilesystem(fs.MemConfig{}) + require.NoError(t, err) - cleanfs := New(Config{ - FS: memfs, + cleanfs, err := New(Config{ + FS: memfs, + Interval: time.Second, }) + require.NoError(t, err) cleanfs.Start() @@ -102,11 +111,14 @@ func TestMaxAge(t *testing.T) { } func TestUnsetCleanup(t *testing.T) { - memfs, _ := fs.NewMemFilesystem(fs.MemConfig{}) + memfs, err := fs.NewMemFilesystem(fs.MemConfig{}) + require.NoError(t, err) - cleanfs := New(Config{ - FS: memfs, + cleanfs, err := New(Config{ + FS: memfs, + Interval: time.Second, }) + require.NoError(t, err) cleanfs.Start() @@ 
-166,3 +178,161 @@ func TestUnsetCleanup(t *testing.T) { cleanfs.Stop() } + +func BenchmarkCleanup(b *testing.B) { + memfs, err := fs.NewMemFilesystem(fs.MemConfig{}) + require.NoError(b, err) + + cleanfs, err := New(Config{ + FS: memfs, + Interval: time.Second, + }) + require.NoError(b, err) + + nProcs := 200 + + ids := make([]string, nProcs) + + for i := 0; i < nProcs; i++ { + id := rand.StringAlphanumeric(8) + + patterns := []Pattern{ + { + Pattern: fmt.Sprintf("/%d/%s.m3u8", i, id), + MaxFiles: 2, + MaxFileAge: 0, + PurgeOnDelete: true, + }, + { + Pattern: fmt.Sprintf("/%d/%s_0.m3u8", i, id), + MaxFiles: 2, + MaxFileAge: 0, + PurgeOnDelete: true, + }, + { + Pattern: fmt.Sprintf("/%d/%s_1.m3u8", i, id), + MaxFiles: 2, + MaxFileAge: 0, + PurgeOnDelete: true, + }, + { + Pattern: fmt.Sprintf("/%d/%s_0_*.ts", i, id), + MaxFiles: 16, + MaxFileAge: 0, + PurgeOnDelete: true, + }, + { + Pattern: fmt.Sprintf("/%d/%s_1_*.ts", i, id), + MaxFiles: 16, + MaxFileAge: 0, + PurgeOnDelete: true, + }, + } + + cleanfs.SetCleanup(id, patterns) + + ids[i] = id + } + + // Fill the filesystem with files + for j := 0; j < nProcs; j++ { + path := fmt.Sprintf("/%d/%s.m3u8", j, ids[j]) + memfs.WriteFile(path, []byte("foobar")) + path = fmt.Sprintf("/%d/%s_0.m3u8", j, ids[j]) + memfs.WriteFile(path, []byte("foobar")) + path = fmt.Sprintf("/%d/%s_1.m3u8", j, ids[j]) + memfs.WriteFile(path, []byte("foobar")) + for k := 0; k < 20; k++ { + path = fmt.Sprintf("/%d/%s_0_%d.ts", j, ids[j], k) + memfs.WriteFile(path, []byte("foobar")) + path = fmt.Sprintf("/%d/%s_1_%d.ts", j, ids[j], k) + memfs.WriteFile(path, []byte("foobar")) + } + } + + rfs := cleanfs.(*filesystem) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + rfs.cleanup() + } +} + +func BenchmarkPurge(b *testing.B) { + memfs, err := fs.NewMemFilesystem(fs.MemConfig{}) + require.NoError(b, err) + + cleanfs, err := New(Config{ + FS: memfs, + Interval: time.Second, + }) + require.NoError(b, err) + + nProcs := 200 + + ids := 
make([]string, nProcs) + + for i := 0; i < nProcs; i++ { + id := rand.StringAlphanumeric(8) + + patterns := []Pattern{ + { + Pattern: fmt.Sprintf("/%d/%s.m3u8", i, id), + MaxFiles: 2, + MaxFileAge: 0, + PurgeOnDelete: true, + }, + { + Pattern: fmt.Sprintf("/%d/%s_0.m3u8", i, id), + MaxFiles: 2, + MaxFileAge: 0, + PurgeOnDelete: true, + }, + { + Pattern: fmt.Sprintf("/%d/%s_1.m3u8", i, id), + MaxFiles: 2, + MaxFileAge: 0, + PurgeOnDelete: true, + }, + { + Pattern: fmt.Sprintf("/%d/%s_0_*.ts", i, id), + MaxFiles: 16, + MaxFileAge: 0, + PurgeOnDelete: true, + }, + { + Pattern: fmt.Sprintf("/%d/%s_1_*.ts", i, id), + MaxFiles: 16, + MaxFileAge: 0, + PurgeOnDelete: true, + }, + } + + cleanfs.SetCleanup(id, patterns) + + ids[i] = id + } + + // Fill the filesystem with files + for j := 0; j < nProcs; j++ { + path := fmt.Sprintf("/%d/%s.m3u8", j, ids[j]) + memfs.WriteFile(path, []byte("foobar")) + path = fmt.Sprintf("/%d/%s_0.m3u8", j, ids[j]) + memfs.WriteFile(path, []byte("foobar")) + path = fmt.Sprintf("/%d/%s_1.m3u8", j, ids[j]) + memfs.WriteFile(path, []byte("foobar")) + for k := 0; k < 20; k++ { + path = fmt.Sprintf("/%d/%s_0_%d.ts", j, ids[j], k) + memfs.WriteFile(path, []byte("foobar")) + path = fmt.Sprintf("/%d/%s_1_%d.ts", j, ids[j], k) + memfs.WriteFile(path, []byte("foobar")) + } + } + + rfs := cleanfs.(*filesystem) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + rfs.purge(rfs.cleanupPatterns[ids[42]]) + } +} diff --git a/restream/restream.go b/restream/restream.go index 01870d4c..edf0d132 100644 --- a/restream/restream.go +++ b/restream/restream.go @@ -175,12 +175,16 @@ func New(config Config) (Restreamer, error) { } for _, fs := range config.Filesystems { - fs := rfs.New(rfs.Config{ - FS: fs, - Logger: r.logger.WithComponent("Cleanup"), + newfs, err := rfs.New(rfs.Config{ + FS: fs, + Interval: 5 * time.Second, + Logger: r.logger.WithComponent("Cleanup"), }) + if err != nil { + return nil, fmt.Errorf("failed to create cleanup fs for %s", fs.Name()) + } - 
r.fs.list = append(r.fs.list, fs) + r.fs.list = append(r.fs.list, newfs) } if r.replace == nil { @@ -709,18 +713,14 @@ func (r *restream) onArgs(cfg *app.Config) func([]string) []string { } func (r *restream) setCleanup(id app.ProcessID, config *app.Config) { - rePrefix := regexp.MustCompile(`^([a-z]+):`) - for _, output := range config.Output { for _, c := range output.Cleanup { - // TODO: strings.Cut(c.Pattern, ":") - matches := rePrefix.FindStringSubmatch(c.Pattern) - if matches == nil { + name, path, found := strings.Cut(c.Pattern, ":") + if !found { + r.logger.Warn().WithField("pattern", c.Pattern).Log("invalid pattern, no prefix") continue } - name := matches[1] - // Support legacy names if name == "diskfs" { name = "disk" @@ -734,7 +734,7 @@ func (r *restream) setCleanup(id app.ProcessID, config *app.Config) { } pattern := rfs.Pattern{ - Pattern: rePrefix.ReplaceAllString(c.Pattern, ""), + Pattern: path, MaxFiles: c.MaxFiles, MaxFileAge: time.Duration(c.MaxFileAge) * time.Second, PurgeOnDelete: c.PurgeOnDelete, diff --git a/restream/restream_test.go b/restream/restream_test.go index e974e39a..0a876e79 100644 --- a/restream/restream_test.go +++ b/restream/restream_test.go @@ -943,9 +943,11 @@ func TestConfigValidationWithMkdir(t *testing.T) { }) require.NoError(t, err) - diskrfs := rfs.New(rfs.Config{ - FS: diskfs, + diskrfs, err := rfs.New(rfs.Config{ + FS: diskfs, + Interval: time.Second, }) + require.NoError(t, err) hasfiles, err = validateConfig(config, []rfs.Filesystem{diskrfs}, rs.ffmpeg) require.NoError(t, err)