Improve FS.List and FS.RemoveList functions, improve CleanupFS

The FS.List and FS.RemoveList functions are up to 42x faster by
precompiling the globbing pattern once instead of on every match.

The CleanupFS from restreamer/fs is up to 32x faster in the benchmarks,
and cleanup now runs only every 5 seconds instead of every second.
This commit is contained in:
Ingo Oppermann 2023-06-12 11:36:07 +02:00
parent 82ba3a8f82
commit 129058e633
No known key found for this signature in database
GPG Key ID: 2AB32426E9DD229E
10 changed files with 398 additions and 73 deletions

View File

@ -1,14 +1,33 @@
package glob
import (
"github.com/gobwas/glob"
)
import "github.com/gobwas/glob"
// Glob is a precompiled glob pattern. Compiling a pattern once with
// Compile and reusing the resulting matcher avoids recompiling the
// pattern on every match.
type Glob interface {
// Match reports whether name matches the compiled pattern.
Match(name string) bool
}
// globber is the default Glob implementation; it wraps a pattern
// compiled by the github.com/gobwas/glob package.
type globber struct {
glob glob.Glob
}
// Compile parses pattern into a reusable Glob matcher. The optional
// separators restrict which characters a wildcard may cross. An error
// is returned only if the pattern is invalid.
func Compile(pattern string, separators ...rune) (Glob, error) {
	compiled, err := glob.Compile(pattern, separators...)
	if err != nil {
		return nil, err
	}

	return &globber{glob: compiled}, nil
}
// Match reports whether name matches the precompiled pattern.
func (g *globber) Match(name string) bool {
return g.glob.Match(name)
}
// Match returns whether the name matches the glob pattern, also considering
// one or several optional separators. An error is only returned if the pattern
// is invalid.
func Match(pattern, name string, separators ...rune) (bool, error) {
g, err := glob.Compile(pattern, separators...)
g, err := Compile(pattern, separators...)
if err != nil {
return false, err
}

View File

@ -525,6 +525,16 @@ func (fs *diskFilesystem) RemoveList(path string, options ListOptions) ([]string
var size int64 = 0
files := []string{}
var compiledPattern glob.Glob
var err error
if len(options.Pattern) != 0 {
compiledPattern, err = glob.Compile(options.Pattern, '/')
if err != nil {
return nil, 0
}
}
fs.walk(path, func(path string, info os.FileInfo) {
if path == fs.root {
return
@ -539,8 +549,8 @@ func (fs *diskFilesystem) RemoveList(path string, options ListOptions) ([]string
return
}
if len(options.Pattern) != 0 {
if ok, _ := glob.Match(options.Pattern, name, '/'); !ok {
if compiledPattern != nil {
if !compiledPattern.Match(name) {
return
}
}
@ -582,6 +592,16 @@ func (fs *diskFilesystem) List(path string, options ListOptions) []FileInfo {
path = fs.cleanPath(path)
files := []FileInfo{}
var compiledPattern glob.Glob
var err error
if len(options.Pattern) != 0 {
compiledPattern, err = glob.Compile(options.Pattern, '/')
if err != nil {
return nil
}
}
fs.walk(path, func(path string, info os.FileInfo) {
if path == fs.root {
return
@ -596,8 +616,8 @@ func (fs *diskFilesystem) List(path string, options ListOptions) []FileInfo {
return
}
if len(options.Pattern) != 0 {
if ok, _ := glob.Match(options.Pattern, name, '/'); !ok {
if compiledPattern != nil {
if !compiledPattern.Match(name) {
return
}
}

View File

@ -120,8 +120,8 @@ type WriteFilesystem interface {
// the removed file in bytes. The size is negative if the file doesn't exist.
Remove(path string) int64
// RemoveList removes all files from the filesystem. Returns the size of the
// removed files in bytes.
// RemoveList removes all matching files from the filesystem. Returns a list of
// the names of the removed files and the total size of all removed files in bytes.
RemoveList(path string, options ListOptions) ([]string, int64)
}

View File

@ -663,6 +663,16 @@ func (fs *memFilesystem) remove(path string) int64 {
func (fs *memFilesystem) RemoveList(path string, options ListOptions) ([]string, int64) {
path = fs.cleanPath(path)
var compiledPattern glob.Glob
var err error
if len(options.Pattern) != 0 {
compiledPattern, err = glob.Compile(options.Pattern, '/')
if err != nil {
return nil, 0
}
}
fs.filesLock.Lock()
defer fs.filesLock.Unlock()
@ -674,8 +684,8 @@ func (fs *memFilesystem) RemoveList(path string, options ListOptions) ([]string,
continue
}
if len(options.Pattern) != 0 {
if ok, _ := glob.Match(options.Pattern, file.name, '/'); !ok {
if compiledPattern != nil {
if !compiledPattern.Match(file.name) {
continue
}
}
@ -720,6 +730,16 @@ func (fs *memFilesystem) List(path string, options ListOptions) []FileInfo {
path = fs.cleanPath(path)
files := []FileInfo{}
var compiledPattern glob.Glob
var err error
if len(options.Pattern) != 0 {
compiledPattern, err = glob.Compile(options.Pattern, '/')
if err != nil {
return nil
}
}
fs.filesLock.RLock()
defer fs.filesLock.RUnlock()
@ -728,8 +748,8 @@ func (fs *memFilesystem) List(path string, options ListOptions) []FileInfo {
continue
}
if len(options.Pattern) != 0 {
if ok, _ := glob.Match(options.Pattern, file.name, '/'); !ok {
if compiledPattern != nil {
if !compiledPattern.Match(file.name) {
continue
}
}

View File

@ -1,8 +1,10 @@
package fs
import (
"fmt"
"testing"
"github.com/datarhei/core/v16/math/rand"
"github.com/stretchr/testify/require"
)
@ -20,3 +22,41 @@ func TestMemFromDir(t *testing.T) {
"/b.txt",
}, names)
}
// BenchmarkMemList measures List with a glob pattern on a memory
// filesystem populated with 1000 files spread over 1000 directories.
func BenchmarkMemList(b *testing.B) {
	fsys, err := NewMemFilesystem(MemConfig{})
	require.NoError(b, err)

	for n := 0; n < 1000; n++ {
		name := rand.StringAlphanumeric(8)
		fsys.WriteFile(fmt.Sprintf("/%d/%s.dat", n, name), []byte("foobar"))
	}

	b.ResetTimer()

	options := ListOptions{Pattern: "/5/**"}
	for n := 0; n < b.N; n++ {
		fsys.List("/", options)
	}
}
// BenchmarkMemRemoveList measures RemoveList with a glob pattern on a
// memory filesystem populated with 1000 files spread over 1000
// directories. Matching files are gone after the first iteration, so
// later iterations measure the pattern-matching overhead only.
func BenchmarkMemRemoveList(b *testing.B) {
	fsys, err := NewMemFilesystem(MemConfig{})
	require.NoError(b, err)

	for n := 0; n < 1000; n++ {
		name := rand.StringAlphanumeric(8)
		fsys.WriteFile(fmt.Sprintf("/%d/%s.dat", n, name), []byte("foobar"))
	}

	b.ResetTimer()

	options := ListOptions{Pattern: "/5/**"}
	for n := 0; n < b.N; n++ {
		fsys.RemoveList("/", options)
	}
}

View File

@ -453,6 +453,16 @@ func (fs *s3Filesystem) RemoveList(path string, options ListOptions) ([]string,
var totalSize int64 = 0
files := []string{}
var compiledPattern glob.Glob
var err error
if len(options.Pattern) != 0 {
compiledPattern, err = glob.Compile(options.Pattern, '/')
if err != nil {
return nil, 0
}
}
objectsCh := make(chan minio.ObjectInfo)
// Send object names that are needed to be removed to objectsCh
@ -478,8 +488,8 @@ func (fs *s3Filesystem) RemoveList(path string, options ListOptions) ([]string,
continue
}
if len(options.Pattern) != 0 {
if ok, _ := glob.Match(options.Pattern, key, '/'); !ok {
if compiledPattern != nil {
if !compiledPattern.Match(key) {
continue
}
}
@ -529,6 +539,18 @@ func (fs *s3Filesystem) RemoveList(path string, options ListOptions) ([]string,
func (fs *s3Filesystem) List(path string, options ListOptions) []FileInfo {
path = fs.cleanPath(path)
var compiledPattern glob.Glob
var err error
if len(options.Pattern) != 0 {
compiledPattern, err = glob.Compile(options.Pattern, '/')
if err != nil {
return nil
}
}
files := []FileInfo{}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@ -542,8 +564,6 @@ func (fs *s3Filesystem) List(path string, options ListOptions) []FileInfo {
UseV1: false,
})
files := []FileInfo{}
for object := range ch {
if object.Err != nil {
fs.logger.WithError(object.Err).Log("Listing object failed")
@ -556,8 +576,8 @@ func (fs *s3Filesystem) List(path string, options ListOptions) []FileInfo {
continue
}
if len(options.Pattern) != 0 {
if ok, _ := glob.Match(options.Pattern, key, '/'); !ok {
if compiledPattern != nil {
if !compiledPattern.Match(key) {
continue
}
}

View File

@ -1,25 +1,30 @@
// Package fs implements a filesystem wrapper that supports cleanup rules for removing files.
package fs
import (
"context"
"fmt"
"sort"
"sync"
"time"
"github.com/datarhei/core/v16/glob"
"github.com/datarhei/core/v16/io/fs"
"github.com/datarhei/core/v16/log"
)
type Config struct {
FS fs.Filesystem
Logger log.Logger
FS fs.Filesystem
Interval time.Duration
Logger log.Logger
}
type Pattern struct {
Pattern string
MaxFiles uint
MaxFileAge time.Duration
PurgeOnDelete bool
Pattern string
compiledPattern glob.Glob
MaxFiles uint
MaxFileAge time.Duration
PurgeOnDelete bool
}
type Filesystem interface {
@ -44,6 +49,7 @@ type filesystem struct {
cleanupPatterns map[string][]Pattern
cleanupLock sync.RWMutex
interval time.Duration
stopTicker context.CancelFunc
startOnce sync.Once
@ -52,12 +58,17 @@ type filesystem struct {
logger log.Logger
}
func New(config Config) Filesystem {
func New(config Config) (Filesystem, error) {
rfs := &filesystem{
Filesystem: config.FS,
interval: config.Interval,
logger: config.Logger,
}
if rfs.interval <= time.Duration(0) {
return nil, fmt.Errorf("interval must be greater than 0")
}
if rfs.logger == nil {
rfs.logger = log.New("")
}
@ -72,14 +83,14 @@ func New(config Config) Filesystem {
// already drain the stop
rfs.stopOnce.Do(func() {})
return rfs
return rfs, nil
}
func (rfs *filesystem) Start() {
rfs.startOnce.Do(func() {
ctx, cancel := context.WithCancel(context.Background())
rfs.stopTicker = cancel
go rfs.cleanupTicker(ctx, time.Second)
go rfs.cleanupTicker(ctx, rfs.interval)
rfs.stopOnce = sync.Once{}
@ -102,7 +113,15 @@ func (rfs *filesystem) SetCleanup(id string, patterns []Pattern) {
return
}
for _, p := range patterns {
for i, p := range patterns {
g, err := glob.Compile(p.Pattern, '/')
if err != nil {
continue
}
p.compiledPattern = g
patterns[i] = p
rfs.logger.Debug().WithFields(log.Fields{
"id": id,
"pattern": p.Pattern,
@ -130,35 +149,50 @@ func (rfs *filesystem) UnsetCleanup(id string) {
}
func (rfs *filesystem) cleanup() {
filesAndDirs := rfs.Filesystem.List("/", fs.ListOptions{})
sort.SliceStable(filesAndDirs, func(i, j int) bool { return filesAndDirs[i].ModTime().Before(filesAndDirs[j].ModTime()) })
rfs.cleanupLock.RLock()
defer rfs.cleanupLock.RUnlock()
for _, patterns := range rfs.cleanupPatterns {
for _, pattern := range patterns {
filesAndDirs := rfs.Filesystem.List("/", fs.ListOptions{Pattern: pattern.Pattern})
files := []fs.FileInfo{}
matchFiles := []fs.FileInfo{}
for _, f := range filesAndDirs {
if f.IsDir() {
if !pattern.compiledPattern.Match(f.Name()) {
continue
}
files = append(files, f)
matchFiles = append(matchFiles, f)
}
sort.Slice(files, func(i, j int) bool { return files[i].ModTime().Before(files[j].ModTime()) })
if pattern.MaxFiles > 0 && uint(len(matchFiles)) > pattern.MaxFiles {
nFiles := uint(len(matchFiles)) - pattern.MaxFiles
if nFiles > 0 {
for _, f := range matchFiles {
if f.IsDir() {
continue
}
if pattern.MaxFiles > 0 && uint(len(files)) > pattern.MaxFiles {
for i := uint(0); i < uint(len(files))-pattern.MaxFiles; i++ {
rfs.logger.Debug().WithField("path", files[i].Name()).Log("Remove file because MaxFiles is exceeded")
rfs.Filesystem.Remove(files[i].Name())
rfs.logger.Debug().WithField("path", f.Name()).Log("Remove file because MaxFiles is exceeded")
rfs.Filesystem.Remove(f.Name())
nFiles--
if nFiles == 0 {
break
}
}
}
}
if pattern.MaxFileAge > 0 {
bestBefore := time.Now().Add(-pattern.MaxFileAge)
for _, f := range files {
for _, f := range matchFiles {
if f.IsDir() {
continue
}
if f.ModTime().Before(bestBefore) {
rfs.logger.Debug().WithField("path", f.Name()).Log("Remove file because MaxFileAge is exceeded")
rfs.Filesystem.Remove(f.Name())
@ -169,22 +203,22 @@ func (rfs *filesystem) cleanup() {
}
}
func (rfs *filesystem) purge(patterns []Pattern) (nfiles uint64) {
func (rfs *filesystem) purge(patterns []Pattern) int64 {
nfilesTotal := int64(0)
for _, pattern := range patterns {
if !pattern.PurgeOnDelete {
continue
}
files := rfs.Filesystem.List("/", fs.ListOptions{Pattern: pattern.Pattern})
sort.Slice(files, func(i, j int) bool { return len(files[i].Name()) > len(files[j].Name()) })
for _, f := range files {
rfs.logger.Debug().WithField("path", f.Name()).Log("Purging file")
rfs.Filesystem.Remove(f.Name())
nfiles++
}
_, nfiles := rfs.Filesystem.RemoveList("/", fs.ListOptions{
Pattern: pattern.Pattern,
})
nfilesTotal += nfiles
}
return
return nfilesTotal
}
func (rfs *filesystem) cleanupTicker(ctx context.Context, interval time.Duration) {

View File

@ -1,20 +1,26 @@
package fs
import (
"fmt"
"strings"
"testing"
"time"
"github.com/datarhei/core/v16/io/fs"
"github.com/datarhei/core/v16/math/rand"
"github.com/stretchr/testify/require"
)
func TestMaxFiles(t *testing.T) {
memfs, _ := fs.NewMemFilesystem(fs.MemConfig{})
memfs, err := fs.NewMemFilesystem(fs.MemConfig{})
require.NoError(t, err)
cleanfs := New(Config{
FS: memfs,
cleanfs, err := New(Config{
FS: memfs,
Interval: time.Second,
})
require.NoError(t, err)
cleanfs.Start()
@ -56,11 +62,14 @@ func TestMaxFiles(t *testing.T) {
}
func TestMaxAge(t *testing.T) {
memfs, _ := fs.NewMemFilesystem(fs.MemConfig{})
memfs, err := fs.NewMemFilesystem(fs.MemConfig{})
require.NoError(t, err)
cleanfs := New(Config{
FS: memfs,
cleanfs, err := New(Config{
FS: memfs,
Interval: time.Second,
})
require.NoError(t, err)
cleanfs.Start()
@ -102,11 +111,14 @@ func TestMaxAge(t *testing.T) {
}
func TestUnsetCleanup(t *testing.T) {
memfs, _ := fs.NewMemFilesystem(fs.MemConfig{})
memfs, err := fs.NewMemFilesystem(fs.MemConfig{})
require.NoError(t, err)
cleanfs := New(Config{
FS: memfs,
cleanfs, err := New(Config{
FS: memfs,
Interval: time.Second,
})
require.NoError(t, err)
cleanfs.Start()
@ -166,3 +178,161 @@ func TestUnsetCleanup(t *testing.T) {
cleanfs.Stop()
}
// BenchmarkCleanup measures a single cleanup() pass over a memory
// filesystem simulating 200 HLS processes. Each process registers five
// cleanup patterns: three playlists capped at MaxFiles=2 and two
// segment patterns capped at MaxFiles=16.
func BenchmarkCleanup(b *testing.B) {
	memfs, err := fs.NewMemFilesystem(fs.MemConfig{})
	require.NoError(b, err)

	cleanfs, err := New(Config{
		FS:       memfs,
		Interval: time.Second,
	})
	require.NoError(b, err)

	nProcs := 200
	ids := make([]string, nProcs)

	for i := 0; i < nProcs; i++ {
		id := rand.StringAlphanumeric(8)

		patterns := []Pattern{
			{
				Pattern:       fmt.Sprintf("/%d/%s.m3u8", i, id),
				MaxFiles:      2,
				MaxFileAge:    0,
				PurgeOnDelete: true,
			},
			{
				Pattern:       fmt.Sprintf("/%d/%s_0.m3u8", i, id),
				MaxFiles:      2,
				MaxFileAge:    0,
				PurgeOnDelete: true,
			},
			{
				Pattern:       fmt.Sprintf("/%d/%s_1.m3u8", i, id),
				MaxFiles:      2,
				MaxFileAge:    0,
				PurgeOnDelete: true,
			},
			{
				Pattern:       fmt.Sprintf("/%d/%s_0_*.ts", i, id),
				MaxFiles:      16,
				MaxFileAge:    0,
				PurgeOnDelete: true,
			},
			{
				Pattern:       fmt.Sprintf("/%d/%s_1_*.ts", i, id),
				MaxFiles:      16,
				MaxFileAge:    0,
				PurgeOnDelete: true,
			},
		}

		cleanfs.SetCleanup(id, patterns)

		ids[i] = id
	}

	// Fill the filesystem with files matching the patterns above.
	for j := 0; j < nProcs; j++ {
		path := fmt.Sprintf("/%d/%s.m3u8", j, ids[j])
		memfs.WriteFile(path, []byte("foobar"))
		path = fmt.Sprintf("/%d/%s_0.m3u8", j, ids[j])
		memfs.WriteFile(path, []byte("foobar"))
		path = fmt.Sprintf("/%d/%s_1.m3u8", j, ids[j])
		memfs.WriteFile(path, []byte("foobar"))

		for k := 0; k < 20; k++ {
			// Segment files must use the .ts extension so that they
			// match the "_0_*.ts"/"_1_*.ts" patterns; with 20 segments
			// and MaxFiles=16 the first cleanup pass has actual
			// removal work to do. (Previously these were written as
			// .m3u8, which no segment pattern ever matched.)
			path = fmt.Sprintf("/%d/%s_0_%d.ts", j, ids[j], k)
			memfs.WriteFile(path, []byte("foobar"))
			path = fmt.Sprintf("/%d/%s_1_%d.ts", j, ids[j], k)
			memfs.WriteFile(path, []byte("foobar"))
		}
	}

	rfs := cleanfs.(*filesystem)

	b.ResetTimer()

	for i := 0; i < b.N; i++ {
		rfs.cleanup()
	}
}
// BenchmarkPurge measures purge() for the patterns of one process out
// of 200 on a populated memory filesystem. Matching files are removed
// in the first iteration, so subsequent iterations measure the
// pattern-matching overhead of RemoveList only.
func BenchmarkPurge(b *testing.B) {
	memfs, err := fs.NewMemFilesystem(fs.MemConfig{})
	require.NoError(b, err)

	cleanfs, err := New(Config{
		FS:       memfs,
		Interval: time.Second,
	})
	require.NoError(b, err)

	nProcs := 200
	ids := make([]string, nProcs)

	for i := 0; i < nProcs; i++ {
		id := rand.StringAlphanumeric(8)

		patterns := []Pattern{
			{
				Pattern:       fmt.Sprintf("/%d/%s.m3u8", i, id),
				MaxFiles:      2,
				MaxFileAge:    0,
				PurgeOnDelete: true,
			},
			{
				Pattern:       fmt.Sprintf("/%d/%s_0.m3u8", i, id),
				MaxFiles:      2,
				MaxFileAge:    0,
				PurgeOnDelete: true,
			},
			{
				Pattern:       fmt.Sprintf("/%d/%s_1.m3u8", i, id),
				MaxFiles:      2,
				MaxFileAge:    0,
				PurgeOnDelete: true,
			},
			{
				Pattern:       fmt.Sprintf("/%d/%s_0_*.ts", i, id),
				MaxFiles:      16,
				MaxFileAge:    0,
				PurgeOnDelete: true,
			},
			{
				Pattern:       fmt.Sprintf("/%d/%s_1_*.ts", i, id),
				MaxFiles:      16,
				MaxFileAge:    0,
				PurgeOnDelete: true,
			},
		}

		cleanfs.SetCleanup(id, patterns)

		ids[i] = id
	}

	// Fill the filesystem with files matching the patterns above.
	for j := 0; j < nProcs; j++ {
		path := fmt.Sprintf("/%d/%s.m3u8", j, ids[j])
		memfs.WriteFile(path, []byte("foobar"))
		path = fmt.Sprintf("/%d/%s_0.m3u8", j, ids[j])
		memfs.WriteFile(path, []byte("foobar"))
		path = fmt.Sprintf("/%d/%s_1.m3u8", j, ids[j])
		memfs.WriteFile(path, []byte("foobar"))

		for k := 0; k < 20; k++ {
			// Segment files must use the .ts extension so that they
			// match the "_0_*.ts"/"_1_*.ts" purge patterns.
			// (Previously these were written as .m3u8, which no
			// segment pattern ever matched.)
			path = fmt.Sprintf("/%d/%s_0_%d.ts", j, ids[j], k)
			memfs.WriteFile(path, []byte("foobar"))
			path = fmt.Sprintf("/%d/%s_1_%d.ts", j, ids[j], k)
			memfs.WriteFile(path, []byte("foobar"))
		}
	}

	rfs := cleanfs.(*filesystem)

	b.ResetTimer()

	for i := 0; i < b.N; i++ {
		rfs.purge(rfs.cleanupPatterns[ids[42]])
	}
}

View File

@ -175,12 +175,16 @@ func New(config Config) (Restreamer, error) {
}
for _, fs := range config.Filesystems {
fs := rfs.New(rfs.Config{
FS: fs,
Logger: r.logger.WithComponent("Cleanup"),
newfs, err := rfs.New(rfs.Config{
FS: fs,
Interval: 5 * time.Second,
Logger: r.logger.WithComponent("Cleanup"),
})
if err != nil {
return nil, fmt.Errorf("failed to create cleaup fs for %s", fs.Name())
}
r.fs.list = append(r.fs.list, fs)
r.fs.list = append(r.fs.list, newfs)
}
if r.replace == nil {
@ -709,18 +713,14 @@ func (r *restream) onArgs(cfg *app.Config) func([]string) []string {
}
func (r *restream) setCleanup(id app.ProcessID, config *app.Config) {
rePrefix := regexp.MustCompile(`^([a-z]+):`)
for _, output := range config.Output {
for _, c := range output.Cleanup {
// TODO: strings.Cut(c.Pattern, ":")
matches := rePrefix.FindStringSubmatch(c.Pattern)
if matches == nil {
name, path, found := strings.Cut(c.Pattern, ":")
if !found {
r.logger.Warn().WithField("pattern", c.Pattern).Log("invalid pattern, no prefix")
continue
}
name := matches[1]
// Support legacy names
if name == "diskfs" {
name = "disk"
@ -734,7 +734,7 @@ func (r *restream) setCleanup(id app.ProcessID, config *app.Config) {
}
pattern := rfs.Pattern{
Pattern: rePrefix.ReplaceAllString(c.Pattern, ""),
Pattern: path,
MaxFiles: c.MaxFiles,
MaxFileAge: time.Duration(c.MaxFileAge) * time.Second,
PurgeOnDelete: c.PurgeOnDelete,

View File

@ -943,9 +943,11 @@ func TestConfigValidationWithMkdir(t *testing.T) {
})
require.NoError(t, err)
diskrfs := rfs.New(rfs.Config{
FS: diskfs,
diskrfs, err := rfs.New(rfs.Config{
FS: diskfs,
Interval: time.Second,
})
require.NoError(t, err)
hasfiles, err = validateConfig(config, []rfs.Filesystem{diskrfs}, rs.ffmpeg)
require.NoError(t, err)