Add operations to relocate processes

Ingo Oppermann 2024-06-18 16:50:59 +02:00
parent f5d9725a48
commit de6a267fd4
5 changed files with 841 additions and 129 deletions
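
For context, a minimal, hypothetical sketch of how a relocation could be requested through the new store operation. Only Command, OpSetRelocateProcess, CommandSetRelocateProcess, and app.ParseProcessID are taken from this commit; the cluster value c and the exact entry point are assumptions.

// Hypothetical: ask the leader to move process "foobar1" (empty domain) to node3.
cmd := &store.Command{
Operation: store.OpSetRelocateProcess,
Data: store.CommandSetRelocateProcess{
Map: map[app.ProcessID]string{
// An empty target node ID would let the leader pick any node with enough resources.
app.ParseProcessID("foobar1@"): "node3",
},
},
}
// c is assumed to be the cluster; applyCommand is the same helper used in doRelocate below.
c.applyCommand(cmd)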

View File

@ -356,6 +356,7 @@ func (c *cluster) synchronizeAndRebalance(ctx context.Context, interval time.Dur
c.doSynchronize(emergency, term)
c.doRebalance(emergency, term)
c.doRelocate(emergency, term)
} else {
c.doSynchronize(emergency, term)
}
@ -740,7 +741,7 @@ func (c *cluster) doSynchronize(emergency bool, term uint64) {
func (c *cluster) doRebalance(emergency bool, term uint64) {
if emergency {
// Don't rebalance in emergency mode
// Don't rebalance in emergency mode.
return
}
@ -800,6 +801,83 @@ func (c *cluster) doRebalance(emergency bool, term uint64) {
}
}
func (c *cluster) doRelocate(emergency bool, term uint64) {
if emergency {
// Don't relocate in emergency mode.
return
}
logger := c.logger.WithField("term", term)
logger.Debug().WithField("emergency", emergency).Log("Relocating")
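// relocateMap maps a process ID (as string) to the ID of the node it should be moved to.
// An empty target node ID means the process may be moved to any node with enough resources.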
relocateMap := c.store.GetProcessRelocateMap()
have := c.proxy.ListProxyProcesses()
nodes := c.proxy.ListNodes()
nodesMap := map[string]proxy.NodeAbout{}
for _, node := range nodes {
about := node.About()
nodesMap[about.ID] = about
}
logger.Debug().WithFields(log.Fields{
"relocate": relocate,
"have": have,
"nodes": nodesMap,
}).Log("Rebalance")
opStack, _, relocatedProcessIDs := relocate(have, nodesMap, relocateMap)
errors := c.applyOpStack(opStack, term)
for _, e := range errors {
// Only apply the command if the error is different.
process, err := c.store.GetProcess(e.processid)
if err != nil {
continue
}
var errmessage string = ""
if e.err != nil {
if process.Error == e.err.Error() {
continue
}
errmessage = e.err.Error()
} else {
if len(process.Error) == 0 {
continue
}
}
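// Update the stored process error: record the new error, or clear a previously stored error if the operation succeeded.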
cmd := &store.Command{
Operation: store.OpSetProcessError,
Data: store.CommandSetProcessError{
ID: e.processid,
Error: errmessage,
},
}
c.applyCommand(cmd)
}
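// Remove relocated (or no longer existing) processes from the relocate map so they aren't processed again in the next term.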
cmd := store.CommandUnsetRelocateProcess{
ID: []app.ProcessID{},
}
for _, processid := range relocatedProcessIDs {
cmd.ID = append(cmd.ID, app.ParseProcessID(processid))
}
c.applyCommand(&store.Command{
Operation: store.OpUnsetRelocateProcess,
Data: cmd,
})
}
// isMetadataUpdateRequired compares two metadata maps. It relies on the documented property that json.Marshal
// sorts map keys prior to encoding.
func isMetadataUpdateRequired(wantMap map[string]interface{}, haveMap map[string]interface{}) (bool, map[string]interface{}) {
@ -815,7 +893,7 @@ func isMetadataUpdateRequired(wantMap map[string]interface{}, haveMap map[string
for key, wantMapValue := range wantMap {
haveMapValue, ok := haveMap[key]
if !ok {
// A key in map1 exists, that doesn't exist in map2, we need to update
// A key exists in map1 that doesn't exist in map2, so we need to update.
hasChanges = true
}
@ -831,7 +909,7 @@ func isMetadataUpdateRequired(wantMap map[string]interface{}, haveMap map[string
}
if !bytes.Equal(changesData, completeData) {
// The values are not equal, we need to update
// The values are not equal, we need to update.
hasChanges = true
}
@ -841,7 +919,7 @@ func isMetadataUpdateRequired(wantMap map[string]interface{}, haveMap map[string
}
for key := range haveMapKeys {
// If there keys in map2 that are not in map1, we have to update
// If there are keys in map2 that are not in map1, we have to update.
hasChanges = true
changeMap[key] = nil
}
@ -925,7 +1003,7 @@ func synchronize(wish map[string]string, want []store.Process, have []proxy.Proc
processid: haveP.Config.ProcessID(),
})
// Release the resources
// Release the resources.
r, ok := resources[haveP.NodeID]
if ok {
r.CPU -= haveP.CPU
@ -943,7 +1021,7 @@ func synchronize(wish map[string]string, want []store.Process, have []proxy.Proc
r, ok := resources[nodeid]
if ok {
// Consume the resources
// Consume the resources.
r.CPU += haveP.Config.LimitCPU
r.Mem += haveP.Config.LimitMemory
resources[nodeid] = r
@ -1034,14 +1112,13 @@ func synchronize(wish map[string]string, want []store.Process, have []proxy.Proc
}
// Create a map from the process reference to the node it is running on.
haveReferenceAffinityMap := createReferenceAffinityMap(have)
haveReferenceAffinity := NewReferenceAffinity(have)
// Now, all remaining processes in the wantMap must be added to one of the nodes.
for _, wantP := range wantReduced {
pid := wantP.Config.ProcessID().String()
reference := wantP.Config.Reference + "@" + wantP.Config.Domain
// If a process doesn't have any limits defined, reject that process
// If a process doesn't have any limits defined, reject that process.
if wantP.Config.LimitCPU <= 0 || wantP.Config.LimitMemory <= 0 {
opStack = append(opStack, processOpReject{
processid: wantP.Config.ProcessID(),
@ -1057,18 +1134,20 @@ func synchronize(wish map[string]string, want []store.Process, have []proxy.Proc
nodeid := ""
// Try to add the process to a node where other processes with the same reference currently reside.
if len(wantP.Config.Reference) != 0 {
for _, count := range haveReferenceAffinityMap[reference] {
if hasNodeEnoughResources(resources[count.nodeid], wantP.Config.LimitCPU, wantP.Config.LimitMemory) {
nodeid = count.nodeid
break
}
raNodes := haveReferenceAffinity.Nodes(wantP.Config.Reference, wantP.Config.Domain)
for _, raNodeid := range raNodes {
if hasNodeEnoughResources(resources[raNodeid], wantP.Config.LimitCPU, wantP.Config.LimitMemory) {
nodeid = raNodeid
break
}
}
// Find the node with the most resources available
// Find the node with the most resources available.
if len(nodeid) == 0 {
nodeid = findBestNodeForProcess(resources, wantP.Config.LimitCPU, wantP.Config.LimitMemory)
nodes := findBestNodesForProcess(resources, wantP.Config.LimitCPU, wantP.Config.LimitMemory)
if len(nodes) > 0 {
nodeid = nodes[0]
}
}
if len(nodeid) != 0 {
@ -1089,7 +1168,7 @@ func synchronize(wish map[string]string, want []store.Process, have []proxy.Proc
reality[pid] = nodeid
haveReferenceAffinityMap = updateReferenceAffinityMap(haveReferenceAffinityMap, reference, nodeid)
haveReferenceAffinity.Add(wantP.Config.Reference, wantP.Config.Domain, nodeid)
} else {
opStack = append(opStack, processOpReject{
processid: wantP.Config.ProcessID(),
@ -1111,26 +1190,28 @@ func hasNodeEnoughResources(r proxy.NodeResources, cpu float64, mem uint64) bool
return false
}
// findBestNodeForProcess returns the nodeid that can fit the requested cpu and memory requirements. If no
// such node is available, an empty string is returned.
func findBestNodeForProcess(resources map[string]proxy.NodeResources, cpu float64, mem uint64) string {
nodeid := ""
// findBestNodesForProcess returns a list of nodeids that can fit the requested cpu and memory requirements. If no
// such node is available, an empty list is returned. The list is sorted by the most suitable node first.
func findBestNodesForProcess(resources map[string]proxy.NodeResources, cpu float64, mem uint64) []string {
nodes := []string{}
for id, r := range resources {
if len(nodeid) == 0 {
if hasNodeEnoughResources(r, cpu, mem) {
nodeid = id
}
continue
}
if r.CPU < resources[nodeid].CPU && r.Mem <= resources[nodeid].Mem {
nodeid = id
if hasNodeEnoughResources(r, cpu, mem) {
nodes = append(nodes, id)
}
}
return nodeid
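// Prefer nodes with lower current CPU usage and no higher memory usage.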
sort.SliceStable(nodes, func(i, j int) bool {
nodeA, nodeB := nodes[i], nodes[j]
return resources[nodeA].CPU < resources[nodeB].CPU && resources[nodeA].Mem <= resources[nodeB].Mem
})
return nodes
}
type referenceAffinityNodeCount struct {
@ -1138,22 +1219,29 @@ type referenceAffinityNodeCount struct {
count uint64
}
// createReferenceAffinityMap returns a map of references (per domain) to an array of nodes this reference
// is found on and their count. The array is sorted by the count, the highest first.
func createReferenceAffinityMap(processes []proxy.Process) map[string][]referenceAffinityNodeCount {
referenceAffinityMap := map[string][]referenceAffinityNodeCount{}
type referenceAffinity struct {
m map[string][]referenceAffinityNodeCount
}
// NewReferenceAffinity returns a referenceAffinity. This is a map of references (per domain) to an array of
// nodes this reference is found on and their count.
func NewReferenceAffinity(processes []proxy.Process) *referenceAffinity {
ra := &referenceAffinity{
m: map[string][]referenceAffinityNodeCount{},
}
for _, p := range processes {
if len(p.Config.Reference) == 0 {
continue
}
ref := p.Config.Reference + "@" + p.Config.Domain
key := p.Config.Reference + "@" + p.Config.Domain
// Here we count how often a reference is present on a node. When
// moving processes to a different node, the node with the highest
// count of the same reference will be the first candidate.
found := false
arr := referenceAffinityMap[ref]
arr := ra.m[key]
for i, count := range arr {
if count.nodeid == p.NodeID {
count.count++
@ -1170,32 +1258,62 @@ func createReferenceAffinityMap(processes []proxy.Process) map[string][]referenc
})
}
referenceAffinityMap[ref] = arr
ra.m[key] = arr
}
// Sort every reference count in decreasing order for each reference
for ref, count := range referenceAffinityMap {
// Sort every reference count in decreasing order for each reference.
for ref, count := range ra.m {
sort.SliceStable(count, func(a, b int) bool {
return count[a].count > count[b].count
})
referenceAffinityMap[ref] = count
ra.m[ref] = count
}
return referenceAffinityMap
return ra
}
func updateReferenceAffinityMap(raMap map[string][]referenceAffinityNodeCount, reference, nodeid string) map[string][]referenceAffinityNodeCount {
counts, ok := raMap[reference]
// Nodes returns a list of node IDs for the provided reference and domain. The list
// is ordered by how many processes with that reference run on each node, in descending order.
func (ra *referenceAffinity) Nodes(reference, domain string) []string {
if len(reference) == 0 {
return nil
}
key := reference + "@" + domain
counts, ok := ra.m[key]
if !ok {
raMap[reference] = []referenceAffinityNodeCount{
return nil
}
nodes := []string{}
for _, count := range counts {
nodes = append(nodes, count.nodeid)
}
return nodes
}
// Add adds a reference on a node to an existing reference affinity.
func (ra *referenceAffinity) Add(reference, domain, nodeid string) {
if len(reference) == 0 {
return
}
key := reference + "@" + domain
counts, ok := ra.m[key]
if !ok {
ra.m[key] = []referenceAffinityNodeCount{
{
nodeid: nodeid,
count: 1,
},
}
return raMap
return
}
found := false
@ -1215,9 +1333,59 @@ func updateReferenceAffinityMap(raMap map[string][]referenceAffinityNodeCount, r
})
}
raMap[reference] = counts
ra.m[key] = counts
}
return raMap
// Move moves a reference from one node to another node in an existing reference affinity.
func (ra *referenceAffinity) Move(reference, domain, fromnodeid, tonodeid string) {
if len(reference) == 0 {
return
}
key := reference + "@" + domain
counts, ok := ra.m[key]
if !ok {
ra.m[key] = []referenceAffinityNodeCount{
{
nodeid: tonodeid,
count: 1,
},
}
return
}
found := false
for i, count := range counts {
if count.nodeid == tonodeid {
count.count++
counts[i] = count
found = true
} else if count.nodeid == fromnodeid {
count.count--
counts[i] = count
}
}
if !found {
counts = append(counts, referenceAffinityNodeCount{
nodeid: tonodeid,
count: 1,
})
}
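// Drop entries whose count has dropped to zero.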
newCounts := []referenceAffinityNodeCount{}
for _, count := range counts {
if count.count == 0 {
continue
}
newCounts = append(newCounts, count)
}
ra.m[key] = newCounts
}
// rebalance returns a list of operations that will move running processes away from nodes that are overloaded.
@ -1230,30 +1398,30 @@ func rebalance(have []proxy.Process, nodes map[string]proxy.NodeAbout) ([]interf
// Group all running processes by node and sort them by their runtime in ascending order.
nodeProcessMap := createNodeProcessMap(have)
// A map from the process reference to the nodes it is running on
haveReferenceAffinityMap := createReferenceAffinityMap(have)
// A map from the process reference to the nodes it is running on.
haveReferenceAffinity := NewReferenceAffinity(have)
opStack := []interface{}{}
// Check if any of the nodes is overloaded
// Check if any of the nodes is overloaded.
for id, node := range nodes {
r := node.Resources
// Ignore this node if the resource values are not reliable
// Ignore this node if the resource values are not reliable.
if r.Error != nil {
continue
}
// Check if node is overloaded
// Check if node is overloaded.
if r.CPU < r.CPULimit && r.Mem < r.MemLimit && !r.IsThrottling {
continue
}
// Move processes from this noed to another node with enough free resources.
// Move processes from this node to another node with enough free resources.
// The processes are ordered ascending by their runtime.
processes := nodeProcessMap[id]
if len(processes) == 0 {
// If there are no processes on that node, we can't do anything
// If there are no processes on that node, we can't do anything.
continue
}
@ -1265,27 +1433,35 @@ func rebalance(have []proxy.Process, nodes map[string]proxy.NodeAbout) ([]interf
// Try to move the process to a node where other processes with the same
// reference currently reside.
if len(p.Config.Reference) != 0 {
for _, count := range haveReferenceAffinityMap[p.Config.Reference+"@"+p.Config.Domain] {
// Do not move the process to the node it is currently on
if count.nodeid == overloadedNodeid {
raNodes := haveReferenceAffinity.Nodes(p.Config.Reference, p.Config.Domain)
for _, raNodeid := range raNodes {
// Do not move the process to the node it is currently on.
if raNodeid == overloadedNodeid {
continue
}
r := resources[count.nodeid]
r := resources[raNodeid]
if hasNodeEnoughResources(r, p.CPU, p.Mem) {
availableNodeid = count.nodeid
availableNodeid = raNodeid
break
}
}
}
// Find the best node with enough resources available
// Find the best node with enough resources available.
if len(availableNodeid) == 0 {
availableNodeid = findBestNodeForProcess(resources, p.CPU, p.Mem)
nodes := findBestNodesForProcess(resources, p.CPU, p.Mem)
for _, nodeid := range nodes {
if nodeid == overloadedNodeid {
continue
}
availableNodeid = nodeid
break
}
}
if len(availableNodeid) == 0 {
// There's no other node with enough resources to take over this process
// There's no other node with enough resources to take over this process.
opStack = append(opStack, processOpSkip{
nodeid: overloadedNodeid,
processid: p.Config.ProcessID(),
@ -1301,22 +1477,17 @@ func rebalance(have []proxy.Process, nodes map[string]proxy.NodeAbout) ([]interf
metadata: p.Metadata,
})
// Adjust the process
// Adjust the process.
p.NodeID = availableNodeid
processes[i] = p
// Adjust the resources
r = resources[availableNodeid]
r.CPU += p.CPU
r.Mem += p.Mem
resources[availableNodeid] = r
// Adjust the resources.
resources = adjustResources(resources, availableNodeid, overloadedNodeid, p.CPU, p.Mem)
r = resources[overloadedNodeid]
r.CPU -= p.CPU
r.Mem -= p.Mem
resources[overloadedNodeid] = r
// Adjust the reference affinity.
haveReferenceAffinity.Move(p.Config.Reference, p.Config.Domain, overloadedNodeid, availableNodeid)
// Move only one process at a time
// Move only one process at a time.
break
}
}
@ -1324,6 +1495,132 @@ func rebalance(have []proxy.Process, nodes map[string]proxy.NodeAbout) ([]interf
return opStack, resources
}
// relocate returns a list of operations that will move deployed processes to different nodes.
func relocate(have []proxy.Process, nodes map[string]proxy.NodeAbout, relocateMap map[string]string) ([]interface{}, map[string]proxy.NodeResources, []string) {
resources := map[string]proxy.NodeResources{}
for nodeid, about := range nodes {
resources[nodeid] = about.Resources
}
relocatedProcessIDs := []string{}
// A map from the process reference to the nodes it is running on.
haveReferenceAffinity := NewReferenceAffinity(have)
opStack := []interface{}{}
// Check for any requested relocations.
for processid, targetNodeid := range relocateMap {
process := proxy.Process{}
found := false
for _, p := range have {
if processid == p.Config.ProcessID().String() {
process = p
found = true
break
}
}
if !found {
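// The process is not deployed anymore; just remove it from the relocate map.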
relocatedProcessIDs = append(relocatedProcessIDs, processid)
continue
}
sourceNodeid := process.NodeID
if len(targetNodeid) != 0 {
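// Verify that the requested target node exists and has enough free resources; otherwise fall back to automatic placement.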
_, hasNode := nodes[targetNodeid]
if !hasNode || !hasNodeEnoughResources(nodes[targetNodeid].Resources, process.CPU, process.Mem) {
targetNodeid = ""
}
}
if len(targetNodeid) == 0 {
// Try to move the process to a node where other processes with the same
// reference currently reside.
if len(process.Config.Reference) != 0 {
raNodes := haveReferenceAffinity.Nodes(process.Config.Reference, process.Config.Domain)
for _, raNodeid := range raNodes {
// Do not move the process to the node it is currently on.
if raNodeid == sourceNodeid {
continue
}
r := resources[raNodeid]
if hasNodeEnoughResources(r, process.CPU, process.Mem) {
targetNodeid = raNodeid
break
}
}
}
// Find the best node with enough resources available.
if len(targetNodeid) == 0 {
nodes := findBestNodesForProcess(resources, process.CPU, process.Mem)
for _, nodeid := range nodes {
if nodeid == sourceNodeid {
continue
}
targetNodeid = nodeid
break
}
}
if len(targetNodeid) == 0 {
// There's no other node with enough resources to take over this process.
opStack = append(opStack, processOpSkip{
nodeid: sourceNodeid,
processid: process.Config.ProcessID(),
err: errNotEnoughResourcesForRebalancing,
})
continue
}
}
opStack = append(opStack, processOpMove{
fromNodeid: sourceNodeid,
toNodeid: targetNodeid,
config: process.Config,
metadata: process.Metadata,
})
// Adjust the resources.
resources = adjustResources(resources, targetNodeid, sourceNodeid, process.CPU, process.Mem)
// Adjust the reference affinity.
haveReferenceAffinity.Move(process.Config.Reference, process.Config.Domain, sourceNodeid, targetNodeid)
relocatedProcessIDs = append(relocatedProcessIDs, processid)
}
return opStack, resources, relocatedProcessIDs
}
// adjustResources adds the given cpu and memory to the target node's resources and subtracts them from the source node's resources.
func adjustResources(resources map[string]proxy.NodeResources, target, source string, cpu float64, mem uint64) map[string]proxy.NodeResources {
r := resources[target]
r.CPU += cpu
r.Mem += mem
resources[target] = r
r = resources[source]
r.CPU -= cpu
if r.CPU < 0 {
r.CPU = 0
}
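// Mem is unsigned; clamp at zero instead of letting the subtraction underflow.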
if mem >= r.Mem {
r.Mem = 0
} else {
r.Mem -= mem
}
resources[source] = r
return resources
}
// createNodeProcessMap takes a list of processes and groups them by the nodeid they
// are running on. Each group contains only running processes and is sorted by their
// preference to be moved somewhere else, in increasing order. From the running processes, the

View File

@ -2034,6 +2034,276 @@ func TestRebalanceReferenceAffinity(t *testing.T) {
}, resources)
}
func TestRebalanceRelocateTarget(t *testing.T) {
processes := []proxy.Process{
{
NodeID: "node1",
Order: "start",
State: "running",
CPU: 35,
Mem: 20,
Runtime: 42,
Config: &app.Config{
ID: "foobar1",
},
},
{
NodeID: "node1",
Order: "start",
State: "running",
CPU: 17,
Mem: 31,
Runtime: 27,
Config: &app.Config{
ID: "foobar3",
},
},
{
NodeID: "node2",
Order: "start",
State: "running",
CPU: 12,
Mem: 5,
Runtime: 42,
Config: &app.Config{
ID: "foobar2",
},
},
}
nodes := map[string]proxy.NodeAbout{
"node1": {
LastContact: time.Now(),
Resources: proxy.NodeResources{
NCPU: 1,
CPU: 27,
Mem: 35,
CPULimit: 90,
MemLimit: 90,
},
},
"node2": {
LastContact: time.Now(),
Resources: proxy.NodeResources{
NCPU: 1,
CPU: 15,
Mem: 11,
CPULimit: 90,
MemLimit: 90,
},
},
"node3": {
LastContact: time.Now(),
Resources: proxy.NodeResources{
NCPU: 1,
CPU: 0,
Mem: 0,
CPULimit: 90,
MemLimit: 90,
},
},
}
relocateMap := map[string]string{
"foobar1@": "node3",
}
opStack, resources, _ := relocate(processes, nodes, relocateMap)
require.NotEmpty(t, opStack)
require.Equal(t, []interface{}{
processOpMove{
fromNodeid: "node1",
toNodeid: "node3",
config: &app.Config{
ID: "foobar1",
},
},
}, opStack)
require.Equal(t, map[string]proxy.NodeResources{
"node1": {
NCPU: 1,
CPU: 0,
Mem: 15,
CPULimit: 90,
MemLimit: 90,
},
"node2": {
NCPU: 1,
CPU: 15,
Mem: 11,
CPULimit: 90,
MemLimit: 90,
},
"node3": {
NCPU: 1,
CPU: 35,
Mem: 20,
CPULimit: 90,
MemLimit: 90,
},
}, resources)
}
func TestRebalanceRelocateAny(t *testing.T) {
processes := []proxy.Process{
{
NodeID: "node1",
Order: "start",
State: "running",
CPU: 35,
Mem: 20,
Runtime: 42,
Config: &app.Config{
ID: "foobar1",
},
},
{
NodeID: "node1",
Order: "start",
State: "running",
CPU: 17,
Mem: 31,
Runtime: 27,
Config: &app.Config{
ID: "foobar3",
},
},
{
NodeID: "node2",
Order: "start",
State: "running",
CPU: 12,
Mem: 5,
Runtime: 42,
Config: &app.Config{
ID: "foobar2",
},
},
}
nodes := map[string]proxy.NodeAbout{
"node1": {
LastContact: time.Now(),
Resources: proxy.NodeResources{
NCPU: 1,
CPU: 27,
Mem: 35,
CPULimit: 90,
MemLimit: 90,
},
},
"node2": {
LastContact: time.Now(),
Resources: proxy.NodeResources{
NCPU: 1,
CPU: 15,
Mem: 11,
CPULimit: 90,
MemLimit: 90,
},
},
"node3": {
LastContact: time.Now(),
Resources: proxy.NodeResources{
NCPU: 1,
CPU: 0,
Mem: 0,
CPULimit: 90,
MemLimit: 90,
},
},
}
relocateMap := map[string]string{
"foobar1@": "",
}
opStack, resources, _ := relocate(processes, nodes, relocateMap)
require.NotEmpty(t, opStack)
require.Equal(t, []interface{}{
processOpMove{
fromNodeid: "node1",
toNodeid: "node3",
config: &app.Config{
ID: "foobar1",
},
},
}, opStack)
require.Equal(t, map[string]proxy.NodeResources{
"node1": {
NCPU: 1,
CPU: 0,
Mem: 15,
CPULimit: 90,
MemLimit: 90,
},
"node2": {
NCPU: 1,
CPU: 15,
Mem: 11,
CPULimit: 90,
MemLimit: 90,
},
"node3": {
NCPU: 1,
CPU: 35,
Mem: 20,
CPULimit: 90,
MemLimit: 90,
},
}, resources)
}
func TestFindBestNodesForProcess(t *testing.T) {
nodes := map[string]proxy.NodeAbout{
"node1": {
LastContact: time.Now(),
Resources: proxy.NodeResources{
NCPU: 1,
CPU: 27,
Mem: 35,
CPULimit: 90,
MemLimit: 90,
},
},
"node2": {
LastContact: time.Now(),
Resources: proxy.NodeResources{
NCPU: 1,
CPU: 15,
Mem: 11,
CPULimit: 90,
MemLimit: 90,
},
},
"node3": {
LastContact: time.Now(),
Resources: proxy.NodeResources{
NCPU: 1,
CPU: 0,
Mem: 0,
CPULimit: 90,
MemLimit: 90,
},
},
}
resources := map[string]proxy.NodeResources{}
for nodeid, about := range nodes {
resources[nodeid] = about.Resources
}
list := findBestNodesForProcess(resources, 35, 20)
require.Equal(t, []string{"node3", "node2", "node1"}, list)
}
func TestCreateNodeProcessMap(t *testing.T) {
processes := []proxy.Process{
{
@ -2291,7 +2561,7 @@ func TestCreateReferenceAffinityNodeMap(t *testing.T) {
},
}
affinityMap := createReferenceAffinityMap(processes)
affinityMap := NewReferenceAffinity(processes)
require.Equal(t, map[string][]referenceAffinityNodeCount{
"ref1@": {
@ -2316,36 +2586,38 @@ func TestCreateReferenceAffinityNodeMap(t *testing.T) {
count: 1,
},
},
}, affinityMap)
}, affinityMap.m)
}
func TestUpdateReferenceAffinityNodeMap(t *testing.T) {
affinityMap := map[string][]referenceAffinityNodeCount{
"ref1@": {
{
nodeid: "node3",
count: 2,
affinityMap := &referenceAffinity{
m: map[string][]referenceAffinityNodeCount{
"ref1@": {
{
nodeid: "node3",
count: 2,
},
{
nodeid: "node1",
count: 1,
},
},
{
nodeid: "node1",
count: 1,
"ref2@": {
{
nodeid: "node2",
count: 1,
},
},
},
"ref2@": {
{
nodeid: "node2",
count: 1,
},
},
"ref3@": {
{
nodeid: "node2",
count: 1,
"ref3@": {
{
nodeid: "node2",
count: 1,
},
},
},
}
affinityMap = updateReferenceAffinityMap(affinityMap, "ref3@", "node1")
affinityMap.Add("ref3", "", "node1")
require.Equal(t, map[string][]referenceAffinityNodeCount{
"ref1@": {
@ -2374,9 +2646,9 @@ func TestUpdateReferenceAffinityNodeMap(t *testing.T) {
count: 1,
},
},
}, affinityMap)
}, affinityMap.m)
affinityMap = updateReferenceAffinityMap(affinityMap, "ref2@", "node2")
affinityMap.Add("ref2", "", "node2")
require.Equal(t, map[string][]referenceAffinityNodeCount{
"ref1@": {
@ -2405,9 +2677,9 @@ func TestUpdateReferenceAffinityNodeMap(t *testing.T) {
count: 1,
},
},
}, affinityMap)
}, affinityMap.m)
affinityMap = updateReferenceAffinityMap(affinityMap, "ref4@", "node2")
affinityMap.Add("ref4", "", "node2")
require.Equal(t, map[string][]referenceAffinityNodeCount{
"ref1@": {
@ -2442,7 +2714,85 @@ func TestUpdateReferenceAffinityNodeMap(t *testing.T) {
count: 1,
},
},
}, affinityMap)
}, affinityMap.m)
affinityMap.Move("ref2", "", "node2", "node3")
require.Equal(t, map[string][]referenceAffinityNodeCount{
"ref1@": {
{
nodeid: "node3",
count: 2,
},
{
nodeid: "node1",
count: 1,
},
},
"ref2@": {
{
nodeid: "node2",
count: 1,
},
{
nodeid: "node3",
count: 1,
},
},
"ref3@": {
{
nodeid: "node2",
count: 1,
},
{
nodeid: "node1",
count: 1,
},
},
"ref4@": {
{
nodeid: "node2",
count: 1,
},
},
}, affinityMap.m)
affinityMap.Move("ref2", "", "node2", "node3")
require.Equal(t, map[string][]referenceAffinityNodeCount{
"ref1@": {
{
nodeid: "node3",
count: 2,
},
{
nodeid: "node1",
count: 1,
},
},
"ref2@": {
{
nodeid: "node3",
count: 2,
},
},
"ref3@": {
{
nodeid: "node2",
count: 1,
},
{
nodeid: "node1",
count: 1,
},
},
"ref4@": {
{
nodeid: "node2",
count: 1,
},
},
}, affinityMap.m)
}
func TestIsMetadataUpdateRequired(t *testing.T) {

View File

@ -35,7 +35,7 @@ func (s *store) deleteLock(cmd CommandDeleteLock) error {
return nil
}
func (s *store) clearLocks(cmd CommandClearLocks) error {
func (s *store) clearLocks(_ CommandClearLocks) error {
s.lock.Lock()
defer s.lock.Unlock()

View File

@ -102,6 +102,30 @@ func (s *store) updateProcess(cmd CommandUpdateProcess) error {
return nil
}
func (s *store) setRelocateProcess(cmd CommandSetRelocateProcess) error {
s.lock.Lock()
defer s.lock.Unlock()
for processid, targetNodeid := range cmd.Map {
id := processid.String()
s.data.ProcessRelocateMap[id] = targetNodeid
}
return nil
}
func (s *store) unsetRelocateProcess(cmd CommandUnsetRelocateProcess) error {
s.lock.Lock()
defer s.lock.Unlock()
for _, processid := range cmd.ID {
id := processid.String()
delete(s.data.ProcessRelocateMap, id)
}
return nil
}
func (s *store) setProcessOrder(cmd CommandSetProcessOrder) error {
s.lock.Lock()
defer s.lock.Unlock()
@ -226,3 +250,16 @@ func (s *store) GetProcessNodeMap() map[string]string {
return m
}
func (s *store) GetProcessRelocateMap() map[string]string {
s.lock.RLock()
defer s.lock.RUnlock()
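// Return a copy so that callers cannot mutate the store's internal map.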
m := map[string]string{}
for key, value := range s.data.ProcessRelocateMap {
m[key] = value
}
return m
}

View File

@ -23,6 +23,7 @@ type Store interface {
ListProcesses() []Process
GetProcess(id app.ProcessID) (Process, error)
GetProcessNodeMap() map[string]string
GetProcessRelocateMap() map[string]string
ListUsers() Users
GetUser(name string) Users
@ -63,22 +64,24 @@ type Value struct {
type Operation string
const (
OpAddProcess Operation = "addProcess"
OpRemoveProcess Operation = "removeProcess"
OpUpdateProcess Operation = "updateProcess"
OpSetProcessOrder Operation = "setProcessOrder"
OpSetProcessMetadata Operation = "setProcessMetadata"
OpSetProcessError Operation = "setProcessError"
OpAddIdentity Operation = "addIdentity"
OpUpdateIdentity Operation = "updateIdentity"
OpRemoveIdentity Operation = "removeIdentity"
OpSetPolicies Operation = "setPolicies"
OpSetProcessNodeMap Operation = "setProcessNodeMap"
OpCreateLock Operation = "createLock"
OpDeleteLock Operation = "deleteLock"
OpClearLocks Operation = "clearLocks"
OpSetKV Operation = "setKV"
OpUnsetKV Operation = "unsetKV"
OpAddProcess Operation = "addProcess"
OpRemoveProcess Operation = "removeProcess"
OpUpdateProcess Operation = "updateProcess"
OpSetRelocateProcess Operation = "setRelocateProcess"
OpUnsetRelocateProcess Operation = "unsetRelocateProcess"
OpSetProcessOrder Operation = "setProcessOrder"
OpSetProcessMetadata Operation = "setProcessMetadata"
OpSetProcessError Operation = "setProcessError"
OpAddIdentity Operation = "addIdentity"
OpUpdateIdentity Operation = "updateIdentity"
OpRemoveIdentity Operation = "removeIdentity"
OpSetPolicies Operation = "setPolicies"
OpSetProcessNodeMap Operation = "setProcessNodeMap"
OpCreateLock Operation = "createLock"
OpDeleteLock Operation = "deleteLock"
OpClearLocks Operation = "clearLocks"
OpSetKV Operation = "setKV"
OpUnsetKV Operation = "unsetKV"
)
type Command struct {
@ -90,13 +93,21 @@ type CommandAddProcess struct {
Config *app.Config
}
type CommandRemoveProcess struct {
ID app.ProcessID
}
type CommandUpdateProcess struct {
ID app.ProcessID
Config *app.Config
}
type CommandRemoveProcess struct {
ID app.ProcessID
type CommandSetRelocateProcess struct {
Map map[app.ProcessID]string
}
type CommandUnsetRelocateProcess struct {
ID []app.ProcessID
}
type CommandSetProcessOrder struct {
@ -115,6 +126,10 @@ type CommandSetProcessError struct {
Error string
}
type CommandSetProcessNodeMap struct {
Map map[string]string
}
type CommandAddIdentity struct {
Identity identity.User
}
@ -133,10 +148,6 @@ type CommandSetPolicies struct {
Policies []access.Policy
}
type CommandSetProcessNodeMap struct {
Map map[string]string
}
type CommandCreateLock struct {
Name string
ValidUntil time.Time
@ -158,9 +169,10 @@ type CommandUnsetKV struct {
}
type storeData struct {
Version uint64
Process map[string]Process
ProcessNodeMap map[string]string
Version uint64
Process map[string]Process // processid -> process
ProcessNodeMap map[string]string // processid -> nodeid
ProcessRelocateMap map[string]string // processid -> nodeid
Users struct {
UpdatedAt time.Time
@ -297,6 +309,22 @@ func (s *store) applyCommand(c Command) error {
}
err = s.updateProcess(cmd)
case OpSetRelocateProcess:
cmd := CommandSetRelocateProcess{}
err = decodeCommand(&cmd, c.Data)
if err != nil {
break
}
err = s.setRelocateProcess(cmd)
case OpUnsetRelocateProcess:
cmd := CommandUnsetRelocateProcess{}
err = decodeCommand(&cmd, c.Data)
if err != nil {
break
}
err = s.unsetRelocateProcess(cmd)
case OpSetProcessOrder:
cmd := CommandSetProcessOrder{}
err = decodeCommand(&cmd, c.Data)