diff --git a/app/import/main.go b/app/import/main.go index 71a99341..f49e16ba 100644 --- a/app/import/main.go +++ b/app/import/main.go @@ -1,6 +1,7 @@ package main import ( + "errors" "fmt" "os" @@ -35,7 +36,7 @@ func main() { } } -func doImport(logger log.Logger, fs fs.Filesystem, configstore cfgstore.Store) error { +func doImport(logger log.Logger, filesystem fs.Filesystem, configstore cfgstore.Store) error { if logger == nil { logger = log.New("") } @@ -74,8 +75,8 @@ func doImport(logger log.Logger, fs fs.Filesystem, configstore cfgstore.Store) e logger = logger.WithField("database", v1filename) - if _, err := fs.Stat(v1filename); err != nil { - if os.IsNotExist(err) { + if _, err := filesystem.Stat(v1filename); err != nil { + if errors.Is(err, fs.ErrNotExist) { logger.Info().Log("Database doesn't exist and nothing will be imported") return nil } @@ -88,7 +89,7 @@ func doImport(logger log.Logger, fs fs.Filesystem, configstore cfgstore.Store) e // Load an existing DB datastore, err := jsonstore.New(jsonstore.Config{ - Filesystem: fs, + Filesystem: filesystem, Filepath: cfg.DB.Dir + "/db.json", }) if err != nil { @@ -116,7 +117,7 @@ func doImport(logger log.Logger, fs fs.Filesystem, configstore cfgstore.Store) e importConfig.binary = cfg.FFmpeg.Binary // Rewrite the old database to the new database - r, err := importV1(fs, v1filename, importConfig) + r, err := importV1(filesystem, v1filename, importConfig) if err != nil { logger.Error().WithError(err).Log("Importing database failed") return fmt.Errorf("importing database failed: %w", err) diff --git a/cluster/leader.go b/cluster/leader.go index 035cdce7..bbde80b2 100644 --- a/cluster/leader.go +++ b/cluster/leader.go @@ -853,9 +853,9 @@ func synchronize(wish map[string]string, want []store.Process, have []proxy.Proc // A map from the process ID to the process config of the processes // we want to be running on the nodes. 
wantMap := map[string]store.Process{} - for _, process := range want { - pid := process.Config.ProcessID().String() - wantMap[pid] = process + for _, wantP := range want { + pid := wantP.Config.ProcessID().String() + wantMap[pid] = wantP } opStack := []interface{}{} @@ -1007,16 +1007,29 @@ func synchronize(wish map[string]string, want []store.Process, have []proxy.Proc } // The wantMap now contains only those processes that need to be installed on a node. + // We will rebuild the "want" array from the wantMap in the same order as the original + // "want" array to make the resulting opStack deterministic. + wantReduced := []store.Process{} + for _, wantP := range want { + pid := wantP.Config.ProcessID().String() + if _, ok := wantMap[pid]; !ok { + continue + } + + wantReduced = append(wantReduced, wantP) + } // Create a map from the process reference to the node it is running on. haveReferenceAffinityMap := createReferenceAffinityMap(have) // Now, all remaining processes in the wantMap must be added to one of the nodes. - for pid, process := range wantMap { + for _, wantP := range wantReduced { + pid := wantP.Config.ProcessID().String() + // If a process doesn't have any limits defined, reject that process - if process.Config.LimitCPU <= 0 || process.Config.LimitMemory <= 0 { + if wantP.Config.LimitCPU <= 0 || wantP.Config.LimitMemory <= 0 { opStack = append(opStack, processOpReject{ - processid: process.Config.ProcessID(), + processid: wantP.Config.ProcessID(), err: errNoLimitsDefined, }) @@ -1029,9 +1042,9 @@ func synchronize(wish map[string]string, want []store.Process, have []proxy.Proc nodeid := "" // Try to add the process to a node where other processes with the same reference currently reside. 
- if len(process.Config.Reference) != 0 { - for _, count := range haveReferenceAffinityMap[process.Config.Reference+"@"+process.Config.Domain] { - if hasNodeEnoughResources(resources[count.nodeid], process.Config.LimitCPU, process.Config.LimitMemory) { + if len(wantP.Config.Reference) != 0 { + for _, count := range haveReferenceAffinityMap[wantP.Config.Reference+"@"+wantP.Config.Domain] { + if hasNodeEnoughResources(resources[count.nodeid], wantP.Config.LimitCPU, wantP.Config.LimitMemory) { nodeid = count.nodeid break } @@ -1040,29 +1053,29 @@ func synchronize(wish map[string]string, want []store.Process, have []proxy.Proc // Find the node with the most resources available if len(nodeid) == 0 { - nodeid = findBestNodeForProcess(resources, process.Config.LimitCPU, process.Config.LimitMemory) + nodeid = findBestNodeForProcess(resources, wantP.Config.LimitCPU, wantP.Config.LimitMemory) } if len(nodeid) != 0 { opStack = append(opStack, processOpAdd{ nodeid: nodeid, - config: process.Config, - metadata: process.Metadata, - order: process.Order, + config: wantP.Config, + metadata: wantP.Metadata, + order: wantP.Order, }) // Consume the resources r, ok := resources[nodeid] if ok { - r.CPU += process.Config.LimitCPU - r.Mem += process.Config.LimitMemory + r.CPU += wantP.Config.LimitCPU + r.Mem += wantP.Config.LimitMemory resources[nodeid] = r } reality[pid] = nodeid } else { opStack = append(opStack, processOpReject{ - processid: process.Config.ProcessID(), + processid: wantP.Config.ProcessID(), err: errNotEnoughResourcesForDeployment, }) } diff --git a/config/store/json.go b/config/store/json.go index 976d18a0..91a200e1 100644 --- a/config/store/json.go +++ b/config/store/json.go @@ -2,8 +2,8 @@ package store import ( gojson "encoding/json" + "errors" "fmt" - "os" "path/filepath" "github.com/datarhei/core/v16/config" @@ -121,8 +121,12 @@ func (c *jsonStore) load(cfg *config.Config) error { return nil } - if _, err := c.fs.Stat(c.path); os.IsNotExist(err) { - return nil + if _, err := c.fs.Stat(c.path); err != nil { + if errors.Is(err, fs.ErrNotExist) { + return nil + } + + return err } jsondata, err := c.fs.ReadFile(c.path) diff --git a/iam/access/adapter.go b/iam/access/adapter.go index 23926df2..e444e793 100644 --- a/iam/access/adapter.go +++ b/iam/access/adapter.go @@ -2,8 +2,8 @@ package access import ( "encoding/json" + "errors" "fmt" - "os" "sort" "strings" "sync" @@ -63,9 +63,13 @@ func (a *adapter) LoadPolicy(model model.Model) error { } func (a *adapter) loadPolicyFile(model model.Model) error { - if _, err := a.fs.Stat(a.filePath); os.IsNotExist(err) { - a.domains = []Domain{} - return nil + if _, err := a.fs.Stat(a.filePath); err != nil { + if errors.Is(err, fs.ErrNotExist) { + a.domains = []Domain{} + return nil + } + + return err } data, err := a.fs.ReadFile(a.filePath) diff --git a/iam/identity/adapter.go b/iam/identity/adapter.go index d288e31d..dd21f4b6 100644 --- a/iam/identity/adapter.go +++ b/iam/identity/adapter.go @@ -2,8 +2,8 @@ package identity import ( "encoding/json" + "errors" "fmt" - "os" "sync" "github.com/datarhei/core/v16/io/fs" @@ -48,8 +48,12 @@ func (a *fileAdapter) LoadIdentities() ([]User, error) { a.lock.Lock() defer a.lock.Unlock() - if _, err := a.fs.Stat(a.filePath); os.IsNotExist(err) { - return nil, nil + if _, err := a.fs.Stat(a.filePath); err != nil { + if errors.Is(err, fs.ErrNotExist) { + return nil, nil + } + + return nil, err } data, err := a.fs.ReadFile(a.filePath) diff --git a/restream/store/json/json.go b/restream/store/json/json.go index 2ca51b65..cf8237d3 100644 --- a/restream/store/json/json.go +++ b/restream/store/json/json.go @@ -2,8 +2,8 @@ package json import ( gojson "encoding/json" + "errors" "fmt" - "os" "sync" "github.com/datarhei/core/v16/encoding/json" @@ -180,9 +180,8 @@ type storeVersion struct { func (s *jsonStore) load(filepath string, version uint64) (Data, error) { r := NewData() - _, err := s.fs.Stat(filepath) - if err != nil { - if os.IsNotExist(err) { + if _, err := s.fs.Stat(filepath); err != nil { + if errors.Is(err, fs.ErrNotExist) { return r, nil }