Add autorecovery, add CORE_CLUSTER_NODE_RECOVER_TIMEOUT_SECONDS

This commit is contained in:
Ingo Oppermann 2024-06-13 15:55:18 +02:00
parent a1b1609e73
commit 0bf371807a
No known key found for this signature in database
GPG Key ID: 2AB32426E9DD229E
8 changed files with 256 additions and 15 deletions

View File

@ -539,6 +539,7 @@ func (a *api) start(ctx context.Context) error {
SyncInterval: time.Duration(cfg.Cluster.SyncInterval) * time.Second,
NodeRecoverTimeout: time.Duration(cfg.Cluster.NodeRecoverTimeout) * time.Second,
EmergencyLeaderTimeout: time.Duration(cfg.Cluster.EmergencyLeaderTimeout) * time.Second,
RecoverTimeout: time.Duration(cfg.Cluster.RecoverTimeout) * time.Second,
CoreConfig: cfg.Clone(),
CoreSkills: a.ffmpeg.Skills(),
IPLimiter: a.sessionsLimiter,

View File

@ -115,6 +115,7 @@ type Config struct {
SyncInterval time.Duration // Interval between aligning the process in the cluster DB with the processes on the nodes
NodeRecoverTimeout time.Duration // Timeout for a node to recover before rebalancing the processes
EmergencyLeaderTimeout time.Duration // Timeout for establishing the emergency leadership after lost contact to raft leader
RecoverTimeout time.Duration // Timeout for recovering the cluster if there's no raft leader
CoreConfig *config.Config
CoreSkills skills.Skills
@ -133,7 +134,7 @@ type cluster struct {
logger log.Logger
raft raft.Raft
raft raft.RaftRecoverer
raftRemoveGracePeriod time.Duration
raftAddress string
raftNotifyCh chan bool
@ -142,7 +143,8 @@ type cluster struct {
store store.Store
cancelLeaderShip context.CancelFunc
cancelLeaderShip context.CancelFunc
cancelFollowerShip context.CancelFunc
shutdown bool
shutdownCh chan struct{}
@ -151,6 +153,7 @@ type cluster struct {
syncInterval time.Duration
nodeRecoverTimeout time.Duration
emergencyLeaderTimeout time.Duration
recoverTimeout time.Duration
forwarder forwarder.Forwarder
api API
@ -171,6 +174,7 @@ type cluster struct {
hasRaftLeader bool
isLeader bool
isEmergencyLeader bool
lastLeaderChange time.Time
leaderLock sync.Mutex
isTLSRequired bool
@ -207,6 +211,7 @@ func New(config Config) (Cluster, error) {
syncInterval: config.SyncInterval,
nodeRecoverTimeout: config.NodeRecoverTimeout,
emergencyLeaderTimeout: config.EmergencyLeaderTimeout,
recoverTimeout: config.RecoverTimeout,
isDegraded: true,
isDegradedErr: fmt.Errorf("cluster not yet started"),
@ -214,6 +219,8 @@ func New(config Config) (Cluster, error) {
isCoreDegraded: true,
isCoreDegradedErr: fmt.Errorf("cluster not yet started"),
lastLeaderChange: time.Now(),
config: config.CoreConfig,
skills: config.CoreSkills,
nodes: map[string]clusternode.Node{},
@ -344,7 +351,7 @@ func New(config Config) (Cluster, error) {
c.raftLeaderObservationCh = make(chan string, 16)
c.raftEmergencyNotifyCh = make(chan bool, 16)
raft, err := raft.New(raft.Config{
raft, err := raft.NewRecoverer(raft.Config{
ID: config.NodeID,
Path: config.Path,
Address: config.Address,
@ -381,9 +388,17 @@ func New(config Config) (Cluster, error) {
case <-c.shutdownCh:
return
case <-timer.C:
c.logger.Warn().WithField("peer", peerAddress).Log("Giving up joining cluster")
c.logger.Warn().WithFields(log.Fields{
"peer": peerAddress,
"timeout": c.nodeRecoverTimeout,
}).Log("Giving up joining cluster via peer")
return
case <-ticker.C:
if c.HasRaftLeader() {
c.logger.Warn().WithField("peer", peerAddress).Log("Stop joining cluster via peer, already joined")
return
}
err := c.Join("", c.nodeID, c.raftAddress, peerAddress)
if err != nil {
c.logger.Warn().WithError(err).Log("Join cluster")
@ -726,10 +741,6 @@ func (c *cluster) IsClusterDegraded() (bool, error) {
}
func (c *cluster) Leave(origin, id string) error {
if !c.HasRaftLeader() {
return ErrDegraded
}
if len(id) == 0 {
id = c.nodeID
}
@ -872,10 +883,6 @@ func (c *cluster) Leave(origin, id string) error {
}
func (c *cluster) Join(origin, id, raftAddress, peerAddress string) error {
if !c.HasRaftLeader() {
return ErrDegraded
}
if !c.IsRaftLeader() {
c.logger.Debug().Log("Not leader, forwarding to leader")
return c.forwarder.Join(origin, id, raftAddress, peerAddress)
@ -1335,6 +1342,7 @@ func (c *cluster) trackLeaderChanges() {
}
c.forwarder.SetLeader(leaderAddress)
c.leaderLock.Lock()
c.lastLeaderChange = time.Now()
if len(leaderAddress) == 0 {
c.hasRaftLeader = false
} else {

View File

@ -1,7 +1,22 @@
package cluster
import (
"context"
"time"
"github.com/datarhei/core/v16/cluster/raft"
)
// followerLoop is run by every follower node in the cluster.
func (c *cluster) followerLoop(stopCh chan struct{}) {
establishedFollower := false
if !establishedFollower {
c.establishFollowership(context.TODO())
establishedFollower = true
defer c.revokeFollowership()
}
for {
select {
case <-stopCh:
@ -11,3 +26,80 @@ func (c *cluster) followerLoop(stopCh chan struct{}) {
}
}
}
// establishFollowership starts the follower-side background work and
// remembers the cancel function so revokeFollowership can stop it again.
func (c *cluster) establishFollowership(ctx context.Context) {
	c.logger.Info().Log("Establishing followership")

	followerCtx, cancel := context.WithCancel(ctx)
	c.cancelFollowerShip = cancel

	// Periodically check whether the cluster lost its leader for too long
	// and needs to be recovered.
	go c.recoverCluster(followerCtx, c.syncInterval)
}
// revokeFollowership stops the follower-side background work, if any is
// currently running.
func (c *cluster) revokeFollowership() {
	c.logger.Info().Log("Revoking followership")

	cancel := c.cancelFollowerShip
	if cancel == nil {
		return
	}

	cancel()
	c.cancelFollowerShip = nil
}
// recoverCluster periodically checks whether the cluster has been without a
// raft leader for longer than c.recoverTimeout. If so, it collects the set of
// still-reachable peers and recovers raft with that set. Runs until ctx is
// canceled; on a failed recovery it shuts down the whole cluster.
func (c *cluster) recoverCluster(ctx context.Context, interval time.Duration) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			// Snapshot the leader state under the same lock that
			// trackLeaderChanges uses to write it.
			c.leaderLock.Lock()
			hasLeader := c.hasRaftLeader
			lastLeaderChange := c.lastLeaderChange
			c.leaderLock.Unlock()

			// Give a freshly (re-)started raft instance time to elect a
			// leader before considering recovery. This also prevents an
			// immediate re-trigger right after a recovery, since Recover
			// creates a new raft instance (assumes Stats().Uptime resets
			// with the new instance — TODO confirm).
			uptime := c.raft.Stats().Uptime
			if uptime < time.Minute {
				continue
			}

			// c.recoverTimeout is set once in New and read without a lock;
			// a value of 0 disables autorecovery.
			if !hasLeader && c.recoverTimeout > 0 && time.Since(lastLeaderChange) > c.recoverTimeout {
				peers := []raft.Peer{}

				// find living peers and recover
				servers, err := c.raft.Servers()
				if err != nil {
					// Exits only the select, not the loop: retry on the
					// next tick.
					break
				}

				// Keep only known raft servers (excluding ourselves) whose
				// node currently responds to a status request.
				c.nodesLock.RLock()
				for id, node := range c.nodes {
					if _, err := node.Status(); err != nil {
						continue
					}
					for _, server := range servers {
						if server.ID == id && server.ID != c.nodeID {
							peers = append(peers, raft.Peer{
								ID:      id,
								Address: server.Address,
							})
						}
					}
				}
				c.nodesLock.RUnlock()

				c.logger.Warn().WithField("peers", peers).Log("Recovering raft.")

				// recover raft with new set of peers
				err = c.raft.Recover(peers, 2*interval)
				if err != nil {
					c.logger.Error().WithError(err).Log("Recovering raft failed. Shutting down.")
					c.Shutdown()
					return
				}
			}
		}
	}
}

View File

@ -306,14 +306,14 @@ func (c *cluster) establishLeadership(ctx context.Context, emergency bool) error
if !emergency {
go c.clearLocks(ctx, time.Minute)
go c.clearDeadNodes(ctx, c.nodeRecoverTimeout)
go c.clearDeadNodes(ctx, c.syncInterval, c.nodeRecoverTimeout)
}
return nil
}
func (c *cluster) clearDeadNodes(ctx context.Context, nodeRecoverTimeout time.Duration) {
ticker := time.NewTicker(c.syncInterval)
func (c *cluster) clearDeadNodes(ctx context.Context, interval, nodeRecoverTimeout time.Duration) {
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {

View File

@ -85,6 +85,7 @@ type Server struct {
}
type Stats struct {
Uptime time.Duration
Address string
State string
LastContact time.Duration
@ -210,6 +211,7 @@ func (r *raft) Servers() ([]Server, error) {
func (r *raft) Stats() Stats {
stats := Stats{
Uptime: time.Since(r.raftStart),
Address: r.raftAddress,
}

136
cluster/raft/recovery.go Normal file
View File

@ -0,0 +1,136 @@
package raft
import (
"io"
"sync"
"time"
)
// RaftRecoverer is a Raft that can additionally be recovered in place:
// the underlying raft instance is shut down and re-created with a new
// set of peers.
type RaftRecoverer interface {
	Raft

	// Recover shuts down the current raft instance, waits for the given
	// cooldown duration, and creates a new instance with the given peers.
	Recover([]Peer, time.Duration) error
}
// raftRecoverer wraps a Raft and allows replacing the underlying instance
// via Recover. The lock guards access to raft: Recover takes the write
// lock while swapping the instance, all delegating methods take the read
// lock for the duration of the delegated call.
type raftRecoverer struct {
	raft   Raft   // currently active raft instance, replaced by Recover
	config Config // config used to (re-)create the raft instance

	lock sync.RWMutex
}
// NewRecoverer creates a recoverable raft instance from the given config.
// The config is retained so Recover can re-create the instance later.
func NewRecoverer(config Config) (RaftRecoverer, error) {
	inner, err := New(config)
	if err != nil {
		return nil, err
	}

	return &raftRecoverer{
		raft:   inner,
		config: config,
	}, nil
}
// Recover replaces the underlying raft instance: the current instance is
// shut down and, after the cooldown has elapsed, a fresh instance is
// created with the given peers as the cluster configuration.
//
// The write lock is held for the entire procedure, including the cooldown
// sleep, so all delegated raft calls block until recovery has finished —
// presumably intentional, to quiesce the wrapper while raft is down.
func (r *raftRecoverer) Recover(peers []Peer, cooldown time.Duration) error {
	r.lock.Lock()
	defer r.lock.Unlock()

	r.raft.Shutdown()

	if r.config.Logger != nil {
		r.config.Logger.Warn().WithField("duration", cooldown).Log("Cooling down")
	}

	time.Sleep(cooldown)

	// Re-create the instance with the new set of peers.
	r.config.Peers = peers

	raft, err := New(r.config)
	if err != nil {
		// NOTE(review): on failure r.raft still points to the instance that
		// was just shut down; the caller (cluster.recoverCluster) shuts the
		// whole cluster down in that case — confirm no other callers exist.
		return err
	}

	r.raft = raft

	return nil
}
// Shutdown stops the currently active raft instance.
func (r *raftRecoverer) Shutdown() {
	r.lock.RLock()
	r.raft.Shutdown()
	r.lock.RUnlock()
}
// IsLeader delegates to the currently active raft instance.
func (r *raftRecoverer) IsLeader() bool {
	r.lock.RLock()
	isLeader := r.raft.IsLeader()
	r.lock.RUnlock()

	return isLeader
}
// Leader delegates to the currently active raft instance.
func (r *raftRecoverer) Leader() (string, string) {
	r.lock.RLock()
	v1, v2 := r.raft.Leader()
	r.lock.RUnlock()

	return v1, v2
}
// Servers delegates to the currently active raft instance.
func (r *raftRecoverer) Servers() ([]Server, error) {
	r.lock.RLock()
	servers, err := r.raft.Servers()
	r.lock.RUnlock()

	return servers, err
}
// Stats delegates to the currently active raft instance.
func (r *raftRecoverer) Stats() Stats {
	r.lock.RLock()
	stats := r.raft.Stats()
	r.lock.RUnlock()

	return stats
}
// Apply delegates to the currently active raft instance.
func (r *raftRecoverer) Apply(data []byte) error {
	r.lock.RLock()
	err := r.raft.Apply(data)
	r.lock.RUnlock()

	return err
}
// Barrier delegates to the currently active raft instance.
func (r *raftRecoverer) Barrier(timeout time.Duration) error {
	r.lock.RLock()
	err := r.raft.Barrier(timeout)
	r.lock.RUnlock()

	return err
}
// AddServer delegates to the currently active raft instance.
func (r *raftRecoverer) AddServer(id, address string) error {
	r.lock.RLock()
	err := r.raft.AddServer(id, address)
	r.lock.RUnlock()

	return err
}
// RemoveServer delegates to the currently active raft instance.
func (r *raftRecoverer) RemoveServer(id string) error {
	r.lock.RLock()
	err := r.raft.RemoveServer(id)
	r.lock.RUnlock()

	return err
}
// LeadershipTransfer delegates to the currently active raft instance.
func (r *raftRecoverer) LeadershipTransfer(id string) error {
	r.lock.RLock()
	err := r.raft.LeadershipTransfer(id)
	r.lock.RUnlock()

	return err
}
// Snapshot delegates to the currently active raft instance.
func (r *raftRecoverer) Snapshot() (io.ReadCloser, error) {
	r.lock.RLock()
	rc, err := r.raft.Snapshot()
	r.lock.RUnlock()

	return rc, err
}

View File

@ -299,6 +299,7 @@ func (d *Config) init() {
d.vars.Register(value.NewInt64(&d.Cluster.SyncInterval, 5), "cluster.sync_interval_sec", "CORE_CLUSTER_SYNC_INTERVAL_SEC", nil, "Interval between aligning the process in the cluster DB with the processes on the nodes", true, false)
d.vars.Register(value.NewInt64(&d.Cluster.NodeRecoverTimeout, 120), "cluster.node_recover_timeout_sec", "CORE_CLUSTER_NODE_RECOVER_TIMEOUT_SEC", nil, "Timeout for a node to recover before rebalancing the processes", true, false)
d.vars.Register(value.NewInt64(&d.Cluster.EmergencyLeaderTimeout, 10), "cluster.emergency_leader_timeout_sec", "CORE_CLUSTER_EMERGENCY_LEADER_TIMEOUT_SEC", nil, "Timeout for establishing the emergency leadership after lost contact to raft leader", true, false)
d.vars.Register(value.NewInt64(&d.Cluster.RecoverTimeout, 0), "cluster.recover_timeout_sec", "CORE_CLUSTER_RECOVER_TIMEOUT_SEC", nil, "Timeout for recovering the cluster if no leader can be voted", false, false)
d.vars.Register(value.NewBool(&d.Cluster.Debug.DisableFFmpegCheck, false), "cluster.debug.disable_ffmpeg_check", "CORE_CLUSTER_DEBUG_DISABLE_FFMPEG_CHECK", nil, "Disable checking for identical FFmpeg versions on all nodes", false, false)
}

View File

@ -191,6 +191,7 @@ type Data struct {
SyncInterval int64 `json:"sync_interval_sec" format:"int64"` // seconds
NodeRecoverTimeout int64 `json:"node_recover_timeout_sec" format:"int64"` // seconds
EmergencyLeaderTimeout int64 `json:"emergency_leader_timeout_sec" format:"int64"` // seconds
RecoverTimeout int64 `json:"recover_timeout_sec" format:"int64"` // seconds
Debug struct {
DisableFFmpegCheck bool `json:"disable_ffmpeg_check"`
} `json:"debug"`