diff --git a/cluster/leader.go b/cluster/leader.go index 13fac49d..0fbf1852 100644 --- a/cluster/leader.go +++ b/cluster/leader.go @@ -856,6 +856,7 @@ func synchronize(wish map[string]string, want []store.Process, have []proxy.Proc // and remove them from the wantMap. We also make sure that they have the correct order. // If a process cannot be found on the wantMap, it will be deleted from the nodes. haveAfterRemove := []proxy.Process{} + wantOrderStart := []proxy.Process{} for _, haveP := range have { pid := haveP.Config.ProcessID().String() @@ -875,18 +876,21 @@ func synchronize(wish map[string]string, want []store.Process, have []proxy.Proc } continue - } else { - // The process is on the wantMap. Update the process if the configuration and/or metadata differ. - hasConfigChanges := !wantP.Config.Equal(haveP.Config) - hasMetadataChanges, metadata := isMetadataUpdateRequired(wantP.Metadata, haveP.Metadata) - if hasConfigChanges || hasMetadataChanges { - opStack = append(opStack, processOpUpdate{ - nodeid: haveP.NodeID, - processid: haveP.Config.ProcessID(), - config: wantP.Config, - metadata: metadata, - }) - } + } + + // The process is on the wantMap. Update the process if the configuration and/or metadata differ. + hasConfigChanges := !wantP.Config.Equal(haveP.Config) + hasMetadataChanges, metadata := isMetadataUpdateRequired(wantP.Metadata, haveP.Metadata) + if hasConfigChanges || hasMetadataChanges { + // TODO: When the required resources increase, should we move this process to a node + // that has them available? Otherwise, this node might start throttling. However, this + // will result in rebalancing. + opStack = append(opStack, processOpUpdate{ + nodeid: haveP.NodeID, + processid: haveP.Config.ProcessID(), + config: wantP.Config, + metadata: metadata, + }) } delete(wantMap, pid) @@ -894,18 +898,9 @@ func synchronize(wish map[string]string, want []store.Process, have []proxy.Proc if haveP.Order != wantP.Order { if wantP.Order == "start" { - opStack = append(opStack, processOpStart{ - nodeid: haveP.NodeID, - processid: haveP.Config.ProcessID(), - }) - - // Consume the resources - r, ok := resources[haveP.NodeID] - if ok { - r.CPU += haveP.Config.LimitCPU - r.Mem += haveP.Config.LimitMemory - resources[haveP.NodeID] = r - } + // Delay pushing them to the stack in order to have + // all resources released first. + wantOrderStart = append(wantOrderStart, haveP) } else { opStack = append(opStack, processOpStop{ nodeid: haveP.NodeID, @@ -925,6 +920,65 @@ func synchronize(wish map[string]string, want []store.Process, have []proxy.Proc haveAfterRemove = append(haveAfterRemove, haveP) } + for _, haveP := range wantOrderStart { + nodeid := haveP.NodeID + + r, ok := resources[nodeid] + if ok { + // Consume the resources + r.CPU += haveP.Config.LimitCPU + r.Mem += haveP.Config.LimitMemory + resources[nodeid] = r + + // TODO: check if the current node has actually enough resources available, + // otherwise it needs to be moved somewhere else. If the node doesn't + // have enough resources available, the process will be prevented + // from starting. + + /* + if hasNodeEnoughResources(r, haveP.Config.LimitCPU, haveP.Config.LimitMemory) { + // Consume the resources + r.CPU += haveP.Config.LimitCPU + r.Mem += haveP.Config.LimitMemory + resources[nodeid] = r + } else { + nodeid = findBestNodeForProcess(resources, haveP.Config.LimitCPU, haveP.Config.LimitMemory) + if len(nodeid) == 0 { + // Start it anyways and let it run into an error + opStack = append(opStack, processOpStart{ + nodeid: nodeid, + processid: haveP.Config.ProcessID(), + }) + + continue + } + + if nodeid != haveP.NodeID { + opStack = append(opStack, processOpMove{ + fromNodeid: haveP.NodeID, + toNodeid: nodeid, + config: haveP.Config, + metadata: haveP.Metadata, + }) + } + + // Consume the resources + r, ok := resources[nodeid] + if ok { + r.CPU += haveP.Config.LimitCPU + r.Mem += haveP.Config.LimitMemory + resources[nodeid] = r + } + } + */ + } + + opStack = append(opStack, processOpStart{ + nodeid: nodeid, + processid: haveP.Config.ProcessID(), + }) + } + have = haveAfterRemove // In case a node didn't respond, some PID are still on the wantMap, that would run on @@ -969,11 +1023,7 @@ func synchronize(wish map[string]string, want []store.Process, have []proxy.Proc // Try to add the process to a node where other processes with the same reference currently reside. if len(process.Config.Reference) != 0 { for _, count := range haveReferenceAffinityMap[process.Config.Reference+"@"+process.Config.Domain] { - r := resources[count.nodeid] - cpu := process.Config.LimitCPU - mem := process.Config.LimitMemory - - if r.CPU+cpu < r.CPULimit && r.Mem+mem < r.MemLimit && !r.IsThrottling { + if hasNodeEnoughResources(resources[count.nodeid], process.Config.LimitCPU, process.Config.LimitMemory) { nodeid = count.nodeid break } @@ -982,22 +1032,7 @@ func synchronize(wish map[string]string, want []store.Process, have []proxy.Proc // Find the node with the most resources available if len(nodeid) == 0 { - for id, r := range resources { - cpu := process.Config.LimitCPU - mem := process.Config.LimitMemory - - if len(nodeid) == 0 { - if r.CPU+cpu < r.CPULimit && r.Mem+mem < r.MemLimit && !r.IsThrottling { - nodeid = id - } - - continue - } - - if r.CPU < resources[nodeid].CPU && r.Mem <= resources[nodeid].Mem { - nodeid = id - } - } + nodeid = findBestNodeForProcess(resources, process.Config.LimitCPU, process.Config.LimitMemory) } if len(nodeid) != 0 { @@ -1028,6 +1063,37 @@ func synchronize(wish map[string]string, want []store.Process, have []proxy.Proc return opStack, resources, reality } +// hasNodeEnoughResources returns whether a node has enough resources available for the +// requested cpu and memory consumption. +func hasNodeEnoughResources(r proxy.NodeResources, cpu float64, mem uint64) bool { + if r.CPU+cpu < r.CPULimit && r.Mem+mem < r.MemLimit && !r.IsThrottling { + return true + } + + return false +} + +// findBestNodeForProcess returns the nodeid that can fit the requested cpu and memory requirements. +func findBestNodeForProcess(resources map[string]proxy.NodeResources, cpu float64, mem uint64) string { + nodeid := "" + + for id, r := range resources { + if len(nodeid) == 0 { + if r.CPU+cpu < r.CPULimit && r.Mem+mem < r.MemLimit && !r.IsThrottling { + nodeid = id + } + + continue + } + + if r.CPU < resources[nodeid].CPU && r.Mem <= resources[nodeid].Mem { + nodeid = id + } + } + + return nodeid +} + type referenceAffinityNodeCount struct { nodeid string count uint64