Return error from ClusterProcessList, remove ProcessFindNodeID

This commit is contained in:
Ingo Oppermann 2024-07-17 16:50:39 +02:00
parent 6f524f5991
commit 4d0eed092e
No known key found for this signature in database
GPG Key ID: 2AB32426E9DD229E
4 changed files with 42 additions and 55 deletions

View File

@ -3,7 +3,6 @@ package cluster
import (
"github.com/datarhei/core/v16/cluster/node"
"github.com/datarhei/core/v16/cluster/store"
"github.com/datarhei/core/v16/log"
)
func (c *cluster) doRebalance(emergency bool, term uint64) {
@ -17,8 +16,12 @@ func (c *cluster) doRebalance(emergency bool, term uint64) {
logger.Debug().WithField("emergency", emergency).Log("Rebalancing")
storeNodes := c.store.NodeList()
have := c.manager.ClusterProcessList()
nodes := c.manager.NodeList()
have, err := c.manager.ClusterProcessList()
if err != nil {
logger.Warn().WithError(err).Log("Failed to retrieve complete process list")
return
}
nodesMap := map[string]node.About{}
@ -32,14 +35,9 @@ func (c *cluster) doRebalance(emergency bool, term uint64) {
nodesMap[about.ID] = about
}
logger.Debug().WithFields(log.Fields{
"have": have,
"nodes": nodesMap,
}).Log("Rebalance")
opStack, _ := rebalance(have, nodesMap)
errors := c.applyOpStack(opStack, term)
errors := c.applyOpStack(opStack, term, 5)
for _, e := range errors {
// Only apply the command if the error is different.

View File

@ -3,7 +3,6 @@ package cluster
import (
"github.com/datarhei/core/v16/cluster/node"
"github.com/datarhei/core/v16/cluster/store"
"github.com/datarhei/core/v16/log"
"github.com/datarhei/core/v16/restream/app"
)
@ -19,8 +18,12 @@ func (c *cluster) doRelocate(emergency bool, term uint64) {
relocateMap := c.store.ProcessGetRelocateMap()
storeNodes := c.store.NodeList()
have := c.manager.ClusterProcessList()
nodes := c.manager.NodeList()
have, err := c.manager.ClusterProcessList()
if err != nil {
logger.Warn().WithError(err).Log("Failed to retrieve complete process list")
return
}
nodesMap := map[string]node.About{}
@ -34,15 +37,9 @@ func (c *cluster) doRelocate(emergency bool, term uint64) {
nodesMap[about.ID] = about
}
logger.Debug().WithFields(log.Fields{
"relocate": relocate,
"have": have,
"nodes": nodesMap,
}).Log("Rebalance")
opStack, _, relocatedProcessIDs := relocate(have, nodesMap, relocateMap)
errors := c.applyOpStack(opStack, term)
errors := c.applyOpStack(opStack, term, 5)
for _, e := range errors {
// Only apply the command if the error is different.

View File

@ -8,20 +8,23 @@ import (
"github.com/datarhei/core/v16/cluster/node"
"github.com/datarhei/core/v16/cluster/store"
"github.com/datarhei/core/v16/encoding/json"
"github.com/datarhei/core/v16/log"
)
func (c *cluster) doSynchronize(emergency bool, term uint64) {
wish := c.store.ProcessGetNodeMap()
want := c.store.ProcessList()
storeNodes := c.store.NodeList()
have := c.manager.ClusterProcessList()
nodes := c.manager.NodeList()
logger := c.logger.WithField("term", term)
logger.Debug().WithField("emergency", emergency).Log("Synchronizing")
wish := c.store.ProcessGetNodeMap()
want := c.store.ProcessList()
storeNodes := c.store.NodeList()
nodes := c.manager.NodeList()
have, err := c.manager.ClusterProcessList()
if err != nil {
logger.Warn().WithError(err).Log("Failed to retrieve complete process list")
return
}
nodesMap := map[string]node.About{}
for _, node := range nodes {
@ -34,12 +37,6 @@ func (c *cluster) doSynchronize(emergency bool, term uint64) {
nodesMap[about.ID] = about
}
logger.Debug().WithFields(log.Fields{
"want": want,
"have": have,
"nodes": nodesMap,
}).Log("Synchronize")
opStack, _, reality := synchronize(wish, want, have, nodesMap, c.nodeRecoverTimeout)
if !emergency && !maps.Equal(wish, reality) {
@ -53,7 +50,7 @@ func (c *cluster) doSynchronize(emergency bool, term uint64) {
c.applyCommand(cmd)
}
errors := c.applyOpStack(opStack, term)
errors := c.applyOpStack(opStack, term, 5)
if !emergency {
for _, e := range errors {

View File

@ -409,12 +409,14 @@ func (p *Manager) FilesystemList(storage, pattern string) []api.FileInfo {
return filesList
}
func (p *Manager) ClusterProcessList() []Process {
func (p *Manager) ClusterProcessList() ([]Process, error) {
processChan := make(chan []Process, 64)
processList := []Process{}
errorChan := make(chan error, 8)
errorList := []error{}
wgList := sync.WaitGroup{}
wgList.Add(1)
wgList.Add(2)
go func() {
defer wgList.Done()
@ -424,53 +426,46 @@ func (p *Manager) ClusterProcessList() []Process {
}
}()
go func() {
defer wgList.Done()
for err := range errorChan {
errorList = append(errorList, err)
}
}()
wg := sync.WaitGroup{}
p.lock.RLock()
for _, n := range p.nodes {
wg.Add(1)
go func(node *Node, p chan<- []Process) {
go func(node *Node, p chan<- []Process, e chan<- error) {
defer wg.Done()
processes, err := node.Core().ClusterProcessList()
if err != nil {
e <- err
return
}
p <- processes
}(n, processChan)
}(n, processChan, errorChan)
}
p.lock.RUnlock()
wg.Wait()
close(processChan)
close(errorChan)
wgList.Wait()
return processList
}
func (p *Manager) ProcessFindNodeID(id app.ProcessID) (string, error) {
procs := p.ClusterProcessList()
nodeid := ""
for _, p := range procs {
if p.Config.ProcessID() != id {
continue
}
nodeid = p.NodeID
break
if len(errorList) != 0 {
return nil, fmt.Errorf("not all nodes responded wit their process list")
}
if len(nodeid) == 0 {
return "", fmt.Errorf("the process '%s' is not registered with any node", id.String())
}
return nodeid, nil
return processList, nil
}
func (p *Manager) FindNodeForResources(nodeid string, cpu float64, memory uint64) string {