diff --git a/cluster/leader.go b/cluster/leader.go index bbde80b2..8bf4bd26 100644 --- a/cluster/leader.go +++ b/cluster/leader.go @@ -1025,6 +1025,7 @@ func synchronize(wish map[string]string, want []store.Process, have []proxy.Proc // Now, all remaining processes in the wantMap must be added to one of the nodes. for _, wantP := range wantReduced { pid := wantP.Config.ProcessID().String() + reference := wantP.Config.Reference + "@" + wantP.Config.Domain // If a process doesn't have any limits defined, reject that process if wantP.Config.LimitCPU <= 0 || wantP.Config.LimitMemory <= 0 { @@ -1043,7 +1044,7 @@ func synchronize(wish map[string]string, want []store.Process, have []proxy.Proc // Try to add the process to a node where other processes with the same reference currently reside. if len(wantP.Config.Reference) != 0 { - for _, count := range haveReferenceAffinityMap[wantP.Config.Reference+"@"+wantP.Config.Domain] { + for _, count := range haveReferenceAffinityMap[reference] { if hasNodeEnoughResources(resources[count.nodeid], wantP.Config.LimitCPU, wantP.Config.LimitMemory) { nodeid = count.nodeid break @@ -1073,6 +1074,8 @@ func synchronize(wish map[string]string, want []store.Process, have []proxy.Proc } reality[pid] = nodeid + + haveReferenceAffinityMap = updateReferenceAffinityMap(haveReferenceAffinityMap, reference, nodeid) } else { opStack = append(opStack, processOpReject{ processid: wantP.Config.ProcessID(), @@ -1168,6 +1171,41 @@ func createReferenceAffinityMap(processes []proxy.Process) map[string][]referenc return referenceAffinityMap } +func updateReferenceAffinityMap(raMap map[string][]referenceAffinityNodeCount, reference, nodeid string) map[string][]referenceAffinityNodeCount { + counts, ok := raMap[reference] + if !ok { + raMap[reference] = []referenceAffinityNodeCount{ + { + nodeid: nodeid, + count: 1, + }, + } + + return raMap + } + + found := false + for i, count := range counts { + if count.nodeid == nodeid { + count.count++ + counts[i] = count + found = true + break + } + } + + if !found { + counts = append(counts, referenceAffinityNodeCount{ + nodeid: nodeid, + count: 1, + }) + } + + raMap[reference] = counts + + return raMap +} + // rebalance returns a list of operations that will move running processes away from nodes that are overloaded. func rebalance(have []proxy.Process, nodes map[string]proxy.NodeAbout) ([]interface{}, map[string]proxy.NodeResources) { resources := map[string]proxy.NodeResources{} diff --git a/cluster/leader_test.go b/cluster/leader_test.go index 412799e4..fbabb195 100644 --- a/cluster/leader_test.go +++ b/cluster/leader_test.go @@ -378,6 +378,193 @@ func TestSynchronizeAddReferenceAffinity(t *testing.T) { }, reality) } +func TestSynchronizeAddReferenceAffinityMultiple(t *testing.T) { + wish := map[string]string{} + + now := time.Now() + + want := []store.Process{ + { + UpdatedAt: now, + Config: &app.Config{ + ID: "foobar1", + Reference: "barfoo", + LimitCPU: 10, + LimitMemory: 10, + }, + Order: "start", + }, + { + UpdatedAt: now, + Config: &app.Config{ + ID: "foobar2", + Reference: "barfoo", + LimitCPU: 10, + LimitMemory: 10, + }, + Order: "start", + }, + { + UpdatedAt: now, + Config: &app.Config{ + ID: "foobar3", + Reference: "barfoo", + LimitCPU: 10, + LimitMemory: 10, + }, + Order: "start", + }, + } + + have := []proxy.Process{ + { + NodeID: "node2", + Order: "start", + State: "running", + CPU: 1, + Mem: 1, + Runtime: 42, + UpdatedAt: now, + Config: &app.Config{ + ID: "foobar1", + Reference: "barfoo", + LimitCPU: 10, + LimitMemory: 10, + }, + }, + } + + nodes := map[string]proxy.NodeAbout{ + "node1": { + LastContact: time.Now(), + Resources: proxy.NodeResources{ + NCPU: 1, + CPU: 1, + Mem: 1, + CPULimit: 90, + MemLimit: 90, + }, + }, + "node2": { + LastContact: time.Now(), + Resources: proxy.NodeResources{ + NCPU: 1, + CPU: 1, + Mem: 1, + CPULimit: 90, + MemLimit: 90, + }, + }, + } + + stack, _, reality := synchronize(wish, want, have, nodes, 2*time.Minute) + + require.Equal(t, []interface{}{ + processOpAdd{ + nodeid: "node2", + config: &app.Config{ + ID: "foobar2", + Reference: "barfoo", + LimitCPU: 10, + LimitMemory: 10, + }, + order: "start", + }, + processOpAdd{ + nodeid: "node2", + config: &app.Config{ + ID: "foobar3", + Reference: "barfoo", + LimitCPU: 10, + LimitMemory: 10, + }, + order: "start", + }, + }, stack) + + require.Equal(t, map[string]string{ + "foobar1@": "node2", + "foobar2@": "node2", + "foobar3@": "node2", + }, reality) +} + +func TestSynchronizeAddReferenceAffinityMultipleEmptyNodes(t *testing.T) { + wish := map[string]string{} + + now := time.Now() + + want := []store.Process{ + { + UpdatedAt: now, + Config: &app.Config{ + ID: "foobar1", + Reference: "barfoo", + LimitCPU: 10, + LimitMemory: 10, + }, + Order: "start", + }, + { + UpdatedAt: now, + Config: &app.Config{ + ID: "foobar2", + Reference: "barfoo", + LimitCPU: 10, + LimitMemory: 10, + }, + Order: "start", + }, + { + UpdatedAt: now, + Config: &app.Config{ + ID: "foobar3", + Reference: "barfoo", + LimitCPU: 10, + LimitMemory: 10, + }, + Order: "start", + }, + } + + have := []proxy.Process{} + + nodes := map[string]proxy.NodeAbout{ + "node1": { + LastContact: time.Now(), + Resources: proxy.NodeResources{ + NCPU: 1, + CPU: 1, + Mem: 1, + CPULimit: 90, + MemLimit: 90, + }, + }, + "node2": { + LastContact: time.Now(), + Resources: proxy.NodeResources{ + NCPU: 1, + CPU: 1, + Mem: 1, + CPULimit: 90, + MemLimit: 90, + }, + }, + } + + stack, _, reality := synchronize(wish, want, have, nodes, 2*time.Minute) + + require.Equal(t, 3, len(stack)) + + nodeid := reality["foobar1@"] + + require.Equal(t, map[string]string{ + "foobar1@": nodeid, + "foobar2@": nodeid, + "foobar3@": nodeid, + }, reality) +} + func TestSynchronizeAddLimit(t *testing.T) { wish := map[string]string{} @@ -2030,6 +2217,132 @@ func TestCreateReferenceAffinityNodeMap(t *testing.T) { }, affinityMap) } +func TestUpdateReferenceAffinityNodeMap(t *testing.T) { + affinityMap := map[string][]referenceAffinityNodeCount{ + "ref1@": { + { + nodeid: "node3", + count: 2, + }, + { + nodeid: "node1", + count: 1, + }, + }, + "ref2@": { + { + nodeid: "node2", + count: 1, + }, + }, + "ref3@": { + { + nodeid: "node2", + count: 1, + }, + }, + } + + affinityMap = updateReferenceAffinityMap(affinityMap, "ref3@", "node1") + + require.Equal(t, map[string][]referenceAffinityNodeCount{ + "ref1@": { + { + nodeid: "node3", + count: 2, + }, + { + nodeid: "node1", + count: 1, + }, + }, + "ref2@": { + { + nodeid: "node2", + count: 1, + }, + }, + "ref3@": { + { + nodeid: "node2", + count: 1, + }, + { + nodeid: "node1", + count: 1, + }, + }, + }, affinityMap) + + affinityMap = updateReferenceAffinityMap(affinityMap, "ref2@", "node2") + + require.Equal(t, map[string][]referenceAffinityNodeCount{ + "ref1@": { + { + nodeid: "node3", + count: 2, + }, + { + nodeid: "node1", + count: 1, + }, + }, + "ref2@": { + { + nodeid: "node2", + count: 2, + }, + }, + "ref3@": { + { + nodeid: "node2", + count: 1, + }, + { + nodeid: "node1", + count: 1, + }, + }, + }, affinityMap) + + affinityMap = updateReferenceAffinityMap(affinityMap, "ref4@", "node2") + + require.Equal(t, map[string][]referenceAffinityNodeCount{ + "ref1@": { + { + nodeid: "node3", + count: 2, + }, + { + nodeid: "node1", + count: 1, + }, + }, + "ref2@": { + { + nodeid: "node2", + count: 2, + }, + }, + "ref3@": { + { + nodeid: "node2", + count: 1, + }, + { + nodeid: "node1", + count: 1, + }, + }, + "ref4@": { + { + nodeid: "node2", + count: 1, + }, + }, + }, affinityMap) +} + func TestIsMetadataUpdateRequired(t *testing.T) { want1 := map[string]interface{}{ "foo": "boz",