From 4d0eed092eb057f5db6f4a5873b823bf3a0982c5 Mon Sep 17 00:00:00 2001 From: Ingo Oppermann Date: Wed, 17 Jul 2024 16:50:39 +0200 Subject: [PATCH] Return error from ClusterProcessList, remove ProcessFindNodeID --- cluster/leader_rebalance.go | 14 +++++------- cluster/leader_relocate.go | 15 +++++------- cluster/leader_synchronize.go | 25 +++++++++----------- cluster/node/manager.go | 43 ++++++++++++++++------------------- 4 files changed, 42 insertions(+), 55 deletions(-) diff --git a/cluster/leader_rebalance.go b/cluster/leader_rebalance.go index ebfc3120..6080777e 100644 --- a/cluster/leader_rebalance.go +++ b/cluster/leader_rebalance.go @@ -3,7 +3,6 @@ package cluster import ( "github.com/datarhei/core/v16/cluster/node" "github.com/datarhei/core/v16/cluster/store" - "github.com/datarhei/core/v16/log" ) func (c *cluster) doRebalance(emergency bool, term uint64) { @@ -17,8 +16,12 @@ func (c *cluster) doRebalance(emergency bool, term uint64) { logger.Debug().WithField("emergency", emergency).Log("Rebalancing") storeNodes := c.store.NodeList() - have := c.manager.ClusterProcessList() nodes := c.manager.NodeList() + have, err := c.manager.ClusterProcessList() + if err != nil { + logger.Warn().WithError(err).Log("Failed to retrieve complete process list") + return + } nodesMap := map[string]node.About{} @@ -32,14 +35,9 @@ func (c *cluster) doRebalance(emergency bool, term uint64) { nodesMap[about.ID] = about } - logger.Debug().WithFields(log.Fields{ - "have": have, - "nodes": nodesMap, - }).Log("Rebalance") - opStack, _ := rebalance(have, nodesMap) - errors := c.applyOpStack(opStack, term) + errors := c.applyOpStack(opStack, term, 5) for _, e := range errors { // Only apply the command if the error is different. diff --git a/cluster/leader_relocate.go b/cluster/leader_relocate.go index 5879a1a9..b28e5696 100644 --- a/cluster/leader_relocate.go +++ b/cluster/leader_relocate.go @@ -3,7 +3,6 @@ package cluster import ( "github.com/datarhei/core/v16/cluster/node" "github.com/datarhei/core/v16/cluster/store" - "github.com/datarhei/core/v16/log" "github.com/datarhei/core/v16/restream/app" ) @@ -19,8 +18,12 @@ func (c *cluster) doRelocate(emergency bool, term uint64) { relocateMap := c.store.ProcessGetRelocateMap() storeNodes := c.store.NodeList() - have := c.manager.ClusterProcessList() nodes := c.manager.NodeList() + have, err := c.manager.ClusterProcessList() + if err != nil { + logger.Warn().WithError(err).Log("Failed to retrieve complete process list") + return + } nodesMap := map[string]node.About{} @@ -34,15 +37,9 @@ func (c *cluster) doRelocate(emergency bool, term uint64) { nodesMap[about.ID] = about } - logger.Debug().WithFields(log.Fields{ - "relocate": relocate, - "have": have, - "nodes": nodesMap, - }).Log("Rebalance") - opStack, _, relocatedProcessIDs := relocate(have, nodesMap, relocateMap) - errors := c.applyOpStack(opStack, term) + errors := c.applyOpStack(opStack, term, 5) for _, e := range errors { // Only apply the command if the error is different. diff --git a/cluster/leader_synchronize.go b/cluster/leader_synchronize.go index 60508433..cab2755e 100644 --- a/cluster/leader_synchronize.go +++ b/cluster/leader_synchronize.go @@ -8,20 +8,23 @@ import ( "github.com/datarhei/core/v16/cluster/node" "github.com/datarhei/core/v16/cluster/store" "github.com/datarhei/core/v16/encoding/json" - "github.com/datarhei/core/v16/log" ) func (c *cluster) doSynchronize(emergency bool, term uint64) { - wish := c.store.ProcessGetNodeMap() - want := c.store.ProcessList() - storeNodes := c.store.NodeList() - have := c.manager.ClusterProcessList() - nodes := c.manager.NodeList() - logger := c.logger.WithField("term", term) logger.Debug().WithField("emergency", emergency).Log("Synchronizing") + wish := c.store.ProcessGetNodeMap() + want := c.store.ProcessList() + storeNodes := c.store.NodeList() + nodes := c.manager.NodeList() + have, err := c.manager.ClusterProcessList() + if err != nil { + logger.Warn().WithError(err).Log("Failed to retrieve complete process list") + return + } + nodesMap := map[string]node.About{} for _, node := range nodes { @@ -34,12 +37,6 @@ func (c *cluster) doSynchronize(emergency bool, term uint64) { nodesMap[about.ID] = about } - logger.Debug().WithFields(log.Fields{ - "want": want, - "have": have, - "nodes": nodesMap, - }).Log("Synchronize") - opStack, _, reality := synchronize(wish, want, have, nodesMap, c.nodeRecoverTimeout) if !emergency && !maps.Equal(wish, reality) { @@ -53,7 +50,7 @@ func (c *cluster) doSynchronize(emergency bool, term uint64) { c.applyCommand(cmd) } - errors := c.applyOpStack(opStack, term) + errors := c.applyOpStack(opStack, term, 5) if !emergency { for _, e := range errors { diff --git a/cluster/node/manager.go b/cluster/node/manager.go index 4b63c2dc..5518a59e 100644 --- a/cluster/node/manager.go +++ b/cluster/node/manager.go @@ -409,12 +409,14 @@ func (p *Manager) FilesystemList(storage, pattern string) []api.FileInfo { return filesList } -func (p *Manager) ClusterProcessList() []Process { +func (p *Manager) ClusterProcessList() ([]Process, error) { processChan := make(chan []Process, 64) processList := []Process{} + errorChan := make(chan error, 8) + errorList := []error{} wgList := sync.WaitGroup{} - wgList.Add(1) + wgList.Add(2) go func() { defer wgList.Done() @@ -424,53 +426,46 @@ func (p *Manager) ClusterProcessList() []Process { } }() + go func() { + defer wgList.Done() + + for err := range errorChan { + errorList = append(errorList, err) + } + }() + wg := sync.WaitGroup{} p.lock.RLock() for _, n := range p.nodes { wg.Add(1) - go func(node *Node, p chan<- []Process) { + go func(node *Node, p chan<- []Process, e chan<- error) { defer wg.Done() processes, err := node.Core().ClusterProcessList() if err != nil { + e <- err return } p <- processes - }(n, processChan) + }(n, processChan, errorChan) } p.lock.RUnlock() wg.Wait() close(processChan) + close(errorChan) wgList.Wait() - return processList -} - -func (p *Manager) ProcessFindNodeID(id app.ProcessID) (string, error) { - procs := p.ClusterProcessList() - nodeid := "" - - for _, p := range procs { - if p.Config.ProcessID() != id { - continue - } - - nodeid = p.NodeID - - break + if len(errorList) != 0 { + return nil, fmt.Errorf("not all nodes responded wit their process list") } - if len(nodeid) == 0 { - return "", fmt.Errorf("the process '%s' is not registered with any node", id.String()) - } - - return nodeid, nil + return processList, nil } func (p *Manager) FindNodeForResources(nodeid string, cpu float64, memory uint64) string {