Drop dead nodes after CORE_CLUSTER_NODE_RECOVER_TIMEOUT_SEC
This commit is contained in:
parent
1a64fddbb1
commit
2c17572027
@ -59,6 +59,8 @@ type Cluster interface {
|
||||
Leave(origin, id string) error // gracefully remove a node from the cluster
|
||||
TransferLeadership(origin, id string) error // transfer leadership to another node
|
||||
Snapshot(origin string) (io.ReadCloser, error)
|
||||
IsRaftLeader() bool
|
||||
HasRaftLeader() bool
|
||||
|
||||
ListProcesses() []store.Process
|
||||
GetProcess(id app.ProcessID) (store.Process, error)
|
||||
@ -165,10 +167,11 @@ type cluster struct {
|
||||
hostnames []string
|
||||
stateLock sync.RWMutex
|
||||
|
||||
isRaftLeader bool
|
||||
hasRaftLeader bool
|
||||
isLeader bool
|
||||
leaderLock sync.Mutex
|
||||
isRaftLeader bool
|
||||
hasRaftLeader bool
|
||||
isLeader bool
|
||||
isEmergencyLeader bool
|
||||
leaderLock sync.Mutex
|
||||
|
||||
isTLSRequired bool
|
||||
clusterKVS ClusterKVS
|
||||
@ -369,10 +372,16 @@ func New(config Config) (Cluster, error) {
|
||||
ticker := time.NewTicker(time.Second)
|
||||
defer ticker.Stop()
|
||||
|
||||
timer := time.NewTimer(c.nodeRecoverTimeout)
|
||||
defer timer.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-c.shutdownCh:
|
||||
return
|
||||
case <-timer.C:
|
||||
c.logger.Warn().WithField("peer", peerAddress).Log("Giving up joining cluster")
|
||||
return
|
||||
case <-ticker.C:
|
||||
err := c.Join("", c.nodeID, c.raftAddress, peerAddress)
|
||||
if err != nil {
|
||||
@ -672,6 +681,13 @@ func (c *cluster) IsRaftLeader() bool {
|
||||
return c.isRaftLeader
|
||||
}
|
||||
|
||||
func (c *cluster) HasRaftLeader() bool {
|
||||
c.leaderLock.Lock()
|
||||
defer c.leaderLock.Unlock()
|
||||
|
||||
return c.hasRaftLeader
|
||||
}
|
||||
|
||||
func (c *cluster) IsDegraded() (bool, error) {
|
||||
c.stateLock.RLock()
|
||||
defer c.stateLock.RUnlock()
|
||||
@ -709,7 +725,7 @@ func (c *cluster) IsClusterDegraded() (bool, error) {
|
||||
}
|
||||
|
||||
func (c *cluster) Leave(origin, id string) error {
|
||||
if ok, _ := c.IsDegraded(); ok {
|
||||
if !c.HasRaftLeader() {
|
||||
return ErrDegraded
|
||||
}
|
||||
|
||||
@ -767,11 +783,11 @@ func (c *cluster) Leave(origin, id string) error {
|
||||
return err
|
||||
}
|
||||
|
||||
numPeers := len(servers)
|
||||
numServers := len(servers)
|
||||
|
||||
if id == c.nodeID {
|
||||
// We're going to remove ourselves
|
||||
if numPeers <= 1 {
|
||||
if numServers <= 1 {
|
||||
// Don't do so if we're the only server in the cluster
|
||||
c.logger.Debug().Log("We're the leader without any peers, not doing anything")
|
||||
return nil
|
||||
@ -847,7 +863,7 @@ func (c *cluster) Leave(origin, id string) error {
|
||||
}
|
||||
|
||||
func (c *cluster) Join(origin, id, raftAddress, peerAddress string) error {
|
||||
if ok, _ := c.IsDegraded(); ok {
|
||||
if !c.HasRaftLeader() {
|
||||
return ErrDegraded
|
||||
}
|
||||
|
||||
@ -910,7 +926,7 @@ func (c *cluster) Join(origin, id, raftAddress, peerAddress string) error {
|
||||
}
|
||||
|
||||
func (c *cluster) TransferLeadership(origin, id string) error {
|
||||
if ok, _ := c.IsDegraded(); ok {
|
||||
if !c.HasRaftLeader() {
|
||||
return ErrDegraded
|
||||
}
|
||||
|
||||
@ -923,7 +939,7 @@ func (c *cluster) TransferLeadership(origin, id string) error {
|
||||
}
|
||||
|
||||
func (c *cluster) Snapshot(origin string) (io.ReadCloser, error) {
|
||||
if ok, _ := c.IsDegraded(); ok {
|
||||
if !c.HasRaftLeader() {
|
||||
return nil, ErrDegraded
|
||||
}
|
||||
|
||||
|
||||
@ -112,6 +112,7 @@ func (c *cluster) monitorLeadership() {
|
||||
c.leaderLock.Lock()
|
||||
c.isRaftLeader = false
|
||||
c.isLeader = false
|
||||
c.isEmergencyLeader = false
|
||||
c.leaderLock.Unlock()
|
||||
} else if notification == NOTIFY_LEADER {
|
||||
if weAreLeaderCh != nil {
|
||||
@ -145,6 +146,7 @@ func (c *cluster) monitorLeadership() {
|
||||
c.leaderLock.Lock()
|
||||
c.isRaftLeader = true
|
||||
c.isLeader = true
|
||||
c.isEmergencyLeader = false
|
||||
c.leaderLock.Unlock()
|
||||
} else if notification == NOTIFY_EMERGENCY {
|
||||
if weAreEmergencyLeaderCh != nil {
|
||||
@ -178,6 +180,7 @@ func (c *cluster) monitorLeadership() {
|
||||
c.leaderLock.Lock()
|
||||
c.isRaftLeader = false
|
||||
c.isLeader = true
|
||||
c.isEmergencyLeader = true
|
||||
c.leaderLock.Unlock()
|
||||
}
|
||||
case <-c.shutdownCh:
|
||||
@ -303,11 +306,38 @@ func (c *cluster) establishLeadership(ctx context.Context, emergency bool) error
|
||||
|
||||
if !emergency {
|
||||
go c.clearLocks(ctx, time.Minute)
|
||||
go c.clearDeadNodes(ctx, c.nodeRecoverTimeout)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *cluster) clearDeadNodes(ctx context.Context, nodeRecoverTimeout time.Duration) {
|
||||
ticker := time.NewTicker(c.syncInterval)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-ticker.C:
|
||||
nodes := c.proxy.ListNodes()
|
||||
for _, node := range nodes {
|
||||
about := node.About()
|
||||
if time.Since(about.SpawnedAt) > nodeRecoverTimeout && time.Since(about.LastContact) > nodeRecoverTimeout {
|
||||
c.logger.Warn().WithFields(log.Fields{
|
||||
"id": about.ID,
|
||||
"after": nodeRecoverTimeout,
|
||||
"lastContact": about.LastContact,
|
||||
"spawnedAt": about.SpawnedAt,
|
||||
}).Log("Removing peer from cluster")
|
||||
c.raft.RemoveServer(about.ID)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *cluster) revokeLeadership() {
|
||||
c.logger.Info().Log("Revoking leadership")
|
||||
|
||||
|
||||
@ -75,6 +75,7 @@ type NodeResources struct {
|
||||
}
|
||||
|
||||
type NodeAbout struct {
|
||||
SpawnedAt time.Time
|
||||
ID string
|
||||
Name string
|
||||
Address string
|
||||
@ -114,6 +115,8 @@ type node struct {
|
||||
id string
|
||||
address string
|
||||
|
||||
createdAt time.Time
|
||||
|
||||
peer client.RestClient
|
||||
peerErr error
|
||||
peerLock sync.RWMutex
|
||||
@ -159,12 +162,13 @@ type NodeConfig struct {
|
||||
|
||||
func NewNode(config NodeConfig) Node {
|
||||
n := &node{
|
||||
id: config.ID,
|
||||
address: config.Address,
|
||||
config: config.Config,
|
||||
state: stateDisconnected,
|
||||
secure: strings.HasPrefix(config.Address, "https://"),
|
||||
logger: config.Logger,
|
||||
id: config.ID,
|
||||
address: config.Address,
|
||||
config: config.Config,
|
||||
state: stateDisconnected,
|
||||
secure: strings.HasPrefix(config.Address, "https://"),
|
||||
logger: config.Logger,
|
||||
createdAt: time.Now(),
|
||||
}
|
||||
|
||||
if n.logger == nil {
|
||||
@ -530,6 +534,7 @@ func (n *node) About() NodeAbout {
|
||||
}
|
||||
|
||||
nodeAbout := NodeAbout{
|
||||
SpawnedAt: n.createdAt,
|
||||
ID: n.id,
|
||||
Name: name,
|
||||
Address: n.address,
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user