Allow to add and remove identites
This commit is contained in:
parent
56e03308c2
commit
1974442814
@ -239,6 +239,34 @@ func NewAPI(config APIConfig) (API, error) {
|
||||
return c.JSON(http.StatusOK, "OK")
|
||||
})
|
||||
|
||||
a.router.POST("/v1/iam/user/:name", func(c echo.Context) error {
|
||||
name := util.PathParam(c, "name")
|
||||
|
||||
r := client.RemoveIdentityRequest{}
|
||||
|
||||
if err := util.ShouldBindJSON(c, &r); err != nil {
|
||||
return httpapi.Err(http.StatusBadRequest, "Invalid JSON", "%s", err)
|
||||
}
|
||||
|
||||
if r.Origin == a.id {
|
||||
return httpapi.Err(http.StatusLoopDetected, "", "breaking circuit")
|
||||
}
|
||||
|
||||
if name != r.Name {
|
||||
return httpapi.Err(http.StatusBadRequest, "Invalid data", "the name in the path and the request do not match")
|
||||
}
|
||||
|
||||
a.logger.Debug().WithField("identity", r.Name).Log("Remove identity request")
|
||||
|
||||
err := a.cluster.RemoveIdentity(r.Origin, r.Name)
|
||||
if err != nil {
|
||||
a.logger.Debug().WithError(err).WithField("identity", r.Name).Log("Unable to remove identity")
|
||||
return httpapi.Err(http.StatusInternalServerError, "unable to remove identity", "%s", err)
|
||||
}
|
||||
|
||||
return c.JSON(http.StatusOK, "OK")
|
||||
})
|
||||
|
||||
a.router.GET("/v1/core", func(c echo.Context) error {
|
||||
address, _ := a.cluster.CoreAPIAddress("")
|
||||
return c.JSON(http.StatusOK, address)
|
||||
|
||||
@ -9,6 +9,7 @@ import (
|
||||
"time"
|
||||
|
||||
httpapi "github.com/datarhei/core/v16/http/api"
|
||||
iamidentity "github.com/datarhei/core/v16/iam/identity"
|
||||
"github.com/datarhei/core/v16/restream/app"
|
||||
)
|
||||
|
||||
@ -40,8 +41,13 @@ type UpdateProcessRequest struct {
|
||||
}
|
||||
|
||||
type AddIdentityRequest struct {
|
||||
Origin string `json:"origin"`
|
||||
Identity any `json:"identity"`
|
||||
Origin string `json:"origin"`
|
||||
Identity iamidentity.User `json:"identity"`
|
||||
}
|
||||
|
||||
type RemoveIdentityRequest struct {
|
||||
Origin string `json:"origin"`
|
||||
Name string `json:"name"`
|
||||
}
|
||||
|
||||
type APIClient struct {
|
||||
@ -130,6 +136,17 @@ func (c *APIClient) AddIdentity(r AddIdentityRequest) error {
|
||||
return err
|
||||
}
|
||||
|
||||
func (c *APIClient) RemoveIdentity(r RemoveIdentityRequest) error {
|
||||
data, err := json.Marshal(r)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
_, err = c.call(http.MethodPost, "/iam/user/:name", "application/json", bytes.NewReader(data))
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func (c *APIClient) Snapshot() (io.ReadCloser, error) {
|
||||
return c.stream(http.MethodGet, "/snapshot", "", nil)
|
||||
}
|
||||
|
||||
@ -16,6 +16,7 @@ import (
|
||||
"github.com/datarhei/core/v16/cluster/proxy"
|
||||
"github.com/datarhei/core/v16/cluster/raft"
|
||||
"github.com/datarhei/core/v16/cluster/store"
|
||||
iamidentity "github.com/datarhei/core/v16/iam/identity"
|
||||
"github.com/datarhei/core/v16/log"
|
||||
"github.com/datarhei/core/v16/net"
|
||||
"github.com/datarhei/core/v16/restream/app"
|
||||
@ -67,7 +68,8 @@ type Cluster interface {
|
||||
RemoveProcess(origin, id string) error
|
||||
UpdateProcess(origin, id string, config *app.Config) error
|
||||
|
||||
AddIdentity(origin string, identity any) error
|
||||
AddIdentity(origin string, identity iamidentity.User) error
|
||||
RemoveIdentity(origin string, name string) error
|
||||
|
||||
ProxyReader() proxy.ProxyReader
|
||||
}
|
||||
@ -744,7 +746,7 @@ func (c *cluster) UpdateProcess(origin, id string, config *app.Config) error {
|
||||
return c.applyCommand(cmd)
|
||||
}
|
||||
|
||||
func (c *cluster) AddIdentity(origin string, identity any) error {
|
||||
func (c *cluster) AddIdentity(origin string, identity iamidentity.User) error {
|
||||
if !c.IsRaftLeader() {
|
||||
return c.forwarder.AddIdentity(origin, identity)
|
||||
}
|
||||
@ -759,6 +761,21 @@ func (c *cluster) AddIdentity(origin string, identity any) error {
|
||||
return c.applyCommand(cmd)
|
||||
}
|
||||
|
||||
func (c *cluster) RemoveIdentity(origin string, name string) error {
|
||||
if !c.IsRaftLeader() {
|
||||
return c.forwarder.RemoveIdentity(origin, name)
|
||||
}
|
||||
|
||||
cmd := &store.Command{
|
||||
Operation: store.OpAddIdentity,
|
||||
Data: &store.CommandRemoveIdentity{
|
||||
Name: name,
|
||||
},
|
||||
}
|
||||
|
||||
return c.applyCommand(cmd)
|
||||
}
|
||||
|
||||
func (c *cluster) applyCommand(cmd *store.Command) error {
|
||||
b, err := json.Marshal(cmd)
|
||||
if err != nil {
|
||||
|
||||
@ -7,6 +7,7 @@ import (
|
||||
"time"
|
||||
|
||||
apiclient "github.com/datarhei/core/v16/cluster/client"
|
||||
iamidentity "github.com/datarhei/core/v16/iam/identity"
|
||||
"github.com/datarhei/core/v16/log"
|
||||
"github.com/datarhei/core/v16/restream/app"
|
||||
)
|
||||
@ -24,7 +25,8 @@ type Forwarder interface {
|
||||
UpdateProcess(origin, id string, config *app.Config) error
|
||||
RemoveProcess(origin, id string) error
|
||||
|
||||
AddIdentity(origin string, identity any) error
|
||||
AddIdentity(origin string, identity iamidentity.User) error
|
||||
RemoveIdentity(origin string, name string) error
|
||||
}
|
||||
|
||||
type forwarder struct {
|
||||
@ -191,7 +193,7 @@ func (f *forwarder) RemoveProcess(origin, id string) error {
|
||||
return client.RemoveProcess(r)
|
||||
}
|
||||
|
||||
func (f *forwarder) AddIdentity(origin string, identity any) error {
|
||||
func (f *forwarder) AddIdentity(origin string, identity iamidentity.User) error {
|
||||
if origin == "" {
|
||||
origin = f.id
|
||||
}
|
||||
@ -207,3 +209,20 @@ func (f *forwarder) AddIdentity(origin string, identity any) error {
|
||||
|
||||
return client.AddIdentity(r)
|
||||
}
|
||||
|
||||
func (f *forwarder) RemoveIdentity(origin string, name string) error {
|
||||
if origin == "" {
|
||||
origin = f.id
|
||||
}
|
||||
|
||||
r := apiclient.RemoveIdentityRequest{
|
||||
Origin: origin,
|
||||
Name: name,
|
||||
}
|
||||
|
||||
f.lock.RLock()
|
||||
client := f.client
|
||||
f.lock.RUnlock()
|
||||
|
||||
return client.RemoveIdentity(r)
|
||||
}
|
||||
|
||||
@ -20,6 +20,8 @@ type Store interface {
|
||||
|
||||
ProcessList() []Process
|
||||
GetProcess(id string) (Process, error)
|
||||
|
||||
UserList() Users
|
||||
}
|
||||
|
||||
type Process struct {
|
||||
@ -41,10 +43,11 @@ type Policies struct {
|
||||
type operation string
|
||||
|
||||
const (
|
||||
OpAddProcess operation = "addProcess"
|
||||
OpRemoveProcess operation = "removeProcess"
|
||||
OpUpdateProcess operation = "updateProcess"
|
||||
OpAddIdentity operation = "addIdentity"
|
||||
OpAddProcess operation = "addProcess"
|
||||
OpRemoveProcess operation = "removeProcess"
|
||||
OpUpdateProcess operation = "updateProcess"
|
||||
OpAddIdentity operation = "addIdentity"
|
||||
OpRemoveIdentity operation = "removeIdentity"
|
||||
)
|
||||
|
||||
type Command struct {
|
||||
@ -66,7 +69,11 @@ type CommandRemoveProcess struct {
|
||||
}
|
||||
|
||||
type CommandAddIdentity struct {
|
||||
Identity any
|
||||
Identity identity.User
|
||||
}
|
||||
|
||||
type CommandRemoveIdentity struct {
|
||||
Name string
|
||||
}
|
||||
|
||||
// Implement a FSM
|
||||
@ -74,6 +81,11 @@ type store struct {
|
||||
lock sync.RWMutex
|
||||
Process map[string]Process
|
||||
|
||||
Users struct {
|
||||
UpdatedAt time.Time
|
||||
Users map[string]identity.User
|
||||
}
|
||||
|
||||
logger log.Logger
|
||||
}
|
||||
|
||||
@ -87,6 +99,8 @@ func NewStore(config Config) (Store, error) {
|
||||
logger: config.Logger,
|
||||
}
|
||||
|
||||
s.Users.Users = map[string]identity.User{}
|
||||
|
||||
if s.logger == nil {
|
||||
s.logger = log.New("")
|
||||
}
|
||||
@ -164,6 +178,29 @@ func (s *store) Apply(entry *raft.Log) interface{} {
|
||||
}
|
||||
}
|
||||
s.lock.Unlock()
|
||||
case OpAddIdentity:
|
||||
b, _ := json.Marshal(c.Data)
|
||||
cmd := CommandAddIdentity{}
|
||||
json.Unmarshal(b, &cmd)
|
||||
|
||||
s.lock.Lock()
|
||||
_, ok := s.Users.Users[cmd.Identity.Name]
|
||||
if !ok {
|
||||
s.Users.UpdatedAt = time.Now()
|
||||
s.Users.Users[cmd.Identity.Name] = cmd.Identity
|
||||
}
|
||||
s.lock.Unlock()
|
||||
case OpRemoveIdentity:
|
||||
b, _ := json.Marshal(c.Data)
|
||||
cmd := CommandRemoveIdentity{}
|
||||
json.Unmarshal(b, &cmd)
|
||||
|
||||
s.lock.Lock()
|
||||
delete(s.Users.Users, cmd.Name)
|
||||
s.Users.UpdatedAt = time.Now()
|
||||
s.lock.Unlock()
|
||||
default:
|
||||
s.logger.Warn().WithField("operation", c.Operation).Log("Unknown operation")
|
||||
}
|
||||
|
||||
s.lock.RLock()
|
||||
@ -235,6 +272,21 @@ func (s *store) GetProcess(id string) (Process, error) {
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (s *store) UserList() Users {
|
||||
s.lock.RLock()
|
||||
defer s.lock.RUnlock()
|
||||
|
||||
u := Users{
|
||||
UpdatedAt: s.Users.UpdatedAt,
|
||||
}
|
||||
|
||||
for _, user := range s.Users.Users {
|
||||
u.Users = append(u.Users, user)
|
||||
}
|
||||
|
||||
return u
|
||||
}
|
||||
|
||||
type fsmSnapshot struct {
|
||||
data []byte
|
||||
}
|
||||
|
||||
@ -378,22 +378,51 @@ func (h *ClusterHandler) DeleteProcess(c echo.Context) error {
|
||||
return c.JSON(http.StatusOK, "OK")
|
||||
}
|
||||
|
||||
// Add adds a new identity to the cluster
|
||||
// @Summary Add a new identiy
|
||||
// @Description Add a new identity
|
||||
// @Tags v16.?.?
|
||||
// @ID cluster-3-add-identity
|
||||
// @Accept json
|
||||
// @Produce json
|
||||
// @Param config body api.IAMUser true "Identity"
|
||||
// @Success 200 {object} api.IAMUser
|
||||
// @Failure 400 {object} api.Error
|
||||
// @Security ApiKeyAuth
|
||||
// @Router /api/v3/cluster/iam/user [post]
|
||||
func (h *ClusterHandler) AddIdentity(c echo.Context) error {
|
||||
process := api.ProcessConfig{
|
||||
ID: shortuuid.New(),
|
||||
Type: "ffmpeg",
|
||||
Autostart: true,
|
||||
}
|
||||
user := api.IAMUser{}
|
||||
|
||||
if err := util.ShouldBindJSON(c, &process); err != nil {
|
||||
if err := util.ShouldBindJSON(c, &user); err != nil {
|
||||
return api.Err(http.StatusBadRequest, "Invalid JSON", "%s", err)
|
||||
}
|
||||
|
||||
config := process.Marshal()
|
||||
identity, _ := user.Unmarshal()
|
||||
|
||||
if err := h.cluster.AddIdentity("", config); err != nil {
|
||||
return api.Err(http.StatusBadRequest, "Invalid process config", "%s", err.Error())
|
||||
if err := h.cluster.AddIdentity("", identity); err != nil {
|
||||
return api.Err(http.StatusBadRequest, "Invalid identity", "%s", err.Error())
|
||||
}
|
||||
|
||||
return c.JSON(http.StatusOK, process)
|
||||
return c.JSON(http.StatusOK, user)
|
||||
}
|
||||
|
||||
// Delete deletes the identity with the given name from the cluster
|
||||
// @Summary Delete an identity by its name
|
||||
// @Description Delete an identity by its name
|
||||
// @Tags v16.?.?
|
||||
// @ID cluster-3-delete-identity
|
||||
// @Produce json
|
||||
// @Param name path string true "Identity name"
|
||||
// @Success 200 {string} string
|
||||
// @Failure 404 {object} api.Error
|
||||
// @Security ApiKeyAuth
|
||||
// @Router /api/v3/cluster/iam/user/{name} [delete]
|
||||
func (h *ClusterHandler) RemoveIdentity(c echo.Context) error {
|
||||
name := util.PathParam(c, "name")
|
||||
|
||||
if err := h.cluster.RemoveIdentity("", name); err != nil {
|
||||
return api.Err(http.StatusBadRequest, "Invalid identity", "%s", err.Error())
|
||||
}
|
||||
|
||||
return c.JSON(http.StatusOK, "OK")
|
||||
}
|
||||
|
||||
@ -41,6 +41,7 @@ func NewRestream(restream restream.Restreamer, iam iam.IAM) *RestreamHandler {
|
||||
// @Router /api/v3/process [post]
|
||||
func (h *RestreamHandler) Add(c echo.Context) error {
|
||||
user := util.DefaultContext(c, "user", "")
|
||||
superuser := util.DefaultContext(c, "superuser", false)
|
||||
|
||||
process := api.ProcessConfig{
|
||||
ID: shortuuid.New(),
|
||||
@ -53,8 +54,10 @@ func (h *RestreamHandler) Add(c echo.Context) error {
|
||||
return api.Err(http.StatusBadRequest, "Invalid JSON", "%s", err)
|
||||
}
|
||||
|
||||
if !h.iam.Enforce(process.Owner, process.Domain, "process:"+process.ID, "write") {
|
||||
return api.Err(http.StatusForbidden, "Forbidden")
|
||||
if !superuser {
|
||||
if !h.iam.Enforce(process.Owner, process.Domain, "process:"+process.ID, "write") {
|
||||
return api.Err(http.StatusForbidden, "Forbidden")
|
||||
}
|
||||
}
|
||||
|
||||
if process.Type != "ffmpeg" {
|
||||
@ -76,7 +79,7 @@ func (h *RestreamHandler) Add(c echo.Context) error {
|
||||
Domain: config.Domain,
|
||||
}
|
||||
|
||||
p, _ := h.getProcess(tid, config.Owner, "config")
|
||||
p, _ := h.getProcess(tid, "config")
|
||||
|
||||
return c.JSON(http.StatusOK, p.Config)
|
||||
}
|
||||
@ -90,23 +93,27 @@ func (h *RestreamHandler) Add(c echo.Context) error {
|
||||
// @Param filter query string false "Comma separated list of fields (config, state, report, metadata) that will be part of the output. If empty, all fields will be part of the output."
|
||||
// @Param reference query string false "Return only these process that have this reference value. If empty, the reference will be ignored."
|
||||
// @Param id query string false "Comma separated list of process ids to list. Overrides the reference. If empty all IDs will be returned."
|
||||
// @Param idpattern query string false "Glob pattern for process IDs. If empty all IDs will be returned. Intersected with results from refpattern."
|
||||
// @Param refpattern query string false "Glob pattern for process references. If empty all IDs will be returned. Intersected with results from idpattern."
|
||||
// @Param idpattern query string false "Glob pattern for process IDs. If empty all IDs will be returned. Intersected with results from other pattern matches."
|
||||
// @Param refpattern query string false "Glob pattern for process references. If empty all IDs will be returned. Intersected with results from other pattern matches."
|
||||
// @Param ownerpattern query string false "Glob pattern for process owners. If empty all IDs will be returned. Intersected with results from other pattern matches."
|
||||
// @Param domainpattern query string false "Glob pattern for process domain. If empty all IDs will be returned. Intersected with results from other pattern matches."
|
||||
// @Success 200 {array} api.Process
|
||||
// @Security ApiKeyAuth
|
||||
// @Router /api/v3/process [get]
|
||||
func (h *RestreamHandler) GetAll(c echo.Context) error {
|
||||
user := util.DefaultContext(c, "user", "")
|
||||
filter := util.DefaultQuery(c, "filter", "")
|
||||
reference := util.DefaultQuery(c, "reference", "")
|
||||
wantids := strings.FieldsFunc(util.DefaultQuery(c, "id", ""), func(r rune) bool {
|
||||
return r == rune(',')
|
||||
})
|
||||
domain := util.DefaultQuery(c, "domain", "")
|
||||
idpattern := util.DefaultQuery(c, "idpattern", "")
|
||||
refpattern := util.DefaultQuery(c, "refpattern", "")
|
||||
user := util.DefaultContext(c, "user", "")
|
||||
domain := util.DefaultQuery(c, "domain", "")
|
||||
ownerpattern := util.DefaultContext(c, "ownerpattern", "")
|
||||
domainpattern := util.DefaultQuery(c, "domainpattern", "")
|
||||
|
||||
preids := h.restream.GetProcessIDs(idpattern, refpattern, "", "")
|
||||
preids := h.restream.GetProcessIDs(idpattern, refpattern, ownerpattern, domainpattern)
|
||||
ids := []restream.TaskID{}
|
||||
|
||||
for _, id := range preids {
|
||||
@ -121,7 +128,7 @@ func (h *RestreamHandler) GetAll(c echo.Context) error {
|
||||
|
||||
if len(wantids) == 0 || len(reference) != 0 {
|
||||
for _, id := range ids {
|
||||
if p, err := h.getProcess(id, user, filter); err == nil {
|
||||
if p, err := h.getProcess(id, filter); err == nil {
|
||||
if len(reference) != 0 && p.Reference != reference {
|
||||
continue
|
||||
}
|
||||
@ -132,7 +139,7 @@ func (h *RestreamHandler) GetAll(c echo.Context) error {
|
||||
for _, id := range ids {
|
||||
for _, wantid := range wantids {
|
||||
if wantid == id.ID {
|
||||
if p, err := h.getProcess(id, user, filter); err == nil {
|
||||
if p, err := h.getProcess(id, filter); err == nil {
|
||||
processes = append(processes, p)
|
||||
}
|
||||
}
|
||||
@ -156,9 +163,9 @@ func (h *RestreamHandler) GetAll(c echo.Context) error {
|
||||
// @Security ApiKeyAuth
|
||||
// @Router /api/v3/process/{id} [get]
|
||||
func (h *RestreamHandler) Get(c echo.Context) error {
|
||||
user := util.DefaultContext(c, "user", "")
|
||||
id := util.PathParam(c, "id")
|
||||
filter := util.DefaultQuery(c, "filter", "")
|
||||
user := util.DefaultContext(c, "user", "")
|
||||
domain := util.DefaultQuery(c, "domain", "")
|
||||
|
||||
if !h.iam.Enforce(user, domain, "process:"+id, "read") {
|
||||
@ -170,7 +177,7 @@ func (h *RestreamHandler) Get(c echo.Context) error {
|
||||
Domain: domain,
|
||||
}
|
||||
|
||||
p, err := h.getProcess(tid, user, filter)
|
||||
p, err := h.getProcess(tid, filter)
|
||||
if err != nil {
|
||||
return api.Err(http.StatusNotFound, "Unknown process ID", "%s", err)
|
||||
}
|
||||
@ -190,19 +197,22 @@ func (h *RestreamHandler) Get(c echo.Context) error {
|
||||
// @Security ApiKeyAuth
|
||||
// @Router /api/v3/process/{id} [delete]
|
||||
func (h *RestreamHandler) Delete(c echo.Context) error {
|
||||
id := util.PathParam(c, "id")
|
||||
user := util.DefaultContext(c, "user", "")
|
||||
superuser := util.DefaultContext(c, "superuser", false)
|
||||
id := util.PathParam(c, "id")
|
||||
domain := util.DefaultQuery(c, "domain", "")
|
||||
|
||||
if !h.iam.Enforce(user, domain, "process:"+id, "write") {
|
||||
return api.Err(http.StatusForbidden, "Forbidden")
|
||||
}
|
||||
|
||||
tid := restream.TaskID{
|
||||
ID: id,
|
||||
Domain: domain,
|
||||
}
|
||||
|
||||
if !superuser {
|
||||
if !h.iam.Enforce(user, domain, "process:"+id, "write") {
|
||||
return api.Err(http.StatusForbidden, "Forbidden")
|
||||
}
|
||||
}
|
||||
|
||||
if err := h.restream.StopProcess(tid); err != nil {
|
||||
return api.Err(http.StatusNotFound, "Unknown process ID", "%s", err)
|
||||
}
|
||||
@ -285,7 +295,7 @@ func (h *RestreamHandler) Update(c echo.Context) error {
|
||||
Domain: config.Domain,
|
||||
}
|
||||
|
||||
p, _ := h.getProcess(tid, config.Owner, "config")
|
||||
p, _ := h.getProcess(tid, "config")
|
||||
|
||||
return c.JSON(http.StatusOK, p.Config)
|
||||
}
|
||||
@ -661,7 +671,7 @@ func (h *RestreamHandler) SetMetadata(c echo.Context) error {
|
||||
return c.JSON(http.StatusOK, data)
|
||||
}
|
||||
|
||||
func (h *RestreamHandler) getProcess(id restream.TaskID, user, filterString string) (api.Process, error) {
|
||||
func (h *RestreamHandler) getProcess(id restream.TaskID, filterString string) (api.Process, error) {
|
||||
filter := strings.FieldsFunc(filterString, func(r rune) bool {
|
||||
return r == rune(',')
|
||||
})
|
||||
|
||||
@ -664,6 +664,9 @@ func (s *server) setRoutesV3(v3 *echo.Group) {
|
||||
v3.POST("/cluster/process", s.v3handler.cluster.AddProcess)
|
||||
v3.PUT("/cluster/process/:id", s.v3handler.cluster.UpdateProcess)
|
||||
v3.DELETE("/cluster/process/:id", s.v3handler.cluster.DeleteProcess)
|
||||
|
||||
v3.POST("/cluster/iam/user", s.v3handler.cluster.AddIdentity)
|
||||
v3.DELETE("/cluster/iam/user/:name", s.v3handler.cluster.RemoveIdentity)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user