Check for correct error

This commit is contained in:
Ingo Oppermann 2023-07-19 17:18:58 +02:00
parent bdcf4bdeb4
commit 1f24ea1b00
No known key found for this signature in database
GPG Key ID: 2AB32426E9DD229E
6 changed files with 60 additions and 35 deletions

View File

@ -1,6 +1,7 @@
package main
import (
"errors"
"fmt"
"os"
@ -35,7 +36,7 @@ func main() {
}
}
func doImport(logger log.Logger, fs fs.Filesystem, configstore cfgstore.Store) error {
func doImport(logger log.Logger, filesystem fs.Filesystem, configstore cfgstore.Store) error {
if logger == nil {
logger = log.New("")
}
@ -74,8 +75,8 @@ func doImport(logger log.Logger, fs fs.Filesystem, configstore cfgstore.Store) e
logger = logger.WithField("database", v1filename)
if _, err := fs.Stat(v1filename); err != nil {
if os.IsNotExist(err) {
if _, err := filesystem.Stat(v1filename); err != nil {
if errors.Is(err, fs.ErrNotExist) {
logger.Info().Log("Database doesn't exist and nothing will be imported")
return nil
}
@ -88,7 +89,7 @@ func doImport(logger log.Logger, fs fs.Filesystem, configstore cfgstore.Store) e
// Load an existing DB
datastore, err := jsonstore.New(jsonstore.Config{
Filesystem: fs,
Filesystem: filesystem,
Filepath: cfg.DB.Dir + "/db.json",
})
if err != nil {
@ -116,7 +117,7 @@ func doImport(logger log.Logger, fs fs.Filesystem, configstore cfgstore.Store) e
importConfig.binary = cfg.FFmpeg.Binary
// Rewrite the old database to the new database
r, err := importV1(fs, v1filename, importConfig)
r, err := importV1(filesystem, v1filename, importConfig)
if err != nil {
logger.Error().WithError(err).Log("Importing database failed")
return fmt.Errorf("importing database failed: %w", err)

View File

@ -853,9 +853,9 @@ func synchronize(wish map[string]string, want []store.Process, have []proxy.Proc
// A map from the process ID to the process config of the processes
// we want to be running on the nodes.
wantMap := map[string]store.Process{}
for _, process := range want {
pid := process.Config.ProcessID().String()
wantMap[pid] = process
for _, wantP := range want {
pid := wantP.Config.ProcessID().String()
wantMap[pid] = wantP
}
opStack := []interface{}{}
@ -1007,16 +1007,29 @@ func synchronize(wish map[string]string, want []store.Process, have []proxy.Proc
}
// The wantMap now contains only those processes that need to be installed on a node.
// We will rebuild the "want" array from the wantMap in the same order as the original
// "want" array to make the resulting opStack deterministic.
wantReduced := []store.Process{}
for _, wantP := range want {
pid := wantP.Config.ProcessID().String()
if _, ok := wantMap[pid]; !ok {
continue
}
wantReduced = append(wantReduced, wantP)
}
// Create a map from the process reference to the node it is running on.
haveReferenceAffinityMap := createReferenceAffinityMap(have)
// Now, all remaining processes in the wantMap must be added to one of the nodes.
for pid, process := range wantMap {
for _, wantP := range wantReduced {
pid := wantP.Config.ProcessID().String()
// If a process doesn't have any limits defined, reject that process
if process.Config.LimitCPU <= 0 || process.Config.LimitMemory <= 0 {
if wantP.Config.LimitCPU <= 0 || wantP.Config.LimitMemory <= 0 {
opStack = append(opStack, processOpReject{
processid: process.Config.ProcessID(),
processid: wantP.Config.ProcessID(),
err: errNoLimitsDefined,
})
@ -1029,9 +1042,9 @@ func synchronize(wish map[string]string, want []store.Process, have []proxy.Proc
nodeid := ""
// Try to add the process to a node where other processes with the same reference currently reside.
if len(process.Config.Reference) != 0 {
for _, count := range haveReferenceAffinityMap[process.Config.Reference+"@"+process.Config.Domain] {
if hasNodeEnoughResources(resources[count.nodeid], process.Config.LimitCPU, process.Config.LimitMemory) {
if len(wantP.Config.Reference) != 0 {
for _, count := range haveReferenceAffinityMap[wantP.Config.Reference+"@"+wantP.Config.Domain] {
if hasNodeEnoughResources(resources[count.nodeid], wantP.Config.LimitCPU, wantP.Config.LimitMemory) {
nodeid = count.nodeid
break
}
@ -1040,29 +1053,29 @@ func synchronize(wish map[string]string, want []store.Process, have []proxy.Proc
// Find the node with the most resources available
if len(nodeid) == 0 {
nodeid = findBestNodeForProcess(resources, process.Config.LimitCPU, process.Config.LimitMemory)
nodeid = findBestNodeForProcess(resources, wantP.Config.LimitCPU, wantP.Config.LimitMemory)
}
if len(nodeid) != 0 {
opStack = append(opStack, processOpAdd{
nodeid: nodeid,
config: process.Config,
metadata: process.Metadata,
order: process.Order,
config: wantP.Config,
metadata: wantP.Metadata,
order: wantP.Order,
})
// Consume the resources
r, ok := resources[nodeid]
if ok {
r.CPU += process.Config.LimitCPU
r.Mem += process.Config.LimitMemory
r.CPU += wantP.Config.LimitCPU
r.Mem += wantP.Config.LimitMemory
resources[nodeid] = r
}
reality[pid] = nodeid
} else {
opStack = append(opStack, processOpReject{
processid: process.Config.ProcessID(),
processid: wantP.Config.ProcessID(),
err: errNotEnoughResourcesForDeployment,
})
}

View File

@ -2,8 +2,8 @@ package store
import (
gojson "encoding/json"
"errors"
"fmt"
"os"
"path/filepath"
"github.com/datarhei/core/v16/config"
@ -121,8 +121,12 @@ func (c *jsonStore) load(cfg *config.Config) error {
return nil
}
if _, err := c.fs.Stat(c.path); os.IsNotExist(err) {
return nil
if _, err := c.fs.Stat(c.path); err != nil {
if errors.Is(err, fs.ErrNotExist) {
return nil
}
return err
}
jsondata, err := c.fs.ReadFile(c.path)

View File

@ -2,8 +2,8 @@ package access
import (
"encoding/json"
"errors"
"fmt"
"os"
"sort"
"strings"
"sync"
@ -63,9 +63,13 @@ func (a *adapter) LoadPolicy(model model.Model) error {
}
func (a *adapter) loadPolicyFile(model model.Model) error {
if _, err := a.fs.Stat(a.filePath); os.IsNotExist(err) {
a.domains = []Domain{}
return nil
if _, err := a.fs.Stat(a.filePath); err != nil {
if errors.Is(err, fs.ErrNotExist) {
a.domains = []Domain{}
return nil
}
return err
}
data, err := a.fs.ReadFile(a.filePath)

View File

@ -2,8 +2,8 @@ package identity
import (
"encoding/json"
"errors"
"fmt"
"os"
"sync"
"github.com/datarhei/core/v16/io/fs"
@ -48,8 +48,12 @@ func (a *fileAdapter) LoadIdentities() ([]User, error) {
a.lock.Lock()
defer a.lock.Unlock()
if _, err := a.fs.Stat(a.filePath); os.IsNotExist(err) {
return nil, nil
if _, err := a.fs.Stat(a.filePath); err != nil {
if errors.Is(err, fs.ErrNotExist) {
return nil, nil
}
return nil, err
}
data, err := a.fs.ReadFile(a.filePath)

View File

@ -2,8 +2,8 @@ package json
import (
gojson "encoding/json"
"errors"
"fmt"
"os"
"sync"
"github.com/datarhei/core/v16/encoding/json"
@ -180,9 +180,8 @@ type storeVersion struct {
func (s *jsonStore) load(filepath string, version uint64) (Data, error) {
r := NewData()
_, err := s.fs.Stat(filepath)
if err != nil {
if os.IsNotExist(err) {
if _, err := s.fs.Stat(filepath); err != nil {
if errors.Is(err, fs.ErrNotExist) {
return r, nil
}