Simplify rebalancing
This commit is contained in:
parent
02079e30c5
commit
bc04bb2df8
@ -393,9 +393,9 @@ func (c *cluster) clearLocks(ctx context.Context, interval time.Duration) {
|
||||
}
|
||||
}
|
||||
|
||||
var errNotEnoughResources = errors.New("no node with enough resources is available")
|
||||
var errNotEnoughResourcesForRebalancing = errors.New("no other node to move the process to is available")
|
||||
var errNoLimitsDefined = errors.New("no process limits are defined")
|
||||
var errNotEnoughResourcesForDeployment = errors.New("no node with enough resources for deployment is available")
|
||||
var errNotEnoughResourcesForRebalancing = errors.New("no node with enough resources for rebalancing is available")
|
||||
var errNoLimitsDefined = errors.New("this process has no limits defined")
|
||||
|
||||
type processOpDelete struct {
|
||||
nodeid string
|
||||
@ -1059,7 +1059,7 @@ func synchronize(wish map[string]string, want []store.Process, have []proxy.Proc
|
||||
} else {
|
||||
opStack = append(opStack, processOpReject{
|
||||
processid: process.Config.ProcessID(),
|
||||
err: errNotEnoughResources,
|
||||
err: errNotEnoughResourcesForDeployment,
|
||||
})
|
||||
}
|
||||
}
|
||||
@ -1077,13 +1077,14 @@ func hasNodeEnoughResources(r proxy.NodeResources, cpu float64, mem uint64) bool
|
||||
return false
|
||||
}
|
||||
|
||||
// findBestNodeForProcess returns the nodeid that can fit the requested cpu and memory requirements.
|
||||
// findBestNodeForProcess returns the nodeid that can fit the requested cpu and memory requirements. If no
|
||||
// such node is available, an empty string is returned.
|
||||
func findBestNodeForProcess(resources map[string]proxy.NodeResources, cpu float64, mem uint64) string {
|
||||
nodeid := ""
|
||||
|
||||
for id, r := range resources {
|
||||
if len(nodeid) == 0 {
|
||||
if r.CPU+cpu < r.CPULimit && r.Mem+mem < r.MemLimit && !r.IsThrottling {
|
||||
if hasNodeEnoughResources(r, cpu, mem) {
|
||||
nodeid = id
|
||||
}
|
||||
|
||||
@ -1157,7 +1158,7 @@ func rebalance(have []proxy.Process, nodes map[string]proxy.NodeAbout) ([]interf
|
||||
resources[nodeid] = about.Resources
|
||||
}
|
||||
|
||||
// Group the processes by node and sort them
|
||||
// Group all running processes by node and sort them by their runtime in ascending order.
|
||||
nodeProcessMap := createNodeProcessMap(have)
|
||||
|
||||
// A map from the process reference to the nodes it is running on
|
||||
@ -1185,11 +1186,6 @@ func rebalance(have []proxy.Process, nodes map[string]proxy.NodeAbout) ([]interf
|
||||
overloadedNodeid := id
|
||||
|
||||
for i, p := range processes {
|
||||
if p.State != "running" {
|
||||
// We consider only currently running processes
|
||||
continue
|
||||
}
|
||||
|
||||
availableNodeid := ""
|
||||
|
||||
// Try to move the process to a node where other processes with the same
|
||||
@ -1209,21 +1205,9 @@ func rebalance(have []proxy.Process, nodes map[string]proxy.NodeAbout) ([]interf
|
||||
}
|
||||
}
|
||||
|
||||
// Find another node with enough resources available
|
||||
// Find the best node with enough resources available
|
||||
if len(availableNodeid) == 0 {
|
||||
for id, node := range nodes {
|
||||
if id == overloadedNodeid {
|
||||
// Skip the overloaded node
|
||||
continue
|
||||
}
|
||||
|
||||
r := node.Resources
|
||||
|
||||
if hasNodeEnoughResources(r, p.CPU, p.Mem) {
|
||||
availableNodeid = id
|
||||
break
|
||||
}
|
||||
}
|
||||
availableNodeid = findBestNodeForProcess(resources, p.CPU, p.Mem)
|
||||
}
|
||||
|
||||
if len(availableNodeid) == 0 {
|
||||
@ -1267,27 +1251,24 @@ func rebalance(have []proxy.Process, nodes map[string]proxy.NodeAbout) ([]interf
|
||||
}
|
||||
|
||||
// createNodeProcessMap takes a list of processes and groups them by the nodeid they
|
||||
// are running on. Each group gets sorted by their preference to be moved somewhere
|
||||
// else, decreasing.
|
||||
// are running on. Each group contains only running processes and gets sorted by their
|
||||
// preference to be moved somewhere else, decreasing. From the running processes, the
|
||||
// ones with the shortest runtime have the highest preference.
|
||||
func createNodeProcessMap(processes []proxy.Process) map[string][]proxy.Process {
|
||||
nodeProcessMap := map[string][]proxy.Process{}
|
||||
|
||||
for _, p := range processes {
|
||||
if p.State != "running" {
|
||||
continue
|
||||
}
|
||||
|
||||
nodeProcessMap[p.NodeID] = append(nodeProcessMap[p.NodeID], p)
|
||||
}
|
||||
|
||||
// Sort the processes of each node by their runtime in ascending order
|
||||
for nodeid, processes := range nodeProcessMap {
|
||||
sort.SliceStable(processes, func(a, b int) bool {
|
||||
if processes[a].State == "running" {
|
||||
if processes[b].State != "running" {
|
||||
return false
|
||||
}
|
||||
|
||||
return processes[a].Runtime < processes[b].Runtime
|
||||
}
|
||||
|
||||
return false
|
||||
return processes[a].Runtime < processes[b].Runtime
|
||||
})
|
||||
|
||||
nodeProcessMap[nodeid] = processes
|
||||
|
||||
@ -499,7 +499,7 @@ func TestSynchronizeAddNoResourcesCPU(t *testing.T) {
|
||||
require.Equal(t, []interface{}{
|
||||
processOpReject{
|
||||
processid: app.ProcessID{ID: "foobar"},
|
||||
err: errNotEnoughResources,
|
||||
err: errNotEnoughResourcesForDeployment,
|
||||
},
|
||||
}, stack)
|
||||
}
|
||||
@ -549,7 +549,7 @@ func TestSynchronizeAddNoResourcesMemory(t *testing.T) {
|
||||
require.Equal(t, []interface{}{
|
||||
processOpReject{
|
||||
processid: app.ProcessID{ID: "foobar"},
|
||||
err: errNotEnoughResources,
|
||||
err: errNotEnoughResourcesForDeployment,
|
||||
},
|
||||
}, stack)
|
||||
}
|
||||
@ -1747,6 +1747,30 @@ func TestRebalanceReferenceAffinity(t *testing.T) {
|
||||
|
||||
func TestCreateNodeProcessMap(t *testing.T) {
|
||||
processes := []proxy.Process{
|
||||
{
|
||||
NodeID: "node1",
|
||||
Order: "start",
|
||||
State: "finished",
|
||||
CPU: 1,
|
||||
Mem: 1,
|
||||
Runtime: 1,
|
||||
Config: &app.Config{
|
||||
ID: "foobar7",
|
||||
Reference: "ref1",
|
||||
},
|
||||
},
|
||||
{
|
||||
NodeID: "node1",
|
||||
Order: "start",
|
||||
State: "failed",
|
||||
CPU: 1,
|
||||
Mem: 1,
|
||||
Runtime: 1,
|
||||
Config: &app.Config{
|
||||
ID: "foobar8",
|
||||
Reference: "ref1",
|
||||
},
|
||||
},
|
||||
{
|
||||
NodeID: "node1",
|
||||
Order: "start",
|
||||
@ -1790,7 +1814,7 @@ func TestCreateNodeProcessMap(t *testing.T) {
|
||||
Mem: 1,
|
||||
Runtime: 42,
|
||||
Config: &app.Config{
|
||||
ID: "foobar3",
|
||||
ID: "foobar6",
|
||||
Reference: "ref2",
|
||||
},
|
||||
},
|
||||
@ -1857,7 +1881,7 @@ func TestCreateNodeProcessMap(t *testing.T) {
|
||||
Mem: 1,
|
||||
Runtime: 42,
|
||||
Config: &app.Config{
|
||||
ID: "foobar3",
|
||||
ID: "foobar6",
|
||||
Reference: "ref2",
|
||||
},
|
||||
},
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user