Replace CORE_CLUSTER_JOIN_ADDRESS with CORE_CLUSTER_PEERS. This is a comma-separated list of cluster members with their IDs, of the form ID@host:port. On startup the node tries to connect to all of the peers. In case of the sudden death of a node, this allows it to find its way back into the cluster. The list in CLUSTER_PEERS is a starting point of known peers; other nodes that are not in that list can still join the cluster. The file and stream proxy have been moved to the Proxy type.
336 lines
8.9 KiB
Go
336 lines
8.9 KiB
Go
package cluster
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/datarhei/core/v16/log"
|
|
)
|
|
|
|
// Role notifications passed over the internal notify channel in
// monitorLeadership. They describe which role this node should
// currently assume.
const (
	NOTIFY_FOLLOWER  = 0
	NOTIFY_LEADER    = 1
	NOTIFY_EMERGENCY = 2
)
|
|
|
|
// monitorLeadership listens to the raft notify channel in order to find
// out if we got the leadership or lost it. It serializes the raw raft
// notifications into a single ordered stream (NOTIFY_FOLLOWER,
// NOTIFY_LEADER, NOTIFY_EMERGENCY) and starts/stops exactly one of the
// three role loops (followerLoop, leaderLoop, emergency leaderLoop) at
// any time. It runs until c.shutdownCh is closed.
// https://github.com/hashicorp/consul/blob/44b39240a86bc94ddc67bc105286ab450bd869a9/agent/consul/leader.go#L71
func (c *cluster) monitorLeadership() {
	// We use the notify channel we configured Raft with, NOT Raft's
	// leaderCh, which is only notified best-effort. Doing this ensures
	// that we get all notifications in order, which is required for
	// cleanup and to ensure we never run multiple leader loops.
	notifyCh := make(chan int, 10)
	var notifyLoop sync.WaitGroup

	notifyLoop.Add(1)

	// Translator goroutine: merges the two raft notification channels
	// into notifyCh. It tracks isRaftLeader locally so that losing the
	// emergency leadership while we are still a raft follower does not
	// emit a duplicate NOTIFY_FOLLOWER transition.
	go func() {
		raftNotifyCh := c.raftNotifyCh
		raftEmergencyNotifyCh := c.raftEmergencyNotifyCh

		isRaftLeader := false

		// Seed the state machine below: every node starts as follower.
		notifyCh <- NOTIFY_FOLLOWER

		// Signal that the initial notification has been sent, so the
		// main loop below doesn't race with it.
		notifyLoop.Done()

		for {
			select {
			case isEmergencyLeader := <-raftEmergencyNotifyCh:
				if isEmergencyLeader {
					// Emergency leadership implies we are not the raft leader.
					isRaftLeader = false
					notifyCh <- NOTIFY_EMERGENCY
				} else {
					// Emergency leadership ended; fall back to follower
					// unless raft has made us leader in the meantime.
					if !isRaftLeader {
						notifyCh <- NOTIFY_FOLLOWER
					}
				}
			case isLeader := <-raftNotifyCh:
				if isLeader {
					isRaftLeader = true
					notifyCh <- NOTIFY_LEADER
				} else {
					isRaftLeader = false
					notifyCh <- NOTIFY_FOLLOWER
				}
			case <-c.shutdownCh:
				return
			}
		}
	}()

	// Wait until the initial NOTIFY_FOLLOWER has been queued.
	notifyLoop.Wait()

	// Each role owns a stop channel (closed to stop the loop) and a
	// WaitGroup (waited on to confirm the loop has fully exited). A nil
	// channel means that role's loop is not running.
	var weAreLeaderCh chan struct{}
	var weAreEmergencyLeaderCh chan struct{}
	var weAreFollowerCh chan struct{}

	var leaderLoop sync.WaitGroup
	var emergencyLeaderLoop sync.WaitGroup
	var followerLoop sync.WaitGroup

	for {
		select {
		case notification := <-notifyCh:
			if notification == NOTIFY_FOLLOWER {
				if weAreFollowerCh != nil {
					// we are already follower, don't do anything
					continue
				}

				// shutdown any leader and emergency loop
				if weAreLeaderCh != nil {
					c.logger.Debug().Log("shutting down leader loop")
					close(weAreLeaderCh)
					// Block until the old loop has exited so we never
					// run two role loops concurrently.
					leaderLoop.Wait()
					weAreLeaderCh = nil
				}

				if weAreEmergencyLeaderCh != nil {
					c.logger.Debug().Log("shutting down emergency leader loop")
					close(weAreEmergencyLeaderCh)
					emergencyLeaderLoop.Wait()
					weAreEmergencyLeaderCh = nil
				}

				weAreFollowerCh = make(chan struct{})
				followerLoop.Add(1)
				// The channel is passed as an argument so the goroutine
				// keeps a stable reference even after the variable is
				// reassigned on the next transition.
				go func(ch chan struct{}) {
					defer followerLoop.Done()
					c.followerLoop(ch)
				}(weAreFollowerCh)

				c.logger.Info().Log("cluster followship acquired")

				c.leaderLock.Lock()
				c.isRaftLeader = false
				c.isLeader = false
				c.leaderLock.Unlock()
			} else if notification == NOTIFY_LEADER {
				if weAreLeaderCh != nil {
					// we are already leader, don't do anything
					continue
				}

				// shutdown any follower and emergency loop
				if weAreFollowerCh != nil {
					c.logger.Debug().Log("shutting down follower loop")
					close(weAreFollowerCh)
					followerLoop.Wait()
					weAreFollowerCh = nil
				}

				if weAreEmergencyLeaderCh != nil {
					c.logger.Debug().Log("shutting down emergency leader loop")
					close(weAreEmergencyLeaderCh)
					emergencyLeaderLoop.Wait()
					weAreEmergencyLeaderCh = nil
				}

				weAreLeaderCh = make(chan struct{})
				leaderLoop.Add(1)
				go func(ch chan struct{}) {
					defer leaderLoop.Done()
					// emergency=false: regular raft leadership.
					c.leaderLoop(ch, false)
				}(weAreLeaderCh)
				c.logger.Info().Log("cluster leadership acquired")

				c.leaderLock.Lock()
				c.isRaftLeader = true
				c.isLeader = true
				c.leaderLock.Unlock()
			} else if notification == NOTIFY_EMERGENCY {
				if weAreEmergencyLeaderCh != nil {
					// we are already emergency leader, don't do anything
					continue
				}

				// shutdown any follower and leader loop
				if weAreFollowerCh != nil {
					c.logger.Debug().Log("shutting down follower loop")
					close(weAreFollowerCh)
					followerLoop.Wait()
					weAreFollowerCh = nil
				}

				if weAreLeaderCh != nil {
					c.logger.Debug().Log("shutting down leader loop")
					close(weAreLeaderCh)
					leaderLoop.Wait()
					weAreLeaderCh = nil
				}

				weAreEmergencyLeaderCh = make(chan struct{})
				emergencyLeaderLoop.Add(1)
				go func(ch chan struct{}) {
					defer emergencyLeaderLoop.Done()
					// emergency=true: leader duties without raft quorum.
					c.leaderLoop(ch, true)
				}(weAreEmergencyLeaderCh)
				c.logger.Info().Log("cluster emergency leadership acquired")

				c.leaderLock.Lock()
				// Emergency leader: we act as leader but are not the
				// raft leader.
				c.isRaftLeader = false
				c.isLeader = true
				c.leaderLock.Unlock()
			}
		case <-c.shutdownCh:
			return
		}
	}
}
|
|
|
|
// leadershipTransfer tries to transfer the leadership to another node e.g. in order
|
|
// to do a graceful shutdown.
|
|
// https://github.com/hashicorp/consul/blob/44b39240a86bc94ddc67bc105286ab450bd869a9/agent/consul/leader.go#L122
|
|
func (c *cluster) leadershipTransfer() error {
|
|
retryCount := 3
|
|
for i := 0; i < retryCount; i++ {
|
|
future := c.raft.LeadershipTransfer()
|
|
if err := future.Error(); err != nil {
|
|
c.logger.Error().WithError(err).WithFields(log.Fields{
|
|
"attempt": i,
|
|
"retry_limit": retryCount,
|
|
}).Log("failed to transfer leadership attempt, will retry")
|
|
} else {
|
|
c.logger.Info().WithFields(log.Fields{
|
|
"attempt": i,
|
|
"retry_limit": retryCount,
|
|
}).Log("successfully transferred leadership")
|
|
|
|
for {
|
|
c.logger.Debug().Log("waiting for losing leadership")
|
|
|
|
time.Sleep(50 * time.Millisecond)
|
|
|
|
c.leaderLock.Lock()
|
|
isLeader := c.isRaftLeader
|
|
c.leaderLock.Unlock()
|
|
|
|
if !isLeader {
|
|
break
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
}
|
|
return fmt.Errorf("failed to transfer leadership in %d attempts", retryCount)
|
|
}
|
|
|
|
// leaderLoop runs as long as we are the leader to run various maintenance activities.
// stopCh is closed by monitorLeadership when this node loses the role;
// emergency selects emergency-leader mode, which skips the raft barrier
// (there is presumably no quorum to apply it against — confirm).
// https://github.com/hashicorp/consul/blob/44b39240a86bc94ddc67bc105286ab450bd869a9/agent/consul/leader.go#L146
func (c *cluster) leaderLoop(stopCh chan struct{}, emergency bool) {
	// Tracks whether establishLeadership has succeeded. Only then is
	// revokeLeadership deferred, and only then may a reassert request
	// be honored.
	establishedLeader := false
RECONCILE:
	// Setup a reconciliation timer
	interval := time.After(10 * time.Second)

	// Apply a raft barrier to ensure our FSM is caught up
	if !emergency {
		barrier := c.raft.Barrier(time.Minute)
		if err := barrier.Error(); err != nil {
			c.logger.Error().WithError(err).Log("failed to wait for barrier")
			// Retry the barrier on the next reconciliation tick.
			goto WAIT
		}
	}

	// Check if we need to handle initial leadership actions
	if !establishedLeader {
		if err := c.establishLeadership(context.TODO()); err != nil {
			c.logger.Error().WithError(err).Log("failed to establish leadership")
			// Immediately revoke leadership since we didn't successfully
			// establish leadership.
			c.revokeLeadership()

			// attempt to transfer leadership. If successful it is
			// time to leave the leaderLoop since this node is no
			// longer the leader. If leadershipTransfer() fails, we
			// will try to acquire it again after
			// 5 seconds.
			if err := c.leadershipTransfer(); err != nil {
				c.logger.Error().WithError(err).Log("failed to transfer leadership")
				interval = time.After(5 * time.Second)
				goto WAIT
			}
			return
		}
		establishedLeader = true
		// Registered at most once per leaderLoop invocation: the
		// deferred revoke runs when this function returns.
		defer c.revokeLeadership()
	}

WAIT:
	// Poll the stop channel to give it priority so we don't waste time
	// trying to perform the other operations if we have been asked to shut
	// down.
	select {
	case <-stopCh:
		return
	default:
	}

	// Periodically reconcile as long as we are the leader
	for {
		select {
		case <-stopCh:
			return
		case <-c.shutdownCh:
			return
		case <-interval:
			goto RECONCILE
		case errCh := <-c.reassertLeaderCh:
			// we can get into this state when the initial
			// establishLeadership has failed as well as the follow
			// up leadershipTransfer. Afterwards we will be waiting
			// for the interval to trigger a reconciliation and can
			// potentially end up here. There is no point to
			// reassert because this agent was never leader in the
			// first place.
			if !establishedLeader {
				errCh <- fmt.Errorf("leadership has not been established")
				continue
			}

			// continue to reassert only if we previously were the
			// leader, which means revokeLeadership followed by an
			// establishLeadership().
			c.revokeLeadership()
			err := c.establishLeadership(context.TODO())
			// Report the re-establishment result back to the requester.
			errCh <- err

			// in case establishLeadership failed, we will try to
			// transfer leadership. At this time raft thinks we are
			// the leader, but we disagree.
			if err != nil {
				if err := c.leadershipTransfer(); err != nil {
					// establishedLeader was true before,
					// but it no longer is since it revoked
					// leadership and Leadership transfer
					// also failed. Which is why it stays
					// in the leaderLoop, but now
					// establishedLeader needs to be set to
					// false.
					establishedLeader = false
					interval = time.After(5 * time.Second)
					goto WAIT
				}

				// leadershipTransfer was successful and it is
				// time to leave the leaderLoop.
				return
			}

		}
	}
}
|
|
|
|
func (c *cluster) establishLeadership(ctx context.Context) error {
|
|
c.logger.Debug().Log("establishing leadership")
|
|
return nil
|
|
}
|
|
|
|
func (c *cluster) revokeLeadership() {
|
|
c.logger.Debug().Log("revoking leadership")
|
|
}
|