From 2c1757202735dc13d48f0418c3b47e0d00a156ab Mon Sep 17 00:00:00 2001 From: Ingo Oppermann Date: Wed, 12 Jun 2024 14:42:27 +0200 Subject: [PATCH] Drop dead nodes after CORE_CLUSTER_NODE_RECOVER_TIMEOUT_SEC --- cluster/cluster.go | 36 ++++++++++++++++++++++++++---------- cluster/leader.go | 30 ++++++++++++++++++++++++++++++ cluster/proxy/node.go | 17 +++++++++++------ 3 files changed, 67 insertions(+), 16 deletions(-) diff --git a/cluster/cluster.go b/cluster/cluster.go index bfa650f6..d1221172 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -59,6 +59,8 @@ type Cluster interface { Leave(origin, id string) error // gracefully remove a node from the cluster TransferLeadership(origin, id string) error // transfer leadership to another node Snapshot(origin string) (io.ReadCloser, error) + IsRaftLeader() bool + HasRaftLeader() bool ListProcesses() []store.Process GetProcess(id app.ProcessID) (store.Process, error) @@ -165,10 +167,11 @@ type cluster struct { hostnames []string stateLock sync.RWMutex - isRaftLeader bool - hasRaftLeader bool - isLeader bool - leaderLock sync.Mutex + isRaftLeader bool + hasRaftLeader bool + isLeader bool + isEmergencyLeader bool + leaderLock sync.Mutex isTLSRequired bool clusterKVS ClusterKVS @@ -369,10 +372,16 @@ func New(config Config) (Cluster, error) { ticker := time.NewTicker(time.Second) defer ticker.Stop() + timer := time.NewTimer(c.nodeRecoverTimeout) + defer timer.Stop() + for { select { case <-c.shutdownCh: return + case <-timer.C: + c.logger.Warn().WithField("peer", peerAddress).Log("Giving up joining cluster") + return case <-ticker.C: err := c.Join("", c.nodeID, c.raftAddress, peerAddress) if err != nil { @@ -672,6 +681,13 @@ func (c *cluster) IsRaftLeader() bool { return c.isRaftLeader } +func (c *cluster) HasRaftLeader() bool { + c.leaderLock.Lock() + defer c.leaderLock.Unlock() + + return c.hasRaftLeader +} + func (c *cluster) IsDegraded() (bool, error) { c.stateLock.RLock() defer c.stateLock.RUnlock() @@ -709,7 +725,7 @@ func (c *cluster) IsClusterDegraded() (bool, error) { } func (c *cluster) Leave(origin, id string) error { - if ok, _ := c.IsDegraded(); ok { + if !c.HasRaftLeader() { return ErrDegraded } @@ -767,11 +783,11 @@ func (c *cluster) Leave(origin, id string) error { return err } - numPeers := len(servers) + numServers := len(servers) if id == c.nodeID { // We're going to remove ourselves - if numPeers <= 1 { + if numServers <= 1 { // Don't do so if we're the only server in the cluster c.logger.Debug().Log("We're the leader without any peers, not doing anything") return nil @@ -847,7 +863,7 @@ func (c *cluster) Leave(origin, id string) error { } func (c *cluster) Join(origin, id, raftAddress, peerAddress string) error { - if ok, _ := c.IsDegraded(); ok { + if !c.HasRaftLeader() { return ErrDegraded } @@ -910,7 +926,7 @@ func (c *cluster) Join(origin, id, raftAddress, peerAddress string) error { } func (c *cluster) TransferLeadership(origin, id string) error { - if ok, _ := c.IsDegraded(); ok { + if !c.HasRaftLeader() { return ErrDegraded } @@ -923,7 +939,7 @@ func (c *cluster) TransferLeadership(origin, id string) error { } func (c *cluster) Snapshot(origin string) (io.ReadCloser, error) { - if ok, _ := c.IsDegraded(); ok { + if !c.HasRaftLeader() { return nil, ErrDegraded } diff --git a/cluster/leader.go b/cluster/leader.go index 11793da9..aaccce4d 100644 --- a/cluster/leader.go +++ b/cluster/leader.go @@ -112,6 +112,7 @@ func (c *cluster) monitorLeadership() { c.leaderLock.Lock() c.isRaftLeader = false c.isLeader = false + c.isEmergencyLeader = false c.leaderLock.Unlock() } else if notification == NOTIFY_LEADER { if weAreLeaderCh != nil { @@ -145,6 +146,7 @@ func (c *cluster) monitorLeadership() { c.leaderLock.Lock() c.isRaftLeader = true c.isLeader = true + c.isEmergencyLeader = false c.leaderLock.Unlock() } else if notification == NOTIFY_EMERGENCY { if weAreEmergencyLeaderCh != nil { @@ -178,6 +180,7 @@ func (c *cluster) monitorLeadership() { c.leaderLock.Lock() c.isRaftLeader = false c.isLeader = true + c.isEmergencyLeader = true c.leaderLock.Unlock() } case <-c.shutdownCh: @@ -303,11 +306,38 @@ func (c *cluster) establishLeadership(ctx context.Context, emergency bool) error if !emergency { go c.clearLocks(ctx, time.Minute) + go c.clearDeadNodes(ctx, c.nodeRecoverTimeout) } return nil } +func (c *cluster) clearDeadNodes(ctx context.Context, nodeRecoverTimeout time.Duration) { + ticker := time.NewTicker(c.syncInterval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + nodes := c.proxy.ListNodes() + for _, node := range nodes { + about := node.About() + if time.Since(about.SpawnedAt) > nodeRecoverTimeout && time.Since(about.LastContact) > nodeRecoverTimeout { + c.logger.Warn().WithFields(log.Fields{ + "id": about.ID, + "after": nodeRecoverTimeout, + "lastContact": about.LastContact, + "spawnedAt": about.SpawnedAt, + }).Log("Removing peer from cluster") + c.raft.RemoveServer(about.ID) + } + } + } + } +} + func (c *cluster) revokeLeadership() { c.logger.Info().Log("Revoking leadership") diff --git a/cluster/proxy/node.go b/cluster/proxy/node.go index 72891348..b5152d96 100644 --- a/cluster/proxy/node.go +++ b/cluster/proxy/node.go @@ -75,6 +75,7 @@ type NodeResources struct { } type NodeAbout struct { + SpawnedAt time.Time ID string Name string Address string @@ -114,6 +115,8 @@ type node struct { id string address string + createdAt time.Time + peer client.RestClient peerErr error peerLock sync.RWMutex @@ -159,12 +162,13 @@ type NodeConfig struct { func NewNode(config NodeConfig) Node { n := &node{ - id: config.ID, - address: config.Address, - config: config.Config, - state: stateDisconnected, - secure: strings.HasPrefix(config.Address, "https://"), - logger: config.Logger, + id: config.ID, + address: config.Address, + config: config.Config, + state: stateDisconnected, + secure: strings.HasPrefix(config.Address, "https://"), + logger: config.Logger, + createdAt: time.Now(), } if n.logger == nil { @@ -530,6 +534,7 @@ func (n *node) About() NodeAbout { } nodeAbout := NodeAbout{ + SpawnedAt: n.createdAt, ID: n.id, Name: name, Address: n.address,