Fix updating reference affinity map

This commit is contained in:
Ingo Oppermann 2023-07-20 11:35:16 +02:00
parent 97a8e0f815
commit 910c727585
No known key found for this signature in database
GPG Key ID: 2AB32426E9DD229E
2 changed files with 352 additions and 1 deletions

View File

@ -1025,6 +1025,7 @@ func synchronize(wish map[string]string, want []store.Process, have []proxy.Proc
// Now, all remaining processes in the wantMap must be added to one of the nodes.
for _, wantP := range wantReduced {
pid := wantP.Config.ProcessID().String()
reference := wantP.Config.Reference + "@" + wantP.Config.Domain
// If a process doesn't have any limits defined, reject that process
if wantP.Config.LimitCPU <= 0 || wantP.Config.LimitMemory <= 0 {
@ -1043,7 +1044,7 @@ func synchronize(wish map[string]string, want []store.Process, have []proxy.Proc
// Try to add the process to a node where other processes with the same reference currently reside.
if len(wantP.Config.Reference) != 0 {
for _, count := range haveReferenceAffinityMap[wantP.Config.Reference+"@"+wantP.Config.Domain] {
for _, count := range haveReferenceAffinityMap[reference] {
if hasNodeEnoughResources(resources[count.nodeid], wantP.Config.LimitCPU, wantP.Config.LimitMemory) {
nodeid = count.nodeid
break
@ -1073,6 +1074,8 @@ func synchronize(wish map[string]string, want []store.Process, have []proxy.Proc
}
reality[pid] = nodeid
haveReferenceAffinityMap = updateReferenceAffinityMap(haveReferenceAffinityMap, reference, nodeid)
} else {
opStack = append(opStack, processOpReject{
processid: wantP.Config.ProcessID(),
@ -1168,6 +1171,41 @@ func createReferenceAffinityMap(processes []proxy.Process) map[string][]referenc
return referenceAffinityMap
}
func updateReferenceAffinityMap(raMap map[string][]referenceAffinityNodeCount, reference, nodeid string) map[string][]referenceAffinityNodeCount {
counts, ok := raMap[reference]
if !ok {
raMap[reference] = []referenceAffinityNodeCount{
{
nodeid: nodeid,
count: 1,
},
}
return raMap
}
found := false
for i, count := range counts {
if count.nodeid == nodeid {
count.count++
counts[i] = count
found = true
break
}
}
if !found {
counts = append(counts, referenceAffinityNodeCount{
nodeid: nodeid,
count: 1,
})
}
raMap[reference] = counts
return raMap
}
// rebalance returns a list of operations that will move running processes away from nodes that are overloaded.
func rebalance(have []proxy.Process, nodes map[string]proxy.NodeAbout) ([]interface{}, map[string]proxy.NodeResources) {
resources := map[string]proxy.NodeResources{}

View File

@ -378,6 +378,193 @@ func TestSynchronizeAddReferenceAffinity(t *testing.T) {
}, reality)
}
func TestSynchronizeAddReferenceAffinityMultiple(t *testing.T) {
wish := map[string]string{}
now := time.Now()
want := []store.Process{
{
UpdatedAt: now,
Config: &app.Config{
ID: "foobar1",
Reference: "barfoo",
LimitCPU: 10,
LimitMemory: 10,
},
Order: "start",
},
{
UpdatedAt: now,
Config: &app.Config{
ID: "foobar2",
Reference: "barfoo",
LimitCPU: 10,
LimitMemory: 10,
},
Order: "start",
},
{
UpdatedAt: now,
Config: &app.Config{
ID: "foobar3",
Reference: "barfoo",
LimitCPU: 10,
LimitMemory: 10,
},
Order: "start",
},
}
have := []proxy.Process{
{
NodeID: "node2",
Order: "start",
State: "running",
CPU: 1,
Mem: 1,
Runtime: 42,
UpdatedAt: now,
Config: &app.Config{
ID: "foobar1",
Reference: "barfoo",
LimitCPU: 10,
LimitMemory: 10,
},
},
}
nodes := map[string]proxy.NodeAbout{
"node1": {
LastContact: time.Now(),
Resources: proxy.NodeResources{
NCPU: 1,
CPU: 1,
Mem: 1,
CPULimit: 90,
MemLimit: 90,
},
},
"node2": {
LastContact: time.Now(),
Resources: proxy.NodeResources{
NCPU: 1,
CPU: 1,
Mem: 1,
CPULimit: 90,
MemLimit: 90,
},
},
}
stack, _, reality := synchronize(wish, want, have, nodes, 2*time.Minute)
require.Equal(t, []interface{}{
processOpAdd{
nodeid: "node2",
config: &app.Config{
ID: "foobar2",
Reference: "barfoo",
LimitCPU: 10,
LimitMemory: 10,
},
order: "start",
},
processOpAdd{
nodeid: "node2",
config: &app.Config{
ID: "foobar3",
Reference: "barfoo",
LimitCPU: 10,
LimitMemory: 10,
},
order: "start",
},
}, stack)
require.Equal(t, map[string]string{
"foobar1@": "node2",
"foobar2@": "node2",
"foobar3@": "node2",
}, reality)
}
func TestSynchronizeAddReferenceAffinityMultipleEmptyNodes(t *testing.T) {
wish := map[string]string{}
now := time.Now()
want := []store.Process{
{
UpdatedAt: now,
Config: &app.Config{
ID: "foobar1",
Reference: "barfoo",
LimitCPU: 10,
LimitMemory: 10,
},
Order: "start",
},
{
UpdatedAt: now,
Config: &app.Config{
ID: "foobar2",
Reference: "barfoo",
LimitCPU: 10,
LimitMemory: 10,
},
Order: "start",
},
{
UpdatedAt: now,
Config: &app.Config{
ID: "foobar3",
Reference: "barfoo",
LimitCPU: 10,
LimitMemory: 10,
},
Order: "start",
},
}
have := []proxy.Process{}
nodes := map[string]proxy.NodeAbout{
"node1": {
LastContact: time.Now(),
Resources: proxy.NodeResources{
NCPU: 1,
CPU: 1,
Mem: 1,
CPULimit: 90,
MemLimit: 90,
},
},
"node2": {
LastContact: time.Now(),
Resources: proxy.NodeResources{
NCPU: 1,
CPU: 1,
Mem: 1,
CPULimit: 90,
MemLimit: 90,
},
},
}
stack, _, reality := synchronize(wish, want, have, nodes, 2*time.Minute)
require.Equal(t, 3, len(stack))
nodeid := reality["foobar1@"]
require.Equal(t, map[string]string{
"foobar1@": nodeid,
"foobar2@": nodeid,
"foobar3@": nodeid,
}, reality)
}
func TestSynchronizeAddLimit(t *testing.T) {
wish := map[string]string{}
@ -2030,6 +2217,132 @@ func TestCreateReferenceAffinityNodeMap(t *testing.T) {
}, affinityMap)
}
func TestUpdateReferenceAffinityNodeMap(t *testing.T) {
affinityMap := map[string][]referenceAffinityNodeCount{
"ref1@": {
{
nodeid: "node3",
count: 2,
},
{
nodeid: "node1",
count: 1,
},
},
"ref2@": {
{
nodeid: "node2",
count: 1,
},
},
"ref3@": {
{
nodeid: "node2",
count: 1,
},
},
}
affinityMap = updateReferenceAffinityMap(affinityMap, "ref3@", "node1")
require.Equal(t, map[string][]referenceAffinityNodeCount{
"ref1@": {
{
nodeid: "node3",
count: 2,
},
{
nodeid: "node1",
count: 1,
},
},
"ref2@": {
{
nodeid: "node2",
count: 1,
},
},
"ref3@": {
{
nodeid: "node2",
count: 1,
},
{
nodeid: "node1",
count: 1,
},
},
}, affinityMap)
affinityMap = updateReferenceAffinityMap(affinityMap, "ref2@", "node2")
require.Equal(t, map[string][]referenceAffinityNodeCount{
"ref1@": {
{
nodeid: "node3",
count: 2,
},
{
nodeid: "node1",
count: 1,
},
},
"ref2@": {
{
nodeid: "node2",
count: 2,
},
},
"ref3@": {
{
nodeid: "node2",
count: 1,
},
{
nodeid: "node1",
count: 1,
},
},
}, affinityMap)
affinityMap = updateReferenceAffinityMap(affinityMap, "ref4@", "node2")
require.Equal(t, map[string][]referenceAffinityNodeCount{
"ref1@": {
{
nodeid: "node3",
count: 2,
},
{
nodeid: "node1",
count: 1,
},
},
"ref2@": {
{
nodeid: "node2",
count: 2,
},
},
"ref3@": {
{
nodeid: "node2",
count: 1,
},
{
nodeid: "node1",
count: 1,
},
},
"ref4@": {
{
nodeid: "node2",
count: 1,
},
},
}, affinityMap)
}
func TestIsMetadataUpdateRequired(t *testing.T) {
want1 := map[string]interface{}{
"foo": "boz",