diff --git a/cluster/leader.go b/cluster/leader.go
index f252420a..e53a873c 100644
--- a/cluster/leader.go
+++ b/cluster/leader.go
@@ -292,7 +292,8 @@ func (c *cluster) establishLeadership(ctx context.Context, emergency bool) error
 	ctx, cancel := context.WithCancel(ctx)
 	c.cancelLeaderShip = cancel
 
-	go c.startSynchronizeAndRebalance(ctx, c.syncInterval, emergency)
+	go c.synchronizeAndRebalance(ctx, c.syncInterval, emergency)
+	go c.clearLocks(ctx, time.Minute)
 
 	return nil
 }
@@ -300,10 +301,13 @@ func (c *cluster) establishLeadership(ctx context.Context, emergency bool) error
 func (c *cluster) revokeLeadership() {
 	c.logger.Debug().Log("Revoking leadership")
 
-	c.cancelLeaderShip()
+	if c.cancelLeaderShip != nil {
+		c.cancelLeaderShip()
+		c.cancelLeaderShip = nil
+	}
 }
 
-// startSynchronizeAndRebalance synchronizes and rebalances the processes in a given interval. Synchronizing
+// synchronizeAndRebalance synchronizes and rebalances the processes in a given interval. Synchronizing
 // takes care that all processes in the cluster DB are running on one node. It writes the process->node mapping
 // to the cluster DB such that when a new leader gets elected it knows where which process should be running.
 // This is checked against the actual state. If a node is not reachable, the leader still knows which processes
@@ -324,7 +328,7 @@ func (c *cluster) revokeLeadership() {
 //
 // The goal of synchronizing and rebalancing is to make as little moves as possible and to be tolerant for
 // a while if a node is not reachable.
-func (c *cluster) startSynchronizeAndRebalance(ctx context.Context, interval time.Duration, emergency bool) {
+func (c *cluster) synchronizeAndRebalance(ctx context.Context, interval time.Duration, emergency bool) {
 	ticker := time.NewTicker(interval)
 	defer ticker.Stop()
 
@@ -350,6 +354,38 @@ func (c *cluster) startSynchronizeAndRebalance(ctx context.Context, interval time.Duration, emergency bool) {
 	}
 }
 
+func (c *cluster) clearLocks(ctx context.Context, interval time.Duration) {
+	ticker := time.NewTicker(interval)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case <-ticker.C:
+			locks := c.ListLocks()
+			hasExpiredLocks := false
+
+			for _, validUntil := range locks {
+				if time.Now().Before(validUntil) {
+					continue
+				}
+
+				hasExpiredLocks = true
+				break
+			}
+
+			if hasExpiredLocks {
+				c.logger.Debug().Log("Clearing locks")
+				c.applyCommand(&store.Command{
+					Operation: store.OpClearLocks,
+					Data:      &store.CommandClearLocks{},
+				})
+			}
+		}
+	}
+}
+
 var errNotEnoughResources = errors.New("no node with enough resources is available")
 var errNotEnoughResourcesForRebalancing = errors.New("no other node to move the process to is available")
 var errNoLimitsDefined = errors.New("no process limits are defined")
diff --git a/cluster/store/store.go b/cluster/store/store.go
index 47201ab8..57e28173 100644
--- a/cluster/store/store.go
+++ b/cluster/store/store.go
@@ -63,6 +63,7 @@ const (
 	OpSetProcessNodeMap Operation = "setProcessNodeMap"
 	OpCreateLock        Operation = "createLock"
 	OpDeleteLock        Operation = "deleteLock"
+	OpClearLocks        Operation = "clearLocks"
 )
 
 type Command struct {
@@ -120,6 +121,8 @@ type CommandDeleteLock struct {
 	Name string
 }
 
+type CommandClearLocks struct{}
+
 type storeData struct {
 	Version uint64
 	Process map[string]Process
@@ -354,6 +357,18 @@ func (s *store) applyCommand(c Command) error {
 		}
 
 		err = s.deleteLock(cmd)
+	case OpClearLocks:
+		b, err = json.Marshal(c.Data)
+		if err != nil {
+			break
+		}
+		cmd := CommandClearLocks{}
+		err = json.Unmarshal(b, &cmd)
+		if err != nil {
+			break
+		}
+
+		err = s.clearLocks(cmd)
 	default:
 		s.logger.Warn().WithField("operation", c.Operation).Log("Unknown operation")
 		err = fmt.Errorf("unknown operation: %s", c.Operation)
@@ -608,6 +623,22 @@ func (s *store) deleteLock(cmd CommandDeleteLock) error {
 	return nil
 }
 
+func (s *store) clearLocks(cmd CommandClearLocks) error {
+	s.lock.Lock()
+	defer s.lock.Unlock()
+
+	for name, validUntil := range s.data.Locks {
+		if time.Now().Before(validUntil) {
+			// Lock is still valid
+			continue
+		}
+
+		delete(s.data.Locks, name)
+	}
+
+	return nil
+}
+
 func (s *store) OnApply(fn func(op Operation)) {
 	s.lock.Lock()
 	defer s.lock.Unlock()
diff --git a/cluster/store/store_test.go b/cluster/store/store_test.go
index 3a0993b7..c08aa9b7 100644
--- a/cluster/store/store_test.go
+++ b/cluster/store/store_test.go
@@ -950,6 +950,70 @@ func TestDeleteLock(t *testing.T) {
 	require.NoError(t, err)
 }
 
+func TestClearLocksCommand(t *testing.T) {
+	s, err := createStore()
+	require.NoError(t, err)
+
+	err = s.applyCommand(Command{
+		Operation: OpCreateLock,
+		Data: CommandCreateLock{
+			Name:       "foobar",
+			ValidUntil: time.Now().Add(3 * time.Second),
+		},
+	})
+	require.NoError(t, err)
+
+	_, ok := s.data.Locks["foobar"]
+	require.True(t, ok)
+
+	err = s.applyCommand(Command{
+		Operation: OpClearLocks,
+		Data:      CommandClearLocks{},
+	})
+	require.NoError(t, err)
+
+	_, ok = s.data.Locks["foobar"]
+	require.True(t, ok)
+
+	time.Sleep(3 * time.Second)
+
+	err = s.applyCommand(Command{
+		Operation: OpClearLocks,
+		Data:      CommandClearLocks{},
+	})
+	require.NoError(t, err)
+
+	_, ok = s.data.Locks["foobar"]
+	require.False(t, ok)
+}
+
+func TestClearLocks(t *testing.T) {
+	s, err := createStore()
+	require.NoError(t, err)
+
+	cmd := CommandCreateLock{
+		Name:       "foobar",
+		ValidUntil: time.Now().Add(3 * time.Second),
+	}
+
+	err = s.createLock(cmd)
+	require.NoError(t, err)
+
+	err = s.clearLocks(CommandClearLocks{})
+	require.NoError(t, err)
+
+	err = s.createLock(cmd)
+	require.Error(t, err)
+
+	time.Sleep(3 * time.Second)
+
+	err = s.clearLocks(CommandClearLocks{})
+	require.NoError(t, err)
+
+	err = s.createLock(cmd)
+	require.NoError(t, err)
+}
+
 func TestApplyCommand(t *testing.T) {
 	s, err := createStore()
 	require.NoError(t, err)
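
For reference, below is a minimal standalone sketch of the expiry rule that both clearLocks implementations in this patch rely on: a lock whose ValidUntil is not strictly in the future counts as expired and is deleted. The package, map, and helper names here are illustrative only and are not part of the patch.

package main

import (
	"fmt"
	"time"
)

// reapExpired deletes every lock whose deadline has passed. A lock expiring
// exactly "now" is treated as expired, because time.Now().Before(validUntil)
// is false when the two instants are equal -- the same comparison the patch
// uses in both cluster/leader.go and cluster/store/store.go. Deleting map
// entries while ranging over the map is permitted in Go.
func reapExpired(locks map[string]time.Time) {
	for name, validUntil := range locks {
		if time.Now().Before(validUntil) {
			continue // still valid
		}

		delete(locks, name)
	}
}

func main() {
	locks := map[string]time.Time{
		"live":    time.Now().Add(time.Minute),
		"expired": time.Now().Add(-time.Second),
	}

	reapExpired(locks)
	fmt.Println(len(locks)) // prints 1; only "live" remains
}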