Rearrange code

This commit is contained in:
Ingo Oppermann 2023-07-03 20:39:47 +02:00
parent adcbd98467
commit 9c88e88619
No known key found for this signature in database
GPG Key ID: 2AB32426E9DD229E
5 changed files with 446 additions and 419 deletions

View File

@ -17,9 +17,8 @@ import (
clusterautocert "github.com/datarhei/core/v16/cluster/autocert"
apiclient "github.com/datarhei/core/v16/cluster/client"
"github.com/datarhei/core/v16/cluster/forwarder"
clusteriam "github.com/datarhei/core/v16/cluster/iam"
clusteriamadapter "github.com/datarhei/core/v16/cluster/iam/adapter"
"github.com/datarhei/core/v16/cluster/kvs"
clusternode "github.com/datarhei/core/v16/cluster/node"
"github.com/datarhei/core/v16/cluster/proxy"
"github.com/datarhei/core/v16/cluster/raft"
"github.com/datarhei/core/v16/cluster/store"
@ -156,7 +155,7 @@ type cluster struct {
isTLSRequired bool
certManager autocert.Manager
nodes map[string]*clusterNode
nodes map[string]clusternode.Node
nodesLock sync.RWMutex
barrier map[string]bool
@ -190,7 +189,7 @@ func New(ctx context.Context, config Config) (Cluster, error) {
isCoreDegradedErr: fmt.Errorf("cluster not yet started"),
config: config.CoreConfig,
nodes: map[string]*clusterNode{},
nodes: map[string]clusternode.Node{},
barrier: map[string]bool{},
@ -904,7 +903,7 @@ func (c *cluster) trackNodeChanges() {
logger.Warn().WithError(err).Log("Discovering cluster API address")
}
node := NewClusterNode(id, address)
node := clusternode.New(id, address)
if err := verifyClusterVersion(node.Version()); err != nil {
logger.Warn().Log("Version mismatch. Cluster will end up in degraded mode")
@ -971,15 +970,6 @@ func (c *cluster) trackNodeChanges() {
c.isCoreDegradedErr = nil
}
c.stateLock.Unlock()
/*
if c.isTLSRequired {
// Update list of managed hostnames
if c.certManager != nil {
c.certManager.ManageCertificates(context.Background(), hostnames)
}
}
*/
case <-c.shutdownCh:
return
}
@ -1206,393 +1196,6 @@ func (c *cluster) trackLeaderChanges() {
}
}
func (c *cluster) ListProcesses() []store.Process {
return c.store.ListProcesses()
}
func (c *cluster) GetProcess(id app.ProcessID) (store.Process, error) {
return c.store.GetProcess(id)
}
func (c *cluster) AddProcess(origin string, config *app.Config) error {
if ok, _ := c.IsDegraded(); ok {
return ErrDegraded
}
if !c.IsRaftLeader() {
return c.forwarder.AddProcess(origin, config)
}
cmd := &store.Command{
Operation: store.OpAddProcess,
Data: &store.CommandAddProcess{
Config: config,
},
}
return c.applyCommand(cmd)
}
func (c *cluster) RemoveProcess(origin string, id app.ProcessID) error {
if ok, _ := c.IsDegraded(); ok {
return ErrDegraded
}
if !c.IsRaftLeader() {
return c.forwarder.RemoveProcess(origin, id)
}
cmd := &store.Command{
Operation: store.OpRemoveProcess,
Data: &store.CommandRemoveProcess{
ID: id,
},
}
return c.applyCommand(cmd)
}
func (c *cluster) UpdateProcess(origin string, id app.ProcessID, config *app.Config) error {
if ok, _ := c.IsDegraded(); ok {
return ErrDegraded
}
if !c.IsRaftLeader() {
return c.forwarder.UpdateProcess(origin, id, config)
}
cmd := &store.Command{
Operation: store.OpUpdateProcess,
Data: &store.CommandUpdateProcess{
ID: id,
Config: config,
},
}
return c.applyCommand(cmd)
}
func (c *cluster) SetProcessCommand(origin string, id app.ProcessID, command string) error {
if ok, _ := c.IsDegraded(); ok {
return ErrDegraded
}
if !c.IsRaftLeader() {
return c.forwarder.SetProcessCommand(origin, id, command)
}
if command == "start" || command == "stop" {
cmd := &store.Command{
Operation: store.OpSetProcessOrder,
Data: &store.CommandSetProcessOrder{
ID: id,
Order: command,
},
}
return c.applyCommand(cmd)
}
procs := c.proxy.ListProxyProcesses()
nodeid := ""
for _, p := range procs {
if p.Config.ProcessID() != id {
continue
}
nodeid = p.NodeID
break
}
if len(nodeid) == 0 {
return fmt.Errorf("the process '%s' is not registered with any node", id.String())
}
return c.proxy.CommandProcess(nodeid, id, command)
}
func (c *cluster) SetProcessMetadata(origin string, id app.ProcessID, key string, data interface{}) error {
if ok, _ := c.IsDegraded(); ok {
return ErrDegraded
}
if !c.IsRaftLeader() {
return c.forwarder.SetProcessMetadata(origin, id, key, data)
}
cmd := &store.Command{
Operation: store.OpSetProcessMetadata,
Data: &store.CommandSetProcessMetadata{
ID: id,
Key: key,
Data: data,
},
}
return c.applyCommand(cmd)
}
func (c *cluster) IAM(superuser iamidentity.User, jwtRealm, jwtSecret string) (iam.IAM, error) {
policyAdapter, err := clusteriamadapter.NewPolicyAdapter(c.store)
if err != nil {
return nil, fmt.Errorf("cluster policy adapter: %w", err)
}
identityAdapter, err := clusteriamadapter.NewIdentityAdapter(c.store)
if err != nil {
return nil, fmt.Errorf("cluster identitry adapter: %w", err)
}
iam, err := clusteriam.New(iam.Config{
PolicyAdapter: policyAdapter,
IdentityAdapter: identityAdapter,
Superuser: superuser,
JWTRealm: jwtRealm,
JWTSecret: jwtSecret,
Logger: c.logger.WithComponent("IAM"),
}, c.store)
if err != nil {
return nil, fmt.Errorf("cluster iam: %w", err)
}
return iam, nil
}
func (c *cluster) ListIdentities() (time.Time, []iamidentity.User) {
users := c.store.ListUsers()
return users.UpdatedAt, users.Users
}
func (c *cluster) ListIdentity(name string) (time.Time, iamidentity.User, error) {
user := c.store.GetUser(name)
if len(user.Users) == 0 {
return time.Time{}, iamidentity.User{}, fmt.Errorf("not found")
}
return user.UpdatedAt, user.Users[0], nil
}
func (c *cluster) ListPolicies() (time.Time, []iamaccess.Policy) {
policies := c.store.ListPolicies()
return policies.UpdatedAt, policies.Policies
}
func (c *cluster) ListUserPolicies(name string) (time.Time, []iamaccess.Policy) {
policies := c.store.ListUserPolicies(name)
return policies.UpdatedAt, policies.Policies
}
func (c *cluster) AddIdentity(origin string, identity iamidentity.User) error {
if ok, _ := c.IsDegraded(); ok {
return ErrDegraded
}
if err := identity.Validate(); err != nil {
return fmt.Errorf("invalid identity: %w", err)
}
if !c.IsRaftLeader() {
return c.forwarder.AddIdentity(origin, identity)
}
cmd := &store.Command{
Operation: store.OpAddIdentity,
Data: &store.CommandAddIdentity{
Identity: identity,
},
}
return c.applyCommand(cmd)
}
func (c *cluster) UpdateIdentity(origin, name string, identity iamidentity.User) error {
if ok, _ := c.IsDegraded(); ok {
return ErrDegraded
}
if !c.IsRaftLeader() {
return c.forwarder.UpdateIdentity(origin, name, identity)
}
cmd := &store.Command{
Operation: store.OpUpdateIdentity,
Data: &store.CommandUpdateIdentity{
Name: name,
Identity: identity,
},
}
return c.applyCommand(cmd)
}
func (c *cluster) SetPolicies(origin, name string, policies []iamaccess.Policy) error {
if ok, _ := c.IsDegraded(); ok {
return ErrDegraded
}
if !c.IsRaftLeader() {
return c.forwarder.SetPolicies(origin, name, policies)
}
cmd := &store.Command{
Operation: store.OpSetPolicies,
Data: &store.CommandSetPolicies{
Name: name,
Policies: policies,
},
}
return c.applyCommand(cmd)
}
func (c *cluster) RemoveIdentity(origin string, name string) error {
if ok, _ := c.IsDegraded(); ok {
return ErrDegraded
}
if !c.IsRaftLeader() {
return c.forwarder.RemoveIdentity(origin, name)
}
cmd := &store.Command{
Operation: store.OpRemoveIdentity,
Data: &store.CommandRemoveIdentity{
Name: name,
},
}
return c.applyCommand(cmd)
}
func (c *cluster) CreateLock(origin string, name string, validUntil time.Time) (*kvs.Lock, error) {
if ok, _ := c.IsClusterDegraded(); ok {
return nil, ErrDegraded
}
if !c.IsRaftLeader() {
err := c.forwarder.CreateLock(origin, name, validUntil)
if err != nil {
return nil, err
}
l := &kvs.Lock{
ValidUntil: validUntil,
}
return l, nil
}
cmd := &store.Command{
Operation: store.OpCreateLock,
Data: &store.CommandCreateLock{
Name: name,
ValidUntil: validUntil,
},
}
err := c.applyCommand(cmd)
if err != nil {
return nil, err
}
l := &kvs.Lock{
ValidUntil: validUntil,
}
return l, nil
}
func (c *cluster) DeleteLock(origin string, name string) error {
if ok, _ := c.IsClusterDegraded(); ok {
return ErrDegraded
}
if !c.IsRaftLeader() {
return c.forwarder.DeleteLock(origin, name)
}
cmd := &store.Command{
Operation: store.OpDeleteLock,
Data: &store.CommandDeleteLock{
Name: name,
},
}
return c.applyCommand(cmd)
}
func (c *cluster) ListLocks() map[string]time.Time {
return c.store.ListLocks()
}
func (c *cluster) SetKV(origin, key, value string) error {
if ok, _ := c.IsClusterDegraded(); ok {
return ErrDegraded
}
if !c.IsRaftLeader() {
return c.forwarder.SetKV(origin, key, value)
}
cmd := &store.Command{
Operation: store.OpSetKV,
Data: &store.CommandSetKV{
Key: key,
Value: value,
},
}
return c.applyCommand(cmd)
}
func (c *cluster) UnsetKV(origin, key string) error {
if ok, _ := c.IsClusterDegraded(); ok {
return ErrDegraded
}
if !c.IsRaftLeader() {
return c.forwarder.UnsetKV(origin, key)
}
cmd := &store.Command{
Operation: store.OpUnsetKV,
Data: &store.CommandUnsetKV{
Key: key,
},
}
return c.applyCommand(cmd)
}
func (c *cluster) GetKV(origin, key string) (string, time.Time, error) {
if ok, _ := c.IsClusterDegraded(); ok {
return "", time.Time{}, ErrDegraded
}
if !c.IsRaftLeader() {
return c.forwarder.GetKV(origin, key)
}
value, err := c.store.GetFromKVS(key)
if err != nil {
return "", time.Time{}, err
}
return value.Value, value.UpdatedAt, nil
}
func (c *cluster) ListKV(prefix string) map[string]store.Value {
storeValues := c.store.ListKVS(prefix)
return storeValues
}
func (c *cluster) applyCommand(cmd *store.Command) error {
b, err := json.Marshal(cmd)
if err != nil {

149
cluster/iam.go Normal file
View File

@ -0,0 +1,149 @@
package cluster
import (
"fmt"
"time"
clusteriam "github.com/datarhei/core/v16/cluster/iam"
clusteriamadapter "github.com/datarhei/core/v16/cluster/iam/adapter"
"github.com/datarhei/core/v16/cluster/store"
"github.com/datarhei/core/v16/iam"
iamaccess "github.com/datarhei/core/v16/iam/access"
iamidentity "github.com/datarhei/core/v16/iam/identity"
)
func (c *cluster) IAM(superuser iamidentity.User, jwtRealm, jwtSecret string) (iam.IAM, error) {
policyAdapter, err := clusteriamadapter.NewPolicyAdapter(c.store)
if err != nil {
return nil, fmt.Errorf("cluster policy adapter: %w", err)
}
identityAdapter, err := clusteriamadapter.NewIdentityAdapter(c.store)
if err != nil {
return nil, fmt.Errorf("cluster identitry adapter: %w", err)
}
iam, err := clusteriam.New(iam.Config{
PolicyAdapter: policyAdapter,
IdentityAdapter: identityAdapter,
Superuser: superuser,
JWTRealm: jwtRealm,
JWTSecret: jwtSecret,
Logger: c.logger.WithComponent("IAM"),
}, c.store)
if err != nil {
return nil, fmt.Errorf("cluster iam: %w", err)
}
return iam, nil
}
func (c *cluster) ListIdentities() (time.Time, []iamidentity.User) {
users := c.store.ListUsers()
return users.UpdatedAt, users.Users
}
func (c *cluster) ListIdentity(name string) (time.Time, iamidentity.User, error) {
user := c.store.GetUser(name)
if len(user.Users) == 0 {
return time.Time{}, iamidentity.User{}, fmt.Errorf("not found")
}
return user.UpdatedAt, user.Users[0], nil
}
func (c *cluster) ListPolicies() (time.Time, []iamaccess.Policy) {
policies := c.store.ListPolicies()
return policies.UpdatedAt, policies.Policies
}
func (c *cluster) ListUserPolicies(name string) (time.Time, []iamaccess.Policy) {
policies := c.store.ListUserPolicies(name)
return policies.UpdatedAt, policies.Policies
}
func (c *cluster) AddIdentity(origin string, identity iamidentity.User) error {
if ok, _ := c.IsDegraded(); ok {
return ErrDegraded
}
if err := identity.Validate(); err != nil {
return fmt.Errorf("invalid identity: %w", err)
}
if !c.IsRaftLeader() {
return c.forwarder.AddIdentity(origin, identity)
}
cmd := &store.Command{
Operation: store.OpAddIdentity,
Data: &store.CommandAddIdentity{
Identity: identity,
},
}
return c.applyCommand(cmd)
}
func (c *cluster) UpdateIdentity(origin, name string, identity iamidentity.User) error {
if ok, _ := c.IsDegraded(); ok {
return ErrDegraded
}
if !c.IsRaftLeader() {
return c.forwarder.UpdateIdentity(origin, name, identity)
}
cmd := &store.Command{
Operation: store.OpUpdateIdentity,
Data: &store.CommandUpdateIdentity{
Name: name,
Identity: identity,
},
}
return c.applyCommand(cmd)
}
func (c *cluster) SetPolicies(origin, name string, policies []iamaccess.Policy) error {
if ok, _ := c.IsDegraded(); ok {
return ErrDegraded
}
if !c.IsRaftLeader() {
return c.forwarder.SetPolicies(origin, name, policies)
}
cmd := &store.Command{
Operation: store.OpSetPolicies,
Data: &store.CommandSetPolicies{
Name: name,
Policies: policies,
},
}
return c.applyCommand(cmd)
}
func (c *cluster) RemoveIdentity(origin string, name string) error {
if ok, _ := c.IsDegraded(); ok {
return ErrDegraded
}
if !c.IsRaftLeader() {
return c.forwarder.RemoveIdentity(origin, name)
}
cmd := &store.Command{
Operation: store.OpRemoveIdentity,
Data: &store.CommandRemoveIdentity{
Name: name,
},
}
return c.applyCommand(cmd)
}

View File

@ -8,6 +8,129 @@ import (
"github.com/datarhei/core/v16/log"
)
func (c *cluster) CreateLock(origin string, name string, validUntil time.Time) (*kvs.Lock, error) {
if ok, _ := c.IsClusterDegraded(); ok {
return nil, ErrDegraded
}
if !c.IsRaftLeader() {
err := c.forwarder.CreateLock(origin, name, validUntil)
if err != nil {
return nil, err
}
l := &kvs.Lock{
ValidUntil: validUntil,
}
return l, nil
}
cmd := &store.Command{
Operation: store.OpCreateLock,
Data: &store.CommandCreateLock{
Name: name,
ValidUntil: validUntil,
},
}
err := c.applyCommand(cmd)
if err != nil {
return nil, err
}
l := &kvs.Lock{
ValidUntil: validUntil,
}
return l, nil
}
func (c *cluster) DeleteLock(origin string, name string) error {
if ok, _ := c.IsClusterDegraded(); ok {
return ErrDegraded
}
if !c.IsRaftLeader() {
return c.forwarder.DeleteLock(origin, name)
}
cmd := &store.Command{
Operation: store.OpDeleteLock,
Data: &store.CommandDeleteLock{
Name: name,
},
}
return c.applyCommand(cmd)
}
func (c *cluster) ListLocks() map[string]time.Time {
return c.store.ListLocks()
}
func (c *cluster) SetKV(origin, key, value string) error {
if ok, _ := c.IsClusterDegraded(); ok {
return ErrDegraded
}
if !c.IsRaftLeader() {
return c.forwarder.SetKV(origin, key, value)
}
cmd := &store.Command{
Operation: store.OpSetKV,
Data: &store.CommandSetKV{
Key: key,
Value: value,
},
}
return c.applyCommand(cmd)
}
func (c *cluster) UnsetKV(origin, key string) error {
if ok, _ := c.IsClusterDegraded(); ok {
return ErrDegraded
}
if !c.IsRaftLeader() {
return c.forwarder.UnsetKV(origin, key)
}
cmd := &store.Command{
Operation: store.OpUnsetKV,
Data: &store.CommandUnsetKV{
Key: key,
},
}
return c.applyCommand(cmd)
}
func (c *cluster) GetKV(origin, key string) (string, time.Time, error) {
if ok, _ := c.IsClusterDegraded(); ok {
return "", time.Time{}, ErrDegraded
}
if !c.IsRaftLeader() {
return c.forwarder.GetKV(origin, key)
}
value, err := c.store.GetFromKVS(key)
if err != nil {
return "", time.Time{}, err
}
return value.Value, value.UpdatedAt, nil
}
func (c *cluster) ListKV(prefix string) map[string]store.Value {
storeValues := c.store.ListKVS(prefix)
return storeValues
}
type clusterKVS struct {
cluster Cluster
logger log.Logger

View File

@ -1,4 +1,4 @@
package cluster
package node
import (
"context"
@ -13,7 +13,23 @@ import (
"github.com/datarhei/core/v16/config"
)
type clusterNode struct {
type Node interface {
Stop() error
Version() string
IPs() []string
Status() (string, error)
LastContact() time.Time
Barrier(name string) (bool, error)
CoreStatus() (string, error)
CoreEssentials() (string, *config.Config, error)
CoreConfig() (*config.Config, error)
CoreAPIAddress() (string, error)
Proxy() proxy.Node
}
type node struct {
client client.APIClient
id string
@ -33,8 +49,8 @@ type clusterNode struct {
proxyNode proxy.Node
}
func NewClusterNode(id, address string) *clusterNode {
n := &clusterNode{
func New(id, address string) Node {
n := &node{
id: id,
address: address,
version: "0.0.0",
@ -61,7 +77,7 @@ func NewClusterNode(id, address string) *clusterNode {
return n
}
func (n *clusterNode) start(id string) error {
func (n *node) start(id string) error {
n.runLock.Lock()
defer n.runLock.Unlock()
@ -113,7 +129,7 @@ func (n *clusterNode) start(id string) error {
return nil
}
func (n *clusterNode) Stop() error {
func (n *node) Stop() error {
n.runLock.Lock()
defer n.runLock.Unlock()
@ -129,18 +145,18 @@ func (n *clusterNode) Stop() error {
return nil
}
func (n *clusterNode) Version() string {
func (n *node) Version() string {
n.pingLock.RLock()
defer n.pingLock.RUnlock()
return n.version
}
func (n *clusterNode) IPs() []string {
func (n *node) IPs() []string {
return n.ips
}
func (n *clusterNode) Status() (string, error) {
func (n *node) Status() (string, error) {
n.pingLock.RLock()
defer n.pingLock.RUnlock()
@ -152,7 +168,7 @@ func (n *clusterNode) Status() (string, error) {
return "online", nil
}
func (n *clusterNode) CoreStatus() (string, error) {
func (n *node) CoreStatus() (string, error) {
n.pingLock.RLock()
defer n.pingLock.RUnlock()
@ -164,14 +180,14 @@ func (n *clusterNode) CoreStatus() (string, error) {
return "online", nil
}
func (n *clusterNode) LastContact() time.Time {
func (n *node) LastContact() time.Time {
n.pingLock.RLock()
defer n.pingLock.RUnlock()
return n.lastContact
}
func (n *clusterNode) CoreEssentials() (string, *config.Config, error) {
func (n *node) CoreEssentials() (string, *config.Config, error) {
address, err := n.CoreAPIAddress()
if err != nil {
return "", nil, err
@ -185,23 +201,23 @@ func (n *clusterNode) CoreEssentials() (string, *config.Config, error) {
return address, config, nil
}
func (n *clusterNode) CoreConfig() (*config.Config, error) {
func (n *node) CoreConfig() (*config.Config, error) {
return n.client.CoreConfig()
}
func (n *clusterNode) CoreAPIAddress() (string, error) {
func (n *node) CoreAPIAddress() (string, error) {
return n.client.CoreAPIAddress()
}
func (n *clusterNode) Barrier(name string) (bool, error) {
func (n *node) Barrier(name string) (bool, error) {
return n.client.Barrier(name)
}
func (n *clusterNode) Proxy() proxy.Node {
func (n *node) Proxy() proxy.Node {
return n.proxyNode
}
func (n *clusterNode) ping(ctx context.Context) {
func (n *node) ping(ctx context.Context) {
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
@ -227,7 +243,7 @@ func (n *clusterNode) ping(ctx context.Context) {
}
}
func (n *clusterNode) pingCore(ctx context.Context) {
func (n *node) pingCore(ctx context.Context) {
ticker := time.NewTicker(time.Second)
defer ticker.Stop()

136
cluster/process.go Normal file
View File

@ -0,0 +1,136 @@
package cluster
import (
"fmt"
"github.com/datarhei/core/v16/cluster/store"
"github.com/datarhei/core/v16/restream/app"
)
func (c *cluster) ListProcesses() []store.Process {
return c.store.ListProcesses()
}
func (c *cluster) GetProcess(id app.ProcessID) (store.Process, error) {
return c.store.GetProcess(id)
}
func (c *cluster) AddProcess(origin string, config *app.Config) error {
if ok, _ := c.IsDegraded(); ok {
return ErrDegraded
}
if !c.IsRaftLeader() {
return c.forwarder.AddProcess(origin, config)
}
cmd := &store.Command{
Operation: store.OpAddProcess,
Data: &store.CommandAddProcess{
Config: config,
},
}
return c.applyCommand(cmd)
}
func (c *cluster) RemoveProcess(origin string, id app.ProcessID) error {
if ok, _ := c.IsDegraded(); ok {
return ErrDegraded
}
if !c.IsRaftLeader() {
return c.forwarder.RemoveProcess(origin, id)
}
cmd := &store.Command{
Operation: store.OpRemoveProcess,
Data: &store.CommandRemoveProcess{
ID: id,
},
}
return c.applyCommand(cmd)
}
func (c *cluster) UpdateProcess(origin string, id app.ProcessID, config *app.Config) error {
if ok, _ := c.IsDegraded(); ok {
return ErrDegraded
}
if !c.IsRaftLeader() {
return c.forwarder.UpdateProcess(origin, id, config)
}
cmd := &store.Command{
Operation: store.OpUpdateProcess,
Data: &store.CommandUpdateProcess{
ID: id,
Config: config,
},
}
return c.applyCommand(cmd)
}
func (c *cluster) SetProcessCommand(origin string, id app.ProcessID, command string) error {
if ok, _ := c.IsDegraded(); ok {
return ErrDegraded
}
if !c.IsRaftLeader() {
return c.forwarder.SetProcessCommand(origin, id, command)
}
if command == "start" || command == "stop" {
cmd := &store.Command{
Operation: store.OpSetProcessOrder,
Data: &store.CommandSetProcessOrder{
ID: id,
Order: command,
},
}
return c.applyCommand(cmd)
}
procs := c.proxy.ListProxyProcesses()
nodeid := ""
for _, p := range procs {
if p.Config.ProcessID() != id {
continue
}
nodeid = p.NodeID
break
}
if len(nodeid) == 0 {
return fmt.Errorf("the process '%s' is not registered with any node", id.String())
}
return c.proxy.CommandProcess(nodeid, id, command)
}
func (c *cluster) SetProcessMetadata(origin string, id app.ProcessID, key string, data interface{}) error {
if ok, _ := c.IsDegraded(); ok {
return ErrDegraded
}
if !c.IsRaftLeader() {
return c.forwarder.SetProcessMetadata(origin, id, key, data)
}
cmd := &store.Command{
Operation: store.OpSetProcessMetadata,
Data: &store.CommandSetProcessMetadata{
ID: id,
Key: key,
Data: data,
},
}
return c.applyCommand(cmd)
}