From 5af5c686ee8448c95b1ba31b0da6f123f01e0e46 Mon Sep 17 00:00:00 2001 From: Ingo Oppermann Date: Wed, 10 May 2023 14:01:40 +0200 Subject: [PATCH] Add basic rebalancing of processes --- cluster/leader.go | 206 +++++++++++++----- cluster/leader_test.go | 483 +++++++++++++++++++++++++++++++++++++++-- cluster/node.go | 9 +- 3 files changed, 615 insertions(+), 83 deletions(-) diff --git a/cluster/leader.go b/cluster/leader.go index bb28b4bc..c66550ea 100644 --- a/cluster/leader.go +++ b/cluster/leader.go @@ -2,7 +2,9 @@ package cluster import ( "context" + "errors" "fmt" + "sort" "sync" "time" @@ -224,6 +226,7 @@ func (c *cluster) leadershipTransfer() error { // https://github.com/hashicorp/consul/blob/44b39240a86bc94ddc67bc105286ab450bd869a9/agent/consul/leader.go#L146 func (c *cluster) leaderLoop(stopCh chan struct{}, emergency bool) { establishedLeader := false + RECONCILE: // Setup a reconciliation timer interval := time.After(10 * time.Second) @@ -311,11 +314,47 @@ func (c *cluster) startRebalance(ctx context.Context) { return case <-ticker.C: c.doSynchronize() - //c.doRebalance() + c.doRebalance() } } } +var errNotEnoughResources = errors.New("no node with enough resources is available") +var errNotEnoughResourcesForRebalancing = errors.New("no other node to move the process to is available") +var errNoLimitsDefined = errors.New("no process limits are defined") + +type processOpDelete struct { + nodeid string + processid string +} + +type processOpMove struct { + fromNodeid string + toNodeid string + config *app.Config +} + +type processOpStart struct { + nodeid string + processid string +} + +type processOpAdd struct { + nodeid string + config *app.Config +} + +type processOpReject struct { + processid string + err error +} + +type processOpSkip struct { + nodeid string + processid string + err error +} + func (c *cluster) applyOpStack(stack []interface{}) { for _, op := range stack { switch v := op.(type) { @@ -399,8 +438,13 @@ func (c *cluster) applyOpStack(stack []interface{}) { "processid": v.processid, "nodeid": v.nodeid, }).Log("Starting process") + case processOpReject: + c.logger.Warn().WithError(v.err).WithField("processid", v.processid).Log("Process rejected") case processOpSkip: - c.logger.Warn().WithField("processid", v.processid).Log("Not enough resources to deploy process") + c.logger.Warn().WithError(v.err).WithFields(log.Fields{ + "nodeid": v.nodeid, + "processid": v.processid, + }).Log("Process skipped") default: c.logger.Warn().Log("Unknown operation on stack: %+v", v) } @@ -421,36 +465,11 @@ func (c *cluster) doRebalance() { have := c.proxy.ProcessList() resources := c.proxy.Resources() - opStack := c.rebalance(have, resources) + opStack := rebalance(have, resources) c.applyOpStack(opStack) } -type processOpDelete struct { - nodeid string - processid string -} - -type processOpMove struct { - fromNodeid string - toNodeid string - config *app.Config -} - -type processOpStart struct { - nodeid string - processid string -} - -type processOpAdd struct { - nodeid string - config *app.Config -} - -type processOpSkip struct { - processid string -} - // normalizeProcessesAndResources normalizes the CPU and memory consumption of the processes and resources in-place. func normalizeProcessesAndResources(processes []ProcessConfig, resources map[string]NodeResources) { maxNCPU := .0 @@ -525,6 +544,7 @@ func synchronize(want []app.Config, have []ProcessConfig, resources map[string]N processid: p.Config.ID, }) + // Adjust the resources r, ok := resources[p.NodeID] if ok { r.CPU -= p.CPU @@ -557,6 +577,16 @@ func synchronize(want []app.Config, have []ProcessConfig, resources map[string]N // Now all remaining processes in the wantMap must be added to one of the nodes for _, config := range wantMap { + // If a process doesn't have any limits defined, reject that process + if config.LimitCPU <= 0 || config.LimitMemory <= 0 { + opStack = append(opStack, processOpReject{ + processid: config.ID, + err: errNoLimitsDefined, + }) + + continue + } + // Check if there are already processes with the same reference, and if so // chose this node. Then check the node if it has enough resources left. If // not, then select a node with the most available resources. @@ -565,9 +595,9 @@ func synchronize(want []app.Config, have []ProcessConfig, resources map[string]N if len(nodeid) != 0 { cpu := config.LimitCPU / resources[nodeid].NCPU - mem := float64(config.LimitMemory) / float64(resources[nodeid].MemTotal) + mem := float64(config.LimitMemory) / float64(resources[nodeid].MemTotal) * 100 - if resources[nodeid].CPU+cpu < 90 && resources[nodeid].Mem+mem < 90 { + if resources[nodeid].CPU+cpu < resources[nodeid].CPULimit && resources[nodeid].Mem+mem < resources[nodeid].MemLimit { opStack = append(opStack, processOpAdd{ nodeid: nodeid, config: config, @@ -581,10 +611,10 @@ func synchronize(want []app.Config, have []ProcessConfig, resources map[string]N nodeid = "" for id, r := range resources { cpu := config.LimitCPU / r.NCPU - mem := float64(config.LimitMemory) / float64(r.MemTotal) + mem := float64(config.LimitMemory) / float64(r.MemTotal) * 100 if len(nodeid) == 0 { - if r.CPU+cpu < 90 && r.Mem+mem < 90 { + if r.CPU+cpu < r.CPULimit && r.Mem+mem < r.MemLimit { nodeid = id } @@ -610,9 +640,18 @@ func synchronize(want []app.Config, have []ProcessConfig, resources map[string]N nodeid: nodeid, config: config, }) + + // Adjust the resources + r, ok := resources[nodeid] + if ok { + r.CPU += config.LimitCPU / r.NCPU + r.Mem += float64(config.LimitMemory) / float64(r.MemTotal) * 100 + resources[nodeid] = r + } } else { - opStack = append(opStack, processOpSkip{ + opStack = append(opStack, processOpReject{ processid: config.ID, + err: errNotEnoughResources, }) } } @@ -620,52 +659,101 @@ func synchronize(want []app.Config, have []ProcessConfig, resources map[string]N return opStack } -// rebalance returns a list of operations that will lead to nodes that are not overloaded -func (c *cluster) rebalance(have []ProcessConfig, resources map[string]NodeResources) []interface{} { +// rebalance returns a list of operations that will move running processes away from nodes +// that are overloaded. +func rebalance(have []ProcessConfig, resources map[string]NodeResources) []interface{} { + // Group the processes by node processNodeMap := map[string][]ProcessConfig{} for _, p := range have { processNodeMap[p.NodeID] = append(processNodeMap[p.NodeID], p) } + // Sort the processes by their runtime (if they are running) for each node + for nodeid, processes := range processNodeMap { + sort.SliceStable(processes, func(a, b int) bool { + if processes[a].State == "running" { + if processes[b].State != "running" { + return false + } + + return processes[a].Runtime < processes[b].Runtime + } + + return false + }) + + processNodeMap[nodeid] = processes + } + opStack := []interface{}{} // Check if any of the nodes is overloaded for id, r := range resources { - if r.CPU >= 90 || r.Mem >= 90 { - // Node is overloaded + // Check if node is overloaded + if r.CPU < r.CPULimit && r.Mem < r.MemLimit { + continue + } - // Find another node with more resources available - nodeid := id + // Pick the first process from that node and move it to another node with enough free resources + processes := processNodeMap[id] + if len(processes) == 0 { + // If there are no processes on that node, we can't do anything + continue + } + + overloadedNodeid := id + + for _, p := range processes { + if p.State != "running" { + // We consider only currently running processes + continue + } + + // Find another node with enough resources available + availableNodeid := "" for id, r := range resources { - if id == nodeid { + if id == overloadedNodeid { + // Skip the overloaded node continue } - if r.CPU < resources[nodeid].CPU && r.Mem < resources[nodeid].Mem { - nodeid = id + if r.CPU+p.CPU < r.CPULimit && r.Mem+p.Mem < r.MemLimit { + availableNodeid = id + break } } - if nodeid != id { - // there's an other node with more resources available - diff_cpu := r.CPU - resources[nodeid].CPU - diff_mem := r.Mem - resources[nodeid].Mem + if len(availableNodeid) == 0 { + // There's no other node with enough resources to take over this process + opStack = append(opStack, processOpSkip{ + nodeid: overloadedNodeid, + processid: p.Config.ID, + err: errNotEnoughResourcesForRebalancing, + }) + continue + } - // all processes that could be moved, should be listed in - // an arry. this array should be sorted by the process' runtime - // in order to chose the one with the least runtime. repeat. - for _, p := range processNodeMap[id] { - if p.CPU < diff_cpu && p.Mem < diff_mem { - opStack = append(opStack, processOpMove{ - fromNodeid: id, - toNodeid: nodeid, - config: p.Config, - }) + opStack = append(opStack, processOpMove{ + fromNodeid: overloadedNodeid, + toNodeid: availableNodeid, + config: p.Config, + }) - break - } - } + // Adjust the resources + r = resources[availableNodeid] + r.CPU += p.CPU + r.Mem += p.Mem + resources[availableNodeid] = r + + r = resources[overloadedNodeid] + r.CPU -= p.CPU + r.Mem -= p.Mem + resources[overloadedNodeid] = r + + // If this node is not anymore overloaded, stop moving processes around + if r.CPU < r.CPULimit && r.Mem < r.MemLimit { + break } } } diff --git a/cluster/leader_test.go b/cluster/leader_test.go index 0721a3ea..475b4263 100644 --- a/cluster/leader_test.go +++ b/cluster/leader_test.go @@ -116,14 +116,18 @@ func TestSynchronizeAdd(t *testing.T) { "node1": { NCPU: 1, CPU: 7, - Mem: 67.5, + Mem: 65, MemTotal: 4 * 1024 * 1024 * 1024, + CPULimit: 90, + MemLimit: 90, }, "node2": { NCPU: 1, - CPU: 87.5, + CPU: 85, Mem: 11, MemTotal: 4 * 1024 * 1024 * 1024, + CPULimit: 90, + MemLimit: 90, }, } @@ -139,6 +143,25 @@ func TestSynchronizeAdd(t *testing.T) { }, }, }, stack) + + require.Equal(t, map[string]NodeResources{ + "node1": { + NCPU: 1, + CPU: 17, + Mem: 65 + (50. / (4. * 1024) * 100), + MemTotal: 4 * 1024 * 1024 * 1024, + CPULimit: 90, + MemLimit: 90, + }, + "node2": { + NCPU: 1, + CPU: 85, + Mem: 11, + MemTotal: 4 * 1024 * 1024 * 1024, + CPULimit: 90, + MemLimit: 90, + }, + }, resources) } func TestSynchronizeAddLimit(t *testing.T) { @@ -158,12 +181,16 @@ func TestSynchronizeAddLimit(t *testing.T) { CPU: 81, Mem: 72, MemTotal: 4 * 1024 * 1024 * 1024, + CPULimit: 90, + MemLimit: 90, }, "node2": { NCPU: 1, CPU: 79, Mem: 72, MemTotal: 4 * 1024 * 1024 * 1024, + CPULimit: 90, + MemLimit: 90, }, } @@ -179,6 +206,143 @@ func TestSynchronizeAddLimit(t *testing.T) { }, }, }, stack) + + require.Equal(t, map[string]NodeResources{ + "node1": { + NCPU: 1, + CPU: 81, + Mem: 72, + MemTotal: 4 * 1024 * 1024 * 1024, + CPULimit: 90, + MemLimit: 90, + }, + "node2": { + NCPU: 1, + CPU: 89, + Mem: 72 + (50. / (4. * 1024) * 100), + MemTotal: 4 * 1024 * 1024 * 1024, + CPULimit: 90, + MemLimit: 90, + }, + }, resources) +} + +func TestSynchronizeAddNoResourcesCPU(t *testing.T) { + want := []app.Config{ + { + ID: "foobar", + LimitCPU: 30, + LimitMemory: 50 * 1024 * 1024, + }, + } + + have := []ProcessConfig{} + + resources := map[string]NodeResources{ + "node1": { + NCPU: 1, + CPU: 81, + Mem: 72, + MemTotal: 4 * 1024 * 1024 * 1024, + CPULimit: 90, + MemLimit: 90, + }, + "node2": { + NCPU: 1, + CPU: 79, + Mem: 72, + MemTotal: 4 * 1024 * 1024 * 1024, + CPULimit: 90, + MemLimit: 90, + }, + } + + stack := synchronize(want, have, resources) + + require.Equal(t, []interface{}{ + processOpReject{ + processid: "foobar", + err: errNotEnoughResources, + }, + }, stack) +} + +func TestSynchronizeAddNoResourcesMemory(t *testing.T) { + want := []app.Config{ + { + ID: "foobar", + LimitCPU: 1, + LimitMemory: 2 * 1024 * 1024 * 1024, + }, + } + + have := []ProcessConfig{} + + resources := map[string]NodeResources{ + "node1": { + NCPU: 1, + CPU: 81, + Mem: 72, + MemTotal: 4 * 1024 * 1024 * 1024, + CPULimit: 90, + MemLimit: 90, + }, + "node2": { + NCPU: 1, + CPU: 79, + Mem: 72, + MemTotal: 4 * 1024 * 1024 * 1024, + CPULimit: 90, + MemLimit: 90, + }, + } + + stack := synchronize(want, have, resources) + + require.Equal(t, []interface{}{ + processOpReject{ + processid: "foobar", + err: errNotEnoughResources, + }, + }, stack) +} + +func TestSynchronizeAddNoLimits(t *testing.T) { + want := []app.Config{ + { + ID: "foobar", + }, + } + + have := []ProcessConfig{} + + resources := map[string]NodeResources{ + "node1": { + NCPU: 1, + CPU: 81, + Mem: 72, + MemTotal: 4 * 1024 * 1024 * 1024, + CPULimit: 90, + MemLimit: 90, + }, + "node2": { + NCPU: 1, + CPU: 79, + Mem: 72, + MemTotal: 4 * 1024 * 1024 * 1024, + CPULimit: 90, + MemLimit: 90, + }, + } + + stack := synchronize(want, have, resources) + + require.Equal(t, []interface{}{ + processOpReject{ + processid: "foobar", + err: errNoLimitsDefined, + }, + }, stack) } func TestSynchronizeRemove(t *testing.T) { @@ -202,46 +366,56 @@ func TestSynchronizeRemove(t *testing.T) { "node1": { NCPU: 1, CPU: 7, - Mem: 67.5, + Mem: 65, MemTotal: 4 * 1024 * 1024 * 1024, + CPULimit: 90, + MemLimit: 90, }, "node2": { NCPU: 1, - CPU: 87.5, + CPU: 85, Mem: 11, MemTotal: 4 * 1024 * 1024 * 1024, + CPULimit: 90, + MemLimit: 90, }, } stack := synchronize(want, have, resources) - require.Equal(t, map[string]NodeResources{ - "node1": { - NCPU: 1, - CPU: 7, - Mem: 67.5, - MemTotal: 4 * 1024 * 1024 * 1024, - }, - "node2": { - NCPU: 1, - CPU: 75.5, - Mem: 6, - MemTotal: 4 * 1024 * 1024 * 1024, - }, - }, resources) - require.Equal(t, []interface{}{ processOpDelete{ nodeid: "node2", processid: "foobar", }, }, stack) + + require.Equal(t, map[string]NodeResources{ + "node1": { + NCPU: 1, + CPU: 7, + Mem: 65, + MemTotal: 4 * 1024 * 1024 * 1024, + CPULimit: 90, + MemLimit: 90, + }, + "node2": { + NCPU: 1, + CPU: 73, + Mem: 6, + MemTotal: 4 * 1024 * 1024 * 1024, + CPULimit: 90, + MemLimit: 90, + }, + }, resources) } func TestSynchronizeAddRemove(t *testing.T) { want := []app.Config{ { - ID: "foobar1", + ID: "foobar1", + LimitCPU: 10, + LimitMemory: 50 * 1024 * 1024, }, } @@ -263,14 +437,18 @@ func TestSynchronizeAddRemove(t *testing.T) { "node1": { NCPU: 1, CPU: 7, - Mem: 67.5, + Mem: 65, MemTotal: 4 * 1024 * 1024 * 1024, + CPULimit: 90, + MemLimit: 90, }, "node2": { NCPU: 1, - CPU: 87.5, + CPU: 85, Mem: 11, MemTotal: 4 * 1024 * 1024 * 1024, + CPULimit: 90, + MemLimit: 90, }, } @@ -284,8 +462,267 @@ func TestSynchronizeAddRemove(t *testing.T) { processOpAdd{ nodeid: "node1", config: &app.Config{ - ID: "foobar1", + ID: "foobar1", + LimitCPU: 10, + LimitMemory: 50 * 1024 * 1024, }, }, }, stack) + + require.Equal(t, map[string]NodeResources{ + "node1": { + NCPU: 1, + CPU: 17, + Mem: 65 + (50. / (4. * 1024) * 100), + MemTotal: 4 * 1024 * 1024 * 1024, + CPULimit: 90, + MemLimit: 90, + }, + "node2": { + NCPU: 1, + CPU: 73, + Mem: 6, + MemTotal: 4 * 1024 * 1024 * 1024, + CPULimit: 90, + MemLimit: 90, + }, + }, resources) +} + +func TestRebalanceNothingToDo(t *testing.T) { + processes := []ProcessConfig{ + { + NodeID: "node1", + Order: "start", + State: "running", + CPU: 35, + Mem: 20, + Runtime: 42, + Config: &app.Config{ + ID: "foobar1", + }, + }, + { + NodeID: "node2", + Order: "start", + State: "running", + CPU: 12, + Mem: 5, + Runtime: 42, + Config: &app.Config{ + ID: "foobar2", + }, + }, + } + + resources := map[string]NodeResources{ + "node1": { + NCPU: 1, + CPU: 42, + Mem: 35, + MemTotal: 4 * 1024 * 1024 * 1024, + CPULimit: 90, + MemLimit: 90, + }, + "node2": { + NCPU: 1, + CPU: 37, + Mem: 11, + MemTotal: 4 * 1024 * 1024 * 1024, + CPULimit: 90, + MemLimit: 90, + }, + } + + opStack := rebalance(processes, resources) + + require.Empty(t, opStack) +} + +func TestRebalanceOverload(t *testing.T) { + processes := []ProcessConfig{ + { + NodeID: "node1", + Order: "start", + State: "running", + CPU: 35, + Mem: 20, + Runtime: 42, + Config: &app.Config{ + ID: "foobar1", + }, + }, + { + NodeID: "node1", + Order: "start", + State: "running", + CPU: 17, + Mem: 31, + Runtime: 27, + Config: &app.Config{ + ID: "foobar3", + }, + }, + { + NodeID: "node2", + Order: "start", + State: "running", + CPU: 12, + Mem: 5, + Runtime: 42, + Config: &app.Config{ + ID: "foobar2", + }, + }, + } + + resources := map[string]NodeResources{ + "node1": { + NCPU: 1, + CPU: 91, + Mem: 35, + MemTotal: 4 * 1024 * 1024 * 1024, + CPULimit: 90, + MemLimit: 90, + }, + "node2": { + NCPU: 1, + CPU: 15, + Mem: 11, + MemTotal: 4 * 1024 * 1024 * 1024, + CPULimit: 90, + MemLimit: 90, + }, + } + + opStack := rebalance(processes, resources) + + require.NotEmpty(t, opStack) + + require.Equal(t, []interface{}{ + processOpMove{ + fromNodeid: "node1", + toNodeid: "node2", + config: &app.Config{ + ID: "foobar3", + }, + }, + }, opStack) + + require.Equal(t, map[string]NodeResources{ + "node1": { + NCPU: 1, + CPU: 74, + Mem: 4, + MemTotal: 4 * 1024 * 1024 * 1024, + CPULimit: 90, + MemLimit: 90, + }, + "node2": { + NCPU: 1, + CPU: 32, + Mem: 42, + MemTotal: 4 * 1024 * 1024 * 1024, + CPULimit: 90, + MemLimit: 90, + }, + }, resources) +} + +func TestRebalanceSkip(t *testing.T) { + processes := []ProcessConfig{ + { + NodeID: "node1", + Order: "start", + State: "running", + CPU: 35, + Mem: 20, + Runtime: 42, + Config: &app.Config{ + ID: "foobar1", + }, + }, + { + NodeID: "node1", + Order: "start", + State: "running", + CPU: 17, + Mem: 31, + Runtime: 27, + Config: &app.Config{ + ID: "foobar3", + }, + }, + { + NodeID: "node2", + Order: "start", + State: "running", + CPU: 12, + Mem: 5, + Runtime: 42, + Config: &app.Config{ + ID: "foobar2", + }, + }, + } + + resources := map[string]NodeResources{ + "node1": { + NCPU: 1, + CPU: 91, + Mem: 35, + MemTotal: 4 * 1024 * 1024 * 1024, + CPULimit: 90, + MemLimit: 90, + }, + "node2": { + NCPU: 1, + CPU: 15, + Mem: 92, + MemTotal: 4 * 1024 * 1024 * 1024, + CPULimit: 90, + MemLimit: 90, + }, + } + + opStack := rebalance(processes, resources) + + require.NotEmpty(t, opStack) + + require.Equal(t, []interface{}{ + processOpSkip{ + nodeid: "node1", + processid: "foobar3", + err: errNotEnoughResourcesForRebalancing, + }, + processOpSkip{ + nodeid: "node1", + processid: "foobar1", + err: errNotEnoughResourcesForRebalancing, + }, + processOpSkip{ + nodeid: "node2", + processid: "foobar2", + err: errNotEnoughResourcesForRebalancing, + }, + }, opStack) + + require.Equal(t, map[string]NodeResources{ + "node1": { + NCPU: 1, + CPU: 91, + Mem: 35, + MemTotal: 4 * 1024 * 1024 * 1024, + CPULimit: 90, + MemLimit: 90, + }, + "node2": { + NCPU: 1, + CPU: 15, + Mem: 92, + MemTotal: 4 * 1024 * 1024 * 1024, + CPULimit: 90, + MemLimit: 90, + }, + }, resources) } diff --git a/cluster/node.go b/cluster/node.go index 9f2665f1..12703327 100644 --- a/cluster/node.go +++ b/cluster/node.go @@ -53,8 +53,10 @@ type NodeFiles struct { type NodeResources struct { NCPU float64 CPU float64 + CPULimit float64 Mem float64 MemTotal float64 + MemLimit float64 } type NodeState struct { @@ -405,8 +407,10 @@ func (n *node) State() NodeState { Resources: NodeResources{ NCPU: n.resources.ncpu, CPU: n.resources.cpu, + CPULimit: 90, Mem: n.resources.mem, MemTotal: n.resources.memTotal, + MemLimit: 90, }, } @@ -576,7 +580,10 @@ func (n *node) GetFile(path string) (io.ReadCloser, error) { } func (n *node) ProcessList() ([]ProcessConfig, error) { - list, err := n.peer.ProcessList(nil, nil) + list, err := n.peer.ProcessList(nil, []string{ + "state", + "config", + }) if err != nil { return nil, err }