Add leader task to clear expired locks

This commit is contained in:
Ingo Oppermann 2023-06-22 14:12:02 +02:00
parent dc3e7afc52
commit a4d59a04b5
No known key found for this signature in database
GPG Key ID: 2AB32426E9DD229E
3 changed files with 135 additions and 4 deletions

View File

@ -292,7 +292,8 @@ func (c *cluster) establishLeadership(ctx context.Context, emergency bool) error
ctx, cancel := context.WithCancel(ctx)
c.cancelLeaderShip = cancel
go c.startSynchronizeAndRebalance(ctx, c.syncInterval, emergency)
go c.synchronizeAndRebalance(ctx, c.syncInterval, emergency)
go c.clearLocks(ctx, time.Minute)
return nil
}
@ -300,10 +301,13 @@ func (c *cluster) establishLeadership(ctx context.Context, emergency bool) error
// revokeLeadership stops all leader-only background tasks by cancelling
// the leadership context created in establishLeadership.
//
// The cancel function is nil-guarded and reset to nil after use so that
// calling revokeLeadership when leadership was never established (or was
// already revoked) is a safe no-op instead of a nil-pointer panic or a
// redundant double cancel.
func (c *cluster) revokeLeadership() {
	c.logger.Debug().Log("Revoking leadership")

	if c.cancelLeaderShip != nil {
		c.cancelLeaderShip()
		c.cancelLeaderShip = nil
	}
}
// startSynchronizeAndRebalance synchronizes and rebalances the processes in a given interval. Synchronizing
// synchronizeAndRebalance synchronizes and rebalances the processes in a given interval. Synchronizing
// takes care that all processes in the cluster DB are running on one node. It writes the process->node mapping
// to the cluster DB such that when a new leader gets elected it knows where which process should be running.
// This is checked against the actual state. If a node is not reachable, the leader still knows which processes
@ -324,7 +328,7 @@ func (c *cluster) revokeLeadership() {
//
// The goal of synchronizing and rebalancing is to make as little moves as possible and to be tolerant for
// a while if a node is not reachable.
func (c *cluster) startSynchronizeAndRebalance(ctx context.Context, interval time.Duration, emergency bool) {
func (c *cluster) synchronizeAndRebalance(ctx context.Context, interval time.Duration, emergency bool) {
ticker := time.NewTicker(interval)
defer ticker.Stop()
@ -350,6 +354,38 @@ func (c *cluster) startSynchronizeAndRebalance(ctx context.Context, interval tim
}
}
// clearLocks is a leader-only background task that periodically removes
// expired locks from the cluster DB. Every interval it lists the known
// locks and, if at least one has expired, applies an OpClearLocks command
// so the store drops all stale entries. It runs until ctx is cancelled.
func (c *cluster) clearLocks(ctx context.Context, interval time.Duration) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			locks := c.ListLocks()

			// Capture the current time once so every lock is judged
			// against the same instant, instead of calling time.Now()
			// on each iteration.
			now := time.Now()

			hasExpiredLocks := false

			for _, validUntil := range locks {
				if now.Before(validUntil) {
					// Lock is still valid.
					continue
				}

				// At least one lock expired; one is enough to trigger
				// a cluster-wide clear.
				hasExpiredLocks = true
				break
			}

			if hasExpiredLocks {
				c.logger.Debug().Log("Clearing locks")
				// NOTE(review): the error from applyCommand is ignored
				// here, presumably best-effort — the next tick retries.
				c.applyCommand(&store.Command{
					Operation: store.OpClearLocks,
					Data:      &store.CommandClearLocks{},
				})
			}
		}
	}
}
// Errors returned by the process placement and rebalancing logic.
var (
	// errNotEnoughResources indicates that no node has enough free
	// resources to accept the process.
	errNotEnoughResources = errors.New("no node with enough resources is available")

	// errNotEnoughResourcesForRebalancing indicates that there is no
	// other node the process could be moved to.
	errNotEnoughResourcesForRebalancing = errors.New("no other node to move the process to is available")

	// errNoLimitsDefined indicates that the process has no resource
	// limits configured.
	errNoLimitsDefined = errors.New("no process limits are defined")
)

View File

@ -63,6 +63,7 @@ const (
OpSetProcessNodeMap Operation = "setProcessNodeMap"
OpCreateLock Operation = "createLock"
OpDeleteLock Operation = "deleteLock"
OpClearLocks Operation = "clearLocks"
)
type Command struct {
@ -120,6 +121,8 @@ type CommandDeleteLock struct {
Name string
}
// CommandClearLocks is the (empty) payload for the OpClearLocks
// operation: applying it removes every expired lock from the store.
type CommandClearLocks struct{}
type storeData struct {
Version uint64
Process map[string]Process
@ -354,6 +357,18 @@ func (s *store) applyCommand(c Command) error {
}
err = s.deleteLock(cmd)
case OpClearLocks:
b, err = json.Marshal(c.Data)
if err != nil {
break
}
cmd := CommandClearLocks{}
err = json.Unmarshal(b, &cmd)
if err != nil {
break
}
err = s.clearLocks(cmd)
default:
s.logger.Warn().WithField("operation", c.Operation).Log("Unknown operation")
err = fmt.Errorf("unknown operation: %s", c.Operation)
@ -608,6 +623,22 @@ func (s *store) deleteLock(cmd CommandDeleteLock) error {
return nil
}
// clearLocks removes every expired lock from the store. A lock is
// expired when its ValidUntil timestamp is not in the future. The cmd
// parameter carries no data; it exists to match the command-handler
// signature. It always returns nil.
func (s *store) clearLocks(cmd CommandClearLocks) error {
	s.lock.Lock()
	defer s.lock.Unlock()

	// Capture the current time once so all locks are compared against
	// the same instant, rather than calling time.Now() per iteration.
	now := time.Now()

	for name, validUntil := range s.data.Locks {
		if now.Before(validUntil) {
			// Lock is still valid.
			continue
		}

		// Deleting from a map while ranging over it is safe in Go.
		delete(s.data.Locks, name)
	}

	return nil
}
func (s *store) OnApply(fn func(op Operation)) {
s.lock.Lock()
defer s.lock.Unlock()

View File

@ -950,6 +950,70 @@ func TestDeleteLock(t *testing.T) {
require.NoError(t, err)
}
// TestClearLocksCommand verifies OpClearLocks via the applyCommand path:
// a still-valid lock survives a clear, an expired one is removed.
// Sub-second timings keep the test fast (the original slept 3 seconds).
func TestClearLocksCommand(t *testing.T) {
	s, err := createStore()
	require.NoError(t, err)

	// Create a lock that stays valid for a short while.
	err = s.applyCommand(Command{
		Operation: OpCreateLock,
		Data: CommandCreateLock{
			Name:       "foobar",
			ValidUntil: time.Now().Add(500 * time.Millisecond),
		},
	})
	require.NoError(t, err)

	_, ok := s.data.Locks["foobar"]
	require.True(t, ok)

	// Clearing while the lock is still valid must keep it.
	err = s.applyCommand(Command{
		Operation: OpClearLocks,
		Data:      CommandClearLocks{},
	})
	require.NoError(t, err)

	_, ok = s.data.Locks["foobar"]
	require.True(t, ok)

	// Wait for the lock to expire; clearing must now remove it.
	time.Sleep(600 * time.Millisecond)

	err = s.applyCommand(Command{
		Operation: OpClearLocks,
		Data:      CommandClearLocks{},
	})
	require.NoError(t, err)

	_, ok = s.data.Locks["foobar"]
	require.False(t, ok)
}
// TestClearLocks verifies the store's clearLocks method directly:
// a valid lock blocks re-acquisition and survives a clear; once
// expired it is cleared and the name can be locked again.
// Sub-second timings keep the test fast (the original slept 3 seconds).
func TestClearLocks(t *testing.T) {
	s, err := createStore()
	require.NoError(t, err)

	cmd := CommandCreateLock{
		Name:       "foobar",
		ValidUntil: time.Now().Add(500 * time.Millisecond),
	}

	err = s.createLock(cmd)
	require.NoError(t, err)

	// Clearing must not touch a still-valid lock, so re-acquiring
	// the same name still fails.
	err = s.clearLocks(CommandClearLocks{})
	require.NoError(t, err)

	err = s.createLock(cmd)
	require.Error(t, err)

	// After expiry the lock is removed and can be acquired again.
	time.Sleep(600 * time.Millisecond)

	err = s.clearLocks(CommandClearLocks{})
	require.NoError(t, err)

	err = s.createLock(cmd)
	require.NoError(t, err)
}
func TestApplyCommand(t *testing.T) {
s, err := createStore()
require.NoError(t, err)