core/cluster/store/store.go
2023-06-22 21:43:51 +02:00

938 lines
17 KiB
Go

package store
import (
"encoding/json"
"fmt"
"io"
"io/fs"
"strings"
"sync"
"time"
"github.com/datarhei/core/v16/iam/access"
"github.com/datarhei/core/v16/iam/identity"
"github.com/datarhei/core/v16/log"
"github.com/datarhei/core/v16/restream/app"
"github.com/hashicorp/raft"
)
// Store is the interface of the cluster store. It embeds raft.FSM and adds
// read accessors for the data held by the state machine.
type Store interface {
raft.FSM
// OnApply registers a function that is called with the operation of each
// command that has been applied successfully.
OnApply(func(op Operation))
// ListProcesses returns all stored processes.
ListProcesses() []Process
// GetProcess returns the stored process with the given ID or an error if
// there is no such process.
GetProcess(id app.ProcessID) (Process, error)
// GetProcessNodeMap returns a copy of the stored process node map.
GetProcessNodeMap() map[string]string
// ListUsers returns all stored identities together with the time of the
// last change to the set of identities.
ListUsers() Users
// GetUser returns a Users value that contains the identity with the given
// name if it exists, and no identity otherwise.
GetUser(name string) Users
// ListPolicies returns the policies of all names together with the time of
// the last change to the policies.
ListPolicies() Policies
// ListUserPolicies returns the policies stored for the given name.
ListUserPolicies(name string) Policies
// ListLocks returns a copy of the stored locks, keyed by lock name.
ListLocks() map[string]time.Time
// ListKVS returns all key/value entries whose key starts with prefix.
ListKVS(prefix string) map[string]Value
// GetFromKVS returns the value stored for key, or fs.ErrNotExist if the
// key is not set.
GetFromKVS(key string) (Value, error)
}
// Process is a process as it is kept in the store.
type Process struct {
// CreatedAt is set when the process is added, and again when an update
// changes its ID.
CreatedAt time.Time
// UpdatedAt is set whenever the config, order, or metadata changes.
UpdatedAt time.Time
// Config is the process configuration.
Config *app.Config
// Order is the requested order for the process. Adding a process sets it
// to "start" or "stop" depending on the config's Autostart flag.
Order string
// Metadata holds arbitrary values keyed by name.
Metadata map[string]interface{}
}
// Users is a list of identities together with the time the stored set of
// identities was last changed.
type Users struct {
UpdatedAt time.Time
Users []identity.User
}
// Policies is a list of access policies together with the time the stored
// policies were last changed.
type Policies struct {
UpdatedAt time.Time
Policies []access.Policy
}
// Value is an entry of the key/value store.
type Value struct {
// Value is the stored string.
Value string
// UpdatedAt is the time the value was last set.
UpdatedAt time.Time
}
// Operation names the kind of change a Command applies to the store.
type Operation string
// The operations understood by the store. Each operation has a matching
// Command* type that describes its payload.
const (
OpAddProcess Operation = "addProcess"
OpRemoveProcess Operation = "removeProcess"
OpUpdateProcess Operation = "updateProcess"
OpSetProcessOrder Operation = "setProcessOrder"
OpSetProcessMetadata Operation = "setProcessMetadata"
OpAddIdentity Operation = "addIdentity"
OpUpdateIdentity Operation = "updateIdentity"
OpRemoveIdentity Operation = "removeIdentity"
OpSetPolicies Operation = "setPolicies"
OpSetProcessNodeMap Operation = "setProcessNodeMap"
OpCreateLock Operation = "createLock"
OpDeleteLock Operation = "deleteLock"
OpClearLocks Operation = "clearLocks"
OpSetKV Operation = "setKV"
OpUnsetKV Operation = "unsetKV"
)
// Command is the JSON envelope of a raft log entry. Operation selects the
// handler and Data carries the operation's payload, which is decoded into the
// matching Command* type when the command is applied.
type Command struct {
Operation Operation
Data interface{}
}
// CommandAddProcess is the payload of OpAddProcess. Config is the config of
// the process to add; its ProcessID is used as the key in the store.
type CommandAddProcess struct {
Config *app.Config
}
// CommandUpdateProcess is the payload of OpUpdateProcess. ID identifies the
// existing process and Config is its new config. The ProcessID of Config may
// differ from ID, in which case the process is stored under the new ID.
type CommandUpdateProcess struct {
ID app.ProcessID
Config *app.Config
}
// CommandRemoveProcess is the payload of OpRemoveProcess. ID identifies the
// process to remove.
type CommandRemoveProcess struct {
ID app.ProcessID
}
// CommandSetProcessOrder is the payload of OpSetProcessOrder. Order is stored
// as the new order of the process identified by ID.
type CommandSetProcessOrder struct {
ID app.ProcessID
Order string
}
// CommandSetProcessMetadata is the payload of OpSetProcessMetadata. Data is
// stored under Key in the metadata of the process identified by ID. A nil
// Data removes Key from the metadata.
type CommandSetProcessMetadata struct {
ID app.ProcessID
Key string
Data interface{}
}
// CommandAddIdentity is the payload of OpAddIdentity. Identity is the
// identity to add; it is stored under its Name.
type CommandAddIdentity struct {
Identity identity.User
}
// CommandUpdateIdentity is the payload of OpUpdateIdentity. Name is the
// current name of the identity and Identity is its replacement. If
// Identity.Name differs from Name the identity is renamed.
type CommandUpdateIdentity struct {
Name string
Identity identity.User
}
// CommandRemoveIdentity is the payload of OpRemoveIdentity. Name is the name
// of the identity to remove; its policies are removed as well.
type CommandRemoveIdentity struct {
Name string
}
// CommandSetPolicies is the payload of OpSetPolicies. Policies replaces the
// policies stored for Name. Name must be a known identity or "$anon".
type CommandSetPolicies struct {
Name string
Policies []access.Policy
}
// CommandSetProcessNodeMap is the payload of OpSetProcessNodeMap. Map
// replaces the stored process node map as a whole.
type CommandSetProcessNodeMap struct {
Map map[string]string
}
// CommandCreateLock is the payload of OpCreateLock. Name is the name of the
// lock and ValidUntil is the time up to which the lock is considered valid.
type CommandCreateLock struct {
Name string
ValidUntil time.Time
}
// CommandDeleteLock is the payload of OpDeleteLock. Name is the name of the
// lock to delete.
type CommandDeleteLock struct {
Name string
}
// CommandClearLocks is the payload of OpClearLocks. It carries no data.
type CommandClearLocks struct{}
// CommandSetKV is the payload of OpSetKV. Value is stored under Key.
type CommandSetKV struct {
Key string
Value string
}
// CommandUnsetKV is the payload of OpUnsetKV. Key is the key to remove.
type CommandUnsetKV struct {
Key string
}
// storeData is the complete state of the store. It is what gets marshalled
// to JSON for a snapshot and decoded again on restore.
type storeData struct {
// Version is the version of the data layout; init sets it to 1.
Version uint64
// Process holds the processes keyed by the string form of their ID.
Process map[string]Process
// ProcessNodeMap is replaced as a whole by OpSetProcessNodeMap.
ProcessNodeMap map[string]string
// Users holds the identities keyed by name.
Users struct {
UpdatedAt time.Time
Users map[string]identity.User
}
// Policies holds the policies keyed by identity name.
Policies struct {
UpdatedAt time.Time
Policies map[string][]access.Policy
}
// Locks maps a lock name to the time until which the lock is valid.
Locks map[string]time.Time
// KVS is the key/value store.
KVS map[string]Value
}
// init resets the data to an empty state: the version is set to 1, every map
// is replaced by a fresh empty map, and the users and policies timestamps are
// both set to the current time.
func (s *storeData) init() {
	created := time.Now()

	s.Version = 1

	s.Process = make(map[string]Process)
	s.ProcessNodeMap = make(map[string]string)
	s.Locks = make(map[string]time.Time)
	s.KVS = make(map[string]Value)

	s.Users.Users = make(map[string]identity.User)
	s.Users.UpdatedAt = created

	s.Policies.Policies = make(map[string][]access.Policy)
	s.Policies.UpdatedAt = created
}
// store implements a raft.FSM and the Store interface on top of an in-memory
// storeData value.
type store struct {
// lock guards callback and data.
lock sync.RWMutex
// callback, if set, is called after each successfully applied command.
callback func(op Operation)
logger log.Logger
data storeData
}
// Config is the configuration for NewStore.
type Config struct {
// Logger is the logger used by the store. If nil, NewStore creates one.
Logger log.Logger
}
// NewStore returns an empty store. If config.Logger is nil a new logger is
// created with log.New(""). The returned error is always nil.
func NewStore(config Config) (Store, error) {
	logger := config.Logger
	if logger == nil {
		logger = log.New("")
	}

	s := &store{logger: logger}
	s.data.init()

	return s, nil
}
// Apply implements raft.FSM. It decodes the data of a raft.LogCommand entry
// as a JSON Command and applies it to the store. Entries of any other type
// are ignored.
//
// The return value is nil on success. Otherwise it is an error describing
// why the entry couldn't be decoded or applied. After a successful apply the
// callback registered with OnApply, if any, is called with the operation.
func (s *store) Apply(entry *raft.Log) interface{} {
	if entry.Type != raft.LogCommand {
		return nil
	}

	logger := s.logger.WithFields(log.Fields{
		"index": entry.Index,
		"term":  entry.Term,
	})

	logger.Debug().WithField("data", string(entry.Data)).Log("New entry")

	c := Command{}

	err := json.Unmarshal(entry.Data, &c)
	if err != nil {
		logger.Error().WithError(err).Log("Invalid entry")
		return fmt.Errorf("invalid log entry, index: %d, term: %d", entry.Index, entry.Term)
	}

	logger.Debug().WithField("operation", c.Operation).Log("")

	err = s.applyCommand(c)
	if err != nil {
		logger.Debug().WithError(err).WithField("operation", c.Operation).Log("")
		return err
	}

	// Fetch the callback under the lock but invoke it after the lock has been
	// released. Calling it with the read lock held would deadlock as soon as
	// the callback reads from the store while another goroutine is waiting in
	// OnApply for the write lock, because a sync.RWMutex must not be
	// read-locked recursively.
	s.lock.RLock()
	callback := s.callback
	s.lock.RUnlock()

	if callback != nil {
		callback(c.Operation)
	}

	return nil
}
// decodeCommand converts the loosely typed payload of a Command into a typed
// command by encoding data to JSON and decoding the result into cmd. cmd is
// expected to be a pointer to the target value. The error of the failing JSON
// step is returned as is.
func decodeCommand[T any](cmd T, data any) error {
	raw, err := json.Marshal(data)
	if err != nil {
		return err
	}

	return json.Unmarshal(raw, cmd)
}
// applyCommand decodes the payload of c into the command type that belongs
// to c.Operation and hands it to the matching handler. It returns the decode
// error or the handler's error. An unknown operation is logged as a warning
// and reported as an error.
func (s *store) applyCommand(c Command) error {
	switch c.Operation {
	case OpAddProcess:
		var cmd CommandAddProcess
		if err := decodeCommand(&cmd, c.Data); err != nil {
			return err
		}
		return s.addProcess(cmd)

	case OpRemoveProcess:
		var cmd CommandRemoveProcess
		if err := decodeCommand(&cmd, c.Data); err != nil {
			return err
		}
		return s.removeProcess(cmd)

	case OpUpdateProcess:
		var cmd CommandUpdateProcess
		if err := decodeCommand(&cmd, c.Data); err != nil {
			return err
		}
		return s.updateProcess(cmd)

	case OpSetProcessOrder:
		var cmd CommandSetProcessOrder
		if err := decodeCommand(&cmd, c.Data); err != nil {
			return err
		}
		return s.setProcessOrder(cmd)

	case OpSetProcessMetadata:
		var cmd CommandSetProcessMetadata
		if err := decodeCommand(&cmd, c.Data); err != nil {
			return err
		}
		return s.setProcessMetadata(cmd)

	case OpAddIdentity:
		var cmd CommandAddIdentity
		if err := decodeCommand(&cmd, c.Data); err != nil {
			return err
		}
		return s.addIdentity(cmd)

	case OpUpdateIdentity:
		var cmd CommandUpdateIdentity
		if err := decodeCommand(&cmd, c.Data); err != nil {
			return err
		}
		return s.updateIdentity(cmd)

	case OpRemoveIdentity:
		var cmd CommandRemoveIdentity
		if err := decodeCommand(&cmd, c.Data); err != nil {
			return err
		}
		return s.removeIdentity(cmd)

	case OpSetPolicies:
		var cmd CommandSetPolicies
		if err := decodeCommand(&cmd, c.Data); err != nil {
			return err
		}
		return s.setPolicies(cmd)

	case OpSetProcessNodeMap:
		var cmd CommandSetProcessNodeMap
		if err := decodeCommand(&cmd, c.Data); err != nil {
			return err
		}
		return s.setProcessNodeMap(cmd)

	case OpCreateLock:
		var cmd CommandCreateLock
		if err := decodeCommand(&cmd, c.Data); err != nil {
			return err
		}
		return s.createLock(cmd)

	case OpDeleteLock:
		var cmd CommandDeleteLock
		if err := decodeCommand(&cmd, c.Data); err != nil {
			return err
		}
		return s.deleteLock(cmd)

	case OpClearLocks:
		var cmd CommandClearLocks
		if err := decodeCommand(&cmd, c.Data); err != nil {
			return err
		}
		return s.clearLocks(cmd)

	case OpSetKV:
		var cmd CommandSetKV
		if err := decodeCommand(&cmd, c.Data); err != nil {
			return err
		}
		return s.setKV(cmd)

	case OpUnsetKV:
		var cmd CommandUnsetKV
		if err := decodeCommand(&cmd, c.Data); err != nil {
			return err
		}
		return s.unsetKV(cmd)

	default:
		s.logger.Warn().WithField("operation", c.Operation).Log("Unknown operation")
		return fmt.Errorf("unknown operation: %s", c.Operation)
	}
}
// addProcess stores a new process under the ID derived from cmd.Config.
//
// It returns an error if the config is missing, if the CPU or memory limit
// is not positive, or if a process with the same ID already exists. If the
// config has Autostart set, the process order becomes "start" and Autostart
// is cleared on the stored config; otherwise the order is "stop".
func (s *store) addProcess(cmd CommandAddProcess) error {
	// A payload without a config decodes to a nil pointer. Reject it here
	// instead of panicking on the dereference below.
	if cmd.Config == nil {
		return fmt.Errorf("the process config is missing")
	}

	s.lock.Lock()
	defer s.lock.Unlock()

	id := cmd.Config.ProcessID().String()

	if cmd.Config.LimitCPU <= 0 || cmd.Config.LimitMemory <= 0 {
		return fmt.Errorf("the process with the ID '%s' must have limits defined", id)
	}

	_, ok := s.data.Process[id]
	if ok {
		return fmt.Errorf("the process with the ID '%s' already exists", id)
	}

	order := "stop"
	if cmd.Config.Autostart {
		order = "start"
		// The start request is tracked by the order from here on.
		cmd.Config.Autostart = false
	}

	now := time.Now()
	s.data.Process[id] = Process{
		CreatedAt: now,
		UpdatedAt: now,
		Config:    cmd.Config,
		Order:     order,
		Metadata:  map[string]interface{}{},
	}

	return nil
}
// removeProcess deletes the process identified by cmd.ID. It returns an
// error if no such process is stored.
func (s *store) removeProcess(cmd CommandRemoveProcess) error {
	s.lock.Lock()
	defer s.lock.Unlock()

	key := cmd.ID.String()

	if _, found := s.data.Process[key]; !found {
		return fmt.Errorf("the process with the ID '%s' doesn't exist", key)
	}

	delete(s.data.Process, key)

	return nil
}
// updateProcess replaces the config of the process identified by cmd.ID.
//
// It returns an error if the config is missing, if the CPU or memory limit
// is not positive, or if the process doesn't exist. If the new config equals
// the stored one nothing changes. If the new config yields a different
// process ID, the process is moved to that ID with fresh CreatedAt and
// UpdatedAt timestamps; this fails if the new ID is already taken.
func (s *store) updateProcess(cmd CommandUpdateProcess) error {
	// A payload without a config decodes to a nil pointer. Reject it here
	// instead of panicking on the dereference below.
	if cmd.Config == nil {
		return fmt.Errorf("the process config is missing")
	}

	s.lock.Lock()
	defer s.lock.Unlock()

	srcid := cmd.ID.String()
	dstid := cmd.Config.ProcessID().String()

	if cmd.Config.LimitCPU <= 0 || cmd.Config.LimitMemory <= 0 {
		return fmt.Errorf("the process with the ID '%s' must have limits defined", dstid)
	}

	p, ok := s.data.Process[srcid]
	if !ok {
		return fmt.Errorf("the process with the ID '%s' doesn't exists", srcid)
	}

	if p.Config.Equal(cmd.Config) {
		return nil
	}

	if srcid == dstid {
		// Same ID: update in place.
		p.UpdatedAt = time.Now()
		p.Config = cmd.Config

		s.data.Process[srcid] = p

		return nil
	}

	// The ID changed: move the process to its new ID.
	_, ok = s.data.Process[dstid]
	if ok {
		return fmt.Errorf("the process with the ID '%s' already exists", dstid)
	}

	now := time.Now()

	p.CreatedAt = now
	p.UpdatedAt = now
	p.Config = cmd.Config

	delete(s.data.Process, srcid)
	s.data.Process[dstid] = p

	return nil
}
// setProcessOrder stores cmd.Order as the order of the process identified by
// cmd.ID and refreshes its UpdatedAt timestamp. It returns an error if the
// process doesn't exist.
func (s *store) setProcessOrder(cmd CommandSetProcessOrder) error {
	s.lock.Lock()
	defer s.lock.Unlock()

	key := cmd.ID.String()

	proc, found := s.data.Process[key]
	if !found {
		return fmt.Errorf("the process with the ID '%s' doesn't exists", cmd.ID)
	}

	proc.Order, proc.UpdatedAt = cmd.Order, time.Now()
	s.data.Process[key] = proc

	return nil
}
// setProcessMetadata sets or removes one metadata entry of the process
// identified by cmd.ID and refreshes its UpdatedAt timestamp. A nil cmd.Data
// removes cmd.Key, any other value is stored under cmd.Key. It returns an
// error if the process doesn't exist.
func (s *store) setProcessMetadata(cmd CommandSetProcessMetadata) error {
	s.lock.Lock()
	defer s.lock.Unlock()

	key := cmd.ID.String()

	proc, found := s.data.Process[key]
	if !found {
		return fmt.Errorf("the process with the ID '%s' doesn't exists", cmd.ID)
	}

	if proc.Metadata == nil {
		proc.Metadata = map[string]interface{}{}
	}

	if cmd.Data != nil {
		proc.Metadata[cmd.Key] = cmd.Data
	} else {
		delete(proc.Metadata, cmd.Key)
	}

	proc.UpdatedAt = time.Now()
	s.data.Process[key] = proc

	return nil
}
// addIdentity stores cmd.Identity under its name and refreshes the users
// timestamp. It returns an error if the name is "$anon" or if an identity
// with that name already exists.
func (s *store) addIdentity(cmd CommandAddIdentity) error {
	s.lock.Lock()
	defer s.lock.Unlock()

	name := cmd.Identity.Name

	if name == "$anon" {
		return fmt.Errorf("the identity with the name '%s' can't be created", name)
	}

	if _, exists := s.data.Users.Users[name]; exists {
		return fmt.Errorf("the identity with the name '%s' already exists", name)
	}

	s.data.Users.UpdatedAt = time.Now()
	s.data.Users.Users[name] = cmd.Identity

	return nil
}
// updateIdentity replaces the identity stored under cmd.Name with
// cmd.Identity.
//
// If the name is unchanged the identity is replaced in place. Otherwise the
// identity and its policies are moved to the new name, and both the users
// and the policies timestamps are refreshed. It returns an error if cmd.Name
// is "$anon", if no identity is stored under cmd.Name, or if the new name is
// already taken.
func (s *store) updateIdentity(cmd CommandUpdateIdentity) error {
	s.lock.Lock()
	defer s.lock.Unlock()

	oldName, newName := cmd.Name, cmd.Identity.Name

	if oldName == "$anon" {
		return fmt.Errorf("the identity with the name '%s' can't be updated", oldName)
	}

	users := s.data.Users.Users

	if _, known := users[oldName]; !known {
		return fmt.Errorf("the identity with the name '%s' doesn't exist", oldName)
	}

	if oldName == newName {
		s.data.Users.UpdatedAt = time.Now()
		users[newName] = cmd.Identity

		return nil
	}

	if _, taken := users[newName]; taken {
		return fmt.Errorf("the identity with the name '%s' already exists", newName)
	}

	policies := s.data.Policies.Policies
	now := time.Now()

	// Move the identity to its new name.
	s.data.Users.UpdatedAt = now
	users[newName] = cmd.Identity
	delete(users, oldName)

	// Move the policies along with it.
	s.data.Policies.UpdatedAt = now
	policies[newName] = policies[oldName]
	delete(policies, oldName)

	return nil
}
// removeIdentity deletes the identity stored under cmd.Name together with
// its policies and refreshes both timestamps. Removing an unknown name is not
// an error; the timestamps are refreshed in that case as well.
func (s *store) removeIdentity(cmd CommandRemoveIdentity) error {
	s.lock.Lock()
	defer s.lock.Unlock()

	name := cmd.Name

	delete(s.data.Users.Users, name)
	delete(s.data.Policies.Policies, name)

	s.data.Users.UpdatedAt = time.Now()
	s.data.Policies.UpdatedAt = time.Now()

	return nil
}
// setPolicies replaces the policies stored for cmd.Name with cmd.Policies
// and refreshes the policies timestamp. Policies with an empty domain get the
// domain "$none"; this is written into cmd.Policies itself. It returns an
// error if cmd.Name is neither "$anon" nor the name of a stored identity.
func (s *store) setPolicies(cmd CommandSetPolicies) error {
	s.lock.Lock()
	defer s.lock.Unlock()

	if cmd.Name != "$anon" {
		_, known := s.data.Users.Users[cmd.Name]
		if !known {
			return fmt.Errorf("the identity with the name '%s' doesn't exist", cmd.Name)
		}
	}

	for i := range cmd.Policies {
		if len(cmd.Policies[i].Domain) == 0 {
			cmd.Policies[i].Domain = "$none"
		}
	}

	// Assigning to the key overwrites whatever was stored for the name before.
	s.data.Policies.Policies[cmd.Name] = cmd.Policies
	s.data.Policies.UpdatedAt = time.Now()

	return nil
}
// setProcessNodeMap replaces the stored process node map with cmd.Map. The
// map is stored as is, not copied. It always returns nil.
func (s *store) setProcessNodeMap(cmd CommandSetProcessNodeMap) error {
	s.lock.Lock()
	s.data.ProcessNodeMap = cmd.Map
	s.lock.Unlock()

	return nil
}
// createLock stores a lock named cmd.Name that is valid until
// cmd.ValidUntil. It returns an error if a lock with that name exists and is
// still valid at the current time. An expired lock with the same name is
// overwritten.
func (s *store) createLock(cmd CommandCreateLock) error {
	s.lock.Lock()
	defer s.lock.Unlock()

	if expiry, held := s.data.Locks[cmd.Name]; held && time.Now().Before(expiry) {
		return fmt.Errorf("the lock with the ID '%s' already exists", cmd.Name)
	}

	s.data.Locks[cmd.Name] = cmd.ValidUntil

	return nil
}
// deleteLock removes the lock named cmd.Name. Deleting a lock that doesn't
// exist is not an error. It always returns nil.
func (s *store) deleteLock(cmd CommandDeleteLock) error {
	s.lock.Lock()
	defer s.lock.Unlock()

	// delete is a no-op for a missing key, so no existence check is needed.
	delete(s.data.Locks, cmd.Name)

	return nil
}
// clearLocks removes every lock that is no longer valid at the current time.
// Locks that are still valid are kept. It always returns nil.
func (s *store) clearLocks(cmd CommandClearLocks) error {
	s.lock.Lock()
	defer s.lock.Unlock()

	for name, validUntil := range s.data.Locks {
		if expired := !time.Now().Before(validUntil); expired {
			delete(s.data.Locks, name)
		}
	}

	return nil
}
// setKV stores cmd.Value under cmd.Key with the current time as its
// UpdatedAt timestamp, replacing any previous entry. It always returns nil.
func (s *store) setKV(cmd CommandSetKV) error {
	s.lock.Lock()
	defer s.lock.Unlock()

	s.data.KVS[cmd.Key] = Value{
		Value:     cmd.Value,
		UpdatedAt: time.Now(),
	}

	return nil
}
// unsetKV removes cmd.Key from the key/value store. It returns
// fs.ErrNotExist if the key is not set.
func (s *store) unsetKV(cmd CommandUnsetKV) error {
	s.lock.Lock()
	defer s.lock.Unlock()

	_, present := s.data.KVS[cmd.Key]
	if !present {
		return fs.ErrNotExist
	}

	delete(s.data.KVS, cmd.Key)

	return nil
}
// OnApply registers fn as the function to call after a command has been
// applied successfully. It replaces a previously registered function.
func (s *store) OnApply(fn func(op Operation)) {
	s.lock.Lock()
	s.callback = fn
	s.lock.Unlock()
}
// Snapshot implements raft.FSM. It marshals the current data to JSON while
// holding the read lock and returns a snapshot that carries the encoded
// bytes. It returns the marshalling error, if any.
func (s *store) Snapshot() (raft.FSMSnapshot, error) {
	s.logger.Debug().Log("Snapshot request")

	s.lock.RLock()
	defer s.lock.RUnlock()

	encoded, err := json.Marshal(&s.data)
	if err != nil {
		return nil, err
	}

	snapshot := &fsmSnapshot{data: encoded}

	return snapshot, nil
}
// Restore implements raft.FSM. It replaces the data of the store with the
// JSON-encoded data read from snapshot. The snapshot is closed before the
// function returns.
//
// Decoding starts from freshly initialized data, so fields that are missing
// in the snapshot keep their initial value. Processes without metadata get an
// empty metadata map and a version of 0 is raised to 1. If decoding fails the
// error is returned and the current data is left untouched.
func (s *store) Restore(snapshot io.ReadCloser) error {
	s.logger.Debug().Log("Snapshot restore")

	defer snapshot.Close()

	s.lock.Lock()
	defer s.lock.Unlock()

	var restored storeData
	restored.init()

	if err := json.NewDecoder(snapshot).Decode(&restored); err != nil {
		return err
	}

	for id, proc := range restored.Process {
		if proc.Metadata == nil {
			proc.Metadata = map[string]interface{}{}
			restored.Process[id] = proc
		}
	}

	if restored.Version == 0 {
		restored.Version = 1
	}

	s.data = restored

	return nil
}
// ListProcesses returns all stored processes in no particular order. The
// result is never nil.
//
// Each returned process carries a clone of the stored config and a shallow
// copy of the stored metadata, so callers can't race with later changes that
// the store makes to its own metadata map.
func (s *store) ListProcesses() []Process {
	s.lock.RLock()
	defer s.lock.RUnlock()

	processes := make([]Process, 0, len(s.data.Process))

	for _, p := range s.data.Process {
		// The store modifies its metadata map in place when metadata is set,
		// so hand out a copy instead of the map itself.
		metadata := make(map[string]interface{}, len(p.Metadata))
		for key, value := range p.Metadata {
			metadata[key] = value
		}

		processes = append(processes, Process{
			CreatedAt: p.CreatedAt,
			UpdatedAt: p.UpdatedAt,
			Config:    p.Config.Clone(),
			Order:     p.Order,
			Metadata:  metadata,
		})
	}

	return processes
}
// GetProcess returns the stored process with the given ID. It returns an
// error if there is no such process.
//
// The returned process carries a clone of the stored config, the stored
// order, and a shallow copy of the stored metadata, so callers can't race
// with later changes that the store makes to its own metadata map.
func (s *store) GetProcess(id app.ProcessID) (Process, error) {
	s.lock.RLock()
	defer s.lock.RUnlock()

	process, ok := s.data.Process[id.String()]
	if !ok {
		return Process{}, fmt.Errorf("not found")
	}

	// The store modifies its metadata map in place when metadata is set, so
	// hand out a copy instead of the map itself.
	metadata := make(map[string]interface{}, len(process.Metadata))
	for key, value := range process.Metadata {
		metadata[key] = value
	}

	return Process{
		CreatedAt: process.CreatedAt,
		UpdatedAt: process.UpdatedAt,
		Config:    process.Config.Clone(),
		Order:     process.Order,
		Metadata:  metadata,
	}, nil
}
// ListUsers returns all stored identities in no particular order, together
// with the time of the last change to the set of identities. The list is nil
// if no identity is stored.
func (s *store) ListUsers() Users {
	s.lock.RLock()
	defer s.lock.RUnlock()

	var list []identity.User
	for name := range s.data.Users.Users {
		list = append(list, s.data.Users.Users[name])
	}

	return Users{
		UpdatedAt: s.data.Users.UpdatedAt,
		Users:     list,
	}
}
// GetUser returns a Users value with the time of the last change to the set
// of identities. It contains the identity stored under name, or no identity
// at all if the name is unknown.
func (s *store) GetUser(name string) Users {
	s.lock.RLock()
	defer s.lock.RUnlock()

	result := Users{UpdatedAt: s.data.Users.UpdatedAt}

	user, found := s.data.Users.Users[name]
	if !found {
		return result
	}

	result.Users = append(result.Users, user)

	return result
}
// ListPolicies returns the policies of all names as one flat list, together
// with the time of the last change to the policies. The order of the names
// is not defined.
func (s *store) ListPolicies() Policies {
	s.lock.RLock()
	defer s.lock.RUnlock()

	var all []access.Policy
	for _, perName := range s.data.Policies.Policies {
		all = append(all, perName...)
	}

	return Policies{
		UpdatedAt: s.data.Policies.UpdatedAt,
		Policies:  all,
	}
}
// ListUserPolicies returns a copy of the policies stored for name, together
// with the time of the last change to the policies. The list is nil if
// nothing is stored for name.
func (s *store) ListUserPolicies(name string) Policies {
	s.lock.RLock()
	defer s.lock.RUnlock()

	stored := s.data.Policies.Policies[name]

	return Policies{
		UpdatedAt: s.data.Policies.UpdatedAt,
		Policies:  append([]access.Policy(nil), stored...),
	}
}
// GetProcessNodeMap returns a copy of the stored process node map. The
// result is never nil.
func (s *store) GetProcessNodeMap() map[string]string {
	s.lock.RLock()
	defer s.lock.RUnlock()

	snapshot := make(map[string]string, len(s.data.ProcessNodeMap))
	for k, v := range s.data.ProcessNodeMap {
		snapshot[k] = v
	}

	return snapshot
}
// ListLocks returns a copy of the stored locks, mapping each lock name to
// the time until which the lock is valid. The result is never nil.
func (s *store) ListLocks() map[string]time.Time {
	s.lock.RLock()
	defer s.lock.RUnlock()

	locks := make(map[string]time.Time, len(s.data.Locks))
	for name, validUntil := range s.data.Locks {
		locks[name] = validUntil
	}

	return locks
}
// ListKVS returns all entries of the key/value store whose key starts with
// prefix. An empty prefix matches every key. The result is never nil.
func (s *store) ListKVS(prefix string) map[string]Value {
	s.lock.RLock()
	defer s.lock.RUnlock()

	matches := make(map[string]Value)
	for k, v := range s.data.KVS {
		if strings.HasPrefix(k, prefix) {
			matches[k] = v
		}
	}

	return matches
}
// GetFromKVS returns the value stored under key. It returns fs.ErrNotExist
// if the key is not set.
func (s *store) GetFromKVS(key string) (Value, error) {
	s.lock.RLock()
	defer s.lock.RUnlock()

	if stored, present := s.data.KVS[key]; present {
		return stored, nil
	}

	return Value{}, fs.ErrNotExist
}
// fsmSnapshot is the snapshot type returned by the store's Snapshot method.
// data holds the JSON-encoded store data.
type fsmSnapshot struct {
data []byte
}
// Persist implements raft.FSMSnapshot. It writes the snapshot data to sink
// and closes the sink. If writing or closing fails, the sink is cancelled and
// the error is returned, so that a snapshot which couldn't be finalized is
// not reported as successful.
func (s *fsmSnapshot) Persist(sink raft.SnapshotSink) error {
	if _, err := sink.Write(s.data); err != nil {
		// Best effort: the write error is the one worth reporting.
		sink.Cancel()
		return err
	}

	if err := sink.Close(); err != nil {
		// Best effort: the close error is the one worth reporting.
		sink.Cancel()
		return err
	}

	return nil
}
// Release implements raft.FSMSnapshot. It drops the reference to the
// snapshot data.
func (s *fsmSnapshot) Release() {
s.data = nil
}