diff --git a/cluster/cluster.go b/cluster/cluster.go index 1f04aec0..7c0d21b1 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -17,9 +17,8 @@ import ( clusterautocert "github.com/datarhei/core/v16/cluster/autocert" apiclient "github.com/datarhei/core/v16/cluster/client" "github.com/datarhei/core/v16/cluster/forwarder" - clusteriam "github.com/datarhei/core/v16/cluster/iam" - clusteriamadapter "github.com/datarhei/core/v16/cluster/iam/adapter" "github.com/datarhei/core/v16/cluster/kvs" + clusternode "github.com/datarhei/core/v16/cluster/node" "github.com/datarhei/core/v16/cluster/proxy" "github.com/datarhei/core/v16/cluster/raft" "github.com/datarhei/core/v16/cluster/store" @@ -156,7 +155,7 @@ type cluster struct { isTLSRequired bool certManager autocert.Manager - nodes map[string]*clusterNode + nodes map[string]clusternode.Node nodesLock sync.RWMutex barrier map[string]bool @@ -190,7 +189,7 @@ func New(ctx context.Context, config Config) (Cluster, error) { isCoreDegradedErr: fmt.Errorf("cluster not yet started"), config: config.CoreConfig, - nodes: map[string]*clusterNode{}, + nodes: map[string]clusternode.Node{}, barrier: map[string]bool{}, @@ -904,7 +903,7 @@ func (c *cluster) trackNodeChanges() { logger.Warn().WithError(err).Log("Discovering cluster API address") } - node := NewClusterNode(id, address) + node := clusternode.New(id, address) if err := verifyClusterVersion(node.Version()); err != nil { logger.Warn().Log("Version mismatch. Cluster will end up in degraded mode") @@ -971,15 +970,6 @@ func (c *cluster) trackNodeChanges() { c.isCoreDegradedErr = nil } c.stateLock.Unlock() - /* - if c.isTLSRequired { - // Update list of managed hostnames - if c.certManager != nil { - c.certManager.ManageCertificates(context.Background(), hostnames) - } - - } - */ case <-c.shutdownCh: return } @@ -1206,393 +1196,6 @@ func (c *cluster) trackLeaderChanges() { } } -func (c *cluster) ListProcesses() []store.Process { - return c.store.ListProcesses() -} - -func (c *cluster) GetProcess(id app.ProcessID) (store.Process, error) { - return c.store.GetProcess(id) -} - -func (c *cluster) AddProcess(origin string, config *app.Config) error { - if ok, _ := c.IsDegraded(); ok { - return ErrDegraded - } - - if !c.IsRaftLeader() { - return c.forwarder.AddProcess(origin, config) - } - - cmd := &store.Command{ - Operation: store.OpAddProcess, - Data: &store.CommandAddProcess{ - Config: config, - }, - } - - return c.applyCommand(cmd) -} - -func (c *cluster) RemoveProcess(origin string, id app.ProcessID) error { - if ok, _ := c.IsDegraded(); ok { - return ErrDegraded - } - - if !c.IsRaftLeader() { - return c.forwarder.RemoveProcess(origin, id) - } - - cmd := &store.Command{ - Operation: store.OpRemoveProcess, - Data: &store.CommandRemoveProcess{ - ID: id, - }, - } - - return c.applyCommand(cmd) -} - -func (c *cluster) UpdateProcess(origin string, id app.ProcessID, config *app.Config) error { - if ok, _ := c.IsDegraded(); ok { - return ErrDegraded - } - - if !c.IsRaftLeader() { - return c.forwarder.UpdateProcess(origin, id, config) - } - - cmd := &store.Command{ - Operation: store.OpUpdateProcess, - Data: &store.CommandUpdateProcess{ - ID: id, - Config: config, - }, - } - - return c.applyCommand(cmd) -} - -func (c *cluster) SetProcessCommand(origin string, id app.ProcessID, command string) error { - if ok, _ := c.IsDegraded(); ok { - return ErrDegraded - } - - if !c.IsRaftLeader() { - return c.forwarder.SetProcessCommand(origin, id, command) - } - - if command == "start" || command == "stop" { - cmd := &store.Command{ - Operation: store.OpSetProcessOrder, - Data: &store.CommandSetProcessOrder{ - ID: id, - Order: command, - }, - } - - return c.applyCommand(cmd) - } - - procs := c.proxy.ListProxyProcesses() - nodeid := "" - - for _, p := range procs { - if p.Config.ProcessID() != id { - continue - } - - nodeid = p.NodeID - - break - } - - if len(nodeid) == 0 { - return fmt.Errorf("the process '%s' is not registered with any node", id.String()) - } - - return c.proxy.CommandProcess(nodeid, id, command) -} - -func (c *cluster) SetProcessMetadata(origin string, id app.ProcessID, key string, data interface{}) error { - if ok, _ := c.IsDegraded(); ok { - return ErrDegraded - } - - if !c.IsRaftLeader() { - return c.forwarder.SetProcessMetadata(origin, id, key, data) - } - - cmd := &store.Command{ - Operation: store.OpSetProcessMetadata, - Data: &store.CommandSetProcessMetadata{ - ID: id, - Key: key, - Data: data, - }, - } - - return c.applyCommand(cmd) -} - -func (c *cluster) IAM(superuser iamidentity.User, jwtRealm, jwtSecret string) (iam.IAM, error) { - policyAdapter, err := clusteriamadapter.NewPolicyAdapter(c.store) - if err != nil { - return nil, fmt.Errorf("cluster policy adapter: %w", err) - } - - identityAdapter, err := clusteriamadapter.NewIdentityAdapter(c.store) - if err != nil { - return nil, fmt.Errorf("cluster identitry adapter: %w", err) - } - - iam, err := clusteriam.New(iam.Config{ - PolicyAdapter: policyAdapter, - IdentityAdapter: identityAdapter, - Superuser: superuser, - JWTRealm: jwtRealm, - JWTSecret: jwtSecret, - Logger: c.logger.WithComponent("IAM"), - }, c.store) - if err != nil { - return nil, fmt.Errorf("cluster iam: %w", err) - } - - return iam, nil -} - -func (c *cluster) ListIdentities() (time.Time, []iamidentity.User) { - users := c.store.ListUsers() - - return users.UpdatedAt, users.Users -} - -func (c *cluster) ListIdentity(name string) (time.Time, iamidentity.User, error) { - user := c.store.GetUser(name) - - if len(user.Users) == 0 { - return time.Time{}, iamidentity.User{}, fmt.Errorf("not found") - } - - return user.UpdatedAt, user.Users[0], nil -} - -func (c *cluster) ListPolicies() (time.Time, []iamaccess.Policy) { - policies := c.store.ListPolicies() - - return policies.UpdatedAt, policies.Policies -} - -func (c *cluster) ListUserPolicies(name string) (time.Time, []iamaccess.Policy) { - policies := c.store.ListUserPolicies(name) - - return policies.UpdatedAt, policies.Policies -} - -func (c *cluster) AddIdentity(origin string, identity iamidentity.User) error { - if ok, _ := c.IsDegraded(); ok { - return ErrDegraded - } - - if err := identity.Validate(); err != nil { - return fmt.Errorf("invalid identity: %w", err) - } - - if !c.IsRaftLeader() { - return c.forwarder.AddIdentity(origin, identity) - } - - cmd := &store.Command{ - Operation: store.OpAddIdentity, - Data: &store.CommandAddIdentity{ - Identity: identity, - }, - } - - return c.applyCommand(cmd) -} - -func (c *cluster) UpdateIdentity(origin, name string, identity iamidentity.User) error { - if ok, _ := c.IsDegraded(); ok { - return ErrDegraded - } - - if !c.IsRaftLeader() { - return c.forwarder.UpdateIdentity(origin, name, identity) - } - - cmd := &store.Command{ - Operation: store.OpUpdateIdentity, - Data: &store.CommandUpdateIdentity{ - Name: name, - Identity: identity, - }, - } - - return c.applyCommand(cmd) -} - -func (c *cluster) SetPolicies(origin, name string, policies []iamaccess.Policy) error { - if ok, _ := c.IsDegraded(); ok { - return ErrDegraded - } - - if !c.IsRaftLeader() { - return c.forwarder.SetPolicies(origin, name, policies) - } - - cmd := &store.Command{ - Operation: store.OpSetPolicies, - Data: &store.CommandSetPolicies{ - Name: name, - Policies: policies, - }, - } - - return c.applyCommand(cmd) -} - -func (c *cluster) RemoveIdentity(origin string, name string) error { - if ok, _ := c.IsDegraded(); ok { - return ErrDegraded - } - - if !c.IsRaftLeader() { - return c.forwarder.RemoveIdentity(origin, name) - } - - cmd := &store.Command{ - Operation: store.OpRemoveIdentity, - Data: &store.CommandRemoveIdentity{ - Name: name, - }, - } - - return c.applyCommand(cmd) -} - -func (c *cluster) CreateLock(origin string, name string, validUntil time.Time) (*kvs.Lock, error) { - if ok, _ := c.IsClusterDegraded(); ok { - return nil, ErrDegraded - } - - if !c.IsRaftLeader() { - err := c.forwarder.CreateLock(origin, name, validUntil) - if err != nil { - return nil, err - } - - l := &kvs.Lock{ - ValidUntil: validUntil, - } - - return l, nil - } - - cmd := &store.Command{ - Operation: store.OpCreateLock, - Data: &store.CommandCreateLock{ - Name: name, - ValidUntil: validUntil, - }, - } - - err := c.applyCommand(cmd) - if err != nil { - return nil, err - } - - l := &kvs.Lock{ - ValidUntil: validUntil, - } - - return l, nil -} - -func (c *cluster) DeleteLock(origin string, name string) error { - if ok, _ := c.IsClusterDegraded(); ok { - return ErrDegraded - } - - if !c.IsRaftLeader() { - return c.forwarder.DeleteLock(origin, name) - } - - cmd := &store.Command{ - Operation: store.OpDeleteLock, - Data: &store.CommandDeleteLock{ - Name: name, - }, - } - - return c.applyCommand(cmd) -} - -func (c *cluster) ListLocks() map[string]time.Time { - return c.store.ListLocks() -} - -func (c *cluster) SetKV(origin, key, value string) error { - if ok, _ := c.IsClusterDegraded(); ok { - return ErrDegraded - } - - if !c.IsRaftLeader() { - return c.forwarder.SetKV(origin, key, value) - } - - cmd := &store.Command{ - Operation: store.OpSetKV, - Data: &store.CommandSetKV{ - Key: key, - Value: value, - }, - } - - return c.applyCommand(cmd) -} - -func (c *cluster) UnsetKV(origin, key string) error { - if ok, _ := c.IsClusterDegraded(); ok { - return ErrDegraded - } - - if !c.IsRaftLeader() { - return c.forwarder.UnsetKV(origin, key) - } - - cmd := &store.Command{ - Operation: store.OpUnsetKV, - Data: &store.CommandUnsetKV{ - Key: key, - }, - } - - return c.applyCommand(cmd) -} - -func (c *cluster) GetKV(origin, key string) (string, time.Time, error) { - if ok, _ := c.IsClusterDegraded(); ok { - return "", time.Time{}, ErrDegraded - } - - if !c.IsRaftLeader() { - return c.forwarder.GetKV(origin, key) - } - - value, err := c.store.GetFromKVS(key) - if err != nil { - return "", time.Time{}, err - } - - return value.Value, value.UpdatedAt, nil -} - -func (c *cluster) ListKV(prefix string) map[string]store.Value { - storeValues := c.store.ListKVS(prefix) - - return storeValues -} - func (c *cluster) applyCommand(cmd *store.Command) error { b, err := json.Marshal(cmd) if err != nil { diff --git a/cluster/iam.go b/cluster/iam.go new file mode 100644 index 00000000..79da50ac --- /dev/null +++ b/cluster/iam.go @@ -0,0 +1,149 @@ +package cluster + +import ( + "fmt" + "time" + + clusteriam "github.com/datarhei/core/v16/cluster/iam" + clusteriamadapter "github.com/datarhei/core/v16/cluster/iam/adapter" + "github.com/datarhei/core/v16/cluster/store" + "github.com/datarhei/core/v16/iam" + iamaccess "github.com/datarhei/core/v16/iam/access" + iamidentity "github.com/datarhei/core/v16/iam/identity" +) + +func (c *cluster) IAM(superuser iamidentity.User, jwtRealm, jwtSecret string) (iam.IAM, error) { + policyAdapter, err := clusteriamadapter.NewPolicyAdapter(c.store) + if err != nil { + return nil, fmt.Errorf("cluster policy adapter: %w", err) + } + + identityAdapter, err := clusteriamadapter.NewIdentityAdapter(c.store) + if err != nil { + return nil, fmt.Errorf("cluster identitry adapter: %w", err) + } + + iam, err := clusteriam.New(iam.Config{ + PolicyAdapter: policyAdapter, + IdentityAdapter: identityAdapter, + Superuser: superuser, + JWTRealm: jwtRealm, + JWTSecret: jwtSecret, + Logger: c.logger.WithComponent("IAM"), + }, c.store) + if err != nil { + return nil, fmt.Errorf("cluster iam: %w", err) + } + + return iam, nil +} + +func (c *cluster) ListIdentities() (time.Time, []iamidentity.User) { + users := c.store.ListUsers() + + return users.UpdatedAt, users.Users +} + +func (c *cluster) ListIdentity(name string) (time.Time, iamidentity.User, error) { + user := c.store.GetUser(name) + + if len(user.Users) == 0 { + return time.Time{}, iamidentity.User{}, fmt.Errorf("not found") + } + + return user.UpdatedAt, user.Users[0], nil +} + +func (c *cluster) ListPolicies() (time.Time, []iamaccess.Policy) { + policies := c.store.ListPolicies() + + return policies.UpdatedAt, policies.Policies +} + +func (c *cluster) ListUserPolicies(name string) (time.Time, []iamaccess.Policy) { + policies := c.store.ListUserPolicies(name) + + return policies.UpdatedAt, policies.Policies +} + +func (c *cluster) AddIdentity(origin string, identity iamidentity.User) error { + if ok, _ := c.IsDegraded(); ok { + return ErrDegraded + } + + if err := identity.Validate(); err != nil { + return fmt.Errorf("invalid identity: %w", err) + } + + if !c.IsRaftLeader() { + return c.forwarder.AddIdentity(origin, identity) + } + + cmd := &store.Command{ + Operation: store.OpAddIdentity, + Data: &store.CommandAddIdentity{ + Identity: identity, + }, + } + + return c.applyCommand(cmd) +} + +func (c *cluster) UpdateIdentity(origin, name string, identity iamidentity.User) error { + if ok, _ := c.IsDegraded(); ok { + return ErrDegraded + } + + if !c.IsRaftLeader() { + return c.forwarder.UpdateIdentity(origin, name, identity) + } + + cmd := &store.Command{ + Operation: store.OpUpdateIdentity, + Data: &store.CommandUpdateIdentity{ + Name: name, + Identity: identity, + }, + } + + return c.applyCommand(cmd) +} + +func (c *cluster) SetPolicies(origin, name string, policies []iamaccess.Policy) error { + if ok, _ := c.IsDegraded(); ok { + return ErrDegraded + } + + if !c.IsRaftLeader() { + return c.forwarder.SetPolicies(origin, name, policies) + } + + cmd := &store.Command{ + Operation: store.OpSetPolicies, + Data: &store.CommandSetPolicies{ + Name: name, + Policies: policies, + }, + } + + return c.applyCommand(cmd) +} + +func (c *cluster) RemoveIdentity(origin string, name string) error { + if ok, _ := c.IsDegraded(); ok { + return ErrDegraded + } + + if !c.IsRaftLeader() { + return c.forwarder.RemoveIdentity(origin, name) + } + + cmd := &store.Command{ + Operation: store.OpRemoveIdentity, + Data: &store.CommandRemoveIdentity{ + Name: name, + }, + } + + return c.applyCommand(cmd) +} diff --git a/cluster/kvs.go b/cluster/kvs.go index 1980e641..f730a438 100644 --- a/cluster/kvs.go +++ b/cluster/kvs.go @@ -8,6 +8,129 @@ import ( "github.com/datarhei/core/v16/log" ) +func (c *cluster) CreateLock(origin string, name string, validUntil time.Time) (*kvs.Lock, error) { + if ok, _ := c.IsClusterDegraded(); ok { + return nil, ErrDegraded + } + + if !c.IsRaftLeader() { + err := c.forwarder.CreateLock(origin, name, validUntil) + if err != nil { + return nil, err + } + + l := &kvs.Lock{ + ValidUntil: validUntil, + } + + return l, nil + } + + cmd := &store.Command{ + Operation: store.OpCreateLock, + Data: &store.CommandCreateLock{ + Name: name, + ValidUntil: validUntil, + }, + } + + err := c.applyCommand(cmd) + if err != nil { + return nil, err + } + + l := &kvs.Lock{ + ValidUntil: validUntil, + } + + return l, nil +} + +func (c *cluster) DeleteLock(origin string, name string) error { + if ok, _ := c.IsClusterDegraded(); ok { + return ErrDegraded + } + + if !c.IsRaftLeader() { + return c.forwarder.DeleteLock(origin, name) + } + + cmd := &store.Command{ + Operation: store.OpDeleteLock, + Data: &store.CommandDeleteLock{ + Name: name, + }, + } + + return c.applyCommand(cmd) +} + +func (c *cluster) ListLocks() map[string]time.Time { + return c.store.ListLocks() +} + +func (c *cluster) SetKV(origin, key, value string) error { + if ok, _ := c.IsClusterDegraded(); ok { + return ErrDegraded + } + + if !c.IsRaftLeader() { + return c.forwarder.SetKV(origin, key, value) + } + + cmd := &store.Command{ + Operation: store.OpSetKV, + Data: &store.CommandSetKV{ + Key: key, + Value: value, + }, + } + + return c.applyCommand(cmd) +} + +func (c *cluster) UnsetKV(origin, key string) error { + if ok, _ := c.IsClusterDegraded(); ok { + return ErrDegraded + } + + if !c.IsRaftLeader() { + return c.forwarder.UnsetKV(origin, key) + } + + cmd := &store.Command{ + Operation: store.OpUnsetKV, + Data: &store.CommandUnsetKV{ + Key: key, + }, + } + + return c.applyCommand(cmd) +} + +func (c *cluster) GetKV(origin, key string) (string, time.Time, error) { + if ok, _ := c.IsClusterDegraded(); ok { + return "", time.Time{}, ErrDegraded + } + + if !c.IsRaftLeader() { + return c.forwarder.GetKV(origin, key) + } + + value, err := c.store.GetFromKVS(key) + if err != nil { + return "", time.Time{}, err + } + + return value.Value, value.UpdatedAt, nil +} + +func (c *cluster) ListKV(prefix string) map[string]store.Value { + storeValues := c.store.ListKVS(prefix) + + return storeValues +} + type clusterKVS struct { cluster Cluster logger log.Logger diff --git a/cluster/node.go b/cluster/node/node.go similarity index 79% rename from cluster/node.go rename to cluster/node/node.go index 04fdb2b8..2621c0fc 100644 --- a/cluster/node.go +++ b/cluster/node/node.go @@ -1,4 +1,4 @@ -package cluster +package node import ( "context" @@ -13,7 +13,23 @@ import ( "github.com/datarhei/core/v16/config" ) -type clusterNode struct { +type Node interface { + Stop() error + Version() string + IPs() []string + Status() (string, error) + LastContact() time.Time + Barrier(name string) (bool, error) + + CoreStatus() (string, error) + CoreEssentials() (string, *config.Config, error) + CoreConfig() (*config.Config, error) + CoreAPIAddress() (string, error) + + Proxy() proxy.Node +} + +type node struct { client client.APIClient id string @@ -33,8 +49,8 @@ type clusterNode struct { proxyNode proxy.Node } -func NewClusterNode(id, address string) *clusterNode { - n := &clusterNode{ +func New(id, address string) Node { + n := &node{ id: id, address: address, version: "0.0.0", @@ -61,7 +77,7 @@ func NewClusterNode(id, address string) *clusterNode { return n } -func (n *clusterNode) start(id string) error { +func (n *node) start(id string) error { n.runLock.Lock() defer n.runLock.Unlock() @@ -113,7 +129,7 @@ func (n *clusterNode) start(id string) error { return nil } -func (n *clusterNode) Stop() error { +func (n *node) Stop() error { n.runLock.Lock() defer n.runLock.Unlock() @@ -129,18 +145,18 @@ func (n *clusterNode) Stop() error { return nil } -func (n *clusterNode) Version() string { +func (n *node) Version() string { n.pingLock.RLock() defer n.pingLock.RUnlock() return n.version } -func (n *clusterNode) IPs() []string { +func (n *node) IPs() []string { return n.ips } -func (n *clusterNode) Status() (string, error) { +func (n *node) Status() (string, error) { n.pingLock.RLock() defer n.pingLock.RUnlock() @@ -152,7 +168,7 @@ func (n *clusterNode) Status() (string, error) { return "online", nil } -func (n *clusterNode) CoreStatus() (string, error) { +func (n *node) CoreStatus() (string, error) { n.pingLock.RLock() defer n.pingLock.RUnlock() @@ -164,14 +180,14 @@ func (n *clusterNode) CoreStatus() (string, error) { return "online", nil } -func (n *clusterNode) LastContact() time.Time { +func (n *node) LastContact() time.Time { n.pingLock.RLock() defer n.pingLock.RUnlock() return n.lastContact } -func (n *clusterNode) CoreEssentials() (string, *config.Config, error) { +func (n *node) CoreEssentials() (string, *config.Config, error) { address, err := n.CoreAPIAddress() if err != nil { return "", nil, err @@ -185,23 +201,23 @@ func (n *clusterNode) CoreEssentials() (string, *config.Config, error) { return address, config, nil } -func (n *clusterNode) CoreConfig() (*config.Config, error) { +func (n *node) CoreConfig() (*config.Config, error) { return n.client.CoreConfig() } -func (n *clusterNode) CoreAPIAddress() (string, error) { +func (n *node) CoreAPIAddress() (string, error) { return n.client.CoreAPIAddress() } -func (n *clusterNode) Barrier(name string) (bool, error) { +func (n *node) Barrier(name string) (bool, error) { return n.client.Barrier(name) } -func (n *clusterNode) Proxy() proxy.Node { +func (n *node) Proxy() proxy.Node { return n.proxyNode } -func (n *clusterNode) ping(ctx context.Context) { +func (n *node) ping(ctx context.Context) { ticker := time.NewTicker(time.Second) defer ticker.Stop() @@ -227,7 +243,7 @@ func (n *clusterNode) ping(ctx context.Context) { } } -func (n *clusterNode) pingCore(ctx context.Context) { +func (n *node) pingCore(ctx context.Context) { ticker := time.NewTicker(time.Second) defer ticker.Stop() diff --git a/cluster/process.go b/cluster/process.go new file mode 100644 index 00000000..5881c4e0 --- /dev/null +++ b/cluster/process.go @@ -0,0 +1,136 @@ +package cluster + +import ( + "fmt" + + "github.com/datarhei/core/v16/cluster/store" + "github.com/datarhei/core/v16/restream/app" +) + +func (c *cluster) ListProcesses() []store.Process { + return c.store.ListProcesses() +} + +func (c *cluster) GetProcess(id app.ProcessID) (store.Process, error) { + return c.store.GetProcess(id) +} + +func (c *cluster) AddProcess(origin string, config *app.Config) error { + if ok, _ := c.IsDegraded(); ok { + return ErrDegraded + } + + if !c.IsRaftLeader() { + return c.forwarder.AddProcess(origin, config) + } + + cmd := &store.Command{ + Operation: store.OpAddProcess, + Data: &store.CommandAddProcess{ + Config: config, + }, + } + + return c.applyCommand(cmd) +} + +func (c *cluster) RemoveProcess(origin string, id app.ProcessID) error { + if ok, _ := c.IsDegraded(); ok { + return ErrDegraded + } + + if !c.IsRaftLeader() { + return c.forwarder.RemoveProcess(origin, id) + } + + cmd := &store.Command{ + Operation: store.OpRemoveProcess, + Data: &store.CommandRemoveProcess{ + ID: id, + }, + } + + return c.applyCommand(cmd) +} + +func (c *cluster) UpdateProcess(origin string, id app.ProcessID, config *app.Config) error { + if ok, _ := c.IsDegraded(); ok { + return ErrDegraded + } + + if !c.IsRaftLeader() { + return c.forwarder.UpdateProcess(origin, id, config) + } + + cmd := &store.Command{ + Operation: store.OpUpdateProcess, + Data: &store.CommandUpdateProcess{ + ID: id, + Config: config, + }, + } + + return c.applyCommand(cmd) +} + +func (c *cluster) SetProcessCommand(origin string, id app.ProcessID, command string) error { + if ok, _ := c.IsDegraded(); ok { + return ErrDegraded + } + + if !c.IsRaftLeader() { + return c.forwarder.SetProcessCommand(origin, id, command) + } + + if command == "start" || command == "stop" { + cmd := &store.Command{ + Operation: store.OpSetProcessOrder, + Data: &store.CommandSetProcessOrder{ + ID: id, + Order: command, + }, + } + + return c.applyCommand(cmd) + } + + procs := c.proxy.ListProxyProcesses() + nodeid := "" + + for _, p := range procs { + if p.Config.ProcessID() != id { + continue + } + + nodeid = p.NodeID + + break + } + + if len(nodeid) == 0 { + return fmt.Errorf("the process '%s' is not registered with any node", id.String()) + } + + return c.proxy.CommandProcess(nodeid, id, command) +} + +func (c *cluster) SetProcessMetadata(origin string, id app.ProcessID, key string, data interface{}) error { + if ok, _ := c.IsDegraded(); ok { + return ErrDegraded + } + + if !c.IsRaftLeader() { + return c.forwarder.SetProcessMetadata(origin, id, key, data) + } + + cmd := &store.Command{ + Operation: store.OpSetProcessMetadata, + Data: &store.CommandSetProcessMetadata{ + ID: id, + Key: key, + Data: data, + }, + } + + return c.applyCommand(cmd) +}