diff --git a/cluster/leader.go b/cluster/leader.go index 14fb4be3..ca7d2ab5 100644 --- a/cluster/leader.go +++ b/cluster/leader.go @@ -497,6 +497,10 @@ type processOpError struct { err error } +type processOpNull struct { + processid app.ProcessID +} + func (c *cluster) applyOpStack(stack []interface{}, term uint64, runners int) []processOpError { errors := []processOpError{} @@ -527,10 +531,7 @@ func (c *cluster) applyOpStack(stack []interface{}, term uint64, runners int) [] defer wg.Done() for op := range opChan { - opErr := c.applyOp(op, logger) - if opErr.err != nil { - errChan <- opErr - } + errChan <- c.applyOp(op, logger) } }(errChan, opChan, logger) } @@ -783,6 +784,11 @@ func (c *cluster) applyOp(op interface{}, logger log.Logger) processOpError { }).Log("Process skipped") case processOpError: opErr = v + case processOpNull: + opErr = processOpError{ + processid: v.processid, + err: nil, + } default: logger.Warn().Log("Unknown operation on stack: %+v", v) } diff --git a/cluster/leader_synchronize.go b/cluster/leader_synchronize.go index c56e4ad8..fa927814 100644 --- a/cluster/leader_synchronize.go +++ b/cluster/leader_synchronize.go @@ -190,18 +190,24 @@ func synchronize(wish map[string]string, want []store.Process, have []node.Proce // The process is on the wantMap. Update the process if the configuration and/or metadata differ. hasConfigChanges := !wantP.Config.Equal(haveP.Config) hasMetadataChanges, metadata := isMetadataUpdateRequired(wantP.Metadata, haveP.Metadata) - if (hasConfigChanges || hasMetadataChanges) && opBudget > 0 { - // TODO: When the required resources increase, should we move this process to a node - // that has them available? Otherwise, this node might start throttling. However, this - // will result in rebalancing. - opStackUpdate = append(opStackUpdate, processOpUpdate{ - nodeid: haveP.NodeID, - processid: haveP.Config.ProcessID(), - config: wantP.Config, - metadata: metadata, - }) + if opBudget > 0 { + if hasConfigChanges || hasMetadataChanges { + // TODO: When the required resources increase, should we move this process to a node + // that has them available? Otherwise, this node might start throttling. However, this + // will result in rebalancing. + opStackUpdate = append(opStackUpdate, processOpUpdate{ + nodeid: haveP.NodeID, + processid: haveP.Config.ProcessID(), + config: wantP.Config, + metadata: metadata, + }) - opBudget -= 3 + opBudget -= 3 + } else { + opStack = append(opStack, processOpNull{ + processid: haveP.Config.ProcessID(), + }) + } } delete(wantMap, pid) diff --git a/cluster/store/process.go b/cluster/store/process.go index d04b7129..19ef27c0 100644 --- a/cluster/store/process.go +++ b/cluster/store/process.go @@ -231,6 +231,10 @@ func (s *store) ProcessGet(id app.ProcessID) (Process, string, error) { nodeid := s.data.ProcessNodeMap[id.String()] + if len(process.Error) != 0 { + nodeid = "" + } + return Process{ CreatedAt: process.CreatedAt, UpdatedAt: process.UpdatedAt, diff --git a/http/handler/api/cluster_process.go b/http/handler/api/cluster_process.go index f8271293..755238a7 100644 --- a/http/handler/api/cluster_process.go +++ b/http/handler/api/cluster_process.go @@ -57,16 +57,14 @@ func (h *ClusterHandler) ProcessList(c echo.Context) error { DomainPattern: domainpattern, }) - processes := []api.Process{} - pmap := map[app.ProcessID]struct{}{} + pmap := map[app.ProcessID]api.Process{} for _, p := range procs { if !h.iam.Enforce(ctxuser, domain, "process", p.ID, "read") { continue } - processes = append(processes, p) - pmap[app.NewProcessID(p.ID, p.Domain)] = struct{}{} + pmap[app.NewProcessID(p.ID, p.Domain)] = p } missing := []api.Process{} @@ -82,8 +80,12 @@ func (h *ClusterHandler) ProcessList(c echo.Context) error { } // Check if the process has been deployed - if _, ok := pmap[p.Config.ProcessID()]; ok { - continue + if len(p.Error) == 0 { + if _, ok := pmap[p.Config.ProcessID()]; ok { + continue + } + } else { + delete(pmap, p.Config.ProcessID()) } process := api.Process{} @@ -93,6 +95,11 @@ func (h *ClusterHandler) ProcessList(c echo.Context) error { } } + processes := []api.Process{} + for _, p := range pmap { + processes = append(processes, p) + } + processes = append(processes, missing...) return c.JSON(http.StatusOK, processes)