From 1a64fddbb15fd3c2a678112065116819d48d7c24 Mon Sep 17 00:00:00 2001 From: Ingo Oppermann Date: Fri, 7 Jun 2024 11:28:54 +0200 Subject: [PATCH] Allow cluster leave endpoint to remove any node in the cluster --- app/api/api.go | 32 ++++++++++++++++++++++++++------ cluster/cluster.go | 30 ++++++++++++++++++++++++++++++ docs/docs.go | 19 +++++++++++++++++++ docs/swagger.json | 19 +++++++++++++++++++ docs/swagger.yaml | 12 ++++++++++++ http/api/cluster.go | 4 ++++ http/handler/api/cluster.go | 7 +++---- log/log.go | 4 ++++ 8 files changed, 117 insertions(+), 10 deletions(-) diff --git a/app/api/api.go b/app/api/api.go index a8efa542..1cdd1f4d 100644 --- a/app/api/api.go +++ b/app/api/api.go @@ -1719,15 +1719,35 @@ func (a *api) start(ctx context.Context) error { // Start the cluster if a.cluster != nil { - ctx, cancel := context.WithTimeout(ctx, time.Duration(cfg.Cluster.StartupTimeout)*time.Second) - defer cancel() + wgStart.Add(1) + a.wgStop.Add(1) - err := a.cluster.Start(ctx) - if err != nil { - return fmt.Errorf("failed to start cluster: %w", err) - } + go func() { + logger := a.log.logger.core + + var err error + + defer func() { + logger.Info().Log("Cluster exited") + a.wgStop.Done() + }() + + ctx, cancel := context.WithTimeout(ctx, time.Duration(cfg.Cluster.StartupTimeout)*time.Second) + defer cancel() + + wgStart.Done() + + err = a.cluster.Start(ctx) + if err != nil { + err = fmt.Errorf("cluster failed: %w", err) + } + + sendError(err) + }() } + wgStart.Wait() + // Start the service if a.service != nil { a.service.Start() diff --git a/cluster/cluster.go b/cluster/cluster.go index 9399b760..bfa650f6 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -432,6 +432,8 @@ func (c *cluster) Start(ctx context.Context) error { return fmt.Errorf("failed to setup cluster: %w", err) } + <-c.shutdownCh + return nil } @@ -1014,6 +1016,13 @@ func (c *cluster) trackNodeChanges() { } delete(c.nodes, id) + /* + if id == c.nodeID { + c.logger.Warn().WithField("id", id).Log("This node left the cluster. Shutting down.") + // We got removed from the cluster, shutdown + c.Shutdown() + } + */ } c.nodesLock.Unlock() @@ -1307,6 +1316,27 @@ func (c *cluster) trackLeaderChanges() { c.hasRaftLeader = true } c.leaderLock.Unlock() + + servers, err := c.raft.Servers() + if err != nil { + c.logger.Error().WithError(err).Log("Raft configuration") + break + } + + isNodeInCluster := false + for _, server := range servers { + if c.nodeID == server.ID { + isNodeInCluster = true + break + } + } + + if !isNodeInCluster { + // We're not anymore part of the cluster, shutdown + c.logger.Warn().WithField("id", c.nodeID).Log("This node left the cluster. Shutting down.") + c.Shutdown() + } + case <-c.shutdownCh: return } diff --git a/docs/docs.go b/docs/docs.go index e8fc9e01..f296fe53 100644 --- a/docs/docs.go +++ b/docs/docs.go @@ -860,6 +860,17 @@ const docTemplate = `{ ], "summary": "Leave the cluster gracefully", "operationId": "cluster-3-leave", + "parameters": [ + { + "description": "Node ID", + "name": "nodeid", + "in": "body", + "required": true, + "schema": { + "$ref": "#/definitions/api.ClusterNodeID" + } + } + ], "responses": { "200": { "description": "OK", @@ -5043,6 +5054,14 @@ const docTemplate = `{ } } }, + "api.ClusterNodeID": { + "type": "object", + "properties": { + "id": { + "type": "string" + } + } + }, "api.ClusterNodeResources": { "type": "object", "properties": { diff --git a/docs/swagger.json b/docs/swagger.json index de3940f2..554a708f 100644 --- a/docs/swagger.json +++ b/docs/swagger.json @@ -852,6 +852,17 @@ ], "summary": "Leave the cluster gracefully", "operationId": "cluster-3-leave", + "parameters": [ + { + "description": "Node ID", + "name": "nodeid", + "in": "body", + "required": true, + "schema": { + "$ref": "#/definitions/api.ClusterNodeID" + } + } + ], "responses": { "200": { "description": "OK", @@ -5035,6 +5046,14 @@ } } }, + "api.ClusterNodeID": { + "type": "object", + "properties": { + "id": { + "type": "string" + } + } + }, "api.ClusterNodeResources": { "type": "object", "properties": { diff --git a/docs/swagger.yaml b/docs/swagger.yaml index c5597b7c..3afc15c0 100644 --- a/docs/swagger.yaml +++ b/docs/swagger.yaml @@ -181,6 +181,11 @@ definitions: description: unix timestamp type: integer type: object + api.ClusterNodeID: + properties: + id: + type: string + type: object api.ClusterNodeResources: properties: cpu_limit: @@ -3093,6 +3098,13 @@ paths: put: description: Leave the cluster gracefully operationId: cluster-3-leave + parameters: + - description: Node ID + in: body + name: nodeid + required: true + schema: + $ref: '#/definitions/api.ClusterNodeID' produces: - application/json responses: diff --git a/http/api/cluster.go b/http/api/cluster.go index be60ff76..f5e0d12f 100644 --- a/http/api/cluster.go +++ b/http/api/cluster.go @@ -4,6 +4,10 @@ import ( "time" ) +type ClusterNodeID struct { + ID string `json:"id"` +} + type ClusterNode struct { ID string `json:"id"` Name string `json:"name"` diff --git a/http/handler/api/cluster.go b/http/handler/api/cluster.go index 067b4250..2bfae27a 100644 --- a/http/handler/api/cluster.go +++ b/http/handler/api/cluster.go @@ -175,13 +175,13 @@ func (h *ClusterHandler) TransferLeadership(c echo.Context) error { // @Tags v16.?.? // @ID cluster-3-leave // @Produce json -// @Param nodeid body string true "Node ID" +// @Param nodeid body api.ClusterNodeID true "Node ID" // @Success 200 {string} string // @Failure 500 {object} api.Error // @Security ApiKeyAuth // @Router /api/v3/cluster/leave [put] func (h *ClusterHandler) Leave(c echo.Context) error { - nodeid := "" + nodeid := api.ClusterNodeID{} req := c.Request() @@ -196,8 +196,7 @@ func (h *ClusterHandler) Leave(c echo.Context) error { } } - h.cluster.Leave("", nodeid) - //h.cluster.Shutdown() + h.cluster.Leave("", nodeid.ID) return c.JSON(http.StatusOK, "OK") } diff --git a/log/log.go b/log/log.go index f19f183e..3ad1a822 100644 --- a/log/log.go +++ b/log/log.go @@ -319,6 +319,10 @@ func (e *Event) WithFields(f Fields) Logger { } func (e *Event) WithError(err error) Logger { + if err == nil { + return e + } + return e.WithFields(Fields{ "error": err, })