diff --git a/app/api/api.go b/app/api/api.go
index 1cdd1f4d..d14c81eb 100644
--- a/app/api/api.go
+++ b/app/api/api.go
@@ -539,6 +539,7 @@ func (a *api) start(ctx context.Context) error {
 		SyncInterval:           time.Duration(cfg.Cluster.SyncInterval) * time.Second,
 		NodeRecoverTimeout:     time.Duration(cfg.Cluster.NodeRecoverTimeout) * time.Second,
 		EmergencyLeaderTimeout: time.Duration(cfg.Cluster.EmergencyLeaderTimeout) * time.Second,
+		RecoverTimeout:         time.Duration(cfg.Cluster.RecoverTimeout) * time.Second,
 		CoreConfig:             cfg.Clone(),
 		CoreSkills:             a.ffmpeg.Skills(),
 		IPLimiter:              a.sessionsLimiter,
diff --git a/cluster/cluster.go b/cluster/cluster.go
index fde578c8..093dc22a 100644
--- a/cluster/cluster.go
+++ b/cluster/cluster.go
@@ -115,6 +115,7 @@ type Config struct {
 	SyncInterval           time.Duration // Interval between aligning the process in the cluster DB with the processes on the nodes
 	NodeRecoverTimeout     time.Duration // Timeout for a node to recover before rebalancing the processes
 	EmergencyLeaderTimeout time.Duration // Timeout for establishing the emergency leadership after lost contact to raft leader
+	RecoverTimeout         time.Duration // Timeout for recovering the cluster if there's no raft leader

 	CoreConfig *config.Config
 	CoreSkills skills.Skills
@@ -133,7 +134,7 @@ type cluster struct {

 	logger log.Logger

-	raft                  raft.Raft
+	raft                  raft.RaftRecoverer
 	raftRemoveGracePeriod time.Duration
 	raftAddress           string
 	raftNotifyCh          chan bool
@@ -142,7 +143,8 @@ type cluster struct {

 	store store.Store

-	cancelLeaderShip context.CancelFunc
+	cancelLeaderShip   context.CancelFunc
+	cancelFollowerShip context.CancelFunc

 	shutdown   bool
 	shutdownCh chan struct{}
@@ -151,6 +153,7 @@ type cluster struct {
 	syncInterval           time.Duration
 	nodeRecoverTimeout     time.Duration
 	emergencyLeaderTimeout time.Duration
+	recoverTimeout         time.Duration

 	forwarder forwarder.Forwarder
 	api       API
@@ -171,6 +174,7 @@ type cluster struct {
 	hasRaftLeader     bool
 	isLeader          bool
 	isEmergencyLeader bool
+	lastLeaderChange  time.Time
 	leaderLock        sync.Mutex

 	isTLSRequired bool
@@ -207,6 +211,7 @@ func New(config Config) (Cluster, error) {
 		syncInterval:           config.SyncInterval,
 		nodeRecoverTimeout:     config.NodeRecoverTimeout,
 		emergencyLeaderTimeout: config.EmergencyLeaderTimeout,
+		recoverTimeout:         config.RecoverTimeout,

 		isDegraded:    true,
 		isDegradedErr: fmt.Errorf("cluster not yet startet"),
@@ -214,6 +219,8 @@ func New(config Config) (Cluster, error) {
 		isCoreDegraded:    true,
 		isCoreDegradedErr: fmt.Errorf("cluster not yet started"),

+		lastLeaderChange: time.Now(),
+
 		config: config.CoreConfig,
 		skills: config.CoreSkills,
 		nodes:  map[string]clusternode.Node{},
@@ -344,7 +351,7 @@ func New(config Config) (Cluster, error) {
 	c.raftLeaderObservationCh = make(chan string, 16)
 	c.raftEmergencyNotifyCh = make(chan bool, 16)

-	raft, err := raft.New(raft.Config{
+	raft, err := raft.NewRecoverer(raft.Config{
 		ID:      config.NodeID,
 		Path:    config.Path,
 		Address: config.Address,
@@ -381,9 +388,17 @@ func New(config Config) (Cluster, error) {
 			case <-c.shutdownCh:
 				return
 			case <-timer.C:
-				c.logger.Warn().WithField("peer", peerAddress).Log("Giving up joining cluster")
+				c.logger.Warn().WithFields(log.Fields{
+					"peer":    peerAddress,
+					"timeout": c.nodeRecoverTimeout,
+				}).Log("Giving up joining cluster via peer")
 				return
 			case <-ticker.C:
+				if c.HasRaftLeader() {
+					c.logger.Warn().WithField("peer", peerAddress).Log("Stop joining cluster via peer, already joined")
+					return
+				}
+
 				err := c.Join("", c.nodeID, c.raftAddress, peerAddress)
 				if err != nil {
 					c.logger.Warn().WithError(err).Log("Join cluster")
@@ -726,10 +741,6 @@ func (c *cluster) IsClusterDegraded() (bool, error) {
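The hunks above thread the new `RecoverTimeout` from the core config into the cluster, next to the existing timeouts. A minimal sketch of the resulting wiring from a caller's perspective; the `newCluster` helper and the literal values are illustrative only, and a real `cluster.Config` also carries `NodeID`, `Path`, `Address`, and the other fields shown in `cluster.go`:

```go
package main

import (
	"time"

	"github.com/datarhei/core/v16/cluster"
)

// newCluster shows how the new RecoverTimeout sits alongside the existing
// timeouts. Abridged: required fields such as NodeID and Address are omitted.
func newCluster() (cluster.Cluster, error) {
	return cluster.New(cluster.Config{
		SyncInterval:           5 * time.Second,
		NodeRecoverTimeout:     120 * time.Second,
		EmergencyLeaderTimeout: 10 * time.Second,
		RecoverTimeout:         0, // 0 disables the recovery path below
	})
}
```

A zero value keeps the recovery path disabled, matching the registered default in `config/config.go` further down.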
 }

 func (c *cluster) Leave(origin, id string) error {
-	if !c.HasRaftLeader() {
-		return ErrDegraded
-	}
-
 	if len(id) == 0 {
 		id = c.nodeID
 	}
@@ -872,10 +883,6 @@ func (c *cluster) Leave(origin, id string) error {
 }

 func (c *cluster) Join(origin, id, raftAddress, peerAddress string) error {
-	if !c.HasRaftLeader() {
-		return ErrDegraded
-	}
-
 	if !c.IsRaftLeader() {
 		c.logger.Debug().Log("Not leader, forwarding to leader")
 		return c.forwarder.Join(origin, id, raftAddress, peerAddress)
@@ -1335,6 +1342,7 @@ func (c *cluster) trackLeaderChanges() {
 			}
 			c.forwarder.SetLeader(leaderAddress)
 			c.leaderLock.Lock()
+			c.lastLeaderChange = time.Now()
 			if len(leaderAddress) == 0 {
 				c.hasRaftLeader = false
 			} else {
diff --git a/cluster/follower.go b/cluster/follower.go
index afeeb664..6d7cf6ff 100644
--- a/cluster/follower.go
+++ b/cluster/follower.go
@@ -1,7 +1,17 @@
 package cluster

+import (
+	"context"
+	"time"
+
+	"github.com/datarhei/core/v16/cluster/raft"
+)
+
 // followerLoop is run by every follower node in the cluster.
 func (c *cluster) followerLoop(stopCh chan struct{}) {
+	c.establishFollowership(context.TODO())
+	defer c.revokeFollowership()
+
 	for {
 		select {
 		case <-stopCh:
@@ -11,3 +21,80 @@ func (c *cluster) followerLoop(stopCh chan struct{}) {
 		}
 	}
 }
+
+func (c *cluster) establishFollowership(ctx context.Context) {
+	c.logger.Info().Log("Establishing followership")
+
+	ctx, cancel := context.WithCancel(ctx)
+	c.cancelFollowerShip = cancel
+
+	go c.recoverCluster(ctx, c.syncInterval)
+}
+
+func (c *cluster) revokeFollowership() {
+	c.logger.Info().Log("Revoking followership")
+
+	if c.cancelFollowerShip != nil {
+		c.cancelFollowerShip()
+		c.cancelFollowerShip = nil
+	}
+}
+
+func (c *cluster) recoverCluster(ctx context.Context, interval time.Duration) {
+	ticker := time.NewTicker(interval)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case <-ticker.C:
+			c.leaderLock.Lock()
+			hasLeader := c.hasRaftLeader
+			lastLeaderChange := c.lastLeaderChange
+			c.leaderLock.Unlock()
+
+			uptime := c.raft.Stats().Uptime
+			if uptime < time.Minute {
+				continue
+			}
+
+			if !hasLeader && c.recoverTimeout > 0 && time.Since(lastLeaderChange) > c.recoverTimeout {
+				peers := []raft.Peer{}
+
+				// find living peers and recover
+				servers, err := c.raft.Servers()
+				if err != nil {
+					break
+				}
+
+				c.nodesLock.RLock()
+				for id, node := range c.nodes {
+					if _, err := node.Status(); err != nil {
+						continue
+					}
+
+					for _, server := range servers {
+						if server.ID == id && server.ID != c.nodeID {
+							peers = append(peers, raft.Peer{
+								ID:      id,
+								Address: server.Address,
+							})
+						}
+					}
+				}
+				c.nodesLock.RUnlock()
+
+				c.logger.Warn().WithField("peers", peers).Log("Recovering raft")
+
+				// recover raft with new set of peers
+				err = c.raft.Recover(peers, 2*interval)
+				if err != nil {
+					c.logger.Error().WithError(err).Log("Recovering raft failed. Shutting down.")
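The trigger condition buried in `recoverCluster` is easier to audit in isolation. A hypothetical extraction, not part of this diff, under the same rules: recovery only fires when it is enabled, raft has been up for at least a minute, and the cluster has been leaderless for longer than `recoverTimeout`:

```go
package cluster

import "time"

// shouldRecover is a hypothetical, pure-function version of the trigger in
// recoverCluster. It returns true only when recovery is enabled
// (recoverTimeout > 0), raft has been running for at least a minute, and no
// leader has been observed for longer than recoverTimeout.
func shouldRecover(uptime time.Duration, hasLeader bool, lastLeaderChange time.Time, recoverTimeout time.Duration) bool {
	if recoverTimeout <= 0 || uptime < time.Minute {
		return false
	}

	return !hasLeader && time.Since(lastLeaderChange) > recoverTimeout
}
```

The one-minute uptime guard keeps a freshly restarted node from immediately tearing down its raft state before an election has had a chance to complete.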
Shutting down.") + c.Shutdown() + return + } + } + } + } +} diff --git a/cluster/leader.go b/cluster/leader.go index aaccce4d..b1ce02d4 100644 --- a/cluster/leader.go +++ b/cluster/leader.go @@ -306,14 +306,14 @@ func (c *cluster) establishLeadership(ctx context.Context, emergency bool) error if !emergency { go c.clearLocks(ctx, time.Minute) - go c.clearDeadNodes(ctx, c.nodeRecoverTimeout) + go c.clearDeadNodes(ctx, c.syncInterval, c.nodeRecoverTimeout) } return nil } -func (c *cluster) clearDeadNodes(ctx context.Context, nodeRecoverTimeout time.Duration) { - ticker := time.NewTicker(c.syncInterval) +func (c *cluster) clearDeadNodes(ctx context.Context, interval, nodeRecoverTimeout time.Duration) { + ticker := time.NewTicker(interval) defer ticker.Stop() for { diff --git a/cluster/raft/raft.go b/cluster/raft/raft.go index e07f1f1e..72c19cf3 100644 --- a/cluster/raft/raft.go +++ b/cluster/raft/raft.go @@ -85,6 +85,7 @@ type Server struct { } type Stats struct { + Uptime time.Duration Address string State string LastContact time.Duration @@ -210,6 +211,7 @@ func (r *raft) Servers() ([]Server, error) { func (r *raft) Stats() Stats { stats := Stats{ + Uptime: time.Since(r.raftStart), Address: r.raftAddress, } diff --git a/cluster/raft/recovery.go b/cluster/raft/recovery.go new file mode 100644 index 00000000..8754184a --- /dev/null +++ b/cluster/raft/recovery.go @@ -0,0 +1,136 @@ +package raft + +import ( + "io" + "sync" + "time" +) + +type RaftRecoverer interface { + Raft + + Recover([]Peer, time.Duration) error +} + +type raftRecoverer struct { + raft Raft + config Config + + lock sync.RWMutex +} + +func NewRecoverer(config Config) (RaftRecoverer, error) { + r := &raftRecoverer{ + config: config, + } + + raft, err := New(config) + if err != nil { + return nil, err + } + + r.raft = raft + + return r, nil +} + +func (r *raftRecoverer) Recover(peers []Peer, cooldown time.Duration) error { + r.lock.Lock() + defer r.lock.Unlock() + + r.raft.Shutdown() + + if r.config.Logger != nil { + r.config.Logger.Warn().WithField("duration", cooldown).Log("Cooling down") + } + + time.Sleep(cooldown) + + r.config.Peers = peers + + raft, err := New(r.config) + if err != nil { + return err + } + + r.raft = raft + + return nil +} + +func (r *raftRecoverer) Shutdown() { + r.lock.RLock() + defer r.lock.RUnlock() + + r.raft.Shutdown() +} + +func (r *raftRecoverer) IsLeader() bool { + r.lock.RLock() + defer r.lock.RUnlock() + + return r.raft.IsLeader() +} + +func (r *raftRecoverer) Leader() (string, string) { + r.lock.RLock() + defer r.lock.RUnlock() + + return r.raft.Leader() +} + +func (r *raftRecoverer) Servers() ([]Server, error) { + r.lock.RLock() + defer r.lock.RUnlock() + + return r.raft.Servers() +} + +func (r *raftRecoverer) Stats() Stats { + r.lock.RLock() + defer r.lock.RUnlock() + + return r.raft.Stats() +} + +func (r *raftRecoverer) Apply(data []byte) error { + r.lock.RLock() + defer r.lock.RUnlock() + + return r.raft.Apply(data) +} + +func (r *raftRecoverer) Barrier(timeout time.Duration) error { + r.lock.RLock() + defer r.lock.RUnlock() + + return r.raft.Barrier(timeout) +} + +func (r *raftRecoverer) AddServer(id, address string) error { + r.lock.RLock() + defer r.lock.RUnlock() + + return r.raft.AddServer(id, address) +} + +func (r *raftRecoverer) RemoveServer(id string) error { + r.lock.RLock() + defer r.lock.RUnlock() + + return r.raft.RemoveServer(id) +} + +func (r *raftRecoverer) LeadershipTransfer(id string) error { + r.lock.RLock() + defer r.lock.RUnlock() + + return 
diff --git a/config/config.go b/config/config.go
index 317da336..281e31ef 100644
--- a/config/config.go
+++ b/config/config.go
@@ -299,6 +299,7 @@ func (d *Config) init() {
 	d.vars.Register(value.NewInt64(&d.Cluster.SyncInterval, 5), "cluster.sync_interval_sec", "CORE_CLUSTER_SYNC_INTERVAL_SEC", nil, "Interval between aligning the process in the cluster DB with the processes on the nodes", true, false)
 	d.vars.Register(value.NewInt64(&d.Cluster.NodeRecoverTimeout, 120), "cluster.node_recover_timeout_sec", "CORE_CLUSTER_NODE_RECOVER_TIMEOUT_SEC", nil, "Timeout for a node to recover before rebalancing the processes", true, false)
 	d.vars.Register(value.NewInt64(&d.Cluster.EmergencyLeaderTimeout, 10), "cluster.emergency_leader_timeout_sec", "CORE_CLUSTER_EMERGENCY_LEADER_TIMEOUT_SEC", nil, "Timeout for establishing the emergency leadership after lost contact to raft leader", true, false)
+	d.vars.Register(value.NewInt64(&d.Cluster.RecoverTimeout, 0), "cluster.recover_timeout_sec", "CORE_CLUSTER_RECOVER_TIMEOUT_SEC", nil, "Timeout for recovering the cluster if no leader can be elected", false, false)
 	d.vars.Register(value.NewBool(&d.Cluster.Debug.DisableFFmpegCheck, false), "cluster.debug.disable_ffmpeg_check", "CORE_CLUSTER_DEBUG_DISABLE_FFMPEG_CHECK", nil, "Disable checking for identical FFmpeg versions on all nodes", false, false)
 }
diff --git a/config/data.go b/config/data.go
index 887c8e98..be3e4056 100644
--- a/config/data.go
+++ b/config/data.go
@@ -191,6 +191,7 @@ type Data struct {
 		SyncInterval           int64 `json:"sync_interval_sec" format:"int64"`            // seconds
 		NodeRecoverTimeout     int64 `json:"node_recover_timeout_sec" format:"int64"`     // seconds
 		EmergencyLeaderTimeout int64 `json:"emergency_leader_timeout_sec" format:"int64"` // seconds
+		RecoverTimeout         int64 `json:"recover_timeout_sec" format:"int64"`          // seconds
 		Debug                  struct {
 			DisableFFmpegCheck bool `json:"disable_ffmpeg_check"`
 		} `json:"debug"`
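As a quick sanity check of the JSON tag, a tiny round-trip; `clusterData` is a hypothetical stand-in for the `Cluster` block of `Data`:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// clusterData mirrors only the new field of the Cluster block in Data.
type clusterData struct {
	RecoverTimeout int64 `json:"recover_timeout_sec" format:"int64"` // seconds
}

func main() {
	b, _ := json.Marshal(clusterData{RecoverTimeout: 300})
	fmt.Println(string(b)) // {"recover_timeout_sec":300}
}
```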