Incorporate process throttling into deploy decision, fix bug in rebalance, parallelize opstack

This commit is contained in:
Ingo Oppermann 2024-06-26 17:03:42 +02:00
parent ca177becfa
commit 28603aab98
No known key found for this signature in database
GPG Key ID: 2AB32426E9DD229E
3 changed files with 279 additions and 206 deletions

View File

@ -459,215 +459,253 @@ type processOpError struct {
func (c *cluster) applyOpStack(stack []interface{}, term uint64) []processOpError {
errors := []processOpError{}
logger := c.logger.WithField("term", term)
logger := c.logger.WithFields(log.Fields{
"term": term,
"logname": "opstack",
})
errChan := make(chan processOpError, len(stack))
wgReader := sync.WaitGroup{}
wgReader.Add(1)
go func(errChan <-chan processOpError) {
for opErr := range errChan {
errors = append(errors, opErr)
}
wgReader.Done()
}(errChan)
wg := sync.WaitGroup{}
for _, op := range stack {
switch v := op.(type) {
case processOpAdd:
err := c.proxy.AddProcess(v.nodeid, v.config, v.metadata)
if err != nil {
errors = append(errors, processOpError{
processid: v.config.ProcessID(),
err: err,
})
logger.Info().WithError(err).WithFields(log.Fields{
"processid": v.config.ProcessID(),
"nodeid": v.nodeid,
}).Log("Adding process")
break
}
wg.Add(1)
if v.order == "start" {
err = c.proxy.CommandProcess(v.nodeid, v.config.ProcessID(), "start")
if err != nil {
errors = append(errors, processOpError{
processid: v.config.ProcessID(),
err: err,
})
logger.Info().WithError(err).WithFields(log.Fields{
"processid": v.config.ProcessID(),
"nodeid": v.nodeid,
}).Log("Starting process")
break
}
go func(errChan chan<- processOpError, op interface{}, logger log.Logger) {
opErr := c.applyOp(op, logger)
if opErr.err != nil {
errChan <- opErr
}
wg.Done()
}(errChan, op, logger)
}
errors = append(errors, processOpError{
wg.Wait()
close(errChan)
wgReader.Wait()
return errors
}
func (c *cluster) applyOp(op interface{}, logger log.Logger) processOpError {
opErr := processOpError{}
switch v := op.(type) {
case processOpAdd:
err := c.proxy.AddProcess(v.nodeid, v.config, v.metadata)
if err != nil {
opErr = processOpError{
processid: v.config.ProcessID(),
err: nil,
})
logger.Info().WithFields(log.Fields{
err: err,
}
logger.Info().WithError(err).WithFields(log.Fields{
"processid": v.config.ProcessID(),
"nodeid": v.nodeid,
}).Log("Adding process")
case processOpUpdate:
err := c.proxy.UpdateProcess(v.nodeid, v.processid, v.config, v.metadata)
break
}
if v.order == "start" {
err = c.proxy.CommandProcess(v.nodeid, v.config.ProcessID(), "start")
if err != nil {
errors = append(errors, processOpError{
processid: v.processid,
err: err,
})
logger.Info().WithError(err).WithFields(log.Fields{
"processid": v.processid,
"nodeid": v.nodeid,
}).Log("Updating process")
break
}
errors = append(errors, processOpError{
processid: v.processid,
err: nil,
})
logger.Info().WithFields(log.Fields{
"processid": v.config.ProcessID(),
"nodeid": v.nodeid,
}).Log("Updating process")
case processOpDelete:
err := c.proxy.DeleteProcess(v.nodeid, v.processid)
if err != nil {
errors = append(errors, processOpError{
processid: v.processid,
err: err,
})
logger.Info().WithError(err).WithFields(log.Fields{
"processid": v.processid,
"nodeid": v.nodeid,
}).Log("Removing process")
break
}
errors = append(errors, processOpError{
processid: v.processid,
err: nil,
})
logger.Info().WithFields(log.Fields{
"processid": v.processid,
"nodeid": v.nodeid,
}).Log("Removing process")
case processOpMove:
err := c.proxy.AddProcess(v.toNodeid, v.config, v.metadata)
if err != nil {
errors = append(errors, processOpError{
opErr = processOpError{
processid: v.config.ProcessID(),
err: err,
})
logger.Info().WithError(err).WithFields(log.Fields{
"processid": v.config.ProcessID(),
"fromnodeid": v.fromNodeid,
"tonodeid": v.toNodeid,
}).Log("Moving process, adding process")
break
}
err = c.proxy.DeleteProcess(v.fromNodeid, v.config.ProcessID())
if err != nil {
errors = append(errors, processOpError{
processid: v.config.ProcessID(),
err: err,
})
logger.Info().WithError(err).WithFields(log.Fields{
"processid": v.config.ProcessID(),
"fromnodeid": v.fromNodeid,
"tonodeid": v.toNodeid,
}).Log("Moving process, removing process")
break
}
if v.order == "start" {
err = c.proxy.CommandProcess(v.toNodeid, v.config.ProcessID(), "start")
if err != nil {
errors = append(errors, processOpError{
processid: v.config.ProcessID(),
err: err,
})
logger.Info().WithError(err).WithFields(log.Fields{
"processid": v.config.ProcessID(),
"fromnodeid": v.fromNodeid,
"tonodeid": v.toNodeid,
}).Log("Moving process, starting process")
break
}
}
errors = append(errors, processOpError{
processid: v.config.ProcessID(),
err: nil,
})
logger.Info().WithFields(log.Fields{
"processid": v.config.ProcessID(),
"fromnodeid": v.fromNodeid,
"tonodeid": v.toNodeid,
}).Log("Moving process")
case processOpStart:
err := c.proxy.CommandProcess(v.nodeid, v.processid, "start")
if err != nil {
errors = append(errors, processOpError{
processid: v.processid,
err: err,
})
logger.Info().WithError(err).WithFields(log.Fields{
"processid": v.processid,
"processid": v.config.ProcessID(),
"nodeid": v.nodeid,
}).Log("Starting process")
break
}
}
errors = append(errors, processOpError{
opErr = processOpError{
processid: v.config.ProcessID(),
err: nil,
}
logger.Info().WithFields(log.Fields{
"processid": v.config.ProcessID(),
"nodeid": v.nodeid,
}).Log("Adding process")
case processOpUpdate:
err := c.proxy.UpdateProcess(v.nodeid, v.processid, v.config, v.metadata)
if err != nil {
opErr = processOpError{
processid: v.processid,
err: nil,
})
err: err,
}
logger.Info().WithError(err).WithFields(log.Fields{
"processid": v.processid,
"nodeid": v.nodeid,
}).Log("Updating process")
break
}
logger.Info().WithFields(log.Fields{
opErr = processOpError{
processid: v.processid,
err: nil,
}
logger.Info().WithFields(log.Fields{
"processid": v.config.ProcessID(),
"nodeid": v.nodeid,
}).Log("Updating process")
case processOpDelete:
err := c.proxy.DeleteProcess(v.nodeid, v.processid)
if err != nil {
opErr = processOpError{
processid: v.processid,
err: err,
}
logger.Info().WithError(err).WithFields(log.Fields{
"processid": v.processid,
"nodeid": v.nodeid,
}).Log("Removing process")
break
}
opErr = processOpError{
processid: v.processid,
err: nil,
}
logger.Info().WithFields(log.Fields{
"processid": v.processid,
"nodeid": v.nodeid,
}).Log("Removing process")
case processOpMove:
err := c.proxy.AddProcess(v.toNodeid, v.config, v.metadata)
if err != nil {
opErr = processOpError{
processid: v.config.ProcessID(),
err: err,
}
logger.Info().WithError(err).WithFields(log.Fields{
"processid": v.config.ProcessID(),
"fromnodeid": v.fromNodeid,
"tonodeid": v.toNodeid,
}).Log("Moving process, adding process")
break
}
err = c.proxy.DeleteProcess(v.fromNodeid, v.config.ProcessID())
if err != nil {
opErr = processOpError{
processid: v.config.ProcessID(),
err: err,
}
logger.Info().WithError(err).WithFields(log.Fields{
"processid": v.config.ProcessID(),
"fromnodeid": v.fromNodeid,
"tonodeid": v.toNodeid,
}).Log("Moving process, removing process")
break
}
if v.order == "start" {
err = c.proxy.CommandProcess(v.toNodeid, v.config.ProcessID(), "start")
if err != nil {
opErr = processOpError{
processid: v.config.ProcessID(),
err: err,
}
logger.Info().WithError(err).WithFields(log.Fields{
"processid": v.config.ProcessID(),
"fromnodeid": v.fromNodeid,
"tonodeid": v.toNodeid,
}).Log("Moving process, starting process")
break
}
}
opErr = processOpError{
processid: v.config.ProcessID(),
err: nil,
}
logger.Info().WithFields(log.Fields{
"processid": v.config.ProcessID(),
"fromnodeid": v.fromNodeid,
"tonodeid": v.toNodeid,
}).Log("Moving process")
case processOpStart:
err := c.proxy.CommandProcess(v.nodeid, v.processid, "start")
if err != nil {
opErr = processOpError{
processid: v.processid,
err: err,
}
logger.Info().WithError(err).WithFields(log.Fields{
"processid": v.processid,
"nodeid": v.nodeid,
}).Log("Starting process")
case processOpStop:
err := c.proxy.CommandProcess(v.nodeid, v.processid, "stop")
if err != nil {
errors = append(errors, processOpError{
processid: v.processid,
err: err,
})
logger.Info().WithError(err).WithFields(log.Fields{
"processid": v.processid,
"nodeid": v.nodeid,
}).Log("Stopping process")
break
}
break
}
errors = append(errors, processOpError{
opErr = processOpError{
processid: v.processid,
err: nil,
}
logger.Info().WithFields(log.Fields{
"processid": v.processid,
"nodeid": v.nodeid,
}).Log("Starting process")
case processOpStop:
err := c.proxy.CommandProcess(v.nodeid, v.processid, "stop")
if err != nil {
opErr = processOpError{
processid: v.processid,
err: nil,
})
logger.Info().WithFields(log.Fields{
err: err,
}
logger.Info().WithError(err).WithFields(log.Fields{
"processid": v.processid,
"nodeid": v.nodeid,
}).Log("Stopping process")
case processOpReject:
errors = append(errors, processOpError(v))
logger.Warn().WithError(v.err).WithField("processid", v.processid).Log("Process rejected")
case processOpSkip:
errors = append(errors, processOpError{
processid: v.processid,
err: v.err,
})
logger.Warn().WithError(v.err).WithFields(log.Fields{
"nodeid": v.nodeid,
"processid": v.processid,
}).Log("Process skipped")
case processOpError:
errors = append(errors, v)
default:
logger.Warn().Log("Unknown operation on stack: %+v", v)
break
}
opErr = processOpError{
processid: v.processid,
err: nil,
}
logger.Info().WithFields(log.Fields{
"processid": v.processid,
"nodeid": v.nodeid,
}).Log("Stopping process")
case processOpReject:
opErr = processOpError(v)
logger.Warn().WithError(v.err).WithField("processid", v.processid).Log("Process rejected")
case processOpSkip:
opErr = processOpError{
processid: v.processid,
err: v.err,
}
logger.Warn().WithError(v.err).WithFields(log.Fields{
"nodeid": v.nodeid,
"processid": v.processid,
}).Log("Process skipped")
case processOpError:
opErr = v
default:
logger.Warn().Log("Unknown operation on stack: %+v", v)
}
return errors
return opErr
}
func (c *cluster) doSynchronize(emergency bool, term uint64) {
@ -894,10 +932,12 @@ func (c *cluster) doRelocate(emergency bool, term uint64) {
cmd.ID = append(cmd.ID, app.ParseProcessID(processid))
}
c.applyCommand(&store.Command{
Operation: store.OpUnsetRelocateProcess,
Data: cmd,
})
if len(cmd.ID) != 0 {
c.applyCommand(&store.Command{
Operation: store.OpUnsetRelocateProcess,
Data: cmd,
})
}
}
// isMetadataUpdateRequired compares two metadata. It relies on the documented property that json.Marshal
@ -954,6 +994,13 @@ func isMetadataUpdateRequired(wantMap map[string]interface{}, haveMap map[string
func synchronize(wish map[string]string, want []store.Process, have []proxy.Process, nodes map[string]proxy.NodeAbout, nodeRecoverTimeout time.Duration) ([]interface{}, map[string]proxy.NodeResources, map[string]string) {
resources := NewResources(nodes)
// Mark nodes as throttling where at least one process is still throttling
for _, haveP := range have {
if haveP.Throttling {
resources.Throttling(haveP.NodeID, true)
}
}
// A map same as wish, but reflecting the actual situation.
reality := map[string]string{}
@ -1200,6 +1247,17 @@ func NewResources(nodes map[string]proxy.NodeAbout) *resources {
return r
}
// Throttling sets the IsThrottling flag on the resources entry of the node
// identified by nodeid to the given value. If no entry exists for nodeid,
// the call does nothing.
func (r *resources) Throttling(nodeid string, throttling bool) {
	if entry, known := r.nodes[nodeid]; known {
		entry.IsThrottling = throttling

		// Store the updated entry back under the same key.
		r.nodes[nodeid] = entry
	}
}
// HasNodeEnough returns whether a node has enough resources available for the
// requested cpu and memory consumption.
func (r *resources) HasNodeEnough(nodeid string, cpu float64, mem uint64) bool {
@ -1462,6 +1520,13 @@ func (ra *referenceAffinity) Move(reference, domain, fromnodeid, tonodeid string
func rebalance(have []proxy.Process, nodes map[string]proxy.NodeAbout) ([]interface{}, map[string]proxy.NodeResources) {
resources := NewResources(nodes)
// Mark nodes as throttling where at least one process is still throttling
for _, haveP := range have {
if haveP.Throttling {
resources.Throttling(haveP.NodeID, true)
}
}
// Group all running processes by node and sort them by their runtime in ascending order.
nodeProcessMap := createNodeProcessMap(have)
@ -1471,9 +1536,7 @@ func rebalance(have []proxy.Process, nodes map[string]proxy.NodeAbout) ([]interf
opStack := []interface{}{}
// Check if any of the nodes is overloaded.
for id, node := range nodes {
r := node.Resources
for id, r := range resources.Map() {
// Ignore this node if the resource values are not reliable.
if r.Error != nil {
continue
@ -1507,7 +1570,7 @@ func rebalance(have []proxy.Process, nodes map[string]proxy.NodeAbout) ([]interf
continue
}
if resources.HasNodeEnough(raNodeid, p.CPU, p.Mem) {
if resources.HasNodeEnough(raNodeid, p.Config.LimitCPU, p.Config.LimitMemory) {
availableNodeid = raNodeid
break
}
@ -1516,13 +1579,14 @@ func rebalance(have []proxy.Process, nodes map[string]proxy.NodeAbout) ([]interf
// Find the best node with enough resources available.
if len(availableNodeid) == 0 {
nodes := resources.FindBestNodes(p.CPU, p.Mem)
nodes := resources.FindBestNodes(p.Config.LimitCPU, p.Config.LimitMemory)
for _, nodeid := range nodes {
if nodeid == overloadedNodeid {
continue
}
availableNodeid = nodeid
break
}
}
@ -1566,6 +1630,13 @@ func rebalance(have []proxy.Process, nodes map[string]proxy.NodeAbout) ([]interf
func relocate(have []proxy.Process, nodes map[string]proxy.NodeAbout, relocateMap map[string]string) ([]interface{}, map[string]proxy.NodeResources, []string) {
resources := NewResources(nodes)
// Mark nodes as throttling where at least one process is still throttling
for _, haveP := range have {
if haveP.Throttling {
resources.Throttling(haveP.NodeID, true)
}
}
relocatedProcessIDs := []string{}
// A map from the process reference to the nodes it is running on.
@ -1601,7 +1672,7 @@ func relocate(have []proxy.Process, nodes map[string]proxy.NodeAbout, relocateMa
if len(targetNodeid) != 0 {
_, hasNode := nodes[targetNodeid]
if !hasNode || !resources.HasNodeEnough(targetNodeid, process.CPU, process.Mem) {
if !hasNode || !resources.HasNodeEnough(targetNodeid, process.Config.LimitCPU, process.Config.LimitMemory) {
targetNodeid = ""
}
}
@ -1617,7 +1688,7 @@ func relocate(have []proxy.Process, nodes map[string]proxy.NodeAbout, relocateMa
continue
}
if resources.HasNodeEnough(raNodeid, process.CPU, process.Mem) {
if resources.HasNodeEnough(raNodeid, process.Config.LimitCPU, process.Config.LimitMemory) {
targetNodeid = raNodeid
break
}
@ -1626,7 +1697,7 @@ func relocate(have []proxy.Process, nodes map[string]proxy.NodeAbout, relocateMa
// Find the best node with enough resources available.
if len(targetNodeid) == 0 {
nodes := resources.FindBestNodes(process.CPU, process.Mem)
nodes := resources.FindBestNodes(process.Config.LimitCPU, process.Config.LimitMemory)
for _, nodeid := range nodes {
if nodeid == sourceNodeid {
continue

View File

@ -934,14 +934,15 @@ func (n *node) ProxyProcessList() ([]Process, error) {
}
process := Process{
NodeID: nodeid,
Order: p.State.Order,
State: p.State.State,
Mem: p.State.Memory,
CPU: p.State.CPU * n.resources.ncpu,
Runtime: time.Duration(p.State.Runtime) * time.Second,
UpdatedAt: time.Unix(p.UpdatedAt, 0),
Metadata: p.Metadata,
NodeID: nodeid,
Order: p.State.Order,
State: p.State.State,
Mem: p.State.Resources.Memory.Current,
CPU: p.State.Resources.CPU.Current,
Throttling: p.State.Resources.CPU.IsThrottling,
Runtime: time.Duration(p.State.Runtime) * time.Second,
UpdatedAt: time.Unix(p.UpdatedAt, 0),
Metadata: p.Metadata,
}
cfg := &app.Config{

View File

@ -392,15 +392,16 @@ func (p *proxy) ListFiles(storage, pattern string) []clientapi.FileInfo {
}
type Process struct {
NodeID string
Order string
State string
CPU float64 // Current CPU load of this process, 0-100*ncpu
Mem uint64 // Currently consumed memory of this process in bytes
Runtime time.Duration
UpdatedAt time.Time
Config *app.Config
Metadata map[string]interface{}
NodeID string
Order string
State string
CPU float64 // Current CPU load of this process, 0-100*ncpu
Mem uint64 // Currently consumed memory of this process in bytes
Throttling bool
Runtime time.Duration
UpdatedAt time.Time
Config *app.Config
Metadata map[string]interface{}
}
type ProcessListOptions struct {