Fix deployment errors in cluster DB

This commit is contained in:
Ingo Oppermann 2025-05-13 12:21:54 +02:00
parent 430dcd4340
commit 8807cd2d79
No known key found for this signature in database
GPG Key ID: 2AB32426E9DD229E
4 changed files with 44 additions and 21 deletions

View File

@ -497,6 +497,10 @@ type processOpError struct {
err error err error
} }
type processOpNull struct {
processid app.ProcessID
}
func (c *cluster) applyOpStack(stack []interface{}, term uint64, runners int) []processOpError { func (c *cluster) applyOpStack(stack []interface{}, term uint64, runners int) []processOpError {
errors := []processOpError{} errors := []processOpError{}
@ -527,10 +531,7 @@ func (c *cluster) applyOpStack(stack []interface{}, term uint64, runners int) []
defer wg.Done() defer wg.Done()
for op := range opChan { for op := range opChan {
opErr := c.applyOp(op, logger) errChan <- c.applyOp(op, logger)
if opErr.err != nil {
errChan <- opErr
}
} }
}(errChan, opChan, logger) }(errChan, opChan, logger)
} }
@ -783,6 +784,11 @@ func (c *cluster) applyOp(op interface{}, logger log.Logger) processOpError {
}).Log("Process skipped") }).Log("Process skipped")
case processOpError: case processOpError:
opErr = v opErr = v
case processOpNull:
opErr = processOpError{
processid: v.processid,
err: nil,
}
default: default:
logger.Warn().Log("Unknown operation on stack: %+v", v) logger.Warn().Log("Unknown operation on stack: %+v", v)
} }

View File

@ -190,18 +190,24 @@ func synchronize(wish map[string]string, want []store.Process, have []node.Proce
// The process is on the wantMap. Update the process if the configuration and/or metadata differ. // The process is on the wantMap. Update the process if the configuration and/or metadata differ.
hasConfigChanges := !wantP.Config.Equal(haveP.Config) hasConfigChanges := !wantP.Config.Equal(haveP.Config)
hasMetadataChanges, metadata := isMetadataUpdateRequired(wantP.Metadata, haveP.Metadata) hasMetadataChanges, metadata := isMetadataUpdateRequired(wantP.Metadata, haveP.Metadata)
if (hasConfigChanges || hasMetadataChanges) && opBudget > 0 { if opBudget > 0 {
// TODO: When the required resources increase, should we move this process to a node if hasConfigChanges || hasMetadataChanges {
// that has them available? Otherwise, this node might start throttling. However, this // TODO: When the required resources increase, should we move this process to a node
// will result in rebalancing. // that has them available? Otherwise, this node might start throttling. However, this
opStackUpdate = append(opStackUpdate, processOpUpdate{ // will result in rebalancing.
nodeid: haveP.NodeID, opStackUpdate = append(opStackUpdate, processOpUpdate{
processid: haveP.Config.ProcessID(), nodeid: haveP.NodeID,
config: wantP.Config, processid: haveP.Config.ProcessID(),
metadata: metadata, config: wantP.Config,
}) metadata: metadata,
})
opBudget -= 3 opBudget -= 3
} else {
opStack = append(opStack, processOpNull{
processid: haveP.Config.ProcessID(),
})
}
} }
delete(wantMap, pid) delete(wantMap, pid)

View File

@ -231,6 +231,10 @@ func (s *store) ProcessGet(id app.ProcessID) (Process, string, error) {
nodeid := s.data.ProcessNodeMap[id.String()] nodeid := s.data.ProcessNodeMap[id.String()]
if len(process.Error) != 0 {
nodeid = ""
}
return Process{ return Process{
CreatedAt: process.CreatedAt, CreatedAt: process.CreatedAt,
UpdatedAt: process.UpdatedAt, UpdatedAt: process.UpdatedAt,

View File

@ -57,16 +57,14 @@ func (h *ClusterHandler) ProcessList(c echo.Context) error {
DomainPattern: domainpattern, DomainPattern: domainpattern,
}) })
processes := []api.Process{} pmap := map[app.ProcessID]api.Process{}
pmap := map[app.ProcessID]struct{}{}
for _, p := range procs { for _, p := range procs {
if !h.iam.Enforce(ctxuser, domain, "process", p.ID, "read") { if !h.iam.Enforce(ctxuser, domain, "process", p.ID, "read") {
continue continue
} }
processes = append(processes, p) pmap[app.NewProcessID(p.ID, p.Domain)] = p
pmap[app.NewProcessID(p.ID, p.Domain)] = struct{}{}
} }
missing := []api.Process{} missing := []api.Process{}
@ -82,8 +80,12 @@ func (h *ClusterHandler) ProcessList(c echo.Context) error {
} }
// Check if the process has been deployed // Check if the process has been deployed
if _, ok := pmap[p.Config.ProcessID()]; ok { if len(p.Error) == 0 {
continue if _, ok := pmap[p.Config.ProcessID()]; ok {
continue
}
} else {
delete(pmap, p.Config.ProcessID())
} }
process := api.Process{} process := api.Process{}
@ -93,6 +95,11 @@ func (h *ClusterHandler) ProcessList(c echo.Context) error {
} }
} }
processes := []api.Process{}
for _, p := range pmap {
processes = append(processes, p)
}
processes = append(processes, missing...) processes = append(processes, missing...)
return c.JSON(http.StatusOK, processes) return c.JSON(http.StatusOK, processes)