diff --git a/cluster/leader.go b/cluster/leader.go index e27f96f5..95cbd3cc 100644 --- a/cluster/leader.go +++ b/cluster/leader.go @@ -459,215 +459,253 @@ type processOpError struct { func (c *cluster) applyOpStack(stack []interface{}, term uint64) []processOpError { errors := []processOpError{} - logger := c.logger.WithField("term", term) + logger := c.logger.WithFields(log.Fields{ + "term": term, + "logname": "opstack", + }) + errChan := make(chan processOpError, len(stack)) + + wgReader := sync.WaitGroup{} + wgReader.Add(1) + go func(errChan <-chan processOpError) { + for opErr := range errChan { + errors = append(errors, opErr) + } + + wgReader.Done() + }(errChan) + + wg := sync.WaitGroup{} for _, op := range stack { - switch v := op.(type) { - case processOpAdd: - err := c.proxy.AddProcess(v.nodeid, v.config, v.metadata) - if err != nil { - errors = append(errors, processOpError{ - processid: v.config.ProcessID(), - err: err, - }) - logger.Info().WithError(err).WithFields(log.Fields{ - "processid": v.config.ProcessID(), - "nodeid": v.nodeid, - }).Log("Adding process") - break - } + wg.Add(1) - if v.order == "start" { - err = c.proxy.CommandProcess(v.nodeid, v.config.ProcessID(), "start") - if err != nil { - errors = append(errors, processOpError{ - processid: v.config.ProcessID(), - err: err, - }) - logger.Info().WithError(err).WithFields(log.Fields{ - "processid": v.config.ProcessID(), - "nodeid": v.nodeid, - }).Log("Starting process") - break - } + go func(errChan chan<- processOpError, op interface{}, logger log.Logger) { + opErr := c.applyOp(op, logger) + if opErr.err != nil { + errChan <- opErr } + wg.Done() + }(errChan, op, logger) + } - errors = append(errors, processOpError{ + wg.Wait() + + close(errChan) + + wgReader.Wait() + + return errors +} + +func (c *cluster) applyOp(op interface{}, logger log.Logger) processOpError { + opErr := processOpError{} + + switch v := op.(type) { + case processOpAdd: + err := c.proxy.AddProcess(v.nodeid, v.config, v.metadata) + if err != nil { + opErr = processOpError{ processid: v.config.ProcessID(), - err: nil, - }) - - logger.Info().WithFields(log.Fields{ + err: err, + } + logger.Info().WithError(err).WithFields(log.Fields{ "processid": v.config.ProcessID(), "nodeid": v.nodeid, }).Log("Adding process") - case processOpUpdate: - err := c.proxy.UpdateProcess(v.nodeid, v.processid, v.config, v.metadata) + break + } + + if v.order == "start" { + err = c.proxy.CommandProcess(v.nodeid, v.config.ProcessID(), "start") if err != nil { - errors = append(errors, processOpError{ - processid: v.processid, - err: err, - }) - logger.Info().WithError(err).WithFields(log.Fields{ - "processid": v.processid, - "nodeid": v.nodeid, - }).Log("Updating process") - break - } - - errors = append(errors, processOpError{ - processid: v.processid, - err: nil, - }) - - logger.Info().WithFields(log.Fields{ - "processid": v.config.ProcessID(), - "nodeid": v.nodeid, - }).Log("Updating process") - case processOpDelete: - err := c.proxy.DeleteProcess(v.nodeid, v.processid) - if err != nil { - errors = append(errors, processOpError{ - processid: v.processid, - err: err, - }) - logger.Info().WithError(err).WithFields(log.Fields{ - "processid": v.processid, - "nodeid": v.nodeid, - }).Log("Removing process") - break - } - - errors = append(errors, processOpError{ - processid: v.processid, - err: nil, - }) - - logger.Info().WithFields(log.Fields{ - "processid": v.processid, - "nodeid": v.nodeid, - }).Log("Removing process") - case processOpMove: - err := c.proxy.AddProcess(v.toNodeid, v.config, v.metadata) - if err != nil { - errors = append(errors, processOpError{ + opErr = processOpError{ processid: v.config.ProcessID(), err: err, - }) - logger.Info().WithError(err).WithFields(log.Fields{ - "processid": v.config.ProcessID(), - "fromnodeid": v.fromNodeid, - "tonodeid": v.toNodeid, - }).Log("Moving process, adding process") - break - } - - err = c.proxy.DeleteProcess(v.fromNodeid, v.config.ProcessID()) - if err != nil { - errors = append(errors, processOpError{ - processid: v.config.ProcessID(), - err: err, - }) - logger.Info().WithError(err).WithFields(log.Fields{ - "processid": v.config.ProcessID(), - "fromnodeid": v.fromNodeid, - "tonodeid": v.toNodeid, - }).Log("Moving process, removing process") - break - } - - if v.order == "start" { - err = c.proxy.CommandProcess(v.toNodeid, v.config.ProcessID(), "start") - if err != nil { - errors = append(errors, processOpError{ - processid: v.config.ProcessID(), - err: err, - }) - logger.Info().WithError(err).WithFields(log.Fields{ - "processid": v.config.ProcessID(), - "fromnodeid": v.fromNodeid, - "tonodeid": v.toNodeid, - }).Log("Moving process, starting process") - break } - } - - errors = append(errors, processOpError{ - processid: v.config.ProcessID(), - err: nil, - }) - - logger.Info().WithFields(log.Fields{ - "processid": v.config.ProcessID(), - "fromnodeid": v.fromNodeid, - "tonodeid": v.toNodeid, - }).Log("Moving process") - case processOpStart: - err := c.proxy.CommandProcess(v.nodeid, v.processid, "start") - if err != nil { - errors = append(errors, processOpError{ - processid: v.processid, - err: err, - }) logger.Info().WithError(err).WithFields(log.Fields{ - "processid": v.processid, + "processid": v.config.ProcessID(), "nodeid": v.nodeid, }).Log("Starting process") break } + } - errors = append(errors, processOpError{ + opErr = processOpError{ + processid: v.config.ProcessID(), + err: nil, + } + + logger.Info().WithFields(log.Fields{ + "processid": v.config.ProcessID(), + "nodeid": v.nodeid, + }).Log("Adding process") + case processOpUpdate: + err := c.proxy.UpdateProcess(v.nodeid, v.processid, v.config, v.metadata) + if err != nil { + opErr = processOpError{ processid: v.processid, - err: nil, - }) + err: err, + } + logger.Info().WithError(err).WithFields(log.Fields{ + "processid": v.processid, + "nodeid": v.nodeid, + }).Log("Updating process") + break + } - logger.Info().WithFields(log.Fields{ + opErr = processOpError{ + processid: v.processid, + err: nil, + } + + logger.Info().WithFields(log.Fields{ + "processid": v.config.ProcessID(), + "nodeid": v.nodeid, + }).Log("Updating process") + case processOpDelete: + err := c.proxy.DeleteProcess(v.nodeid, v.processid) + if err != nil { + opErr = processOpError{ + processid: v.processid, + err: err, + } + logger.Info().WithError(err).WithFields(log.Fields{ + "processid": v.processid, + "nodeid": v.nodeid, + }).Log("Removing process") + break + } + + opErr = processOpError{ + processid: v.processid, + err: nil, + } + + logger.Info().WithFields(log.Fields{ + "processid": v.processid, + "nodeid": v.nodeid, + }).Log("Removing process") + case processOpMove: + err := c.proxy.AddProcess(v.toNodeid, v.config, v.metadata) + if err != nil { + opErr = processOpError{ + processid: v.config.ProcessID(), + err: err, + } + logger.Info().WithError(err).WithFields(log.Fields{ + "processid": v.config.ProcessID(), + "fromnodeid": v.fromNodeid, + "tonodeid": v.toNodeid, + }).Log("Moving process, adding process") + break + } + + err = c.proxy.DeleteProcess(v.fromNodeid, v.config.ProcessID()) + if err != nil { + opErr = processOpError{ + processid: v.config.ProcessID(), + err: err, + } + logger.Info().WithError(err).WithFields(log.Fields{ + "processid": v.config.ProcessID(), + "fromnodeid": v.fromNodeid, + "tonodeid": v.toNodeid, + }).Log("Moving process, removing process") + break + } + + if v.order == "start" { + err = c.proxy.CommandProcess(v.toNodeid, v.config.ProcessID(), "start") + if err != nil { + opErr = processOpError{ + processid: v.config.ProcessID(), + err: err, + } + logger.Info().WithError(err).WithFields(log.Fields{ + "processid": v.config.ProcessID(), + "fromnodeid": v.fromNodeid, + "tonodeid": v.toNodeid, + }).Log("Moving process, starting process") + break + } + } + + opErr = processOpError{ + processid: v.config.ProcessID(), + err: nil, + } + + logger.Info().WithFields(log.Fields{ + "processid": v.config.ProcessID(), + "fromnodeid": v.fromNodeid, + "tonodeid": v.toNodeid, + }).Log("Moving process") + case processOpStart: + err := c.proxy.CommandProcess(v.nodeid, v.processid, "start") + if err != nil { + opErr = processOpError{ + processid: v.processid, + err: err, + } + logger.Info().WithError(err).WithFields(log.Fields{ "processid": v.processid, "nodeid": v.nodeid, }).Log("Starting process") - case processOpStop: - err := c.proxy.CommandProcess(v.nodeid, v.processid, "stop") - if err != nil { - errors = append(errors, processOpError{ - processid: v.processid, - err: err, - }) - logger.Info().WithError(err).WithFields(log.Fields{ - "processid": v.processid, - "nodeid": v.nodeid, - }).Log("Stopping process") - break - } + break + } - errors = append(errors, processOpError{ + opErr = processOpError{ + processid: v.processid, + err: nil, + } + + logger.Info().WithFields(log.Fields{ + "processid": v.processid, + "nodeid": v.nodeid, + }).Log("Starting process") + case processOpStop: + err := c.proxy.CommandProcess(v.nodeid, v.processid, "stop") + if err != nil { + opErr = processOpError{ processid: v.processid, - err: nil, - }) - - logger.Info().WithFields(log.Fields{ + err: err, + } + logger.Info().WithError(err).WithFields(log.Fields{ "processid": v.processid, "nodeid": v.nodeid, }).Log("Stopping process") - case processOpReject: - errors = append(errors, processOpError(v)) - logger.Warn().WithError(v.err).WithField("processid", v.processid).Log("Process rejected") - case processOpSkip: - errors = append(errors, processOpError{ - processid: v.processid, - err: v.err, - }) - logger.Warn().WithError(v.err).WithFields(log.Fields{ - "nodeid": v.nodeid, - "processid": v.processid, - }).Log("Process skipped") - case processOpError: - errors = append(errors, v) - default: - logger.Warn().Log("Unknown operation on stack: %+v", v) + break } + + opErr = processOpError{ + processid: v.processid, + err: nil, + } + + logger.Info().WithFields(log.Fields{ + "processid": v.processid, + "nodeid": v.nodeid, + }).Log("Stopping process") + case processOpReject: + opErr = processOpError(v) + logger.Warn().WithError(v.err).WithField("processid", v.processid).Log("Process rejected") + case processOpSkip: + opErr = processOpError{ + processid: v.processid, + err: v.err, + } + logger.Warn().WithError(v.err).WithFields(log.Fields{ + "nodeid": v.nodeid, + "processid": v.processid, + }).Log("Process skipped") + case processOpError: + opErr = v + default: + logger.Warn().Log("Unknown operation on stack: %+v", v) } - return errors + return opErr } func (c *cluster) doSynchronize(emergency bool, term uint64) { @@ -894,10 +932,12 @@ func (c *cluster) doRelocate(emergency bool, term uint64) { cmd.ID = append(cmd.ID, app.ParseProcessID(processid)) } - c.applyCommand(&store.Command{ - Operation: store.OpUnsetRelocateProcess, - Data: cmd, - }) + if len(cmd.ID) != 0 { + c.applyCommand(&store.Command{ + Operation: store.OpUnsetRelocateProcess, + Data: cmd, + }) + } } // isMetadataUpdateRequired compares two metadata. It relies on the documented property that json.Marshal @@ -954,6 +994,13 @@ func isMetadataUpdateRequired(wantMap map[string]interface{}, haveMap map[string func synchronize(wish map[string]string, want []store.Process, have []proxy.Process, nodes map[string]proxy.NodeAbout, nodeRecoverTimeout time.Duration) ([]interface{}, map[string]proxy.NodeResources, map[string]string) { resources := NewResources(nodes) + // Mark nodes as throttling where at least one process is still throttling + for _, haveP := range have { + if haveP.Throttling { + resources.Throttling(haveP.NodeID, true) + } + } + // A map same as wish, but reflecting the actual situation. reality := map[string]string{} @@ -1200,6 +1247,17 @@ func NewResources(nodes map[string]proxy.NodeAbout) *resources { return r } +func (r *resources) Throttling(nodeid string, throttling bool) { + res, hasNode := r.nodes[nodeid] + if !hasNode { + return + } + + res.IsThrottling = throttling + + r.nodes[nodeid] = res +} + // HasNodeEnough returns whether a node has enough resources available for the // requested cpu and memory consumption. func (r *resources) HasNodeEnough(nodeid string, cpu float64, mem uint64) bool { @@ -1462,6 +1520,13 @@ func (ra *referenceAffinity) Move(reference, domain, fromnodeid, tonodeid string func rebalance(have []proxy.Process, nodes map[string]proxy.NodeAbout) ([]interface{}, map[string]proxy.NodeResources) { resources := NewResources(nodes) + // Mark nodes as throttling where at least one process is still throttling + for _, haveP := range have { + if haveP.Throttling { + resources.Throttling(haveP.NodeID, true) + } + } + // Group all running processes by node and sort them by their runtime in ascending order. nodeProcessMap := createNodeProcessMap(have) @@ -1471,9 +1536,7 @@ func rebalance(have []proxy.Process, nodes map[string]proxy.NodeAbout) ([]interf opStack := []interface{}{} // Check if any of the nodes is overloaded. - for id, node := range nodes { - r := node.Resources - + for id, r := range resources.Map() { // Ignore this node if the resource values are not reliable. if r.Error != nil { continue @@ -1507,7 +1570,7 @@ func rebalance(have []proxy.Process, nodes map[string]proxy.NodeAbout) ([]interf continue } - if resources.HasNodeEnough(raNodeid, p.CPU, p.Mem) { + if resources.HasNodeEnough(raNodeid, p.Config.LimitCPU, p.Config.LimitMemory) { availableNodeid = raNodeid break } @@ -1516,13 +1579,14 @@ func rebalance(have []proxy.Process, nodes map[string]proxy.NodeAbout) ([]interf // Find the best node with enough resources available. if len(availableNodeid) == 0 { - nodes := resources.FindBestNodes(p.CPU, p.Mem) + nodes := resources.FindBestNodes(p.Config.LimitCPU, p.Config.LimitMemory) for _, nodeid := range nodes { if nodeid == overloadedNodeid { continue } availableNodeid = nodeid + break } } @@ -1566,6 +1630,13 @@ func rebalance(have []proxy.Process, nodes map[string]proxy.NodeAbout) ([]interf func relocate(have []proxy.Process, nodes map[string]proxy.NodeAbout, relocateMap map[string]string) ([]interface{}, map[string]proxy.NodeResources, []string) { resources := NewResources(nodes) + // Mark nodes as throttling where at least one process is still throttling + for _, haveP := range have { + if haveP.Throttling { + resources.Throttling(haveP.NodeID, true) + } + } + relocatedProcessIDs := []string{} // A map from the process reference to the nodes it is running on. @@ -1601,7 +1672,7 @@ func relocate(have []proxy.Process, nodes map[string]proxy.NodeAbout, relocateMa if len(targetNodeid) != 0 { _, hasNode := nodes[targetNodeid] - if !hasNode || !resources.HasNodeEnough(targetNodeid, process.CPU, process.Mem) { + if !hasNode || !resources.HasNodeEnough(targetNodeid, process.Config.LimitCPU, process.Config.LimitMemory) { targetNodeid = "" } } @@ -1617,7 +1688,7 @@ func relocate(have []proxy.Process, nodes map[string]proxy.NodeAbout, relocateMa continue } - if resources.HasNodeEnough(raNodeid, process.CPU, process.Mem) { + if resources.HasNodeEnough(raNodeid, process.Config.LimitCPU, process.Config.LimitMemory) { targetNodeid = raNodeid break } @@ -1626,7 +1697,7 @@ func relocate(have []proxy.Process, nodes map[string]proxy.NodeAbout, relocateMa // Find the best node with enough resources available. if len(targetNodeid) == 0 { - nodes := resources.FindBestNodes(process.CPU, process.Mem) + nodes := resources.FindBestNodes(process.Config.LimitCPU, process.Config.LimitMemory) for _, nodeid := range nodes { if nodeid == sourceNodeid { continue diff --git a/cluster/proxy/node.go b/cluster/proxy/node.go index 72891348..393e9a60 100644 --- a/cluster/proxy/node.go +++ b/cluster/proxy/node.go @@ -934,14 +934,15 @@ func (n *node) ProxyProcessList() ([]Process, error) { } process := Process{ - NodeID: nodeid, - Order: p.State.Order, - State: p.State.State, - Mem: p.State.Memory, - CPU: p.State.CPU * n.resources.ncpu, - Runtime: time.Duration(p.State.Runtime) * time.Second, - UpdatedAt: time.Unix(p.UpdatedAt, 0), - Metadata: p.Metadata, + NodeID: nodeid, + Order: p.State.Order, + State: p.State.State, + Mem: p.State.Resources.Memory.Current, + CPU: p.State.Resources.CPU.Current, + Throttling: p.State.Resources.CPU.IsThrottling, + Runtime: time.Duration(p.State.Runtime) * time.Second, + UpdatedAt: time.Unix(p.UpdatedAt, 0), + Metadata: p.Metadata, } cfg := &app.Config{ diff --git a/cluster/proxy/proxy.go b/cluster/proxy/proxy.go index 07a3ee2d..541f72e0 100644 --- a/cluster/proxy/proxy.go +++ b/cluster/proxy/proxy.go @@ -392,15 +392,16 @@ func (p *proxy) ListFiles(storage, pattern string) []clientapi.FileInfo { } type Process struct { - NodeID string - Order string - State string - CPU float64 // Current CPU load of this process, 0-100*ncpu - Mem uint64 // Currently consumed memory of this process in bytes - Runtime time.Duration - UpdatedAt time.Time - Config *app.Config - Metadata map[string]interface{} + NodeID string + Order string + State string + CPU float64 // Current CPU load of this process, 0-100*ncpu + Mem uint64 // Currently consumed memory of this process in bytes + Throttling bool + Runtime time.Duration + UpdatedAt time.Time + Config *app.Config + Metadata map[string]interface{} } type ProcessListOptions struct {