Add storage.memory.backup.dir and storage.memory.backup.patterns
It is now possible to create a backup of the contents of memfs before the core is shutdown. Provide the path where to write the files and a list of glob patterns for files to be written to disk. Use the pattern "**" for all files. During start of the core the backuped files will be written back to memfs.
This commit is contained in:
parent
a7cd4f4e50
commit
2167895bb5
@ -840,12 +840,19 @@ func (a *api) start(ctx context.Context) error {
|
||||
}
|
||||
|
||||
if a.memfs == nil {
|
||||
memfs, _ := fs.NewMemFilesystem(fs.MemConfig{
|
||||
config := fs.MemConfig{
|
||||
Logger: a.log.logger.core.WithComponent("Filesystem").WithFields(log.Fields{
|
||||
"type": "mem",
|
||||
"name": "mem",
|
||||
}),
|
||||
})
|
||||
}
|
||||
var memfs fs.Filesystem = nil
|
||||
if len(cfg.Storage.Memory.Backup.Dir) != 0 {
|
||||
config.Logger.Info().WithField("dir", cfg.Storage.Memory.Backup.Dir).Log("Loading backup")
|
||||
memfs, _ = fs.NewMemFilesystemFromDir(cfg.Storage.Memory.Backup.Dir, config)
|
||||
} else {
|
||||
memfs, _ = fs.NewMemFilesystem(config)
|
||||
}
|
||||
|
||||
memfs.SetMetadata("base", baseMemFS.String())
|
||||
|
||||
@ -1758,6 +1765,26 @@ func (a *api) stop() {
|
||||
a.cluster.Shutdown()
|
||||
}
|
||||
|
||||
if a.memfs != nil {
|
||||
cfg := a.config.store.GetActive()
|
||||
if len(cfg.Storage.Memory.Backup.Dir) != 0 {
|
||||
diskfs, err := fs.NewRootedDiskFilesystem(fs.RootedDiskConfig{
|
||||
Root: cfg.Storage.Memory.Backup.Dir,
|
||||
Logger: logger,
|
||||
})
|
||||
if err == nil {
|
||||
err = backupMemFS(diskfs, a.memfs, cfg.Storage.Memory.Backup.Patterns)
|
||||
if err != nil {
|
||||
logger.Error().WithError(err).WithField("dir", cfg.Storage.Memory.Backup.Dir).Log("Failed to create backup from memfs")
|
||||
} else {
|
||||
logger.Info().WithField("dir", cfg.Storage.Memory.Backup.Dir).Log("Created backup from memfs")
|
||||
}
|
||||
} else {
|
||||
logger.Error().WithError(err).WithField("dir", cfg.Storage.Memory.Backup.Dir).Log("Failed to create rooted disk filesystem")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Stop all restream processes
|
||||
if a.restream != nil {
|
||||
logger.Info().Log("Stopping all processes ...")
|
||||
@ -1907,3 +1934,37 @@ func (a *api) Destroy() {
|
||||
a.memfs = nil
|
||||
}
|
||||
}
|
||||
|
||||
func backupMemFS(target, source fs.Filesystem, patterns []string) error {
|
||||
// Clean the backup directory
|
||||
target.RemoveList("/", fs.ListOptions{
|
||||
Pattern: "**",
|
||||
})
|
||||
|
||||
filelist := map[string]struct{}{}
|
||||
|
||||
for _, p := range patterns {
|
||||
// For each pattern get the file list and store the names
|
||||
files := source.List("/", fs.ListOptions{
|
||||
Pattern: p,
|
||||
})
|
||||
|
||||
for _, f := range files {
|
||||
filelist[f.Name()] = struct{}{}
|
||||
}
|
||||
}
|
||||
|
||||
// Write each file from source to target
|
||||
for name := range filelist {
|
||||
file := source.Open(name)
|
||||
if file == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
target.WriteFileReader(name, file)
|
||||
|
||||
file.Close()
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -211,6 +211,8 @@ func (d *Config) init() {
|
||||
d.vars.Register(value.NewString(&d.Storage.Memory.Auth.Password, rand.StringAlphanumeric(18)), "storage.memory.auth.password", "CORE_STORAGE_MEMORY_AUTH_PASSWORD", nil, "Password for Basic-Auth of /memfs", false, true)
|
||||
d.vars.Register(value.NewInt64(&d.Storage.Memory.Size, 0), "storage.memory.max_size_mbytes", "CORE_STORAGE_MEMORY_MAX_SIZE_MBYTES", []string{"CORE_STORAGE_MEMORY_MAXSIZEMBYTES"}, "Max. allowed megabytes for /memfs, 0 for unlimited", false, false)
|
||||
d.vars.Register(value.NewBool(&d.Storage.Memory.Purge, false), "storage.memory.purge", "CORE_STORAGE_MEMORY_PURGE", nil, "Automatically remove the oldest files if /memfs is full", false, false)
|
||||
d.vars.Register(value.NewDir(&d.Storage.Memory.Backup.Dir, "", d.fs), "storage.memory.backup.dir", "CORE_STORAGE_MEMORY_BACKUP_DIR", nil, "Directory for writing backups of /memfs to", false, false)
|
||||
d.vars.Register(value.NewStringList(&d.Storage.Memory.Backup.Patterns, []string{}, " "), "storage.memory.backup.patterns", "CORE_STORAGE_MEMORY_BACKUP_PATTERNS", nil, "Glob patterns for files to backup", false, false)
|
||||
|
||||
// Storage (S3)
|
||||
d.vars.Register(value.NewS3StorageListValue(&d.Storage.S3, []value.S3Storage{}, "|"), "storage.s3", "CORE_STORAGE_S3", nil, "List of S3 storage URLS", false, false)
|
||||
|
||||
@ -89,8 +89,12 @@ type Data struct {
|
||||
Username string `json:"username"` // Deprecated, use IAM
|
||||
Password string `json:"password"` // Deprecated, use IAM
|
||||
} `json:"auth"` // Deprecated, use IAM
|
||||
Size int64 `json:"max_size_mbytes" format:"int64"`
|
||||
Purge bool `json:"purge"`
|
||||
Size int64 `json:"max_size_mbytes" format:"int64"`
|
||||
Purge bool `json:"purge"`
|
||||
Backup struct {
|
||||
Dir string `json:"dir"`
|
||||
Patterns []string `json:"patterns"`
|
||||
} `json:"backup"`
|
||||
} `json:"memory"`
|
||||
S3 []value.S3Storage `json:"s3"`
|
||||
CORS struct {
|
||||
@ -259,8 +263,6 @@ func MergeV2toV3(data *Data, d *v2.Data) (*Data, error) {
|
||||
data.Storage.CORS = d.Storage.CORS
|
||||
data.Storage.CORS.Origins = slices.Copy(d.Storage.CORS.Origins)
|
||||
|
||||
data.Storage.Memory = d.Storage.Memory
|
||||
|
||||
// Actual changes
|
||||
data.Debug.Profiling = d.Debug.Profiling
|
||||
data.Debug.ForceGC = d.Debug.ForceGC
|
||||
@ -281,6 +283,12 @@ func MergeV2toV3(data *Data, d *v2.Data) (*Data, error) {
|
||||
data.Storage.Disk.Cache.TTL = d.Storage.Disk.Cache.TTL
|
||||
data.Storage.Disk.Cache.Types.Allow = slices.Copy(d.Storage.Disk.Cache.Types)
|
||||
|
||||
data.Storage.Memory.Auth.Enable = d.Storage.Memory.Auth.Enable
|
||||
data.Storage.Memory.Auth.Username = d.Storage.Memory.Auth.Username
|
||||
data.Storage.Memory.Auth.Password = d.Storage.Memory.Auth.Password
|
||||
data.Storage.Memory.Size = d.Storage.Memory.Size
|
||||
data.Storage.Memory.Purge = d.Storage.Memory.Purge
|
||||
|
||||
data.Storage.S3 = []value.S3Storage{}
|
||||
|
||||
data.FFmpeg.Log.MaxMinimalHistory = 0
|
||||
@ -363,7 +371,11 @@ func DowngradeV3toV2(d *Data) (*v2.Data, error) {
|
||||
data.Storage.CORS = d.Storage.CORS
|
||||
data.Storage.CORS.Origins = slices.Copy(d.Storage.CORS.Origins)
|
||||
|
||||
data.Storage.Memory = d.Storage.Memory
|
||||
data.Storage.Memory.Auth.Enable = d.Storage.Memory.Auth.Enable
|
||||
data.Storage.Memory.Auth.Username = d.Storage.Memory.Auth.Username
|
||||
data.Storage.Memory.Auth.Password = d.Storage.Memory.Auth.Password
|
||||
data.Storage.Memory.Size = d.Storage.Memory.Size
|
||||
data.Storage.Memory.Purge = d.Storage.Memory.Purge
|
||||
|
||||
data.Storage.Disk.Dir = d.Storage.Disk.Dir
|
||||
data.Storage.Disk.Size = d.Storage.Disk.Size
|
||||
|
||||
@ -96,6 +96,10 @@ func (u *Dir) Validate() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
if err := u.fs.MkdirAll(val, 0755); err != nil {
|
||||
return fmt.Errorf("%s can't be created (%w)", val, err)
|
||||
}
|
||||
|
||||
finfo, err := u.fs.Stat(val)
|
||||
if err != nil {
|
||||
return fmt.Errorf("%s does not exist", val)
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user