diff --git a/cluster/api.go b/cluster/api.go index 12ae14b3..92777ea5 100644 --- a/cluster/api.go +++ b/cluster/api.go @@ -147,12 +147,12 @@ func NewAPI(config APIConfig) (API, error) { return httpapi.Err(http.StatusBadRequest, "Invalid JSON", "%s", err) } - a.logger.Debug().WithField("id", r.Config.ID).Log("Add process request") - if r.Origin == a.id { return httpapi.Err(http.StatusLoopDetected, "", "breaking circuit") } + a.logger.Debug().WithField("id", r.Config.ID).Log("Add process request") + err := a.cluster.AddProcess(r.Origin, &r.Config) if err != nil { a.logger.Debug().WithError(err).WithField("id", r.Config.ID).Log("Unable to add process") @@ -163,18 +163,24 @@ func NewAPI(config APIConfig) (API, error) { }) a.router.POST("/v1/process/:id", func(c echo.Context) error { + id := util.PathParam(c, "id") + r := client.RemoveProcessRequest{} if err := util.ShouldBindJSON(c, &r); err != nil { return httpapi.Err(http.StatusBadRequest, "Invalid JSON", "%s", err) } - a.logger.Debug().WithField("id", r.ID).Log("Remove process request") - if r.Origin == a.id { return httpapi.Err(http.StatusLoopDetected, "", "breaking circuit") } + if id != r.ID { + return httpapi.Err(http.StatusBadRequest, "Invalid data", "the ID in the path and the request do not match") + } + + a.logger.Debug().WithField("id", r.ID).Log("Remove process request") + err := a.cluster.RemoveProcess(r.Origin, r.ID) if err != nil { a.logger.Debug().WithError(err).WithField("id", r.ID).Log("Unable to remove process") @@ -184,6 +190,37 @@ func NewAPI(config APIConfig) (API, error) { return c.JSON(http.StatusOK, "OK") }) + a.router.PUT("/v1/process/:id", func(c echo.Context) error { + id := util.PathParam(c, "id") + + r := client.UpdateProcessRequest{} + + if err := util.ShouldBindJSON(c, &r); err != nil { + return httpapi.Err(http.StatusBadRequest, "Invalid JSON", "%s", err) + } + + if r.Origin == a.id { + return httpapi.Err(http.StatusLoopDetected, "", "breaking circuit") + } + + if id != r.ID { + return httpapi.Err(http.StatusBadRequest, "Invalid data", "the ID in the path and the request do not match") + } + + a.logger.Debug().WithFields(log.Fields{ + "old_id": r.ID, + "new_id": r.Config.ID, + }).Log("Update process request") + + err := a.cluster.UpdateProcess(r.Origin, r.ID, &r.Config) + if err != nil { + a.logger.Debug().WithError(err).WithField("id", r.ID).Log("Unable to update process") + return httpapi.Err(http.StatusInternalServerError, "unable to update process", "%s", err) + } + + return c.JSON(http.StatusOK, "OK") + }) + a.router.GET("/v1/core", func(c echo.Context) error { address, _ := a.cluster.CoreAPIAddress("") return c.JSON(http.StatusOK, address) diff --git a/cluster/client/client.go b/cluster/client/client.go index 586f7638..06575778 100644 --- a/cluster/client/client.go +++ b/cluster/client/client.go @@ -33,6 +33,12 @@ type RemoveProcessRequest struct { ID string `json:"id"` } +type UpdateProcessRequest struct { + Origin string `json:"origin"` + ID string `json:"id"` + Config app.Config `json:"config"` +} + type APIClient struct { Address string Client *http.Client @@ -97,6 +103,17 @@ func (c *APIClient) RemoveProcess(r RemoveProcessRequest) error { return err } +func (c *APIClient) UpdateProcess(r UpdateProcessRequest) error { + data, err := json.Marshal(r) + if err != nil { + return err + } + + _, err = c.call(http.MethodPut, "/process/"+r.ID, "application/json", bytes.NewReader(data)) + + return err +} + func (c *APIClient) Snapshot() (io.ReadCloser, error) { return c.stream(http.MethodGet, "/snapshot", "", nil) } diff --git a/cluster/cluster.go b/cluster/cluster.go index f213c39f..d3216093 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -63,6 +63,7 @@ type Cluster interface { AddProcess(origin string, config *app.Config) error RemoveProcess(origin, id string) error + UpdateProcess(origin, id string, config *app.Config) error ProxyReader() proxy.ProxyReader } @@ -715,6 +716,22 @@ func (c *cluster) RemoveProcess(origin, id string) error { return c.applyCommand(cmd) } +func (c *cluster) UpdateProcess(origin, id string, config *app.Config) error { + if !c.IsRaftLeader() { + return c.forwarder.UpdateProcess(origin, id, config) + } + + cmd := &store.Command{ + Operation: store.OpUpdateProcess, + Data: &store.CommandUpdateProcess{ + ID: id, + Config: *config, + }, + } + + return c.applyCommand(cmd) +} + func (c *cluster) applyCommand(cmd *store.Command) error { b, err := json.Marshal(cmd) if err != nil { diff --git a/cluster/forwarder/forwarder.go b/cluster/forwarder/forwarder.go index a11183da..a314bcb4 100644 --- a/cluster/forwarder/forwarder.go +++ b/cluster/forwarder/forwarder.go @@ -1,7 +1,6 @@ package forwarder import ( - "fmt" "io" "net/http" "sync" @@ -20,7 +19,7 @@ type Forwarder interface { Leave(origin, id string) error Snapshot() (io.ReadCloser, error) AddProcess(origin string, config *app.Config) error - UpdateProcess() error + UpdateProcess(origin, id string, config *app.Config) error RemoveProcess(origin, id string) error } @@ -153,8 +152,18 @@ func (f *forwarder) AddProcess(origin string, config *app.Config) error { return client.AddProcess(r) } -func (f *forwarder) UpdateProcess() error { - return fmt.Errorf("not implemented") +func (f *forwarder) UpdateProcess(origin, id string, config *app.Config) error { + if origin == "" { + origin = f.id + } + + r := apiclient.UpdateProcessRequest{} + + f.lock.RLock() + client := f.client + f.lock.RUnlock() + + return client.UpdateProcess(r) } func (f *forwarder) RemoveProcess(origin, id string) error { diff --git a/cluster/store/store.go b/cluster/store/store.go index 1abfb0aa..1342d147 100644 --- a/cluster/store/store.go +++ b/cluster/store/store.go @@ -30,6 +30,7 @@ type operation string const ( OpAddProcess operation = "addProcess" OpRemoveProcess operation = "removeProcess" + OpUpdateProcess operation = "updateProcess" ) type Command struct { @@ -41,6 +42,11 @@ type CommandAddProcess struct { app.Config } +type CommandUpdateProcess struct { + ID string + Config app.Config +} + type CommandRemoveProcess struct { ID string } @@ -95,9 +101,12 @@ func (s *store) Apply(entry *raft.Log) interface{} { json.Unmarshal(b, &cmd) s.lock.Lock() - s.Process[cmd.ID] = Process{ - UpdatedAt: time.Now(), - Config: &cmd.Config, + _, ok := s.Process[cmd.ID] + if !ok { + s.Process[cmd.ID] = Process{ + UpdatedAt: time.Now(), + Config: &cmd.Config, + } } s.lock.Unlock() case OpRemoveProcess: @@ -108,6 +117,33 @@ func (s *store) Apply(entry *raft.Log) interface{} { s.lock.Lock() delete(s.Process, cmd.ID) s.lock.Unlock() + case OpUpdateProcess: + b, _ := json.Marshal(c.Data) + cmd := CommandUpdateProcess{} + json.Unmarshal(b, &cmd) + + s.lock.Lock() + _, ok := s.Process[cmd.ID] + if ok { + if cmd.ID == cmd.Config.ID { + s.Process[cmd.ID] = Process{ + UpdatedAt: time.Now(), + Config: &cmd.Config, + } + } else { + _, ok := s.Process[cmd.Config.ID] + if !ok { + delete(s.Process, cmd.ID) + s.Process[cmd.Config.ID] = Process{ + UpdatedAt: time.Now(), + Config: &cmd.Config, + } + } else { + return fmt.Errorf("the process with the ID %s already exists", cmd.Config.ID) + } + } + } + s.lock.Unlock() } s.lock.RLock()