Add basic rebalancing of processes

Ingo Oppermann 2023-05-10 14:01:40 +02:00
parent ae04dc50c7
commit 5af5c686ee
3 changed files with 615 additions and 83 deletions


@@ -2,7 +2,9 @@ package cluster
import (
"context"
"errors"
"fmt"
"sort"
"sync"
"time"
@@ -224,6 +226,7 @@ func (c *cluster) leadershipTransfer() error {
// https://github.com/hashicorp/consul/blob/44b39240a86bc94ddc67bc105286ab450bd869a9/agent/consul/leader.go#L146
func (c *cluster) leaderLoop(stopCh chan struct{}, emergency bool) {
establishedLeader := false
RECONCILE:
// Set up a reconciliation timer
interval := time.After(10 * time.Second)
@@ -311,11 +314,47 @@ func (c *cluster) startRebalance(ctx context.Context) {
return
case <-ticker.C:
c.doSynchronize()
//c.doRebalance()
c.doRebalance()
}
}
}
var errNotEnoughResources = errors.New("no node with enough resources is available")
var errNotEnoughResourcesForRebalancing = errors.New("no other node to move the process to is available")
var errNoLimitsDefined = errors.New("no process limits are defined")
type processOpDelete struct {
nodeid string
processid string
}
type processOpMove struct {
fromNodeid string
toNodeid string
config *app.Config
}
type processOpStart struct {
nodeid string
processid string
}
type processOpAdd struct {
nodeid string
config *app.Config
}
type processOpReject struct {
processid string
err error
}
type processOpSkip struct {
nodeid string
processid string
err error
}
func (c *cluster) applyOpStack(stack []interface{}) {
for _, op := range stack {
switch v := op.(type) {
@@ -399,8 +438,13 @@ func (c *cluster) applyOpStack(stack []interface{}) {
"processid": v.processid,
"nodeid": v.nodeid,
}).Log("Starting process")
case processOpReject:
c.logger.Warn().WithError(v.err).WithField("processid", v.processid).Log("Process rejected")
case processOpSkip:
c.logger.Warn().WithField("processid", v.processid).Log("Not enough resources to deploy process")
c.logger.Warn().WithError(v.err).WithFields(log.Fields{
"nodeid": v.nodeid,
"processid": v.processid,
}).Log("Process skipped")
default:
c.logger.Warn().Log("Unknown operation on stack: %+v", v)
}
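To see how these pieces fit together, here is a minimal sketch of an op stack being built and dispatched the same way applyOpStack does. It assumes the op types and error values introduced above; the node names and process IDs are made up, and fmt stands in for c.logger:

// Hypothetical op stack; in the real code it comes from synchronize/rebalance.
ops := []interface{}{
	processOpAdd{nodeid: "node2", config: &app.Config{ID: "ingest"}},
	processOpMove{fromNodeid: "node1", toNodeid: "node2", config: &app.Config{ID: "transcode"}},
	processOpReject{processid: "broken", err: errNoLimitsDefined},
}
for _, op := range ops {
	switch v := op.(type) {
	case processOpAdd:
		fmt.Println("add", v.config.ID, "on", v.nodeid)
	case processOpMove:
		fmt.Println("move", v.config.ID, "from", v.fromNodeid, "to", v.toNodeid)
	case processOpReject:
		fmt.Println("reject", v.processid, v.err)
	}
}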
@@ -421,36 +465,11 @@ func (c *cluster) doRebalance() {
have := c.proxy.ProcessList()
resources := c.proxy.Resources()
opStack := c.rebalance(have, resources)
opStack := rebalance(have, resources)
c.applyOpStack(opStack)
}
type processOpDelete struct {
nodeid string
processid string
}
type processOpMove struct {
fromNodeid string
toNodeid string
config *app.Config
}
type processOpStart struct {
nodeid string
processid string
}
type processOpAdd struct {
nodeid string
config *app.Config
}
type processOpSkip struct {
processid string
}
// normalizeProcessesAndResources normalizes the CPU and memory consumption of the processes and resources in-place.
func normalizeProcessesAndResources(processes []ProcessConfig, resources map[string]NodeResources) {
maxNCPU := .0
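A detail the following hunks rely on: NodeResources is stored by value in the resources map, so adjusting it "in-place" always means reading the struct, mutating the copy, and writing it back. The pattern, as it recurs throughout this commit:

r, ok := resources[nodeid]
if ok {
	r.CPU -= p.CPU
	r.Mem -= p.Mem
	resources[nodeid] = r // map values are copies; the write-back is required
}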
@@ -525,6 +544,7 @@ func synchronize(want []app.Config, have []ProcessConfig, resources map[string]N
processid: p.Config.ID,
})
// Adjust the resources
r, ok := resources[p.NodeID]
if ok {
r.CPU -= p.CPU
@@ -557,6 +577,16 @@ func synchronize(want []app.Config, have []ProcessConfig, resources map[string]N
// Now all remaining processes in the wantMap must be added to one of the nodes
for _, config := range wantMap {
// If a process doesn't have any limits defined, reject that process
if config.LimitCPU <= 0 || config.LimitMemory <= 0 {
opStack = append(opStack, processOpReject{
processid: config.ID,
err: errNoLimitsDefined,
})
continue
}
// Check if there are already processes with the same reference, and if so,
// choose that node. Then check whether that node has enough resources left.
// If not, select the node with the most available resources.
@@ -565,9 +595,9 @@ func synchronize(want []app.Config, have []ProcessConfig, resources map[string]N
if len(nodeid) != 0 {
cpu := config.LimitCPU / resources[nodeid].NCPU
mem := float64(config.LimitMemory) / float64(resources[nodeid].MemTotal)
mem := float64(config.LimitMemory) / float64(resources[nodeid].MemTotal) * 100
if resources[nodeid].CPU+cpu < 90 && resources[nodeid].Mem+mem < 90 {
if resources[nodeid].CPU+cpu < resources[nodeid].CPULimit && resources[nodeid].Mem+mem < resources[nodeid].MemLimit {
opStack = append(opStack, processOpAdd{
nodeid: nodeid,
config: config,
@@ -581,10 +611,10 @@ func synchronize(want []app.Config, have []ProcessConfig, resources map[string]N
nodeid = ""
for id, r := range resources {
cpu := config.LimitCPU / r.NCPU
mem := float64(config.LimitMemory) / float64(r.MemTotal)
mem := float64(config.LimitMemory) / float64(r.MemTotal) * 100
if len(nodeid) == 0 {
if r.CPU+cpu < 90 && r.Mem+mem < 90 {
if r.CPU+cpu < r.CPULimit && r.Mem+mem < r.MemLimit {
nodeid = id
}
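To make the units in these checks concrete: LimitCPU appears to be a percentage summed over cores (100 = one full core), so dividing by NCPU yields the process' share of the whole node, and LimitMemory is converted to a percentage of MemTotal. A worked example with made-up numbers:

// A process limited to one full core and 2 GiB of memory, placed on a
// node with 8 cores and 16 GiB of RAM:
cpu := 100.0 / 8.0                            // 12.5% of the node's total CPU
mem := float64(2<<30) / float64(16<<30) * 100 // 12.5% of the node's memory
fmt.Println(cpu, mem) // the node qualifies while CPU+12.5 and Mem+12.5 stay below its limits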
@@ -610,9 +640,18 @@ func synchronize(want []app.Config, have []ProcessConfig, resources map[string]N
nodeid: nodeid,
config: config,
})
// Adjust the resources
r, ok := resources[nodeid]
if ok {
r.CPU += config.LimitCPU / r.NCPU
r.Mem += float64(config.LimitMemory) / float64(r.MemTotal) * 100
resources[nodeid] = r
}
} else {
opStack = append(opStack, processOpSkip{
opStack = append(opStack, processOpReject{
processid: config.ID,
err: errNotEnoughResources,
})
}
}
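A hedged usage sketch of synchronize under these rules, with one empty node and one wanted process (all values invented; the cluster wiring around the call is omitted):

want := []app.Config{{ID: "ingest", LimitCPU: 100, LimitMemory: 2 << 30}}
have := []ProcessConfig{}
resources := map[string]NodeResources{
	"node1": {NCPU: 8, CPU: 10, CPULimit: 90, Mem: 20, MemTotal: 16 << 30, MemLimit: 90},
}
ops := synchronize(want, have, resources)
// Expected: a single processOpAdd for "node1", with resources["node1"]
// adjusted in place (CPU 10 -> 22.5, Mem 20 -> 32.5).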
@@ -620,52 +659,101 @@ func synchronize(want []app.Config, have []ProcessConfig, resources map[string]N
return opStack
}
// rebalance returns a list of operations that will lead to nodes that are not overloaded
func (c *cluster) rebalance(have []ProcessConfig, resources map[string]NodeResources) []interface{} {
// rebalance returns a list of operations that will move running processes away from nodes
// that are overloaded.
func rebalance(have []ProcessConfig, resources map[string]NodeResources) []interface{} {
// Group the processes by node
processNodeMap := map[string][]ProcessConfig{}
for _, p := range have {
processNodeMap[p.NodeID] = append(processNodeMap[p.NodeID], p)
}
// Sort the processes by their runtime (if they are running) for each node
for nodeid, processes := range processNodeMap {
sort.SliceStable(processes, func(a, b int) bool {
if processes[a].State == "running" {
if processes[b].State != "running" {
return false
}
return processes[a].Runtime < processes[b].Runtime
}
return false
})
processNodeMap[nodeid] = processes
}
opStack := []interface{}{}
// Check if any of the nodes is overloaded
for id, r := range resources {
if r.CPU >= 90 || r.Mem >= 90 {
// Node is overloaded
// Check if node is overloaded
if r.CPU < r.CPULimit && r.Mem < r.MemLimit {
continue
}
// Find another node with more resources available
nodeid := id
// Move processes from that node, one at a time, to other nodes with enough free resources
processes := processNodeMap[id]
if len(processes) == 0 {
// If there are no processes on that node, we can't do anything
continue
}
overloadedNodeid := id
for _, p := range processes {
if p.State != "running" {
// We consider only currently running processes
continue
}
// Find another node with enough resources available
availableNodeid := ""
for id, r := range resources {
if id == nodeid {
if id == overloadedNodeid {
// Skip the overloaded node
continue
}
if r.CPU < resources[nodeid].CPU && r.Mem < resources[nodeid].Mem {
nodeid = id
if r.CPU+p.CPU < r.CPULimit && r.Mem+p.Mem < r.MemLimit {
availableNodeid = id
break
}
}
if nodeid != id {
// there's another node with more resources available
diff_cpu := r.CPU - resources[nodeid].CPU
diff_mem := r.Mem - resources[nodeid].Mem
if len(availableNodeid) == 0 {
// There's no other node with enough resources to take over this process
opStack = append(opStack, processOpSkip{
nodeid: overloadedNodeid,
processid: p.Config.ID,
err: errNotEnoughResourcesForRebalancing,
})
continue
}
// All processes that could be moved should be listed in an array. This
// array should be sorted by the process' runtime in order to choose the
// one with the least runtime. Repeat.
for _, p := range processNodeMap[id] {
if p.CPU < diff_cpu && p.Mem < diff_mem {
opStack = append(opStack, processOpMove{
fromNodeid: id,
toNodeid: nodeid,
config: p.Config,
})
opStack = append(opStack, processOpMove{
fromNodeid: overloadedNodeid,
toNodeid: availableNodeid,
config: p.Config,
})
break
}
}
// Adjust the resources
r = resources[availableNodeid]
r.CPU += p.CPU
r.Mem += p.Mem
resources[availableNodeid] = r
r = resources[overloadedNodeid]
r.CPU -= p.CPU
r.Mem -= p.Mem
resources[overloadedNodeid] = r
// If this node is no longer overloaded, stop moving processes around
if r.CPU < r.CPULimit && r.Mem < r.MemLimit {
break
}
}
}
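And the counterpart for rebalance: node1 is past its CPU limit, node2 has headroom, so the expectation is a single move of the running process (field values are inferred from this diff and invented where not visible):

resources := map[string]NodeResources{
	"node1": {NCPU: 4, CPU: 95, CPULimit: 90, Mem: 50, MemTotal: 8 << 30, MemLimit: 90},
	"node2": {NCPU: 4, CPU: 20, CPULimit: 90, Mem: 30, MemTotal: 8 << 30, MemLimit: 90},
}
have := []ProcessConfig{
	{NodeID: "node1", State: "running", CPU: 30, Mem: 10, Config: &app.Config{ID: "transcode"}},
}
ops := rebalance(have, resources)
// Expected: one processOpMove from "node1" to "node2"; afterwards
// resources shows node1 at 65% CPU and node2 at 50% CPU.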

File diff suppressed because it is too large


@@ -53,8 +53,10 @@ type NodeFiles struct {
type NodeResources struct {
NCPU float64
CPU float64
CPULimit float64
Mem float64
MemTotal float64
MemLimit float64
}
type NodeState struct {
@@ -405,8 +407,10 @@ func (n *node) State() NodeState {
Resources: NodeResources{
NCPU: n.resources.ncpu,
CPU: n.resources.cpu,
CPULimit: 90,
Mem: n.resources.mem,
MemTotal: n.resources.memTotal,
MemLimit: 90,
},
}
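Since CPULimit and MemLimit are hardcoded to 90 here, the overload test in rebalance currently reduces to a 90% rule. Spelled out as a standalone predicate:

// A node counts as overloaded as soon as either resource crosses its limit.
overloaded := func(r NodeResources) bool {
	return r.CPU >= r.CPULimit || r.Mem >= r.MemLimit
}
fmt.Println(overloaded(NodeResources{CPU: 92, CPULimit: 90, Mem: 40, MemLimit: 90})) // true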
@@ -576,7 +580,10 @@ func (n *node) GetFile(path string) (io.ReadCloser, error) {
}
func (n *node) ProcessList() ([]ProcessConfig, error) {
list, err := n.peer.ProcessList(nil, nil)
list, err := n.peer.ProcessList(nil, []string{
"state",
"config",
})
if err != nil {
return nil, err
}