From e12fb0be523168e756f6ce5dbc62314e49c3afee Mon Sep 17 00:00:00 2001
From: Ingo Oppermann
Date: Wed, 17 Jul 2024 16:48:33 +0200
Subject: [PATCH] Fix cluster shutdown, limit parallel opstack workers

---
 cluster/cluster.go | 18 +++++++++++++++++-
 cluster/leader.go  | 29 +++++++++++++++++++++--------
 2 files changed, 38 insertions(+), 9 deletions(-)

diff --git a/cluster/cluster.go b/cluster/cluster.go
index c7add98b..d1bf5dd2 100644
--- a/cluster/cluster.go
+++ b/cluster/cluster.go
@@ -140,6 +140,7 @@ type cluster struct {
 	shutdown     bool
 	shutdownCh   chan struct{}
 	shutdownLock sync.Mutex
+	shutdownWg   sync.WaitGroup
 
 	syncInterval       time.Duration
 	nodeRecoverTimeout time.Duration
@@ -347,7 +348,11 @@ func New(config Config) (Cluster, error) {
 			return nil, err
 		}
 
+		c.shutdownWg.Add(1)
+
 		go func(peerAddress string) {
+			defer c.shutdownWg.Done()
+
 			ticker := time.NewTicker(time.Second)
 			defer ticker.Stop()
 
@@ -369,6 +374,8 @@ func New(config Config) (Cluster, error) {
 		}
 	}
 
+	c.shutdownWg.Add(4)
+
 	go c.trackNodeChanges()
 	go c.trackLeaderChanges()
 	go c.monitorLeadership()
@@ -416,6 +423,8 @@ func (c *cluster) Start(ctx context.Context) error {
 
 	<-c.shutdownCh
 
+	c.shutdownWg.Wait()
+
 	return nil
 }
 
@@ -621,6 +630,8 @@ func (c *cluster) Shutdown() error {
 	c.shutdown = true
 	close(c.shutdownCh)
 
+	c.shutdownWg.Wait()
+
 	if c.manager != nil {
 		c.manager.NodesClear()
 		c.manager = nil
@@ -635,7 +646,6 @@ func (c *cluster) Shutdown() error {
 
 	if c.raft != nil {
 		c.raft.Shutdown()
-		c.raft = nil
 	}
 
 	return nil
@@ -887,6 +897,8 @@ func (c *cluster) Snapshot(origin string) (io.ReadCloser, error) {
 }
 
 func (c *cluster) trackNodeChanges() {
+	defer c.shutdownWg.Done()
+
 	ticker := time.NewTicker(5 * time.Second)
 	defer ticker.Stop()
 
@@ -980,6 +992,8 @@ func (c *cluster) getClusterBarrier(name string) (bool, error) {
 // trackLeaderChanges registers an Observer with raft in order to receive updates
 // about leader changes, in order to keep the forwarder up to date.
 func (c *cluster) trackLeaderChanges() {
+	defer c.shutdownWg.Done()
+
 	for {
 		select {
 		case leaderAddress := <-c.raftLeaderObservationCh:
@@ -1039,6 +1053,8 @@ func (c *cluster) applyCommand(cmd *store.Command) error {
 }
 
 func (c *cluster) sentinel() {
+	defer c.shutdownWg.Done()
+
 	ticker := time.NewTicker(time.Second)
 	defer ticker.Stop()
 
diff --git a/cluster/leader.go b/cluster/leader.go
index c45f05f9..60ab84d4 100644
--- a/cluster/leader.go
+++ b/cluster/leader.go
@@ -19,6 +19,8 @@ const NOTIFY_LEADER = 1
 const NOTIFY_EMERGENCY = 2
 
 func (c *cluster) monitorLeadership() {
+	defer c.shutdownWg.Done()
+
 	// We use the notify channel we configured Raft with, NOT Raft's
 	// leaderCh, which is only notified best-effort. Doing this ensures
 	// that we get all notifications in order, which is required for
@@ -449,7 +451,7 @@ type processOpError struct {
 	err error
 }
 
-func (c *cluster) applyOpStack(stack []interface{}, term uint64) []processOpError {
+func (c *cluster) applyOpStack(stack []interface{}, term uint64, runners int) []processOpError {
 	errors := []processOpError{}
 
 	logger := c.logger.WithFields(log.Fields{
@@ -458,6 +460,7 @@ func (c *cluster) applyOpStack(stack []interface{}, term uint64) []processOpErro
 	})
 
 	errChan := make(chan processOpError, len(stack))
+	opChan := make(chan interface{}, len(stack))
 
 	wgReader := sync.WaitGroup{}
 	wgReader.Add(1)
@@ -470,18 +473,28 @@ func (c *cluster) applyOpStack(stack []interface{}, term uint64) []processOpErro
 	}(errChan)
 
 	wg := sync.WaitGroup{}
-	for _, op := range stack {
+
+	for i := 0; i < runners; i++ {
 		wg.Add(1)
 
-		go func(errChan chan<- processOpError, op interface{}, logger log.Logger) {
-			opErr := c.applyOp(op, logger)
-			if opErr.err != nil {
-				errChan <- opErr
+		go func(errChan chan<- processOpError, opChan <-chan interface{}, logger log.Logger) {
+			defer wg.Done()
+
+			for op := range opChan {
+				opErr := c.applyOp(op, logger)
+				if opErr.err != nil {
+					errChan <- opErr
+				}
 			}
-			wg.Done()
-		}(errChan, op, logger)
+		}(errChan, opChan, logger)
 	}
 
+	for _, op := range stack {
+		opChan <- op
+	}
+
+	close(opChan)
+
 	wg.Wait()
 
 	close(errChan)
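
Notes: the cluster.go side of this patch is the standard Go shutdown
pattern. Every long-running goroutine is registered with a sync.WaitGroup
before it starts and marks itself done on exit, so Shutdown() blocks until
all background work has actually stopped instead of racing it. A minimal,
self-contained sketch of that pattern follows; the names (service,
shutdownCh, shutdownWg) are generic stand-ins, not the repository's types:

// Sketch only: illustrates the WaitGroup-based shutdown tracking used in
// the patch, not code from the repository.
package main

import (
	"fmt"
	"sync"
	"time"
)

type service struct {
	shutdownCh chan struct{}
	shutdownWg sync.WaitGroup
}

func newService() *service {
	s := &service{shutdownCh: make(chan struct{})}

	// Add must happen before the goroutine starts; otherwise Shutdown
	// could observe a zero counter and return too early.
	s.shutdownWg.Add(1)

	go func() {
		defer s.shutdownWg.Done()

		ticker := time.NewTicker(100 * time.Millisecond)
		defer ticker.Stop()

		for {
			select {
			case <-s.shutdownCh:
				return
			case <-ticker.C:
				// periodic work would go here
			}
		}
	}()

	return s
}

func (s *service) Shutdown() {
	close(s.shutdownCh) // signal all goroutines to stop
	s.shutdownWg.Wait() // block until they have actually exited
	fmt.Println("all background goroutines have exited")
}

func main() {
	s := newService()
	time.Sleep(300 * time.Millisecond)
	s.Shutdown()
}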
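
The leader.go change replaces the unbounded one-goroutine-per-operation
fan-out in applyOpStack with a fixed pool of runners draining a buffered
channel, capping concurrency at `runners` while still collecting every
error. A hedged, self-contained sketch of that worker-pool shape, using
placeholder types (int ops, a stand-in applyOp) instead of the repository's
store commands:

// Sketch only: the bounded worker pool shape from applyOpStack, with
// hypothetical placeholder types.
package main

import (
	"fmt"
	"sync"
)

type opError struct{ err error }

// applyOp is a stand-in for the real per-operation handler.
func applyOp(op int) opError {
	if op%3 == 0 {
		return opError{err: fmt.Errorf("op %d failed", op)}
	}
	return opError{}
}

func applyStack(stack []int, runners int) []opError {
	// Both channels are buffered to len(stack), so neither the feeder
	// nor the runners can block indefinitely.
	errChan := make(chan opError, len(stack))
	opChan := make(chan int, len(stack))

	wg := sync.WaitGroup{}

	for i := 0; i < runners; i++ {
		wg.Add(1)

		go func() {
			defer wg.Done()

			// Each runner keeps pulling ops until opChan is closed.
			for op := range opChan {
				if opErr := applyOp(op); opErr.err != nil {
					errChan <- opErr
				}
			}
		}()
	}

	// Feed all operations, then close so the runners' range loops end.
	for _, op := range stack {
		opChan <- op
	}
	close(opChan)

	wg.Wait()
	close(errChan)

	errors := []opError{}
	for e := range errChan {
		errors = append(errors, e)
	}
	return errors
}

func main() {
	errs := applyStack([]int{1, 2, 3, 4, 5, 6, 7, 8}, 3)
	fmt.Printf("%d of 8 ops failed\n", len(errs))
}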