Deploy processes with order 'stop' to nodes

This commit is contained in:
Ingo Oppermann 2023-07-05 09:55:11 +02:00
parent 2d7affdec8
commit e49de44eb7
No known key found for this signature in database
GPG Key ID: 2AB32426E9DD229E
3 changed files with 277 additions and 48 deletions

View File

@ -409,10 +409,16 @@ type processOpStart struct {
processid app.ProcessID
}
// processOpStop describes an operation to stop a process that is
// deployed on a cluster node.
type processOpStop struct {
nodeid string // ID of the node the process is running on
processid app.ProcessID // ID of the process to stop
}
// processOpAdd describes an operation to add a process to a cluster node.
type processOpAdd struct {
nodeid string // ID of the node the process should be added to
config *app.Config // process configuration
metadata map[string]interface{} // arbitrary metadata attached to the process
order string // desired run order after adding, e.g. "start" or "stop"
}
type processOpUpdate struct {
@ -446,13 +452,15 @@ func (c *cluster) applyOpStack(stack []interface{}) {
break
}
err = c.proxy.StartProcess(v.nodeid, v.config.ProcessID())
if err != nil {
c.logger.Info().WithError(err).WithFields(log.Fields{
"processid": v.config.ID,
"nodeid": v.nodeid,
}).Log("Starting process")
break
if v.order == "start" {
err = c.proxy.CommandProcess(v.nodeid, v.config.ProcessID(), "start")
if err != nil {
c.logger.Info().WithError(err).WithFields(log.Fields{
"processid": v.config.ID,
"nodeid": v.nodeid,
}).Log("Starting process")
break
}
}
c.logger.Info().WithFields(log.Fields{
"processid": v.config.ID,
@ -507,7 +515,7 @@ func (c *cluster) applyOpStack(stack []interface{}) {
break
}
err = c.proxy.StartProcess(v.toNodeid, v.config.ProcessID())
err = c.proxy.CommandProcess(v.toNodeid, v.config.ProcessID(), "start")
if err != nil {
c.logger.Info().WithError(err).WithFields(log.Fields{
"processid": v.config.ID,
@ -523,7 +531,7 @@ func (c *cluster) applyOpStack(stack []interface{}) {
"tonodeid": v.toNodeid,
}).Log("Moving process")
case processOpStart:
err := c.proxy.StartProcess(v.nodeid, v.processid)
err := c.proxy.CommandProcess(v.nodeid, v.processid, "start")
if err != nil {
c.logger.Info().WithError(err).WithFields(log.Fields{
"processid": v.processid,
@ -536,6 +544,20 @@ func (c *cluster) applyOpStack(stack []interface{}) {
"processid": v.processid,
"nodeid": v.nodeid,
}).Log("Starting process")
case processOpStop:
err := c.proxy.CommandProcess(v.nodeid, v.processid, "stop")
if err != nil {
c.logger.Info().WithError(err).WithFields(log.Fields{
"processid": v.processid,
"nodeid": v.nodeid,
}).Log("Stopping process")
break
}
c.logger.Info().WithFields(log.Fields{
"processid": v.processid,
"nodeid": v.nodeid,
}).Log("Stopping process")
case processOpReject:
c.logger.Warn().WithError(v.err).WithField("processid", v.processid).Log("Process rejected")
case processOpSkip:
@ -610,7 +632,7 @@ func (c *cluster) doRebalance(emergency bool) {
c.applyOpStack(opStack)
}
// isMetadataUpdateRequired compares two metadata. It relies on the documents property that json.Marshal
// isMetadataUpdateRequired compares two metadata. It relies on the documented property that json.Marshal
// sorts the map keys prior encoding.
func isMetadataUpdateRequired(wantMap map[string]interface{}, haveMap map[string]interface{}) (bool, map[string]interface{}) {
hasChanges := false
@ -674,9 +696,6 @@ func synchronize(wish map[string]string, want []store.Process, have []proxy.Proc
// we want to be running on the nodes.
wantMap := map[string]store.Process{}
for _, process := range want {
if process.Order != "start" {
continue
}
pid := process.Config.ProcessID().String()
wantMap[pid] = process
}
@ -684,7 +703,7 @@ func synchronize(wish map[string]string, want []store.Process, have []proxy.Proc
opStack := []interface{}{}
// Now we iterate through the processes we actually have running on the nodes
// and remove them from the wantMap. We also make sure that they are running.
// and remove them from the wantMap. We also make sure that they have the correct order.
// If a process cannot be found on the wantMap, it will be deleted from the nodes.
haveAfterRemove := []proxy.Process{}
@ -723,12 +742,34 @@ func synchronize(wish map[string]string, want []store.Process, have []proxy.Proc
delete(wantMap, pid)
reality[pid] = haveP.NodeID
if haveP.Order != wantP.Order { // wantP.Order is always "start" because we selected only those above
opStack = append(opStack, processOpStart{
nodeid: haveP.NodeID,
processid: haveP.Config.ProcessID(),
})
if haveP.Order != wantP.Order {
if wantP.Order == "start" {
opStack = append(opStack, processOpStart{
nodeid: haveP.NodeID,
processid: haveP.Config.ProcessID(),
})
// Consume the resources
r, ok := resources[haveP.NodeID]
if ok {
r.CPU += haveP.Config.LimitCPU
r.Mem += haveP.Config.LimitMemory
resources[haveP.NodeID] = r
}
} else {
opStack = append(opStack, processOpStop{
nodeid: haveP.NodeID,
processid: haveP.Config.ProcessID(),
})
// Release the resources
r, ok := resources[haveP.NodeID]
if ok {
r.CPU -= haveP.CPU
r.Mem -= haveP.Mem
resources[haveP.NodeID] = r
}
}
}
haveAfterRemove = append(haveAfterRemove, haveP)
@ -755,10 +796,10 @@ func synchronize(wish map[string]string, want []store.Process, have []proxy.Proc
// The wantMap now contains only those processes that need to be installed on a node.
// A map from the process reference to the node it is running on.
// Create a map from the process reference to the node it is running on.
haveReferenceAffinityMap := createReferenceAffinityMap(have)
// Now all remaining processes in the wantMap must be added to one of the nodes.
// Now, all remaining processes in the wantMap must be added to one of the nodes.
for pid, process := range wantMap {
// If a process doesn't have any limits defined, reject that process
if process.Config.LimitCPU <= 0 || process.Config.LimitMemory <= 0 {
@ -775,8 +816,7 @@ func synchronize(wish map[string]string, want []store.Process, have []proxy.Proc
// not, then select a node with the most available resources.
nodeid := ""
// Try to add the process to a node where other processes with the same
// reference currently reside.
// Try to add the process to a node where other processes with the same reference currently reside.
if len(process.Config.Reference) != 0 {
for _, count := range haveReferenceAffinityMap[process.Config.Reference+"@"+process.Config.Domain] {
r := resources[count.nodeid]
@ -815,9 +855,10 @@ func synchronize(wish map[string]string, want []store.Process, have []proxy.Proc
nodeid: nodeid,
config: process.Config,
metadata: process.Metadata,
order: process.Order,
})
// Adjust the resources
// Consume the resources
r, ok := resources[nodeid]
if ok {
r.CPU += process.Config.LimitCPU

View File

@ -20,10 +20,19 @@ func TestSynchronizeAdd(t *testing.T) {
Config: &app.Config{
ID: "foobar",
LimitCPU: 10,
LimitMemory: 50,
LimitMemory: 20,
},
Order: "start",
},
{
UpdatedAt: time.Now(),
Config: &app.Config{
ID: "foobaz",
LimitCPU: 10,
LimitMemory: 20,
},
Order: "stop",
},
}
have := []proxy.Process{}
@ -59,8 +68,109 @@ func TestSynchronizeAdd(t *testing.T) {
config: &app.Config{
ID: "foobar",
LimitCPU: 10,
LimitMemory: 50,
LimitMemory: 20,
},
order: "start",
},
processOpAdd{
nodeid: "node1",
config: &app.Config{
ID: "foobaz",
LimitCPU: 10,
LimitMemory: 20,
},
order: "stop",
},
}, stack)
require.Equal(t, map[string]string{
"foobar@": "node1",
"foobaz@": "node1",
}, reality)
require.Equal(t, map[string]proxy.NodeResources{
"node1": {
NCPU: 1,
CPU: 27,
Mem: 75,
CPULimit: 90,
MemLimit: 90,
},
"node2": {
NCPU: 1,
CPU: 85,
Mem: 11,
CPULimit: 90,
MemLimit: 90,
},
}, resources)
}
func TestSynchronizeOrderStop(t *testing.T) {
wish := map[string]string{
"foobar@": "node1",
}
now := time.Now()
want := []store.Process{
{
UpdatedAt: now,
Config: &app.Config{
ID: "foobar",
LimitCPU: 10,
LimitMemory: 20,
},
Order: "stop",
},
}
have := []proxy.Process{
{
NodeID: "node1",
Order: "start",
State: "running",
CPU: 12,
Mem: 5,
Runtime: 42,
UpdatedAt: now,
Config: &app.Config{
ID: "foobar",
LimitCPU: 10,
LimitMemory: 20,
},
},
}
nodes := map[string]proxy.NodeAbout{
"node1": {
LastContact: time.Now(),
Resources: proxy.NodeResources{
NCPU: 1,
CPU: 20,
Mem: 35,
CPULimit: 90,
MemLimit: 90,
},
},
"node2": {
LastContact: time.Now(),
Resources: proxy.NodeResources{
NCPU: 1,
CPU: 1,
Mem: 1,
CPULimit: 90,
MemLimit: 90,
},
},
}
stack, resources, reality := synchronize(wish, want, have, nodes, 2*time.Minute)
require.Equal(t, []interface{}{
processOpStop{
nodeid: "node1",
processid: app.ProcessID{ID: "foobar"},
},
}, stack)
@ -71,15 +181,105 @@ func TestSynchronizeAdd(t *testing.T) {
require.Equal(t, map[string]proxy.NodeResources{
"node1": {
NCPU: 1,
CPU: 17,
Mem: 85,
CPU: 8,
Mem: 30,
CPULimit: 90,
MemLimit: 90,
},
"node2": {
NCPU: 1,
CPU: 85,
Mem: 11,
CPU: 1,
Mem: 1,
CPULimit: 90,
MemLimit: 90,
},
}, resources)
}
func TestSynchronizeOrderStart(t *testing.T) {
wish := map[string]string{
"foobar@": "node1",
}
now := time.Now()
want := []store.Process{
{
UpdatedAt: now,
Config: &app.Config{
ID: "foobar",
LimitCPU: 10,
LimitMemory: 20,
},
Order: "start",
},
}
have := []proxy.Process{
{
NodeID: "node1",
Order: "stop",
State: "finished",
CPU: 0,
Mem: 0,
Runtime: 42,
UpdatedAt: now,
Config: &app.Config{
ID: "foobar",
LimitCPU: 10,
LimitMemory: 20,
},
},
}
nodes := map[string]proxy.NodeAbout{
"node1": {
LastContact: time.Now(),
Resources: proxy.NodeResources{
NCPU: 1,
CPU: 20,
Mem: 35,
CPULimit: 90,
MemLimit: 90,
},
},
"node2": {
LastContact: time.Now(),
Resources: proxy.NodeResources{
NCPU: 1,
CPU: 1,
Mem: 1,
CPULimit: 90,
MemLimit: 90,
},
},
}
stack, resources, reality := synchronize(wish, want, have, nodes, 2*time.Minute)
require.Equal(t, []interface{}{
processOpStart{
nodeid: "node1",
processid: app.ProcessID{ID: "foobar"},
},
}, stack)
require.Equal(t, map[string]string{
"foobar@": "node1",
}, reality)
require.Equal(t, map[string]proxy.NodeResources{
"node1": {
NCPU: 1,
CPU: 30,
Mem: 55,
CPULimit: 90,
MemLimit: 90,
},
"node2": {
NCPU: 1,
CPU: 1,
Mem: 1,
CPULimit: 90,
MemLimit: 90,
},
@ -168,6 +368,7 @@ func TestSynchronizeAddReferenceAffinity(t *testing.T) {
LimitCPU: 10,
LimitMemory: 30,
},
order: "start",
},
}, stack)
@ -227,6 +428,7 @@ func TestSynchronizeAddLimit(t *testing.T) {
LimitCPU: 10,
LimitMemory: 5,
},
order: "start",
},
}, stack)
@ -541,6 +743,7 @@ func TestSynchronizeAddRemove(t *testing.T) {
LimitCPU: 10,
LimitMemory: 5,
},
order: "start",
},
}, stack)
@ -962,6 +1165,7 @@ func TestSynchronizeWaitDisconnectedNodeNoWish(t *testing.T) {
LimitCPU: 10,
LimitMemory: 30,
},
order: "start",
},
}, stack)
@ -1055,6 +1259,7 @@ func TestSynchronizeWaitDisconnectedNodeUnrealisticWish(t *testing.T) {
LimitCPU: 10,
LimitMemory: 30,
},
order: "start",
},
}, stack)
@ -1148,6 +1353,7 @@ func TestSynchronizeTimeoutDisconnectedNode(t *testing.T) {
LimitCPU: 10,
LimitMemory: 30,
},
order: "start",
},
}, stack)

View File

@ -27,7 +27,6 @@ type Proxy interface {
AddProcess(nodeid string, config *app.Config, metadata map[string]interface{}) error
DeleteProcess(nodeid string, id app.ProcessID) error
StartProcess(nodeid string, id app.ProcessID) error
UpdateProcess(nodeid string, id app.ProcessID, config *app.Config, metadata map[string]interface{}) error
CommandProcess(nodeid string, id app.ProcessID, command string) error
}
@ -596,23 +595,6 @@ func (p *proxy) DeleteProcess(nodeid string, id app.ProcessID) error {
return nil
}
// StartProcess tells the node identified by nodeid to start the process
// with the given id. It returns an error if no such node is known or if
// the node fails to start the process.
func (p *proxy) StartProcess(nodeid string, id app.ProcessID) error {
	p.lock.RLock()
	defer p.lock.RUnlock()

	node, ok := p.nodes[nodeid]
	if !ok {
		return fmt.Errorf("node not found")
	}

	return node.StartProcess(id)
}
func (p *proxy) UpdateProcess(nodeid string, id app.ProcessID, config *app.Config, metadata map[string]interface{}) error {
p.lock.RLock()
defer p.lock.RUnlock()