diff --git a/app/api/api.go b/app/api/api.go index 2ed471f2..5c115efb 100644 --- a/app/api/api.go +++ b/app/api/api.go @@ -624,13 +624,17 @@ func (a *api) start() error { if cfg.Cluster.Enable { cluster, err := cluster.New(cluster.ClusterConfig{ - ID: cfg.ID, - Name: cfg.Name, - Path: filepath.Join(cfg.DB.Dir, "cluster"), - IPLimiter: a.sessionsLimiter, - Logger: a.log.logger.core.WithComponent("Cluster"), - Bootstrap: cfg.Cluster.Bootstrap, - Address: cfg.Cluster.Address, + ID: cfg.ID, + Name: cfg.Name, + Path: filepath.Join(cfg.DB.Dir, "cluster"), + Bootstrap: cfg.Cluster.Bootstrap, + Address: cfg.Cluster.Address, + JoinAddress: cfg.Cluster.JoinAddress, + CoreAPIAddress: cfg.Address, + CoreAPIUsername: cfg.API.Auth.Username, + CoreAPIPassword: cfg.API.Auth.Password, + IPLimiter: a.sessionsLimiter, + Logger: a.log.logger.core.WithComponent("Cluster"), }) if err != nil { return fmt.Errorf("unable to create cluster: %w", err) @@ -1323,7 +1327,7 @@ func (a *api) stop() { } if a.cluster != nil { - a.cluster.Leave() + a.cluster.Leave("", "") a.cluster.Shutdown() } diff --git a/cluster/api.go b/cluster/api.go new file mode 100644 index 00000000..71e7268b --- /dev/null +++ b/cluster/api.go @@ -0,0 +1,167 @@ +package cluster + +import ( + "bytes" + "context" + "net/http" + "strings" + + httpapi "github.com/datarhei/core/v16/http/api" + "github.com/datarhei/core/v16/http/errorhandler" + "github.com/datarhei/core/v16/http/handler/util" + httplog "github.com/datarhei/core/v16/http/log" + mwlog "github.com/datarhei/core/v16/http/middleware/log" + "github.com/datarhei/core/v16/http/validator" + "github.com/datarhei/core/v16/log" + + "github.com/labstack/echo/v4" + "github.com/labstack/echo/v4/middleware" +) + +type api struct { + id string + address string + router *echo.Echo + cluster Cluster + logger log.Logger +} + +type API interface { + Start() error + Shutdown(ctx context.Context) error +} + +type APIConfig struct { + ID string + Cluster Cluster + Logger log.Logger +} + 
+type JoinRequest struct { + Origin string `json:"origin"` + ID string `json:"id"` + RaftAddress string `json:"raft_address"` + APIAddress string `json:"api_address"` + APIUsername string `json:"api_username"` + APIPassword string `json:"api_password"` +} + +type LeaveRequest struct { + Origin string `json:"origin"` + ID string `json:"id"` +} + +func NewAPI(config APIConfig) (API, error) { + a := &api{ + id: config.ID, + cluster: config.Cluster, + logger: config.Logger, + } + + if a.logger == nil { + a.logger = log.New("") + } + + address, err := config.Cluster.APIAddr("") + if err != nil { + return nil, err + } + + a.address = address + + a.router = echo.New() + a.router.Debug = true + a.router.HTTPErrorHandler = errorhandler.HTTPErrorHandler + a.router.Validator = validator.New() + a.router.HideBanner = false + a.router.HidePort = false + + mwlog.NewWithConfig(mwlog.Config{ + Logger: a.logger, + }) + + a.router.Use(mwlog.NewWithConfig(mwlog.Config{ + Logger: a.logger, + })) + a.router.Use(middleware.RecoverWithConfig(middleware.RecoverConfig{ + LogErrorFunc: func(c echo.Context, err error, stack []byte) error { + rows := strings.Split(string(stack), "\n") + a.logger.Error().WithField("stack", rows).Log("recovered from a panic") + return nil + }, + })) + a.router.Logger.SetOutput(httplog.NewWrapper(a.logger)) + + a.router.POST("/v1/join", func(c echo.Context) error { + r := JoinRequest{} + + if err := util.ShouldBindJSON(c, &r); err != nil { + return httpapi.Err(http.StatusBadRequest, "Invalid JSON", "%s", err) + } + + a.logger.Debug().Log("got join request: %+v", r) + + if r.Origin == a.id { + return httpapi.Err(http.StatusLoopDetected, "", "breaking circuit") + } + + err := a.cluster.Join(r.Origin, r.ID, r.RaftAddress, r.APIAddress, r.APIUsername, r.APIPassword) + if err != nil { + a.logger.Debug().WithError(err).Log("unable to join cluster") + return httpapi.Err(http.StatusInternalServerError, "unable to join cluster", "%s", err) + } + + return 
c.JSON(http.StatusOK, "OK")
+	})
+
+	a.router.POST("/v1/leave", func(c echo.Context) error {
+		r := LeaveRequest{}
+
+		if err := util.ShouldBindJSON(c, &r); err != nil {
+			return httpapi.Err(http.StatusBadRequest, "Invalid JSON", "%s", err)
+		}
+
+		a.logger.Debug().Log("got leave request: %+v", r)
+
+		if r.Origin == a.id {
+			return httpapi.Err(http.StatusLoopDetected, "", "breaking circuit")
+		}
+
+		err := a.cluster.Leave(r.Origin, r.ID)
+		if err != nil {
+			a.logger.Debug().WithError(err).Log("unable to leave cluster")
+			return httpapi.Err(http.StatusInternalServerError, "unable to leave cluster", "%s", err)
+		}
+
+		return c.JSON(http.StatusOK, "OK")
+	})
+
+	a.router.GET("/v1/snapshot", func(c echo.Context) error {
+		data, err := a.cluster.Snapshot()
+		if err != nil {
+			a.logger.Debug().WithError(err).Log("unable to create snapshot")
+			return httpapi.Err(http.StatusInternalServerError, "unable to create snapshot", "%s", err)
+		}
+
+		return c.Stream(http.StatusOK, "application/octet-stream", bytes.NewReader(data))
+	})
+
+	a.router.POST("/v1/process", func(c echo.Context) error {
+		return httpapi.Err(http.StatusNotImplemented, "")
+	})
+
+	a.router.DELETE("/v1/process/:id", func(c echo.Context) error {
+		return httpapi.Err(http.StatusNotImplemented, "")
+	})
+
+	return a, nil
+}
+
+func (a *api) Start() error {
+	a.logger.Debug().Log("starting api at %s", a.address)
+	return a.router.Start(a.address)
+}
+
+func (a *api) Shutdown(ctx context.Context) error {
+	return a.router.Shutdown(ctx)
+}
diff --git a/cluster/cluster.go b/cluster/cluster.go
index 1ae4672d..d422e737 100644
--- a/cluster/cluster.go
+++ b/cluster/cluster.go
@@ -1,7 +1,9 @@
 package cluster
 
 import (
+	"bytes"
 	"context"
+	"encoding/gob"
 	"encoding/json"
 	"errors"
 	"fmt"
@@ -9,11 +11,13 @@ import (
 	gonet "net"
 	"path/filepath"
 	"reflect"
+	"strconv"
 	"sync"
 	"time"
 
 	"github.com/datarhei/core/v16/log"
 	"github.com/datarhei/core/v16/net"
+	"github.com/datarhei/core/v16/restream/app"
 
 	hclog 
"github.com/hashicorp/go-hclog" "github.com/hashicorp/raft" raftboltdb "github.com/hashicorp/raft-boltdb/v2" @@ -62,23 +66,42 @@ func (r *dummyClusterReader) GetFile(path string) (io.ReadCloser, error) { } type Cluster interface { - AddNode(address, username, password string) (string, error) - RemoveNode(id string) error - ListNodes() []NodeReader - GetNode(id string) (NodeReader, error) - Leave() error // gracefully leave the cluster + Addr() string + APIAddr(raftAddress string) (string, error) + + Join(origin, id, raftAddress, apiAddress, apiUsername, apiPassword string) error + Leave(origin, id string) error // gracefully remove a node from the cluster + Snapshot() ([]byte, error) + Shutdown() error + + AddNode(id, address, username, password string) error + RemoveNode(id string) error + ListNodes() []addNodeCommand + GetNode(id string) (addNodeCommand, error) + + AddNodeX(address, username, password string) (string, error) + RemoveNodeX(id string) error + ListNodesX() []NodeReader + GetNodeX(id string) (NodeReader, error) + ClusterReader } type ClusterConfig struct { - ID string - Name string - Path string + ID string // ID of the node + Name string // Name of the node + Path string // Path where to store all cluster data + Bootstrap bool // Whether to bootstrap a cluster + Address string // Listen address for the raft protocol + JoinAddress string // Address of a member of a cluster to join + + CoreAPIAddress string // Address of the core API + CoreAPIUsername string // Username for the core API + CoreAPIPassword string // Password for the core API + IPLimiter net.IPLimiter Logger log.Logger - Bootstrap bool - Address string } type cluster struct { @@ -104,10 +127,13 @@ type cluster struct { raft *raft.Raft raftTransport *raft.NetworkTransport raftAddress string - raftNotifyCh <-chan bool + raftNotifyCh chan bool + raftEmergencyNotifyCh chan bool raftStore *raftboltdb.BoltStore raftRemoveGracePeriod time.Duration + joinAddress string + store Store 
reassertLeaderCh chan chan error @@ -117,6 +143,20 @@ type cluster struct { shutdown bool shutdownCh chan struct{} shutdownLock sync.Mutex + + forwarder Forwarder + api API + + core struct { + address string + username string + password string + } + + isRaftLeader bool + hasRaftLeader bool + isLeader bool + leaderLock sync.Mutex } func New(config ClusterConfig) (Cluster, error) { @@ -133,12 +173,17 @@ func New(config ClusterConfig) (Cluster, error) { logger: config.Logger, raftAddress: config.Address, + joinAddress: config.JoinAddress, reassertLeaderCh: make(chan chan error), leaveCh: make(chan struct{}), shutdownCh: make(chan struct{}), } + c.core.address = config.CoreAPIAddress + c.core.username = config.CoreAPIUsername + c.core.password = config.CoreAPIPassword + if c.limiter == nil { c.limiter = net.NewNullIPLimiter() } @@ -154,13 +199,65 @@ func New(config ClusterConfig) (Cluster, error) { c.store = store + api, err := NewAPI(APIConfig{ + ID: c.id, + Cluster: c, + Logger: c.logger.WithField("logname", "api"), + }) + if err != nil { + return nil, err + } + + go func(api API) { + api.Start() + }(api) + + c.api = api + + if forwarder, err := NewForwarder(ForwarderConfig{ + ID: c.id, + Logger: c.logger.WithField("logname", "forwarder"), + }); err != nil { + c.shutdownAPI() + return nil, err + } else { + c.forwarder = forwarder + } + c.logger.Debug().Log("starting raft") err = c.startRaft(store, config.Bootstrap, false) if err != nil { + c.shutdownAPI() return nil, err } + if len(c.joinAddress) != 0 { + addr, _ := c.APIAddr(c.joinAddress) + c.forwarder.SetLeader(addr) + + go func(addr string) { + ticker := time.NewTicker(time.Second) + defer ticker.Stop() + + for { + select { + case <-c.shutdownCh: + return + case <-ticker.C: + c.logger.Debug().Log("joining cluster at %s", c.joinAddress) + err := c.Join("", c.id, c.raftAddress, c.core.address, c.core.username, c.core.password) + if err != nil { + c.logger.Warn().WithError(err).Log("unable to join %s", 
c.joinAddress) + continue + } + + return + } + } + }(addr) + } + go func() { for { select { @@ -200,6 +297,25 @@ func New(config ClusterConfig) (Cluster, error) { return c, nil } +func (c *cluster) Addr() string { + return c.raftAddress +} + +func (c *cluster) APIAddr(raftAddress string) (string, error) { + if len(raftAddress) == 0 { + raftAddress = c.raftAddress + } + + host, port, _ := gonet.SplitHostPort(raftAddress) + + p, err := strconv.Atoi(port) + if err != nil { + return "", err + } + + return gonet.JoinHostPort(host, strconv.Itoa(p+1)), nil +} + func (c *cluster) Shutdown() error { c.logger.Info().Log("shutting down cluster") c.shutdownLock.Lock() @@ -226,46 +342,34 @@ func (c *cluster) Shutdown() error { return nil } -// https://github.com/hashicorp/consul/blob/44b39240a86bc94ddc67bc105286ab450bd869a9/agent/consul/server.go#L1369 -func (c *cluster) Leave() error { - addr := c.raftTransport.LocalAddr() +func (c *cluster) IsRaftLeader() bool { + c.leaderLock.Lock() + defer c.leaderLock.Unlock() - // Get the latest configuration. - future := c.raft.GetConfiguration() - if err := future.Error(); err != nil { - c.logger.Error().WithError(err).Log("failed to get raft configuration") - return err + return c.isRaftLeader +} + +func (c *cluster) Leave(origin, id string) error { + if len(id) == 0 { + id = c.id } - numPeers := len(future.Configuration().Servers) + c.logger.Debug().WithFields(log.Fields{ + "nodeid": id, + }).Log("received leave request for node") - // If we are the current leader, and we have any other peers (cluster has multiple - // servers), we should do a RemoveServer/RemovePeer to safely reduce the quorum size. - // If we are not the leader, then we should issue our leave intention and wait to be - // removed for some reasonable period of time. 
- isLeader := c.IsLeader() - if isLeader && numPeers > 1 { - if err := c.leadershipTransfer(); err == nil { - isLeader = false - } else { - c.StoreRemoveNode(c.id) - future := c.raft.RemoveServer(raft.ServerID(c.id), 0, 0) - if err := future.Error(); err != nil { - c.logger.Error().WithError(err).Log("failed to remove ourself as raft peer") - } + if !c.IsRaftLeader() { + // Tell the leader to remove us + err := c.forwarder.Leave(origin, id) + if err != nil { + return err } - } - - // If we were not leader, wait to be safely removed from the cluster. We - // must wait to allow the raft replication to take place, otherwise an - // immediate shutdown could cause a loss of quorum. - if !isLeader { - // Send leave-request to leader - // DELETE leader//api/v3/cluster/node/:id + // Wait for us being removed from the configuration left := false limit := time.Now().Add(c.raftRemoveGracePeriod) for !left && time.Now().Before(limit) { + c.logger.Debug().Log("waiting for getting removed from the configuration") // Sleep a while before we check. time.Sleep(50 * time.Millisecond) @@ -279,7 +383,7 @@ func (c *cluster) Leave() error { // See if we are no longer included. 
left = true for _, server := range future.Configuration().Servers { - if server.Address == addr { + if server.Address == raft.ServerAddress(c.raftAddress) { left = false break } @@ -289,46 +393,108 @@ func (c *cluster) Leave() error { if !left { c.logger.Warn().Log("failed to leave raft configuration gracefully, timeout") } + + return nil } - return nil -} - -func (c *cluster) IsLeader() bool { - return c.raft.State() == raft.Leader -} - -func (c *cluster) leave(id string) error { - if !c.IsLeader() { - return fmt.Errorf("not leader") + // Count the number of servers in the cluster + future := c.raft.GetConfiguration() + if err := future.Error(); err != nil { + c.logger.Error().WithError(err).Log("failed to get raft configuration") + return err } - c.logger.Debug().WithFields(log.Fields{ - "nodeid": id, - }).Log("received leave request for remote node") + numPeers := len(future.Configuration().Servers) if id == c.id { + // We're going to remove ourselves + if numPeers <= 1 { + // Don't do so if we're the only server in the cluster + c.logger.Debug().Log("we're the leader without any peers, not doing anything") + return nil + } + + // Transfer the leadership to another server err := c.leadershipTransfer() if err != nil { c.logger.Warn().WithError(err).Log("failed to transfer leadership") + return err } + + // Wait for new leader election + for { + c.logger.Debug().Log("waiting for new leader election") + + time.Sleep(50 * time.Millisecond) + + c.leaderLock.Lock() + hasLeader := c.hasRaftLeader + c.leaderLock.Unlock() + + if hasLeader { + break + } + } + + // Tell the new leader to remove us + err = c.forwarder.Leave("", id) + if err != nil { + return err + } + + // Wait for us being removed from the configuration + left := false + limit := time.Now().Add(c.raftRemoveGracePeriod) + for !left && time.Now().Before(limit) { + c.logger.Debug().Log("waiting for getting removed from the configuration") + // Sleep a while before we check. 
+ time.Sleep(50 * time.Millisecond) + + // Get the latest configuration. + future := c.raft.GetConfiguration() + if err := future.Error(); err != nil { + c.logger.Error().WithError(err).Log("failed to get raft configuration") + break + } + + // See if we are no longer included. + left = true + for _, server := range future.Configuration().Servers { + if server.Address == raft.ServerAddress(c.raftAddress) { + left = false + break + } + } + } + + return nil } - future := c.raft.RemoveServer(raft.ServerID(id), 0, 0) - if err := future.Error(); err != nil { - c.logger.Error().WithError(err).WithFields(log.Fields{ + err := c.RemoveNode(id) + if err != nil { + c.logger.Error().WithError(err).Log("failed to apply log for removal") + } + + // Remove another sever from the cluster + if future := c.raft.RemoveServer(raft.ServerID(id), 0, 0); future.Error() != nil { + c.logger.Error().WithError(future.Error()).WithFields(log.Fields{ "nodeid": id, }).Log("failed to remove node") + + return future.Error() } return nil } -func (c *cluster) Join(id, raftAddress, apiAddress, username, password string) error { - if !c.IsLeader() { - return fmt.Errorf("not leader") +func (c *cluster) Join(origin, id, raftAddress, apiAddress, apiUsername, apiPassword string) error { + if !c.IsRaftLeader() { + c.logger.Debug().Log("not leader, forwarding to leader") + return c.forwarder.Join(origin, id, raftAddress, apiAddress, apiUsername, apiPassword) } + c.logger.Debug().Log("leader: joining %s", raftAddress) + c.logger.Debug().WithFields(log.Fields{ "nodeid": id, "address": raftAddress, @@ -365,21 +531,24 @@ func (c *cluster) Join(id, raftAddress, apiAddress, username, password string) e } } - f := c.raft.AddVoter(raft.ServerID(id), raft.ServerAddress(raftAddress), 0, 5*time.Second) + f := c.raft.AddVoter(raft.ServerID(id), raft.ServerAddress(raftAddress), 0, 0) if err := f.Error(); err != nil { return err } - if err := c.StoreAddNode(id, apiAddress, username, password); err != nil { - future := 
c.raft.RemoveServer(raft.ServerID(id), 0, 0) - if err := future.Error(); err != nil { - c.logger.Error().WithError(err).WithFields(log.Fields{ - "nodeid": id, - "address": raftAddress, - }).Log("error removing existing node") + if err := c.AddNode(id, apiAddress, apiUsername, apiPassword); err != nil { + /* + future := c.raft.RemoveServer(raft.ServerID(id), 0, 0) + if err := future.Error(); err != nil { + c.logger.Error().WithError(err).WithFields(log.Fields{ + "nodeid": id, + "address": raftAddress, + }).Log("error removing existing node") + return err + } return err - } - return err + */ + c.logger.Debug().WithError(err).Log("") } c.logger.Info().WithFields(log.Fields{ @@ -390,31 +559,69 @@ func (c *cluster) Join(id, raftAddress, apiAddress, username, password string) e return nil } -type command struct { - Operation string - Data interface{} +type Snapshot struct { + Metadata *raft.SnapshotMeta + Data []byte } -type addNodeCommand struct { - ID string - Address string - Username string - Password string +func (c *cluster) Snapshot() ([]byte, error) { + if !c.IsRaftLeader() { + c.logger.Debug().Log("not leader, forwarding to leader") + return c.forwarder.Snapshot() + } + + f := c.raft.Snapshot() + err := f.Error() + if err != nil { + return nil, err + } + + metadata, r, err := f.Open() + if err != nil { + return nil, err + } + + defer r.Close() + + data, err := io.ReadAll(r) + if err != nil { + return nil, fmt.Errorf("failed to read in snapshot: %w", err) + } + + snapshot := Snapshot{ + Metadata: metadata, + Data: data, + } + + buffer := bytes.Buffer{} + enc := gob.NewEncoder(&buffer) + err = enc.Encode(snapshot) + if err != nil { + return nil, err + } + + return buffer.Bytes(), nil } -func (c *cluster) StoreListNodes() []addNodeCommand { +func (c *cluster) ListNodes() []addNodeCommand { c.store.ListNodes() return nil } -func (c *cluster) StoreAddNode(id, address, username, password string) error { - if !c.IsLeader() { +func (c *cluster) GetNode(id string) 
(addNodeCommand, error) { + c.store.GetNode(id) + + return addNodeCommand{}, nil +} + +func (c *cluster) AddNode(id, address, username, password string) error { + if !c.IsRaftLeader() { return fmt.Errorf("not leader") } com := &command{ - Operation: "addNode", + Operation: opAddNode, Data: &addNodeCommand{ ID: id, Address: address, @@ -436,17 +643,13 @@ func (c *cluster) StoreAddNode(id, address, username, password string) error { return nil } -type removeNodeCommand struct { - ID string -} - -func (c *cluster) StoreRemoveNode(id string) error { - if !c.IsLeader() { +func (c *cluster) RemoveNode(id string) error { + if !c.IsRaftLeader() { return fmt.Errorf("not leader") } com := &command{ - Operation: "removeNode", + Operation: opRemoveNode, Data: &removeNodeCommand{ ID: id, }, @@ -481,11 +684,22 @@ func (c *cluster) trackLeaderChanges() { select { case obs := <-obsCh: if leaderObs, ok := obs.Data.(raft.LeaderObservation); ok { - // TODO: update the forwarder c.logger.Debug().WithFields(log.Fields{ "id": leaderObs.LeaderID, "address": leaderObs.LeaderAddr, }).Log("new leader observation") + addr := string(leaderObs.LeaderAddr) + if len(addr) != 0 { + addr, _ = c.APIAddr(addr) + } + c.forwarder.SetLeader(addr) + c.leaderLock.Lock() + if len(addr) == 0 { + c.hasRaftLeader = false + } else { + c.hasRaftLeader = true + } + c.leaderLock.Unlock() } else if peerObs, ok := obs.Data.(raft.PeerObservation); ok { c.logger.Debug().WithFields(log.Fields{ "removed": peerObs.Removed, @@ -502,7 +716,7 @@ func (c *cluster) trackLeaderChanges() { } } -func (c *cluster) AddNode(address, username, password string) (string, error) { +func (c *cluster) AddNodeX(address, username, password string) (string, error) { node, err := newNode(address, username, password, c.updates) if err != nil { return "", err @@ -537,7 +751,7 @@ func (c *cluster) AddNode(address, username, password string) (string, error) { return id, nil } -func (c *cluster) RemoveNode(id string) error { +func (c *cluster) 
RemoveNodeX(id string) error { c.lock.Lock() defer c.lock.Unlock() @@ -556,7 +770,7 @@ func (c *cluster) RemoveNode(id string) error { c.limiter.RemoveBlock(ip) } - c.leave(id) + c.Leave("", id) c.logger.Info().WithFields(log.Fields{ "id": id, @@ -565,7 +779,7 @@ func (c *cluster) RemoveNode(id string) error { return nil } -func (c *cluster) ListNodes() []NodeReader { +func (c *cluster) ListNodesX() []NodeReader { list := []NodeReader{} c.lock.RLock() @@ -578,7 +792,7 @@ func (c *cluster) ListNodes() []NodeReader { return list } -func (c *cluster) GetNode(id string) (NodeReader, error) { +func (c *cluster) GetNodeX(id string) (NodeReader, error) { c.lock.RLock() defer c.lock.RUnlock() @@ -690,7 +904,7 @@ func (c *cluster) startRaft(fsm raft.FSM, bootstrap, inmem bool) error { c.raftTransport = transport snapshotLogger := NewLogger(c.logger, hclog.Debug).Named("raft-snapshot") - snapshots, err := raft.NewFileSnapshotStoreWithLogger(filepath.Join(c.path, "snapshots"), 10, snapshotLogger) + snapshots, err := raft.NewFileSnapshotStoreWithLogger(c.path, 3, snapshotLogger) if err != nil { return err } @@ -742,8 +956,7 @@ func (c *cluster) startRaft(fsm raft.FSM, bootstrap, inmem bool) error { }, } - if err := raft.BootstrapCluster(cfg, - logStore, stableStore, snapshots, transport, configuration); err != nil { + if err := raft.BootstrapCluster(cfg, logStore, stableStore, snapshots, transport, configuration); err != nil { return err } } @@ -754,6 +967,8 @@ func (c *cluster) startRaft(fsm raft.FSM, bootstrap, inmem bool) error { cfg.NotifyCh = raftNotifyCh c.raftNotifyCh = raftNotifyCh + c.raftEmergencyNotifyCh = make(chan bool, 10) + node, err := raft.NewRaft(cfg, fsm, logStore, stableStore, snapshots, transport) if err != nil { return err @@ -763,6 +978,7 @@ func (c *cluster) startRaft(fsm raft.FSM, bootstrap, inmem bool) error { go c.trackLeaderChanges() go c.monitorLeadership() + go c.sentinel() c.logger.Debug().Log("raft started") @@ -782,6 +998,103 @@ func (c 
*cluster) shutdownRaft() { } } +func (c *cluster) shutdownAPI() { + if c.api != nil { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + c.api.Shutdown(ctx) + } +} + // nodeLoop is run by every node in the cluster. This is mainly to check the list // of nodes from the FSM, in order to connect to them and to fetch their file lists. -func (c *cluster) nodeLoop() {} +func (c *cluster) followerLoop(stopCh chan struct{}) { + // Periodically reconcile as long as we are the leader + for { + select { + case <-stopCh: + return + case <-c.shutdownCh: + return + } + } +} + +func (c *cluster) AddProcess(config app.Config) error { + if !c.IsRaftLeader() { + return c.forwarder.AddProcess() + } + + cmd := &command{ + Operation: "addProcess", + Data: &addProcessCommand{ + Config: nil, + }, + } + + return c.applyCommand(cmd) +} + +func (c *cluster) applyCommand(cmd *command) error { + b, err := json.Marshal(cmd) + if err != nil { + return err + } + + future := c.raft.Apply(b, 5*time.Second) + if err := future.Error(); err != nil { + return fmt.Errorf("applying command failed: %w", err) + } + + return nil +} + +func (c *cluster) sentinel() { + ticker := time.NewTicker(time.Second) + defer ticker.Stop() + + start := time.Now() + var lastContactSince time.Duration + + isEmergencyLeader := false + + for { + select { + case <-c.shutdownCh: + return + case <-ticker.C: + stats := c.raft.Stats() + + fields := log.Fields{} + + for k, v := range stats { + fields[k] = v + } + + c.logger.Debug().WithFields(fields).Log("stats") + + lastContact := stats["last_contact"] + if lastContact == "never" { + lastContactSince = time.Since(start) + } else { + if d, err := time.ParseDuration(lastContact); err == nil { + lastContactSince = d + start = time.Now() + } else { + lastContactSince = time.Since(start) + } + } + + if lastContactSince > 10*time.Second && !isEmergencyLeader { + c.logger.Warn().Log("force leadership due to lost contact to leader") + 
c.raftEmergencyNotifyCh <- true + isEmergencyLeader = true + } else if lastContactSince <= 10*time.Second && isEmergencyLeader { + c.logger.Warn().Log("stop forced leadership due to contact to leader") + c.raftEmergencyNotifyCh <- false + isEmergencyLeader = false + } + } + } +} diff --git a/cluster/forwarder.go b/cluster/forwarder.go index a7a27545..1775184a 100644 --- a/cluster/forwarder.go +++ b/cluster/forwarder.go @@ -1,28 +1,219 @@ package cluster -import "github.com/labstack/echo/v4" +import ( + "bytes" + "encoding/json" + "fmt" + "io" + "net/http" + "sync" + "time" + + httpapi "github.com/datarhei/core/v16/http/api" + "github.com/datarhei/core/v16/log" +) // Forwarder forwards any HTTP request from a follower to the leader type Forwarder interface { SetLeader(address string) - Forward(c echo.Context) + HasLeader() bool + Join(origin, id, raftAddress, apiAddress, apiUsername, apiPassword string) error + Leave(origin, id string) error + Snapshot() ([]byte, error) + AddProcess() error + UpdateProcess() error + RemoveProcess() error } type forwarder struct { + id string leaderAddr string + lock sync.RWMutex + + client *http.Client + + logger log.Logger } -func NewForwarder() (Forwarder, error) { - return &forwarder{}, nil +type ForwarderConfig struct { + ID string + Logger log.Logger +} + +func NewForwarder(config ForwarderConfig) (Forwarder, error) { + f := &forwarder{ + id: config.ID, + logger: config.Logger, + } + + if f.logger == nil { + f.logger = log.New("") + } + + tr := &http.Transport{ + MaxIdleConns: 10, + IdleConnTimeout: 30 * time.Second, + } + + client := &http.Client{ + Transport: tr, + Timeout: 5 * time.Second, + } + + f.client = client + + return f, nil } func (f *forwarder) SetLeader(address string) { + f.lock.Lock() + defer f.lock.Unlock() + if f.leaderAddr == address { return } + f.logger.Debug().Log("setting leader address to %s", address) + + f.leaderAddr = address } -func (f *forwarder) Forward(c echo.Context) { - +func (f *forwarder) 
HasLeader() bool { + return len(f.leaderAddr) != 0 +} + +func (f *forwarder) Join(origin, id, raftAddress, apiAddress, apiUsername, apiPassword string) error { + if origin == "" { + origin = f.id + } + + r := JoinRequest{ + Origin: origin, + ID: id, + RaftAddress: raftAddress, + APIAddress: apiAddress, + APIUsername: apiUsername, + APIPassword: apiPassword, + } + + f.logger.Debug().WithField("request", r).Log("forwarding to leader") + + data, err := json.Marshal(&r) + if err != nil { + return err + } + + _, err = f.call(http.MethodPost, "/join", "application/json", bytes.NewReader(data)) + + return err +} + +func (f *forwarder) Leave(origin, id string) error { + if origin == "" { + origin = f.id + } + + r := LeaveRequest{ + Origin: origin, + ID: id, + } + + f.logger.Debug().WithField("request", r).Log("forwarding to leader") + + data, err := json.Marshal(&r) + if err != nil { + return err + } + + _, err = f.call(http.MethodPost, "/leave", "application/json", bytes.NewReader(data)) + + return err +} + +func (f *forwarder) Snapshot() ([]byte, error) { + r, err := f.stream(http.MethodGet, "/snapshot", "", nil) + if err != nil { + return nil, err + } + + data, err := io.ReadAll(r) + if err != nil { + return nil, err + } + + return data, nil +} + +func (f *forwarder) AddProcess() error { + return fmt.Errorf("not implemented") +} + +func (f *forwarder) UpdateProcess() error { + return fmt.Errorf("not implemented") +} + +func (f *forwarder) RemoveProcess() error { + return fmt.Errorf("not implemented") +} + +func (f *forwarder) stream(method, path, contentType string, data io.Reader) (io.ReadCloser, error) { + if len(f.leaderAddr) == 0 { + return nil, fmt.Errorf("no leader address defined") + } + + f.lock.RLock() + address := "http://" + f.leaderAddr + "/v1" + path + f.lock.RUnlock() + + f.logger.Debug().Log("address: %s", address) + + req, err := http.NewRequest(method, address, data) + if err != nil { + return nil, err + } + + if method == "POST" || method == "PUT" { + 
req.Header.Add("Content-Type", contentType) + } + + status, body, err := f.request(req) + if err != nil { + return nil, err + } + + if status < 200 || status >= 300 { + e := httpapi.Error{} + + defer body.Close() + + x, _ := io.ReadAll(body) + + json.Unmarshal(x, &e) + + return nil, e + } + + return body, nil +} + +func (f *forwarder) call(method, path, contentType string, data io.Reader) ([]byte, error) { + body, err := f.stream(method, path, contentType, data) + if err != nil { + return nil, err + } + + defer body.Close() + + x, _ := io.ReadAll(body) + + return x, nil +} + +func (f *forwarder) request(req *http.Request) (int, io.ReadCloser, error) { + resp, err := f.client.Do(req) + if err != nil { + return -1, nil, err + } + + return resp.StatusCode, resp.Body, nil } diff --git a/cluster/fsm.go b/cluster/fsm.go index 8d78b5e9..37db667f 100644 --- a/cluster/fsm.go +++ b/cluster/fsm.go @@ -12,6 +12,36 @@ type Store interface { raft.FSM ListNodes() []string + GetNode(id string) string +} + +type operation string + +const ( + opAddNode operation = "addNode" + opRemoveNode operation = "removeNode" + opAddProcess operation = "addProcess" + opRemoveProcess operation = "removeProcess" +) + +type command struct { + Operation operation + Data interface{} +} + +type addNodeCommand struct { + ID string + Address string + Username string + Password string +} + +type removeNodeCommand struct { + ID string +} + +type addProcessCommand struct { + Config []byte } // Implement a FSM @@ -36,13 +66,13 @@ func (s *store) Apply(log *raft.Log) interface{} { fmt.Printf("op: %+v\n", c) switch c.Operation { - case "addNode": + case opAddNode: b, _ := json.Marshal(c.Data) cmd := addNodeCommand{} json.Unmarshal(b, &cmd) fmt.Printf("addNode: %+v\n", cmd) - case "removeNode": + case opRemoveNode: b, _ := json.Marshal(c.Data) cmd := removeNodeCommand{} json.Unmarshal(b, &cmd) @@ -66,6 +96,10 @@ func (s *store) ListNodes() []string { return nil } +func (s *store) GetNode(id string) string { + 
return "" +} + type fsmSnapshot struct{} func (s *fsmSnapshot) Persist(sink raft.SnapshotSink) error { diff --git a/cluster/leader.go b/cluster/leader.go index 76f8c85d..59f279ea 100644 --- a/cluster/leader.go +++ b/cluster/leader.go @@ -9,6 +9,10 @@ import ( "github.com/datarhei/core/v16/log" ) +const NOTIFY_FOLLOWER = 0 +const NOTIFY_LEADER = 1 +const NOTIFY_EMERGENCY = 2 + // monitorLeadership listens to the raf notify channel in order to find // out if we got the leadership or lost it. // https://github.com/hashicorp/consul/blob/44b39240a86bc94ddc67bc105286ab450bd869a9/agent/consul/leader.go#L71 @@ -17,42 +21,161 @@ func (c *cluster) monitorLeadership() { // leaderCh, which is only notified best-effort. Doing this ensures // that we get all notifications in order, which is required for // cleanup and to ensure we never run multiple leader loops. - raftNotifyCh := c.raftNotifyCh + notifyCh := make(chan int, 10) + var notifyLoop sync.WaitGroup + + notifyLoop.Add(1) + + go func() { + raftNotifyCh := c.raftNotifyCh + raftEmergencyNotifyCh := c.raftEmergencyNotifyCh + + isRaftLeader := false + + notifyCh <- NOTIFY_FOLLOWER + + notifyLoop.Done() + + for { + select { + case isEmergencyLeader := <-raftEmergencyNotifyCh: + if isEmergencyLeader { + isRaftLeader = false + notifyCh <- NOTIFY_EMERGENCY + } else { + if !isRaftLeader { + notifyCh <- NOTIFY_FOLLOWER + } + } + case isLeader := <-raftNotifyCh: + if isLeader { + isRaftLeader = true + notifyCh <- NOTIFY_LEADER + } else { + isRaftLeader = false + notifyCh <- NOTIFY_FOLLOWER + } + case <-c.shutdownCh: + return + } + } + }() + + notifyLoop.Wait() var weAreLeaderCh chan struct{} + var weAreEmergencyLeaderCh chan struct{} + var weAreFollowerCh chan struct{} + var leaderLoop sync.WaitGroup + var emergencyLeaderLoop sync.WaitGroup + var followerLoop sync.WaitGroup for { select { - case isLeader := <-raftNotifyCh: - switch { - case isLeader: - if weAreLeaderCh != nil { - c.logger.Error().Log("attempted to start the leader 
loop while running") + case notification := <-notifyCh: + if notification == NOTIFY_FOLLOWER { + if weAreFollowerCh != nil { + // we are already follower, don't do anything continue } + // shutdown any leader and emergency loop + if weAreLeaderCh != nil { + c.logger.Debug().Log("shutting down leader loop") + close(weAreLeaderCh) + leaderLoop.Wait() + weAreLeaderCh = nil + } + + if weAreEmergencyLeaderCh != nil { + c.logger.Debug().Log("shutting down emergency leader loop") + close(weAreEmergencyLeaderCh) + emergencyLeaderLoop.Wait() + weAreEmergencyLeaderCh = nil + } + + weAreFollowerCh = make(chan struct{}) + followerLoop.Add(1) + go func(ch chan struct{}) { + defer followerLoop.Done() + c.followerLoop(ch) + }(weAreFollowerCh) + + c.logger.Info().Log("cluster followship acquired") + + c.leaderLock.Lock() + c.isRaftLeader = false + c.isLeader = false + c.leaderLock.Unlock() + } else if notification == NOTIFY_LEADER { + if weAreLeaderCh != nil { + // we are already leader, don't do anything + continue + } + + // shutdown any follower and emergency loop + if weAreFollowerCh != nil { + c.logger.Debug().Log("shutting down follower loop") + close(weAreFollowerCh) + followerLoop.Wait() + weAreFollowerCh = nil + } + + if weAreEmergencyLeaderCh != nil { + c.logger.Debug().Log("shutting down emergency leader loop") + close(weAreEmergencyLeaderCh) + emergencyLeaderLoop.Wait() + weAreEmergencyLeaderCh = nil + } + weAreLeaderCh = make(chan struct{}) leaderLoop.Add(1) go func(ch chan struct{}) { defer leaderLoop.Done() - c.leaderLoop(ch) + c.leaderLoop(ch, false) }(weAreLeaderCh) c.logger.Info().Log("cluster leadership acquired") - c.StoreAddNode(c.id, ":8080", "foo", "bar") + c.leaderLock.Lock() + c.isRaftLeader = true + c.isLeader = true + c.leaderLock.Unlock() - default: - if weAreLeaderCh == nil { - c.logger.Error().Log("attempted to stop the leader loop while not running") + c.AddNode(c.id, c.core.address, c.core.username, c.core.password) + } else if notification == 
NOTIFY_EMERGENCY { + if weAreEmergencyLeaderCh != nil { + // we are already emergency leader, don't do anything continue } - c.logger.Debug().Log("shutting down leader loop") - close(weAreLeaderCh) - leaderLoop.Wait() - weAreLeaderCh = nil - c.logger.Info().Log("cluster leadership lost") + // shutdown any follower and leader loop + if weAreFollowerCh != nil { + c.logger.Debug().Log("shutting down follower loop") + close(weAreFollowerCh) + followerLoop.Wait() + weAreFollowerCh = nil + } + + if weAreLeaderCh != nil { + c.logger.Debug().Log("shutting down leader loop") + close(weAreLeaderCh) + leaderLoop.Wait() + weAreLeaderCh = nil + } + + weAreEmergencyLeaderCh = make(chan struct{}) + emergencyLeaderLoop.Add(1) + go func(ch chan struct{}) { + defer emergencyLeaderLoop.Done() + c.leaderLoop(ch, true) + }(weAreEmergencyLeaderCh) + c.logger.Info().Log("cluster emergency leadership acquired") + + c.leaderLock.Lock() + c.isRaftLeader = false + c.isLeader = true + c.leaderLock.Unlock() } case <-c.shutdownCh: return @@ -77,26 +200,42 @@ func (c *cluster) leadershipTransfer() error { "attempt": i, "retry_limit": retryCount, }).Log("successfully transferred leadership") + + for { + c.logger.Debug().Log("waiting for losing leadership") + + time.Sleep(50 * time.Millisecond) + + c.leaderLock.Lock() + isLeader := c.isRaftLeader + c.leaderLock.Unlock() + + if !isLeader { + break + } + } + return nil } - } return fmt.Errorf("failed to transfer leadership in %d attempts", retryCount) } // leaderLoop runs as long as we are the leader to run various maintenance activities // https://github.com/hashicorp/consul/blob/44b39240a86bc94ddc67bc105286ab450bd869a9/agent/consul/leader.go#L146 -func (c *cluster) leaderLoop(stopCh chan struct{}) { +func (c *cluster) leaderLoop(stopCh chan struct{}, emergency bool) { establishedLeader := false RECONCILE: // Setup a reconciliation timer interval := time.After(10 * time.Second) // Apply a raft barrier to ensure our FSM is caught up - barrier := 
c.raft.Barrier(time.Minute) - if err := barrier.Error(); err != nil { - c.logger.Error().WithError(err).Log("failed to wait for barrier") - goto WAIT + if !emergency { + barrier := c.raft.Barrier(time.Minute) + if err := barrier.Error(); err != nil { + c.logger.Error().WithError(err).Log("failed to wait for barrier") + goto WAIT + } } // Check if we need to handle initial leadership actions diff --git a/http/handler/api/cluster.go b/http/handler/api/cluster.go index 656f9c2f..79d344ec 100644 --- a/http/handler/api/cluster.go +++ b/http/handler/api/cluster.go @@ -37,7 +37,7 @@ func NewCluster(cluster cluster.Cluster) *ClusterHandler { // @Security ApiKeyAuth // @Router /api/v3/cluster [get] func (h *ClusterHandler) GetCluster(c echo.Context) error { - nodes := h.cluster.ListNodes() + nodes := h.cluster.ListNodesX() list := []api.ClusterNode{} @@ -74,7 +74,7 @@ func (h *ClusterHandler) AddNode(c echo.Context) error { return api.Err(http.StatusBadRequest, "Invalid JSON", "%s", err) } - id, err := h.cluster.AddNode(node.Address, "", "") + id, err := h.cluster.AddNodeX(node.Address, "", "") if err != nil { return api.Err(http.StatusBadRequest, "Failed to add node", "%s", err) } @@ -96,7 +96,7 @@ func (h *ClusterHandler) AddNode(c echo.Context) error { func (h *ClusterHandler) DeleteNode(c echo.Context) error { id := util.PathParam(c, "id") - if err := h.cluster.RemoveNode(id); err != nil { + if err := h.cluster.RemoveNodeX(id); err != nil { if err == cluster.ErrNodeNotFound { return api.Err(http.StatusNotFound, err.Error(), "%s", id) } @@ -120,7 +120,7 @@ func (h *ClusterHandler) DeleteNode(c echo.Context) error { func (h *ClusterHandler) GetNode(c echo.Context) error { id := util.PathParam(c, "id") - peer, err := h.cluster.GetNode(id) + peer, err := h.cluster.GetNodeX(id) if err != nil { return api.Err(http.StatusNotFound, "Node not found", "%s", err) } @@ -150,7 +150,7 @@ func (h *ClusterHandler) GetNode(c echo.Context) error { func (h *ClusterHandler) GetNodeProxy(c 
echo.Context) error { id := util.PathParam(c, "id") - peer, err := h.cluster.GetNode(id) + peer, err := h.cluster.GetNodeX(id) if err != nil { return api.Err(http.StatusNotFound, "Node not found", "%s", err) } @@ -200,7 +200,7 @@ func (h *ClusterHandler) UpdateNode(c echo.Context) error { return api.Err(http.StatusBadRequest, "Failed to remove node", "%s", err) } - id, err := h.cluster.AddNode(node.Address, "", "") + id, err := h.cluster.AddNodeX(node.Address, "", "") if err != nil { return api.Err(http.StatusBadRequest, "Failed to add node", "%s", err) } diff --git a/http/log.go b/http/log/log.go similarity index 89% rename from http/log.go rename to http/log/log.go index 67f2ff47..c77e592e 100644 --- a/http/log.go +++ b/http/log/log.go @@ -1,4 +1,4 @@ -package http +package log import ( "encoding/json" @@ -14,7 +14,7 @@ type logentry struct { Message string `json:"message"` } -func newLogwrapper(writer io.Writer) *logwrapper { +func NewWrapper(writer io.Writer) *logwrapper { return &logwrapper{ writer: writer, } diff --git a/http/server.go b/http/server.go index 35de6505..cd116853 100644 --- a/http/server.go +++ b/http/server.go @@ -42,6 +42,7 @@ import ( "github.com/datarhei/core/v16/http/handler" api "github.com/datarhei/core/v16/http/handler/api" "github.com/datarhei/core/v16/http/jwt" + httplog "github.com/datarhei/core/v16/http/log" "github.com/datarhei/core/v16/http/router" "github.com/datarhei/core/v16/http/validator" "github.com/datarhei/core/v16/log" @@ -358,7 +359,7 @@ func NewServer(config Config) (Server, error) { s.router.HideBanner = true s.router.HidePort = true - s.router.Logger.SetOutput(newLogwrapper(s.logger)) + s.router.Logger.SetOutput(httplog.NewWrapper(s.logger)) if s.middleware.cors != nil { s.router.Use(s.middleware.cors)