diff --git a/app/api/api.go b/app/api/api.go index be52618d..939941ef 100644 --- a/app/api/api.go +++ b/app/api/api.go @@ -622,16 +622,18 @@ func (a *api) start() error { a.restream = restream - if cluster, err := cluster.New(cluster.ClusterConfig{ - ID: cfg.ID, - Name: cfg.Name, - Path: filepath.Join(cfg.DB.Dir, "cluster"), - IPLimiter: a.sessionsLimiter, - Logger: a.log.logger.core.WithComponent("Cluster"), - }); err != nil { - return fmt.Errorf("unable to create cluster: %w", err) - } else { - a.cluster = cluster + if cfg.Cluster.Enable { + if cluster, err := cluster.New(cluster.ClusterConfig{ + ID: cfg.ID, + Name: cfg.Name, + Path: filepath.Join(cfg.DB.Dir, "cluster"), + IPLimiter: a.sessionsLimiter, + Logger: a.log.logger.core.WithComponent("Cluster"), + }); err != nil { + return fmt.Errorf("unable to create cluster: %w", err) + } else { + a.cluster = cluster + } } var httpjwt jwt.JWT @@ -1318,7 +1320,8 @@ func (a *api) stop() { } if a.cluster != nil { - a.cluster.Stop() + a.cluster.Leave() + a.cluster.Shutdown() } // Stop JWT authentication diff --git a/cluster/cluster.go b/cluster/cluster.go index 645e5d1d..3ed587cf 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -5,13 +5,13 @@ import ( "errors" "fmt" "io" + gonet "net" "path/filepath" "sync" "time" "github.com/datarhei/core/v16/log" "github.com/datarhei/core/v16/net" - hclog "github.com/hashicorp/go-hclog" "github.com/hashicorp/raft" raftboltdb "github.com/hashicorp/raft-boltdb/v2" @@ -44,7 +44,8 @@ type Cluster interface { RemoveNode(id string) error ListNodes() []NodeReader GetNode(id string) (NodeReader, error) - Stop() + Leave() error // gracefully leave the cluster + Shutdown() error ClusterReader } @@ -75,6 +76,21 @@ type cluster struct { once sync.Once logger log.Logger + + raft *raft.Raft + raftTransport *raft.NetworkTransport + raftAddress string + raftNotifyCh <-chan bool + raftStore *raftboltdb.BoltStore + raftRemoveGracePeriod time.Duration + + reassertLeaderCh chan chan error + + leaveCh chan struct{} + + shutdown bool + shutdownCh chan struct{} + shutdownLock sync.Mutex } func New(config ClusterConfig) (Cluster, error) { @@ -89,6 +105,10 @@ func New(config ClusterConfig) (Cluster, error) { limiter: config.IPLimiter, updates: make(chan NodeState, 64), logger: config.Logger, + + reassertLeaderCh: make(chan chan error), + leaveCh: make(chan struct{}), + shutdownCh: make(chan struct{}), } if c.limiter == nil { @@ -104,62 +124,12 @@ func New(config ClusterConfig) (Cluster, error) { return nil, err } - snapshotLogger := NewLogger(c.logger.WithComponent("raft"), hclog.Debug).Named("snapshot") - snapShotStore, err := raft.NewFileSnapshotStoreWithLogger(filepath.Join(c.path, "snapshots"), 10, snapshotLogger) - if err != nil { - return nil, err - } + c.startRaft(fsm, true, false) - boltdb, err := raftboltdb.New(raftboltdb.Options{ - Path: filepath.Join(c.path, "store.db"), - BoltOptions: &bbolt.Options{ - Timeout: 5 * time.Second, - }, - }) - if err != nil { - return nil, err - } - - boltdb.Stats() - - raftConfig := raft.DefaultConfig() - raftConfig.Logger = NewLogger(c.logger.WithComponent("raft"), hclog.Debug) - - raftTransport, err := raft.NewTCPTransportWithConfig("127.0.0.1:8090", nil, &raft.NetworkTransportConfig{ - ServerAddressProvider: nil, - Logger: NewLogger(c.logger.WithComponent("raft"), hclog.Debug).Named("transport"), - Stream: &raft.TCPStreamLayer{}, - MaxPool: 5, - Timeout: 5 * time.Second, - }) - if err != nil { - boltdb.Close() - return nil, err - } - - node, err := raft.NewRaft(raftConfig, fsm, boltdb, boltdb, snapShotStore, raftTransport) - if err != nil { - boltdb.Close() - return nil, err - } - - node.BootstrapCluster(raft.Configuration{ - Servers: []raft.Server{ - { - Suffrage: raft.Voter, - ID: raft.ServerID(config.Name), - Address: raftTransport.LocalAddr(), - }, - }, - }) - - ctx, cancel := context.WithCancel(context.Background()) - c.cancel = cancel - - go func(ctx context.Context) { + go func() { for { select { - case <-ctx.Done(): + case <-c.shutdownCh: return case state := <-c.updates: c.logger.Debug().WithFields(log.Fields{ @@ -190,24 +160,103 @@ func New(config ClusterConfig) (Cluster, error) { c.lock.Unlock() } } - }(ctx) + }() return c, nil } -func (c *cluster) Stop() { - c.once.Do(func() { - c.lock.Lock() - defer c.lock.Unlock() +func (c *cluster) Shutdown() error { + c.logger.Info().Log("shutting down cluster") + c.shutdownLock.Lock() + defer c.shutdownLock.Unlock() - for _, node := range c.nodes { - node.stop() + if c.shutdown { + return nil + } + + c.shutdown = true + close(c.shutdownCh) + + c.lock.Lock() + defer c.lock.Unlock() + + for _, node := range c.nodes { + node.stop() + } + + c.nodes = map[string]*node{} + + c.shutdownRaft() + + return nil +} + +// https://github.com/hashicorp/consul/blob/44b39240a86bc94ddc67bc105286ab450bd869a9/agent/consul/server.go#L1369 +func (c *cluster) Leave() error { + addr := c.raftTransport.LocalAddr() + + // Get the latest configuration. + future := c.raft.GetConfiguration() + if err := future.Error(); err != nil { + c.logger.Error().WithError(err).Log("failed to get raft configuration") + return err + } + + numPeers := len(future.Configuration().Servers) + + // If we are the current leader, and we have any other peers (cluster has multiple + // servers), we should do a RemoveServer/RemovePeer to safely reduce the quorum size. + // If we are not the leader, then we should issue our leave intention and wait to be + // removed for some reasonable period of time. + isLeader := c.IsLeader() + if isLeader && numPeers > 1 { + if err := c.leadershipTransfer(); err == nil { + isLeader = false + } else { + future := c.raft.RemoveServer(raft.ServerID(c.id), 0, 0) + if err := future.Error(); err != nil { + c.logger.Error().WithError(err).Log("failed to remove ourself as raft peer") + } + } + } + + // If we were not leader, wait to be safely removed from the cluster. We + // must wait to allow the raft replication to take place, otherwise an + // immediate shutdown could cause a loss of quorum. + if !isLeader { + left := false + limit := time.Now().Add(c.raftRemoveGracePeriod) + for !left && time.Now().Before(limit) { + // Sleep a while before we check. + time.Sleep(50 * time.Millisecond) + + // Get the latest configuration. + future := c.raft.GetConfiguration() + if err := future.Error(); err != nil { + c.logger.Error().WithError(err).Log("failed to get raft configuration") + break + } + + // See if we are no longer included. + left = true + for _, server := range future.Configuration().Servers { + if server.Address == addr { + left = false + break + } + } } - c.nodes = map[string]*node{} + if !left { + c.logger.Warn().Log("failed to leave raft configuration gracefully, timeout") + } + } - c.cancel() - }) + return nil +} + +func (c *cluster) IsLeader() bool { + return c.raft.State() == raft.Leader } func (c *cluster) AddNode(address, username, password string) (string, error) { @@ -371,3 +420,112 @@ func (c *cluster) GetFile(path string) (io.ReadCloser, error) { return data, nil } + +func (c *cluster) startRaft(fsm raft.FSM, bootstrap, inmem bool) error { + defer func() { + if c.raft == nil && c.raftStore != nil { + c.raftStore.Close() + } + }() + + c.raftRemoveGracePeriod = 5 * time.Second + + addr, err := gonet.ResolveTCPAddr("tcp", c.raftAddress) + if err != nil { + return err + } + + transport, err := raft.NewTCPTransportWithLogger(c.raftAddress, addr, 3, 10*time.Second, NewLogger(c.logger.WithComponent("raft"), hclog.Debug).Named("transport")) + if err != nil { + return err + } + + snapshotLogger := NewLogger(c.logger.WithComponent("raft"), hclog.Debug).Named("snapshot") + snapshots, err := raft.NewFileSnapshotStoreWithLogger(filepath.Join(c.path, "snapshots"), 10, snapshotLogger) + if err != nil { + return err + } + + var logStore raft.LogStore + var stableStore raft.StableStore + if inmem { + logStore = raft.NewInmemStore() + stableStore = raft.NewInmemStore() + } else { + bolt, err := raftboltdb.New(raftboltdb.Options{ + Path: filepath.Join(c.path, "raftlog.db"), + BoltOptions: &bbolt.Options{ + Timeout: 5 * time.Second, + }, + }) + if err != nil { + return fmt.Errorf("bolt: %w", err) + } + logStore = bolt + stableStore = bolt + + cacheStore, err := raft.NewLogCache(512, logStore) + if err != nil { + return err + } + logStore = cacheStore + + c.raftStore = bolt + } + + cfg := raft.DefaultConfig() + cfg.LocalID = raft.ServerID(c.id) + cfg.Logger = NewLogger(c.logger.WithComponent("raft"), hclog.Debug) + + if bootstrap { + hasState, err := raft.HasExistingState(logStore, stableStore, snapshots) + if err != nil { + return err + } + if !hasState { + configuration := raft.Configuration{ + Servers: []raft.Server{ + { + Suffrage: raft.Voter, + ID: raft.ServerID(c.id), + Address: transport.LocalAddr(), + }, + }, + } + + if err := raft.BootstrapCluster(cfg, + logStore, stableStore, snapshots, transport, configuration); err != nil { + return err + } + } + } + + // Set up a channel for reliable leader notifications. + raftNotifyCh := make(chan bool, 10) + cfg.NotifyCh = raftNotifyCh + c.raftNotifyCh = raftNotifyCh + + node, err := raft.NewRaft(cfg, fsm, logStore, stableStore, snapshots, transport) + if err != nil { + return err + } + + c.raft = node + + go c.monitorLeadership() + + return nil +} + +func (c *cluster) shutdownRaft() { + if c.raft != nil { + c.raftTransport.Close() + future := c.raft.Shutdown() + if err := future.Error(); err != nil { + c.logger.Warn().WithError(err).Log("error shutting down raft") + } + if c.raftStore != nil { + c.raftStore.Close() + } + } +} diff --git a/cluster/leader.go b/cluster/leader.go new file mode 100644 index 00000000..6e66e54c --- /dev/null +++ b/cluster/leader.go @@ -0,0 +1,195 @@ +package cluster + +import ( + "context" + "fmt" + "sync" + "time" + + "github.com/datarhei/core/v16/log" +) + +// monitorLeadership listens to the raf notify channel in order to find +// out if we got the leadership or lost it. +// https://github.com/hashicorp/consul/blob/44b39240a86bc94ddc67bc105286ab450bd869a9/agent/consul/leader.go#L71 +func (c *cluster) monitorLeadership() { + // We use the notify channel we configured Raft with, NOT Raft's + // leaderCh, which is only notified best-effort. Doing this ensures + // that we get all notifications in order, which is required for + // cleanup and to ensure we never run multiple leader loops. + raftNotifyCh := c.raftNotifyCh + + var weAreLeaderCh chan struct{} + var leaderLoop sync.WaitGroup + for { + select { + case isLeader := <-raftNotifyCh: + switch { + case isLeader: + if weAreLeaderCh != nil { + c.logger.Error().Log("attempted to start the leader loop while running") + continue + } + + weAreLeaderCh = make(chan struct{}) + leaderLoop.Add(1) + go func(ch chan struct{}) { + defer leaderLoop.Done() + c.leaderLoop(ch) + }(weAreLeaderCh) + c.logger.Info().Log("cluster leadership acquired") + + default: + if weAreLeaderCh == nil { + c.logger.Error().Log("attempted to stop the leader loop while not running") + continue + } + + c.logger.Debug().Log("shutting down leader loop") + close(weAreLeaderCh) + leaderLoop.Wait() + weAreLeaderCh = nil + c.logger.Info().Log("cluster leadership lost") + } + case <-c.shutdownCh: + return + } + } +} + +// leadershipTransfer tries to transfer the leadership to another node e.g. in order +// to do a graceful shutdown. +// https://github.com/hashicorp/consul/blob/44b39240a86bc94ddc67bc105286ab450bd869a9/agent/consul/leader.go#L122 +func (c *cluster) leadershipTransfer() error { + retryCount := 3 + for i := 0; i < retryCount; i++ { + future := c.raft.LeadershipTransfer() + if err := future.Error(); err != nil { + c.logger.Error().WithError(err).WithFields(log.Fields{ + "attempt": i, + "retry_limit": retryCount, + }).Log("failed to transfer leadership attempt, will retry") + } else { + c.logger.Info().WithFields(log.Fields{ + "attempt": i, + "retry_limit": retryCount, + }).Log("successfully transferred leadership") + return nil + } + + } + return fmt.Errorf("failed to transfer leadership in %d attempts", retryCount) +} + +// leaderLoop runs as long as we are the leader to run various maintenance activities +// https://github.com/hashicorp/consul/blob/44b39240a86bc94ddc67bc105286ab450bd869a9/agent/consul/leader.go#L146 +func (c *cluster) leaderLoop(stopCh chan struct{}) { + establishedLeader := false +RECONCILE: + // Setup a reconciliation timer + interval := time.After(s.config.ReconcileInterval) + + // Apply a raft barrier to ensure our FSM is caught up + barrier := c.raft.Barrier(time.Minute) + if err := barrier.Error(); err != nil { + c.logger.Error().WithError(err).Log("failed to wait for barrier") + goto WAIT + } + + // Check if we need to handle initial leadership actions + if !establishedLeader { + if err := c.establishLeadership(stopCtx); err != nil { + c.logger.Error().WithError(err).Log("failed to establish leadership") + // Immediately revoke leadership since we didn't successfully + // establish leadership. + c.revokeLeadership() + + // attempt to transfer leadership. If successful it is + // time to leave the leaderLoop since this node is no + // longer the leader. If leadershipTransfer() fails, we + // will try to acquire it again after + // 5 seconds. + if err := c.leadershipTransfer(); err != nil { + c.logger.Error().WithError(err).Log("failed to transfer leadership") + interval = time.After(5 * time.Second) + goto WAIT + } + return + } + establishedLeader = true + defer c.revokeLeadership() + } + +WAIT: + // Poll the stop channel to give it priority so we don't waste time + // trying to perform the other operations if we have been asked to shut + // down. + select { + case <-stopCh: + return + default: + } + + // Periodically reconcile as long as we are the leader, + // or when Serf events arrive + for { + select { + case <-stopCh: + return + case <-c.shutdownCh: + return + case <-interval: + goto RECONCILE + case errCh := <-c.reassertLeaderCh: + // we can get into this state when the initial + // establishLeadership has failed as well as the follow + // up leadershipTransfer. Afterwards we will be waiting + // for the interval to trigger a reconciliation and can + // potentially end up here. There is no point to + // reassert because this agent was never leader in the + // first place. + if !establishedLeader { + errCh <- fmt.Errorf("leadership has not been established") + continue + } + + // continue to reassert only if we previously were the + // leader, which means revokeLeadership followed by an + // establishLeadership(). + c.revokeLeadership() + err := c.establishLeadership(stopCtx) + errCh <- err + + // in case establishLeadership failed, we will try to + // transfer leadership. At this time raft thinks we are + // the leader, but we disagree. + if err != nil { + if err := c.leadershipTransfer(); err != nil { + // establishedLeader was true before, + // but it no longer is since it revoked + // leadership and Leadership transfer + // also failed. Which is why it stays + // in the leaderLoop, but now + // establishedLeader needs to be set to + // false. + establishedLeader = false + interval = time.After(5 * time.Second) + goto WAIT + } + + // leadershipTransfer was successful and it is + // time to leave the leaderLoop. + return + } + + } + } +} + +func (c *cluster) establishLeadership(ctx context.Context) error { + return nil +} + +func (c *cluster) revokeLeadership() { + +} diff --git a/config/config.go b/config/config.go index 33d9492b..a5de6957 100644 --- a/config/config.go +++ b/config/config.go @@ -271,6 +271,11 @@ func (d *Config) init() { d.vars.Register(value.NewStringList(&d.Router.BlockedPrefixes, []string{"/api"}, ","), "router.blocked_prefixes", "CORE_ROUTER_BLOCKED_PREFIXES", nil, "List of path prefixes that can't be routed", false, false) d.vars.Register(value.NewStringMapString(&d.Router.Routes, nil), "router.routes", "CORE_ROUTER_ROUTES", nil, "List of route mappings", false, false) d.vars.Register(value.NewDir(&d.Router.UIPath, "", d.fs), "router.ui_path", "CORE_ROUTER_UI_PATH", nil, "Path to a directory holding UI files mounted as /ui", false, false) + + // Cluster + d.vars.Register(value.NewBool(&d.Cluster.Enable, false), "cluster.enable", "CORE_CLUSTER_ENABLE", nil, "Enable cluster mode", false, false) + d.vars.Register(value.NewBool(&d.Cluster.Bootstrap, false), "cluster.bootstrap", "CORE_CLUSTER_BOOTSTRAP", nil, "Bootstrap a cluster", false, false) + d.vars.Register(value.NewBool(&d.Cluster.Debug, false), "cluster.debug", "CORE_CLUSTER_DEBUG", nil, "Switch to debug mode, not for production", false, false) } // Validate validates the current state of the Config for completeness and sanity. Errors are diff --git a/config/data.go b/config/data.go index 35507888..631b215a 100644 --- a/config/data.go +++ b/config/data.go @@ -166,6 +166,11 @@ type Data struct { Routes map[string]string `json:"routes"` UIPath string `json:"ui_path"` } `json:"router"` + Cluster struct { + Enable bool `json:"enable"` + Bootstrap bool `json:"bootstrap"` + Debug bool `json:"debug"` + } `json:"cluster"` } func UpgradeV2ToV3(d *v2.Data, fs fs.Filesystem) (*Data, error) {