Add tests

This commit is contained in:
Ingo Oppermann 2023-06-08 13:21:14 +02:00
parent 7e7d1caca7
commit a03ce87ec7
No known key found for this signature in database
GPG Key ID: 2AB32426E9DD229E
5 changed files with 1472 additions and 103 deletions

View File

@ -318,7 +318,9 @@ func (c *cluster) revokeLeadership() {
// All this happens if there's a leader. If there's no leader election possible, the node goes into the
// emergency leadership mode after a certain duration (emergencyLeaderTimeout). The synchronization phase will
// happen based on the last known list of processes from the cluster DB. Until nodeRecoverTimeout is reached,
// process that would run on unreachable nodes will not be moved to the node. Rebalancing will be disabled.
// process that would run on unreachable nodes will not be moved to the node. After that, all processes will
// end on the node, but only if there are enough resources. Not to bog down the node. Rebalancing will be
// disabled.
//
// The goal of synchronizing and rebalancing is to make as little moves as possible and to be tolerant for
// a while if a node is not reachable.

View File

@ -558,6 +558,359 @@ func TestSynchronizeAddRemove(t *testing.T) {
}, reality)
}
func TestSynchronizeWaitDisconnectedNode(t *testing.T) {
wish := map[string]string{
"foobar1@": "node1",
"foobar2@": "node2",
}
now := time.Now()
want := []store.Process{
{
UpdatedAt: now,
Config: &app.Config{
ID: "foobar1",
Reference: "barfoo",
LimitCPU: 10,
LimitMemory: 20,
},
},
{
UpdatedAt: now,
Config: &app.Config{
ID: "foobar2",
Reference: "barfoo",
LimitCPU: 10,
LimitMemory: 30,
},
},
}
have := []proxy.Process{
{
NodeID: "node1",
Order: "start",
State: "running",
CPU: 12,
Mem: 5,
Runtime: 42,
UpdatedAt: now,
Config: &app.Config{
ID: "foobar1",
Reference: "barfoo",
LimitCPU: 10,
LimitMemory: 20,
},
},
}
nodes := map[string]proxy.NodeAbout{
"node1": {
LastContact: time.Now(),
Resources: proxy.NodeResources{
NCPU: 1,
CPU: 1,
Mem: 1,
CPULimit: 90,
MemLimit: 90,
},
},
"node2": {
LastContact: time.Now().Add(-time.Minute),
Resources: proxy.NodeResources{
IsThrottling: true,
NCPU: 1,
CPU: 1,
Mem: 1,
CPULimit: 90,
MemLimit: 90,
},
},
}
stack, _, reality := synchronize(wish, want, have, nodes, 2*time.Minute)
require.Empty(t, stack)
require.Equal(t, map[string]string{
"foobar1@": "node1",
"foobar2@": "node2",
}, reality)
}
func TestSynchronizeWaitDisconnectedNodeNoWish(t *testing.T) {
wish := map[string]string{
"foobar1@": "node1",
}
now := time.Now()
want := []store.Process{
{
UpdatedAt: now,
Config: &app.Config{
ID: "foobar1",
Reference: "barfoo",
LimitCPU: 10,
LimitMemory: 20,
},
},
{
UpdatedAt: now,
Config: &app.Config{
ID: "foobar2",
Reference: "barfoo",
LimitCPU: 10,
LimitMemory: 30,
},
},
}
have := []proxy.Process{
{
NodeID: "node1",
Order: "start",
State: "running",
CPU: 12,
Mem: 5,
Runtime: 42,
UpdatedAt: now,
Config: &app.Config{
ID: "foobar1",
Reference: "barfoo",
LimitCPU: 10,
LimitMemory: 20,
},
},
}
nodes := map[string]proxy.NodeAbout{
"node1": {
LastContact: time.Now(),
Resources: proxy.NodeResources{
NCPU: 1,
CPU: 1,
Mem: 1,
CPULimit: 90,
MemLimit: 90,
},
},
"node2": {
LastContact: time.Now().Add(-time.Minute),
Resources: proxy.NodeResources{
IsThrottling: true,
NCPU: 1,
CPU: 1,
Mem: 1,
CPULimit: 90,
MemLimit: 90,
},
},
}
stack, _, reality := synchronize(wish, want, have, nodes, 2*time.Minute)
require.Equal(t, []interface{}{
processOpAdd{
nodeid: "node1",
config: &app.Config{
ID: "foobar2",
Reference: "barfoo",
LimitCPU: 10,
LimitMemory: 30,
},
},
}, stack)
require.Equal(t, map[string]string{
"foobar1@": "node1",
"foobar2@": "node1",
}, reality)
}
func TestSynchronizeWaitDisconnectedNodeUnrealisticWish(t *testing.T) {
wish := map[string]string{
"foobar1@": "node1",
"foobar2@": "node3",
}
now := time.Now()
want := []store.Process{
{
UpdatedAt: now,
Config: &app.Config{
ID: "foobar1",
Reference: "barfoo",
LimitCPU: 10,
LimitMemory: 20,
},
},
{
UpdatedAt: now,
Config: &app.Config{
ID: "foobar2",
Reference: "barfoo",
LimitCPU: 10,
LimitMemory: 30,
},
},
}
have := []proxy.Process{
{
NodeID: "node1",
Order: "start",
State: "running",
CPU: 12,
Mem: 5,
Runtime: 42,
UpdatedAt: now,
Config: &app.Config{
ID: "foobar1",
Reference: "barfoo",
LimitCPU: 10,
LimitMemory: 20,
},
},
}
nodes := map[string]proxy.NodeAbout{
"node1": {
LastContact: time.Now(),
Resources: proxy.NodeResources{
NCPU: 1,
CPU: 1,
Mem: 1,
CPULimit: 90,
MemLimit: 90,
},
},
"node2": {
LastContact: time.Now().Add(-time.Minute),
Resources: proxy.NodeResources{
IsThrottling: true,
NCPU: 1,
CPU: 1,
Mem: 1,
CPULimit: 90,
MemLimit: 90,
},
},
}
stack, _, reality := synchronize(wish, want, have, nodes, 2*time.Minute)
require.Equal(t, []interface{}{
processOpAdd{
nodeid: "node1",
config: &app.Config{
ID: "foobar2",
Reference: "barfoo",
LimitCPU: 10,
LimitMemory: 30,
},
},
}, stack)
require.Equal(t, map[string]string{
"foobar1@": "node1",
"foobar2@": "node1",
}, reality)
}
func TestSynchronizeTimeoutDisconnectedNode(t *testing.T) {
wish := map[string]string{
"foobar1@": "node1",
"foobar2@": "node2",
}
now := time.Now()
want := []store.Process{
{
UpdatedAt: now,
Config: &app.Config{
ID: "foobar1",
Reference: "barfoo",
LimitCPU: 10,
LimitMemory: 20,
},
},
{
UpdatedAt: now,
Config: &app.Config{
ID: "foobar2",
Reference: "barfoo",
LimitCPU: 10,
LimitMemory: 30,
},
},
}
have := []proxy.Process{
{
NodeID: "node1",
Order: "start",
State: "running",
CPU: 12,
Mem: 5,
Runtime: 42,
UpdatedAt: now,
Config: &app.Config{
ID: "foobar1",
Reference: "barfoo",
LimitCPU: 10,
LimitMemory: 20,
},
},
}
nodes := map[string]proxy.NodeAbout{
"node1": {
LastContact: time.Now(),
Resources: proxy.NodeResources{
NCPU: 1,
CPU: 1,
Mem: 1,
CPULimit: 90,
MemLimit: 90,
},
},
"node2": {
LastContact: time.Now().Add(-3 * time.Minute),
Resources: proxy.NodeResources{
IsThrottling: true,
NCPU: 1,
CPU: 1,
Mem: 1,
CPULimit: 90,
MemLimit: 90,
},
},
}
stack, _, reality := synchronize(wish, want, have, nodes, 2*time.Minute)
require.Equal(t, []interface{}{
processOpAdd{
nodeid: "node1",
config: &app.Config{
ID: "foobar2",
Reference: "barfoo",
LimitCPU: 10,
LimitMemory: 30,
},
},
}, stack)
require.Equal(t, map[string]string{
"foobar1@": "node1",
"foobar2@": "node1",
}, reality)
}
func TestRebalanceNothingToDo(t *testing.T) {
processes := []proxy.Process{
{

View File

@ -225,7 +225,7 @@ func (r *raft) Apply(data []byte) error {
res := future.Response()
if res != nil {
if err, ok := res.(store.StoreError); ok {
if err, ok := res.(error); ok {
return err
}
}

View File

@ -27,17 +27,7 @@ type Store interface {
UserList() Users
GetUser(name string) Users
PolicyList() Policies
PolicyUserList(nam string) Policies
}
type StoreError string
func NewStoreError(format string, a ...any) StoreError {
return StoreError(fmt.Sprintf(format, a...))
}
func (se StoreError) Error() string {
return string(se)
PolicyUserList(name string) Policies
}
type Process struct {
@ -172,70 +162,12 @@ func (s *store) Apply(entry *raft.Log) interface{} {
err := json.Unmarshal(entry.Data, &c)
if err != nil {
logger.Error().WithError(err).Log("Invalid entry")
return NewStoreError("invalid log entry, index: %d, term: %d", entry.Index, entry.Term)
return fmt.Errorf("invalid log entry, index: %d, term: %d", entry.Index, entry.Term)
}
logger.Debug().WithField("operation", c.Operation).Log("")
switch c.Operation {
case OpAddProcess:
b, _ := json.Marshal(c.Data)
cmd := CommandAddProcess{}
json.Unmarshal(b, &cmd)
err = s.addProcess(cmd)
case OpRemoveProcess:
b, _ := json.Marshal(c.Data)
cmd := CommandRemoveProcess{}
json.Unmarshal(b, &cmd)
err = s.removeProcess(cmd)
case OpUpdateProcess:
b, _ := json.Marshal(c.Data)
cmd := CommandUpdateProcess{}
json.Unmarshal(b, &cmd)
err = s.updateProcess(cmd)
case OpSetProcessMetadata:
b, _ := json.Marshal(c.Data)
cmd := CommandSetProcessMetadata{}
json.Unmarshal(b, &cmd)
err = s.setProcessMetadata(cmd)
case OpAddIdentity:
b, _ := json.Marshal(c.Data)
cmd := CommandAddIdentity{}
json.Unmarshal(b, &cmd)
err = s.addIdentity(cmd)
case OpUpdateIdentity:
b, _ := json.Marshal(c.Data)
cmd := CommandUpdateIdentity{}
json.Unmarshal(b, &cmd)
err = s.updateIdentity(cmd)
case OpRemoveIdentity:
b, _ := json.Marshal(c.Data)
cmd := CommandRemoveIdentity{}
json.Unmarshal(b, &cmd)
err = s.removeIdentity(cmd)
case OpSetPolicies:
b, _ := json.Marshal(c.Data)
cmd := CommandSetPolicies{}
json.Unmarshal(b, &cmd)
err = s.setPolicies(cmd)
case OpSetProcessNodeMap:
b, _ := json.Marshal(c.Data)
cmd := CommandSetProcessNodeMap{}
json.Unmarshal(b, &cmd)
err = s.setProcessNodeMap(cmd)
default:
s.logger.Warn().WithField("operation", c.Operation).Log("Unknown operation")
return nil
}
err = s.applyCommand(c)
if err != nil {
logger.Debug().WithError(err).WithField("operation", c.Operation).Log("")
@ -251,6 +183,127 @@ func (s *store) Apply(entry *raft.Log) interface{} {
return nil
}
func (s *store) applyCommand(c Command) error {
var b []byte
var err error = nil
switch c.Operation {
case OpAddProcess:
b, err = json.Marshal(c.Data)
if err != nil {
break
}
cmd := CommandAddProcess{}
err = json.Unmarshal(b, &cmd)
if err != nil {
break
}
err = s.addProcess(cmd)
case OpRemoveProcess:
b, err = json.Marshal(c.Data)
if err != nil {
break
}
cmd := CommandRemoveProcess{}
err = json.Unmarshal(b, &cmd)
if err != nil {
break
}
err = s.removeProcess(cmd)
case OpUpdateProcess:
b, err = json.Marshal(c.Data)
if err != nil {
break
}
cmd := CommandUpdateProcess{}
err = json.Unmarshal(b, &cmd)
if err != nil {
break
}
err = s.updateProcess(cmd)
case OpSetProcessMetadata:
b, err = json.Marshal(c.Data)
if err != nil {
break
}
cmd := CommandSetProcessMetadata{}
err = json.Unmarshal(b, &cmd)
if err != nil {
break
}
err = s.setProcessMetadata(cmd)
case OpAddIdentity:
b, err = json.Marshal(c.Data)
if err != nil {
break
}
cmd := CommandAddIdentity{}
err = json.Unmarshal(b, &cmd)
if err != nil {
break
}
err = s.addIdentity(cmd)
case OpUpdateIdentity:
b, err = json.Marshal(c.Data)
if err != nil {
break
}
cmd := CommandUpdateIdentity{}
err = json.Unmarshal(b, &cmd)
if err != nil {
break
}
err = s.updateIdentity(cmd)
case OpRemoveIdentity:
b, err = json.Marshal(c.Data)
if err != nil {
break
}
cmd := CommandRemoveIdentity{}
err = json.Unmarshal(b, &cmd)
if err != nil {
break
}
err = s.removeIdentity(cmd)
case OpSetPolicies:
b, err = json.Marshal(c.Data)
if err != nil {
break
}
cmd := CommandSetPolicies{}
err = json.Unmarshal(b, &cmd)
if err != nil {
break
}
err = s.setPolicies(cmd)
case OpSetProcessNodeMap:
b, err = json.Marshal(c.Data)
if err != nil {
break
}
cmd := CommandSetProcessNodeMap{}
err = json.Unmarshal(b, &cmd)
if err != nil {
break
}
err = s.setProcessNodeMap(cmd)
default:
s.logger.Warn().WithField("operation", c.Operation).Log("Unknown operation")
err = fmt.Errorf("unknown operation: %s", c.Operation)
}
return err
}
func (s *store) addProcess(cmd CommandAddProcess) error {
s.lock.Lock()
defer s.lock.Unlock()
@ -258,12 +311,12 @@ func (s *store) addProcess(cmd CommandAddProcess) error {
id := cmd.Config.ProcessID().String()
if cmd.Config.LimitCPU <= 0 || cmd.Config.LimitMemory <= 0 {
return NewStoreError("the process with the ID '%s' must have limits defined", id)
return fmt.Errorf("the process with the ID '%s' must have limits defined", id)
}
_, ok := s.Process[id]
if ok {
return NewStoreError("the process with the ID '%s' already exists", id)
return fmt.Errorf("the process with the ID '%s' already exists", id)
}
now := time.Now()
@ -285,7 +338,7 @@ func (s *store) removeProcess(cmd CommandRemoveProcess) error {
_, ok := s.Process[id]
if !ok {
return NewStoreError("the process with the ID '%s' doesn't exist", id)
return fmt.Errorf("the process with the ID '%s' doesn't exist", id)
}
delete(s.Process, id)
@ -301,12 +354,12 @@ func (s *store) updateProcess(cmd CommandUpdateProcess) error {
dstid := cmd.Config.ProcessID().String()
if cmd.Config.LimitCPU <= 0 || cmd.Config.LimitMemory <= 0 {
return NewStoreError("the process with the ID '%s' must have limits defined", dstid)
return fmt.Errorf("the process with the ID '%s' must have limits defined", dstid)
}
p, ok := s.Process[srcid]
if !ok {
return NewStoreError("the process with the ID '%s' doesn't exists", srcid)
return fmt.Errorf("the process with the ID '%s' doesn't exists", srcid)
}
if p.Config.Equal(cmd.Config) {
@ -318,17 +371,19 @@ func (s *store) updateProcess(cmd CommandUpdateProcess) error {
UpdatedAt: time.Now(),
Config: cmd.Config,
}
} else {
_, ok := s.Process[dstid]
if ok {
return NewStoreError("the process with the ID '%s' already exists", dstid)
}
delete(s.Process, srcid)
s.Process[dstid] = Process{
UpdatedAt: time.Now(),
Config: cmd.Config,
}
return nil
}
_, ok = s.Process[dstid]
if ok {
return fmt.Errorf("the process with the ID '%s' already exists", dstid)
}
delete(s.Process, srcid)
s.Process[dstid] = Process{
UpdatedAt: time.Now(),
Config: cmd.Config,
}
return nil
@ -342,7 +397,7 @@ func (s *store) setProcessMetadata(cmd CommandSetProcessMetadata) error {
p, ok := s.Process[id]
if !ok {
return NewStoreError("the process with the ID '%s' doesn't exists", cmd.ID)
return fmt.Errorf("the process with the ID '%s' doesn't exists", cmd.ID)
}
if p.Metadata == nil {
@ -367,7 +422,7 @@ func (s *store) addIdentity(cmd CommandAddIdentity) error {
_, ok := s.Users.Users[cmd.Identity.Name]
if ok {
return NewStoreError("the identity with the name '%s' already exists", cmd.Identity.Name)
return fmt.Errorf("the identity with the name '%s' already exists", cmd.Identity.Name)
}
s.Users.UpdatedAt = time.Now()
@ -381,21 +436,32 @@ func (s *store) updateIdentity(cmd CommandUpdateIdentity) error {
defer s.lock.Unlock()
_, ok := s.Users.Users[cmd.Name]
if ok {
if cmd.Name == cmd.Identity.Name {
s.Users.UpdatedAt = time.Now()
s.Users.Users[cmd.Identity.Name] = cmd.Identity
} else {
_, ok := s.Users.Users[cmd.Identity.Name]
if !ok {
s.Users.UpdatedAt = time.Now()
s.Users.Users[cmd.Identity.Name] = cmd.Identity
} else {
return NewStoreError("the identity with the name '%s' already exists", cmd.Identity.Name)
}
}
if !ok {
return fmt.Errorf("the identity with the name '%s' doesn't exist", cmd.Name)
}
if cmd.Name == cmd.Identity.Name {
s.Users.UpdatedAt = time.Now()
s.Users.Users[cmd.Identity.Name] = cmd.Identity
return nil
}
_, ok = s.Users.Users[cmd.Identity.Name]
if ok {
return fmt.Errorf("the identity with the name '%s' already exists", cmd.Identity.Name)
}
now := time.Now()
s.Users.UpdatedAt = now
s.Users.Users[cmd.Identity.Name] = cmd.Identity
s.Policies.UpdatedAt = now
s.Policies.Policies[cmd.Identity.Name] = s.Policies.Policies[cmd.Name]
delete(s.Users.Users, cmd.Name)
delete(s.Policies.Policies, cmd.Name)
return nil
}
@ -415,6 +481,10 @@ func (s *store) setPolicies(cmd CommandSetPolicies) error {
s.lock.Lock()
defer s.lock.Unlock()
if _, ok := s.Users.Users[cmd.Name]; !ok {
return fmt.Errorf("the identity with the name '%s' doesn't exist", cmd.Name)
}
delete(s.Policies.Policies, cmd.Name)
s.Policies.Policies[cmd.Name] = cmd.Policies
s.Policies.UpdatedAt = time.Now()

944
cluster/store/store_test.go Normal file
View File

@ -0,0 +1,944 @@
package store
import (
"encoding/json"
"testing"
"github.com/datarhei/core/v16/iam/access"
"github.com/datarhei/core/v16/iam/identity"
"github.com/datarhei/core/v16/restream/app"
"github.com/hashicorp/raft"
"github.com/stretchr/testify/require"
)
func createStore() (*store, error) {
si, err := NewStore(Config{
Logger: nil,
})
if err != nil {
return nil, err
}
s := si.(*store)
return s, nil
}
func TestCreateStore(t *testing.T) {
s, err := createStore()
require.NoError(t, err)
require.NotNil(t, s.Process)
require.NotNil(t, s.ProcessNodeMap)
require.NotNil(t, s.Users.Users)
require.NotNil(t, s.Policies.Policies)
}
func TestAddProcessCommand(t *testing.T) {
s, err := createStore()
require.NoError(t, err)
config := &app.Config{
ID: "foobar",
LimitCPU: 1,
LimitMemory: 1,
}
err = s.applyCommand(Command{
Operation: OpAddProcess,
Data: CommandAddProcess{
Config: config,
},
})
require.NoError(t, err)
require.NotEmpty(t, s.Process)
p, ok := s.Process["foobar@"]
require.True(t, ok)
require.NotZero(t, p.CreatedAt)
require.NotZero(t, p.UpdatedAt)
require.NotNil(t, p.Config)
require.True(t, p.Config.Equal(config))
require.NotNil(t, p.Metadata)
}
func TestAddProcess(t *testing.T) {
s, err := createStore()
require.NoError(t, err)
config := &app.Config{
ID: "foobar",
}
err = s.addProcess(CommandAddProcess{
Config: config,
})
require.Error(t, err)
require.Empty(t, s.Process)
config = &app.Config{
ID: "foobar",
LimitCPU: 1,
LimitMemory: 1,
}
err = s.addProcess(CommandAddProcess{
Config: config,
})
require.NoError(t, err)
require.Equal(t, 1, len(s.Process))
config = &app.Config{
ID: "foobar",
LimitCPU: 1,
LimitMemory: 1,
}
err = s.addProcess(CommandAddProcess{
Config: config,
})
require.Error(t, err)
require.Equal(t, 1, len(s.Process))
config = &app.Config{
ID: "foobar",
Domain: "barfoo",
LimitCPU: 1,
LimitMemory: 1,
}
err = s.addProcess(CommandAddProcess{
Config: config,
})
require.NoError(t, err)
require.Equal(t, 2, len(s.Process))
}
func TestRemoveProcessCommand(t *testing.T) {
s, err := createStore()
require.NoError(t, err)
config := &app.Config{
ID: "foobar",
LimitCPU: 1,
LimitMemory: 1,
}
err = s.applyCommand(Command{
Operation: OpAddProcess,
Data: CommandAddProcess{
Config: config,
},
})
require.NoError(t, err)
require.NotEmpty(t, s.Process)
err = s.applyCommand(Command{
Operation: OpRemoveProcess,
Data: CommandRemoveProcess{
ID: config.ProcessID(),
},
})
require.NoError(t, err)
require.Empty(t, s.Process)
err = s.applyCommand(Command{
Operation: OpRemoveProcess,
Data: CommandRemoveProcess{
ID: config.ProcessID(),
},
})
require.Error(t, err)
}
func TestRemoveProcess(t *testing.T) {
s, err := createStore()
require.NoError(t, err)
config1 := &app.Config{
ID: "foobar",
LimitCPU: 1,
LimitMemory: 1,
}
config2 := &app.Config{
ID: "foobar",
Domain: "barfoo",
LimitCPU: 1,
LimitMemory: 1,
}
err = s.removeProcess(CommandRemoveProcess{
ID: config1.ProcessID(),
})
require.Error(t, err)
err = s.removeProcess(CommandRemoveProcess{
ID: config2.ProcessID(),
})
require.Error(t, err)
err = s.addProcess(CommandAddProcess{
Config: config1,
})
require.NoError(t, err)
require.Equal(t, 1, len(s.Process))
err = s.addProcess(CommandAddProcess{
Config: config2,
})
require.NoError(t, err)
require.Equal(t, 2, len(s.Process))
err = s.removeProcess(CommandRemoveProcess{
ID: config1.ProcessID(),
})
require.NoError(t, err)
require.Equal(t, 1, len(s.Process))
err = s.removeProcess(CommandRemoveProcess{
ID: config2.ProcessID(),
})
require.NoError(t, err)
require.Empty(t, s.Process)
}
func TestUpdateProcessCommand(t *testing.T) {
s, err := createStore()
require.NoError(t, err)
config := &app.Config{
ID: "foobar",
LimitCPU: 1,
LimitMemory: 1,
}
pid := config.ProcessID()
err = s.applyCommand(Command{
Operation: OpAddProcess,
Data: CommandAddProcess{
Config: config,
},
})
require.NoError(t, err)
require.NotEmpty(t, s.Process)
config = &app.Config{
ID: "foobaz",
LimitCPU: 1,
LimitMemory: 1,
}
err = s.applyCommand(Command{
Operation: OpUpdateProcess,
Data: CommandUpdateProcess{
ID: pid,
Config: config,
},
})
require.NoError(t, err)
require.NotEmpty(t, s.Process)
}
func TestUpdateProcess(t *testing.T) {
s, err := createStore()
require.NoError(t, err)
config1 := &app.Config{
ID: "foobar",
LimitCPU: 1,
LimitMemory: 1,
}
config2 := &app.Config{
ID: "fooboz",
LimitCPU: 1,
LimitMemory: 1,
}
err = s.addProcess(CommandAddProcess{
Config: config1,
})
require.NoError(t, err)
require.Equal(t, 1, len(s.Process))
err = s.addProcess(CommandAddProcess{
Config: config2,
})
require.NoError(t, err)
require.Equal(t, 2, len(s.Process))
config := &app.Config{
ID: "foobaz",
LimitCPU: 1,
LimitMemory: 1,
}
err = s.updateProcess(CommandUpdateProcess{
ID: config.ProcessID(),
Config: config,
})
require.Error(t, err)
config.ID = "fooboz"
err = s.updateProcess(CommandUpdateProcess{
ID: config1.ProcessID(),
Config: config,
})
require.Error(t, err)
config.ID = "foobaz"
config.LimitCPU = 0
err = s.updateProcess(CommandUpdateProcess{
ID: config1.ProcessID(),
Config: config,
})
require.Error(t, err)
config.LimitCPU = 1
err = s.updateProcess(CommandUpdateProcess{
ID: config1.ProcessID(),
Config: config,
})
require.NoError(t, err)
require.Equal(t, 2, len(s.Process))
err = s.updateProcess(CommandUpdateProcess{
ID: config.ProcessID(),
Config: config,
})
require.NoError(t, err)
require.Equal(t, 2, len(s.Process))
config3 := &app.Config{
ID: config.ID,
LimitCPU: 1,
LimitMemory: 2,
}
err = s.updateProcess(CommandUpdateProcess{
ID: config.ProcessID(),
Config: config3,
})
require.NoError(t, err)
require.Equal(t, 2, len(s.Process))
_, err = s.GetProcess(config1.ProcessID())
require.Error(t, err)
_, err = s.GetProcess(config2.ProcessID())
require.NoError(t, err)
_, err = s.GetProcess(config.ProcessID())
require.NoError(t, err)
}
func TestSetProcessMetadataCommand(t *testing.T) {
s, err := createStore()
require.NoError(t, err)
config := &app.Config{
ID: "foobar",
LimitCPU: 1,
LimitMemory: 1,
}
err = s.applyCommand(Command{
Operation: OpAddProcess,
Data: CommandAddProcess{
Config: config,
},
})
require.NoError(t, err)
require.NotEmpty(t, s.Process)
p, err := s.GetProcess(config.ProcessID())
require.NoError(t, err)
require.Empty(t, p.Metadata)
metadata := "bar"
err = s.applyCommand(Command{
Operation: OpSetProcessMetadata,
Data: CommandSetProcessMetadata{
ID: config.ProcessID(),
Key: "foo",
Data: metadata,
},
})
require.NoError(t, err)
p, err = s.GetProcess(config.ProcessID())
require.NoError(t, err)
require.NotEmpty(t, p.Metadata)
require.Equal(t, 1, len(p.Metadata))
require.Equal(t, "bar", p.Metadata["foo"])
}
func TestSetProcessMetadata(t *testing.T) {
s, err := createStore()
require.NoError(t, err)
config := &app.Config{
ID: "foobar",
LimitCPU: 1,
LimitMemory: 1,
}
err = s.setProcessMetadata(CommandSetProcessMetadata{
ID: config.ProcessID(),
Key: "foo",
Data: "bar",
})
require.Error(t, err)
err = s.addProcess(CommandAddProcess{
Config: config,
})
require.NoError(t, err)
require.Equal(t, 1, len(s.Process))
err = s.setProcessMetadata(CommandSetProcessMetadata{
ID: config.ProcessID(),
Key: "foo",
Data: "bar",
})
require.NoError(t, err)
err = s.setProcessMetadata(CommandSetProcessMetadata{
ID: config.ProcessID(),
Key: "faa",
Data: "boz",
})
require.NoError(t, err)
p, err := s.GetProcess(config.ProcessID())
require.NoError(t, err)
require.NotEmpty(t, p.Metadata)
require.Equal(t, 2, len(p.Metadata))
require.Equal(t, "bar", p.Metadata["foo"])
require.Equal(t, "boz", p.Metadata["faa"])
err = s.setProcessMetadata(CommandSetProcessMetadata{
ID: config.ProcessID(),
Key: "faa",
Data: nil,
})
require.NoError(t, err)
p, err = s.GetProcess(config.ProcessID())
require.NoError(t, err)
require.NotEmpty(t, p.Metadata)
require.Equal(t, 1, len(p.Metadata))
require.Equal(t, "bar", p.Metadata["foo"])
err = s.setProcessMetadata(CommandSetProcessMetadata{
ID: config.ProcessID(),
Key: "foo",
Data: "bor",
})
require.NoError(t, err)
p, err = s.GetProcess(config.ProcessID())
require.NoError(t, err)
require.NotEmpty(t, p.Metadata)
require.Equal(t, 1, len(p.Metadata))
require.Equal(t, "bor", p.Metadata["foo"])
}
func TestAddIdentityCommand(t *testing.T) {
s, err := createStore()
require.NoError(t, err)
identity := identity.User{
Name: "foobar",
}
err = s.applyCommand(Command{
Operation: OpAddIdentity,
Data: CommandAddIdentity{
Identity: identity,
},
})
require.NoError(t, err)
require.Equal(t, 1, len(s.Users.Users))
}
func TestAddIdentity(t *testing.T) {
s, err := createStore()
require.NoError(t, err)
identity := identity.User{
Name: "foobar",
}
err = s.addIdentity(CommandAddIdentity{
Identity: identity,
})
require.NoError(t, err)
require.Equal(t, 1, len(s.Users.Users))
require.Equal(t, 0, len(s.Policies.Policies))
err = s.addIdentity(CommandAddIdentity{
Identity: identity,
})
require.Error(t, err)
require.Equal(t, 1, len(s.Users.Users))
require.Equal(t, 0, len(s.Policies.Policies))
}
func TestRemoveIdentityCommand(t *testing.T) {
s, err := createStore()
require.NoError(t, err)
identity := identity.User{
Name: "foobar",
}
err = s.applyCommand(Command{
Operation: OpAddIdentity,
Data: CommandAddIdentity{
Identity: identity,
},
})
require.NoError(t, err)
require.Equal(t, 1, len(s.Users.Users))
err = s.applyCommand(Command{
Operation: OpRemoveIdentity,
Data: CommandRemoveIdentity{
Name: "foobar",
},
})
require.NoError(t, err)
require.Equal(t, 0, len(s.Users.Users))
}
func TestRemoveIdentity(t *testing.T) {
s, err := createStore()
require.NoError(t, err)
identity := identity.User{
Name: "foobar",
}
err = s.addIdentity(CommandAddIdentity{
Identity: identity,
})
require.NoError(t, err)
require.Equal(t, 1, len(s.Users.Users))
require.Equal(t, 0, len(s.Policies.Policies))
err = s.removeIdentity(CommandRemoveIdentity{
Name: "foobar",
})
require.NoError(t, err)
require.Equal(t, 0, len(s.Users.Users))
require.Equal(t, 0, len(s.Policies.Policies))
err = s.removeIdentity(CommandRemoveIdentity{
Name: "foobar",
})
require.NoError(t, err)
require.Equal(t, 0, len(s.Users.Users))
require.Equal(t, 0, len(s.Policies.Policies))
}
func TestSetPoliciesCommand(t *testing.T) {
s, err := createStore()
require.NoError(t, err)
identity := identity.User{
Name: "foobar",
}
err = s.applyCommand(Command{
Operation: OpAddIdentity,
Data: CommandAddIdentity{
Identity: identity,
},
})
require.NoError(t, err)
require.Equal(t, 1, len(s.Users.Users))
require.Equal(t, 0, len(s.Policies.Policies))
err = s.applyCommand(Command{
Operation: OpSetPolicies,
Data: CommandSetPolicies{
Name: "foobar",
Policies: []access.Policy{
{
Name: "bla",
Domain: "bla",
Resource: "bla",
Actions: []string{},
},
{
Name: "foo",
Domain: "foo",
Resource: "foo",
Actions: []string{},
},
},
},
})
require.NoError(t, err)
require.Equal(t, 1, len(s.Users.Users))
require.Equal(t, 2, len(s.Policies.Policies["foobar"]))
}
func TestSetPolicies(t *testing.T) {
s, err := createStore()
require.NoError(t, err)
identity := identity.User{
Name: "foobar",
}
policies := []access.Policy{
{
Name: "bla",
Domain: "bla",
Resource: "bla",
Actions: []string{},
},
{
Name: "foo",
Domain: "foo",
Resource: "foo",
Actions: []string{},
},
}
err = s.setPolicies(CommandSetPolicies{
Name: "foobar",
Policies: policies,
})
require.Error(t, err)
require.Equal(t, 0, len(s.Policies.Policies["foobar"]))
err = s.addIdentity(CommandAddIdentity{
Identity: identity,
})
require.NoError(t, err)
require.Equal(t, 1, len(s.Users.Users))
require.Equal(t, 0, len(s.Policies.Policies))
err = s.setPolicies(CommandSetPolicies{
Name: "foobar",
Policies: policies,
})
require.NoError(t, err)
require.Equal(t, 1, len(s.Users.Users))
require.Equal(t, 2, len(s.Policies.Policies["foobar"]))
}
func TestUpdateUserCommand(t *testing.T) {
s, err := createStore()
require.NoError(t, err)
idty1 := identity.User{
Name: "foobar1",
}
idty2 := identity.User{
Name: "foobar2",
}
err = s.applyCommand(Command{
Operation: OpAddIdentity,
Data: CommandAddIdentity{
Identity: idty1,
},
})
require.NoError(t, err)
require.Equal(t, 1, len(s.Users.Users))
err = s.applyCommand(Command{
Operation: OpAddIdentity,
Data: CommandAddIdentity{
Identity: idty2,
},
})
require.NoError(t, err)
require.Equal(t, 2, len(s.Users.Users))
err = s.applyCommand(Command{
Operation: OpUpdateIdentity,
Data: CommandUpdateIdentity{
Name: "foobar1",
Identity: identity.User{
Name: "foobar3",
},
},
})
require.NoError(t, err)
require.Equal(t, 2, len(s.Users.Users))
}
func TestUpdateIdentity(t *testing.T) {
s, err := createStore()
require.NoError(t, err)
idty1 := identity.User{
Name: "foobar1",
}
idty2 := identity.User{
Name: "foobar2",
}
err = s.addIdentity(CommandAddIdentity{
Identity: idty1,
})
require.NoError(t, err)
require.Equal(t, 1, len(s.Users.Users))
err = s.addIdentity(CommandAddIdentity{
Identity: idty2,
})
require.NoError(t, err)
require.Equal(t, 2, len(s.Users.Users))
idty := identity.User{
Name: "foobaz",
}
err = s.updateIdentity(CommandUpdateIdentity{
Name: "foobar",
Identity: idty,
})
require.Error(t, err)
require.Equal(t, 2, len(s.Users.Users))
idty.Name = "foobar2"
err = s.updateIdentity(CommandUpdateIdentity{
Name: "foobar1",
Identity: idty,
})
require.Error(t, err)
require.Equal(t, 2, len(s.Users.Users))
idty.Name = "foobaz"
err = s.updateIdentity(CommandUpdateIdentity{
Name: "foobar1",
Identity: idty,
})
require.NoError(t, err)
require.Equal(t, 2, len(s.Users.Users))
u := s.GetUser("foobar1")
require.Empty(t, u.Users)
u = s.GetUser("foobar2")
require.NotEmpty(t, u.Users)
u = s.GetUser("foobaz")
require.NotEmpty(t, u.Users)
}
func TestUpdateIdentityWithPolicies(t *testing.T) {
s, err := createStore()
require.NoError(t, err)
idty1 := identity.User{
Name: "foobar",
}
policies := []access.Policy{
{
Name: "bla",
Domain: "bla",
Resource: "bla",
Actions: []string{},
},
{
Name: "foo",
Domain: "foo",
Resource: "foo",
Actions: []string{},
},
}
err = s.addIdentity(CommandAddIdentity{
Identity: idty1,
})
require.NoError(t, err)
require.Equal(t, 1, len(s.Users.Users))
err = s.setPolicies(CommandSetPolicies{
Name: "foobar",
Policies: policies,
})
require.NoError(t, err)
require.Equal(t, 2, len(s.Policies.Policies["foobar"]))
err = s.updateIdentity(CommandUpdateIdentity{
Name: "foobar",
Identity: idty1,
})
require.NoError(t, err)
require.Equal(t, 1, len(s.Users.Users))
require.Equal(t, 2, len(s.Policies.Policies["foobar"]))
idty2 := identity.User{
Name: "foobaz",
}
err = s.updateIdentity(CommandUpdateIdentity{
Name: "foobar",
Identity: idty2,
})
require.NoError(t, err)
require.Equal(t, 1, len(s.Users.Users))
require.Equal(t, 0, len(s.Policies.Policies["foobar"]))
require.Equal(t, 2, len(s.Policies.Policies["foobaz"]))
}
func TestSetProcessNodeMapCommand(t *testing.T) {
s, err := createStore()
require.NoError(t, err)
m1 := map[string]string{
"key": "value1",
}
err = s.applyCommand(Command{
Operation: OpSetProcessNodeMap,
Data: CommandSetProcessNodeMap{
Map: m1,
},
})
require.NoError(t, err)
require.Equal(t, m1, s.ProcessNodeMap)
}
func TestSetProcessNodeMap(t *testing.T) {
s, err := createStore()
require.NoError(t, err)
m1 := map[string]string{
"key": "value1",
}
err = s.setProcessNodeMap(CommandSetProcessNodeMap{
Map: m1,
})
require.NoError(t, err)
require.Equal(t, m1, s.ProcessNodeMap)
m2 := map[string]string{
"key": "value2",
}
err = s.setProcessNodeMap(CommandSetProcessNodeMap{
Map: m2,
})
require.NoError(t, err)
require.Equal(t, m2, s.ProcessNodeMap)
m := s.GetProcessNodeMap()
require.Equal(t, m2, m)
}
func TestApplyCommand(t *testing.T) {
s, err := createStore()
require.NoError(t, err)
err = s.applyCommand(Command{
Operation: "unknown",
Data: nil,
})
require.Error(t, err)
}
func TestApply(t *testing.T) {
s, err := createStore()
require.NoError(t, err)
entry := &raft.Log{
Index: 1,
Term: 1,
Type: raft.LogCommand,
Data: []byte("123"),
}
res := s.Apply(entry)
require.NotNil(t, res)
cmd := Command{
Operation: "unknown",
Data: nil,
}
data, err := json.Marshal(&cmd)
require.NoError(t, err)
require.NotEmpty(t, data)
entry.Data = data
res = s.Apply(entry)
require.NotNil(t, res)
cmd = Command{
Operation: OpSetProcessNodeMap,
Data: CommandSetProcessNodeMap{
Map: nil,
},
}
data, err = json.Marshal(&cmd)
require.NoError(t, err)
require.NotEmpty(t, data)
entry.Data = data
res = s.Apply(entry)
require.Nil(t, res)
}
func TestApplyWithCallback(t *testing.T) {
s, err := createStore()
require.NoError(t, err)
var op Operation
s.OnApply(func(o Operation) {
op = o
})
cmd := Command{
Operation: OpSetProcessNodeMap,
Data: CommandSetProcessNodeMap{
Map: nil,
},
}
data, err := json.Marshal(&cmd)
require.NoError(t, err)
require.NotEmpty(t, data)
entry := &raft.Log{
Index: 1,
Term: 1,
Type: raft.LogCommand,
Data: data,
}
res := s.Apply(entry)
require.Nil(t, res)
require.Equal(t, OpSetProcessNodeMap, op)
}