From e49de44eb77aca7bf44b64a4e9463b67139153c9 Mon Sep 17 00:00:00 2001 From: Ingo Oppermann Date: Wed, 5 Jul 2023 09:55:11 +0200 Subject: [PATCH] Deploy processes with order stop to nodes --- cluster/leader.go | 89 ++++++++++++----- cluster/leader_test.go | 218 +++++++++++++++++++++++++++++++++++++++-- cluster/proxy/proxy.go | 18 ---- 3 files changed, 277 insertions(+), 48 deletions(-) diff --git a/cluster/leader.go b/cluster/leader.go index 766b3bf2..c8919a3a 100644 --- a/cluster/leader.go +++ b/cluster/leader.go @@ -409,10 +409,16 @@ type processOpStart struct { processid app.ProcessID } +type processOpStop struct { + nodeid string + processid app.ProcessID +} + type processOpAdd struct { nodeid string config *app.Config metadata map[string]interface{} + order string } type processOpUpdate struct { @@ -446,13 +452,15 @@ func (c *cluster) applyOpStack(stack []interface{}) { break } - err = c.proxy.StartProcess(v.nodeid, v.config.ProcessID()) - if err != nil { - c.logger.Info().WithError(err).WithFields(log.Fields{ - "processid": v.config.ID, - "nodeid": v.nodeid, - }).Log("Starting process") - break + if v.order == "start" { + err = c.proxy.CommandProcess(v.nodeid, v.config.ProcessID(), "start") + if err != nil { + c.logger.Info().WithError(err).WithFields(log.Fields{ + "processid": v.config.ID, + "nodeid": v.nodeid, + }).Log("Starting process") + break + } } c.logger.Info().WithFields(log.Fields{ "processid": v.config.ID, @@ -507,7 +515,7 @@ func (c *cluster) applyOpStack(stack []interface{}) { break } - err = c.proxy.StartProcess(v.toNodeid, v.config.ProcessID()) + err = c.proxy.CommandProcess(v.toNodeid, v.config.ProcessID(), "start") if err != nil { c.logger.Info().WithError(err).WithFields(log.Fields{ "processid": v.config.ID, @@ -523,7 +531,7 @@ func (c *cluster) applyOpStack(stack []interface{}) { "tonodeid": v.toNodeid, }).Log("Moving process") case processOpStart: - err := c.proxy.StartProcess(v.nodeid, v.processid) + err := c.proxy.CommandProcess(v.nodeid, v.processid, "start") if err != nil { c.logger.Info().WithError(err).WithFields(log.Fields{ "processid": v.processid, @@ -536,6 +544,20 @@ func (c *cluster) applyOpStack(stack []interface{}) { "processid": v.processid, "nodeid": v.nodeid, }).Log("Starting process") + case processOpStop: + err := c.proxy.CommandProcess(v.nodeid, v.processid, "stop") + if err != nil { + c.logger.Info().WithError(err).WithFields(log.Fields{ + "processid": v.processid, + "nodeid": v.nodeid, + }).Log("Stopping process") + break + } + + c.logger.Info().WithFields(log.Fields{ + "processid": v.processid, + "nodeid": v.nodeid, + }).Log("Stopping process") case processOpReject: c.logger.Warn().WithError(v.err).WithField("processid", v.processid).Log("Process rejected") case processOpSkip: @@ -610,7 +632,7 @@ func (c *cluster) doRebalance(emergency bool) { c.applyOpStack(opStack) } -// isMetadataUpdateRequired compares two metadata. It relies on the documents property that json.Marshal +// isMetadataUpdateRequired compares two metadata. It relies on the documented property that json.Marshal // sorts the map keys prior encoding. func isMetadataUpdateRequired(wantMap map[string]interface{}, haveMap map[string]interface{}) (bool, map[string]interface{}) { hasChanges := false @@ -674,9 +696,6 @@ func synchronize(wish map[string]string, want []store.Process, have []proxy.Proc // we want to be running on the nodes. wantMap := map[string]store.Process{} for _, process := range want { - if process.Order != "start" { - continue - } pid := process.Config.ProcessID().String() wantMap[pid] = process } @@ -684,7 +703,7 @@ func synchronize(wish map[string]string, want []store.Process, have []proxy.Proc opStack := []interface{}{} // Now we iterate through the processes we actually have running on the nodes - // and remove them from the wantMap. We also make sure that they are running. + // and remove them from the wantMap. We also make sure that they have the correct order. // If a process cannot be found on the wantMap, it will be deleted from the nodes. haveAfterRemove := []proxy.Process{} @@ -723,12 +742,34 @@ func synchronize(wish map[string]string, want []store.Process, have []proxy.Proc delete(wantMap, pid) reality[pid] = haveP.NodeID - if haveP.Order != wantP.Order { // wantP.Order is always "start" because we selected only those above - opStack = append(opStack, processOpStart{ - nodeid: haveP.NodeID, - processid: haveP.Config.ProcessID(), - }) + if haveP.Order != wantP.Order { + if wantP.Order == "start" { + opStack = append(opStack, processOpStart{ + nodeid: haveP.NodeID, + processid: haveP.Config.ProcessID(), + }) + // Consume the resources + r, ok := resources[haveP.NodeID] + if ok { + r.CPU += haveP.Config.LimitCPU + r.Mem += haveP.Config.LimitMemory + resources[haveP.NodeID] = r + } + } else { + opStack = append(opStack, processOpStop{ + nodeid: haveP.NodeID, + processid: haveP.Config.ProcessID(), + }) + + // Release the resources + r, ok := resources[haveP.NodeID] + if ok { + r.CPU -= haveP.CPU + r.Mem -= haveP.Mem + resources[haveP.NodeID] = r + } + } } haveAfterRemove = append(haveAfterRemove, haveP) @@ -755,10 +796,10 @@ func synchronize(wish map[string]string, want []store.Process, have []proxy.Proc // The wantMap now contains only those processes that need to be installed on a node. - // A map from the process reference to the node it is running on. + // Create a map from the process reference to the node it is running on. haveReferenceAffinityMap := createReferenceAffinityMap(have) - // Now all remaining processes in the wantMap must be added to one of the nodes. + // Now, all remaining processes in the wantMap must be added to one of the nodes. for pid, process := range wantMap { // If a process doesn't have any limits defined, reject that process if process.Config.LimitCPU <= 0 || process.Config.LimitMemory <= 0 { @@ -775,8 +816,7 @@ func synchronize(wish map[string]string, want []store.Process, have []proxy.Proc // not, then select a node with the most available resources. nodeid := "" - // Try to add the process to a node where other processes with the same - // reference currently reside. + // Try to add the process to a node where other processes with the same reference currently reside. if len(process.Config.Reference) != 0 { for _, count := range haveReferenceAffinityMap[process.Config.Reference+"@"+process.Config.Domain] { r := resources[count.nodeid] @@ -815,9 +855,10 @@ func synchronize(wish map[string]string, want []store.Process, have []proxy.Proc nodeid: nodeid, config: process.Config, metadata: process.Metadata, + order: process.Order, }) - // Adjust the resources + // Consume the resources r, ok := resources[nodeid] if ok { r.CPU += process.Config.LimitCPU diff --git a/cluster/leader_test.go b/cluster/leader_test.go index 95440fdb..11c4b638 100644 --- a/cluster/leader_test.go +++ b/cluster/leader_test.go @@ -20,10 +20,19 @@ func TestSynchronizeAdd(t *testing.T) { Config: &app.Config{ ID: "foobar", LimitCPU: 10, - LimitMemory: 50, + LimitMemory: 20, }, Order: "start", }, + { + UpdatedAt: time.Now(), + Config: &app.Config{ + ID: "foobaz", + LimitCPU: 10, + LimitMemory: 20, + }, + Order: "stop", + }, } have := []proxy.Process{} @@ -59,8 +68,109 @@ func TestSynchronizeAdd(t *testing.T) { config: &app.Config{ ID: "foobar", LimitCPU: 10, - LimitMemory: 50, + LimitMemory: 20, }, + order: "start", + }, + processOpAdd{ + nodeid: "node1", + config: &app.Config{ + ID: "foobaz", + LimitCPU: 10, + LimitMemory: 20, + }, + order: "stop", + }, + }, stack) + + require.Equal(t, map[string]string{ + "foobar@": "node1", + "foobaz@": "node1", + }, reality) + + require.Equal(t, map[string]proxy.NodeResources{ + "node1": { + NCPU: 1, + CPU: 27, + Mem: 75, + CPULimit: 90, + MemLimit: 90, + }, + "node2": { + NCPU: 1, + CPU: 85, + Mem: 11, + CPULimit: 90, + MemLimit: 90, + }, + }, resources) +} + +func TestSynchronizeOrderStop(t *testing.T) { + wish := map[string]string{ + "foobar@": "node1", + } + + now := time.Now() + + want := []store.Process{ + { + UpdatedAt: now, + Config: &app.Config{ + ID: "foobar", + LimitCPU: 10, + LimitMemory: 20, + }, + Order: "stop", + }, + } + + have := []proxy.Process{ + { + NodeID: "node1", + Order: "start", + State: "running", + CPU: 12, + Mem: 5, + Runtime: 42, + UpdatedAt: now, + Config: &app.Config{ + ID: "foobar", + LimitCPU: 10, + LimitMemory: 20, + }, + }, + } + + nodes := map[string]proxy.NodeAbout{ + "node1": { + LastContact: time.Now(), + Resources: proxy.NodeResources{ + NCPU: 1, + CPU: 20, + Mem: 35, + CPULimit: 90, + MemLimit: 90, + }, + }, + "node2": { + LastContact: time.Now(), + Resources: proxy.NodeResources{ + NCPU: 1, + CPU: 1, + Mem: 1, + CPULimit: 90, + MemLimit: 90, + }, + }, + } + + stack, resources, reality := synchronize(wish, want, have, nodes, 2*time.Minute) + + require.Equal(t, []interface{}{ + processOpStop{ + nodeid: "node1", + processid: app.ProcessID{ID: "foobar"}, }, }, stack) @@ -71,15 +181,105 @@ func TestSynchronizeAdd(t *testing.T) { require.Equal(t, map[string]proxy.NodeResources{ "node1": { NCPU: 1, - CPU: 17, - Mem: 85, + CPU: 8, + Mem: 30, CPULimit: 90, MemLimit: 90, }, "node2": { NCPU: 1, - CPU: 85, - Mem: 11, + CPU: 1, + Mem: 1, + CPULimit: 90, + MemLimit: 90, + }, + }, resources) +} + +func TestSynchronizeOrderStart(t *testing.T) { + wish := map[string]string{ + "foobar@": "node1", + } + + now := time.Now() + + want := []store.Process{ + { + UpdatedAt: now, + Config: &app.Config{ + ID: "foobar", + LimitCPU: 10, + LimitMemory: 20, + }, + Order: "start", + }, + } + + have := []proxy.Process{ + { + NodeID: "node1", + Order: "stop", + State: "finished", + CPU: 0, + Mem: 0, + Runtime: 42, + UpdatedAt: now, + Config: &app.Config{ + ID: "foobar", + LimitCPU: 10, + LimitMemory: 20, + }, + }, + } + + nodes := map[string]proxy.NodeAbout{ + "node1": { + LastContact: time.Now(), + Resources: proxy.NodeResources{ + NCPU: 1, + CPU: 20, + Mem: 35, + CPULimit: 90, + MemLimit: 90, + }, + }, + "node2": { + LastContact: time.Now(), + Resources: proxy.NodeResources{ + NCPU: 1, + CPU: 1, + Mem: 1, + CPULimit: 90, + MemLimit: 90, + }, + }, + } + + stack, resources, reality := synchronize(wish, want, have, nodes, 2*time.Minute) + + require.Equal(t, []interface{}{ + processOpStart{ + nodeid: "node1", + processid: app.ProcessID{ID: "foobar"}, + }, + }, stack) + + require.Equal(t, map[string]string{ + "foobar@": "node1", + }, reality) + + require.Equal(t, map[string]proxy.NodeResources{ + "node1": { + NCPU: 1, + CPU: 30, + Mem: 55, + CPULimit: 90, + MemLimit: 90, + }, + "node2": { + NCPU: 1, + CPU: 1, + Mem: 1, CPULimit: 90, MemLimit: 90, }, @@ -168,6 +368,7 @@ func TestSynchronizeAddReferenceAffinity(t *testing.T) { LimitCPU: 10, LimitMemory: 30, }, + order: "start", }, }, stack) @@ -227,6 +428,7 @@ func TestSynchronizeAddLimit(t *testing.T) { LimitCPU: 10, LimitMemory: 5, }, + order: "start", }, }, stack) @@ -541,6 +743,7 @@ func TestSynchronizeAddRemove(t *testing.T) { LimitCPU: 10, LimitMemory: 5, }, + order: "start", }, }, stack) @@ -962,6 +1165,7 @@ func TestSynchronizeWaitDisconnectedNodeNoWish(t *testing.T) { LimitCPU: 10, LimitMemory: 30, }, + order: "start", }, }, stack) @@ -1055,6 +1259,7 @@ func TestSynchronizeWaitDisconnectedNodeUnrealisticWish(t *testing.T) { LimitCPU: 10, LimitMemory: 30, }, + order: "start", }, }, stack) @@ -1148,6 +1353,7 @@ func TestSynchronizeTimeoutDisconnectedNode(t *testing.T) { LimitCPU: 10, LimitMemory: 30, }, + order: "start", }, }, stack) diff --git a/cluster/proxy/proxy.go b/cluster/proxy/proxy.go index 371e300a..fbd14efe 100644 --- a/cluster/proxy/proxy.go +++ b/cluster/proxy/proxy.go @@ -27,7 +27,6 @@ type Proxy interface { AddProcess(nodeid string, config *app.Config, metadata map[string]interface{}) error DeleteProcess(nodeid string, id app.ProcessID) error - StartProcess(nodeid string, id app.ProcessID) error UpdateProcess(nodeid string, id app.ProcessID, config *app.Config, metadata map[string]interface{}) error CommandProcess(nodeid string, id app.ProcessID, command string) error } @@ -596,23 +595,6 @@ func (p *proxy) DeleteProcess(nodeid string, id app.ProcessID) error { return nil } -func (p *proxy) StartProcess(nodeid string, id app.ProcessID) error { - p.lock.RLock() - defer p.lock.RUnlock() - - node, ok := p.nodes[nodeid] - if !ok { - return fmt.Errorf("node not found") - } - - err := node.StartProcess(id) - if err != nil { - return err - } - - return nil -} - func (p *proxy) UpdateProcess(nodeid string, id app.ProcessID, config *app.Config, metadata map[string]interface{}) error { p.lock.RLock() defer p.lock.RUnlock()