Fix cluster shutdown, limit parallel opstack workers

Ingo Oppermann 2024-07-17 16:48:33 +02:00
parent 3df1075548
commit e12fb0be52
2 changed files with 38 additions and 9 deletions

View File

@@ -140,6 +140,7 @@ type cluster struct {
 	shutdown     bool
 	shutdownCh   chan struct{}
 	shutdownLock sync.Mutex
+	shutdownWg   sync.WaitGroup
 
 	syncInterval       time.Duration
 	nodeRecoverTimeout time.Duration
@@ -347,7 +348,11 @@ func New(config Config) (Cluster, error) {
 			return nil, err
 		}
 
+		c.shutdownWg.Add(1)
+
 		go func(peerAddress string) {
+			defer c.shutdownWg.Done()
+
 			ticker := time.NewTicker(time.Second)
 			defer ticker.Stop()
 
@@ -369,6 +374,8 @@ func New(config Config) (Cluster, error) {
 		}
 	}
 
+	c.shutdownWg.Add(4)
+
 	go c.trackNodeChanges()
 	go c.trackLeaderChanges()
 	go c.monitorLeadership()
@@ -416,6 +423,8 @@ func (c *cluster) Start(ctx context.Context) error {
 
 	<-c.shutdownCh
 
+	c.shutdownWg.Wait()
+
 	return nil
 }
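Taken together, these hunks apply the standard Go pattern for tracked goroutine shutdown: increment the WaitGroup before spawning, decrement via defer inside the goroutine, and wait for the counter to drain once the shutdown channel closes. A minimal runnable sketch of the pattern (type and function names are illustrative, not the project's API):

// Sketch of the WaitGroup-tracked goroutine lifecycle this commit adds.
package main

import (
	"fmt"
	"sync"
	"time"
)

type service struct {
	shutdownCh chan struct{}
	shutdownWg sync.WaitGroup
}

func (s *service) start() {
	// Add BEFORE spawning, so a concurrent Wait() can never observe a
	// zero counter while the goroutine is still being launched.
	s.shutdownWg.Add(1)
	go func() {
		defer s.shutdownWg.Done()
		for {
			select {
			case <-time.After(100 * time.Millisecond):
				fmt.Println("tick")
			case <-s.shutdownCh:
				return
			}
		}
	}()
}

func main() {
	s := &service{shutdownCh: make(chan struct{})}
	s.start()
	time.Sleep(300 * time.Millisecond)
	close(s.shutdownCh) // signal all workers to exit
	s.shutdownWg.Wait() // block until every tracked goroutine has returned
	fmt.Println("clean shutdown")
}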
@@ -621,6 +630,8 @@ func (c *cluster) Shutdown() error {
 	c.shutdown = true
 	close(c.shutdownCh)
 
+	c.shutdownWg.Wait()
+
 	if c.manager != nil {
 		c.manager.NodesClear()
 		c.manager = nil
@@ -635,7 +646,6 @@ func (c *cluster) Shutdown() error {
 	if c.raft != nil {
 		c.raft.Shutdown()
 		c.raft = nil
 	}
-
 	return nil
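Shutdown now closes shutdownCh before calling Wait, which is the only order that works: workers unblock on the closed channel and call Done, letting Wait return, whereas waiting first would deadlock. Teardown of shared state (manager, API, raft) happens only after Wait, so no goroutine can race it. A sketch of an idempotent shutdown under these assumptions (the shutdown flag and lock mirror the struct fields above; the rest, including the node type, is assumed, and the imports from the previous sketch apply):

type node struct {
	shutdown     bool
	shutdownCh   chan struct{}
	shutdownLock sync.Mutex
	shutdownWg   sync.WaitGroup
}

func (n *node) Shutdown() error {
	n.shutdownLock.Lock()
	defer n.shutdownLock.Unlock()

	if n.shutdown {
		return nil // already down; a second close() would panic
	}
	n.shutdown = true

	// Close first so goroutines blocked on <-n.shutdownCh are released,
	// then wait for all of them to call Done. Calling Wait() before
	// close() would block forever.
	close(n.shutdownCh)
	n.shutdownWg.Wait()

	// Only after Wait() returns is it safe to tear down state the
	// workers may still be reading (manager, API, raft in this commit).
	return nil
}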
@@ -887,6 +897,8 @@ func (c *cluster) Snapshot(origin string) (io.ReadCloser, error) {
 }
 
 func (c *cluster) trackNodeChanges() {
+	defer c.shutdownWg.Done()
+
 	ticker := time.NewTicker(5 * time.Second)
 	defer ticker.Stop()
 
@@ -980,6 +992,8 @@ func (c *cluster) getClusterBarrier(name string) (bool, error) {
 // trackLeaderChanges registers an Observer with raft in order to receive updates
 // about leader changes, in order to keep the forwarder up to date.
 func (c *cluster) trackLeaderChanges() {
+	defer c.shutdownWg.Done()
+
 	for {
 		select {
 		case leaderAddress := <-c.raftLeaderObservationCh:
@@ -1039,6 +1053,8 @@ func (c *cluster) applyCommand(cmd *store.Command) error {
 }
 
 func (c *cluster) sentinel() {
+	defer c.shutdownWg.Done()
+
 	ticker := time.NewTicker(time.Second)
 	defer ticker.Stop()
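trackNodeChanges, sentinel, and the peer goroutine above all share the same worker shape after this commit: deregister from the WaitGroup on exit, tick periodically, and return when the shutdown channel closes. Sketched against the assumed node type from the previous sketch (the per-tick work is elided):

func (n *node) periodicWorker(interval time.Duration) {
	// Runs on every return path, so Shutdown's Wait() cannot hang
	// because of this goroutine.
	defer n.shutdownWg.Done()

	ticker := time.NewTicker(interval)
	defer ticker.Stop() // release the ticker's timer resources

	for {
		select {
		case <-ticker.C:
			// ... periodic work (sync nodes, check health, ...) ...
		case <-n.shutdownCh:
			return // unblocked by close(n.shutdownCh)
		}
	}
}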

View File

@@ -19,6 +19,8 @@ const NOTIFY_LEADER = 1
 const NOTIFY_EMERGENCY = 2
 
 func (c *cluster) monitorLeadership() {
+	defer c.shutdownWg.Done()
+
 	// We use the notify channel we configured Raft with, NOT Raft's
 	// leaderCh, which is only notified best-effort. Doing this ensures
 	// that we get all notifications in order, which is required for
@@ -449,7 +451,7 @@ type processOpError struct {
 	err error
 }
 
-func (c *cluster) applyOpStack(stack []interface{}, term uint64) []processOpError {
+func (c *cluster) applyOpStack(stack []interface{}, term uint64, runners int) []processOpError {
 	errors := []processOpError{}
 
 	logger := c.logger.WithFields(log.Fields{
@@ -458,6 +460,7 @@ func (c *cluster) applyOpStack(stack []interface{}, term uint64) []processOpError {
 	})
 
 	errChan := make(chan processOpError, len(stack))
+	opChan := make(chan interface{}, len(stack))
 
 	wgReader := sync.WaitGroup{}
 	wgReader.Add(1)
@@ -470,18 +473,28 @@ func (c *cluster) applyOpStack(stack []interface{}, term uint64) []processOpError {
 	}(errChan)
 
 	wg := sync.WaitGroup{}
 
-	for _, op := range stack {
+	for i := 0; i < runners; i++ {
 		wg.Add(1)
-		go func(errChan chan<- processOpError, op interface{}, logger log.Logger) {
-			opErr := c.applyOp(op, logger)
-			if opErr.err != nil {
-				errChan <- opErr
+
+		go func(errChan chan<- processOpError, opChan <-chan interface{}, logger log.Logger) {
+			defer wg.Done()
+
+			for op := range opChan {
+				opErr := c.applyOp(op, logger)
+				if opErr.err != nil {
+					errChan <- opErr
+				}
 			}
-			wg.Done()
-		}(errChan, op, logger)
+		}(errChan, opChan, logger)
+	}
+
+	for _, op := range stack {
+		opChan <- op
 	}
 
+	close(opChan)
+
 	wg.Wait()
 
 	close(errChan)
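The second change replaces applyOpStack's one-goroutine-per-op fan-out with a pool of `runners` workers draining a shared channel, bounding concurrency to `runners` ops in flight. Because both channels are buffered to len(stack), neither the producer loop nor a failing worker can ever block on a full channel. A self-contained sketch of the same pool shape (the real code additionally drains errChan concurrently via wgReader; here errors are collected after the fact, which the buffering makes safe, and applyOp is replaced by a stand-in):

// Bounded worker pool, sketched with illustrative types.
package main

import (
	"fmt"
	"sync"
)

func applyAll(ops []int, runners int) []error {
	// Buffers sized to len(ops): sends never block.
	opChan := make(chan int, len(ops))
	errChan := make(chan error, len(ops))

	wg := sync.WaitGroup{}
	for i := 0; i < runners; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// range exits once opChan is closed and drained.
			for op := range opChan {
				if op%2 != 0 { // stand-in for an applyOp failure
					errChan <- fmt.Errorf("op %d failed", op)
				}
			}
		}()
	}

	// Feed all ops, then close so the workers terminate.
	for _, op := range ops {
		opChan <- op
	}
	close(opChan)

	wg.Wait()
	close(errChan) // safe now: no worker can send anymore

	errors := []error{}
	for err := range errChan {
		errors = append(errors, err)
	}
	return errors
}

func main() {
	// At most 2 ops are processed concurrently.
	fmt.Println(applyAll([]int{1, 2, 3, 4, 5}, 2))
}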