Extract functions, draft strategy for starting a process

This commit is contained in:
Ingo Oppermann 2023-07-12 21:02:46 +02:00
parent c8ab8567d9
commit fd7354286e
No known key found for this signature in database
GPG Key ID: 2AB32426E9DD229E

View File

@ -856,6 +856,7 @@ func synchronize(wish map[string]string, want []store.Process, have []proxy.Proc
// and remove them from the wantMap. We also make sure that they have the correct order.
// If a process cannot be found on the wantMap, it will be deleted from the nodes.
haveAfterRemove := []proxy.Process{}
wantOrderStart := []proxy.Process{}
for _, haveP := range have {
pid := haveP.Config.ProcessID().String()
@ -875,18 +876,21 @@ func synchronize(wish map[string]string, want []store.Process, have []proxy.Proc
}
continue
} else {
// The process is on the wantMap. Update the process if the configuration and/or metadata differ.
hasConfigChanges := !wantP.Config.Equal(haveP.Config)
hasMetadataChanges, metadata := isMetadataUpdateRequired(wantP.Metadata, haveP.Metadata)
if hasConfigChanges || hasMetadataChanges {
opStack = append(opStack, processOpUpdate{
nodeid: haveP.NodeID,
processid: haveP.Config.ProcessID(),
config: wantP.Config,
metadata: metadata,
})
}
}
// The process is on the wantMap. Update the process if the configuration and/or metadata differ.
hasConfigChanges := !wantP.Config.Equal(haveP.Config)
hasMetadataChanges, metadata := isMetadataUpdateRequired(wantP.Metadata, haveP.Metadata)
if hasConfigChanges || hasMetadataChanges {
// TODO: When the required resources increase, should we move this process to a node
// that has them available? Otherwise, this node might start throttling. However, this
// will result in rebalancing.
opStack = append(opStack, processOpUpdate{
nodeid: haveP.NodeID,
processid: haveP.Config.ProcessID(),
config: wantP.Config,
metadata: metadata,
})
}
delete(wantMap, pid)
@ -894,18 +898,9 @@ func synchronize(wish map[string]string, want []store.Process, have []proxy.Proc
if haveP.Order != wantP.Order {
if wantP.Order == "start" {
opStack = append(opStack, processOpStart{
nodeid: haveP.NodeID,
processid: haveP.Config.ProcessID(),
})
// Consume the resources
r, ok := resources[haveP.NodeID]
if ok {
r.CPU += haveP.Config.LimitCPU
r.Mem += haveP.Config.LimitMemory
resources[haveP.NodeID] = r
}
// Delay pushing them to the stack in order to have
// all resources released first.
wantOrderStart = append(wantOrderStart, haveP)
} else {
opStack = append(opStack, processOpStop{
nodeid: haveP.NodeID,
@ -925,6 +920,65 @@ func synchronize(wish map[string]string, want []store.Process, have []proxy.Proc
haveAfterRemove = append(haveAfterRemove, haveP)
}
for _, haveP := range wantOrderStart {
nodeid := haveP.NodeID
r, ok := resources[nodeid]
if ok {
// Consume the resources
r.CPU += haveP.Config.LimitCPU
r.Mem += haveP.Config.LimitMemory
resources[nodeid] = r
// TODO: check if the current node has actually enough resources available,
// otherwise it needs to be moved somewhere else. If the node doesn't
// have enough resources available, the process will be prevented
// from starting.
/*
if hasNodeEnoughResources(r, haveP.Config.LimitCPU, haveP.Config.LimitMemory) {
// Consume the resources
r.CPU += haveP.Config.LimitCPU
r.Mem += haveP.Config.LimitMemory
resources[nodeid] = r
} else {
nodeid = findBestNodeForProcess(resources, haveP.Config.LimitCPU, haveP.Config.LimitMemory)
if len(nodeid) == 0 {
// Start it anyways and let it run into an error
opStack = append(opStack, processOpStart{
nodeid: nodeid,
processid: haveP.Config.ProcessID(),
})
continue
}
if nodeid != haveP.NodeID {
opStack = append(opStack, processOpMove{
fromNodeid: haveP.NodeID,
toNodeid: nodeid,
config: haveP.Config,
metadata: haveP.Metadata,
})
}
// Consume the resources
r, ok := resources[nodeid]
if ok {
r.CPU += haveP.Config.LimitCPU
r.Mem += haveP.Config.LimitMemory
resources[nodeid] = r
}
}
*/
}
opStack = append(opStack, processOpStart{
nodeid: nodeid,
processid: haveP.Config.ProcessID(),
})
}
have = haveAfterRemove
// In case a node didn't respond, some PID are still on the wantMap, that would run on
@ -969,11 +1023,7 @@ func synchronize(wish map[string]string, want []store.Process, have []proxy.Proc
// Try to add the process to a node where other processes with the same reference currently reside.
if len(process.Config.Reference) != 0 {
for _, count := range haveReferenceAffinityMap[process.Config.Reference+"@"+process.Config.Domain] {
r := resources[count.nodeid]
cpu := process.Config.LimitCPU
mem := process.Config.LimitMemory
if r.CPU+cpu < r.CPULimit && r.Mem+mem < r.MemLimit && !r.IsThrottling {
if hasNodeEnoughResources(resources[count.nodeid], process.Config.LimitCPU, process.Config.LimitMemory) {
nodeid = count.nodeid
break
}
@ -982,22 +1032,7 @@ func synchronize(wish map[string]string, want []store.Process, have []proxy.Proc
// Find the node with the most resources available
if len(nodeid) == 0 {
for id, r := range resources {
cpu := process.Config.LimitCPU
mem := process.Config.LimitMemory
if len(nodeid) == 0 {
if r.CPU+cpu < r.CPULimit && r.Mem+mem < r.MemLimit && !r.IsThrottling {
nodeid = id
}
continue
}
if r.CPU < resources[nodeid].CPU && r.Mem <= resources[nodeid].Mem {
nodeid = id
}
}
nodeid = findBestNodeForProcess(resources, process.Config.LimitCPU, process.Config.LimitMemory)
}
if len(nodeid) != 0 {
@ -1028,6 +1063,37 @@ func synchronize(wish map[string]string, want []store.Process, have []proxy.Proc
return opStack, resources, reality
}
// hasNodeEnoughResources returns whether a node has enough resources available for the
// requested cpu and memory consumption.
func hasNodeEnoughResources(r proxy.NodeResources, cpu float64, mem uint64) bool {
if r.CPU+cpu < r.CPULimit && r.Mem+mem < r.MemLimit && !r.IsThrottling {
return true
}
return false
}
// findBestNodeForProcess returns the nodeid that can fit the requested cpu and memory requirements.
func findBestNodeForProcess(resources map[string]proxy.NodeResources, cpu float64, mem uint64) string {
nodeid := ""
for id, r := range resources {
if len(nodeid) == 0 {
if r.CPU+cpu < r.CPULimit && r.Mem+mem < r.MemLimit && !r.IsThrottling {
nodeid = id
}
continue
}
if r.CPU < resources[nodeid].CPU && r.Mem <= resources[nodeid].Mem {
nodeid = id
}
}
return nodeid
}
type referenceAffinityNodeCount struct {
nodeid string
count uint64