diff --git a/cluster/leader.go b/cluster/leader.go
index d89f6f64..bd2dea32 100644
--- a/cluster/leader.go
+++ b/cluster/leader.go
@@ -521,8 +521,6 @@ func normalizeProcessesAndResources(processes []ProcessConfig, resources map[str
 // synchronize returns a list of operations in order to adjust the "have" list to the "want" list
 // with taking the available resources on each node into account.
 func synchronize(want []app.Config, have []ProcessConfig, resources map[string]NodeResources) []interface{} {
-	opStack := []interface{}{}
-
 	normalizeProcessesAndResources(have, resources)
 
 	// A map from the process ID to the process config of the processes
@@ -532,11 +530,13 @@ func synchronize(want []app.Config, have []ProcessConfig, resources map[string]N
 		wantMap[config.ID] = &config
 	}
 
-	haveAfterRemove := []ProcessConfig{}
+	opStack := []interface{}{}
 
 	// Now we iterate through the processes we actually have running on the nodes
 	// and remove them from the wantMap. We also make sure that they are running.
 	// If a process is not on the wantMap, it will be deleted from the nodes.
+	haveAfterRemove := []ProcessConfig{}
+
 	for _, p := range have {
 		if _, ok := wantMap[p.Config.ID]; !ok {
 			opStack = append(opStack, processOpDelete{
@@ -569,18 +569,8 @@ func synchronize(want []app.Config, have []ProcessConfig, resources map[string]N
 
 	have = haveAfterRemove
 
-	// A map from the process reference to the node it is running on
-	haveReferenceAffinityMap := map[string]string{}
-	for _, p := range have {
-		if len(p.Config.Reference) == 0 {
-			continue
-		}
-
-		// This is a simplification because a reference could be on several nodes,
-		// but here take into consideration on which node the reference has been
-		// seen the last. This is good enough for now.
-		haveReferenceAffinityMap[p.Config.Reference] = p.NodeID
-	}
+	// A map from the process reference to the nodes it is running on
+	haveReferenceAffinityMap := createReferenceAffinityMap(have)
 
 	// Now all remaining processes in the wantMap must be added to one of the nodes
 	for _, config := range wantMap {
@@ -597,49 +587,50 @@ func synchronize(want []app.Config, have []ProcessConfig, resources map[string]N
 		// Check if there are already processes with the same reference, and if so
 		// chose this node. Then check the node if it has enough resources left. If
 		// not, then select a node with the most available resources.
-		reference := config.Reference
-		nodeid := haveReferenceAffinityMap[reference]
+		nodeid := ""
 
-		if len(nodeid) != 0 {
-			cpu := config.LimitCPU / resources[nodeid].NCPU
-			mem := float64(config.LimitMemory) / float64(resources[nodeid].MemTotal) * 100
+		// Try to add the process to a node where other processes with the same
+		// reference currently reside.
+		if len(config.Reference) != 0 {
+			for _, count := range haveReferenceAffinityMap[config.Reference] {
+				r := resources[count.nodeid]
+				cpu := config.LimitCPU / r.NCPU
+				mem := float64(config.LimitMemory) / float64(r.MemTotal) * 100
 
-			if resources[nodeid].CPU+cpu < resources[nodeid].CPULimit && resources[nodeid].Mem+mem < resources[nodeid].MemLimit {
-				opStack = append(opStack, processOpAdd{
-					nodeid: nodeid,
-					config: config,
-				})
-
-				continue
+				if r.CPU+cpu < r.CPULimit && r.Mem+mem < r.MemLimit {
+					nodeid = count.nodeid
+					break
+				}
 			}
 		}
 
 		// Find the node with the most resources available
-		nodeid = ""
-		for id, r := range resources {
-			cpu := config.LimitCPU / r.NCPU
-			mem := float64(config.LimitMemory) / float64(r.MemTotal) * 100
+		if len(nodeid) == 0 {
+			for id, r := range resources {
+				cpu := config.LimitCPU / r.NCPU
+				mem := float64(config.LimitMemory) / float64(r.MemTotal) * 100
 
-			if len(nodeid) == 0 {
-				if r.CPU+cpu < r.CPULimit && r.Mem+mem < r.MemLimit {
-					nodeid = id
+				if len(nodeid) == 0 {
+					if r.CPU+cpu < r.CPULimit && r.Mem+mem < r.MemLimit {
+						nodeid = id
+					}
+
+					continue
 				}
 
-				continue
-			}
-
-			if r.CPU+r.Mem < resources[nodeid].CPU+resources[nodeid].Mem {
-				nodeid = id
-			}
-			/*
-				if r.CPU < resources[nodeid].CPU && r.Mem < resources[nodeid].Mem {
-					nodeid = id
-				} else if r.Mem < resources[nodeid].Mem {
-					nodeid = id
-				} else if r.CPU < resources[nodeid].CPU {
+				if r.CPU+r.Mem < resources[nodeid].CPU+resources[nodeid].Mem {
 					nodeid = id
 				}
-			*/
+				/*
+					if r.CPU < resources[nodeid].CPU && r.Mem < resources[nodeid].Mem {
+						nodeid = id
+					} else if r.Mem < resources[nodeid].Mem {
+						nodeid = id
+					} else if r.CPU < resources[nodeid].CPU {
+						nodeid = id
+					}
+				*/
+			}
 		}
 
 		if len(nodeid) != 0 {
@@ -717,6 +708,8 @@ func createReferenceAffinityMap(processes []ProcessConfig) map[string][]referenc
 // rebalance returns a list of operations that will move running processes away from nodes
 // that are overloaded.
 func rebalance(have []ProcessConfig, resources map[string]NodeResources) []interface{} {
+	normalizeProcessesAndResources(have, resources)
+
 	// Group the processes by node
 	processNodeMap := map[string][]ProcessConfig{}
 
diff --git a/cluster/leader_test.go b/cluster/leader_test.go
index 388c399f..c371af2e 100644
--- a/cluster/leader_test.go
+++ b/cluster/leader_test.go
@@ -4,6 +4,7 @@ import (
 	"testing"
 
 	"github.com/datarhei/core/v16/restream/app"
+	"github.com/stretchr/testify/require"
 )
 
@@ -164,6 +165,71 @@ func TestSynchronizeAdd(t *testing.T) {
 	}, resources)
 }
 
+func TestSynchronizeAddReferenceAffinity(t *testing.T) {
+	want := []app.Config{
+		{
+			ID:          "foobar",
+			Reference:   "barfoo",
+			LimitCPU:    10,
+			LimitMemory: 50 * 1024 * 1024,
+		},
+		{
+			ID:          "foobar2",
+			Reference:   "barfoo",
+			LimitCPU:    10,
+			LimitMemory: 50 * 1024 * 1024,
+		},
+	}
+
+	have := []ProcessConfig{
+		{
+			NodeID:  "node2",
+			Order:   "start",
+			State:   "running",
+			CPU:     12,
+			Mem:     5,
+			Runtime: 42,
+			Config: &app.Config{
+				ID:        "foobar",
+				Reference: "barfoo",
+			},
+		},
+	}
+
+	resources := map[string]NodeResources{
+		"node1": {
+			NCPU:     1,
+			CPU:      1,
+			Mem:      1,
+			MemTotal: 4 * 1024 * 1024 * 1024,
+			CPULimit: 90,
+			MemLimit: 90,
+		},
+		"node2": {
+			NCPU:     1,
+			CPU:      1,
+			Mem:      1,
+			MemTotal: 4 * 1024 * 1024 * 1024,
+			CPULimit: 90,
+			MemLimit: 90,
+		},
+	}
+
+	stack := synchronize(want, have, resources)
+
+	require.Equal(t, []interface{}{
+		processOpAdd{
+			nodeid: "node2",
+			config: &app.Config{
+				ID:          "foobar2",
+				Reference:   "barfoo",
+				LimitCPU:    10,
+				LimitMemory: 50 * 1024 * 1024,
+			},
+		},
+	}, stack)
+}
+
 func TestSynchronizeAddLimit(t *testing.T) {
 	want := []app.Config{
 		{
@@ -689,7 +755,7 @@ func TestRebalanceSkip(t *testing.T) {
 
 	require.NotEmpty(t, opStack)
 
-	require.Equal(t, []interface{}{
+	require.ElementsMatch(t, []interface{}{
 		processOpSkip{
 			nodeid:    "node1",
 			processid: "foobar3",
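
Note: the body of createReferenceAffinityMap is not part of the hunks above; only its hunk-header context is visible, truncated by git to 80 characters ("map[string][]referenc"). For orientation, here is a minimal sketch of what such a helper could look like, assuming the return type is map[string][]referenceAffinityNodeCount and that each entry pairs a node ID with the number of processes on that node carrying the reference, sorted so the strongest affinity comes first. This is an illustration under those assumptions, not the repository's actual implementation:

```go
package cluster

import "sort"

// Sketch only; referenceAffinityNodeCount and the ordering policy are
// assumptions inferred from how synchronize consumes the map (it reads
// count.nodeid and takes the first node that still has headroom).
type referenceAffinityNodeCount struct {
	nodeid string
	count  uint64
}

func createReferenceAffinityMap(processes []ProcessConfig) map[string][]referenceAffinityNodeCount {
	m := map[string][]referenceAffinityNodeCount{}

	for _, p := range processes {
		ref := p.Config.Reference
		if len(ref) == 0 {
			continue
		}

		// Bump the count if this node already hosts processes of this reference.
		found := false
		for i := range m[ref] {
			if m[ref][i].nodeid == p.NodeID {
				m[ref][i].count++
				found = true
				break
			}
		}

		if !found {
			m[ref] = append(m[ref], referenceAffinityNodeCount{
				nodeid: p.NodeID,
				count:  1,
			})
		}
	}

	// Order each reference's nodes by descending process count so callers
	// try the node with the strongest affinity first.
	for ref := range m {
		sort.SliceStable(m[ref], func(i, j int) bool {
			return m[ref][i].count > m[ref][j].count
		})
	}

	return m
}
```

With this ordering, the affinity loop in synchronize naturally prefers the node that already hosts most of a reference's processes and only falls back to the most-available-resources scan when none of the affine nodes has headroom.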
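
As a sanity check on the expectation in TestSynchronizeAddReferenceAffinity: "foobar" with reference "barfoo" already runs on node2, so the affinity path tries node2 first for "foobar2". With the test's numbers, cpu = LimitCPU / NCPU = 10 / 1 = 10, and mem = LimitMemory / MemTotal * 100 = 50 MiB / 4 GiB * 100, which is roughly 1.2. Node2 currently reports CPU 1 and Mem 1, so 1 + 10 = 11 < 90 (CPULimit) and 1 + 1.2 = 2.2 < 90 (MemLimit). The headroom check passes, nodeid is set to "node2", the most-available-resources fallback is skipped, and the op stack contains exactly the single processOpAdd on node2 that the test asserts.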