List undeployed processes

This commit is contained in:
Ingo Oppermann 2023-07-11 22:38:33 +02:00
parent 69ffec6b6a
commit 62fdf8e370
No known key found for this signature in database
GPG Key ID: 2AB32426E9DD229E
3 changed files with 332 additions and 58 deletions

View File

@ -1,6 +1,8 @@
package api
import (
"bytes"
"encoding/json"
"fmt"
"net/http"
"sort"
@ -9,6 +11,8 @@ import (
"github.com/datarhei/core/v16/cluster"
"github.com/datarhei/core/v16/cluster/proxy"
"github.com/datarhei/core/v16/cluster/store"
"github.com/datarhei/core/v16/glob"
"github.com/datarhei/core/v16/http/api"
"github.com/datarhei/core/v16/http/handler/util"
"github.com/datarhei/core/v16/iam"
@ -41,6 +45,10 @@ func NewCluster(cluster cluster.Cluster, iam iam.IAM) (*ClusterHandler, error) {
return nil, fmt.Errorf("no cluster provided")
}
if h.proxy == nil {
return nil, fmt.Errorf("proxy reader from cluster is not available")
}
if h.iam == nil {
return nil, fmt.Errorf("no IAM provided")
}
@ -169,9 +177,7 @@ func (h *ClusterHandler) Leave(c echo.Context) error {
// @Router /api/v3/cluster/process [get]
func (h *ClusterHandler) ListAllNodesProcesses(c echo.Context) error {
ctxuser := util.DefaultContext(c, "user", "")
filter := strings.FieldsFunc(util.DefaultQuery(c, "filter", ""), func(r rune) bool {
return r == rune(',')
})
filter := newFilter(util.DefaultQuery(c, "filter", ""))
reference := util.DefaultQuery(c, "reference", "")
wantids := strings.FieldsFunc(util.DefaultQuery(c, "id", ""), func(r rune) bool {
return r == rune(',')
@ -184,7 +190,7 @@ func (h *ClusterHandler) ListAllNodesProcesses(c echo.Context) error {
procs := h.proxy.ListProcesses(proxy.ProcessListOptions{
ID: wantids,
Filter: filter,
Filter: filter.Slice(),
Domain: domain,
Reference: reference,
IDPattern: idpattern,
@ -194,6 +200,7 @@ func (h *ClusterHandler) ListAllNodesProcesses(c echo.Context) error {
})
processes := []clientapi.Process{}
pmap := map[app.ProcessID]struct{}{}
for _, p := range procs {
if !h.iam.Enforce(ctxuser, domain, "process:"+p.ID, "read") {
@ -201,9 +208,190 @@ func (h *ClusterHandler) ListAllNodesProcesses(c echo.Context) error {
}
processes = append(processes, p)
pmap[app.NewProcessID(p.ID, p.Domain)] = struct{}{}
}
return c.JSON(http.StatusOK, processes)
missing := []api.Process{}
// Here we have to add those processes that are in the cluster DB and couldn't be deployed
{
processes := h.cluster.ListProcesses()
filtered := h.getFilteredStoreProcesses(processes, wantids, domain, reference, idpattern, refpattern, ownerpattern, domainpattern)
for _, p := range filtered {
if !h.iam.Enforce(ctxuser, domain, "process:"+p.Config.ID, "read") {
continue
}
// Check if the process has been deployed
if _, ok := pmap[p.Config.ProcessID()]; ok {
continue
}
process := api.Process{
ID: p.Config.ID,
Owner: p.Config.Owner,
Domain: p.Config.Domain,
Type: "ffmpeg",
Reference: p.Config.Reference,
CreatedAt: p.CreatedAt.Unix(),
UpdatedAt: p.UpdatedAt.Unix(),
}
if filter.metadata {
process.Metadata = p.Metadata
}
if filter.config {
config := &api.ProcessConfig{}
config.Unmarshal(p.Config)
process.Config = config
}
if filter.state {
process.State = &api.ProcessState{
State: "failed",
Order: p.Order,
LastLog: p.Error,
}
}
if filter.report {
process.Report = &api.ProcessReport{}
}
missing = append(missing, process)
}
}
// We're doing some byte-wrangling here because the processes from the nodes
// are of type clientapi.Process, the missing processes are from type api.Process.
// They are actually the same and converting them is cumbersome. That's why
// we're doing the JSON marshalling here and appending these two slices is done
// in JSON representation.
data, err := json.Marshal(processes)
if err != nil {
return api.Err(http.StatusInternalServerError, "", err.Error())
}
buf := &bytes.Buffer{}
if len(missing) != 0 {
reallyData, err := json.Marshal(missing)
if err != nil {
return api.Err(http.StatusInternalServerError, "", err.Error())
}
i := bytes.LastIndexByte(data, ']')
if i == -1 {
return api.Err(http.StatusInternalServerError, "", "no valid JSON")
}
if len(processes) != 0 {
data[i] = ','
} else {
data[i] = ' '
}
buf.Write(data)
i = bytes.IndexByte(reallyData, '[')
if i == -1 {
return api.Err(http.StatusInternalServerError, "", "no valid JSON")
}
buf.Write(reallyData[i+1:])
} else {
buf.Write(data)
}
return c.Stream(http.StatusOK, "application/json", buf)
}
func (h *ClusterHandler) getFilteredStoreProcesses(processes []store.Process, wantids []string, domain, reference, idpattern, refpattern, ownerpattern, domainpattern string) []store.Process {
filtered := []store.Process{}
count := 0
var idglob glob.Glob
var refglob glob.Glob
var ownerglob glob.Glob
var domainglob glob.Glob
if len(idpattern) != 0 {
count++
idglob, _ = glob.Compile(idpattern)
}
if len(refpattern) != 0 {
count++
refglob, _ = glob.Compile(refpattern)
}
if len(ownerpattern) != 0 {
count++
ownerglob, _ = glob.Compile(ownerpattern)
}
if len(domainpattern) != 0 {
count++
domainglob, _ = glob.Compile(domainpattern)
}
for _, t := range processes {
matches := 0
if idglob != nil {
if match := idglob.Match(t.Config.ID); match {
matches++
}
}
if refglob != nil {
if match := refglob.Match(t.Config.Reference); match {
matches++
}
}
if ownerglob != nil {
if match := ownerglob.Match(t.Config.Owner); match {
matches++
}
}
if domainglob != nil {
if match := domainglob.Match(t.Config.Domain); match {
matches++
}
}
if count != matches {
continue
}
filtered = append(filtered, t)
}
final := []store.Process{}
if len(wantids) == 0 || len(reference) != 0 {
for _, p := range filtered {
if len(reference) != 0 && p.Config.Reference != reference {
continue
}
final = append(final, p)
}
} else {
for _, p := range filtered {
for _, wantid := range wantids {
if wantid == p.Config.ID {
final = append(final, p)
}
}
}
}
return final
}
// GetAllNodesProcess returns the process with the given ID whereever it's running on the cluster
@ -223,9 +411,7 @@ func (h *ClusterHandler) ListAllNodesProcesses(c echo.Context) error {
func (h *ClusterHandler) GetAllNodesProcess(c echo.Context) error {
ctxuser := util.DefaultContext(c, "user", "")
id := util.PathParam(c, "id")
filter := strings.FieldsFunc(util.DefaultQuery(c, "filter", ""), func(r rune) bool {
return r == rune(',')
})
filter := newFilter(util.DefaultQuery(c, "filter", ""))
domain := util.DefaultQuery(c, "domain", "")
if !h.iam.Enforce(ctxuser, domain, "process:"+id, "read") {
@ -234,12 +420,51 @@ func (h *ClusterHandler) GetAllNodesProcess(c echo.Context) error {
procs := h.proxy.ListProcesses(proxy.ProcessListOptions{
ID: []string{id},
Filter: filter,
Filter: filter.Slice(),
Domain: domain,
})
if len(procs) == 0 {
return api.Err(http.StatusNotFound, "", "Unknown process ID: %s", id)
// Check the store in the store for an undeployed process
p, err := h.cluster.GetProcess(app.NewProcessID(id, domain))
if err != nil {
return api.Err(http.StatusNotFound, "", "Unknown process ID: %s", id)
}
process := api.Process{
ID: p.Config.ID,
Owner: p.Config.Owner,
Domain: p.Config.Domain,
Type: "ffmpeg",
Reference: p.Config.Reference,
CreatedAt: p.CreatedAt.Unix(),
UpdatedAt: p.UpdatedAt.Unix(),
}
if filter.metadata {
process.Metadata = p.Metadata
}
if filter.config {
config := &api.ProcessConfig{}
config.Unmarshal(p.Config)
process.Config = config
}
if filter.state {
process.State = &api.ProcessState{
State: "failed",
Order: p.Order,
LastLog: p.Error,
}
}
if filter.report {
process.Report = &api.ProcessReport{}
}
return c.JSON(http.StatusOK, process)
}
if procs[0].Domain != domain {
@ -491,6 +716,7 @@ func (h *ClusterHandler) ListStoreProcesses(c echo.Context) error {
process.Config = config
process.State = &api.ProcessState{
State: "failed",
Order: p.Order,
LastLog: p.Error,
}
@ -548,7 +774,9 @@ func (h *ClusterHandler) GetStoreProcess(c echo.Context) error {
process.Config = config
process.State = &api.ProcessState{
Order: p.Order,
State: "failed",
Order: p.Order,
LastLog: p.Error,
}
return c.JSON(http.StatusOK, process)

View File

@ -92,7 +92,7 @@ func (h *RestreamHandler) Add(c echo.Context) error {
h.restream.SetProcessMetadata(tid, key, data)
}
p, _ := h.getProcess(tid, "config")
p, _ := h.getProcess(tid, newFilter("config"))
return c.JSON(http.StatusOK, p.Config)
}
@ -116,7 +116,7 @@ func (h *RestreamHandler) Add(c echo.Context) error {
// @Router /api/v3/process [get]
func (h *RestreamHandler) GetAll(c echo.Context) error {
ctxuser := util.DefaultContext(c, "user", "")
filter := util.DefaultQuery(c, "filter", "")
filter := newFilter(util.DefaultQuery(c, "filter", ""))
reference := util.DefaultQuery(c, "reference", "")
wantids := strings.FieldsFunc(util.DefaultQuery(c, "id", ""), func(r rune) bool {
return r == rune(',')
@ -193,7 +193,7 @@ func (h *RestreamHandler) Get(c echo.Context) error {
Domain: domain,
}
p, err := h.getProcess(tid, filter)
p, err := h.getProcess(tid, newFilter(filter))
if err != nil {
return api.Err(http.StatusNotFound, "", "unknown process ID: %s", err.Error())
}
@ -326,7 +326,7 @@ func (h *RestreamHandler) Update(c echo.Context) error {
h.restream.SetProcessMetadata(tid, key, data)
}
p, _ := h.getProcess(tid, "config")
p, _ := h.getProcess(tid, newFilter("config"))
return c.JSON(http.StatusOK, p.Config)
}
@ -845,8 +845,15 @@ func (h *RestreamHandler) SetMetadata(c echo.Context) error {
return c.JSON(http.StatusOK, data)
}
func (h *RestreamHandler) getProcess(id app.ProcessID, filterString string) (api.Process, error) {
filter := strings.FieldsFunc(filterString, func(r rune) bool {
type filter struct {
config bool
state bool
report bool
metadata bool
}
func newFilter(filterString string) filter {
filters := strings.FieldsFunc(filterString, func(r rune) bool {
return r == rune(',')
})
@ -857,18 +864,55 @@ func (h *RestreamHandler) getProcess(id app.ProcessID, filterString string) (api
"metadata": true,
}
if len(filter) != 0 {
if len(filters) != 0 {
for k := range wants {
wants[k] = false
}
for _, f := range filter {
for _, f := range filters {
if _, ok := wants[f]; ok {
wants[f] = true
}
}
}
f := filter{
config: wants["config"],
state: wants["state"],
report: wants["report"],
metadata: wants["metadata"],
}
return f
}
func (f filter) Slice() []string {
what := []string{}
if f.config {
what = append(what, "config")
}
if f.state {
what = append(what, "state")
}
if f.report {
what = append(what, "report")
}
if f.metadata {
what = append(what, "metadata")
}
return what
}
func (f filter) String() string {
return strings.Join(f.Slice(), ",")
}
func (h *RestreamHandler) getProcess(id app.ProcessID, filter filter) (api.Process, error) {
process, err := h.restream.GetProcess(id)
if err != nil {
return api.Process{}, err
@ -884,26 +928,26 @@ func (h *RestreamHandler) getProcess(id app.ProcessID, filterString string) (api
UpdatedAt: process.UpdatedAt,
}
if wants["config"] {
if filter.config {
info.Config = &api.ProcessConfig{}
info.Config.Unmarshal(process.Config)
}
if wants["state"] {
if filter.state {
if state, err := h.restream.GetProcessState(id); err == nil {
info.State = &api.ProcessState{}
info.State.Unmarshal(state)
}
}
if wants["report"] {
if filter.report {
if log, err := h.restream.GetProcessLog(id); err == nil {
info.Report = &api.ProcessReport{}
info.Report.Unmarshal(log)
}
}
if wants["metadata"] {
if filter.metadata {
if data, err := h.restream.GetProcessMetadata(id, ""); err == nil {
info.Metadata = api.NewMetadata(data)
}

View File

@ -1218,58 +1218,60 @@ func (r *restream) UpdateProcess(id app.ProcessID, config *app.Config) error {
}
func (r *restream) GetProcessIDs(idpattern, refpattern, ownerpattern, domainpattern string) []app.ProcessID {
ids := []app.ProcessID{}
count := 0
var idglob glob.Glob
var refglob glob.Glob
var ownerglob glob.Glob
var domainglob glob.Glob
if len(idpattern) != 0 {
count++
idglob, _ = glob.Compile(idpattern)
}
if len(refpattern) != 0 {
count++
refglob, _ = glob.Compile(refpattern)
}
if len(ownerpattern) != 0 {
count++
ownerglob, _ = glob.Compile(ownerpattern)
}
if len(domainpattern) != 0 {
count++
domainglob, _ = glob.Compile(domainpattern)
}
r.lock.RLock()
defer r.lock.RUnlock()
ids := []app.ProcessID{}
for _, t := range r.tasks {
count := 0
matches := 0
if len(idpattern) != 0 {
count++
match, err := glob.Match(idpattern, t.id)
if err != nil {
return nil
}
if match {
if idglob != nil {
if match := idglob.Match(t.id); match {
matches++
}
}
if len(refpattern) != 0 {
count++
match, err := glob.Match(refpattern, t.reference)
if err != nil {
return nil
}
if match {
if refglob != nil {
if match := refglob.Match(t.reference); match {
matches++
}
}
if len(ownerpattern) != 0 {
count++
match, err := glob.Match(ownerpattern, t.owner)
if err != nil {
return nil
}
if match {
if ownerglob != nil {
if match := ownerglob.Match(t.owner); match {
matches++
}
}
if len(domainpattern) != 0 {
count++
match, err := glob.Match(domainpattern, t.domain)
if err != nil {
return nil
}
if match {
if domainglob != nil {
if match := domainglob.Match(t.domain); match {
matches++
}
}