diff --git a/cluster/cluster.go b/cluster/cluster.go index 6d0cd131..604065cb 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -697,7 +697,7 @@ func (c *cluster) trackLeaderChanges() { } func (c *cluster) ListProcesses() []store.Process { - return c.store.ProcessList() + return c.store.ListProcesses() } func (c *cluster) GetProcess(id app.ProcessID) (store.Process, error) { @@ -794,7 +794,7 @@ func (c *cluster) IAM(superuser iamidentity.User, jwtRealm, jwtSecret string) (i } func (c *cluster) ListIdentities() (time.Time, []iamidentity.User) { - users := c.store.UserList() + users := c.store.ListUsers() return users.UpdatedAt, users.Users } @@ -810,13 +810,13 @@ func (c *cluster) ListIdentity(name string) (time.Time, iamidentity.User, error) } func (c *cluster) ListPolicies() (time.Time, []iamaccess.Policy) { - policies := c.store.PolicyList() + policies := c.store.ListPolicies() return policies.UpdatedAt, policies.Policies } func (c *cluster) ListUserPolicies(name string) (time.Time, []iamaccess.Policy) { - policies := c.store.PolicyUserList(name) + policies := c.store.ListUserPolicies(name) return policies.UpdatedAt, policies.Policies } diff --git a/cluster/follower.go b/cluster/follower.go index f62b5841..afeeb664 100644 --- a/cluster/follower.go +++ b/cluster/follower.go @@ -2,7 +2,6 @@ package cluster // followerLoop is run by every follower node in the cluster. func (c *cluster) followerLoop(stopCh chan struct{}) { - // Periodically reconcile as long as we are the leader for { select { case <-stopCh: diff --git a/cluster/iam/adapter/identity.go b/cluster/iam/adapter/identity.go index 2026e8de..27bd2114 100644 --- a/cluster/iam/adapter/identity.go +++ b/cluster/iam/adapter/identity.go @@ -18,7 +18,7 @@ func NewIdentityAdapter(store store.Store) (iamidentity.Adapter, error) { } func (a *identityAdapter) LoadIdentities() ([]iamidentity.User, error) { - users := a.store.UserList() + users := a.store.ListUsers() return users.Users, nil } diff --git a/cluster/iam/adapter/policy.go b/cluster/iam/adapter/policy.go index 2f7b28ad..3b69212e 100644 --- a/cluster/iam/adapter/policy.go +++ b/cluster/iam/adapter/policy.go @@ -26,7 +26,7 @@ func NewPolicyAdapter(store store.Store) (iamaccess.Adapter, error) { } func (a *policyAdapter) LoadPolicy(model model.Model) error { - policies := a.store.PolicyList() + policies := a.store.ListPolicies() rules := [][]string{} domains := map[string]struct{}{} diff --git a/cluster/leader.go b/cluster/leader.go index 22882442..6fd32ca2 100644 --- a/cluster/leader.go +++ b/cluster/leader.go @@ -391,7 +391,7 @@ func (c *cluster) applyOpStack(stack []interface{}) { for _, op := range stack { switch v := op.(type) { case processOpAdd: - err := c.proxy.ProcessAdd(v.nodeid, v.config, v.metadata) + err := c.proxy.AddProcess(v.nodeid, v.config, v.metadata) if err != nil { c.logger.Info().WithError(err).WithFields(log.Fields{ "processid": v.config.ProcessID(), @@ -400,7 +400,7 @@ func (c *cluster) applyOpStack(stack []interface{}) { break } - err = c.proxy.ProcessStart(v.nodeid, v.config.ProcessID()) + err = c.proxy.StartProcess(v.nodeid, v.config.ProcessID()) if err != nil { c.logger.Info().WithError(err).WithFields(log.Fields{ "processid": v.config.ID, @@ -413,7 +413,7 @@ func (c *cluster) applyOpStack(stack []interface{}) { "nodeid": v.nodeid, }).Log("Adding process") case processOpUpdate: - err := c.proxy.ProcessUpdate(v.nodeid, v.processid, v.config, v.metadata) + err := c.proxy.UpdateProcess(v.nodeid, v.processid, v.config, v.metadata) if err != nil { c.logger.Info().WithError(err).WithFields(log.Fields{ "processid": v.config.ID, @@ -427,7 +427,7 @@ func (c *cluster) applyOpStack(stack []interface{}) { "nodeid": v.nodeid, }).Log("Updating process") case processOpDelete: - err := c.proxy.ProcessDelete(v.nodeid, v.processid) + err := c.proxy.DeleteProcess(v.nodeid, v.processid) if err != nil { c.logger.Info().WithError(err).WithFields(log.Fields{ "processid": v.processid, @@ -441,7 +441,7 @@ func (c *cluster) applyOpStack(stack []interface{}) { "nodeid": v.nodeid, }).Log("Removing process") case processOpMove: - err := c.proxy.ProcessAdd(v.toNodeid, v.config, v.metadata) + err := c.proxy.AddProcess(v.toNodeid, v.config, v.metadata) if err != nil { c.logger.Info().WithError(err).WithFields(log.Fields{ "processid": v.config.ID, @@ -451,7 +451,7 @@ func (c *cluster) applyOpStack(stack []interface{}) { break } - err = c.proxy.ProcessDelete(v.fromNodeid, v.config.ProcessID()) + err = c.proxy.DeleteProcess(v.fromNodeid, v.config.ProcessID()) if err != nil { c.logger.Info().WithError(err).WithFields(log.Fields{ "processid": v.config.ID, @@ -461,7 +461,7 @@ func (c *cluster) applyOpStack(stack []interface{}) { break } - err = c.proxy.ProcessStart(v.toNodeid, v.config.ProcessID()) + err = c.proxy.StartProcess(v.toNodeid, v.config.ProcessID()) if err != nil { c.logger.Info().WithError(err).WithFields(log.Fields{ "processid": v.config.ID, @@ -477,7 +477,7 @@ func (c *cluster) applyOpStack(stack []interface{}) { "tonodeid": v.toNodeid, }).Log("Moving process") case processOpStart: - err := c.proxy.ProcessStart(v.nodeid, v.processid) + err := c.proxy.StartProcess(v.nodeid, v.processid) if err != nil { c.logger.Info().WithError(err).WithFields(log.Fields{ "processid": v.processid, @@ -505,7 +505,7 @@ func (c *cluster) applyOpStack(stack []interface{}) { func (c *cluster) doSynchronize(emergency bool) { wish := c.store.GetProcessNodeMap() - want := c.store.ProcessList() + want := c.store.ListProcesses() have := c.proxy.ListProcesses() nodes := c.proxy.ListNodes() diff --git a/cluster/proxy/node.go b/cluster/proxy/node.go index 4c4449ad..2c492708 100644 --- a/cluster/proxy/node.go +++ b/cluster/proxy/node.go @@ -28,11 +28,11 @@ type Node interface { GetURL(prefix, path string) (*url.URL, error) GetFile(prefix, path string) (io.ReadCloser, error) - ProcessAdd(config *app.Config, metadata map[string]interface{}) error - ProcessStart(id app.ProcessID) error - ProcessStop(id app.ProcessID) error - ProcessDelete(id app.ProcessID) error - ProcessUpdate(id app.ProcessID, config *app.Config, metadata map[string]interface{}) error + AddProcess(config *app.Config, metadata map[string]interface{}) error + StartProcess(id app.ProcessID) error + StopProcess(id app.ProcessID) error + DeleteProcess(id app.ProcessID) error + UpdateProcess(id app.ProcessID, config *app.Config, metadata map[string]interface{}) error NodeReader } @@ -855,7 +855,7 @@ func (n *node) ProcessList() ([]Process, error) { return processes, nil } -func (n *node) ProcessAdd(config *app.Config, metadata map[string]interface{}) error { +func (n *node) AddProcess(config *app.Config, metadata map[string]interface{}) error { n.peerLock.RLock() defer n.peerLock.RUnlock() @@ -924,7 +924,7 @@ func convertConfig(config *app.Config, metadata map[string]interface{}) clientap return cfg } -func (n *node) ProcessStart(id app.ProcessID) error { +func (n *node) StartProcess(id app.ProcessID) error { n.peerLock.RLock() defer n.peerLock.RUnlock() @@ -935,7 +935,7 @@ func (n *node) ProcessStart(id app.ProcessID) error { return n.peer.ProcessCommand(client.NewProcessID(id.ID, id.Domain), "start") } -func (n *node) ProcessStop(id app.ProcessID) error { +func (n *node) StopProcess(id app.ProcessID) error { n.peerLock.RLock() defer n.peerLock.RUnlock() @@ -946,7 +946,7 @@ func (n *node) ProcessStop(id app.ProcessID) error { return n.peer.ProcessCommand(client.NewProcessID(id.ID, id.Domain), "stop") } -func (n *node) ProcessDelete(id app.ProcessID) error { +func (n *node) DeleteProcess(id app.ProcessID) error { n.peerLock.RLock() defer n.peerLock.RUnlock() @@ -957,7 +957,7 @@ func (n *node) ProcessDelete(id app.ProcessID) error { return n.peer.ProcessDelete(client.NewProcessID(id.ID, id.Domain)) } -func (n *node) ProcessUpdate(id app.ProcessID, config *app.Config, metadata map[string]interface{}) error { +func (n *node) UpdateProcess(id app.ProcessID, config *app.Config, metadata map[string]interface{}) error { n.peerLock.RLock() defer n.peerLock.RUnlock() diff --git a/cluster/proxy/proxy.go b/cluster/proxy/proxy.go index 8036e44e..29ef4b67 100644 --- a/cluster/proxy/proxy.go +++ b/cluster/proxy/proxy.go @@ -24,10 +24,10 @@ type Proxy interface { ProxyReader Reader() ProxyReader - ProcessAdd(nodeid string, config *app.Config, metadata map[string]interface{}) error - ProcessDelete(nodeid string, id app.ProcessID) error - ProcessStart(nodeid string, id app.ProcessID) error - ProcessUpdate(nodeid string, id app.ProcessID, config *app.Config, metadata map[string]interface{}) error + AddProcess(nodeid string, config *app.Config, metadata map[string]interface{}) error + DeleteProcess(nodeid string, id app.ProcessID) error + StartProcess(nodeid string, id app.ProcessID) error + UpdateProcess(nodeid string, id app.ProcessID, config *app.Config, metadata map[string]interface{}) error } type ProxyReader interface { @@ -480,7 +480,7 @@ func (p *proxy) ListProcesses() []Process { return processList } -func (p *proxy) ProcessAdd(nodeid string, config *app.Config, metadata map[string]interface{}) error { +func (p *proxy) AddProcess(nodeid string, config *app.Config, metadata map[string]interface{}) error { p.lock.RLock() defer p.lock.RUnlock() @@ -489,12 +489,12 @@ func (p *proxy) ProcessAdd(nodeid string, config *app.Config, metadata map[strin return fmt.Errorf("node not found") } - err := node.ProcessAdd(config, metadata) + err := node.AddProcess(config, metadata) if err != nil { return err } - err = node.ProcessStart(config.ProcessID()) + err = node.StartProcess(config.ProcessID()) if err != nil { return err } @@ -502,7 +502,7 @@ func (p *proxy) ProcessAdd(nodeid string, config *app.Config, metadata map[strin return nil } -func (p *proxy) ProcessDelete(nodeid string, id app.ProcessID) error { +func (p *proxy) DeleteProcess(nodeid string, id app.ProcessID) error { p.lock.RLock() defer p.lock.RUnlock() @@ -511,7 +511,7 @@ func (p *proxy) ProcessDelete(nodeid string, id app.ProcessID) error { return fmt.Errorf("node not found") } - err := node.ProcessDelete(id) + err := node.DeleteProcess(id) if err != nil { return err } @@ -519,7 +519,7 @@ func (p *proxy) ProcessDelete(nodeid string, id app.ProcessID) error { return nil } -func (p *proxy) ProcessStart(nodeid string, id app.ProcessID) error { +func (p *proxy) StartProcess(nodeid string, id app.ProcessID) error { p.lock.RLock() defer p.lock.RUnlock() @@ -528,7 +528,7 @@ func (p *proxy) ProcessStart(nodeid string, id app.ProcessID) error { return fmt.Errorf("node not found") } - err := node.ProcessStart(id) + err := node.StartProcess(id) if err != nil { return err } @@ -536,7 +536,7 @@ func (p *proxy) ProcessStart(nodeid string, id app.ProcessID) error { return nil } -func (p *proxy) ProcessUpdate(nodeid string, id app.ProcessID, config *app.Config, metadata map[string]interface{}) error { +func (p *proxy) UpdateProcess(nodeid string, id app.ProcessID, config *app.Config, metadata map[string]interface{}) error { p.lock.RLock() defer p.lock.RUnlock() @@ -545,5 +545,5 @@ func (p *proxy) ProcessUpdate(nodeid string, id app.ProcessID, config *app.Confi return fmt.Errorf("node not found") } - return node.ProcessUpdate(id, config, metadata) + return node.UpdateProcess(id, config, metadata) } diff --git a/cluster/store/store.go b/cluster/store/store.go index 9b93ed36..7d0bb801 100644 --- a/cluster/store/store.go +++ b/cluster/store/store.go @@ -20,14 +20,14 @@ type Store interface { OnApply(func(op Operation)) - ProcessList() []Process + ListProcesses() []Process GetProcess(id app.ProcessID) (Process, error) GetProcessNodeMap() map[string]string - UserList() Users + ListUsers() Users GetUser(name string) Users - PolicyList() Policies - PolicyUserList(name string) Policies + ListPolicies() Policies + ListUserPolicies(name string) Policies } type Process struct { @@ -549,7 +549,7 @@ func (s *store) Restore(snapshot io.ReadCloser) error { return nil } -func (s *store) ProcessList() []Process { +func (s *store) ListProcesses() []Process { s.lock.RLock() defer s.lock.RUnlock() @@ -584,7 +584,7 @@ func (s *store) GetProcess(id app.ProcessID) (Process, error) { }, nil } -func (s *store) UserList() Users { +func (s *store) ListUsers() Users { s.lock.RLock() defer s.lock.RUnlock() @@ -614,7 +614,7 @@ func (s *store) GetUser(name string) Users { return u } -func (s *store) PolicyList() Policies { +func (s *store) ListPolicies() Policies { s.lock.RLock() defer s.lock.RUnlock() @@ -629,7 +629,7 @@ func (s *store) PolicyList() Policies { return p } -func (s *store) PolicyUserList(name string) Policies { +func (s *store) ListUserPolicies(name string) Policies { s.lock.RLock() defer s.lock.RUnlock()