diff --git a/http/handler/api/cluster.go b/http/handler/api/cluster.go index 1ca76031..302050bc 100644 --- a/http/handler/api/cluster.go +++ b/http/handler/api/cluster.go @@ -1,6 +1,8 @@ package api import ( + "bytes" + "encoding/json" "fmt" "net/http" "sort" @@ -9,6 +11,8 @@ import ( "github.com/datarhei/core/v16/cluster" "github.com/datarhei/core/v16/cluster/proxy" + "github.com/datarhei/core/v16/cluster/store" + "github.com/datarhei/core/v16/glob" "github.com/datarhei/core/v16/http/api" "github.com/datarhei/core/v16/http/handler/util" "github.com/datarhei/core/v16/iam" @@ -41,6 +45,10 @@ func NewCluster(cluster cluster.Cluster, iam iam.IAM) (*ClusterHandler, error) { return nil, fmt.Errorf("no cluster provided") } + if h.proxy == nil { + return nil, fmt.Errorf("proxy reader from cluster is not available") + } + if h.iam == nil { return nil, fmt.Errorf("no IAM provided") } @@ -169,9 +177,7 @@ func (h *ClusterHandler) Leave(c echo.Context) error { // @Router /api/v3/cluster/process [get] func (h *ClusterHandler) ListAllNodesProcesses(c echo.Context) error { ctxuser := util.DefaultContext(c, "user", "") - filter := strings.FieldsFunc(util.DefaultQuery(c, "filter", ""), func(r rune) bool { - return r == rune(',') - }) + filter := newFilter(util.DefaultQuery(c, "filter", "")) reference := util.DefaultQuery(c, "reference", "") wantids := strings.FieldsFunc(util.DefaultQuery(c, "id", ""), func(r rune) bool { return r == rune(',') @@ -184,7 +190,7 @@ func (h *ClusterHandler) ListAllNodesProcesses(c echo.Context) error { procs := h.proxy.ListProcesses(proxy.ProcessListOptions{ ID: wantids, - Filter: filter, + Filter: filter.Slice(), Domain: domain, Reference: reference, IDPattern: idpattern, @@ -194,6 +200,7 @@ func (h *ClusterHandler) ListAllNodesProcesses(c echo.Context) error { }) processes := []clientapi.Process{} + pmap := map[app.ProcessID]struct{}{} for _, p := range procs { if !h.iam.Enforce(ctxuser, domain, "process:"+p.ID, "read") { @@ -201,9 +208,190 @@ func 
(h *ClusterHandler) ListAllNodesProcesses(c echo.Context) error { } processes = append(processes, p) + pmap[app.NewProcessID(p.ID, p.Domain)] = struct{}{} } - return c.JSON(http.StatusOK, processes) + missing := []api.Process{} + + // Here we have to add those processes that are in the cluster DB and couldn't be deployed + { + processes := h.cluster.ListProcesses() + filtered := h.getFilteredStoreProcesses(processes, wantids, domain, reference, idpattern, refpattern, ownerpattern, domainpattern) + + for _, p := range filtered { + if !h.iam.Enforce(ctxuser, domain, "process:"+p.Config.ID, "read") { + continue + } + + // Check if the process has been deployed + if _, ok := pmap[p.Config.ProcessID()]; ok { + continue + } + + process := api.Process{ + ID: p.Config.ID, + Owner: p.Config.Owner, + Domain: p.Config.Domain, + Type: "ffmpeg", + Reference: p.Config.Reference, + CreatedAt: p.CreatedAt.Unix(), + UpdatedAt: p.UpdatedAt.Unix(), + } + + if filter.metadata { + process.Metadata = p.Metadata + } + + if filter.config { + config := &api.ProcessConfig{} + config.Unmarshal(p.Config) + + process.Config = config + } + + if filter.state { + process.State = &api.ProcessState{ + State: "failed", + Order: p.Order, + LastLog: p.Error, + } + } + + if filter.report { + process.Report = &api.ProcessReport{} + } + + missing = append(missing, process) + } + } + + // We're doing some byte-wrangling here because the processes from the nodes + // are of type clientapi.Process, the missing processes are from type api.Process. + // They are actually the same and converting them is cumbersome. That's why + // we're doing the JSON marshalling here and appending these two slices is done + // in JSON representation. 
+ + data, err := json.Marshal(processes) + if err != nil { + return api.Err(http.StatusInternalServerError, "", err.Error()) + } + + buf := &bytes.Buffer{} + + if len(missing) != 0 { + reallyData, err := json.Marshal(missing) + if err != nil { + return api.Err(http.StatusInternalServerError, "", err.Error()) + } + + i := bytes.LastIndexByte(data, ']') + if i == -1 { + return api.Err(http.StatusInternalServerError, "", "no valid JSON") + } + + if len(processes) != 0 { + data[i] = ',' + } else { + data[i] = ' ' + } + buf.Write(data) + + i = bytes.IndexByte(reallyData, '[') + if i == -1 { + return api.Err(http.StatusInternalServerError, "", "no valid JSON") + } + buf.Write(reallyData[i+1:]) + } else { + buf.Write(data) + } + + return c.Stream(http.StatusOK, "application/json", buf) +} + +func (h *ClusterHandler) getFilteredStoreProcesses(processes []store.Process, wantids []string, domain, reference, idpattern, refpattern, ownerpattern, domainpattern string) []store.Process { + filtered := []store.Process{} + + count := 0 + + var idglob glob.Glob + var refglob glob.Glob + var ownerglob glob.Glob + var domainglob glob.Glob + + if len(idpattern) != 0 { + count++ + idglob, _ = glob.Compile(idpattern) + } + + if len(refpattern) != 0 { + count++ + refglob, _ = glob.Compile(refpattern) + } + + if len(ownerpattern) != 0 { + count++ + ownerglob, _ = glob.Compile(ownerpattern) + } + + if len(domainpattern) != 0 { + count++ + domainglob, _ = glob.Compile(domainpattern) + } + + for _, t := range processes { + matches := 0 + if idglob != nil { + if match := idglob.Match(t.Config.ID); match { + matches++ + } + } + + if refglob != nil { + if match := refglob.Match(t.Config.Reference); match { + matches++ + } + } + + if ownerglob != nil { + if match := ownerglob.Match(t.Config.Owner); match { + matches++ + } + } + + if domainglob != nil { + if match := domainglob.Match(t.Config.Domain); match { + matches++ + } + } + + if count != matches { + continue + } + + filtered = 
append(filtered, t) + } + + final := []store.Process{} + + if len(wantids) == 0 || len(reference) != 0 { + for _, p := range filtered { + if len(reference) != 0 && p.Config.Reference != reference { + continue + } + + final = append(final, p) + } + } else { + for _, p := range filtered { + for _, wantid := range wantids { + if wantid == p.Config.ID { + final = append(final, p) + } + } + } + } + + return final } // GetAllNodesProcess returns the process with the given ID wherever it's running on the cluster @@ -223,9 +411,7 @@ func (h *ClusterHandler) ListAllNodesProcesses(c echo.Context) error { func (h *ClusterHandler) GetAllNodesProcess(c echo.Context) error { ctxuser := util.DefaultContext(c, "user", "") id := util.PathParam(c, "id") - filter := strings.FieldsFunc(util.DefaultQuery(c, "filter", ""), func(r rune) bool { - return r == rune(',') - }) + filter := newFilter(util.DefaultQuery(c, "filter", "")) domain := util.DefaultQuery(c, "domain", "") if !h.iam.Enforce(ctxuser, domain, "process:"+id, "read") { @@ -234,12 +420,51 @@ func (h *ClusterHandler) GetAllNodesProcess(c echo.Context) error { procs := h.proxy.ListProcesses(proxy.ProcessListOptions{ ID: []string{id}, - Filter: filter, + Filter: filter.Slice(), Domain: domain, }) if len(procs) == 0 { - return api.Err(http.StatusNotFound, "", "Unknown process ID: %s", id) + // Check the store for an undeployed process + p, err := h.cluster.GetProcess(app.NewProcessID(id, domain)) + if err != nil { + return api.Err(http.StatusNotFound, "", "Unknown process ID: %s", id) + } + + process := api.Process{ + ID: p.Config.ID, + Owner: p.Config.Owner, + Domain: p.Config.Domain, + Type: "ffmpeg", + Reference: p.Config.Reference, + CreatedAt: p.CreatedAt.Unix(), + UpdatedAt: p.UpdatedAt.Unix(), + } + + if filter.metadata { + process.Metadata = p.Metadata + } + + if filter.config { + config := &api.ProcessConfig{} + config.Unmarshal(p.Config) + + process.Config = config + } + + if filter.state { + process.State 
= &api.ProcessState{ + State: "failed", + Order: p.Order, + LastLog: p.Error, + } + } + + if filter.report { + process.Report = &api.ProcessReport{} + } + + return c.JSON(http.StatusOK, process) } if procs[0].Domain != domain { @@ -491,6 +716,7 @@ func (h *ClusterHandler) ListStoreProcesses(c echo.Context) error { process.Config = config process.State = &api.ProcessState{ + State: "failed", Order: p.Order, LastLog: p.Error, } @@ -548,7 +774,9 @@ func (h *ClusterHandler) GetStoreProcess(c echo.Context) error { process.Config = config process.State = &api.ProcessState{ - Order: p.Order, + State: "failed", + Order: p.Order, + LastLog: p.Error, } return c.JSON(http.StatusOK, process) diff --git a/http/handler/api/restream.go b/http/handler/api/restream.go index 81f87c5d..81b3af68 100644 --- a/http/handler/api/restream.go +++ b/http/handler/api/restream.go @@ -92,7 +92,7 @@ func (h *RestreamHandler) Add(c echo.Context) error { h.restream.SetProcessMetadata(tid, key, data) } - p, _ := h.getProcess(tid, "config") + p, _ := h.getProcess(tid, newFilter("config")) return c.JSON(http.StatusOK, p.Config) } @@ -116,7 +116,7 @@ func (h *RestreamHandler) Add(c echo.Context) error { // @Router /api/v3/process [get] func (h *RestreamHandler) GetAll(c echo.Context) error { ctxuser := util.DefaultContext(c, "user", "") - filter := util.DefaultQuery(c, "filter", "") + filter := newFilter(util.DefaultQuery(c, "filter", "")) reference := util.DefaultQuery(c, "reference", "") wantids := strings.FieldsFunc(util.DefaultQuery(c, "id", ""), func(r rune) bool { return r == rune(',') @@ -193,7 +193,7 @@ func (h *RestreamHandler) Get(c echo.Context) error { Domain: domain, } - p, err := h.getProcess(tid, filter) + p, err := h.getProcess(tid, newFilter(filter)) if err != nil { return api.Err(http.StatusNotFound, "", "unknown process ID: %s", err.Error()) } @@ -326,7 +326,7 @@ func (h *RestreamHandler) Update(c echo.Context) error { h.restream.SetProcessMetadata(tid, key, data) } - p, _ := 
h.getProcess(tid, "config") + p, _ := h.getProcess(tid, newFilter("config")) return c.JSON(http.StatusOK, p.Config) } @@ -845,8 +845,15 @@ func (h *RestreamHandler) SetMetadata(c echo.Context) error { return c.JSON(http.StatusOK, data) } -func (h *RestreamHandler) getProcess(id app.ProcessID, filterString string) (api.Process, error) { - filter := strings.FieldsFunc(filterString, func(r rune) bool { +type filter struct { + config bool + state bool + report bool + metadata bool +} + +func newFilter(filterString string) filter { + filters := strings.FieldsFunc(filterString, func(r rune) bool { return r == rune(',') }) @@ -857,18 +864,55 @@ func (h *RestreamHandler) getProcess(id app.ProcessID, filterString string) (api "metadata": true, } - if len(filter) != 0 { + if len(filters) != 0 { for k := range wants { wants[k] = false } - for _, f := range filter { + for _, f := range filters { if _, ok := wants[f]; ok { wants[f] = true } } } + f := filter{ + config: wants["config"], + state: wants["state"], + report: wants["report"], + metadata: wants["metadata"], + } + + return f +} + +func (f filter) Slice() []string { + what := []string{} + + if f.config { + what = append(what, "config") + } + + if f.state { + what = append(what, "state") + } + + if f.report { + what = append(what, "report") + } + + if f.metadata { + what = append(what, "metadata") + } + + return what +} + +func (f filter) String() string { + return strings.Join(f.Slice(), ",") +} + +func (h *RestreamHandler) getProcess(id app.ProcessID, filter filter) (api.Process, error) { process, err := h.restream.GetProcess(id) if err != nil { return api.Process{}, err @@ -884,26 +928,26 @@ func (h *RestreamHandler) getProcess(id app.ProcessID, filterString string) (api UpdatedAt: process.UpdatedAt, } - if wants["config"] { + if filter.config { info.Config = &api.ProcessConfig{} info.Config.Unmarshal(process.Config) } - if wants["state"] { + if filter.state { if state, err := h.restream.GetProcessState(id); err == 
nil { info.State = &api.ProcessState{} info.State.Unmarshal(state) } } - if wants["report"] { + if filter.report { if log, err := h.restream.GetProcessLog(id); err == nil { info.Report = &api.ProcessReport{} info.Report.Unmarshal(log) } } - if wants["metadata"] { + if filter.metadata { if data, err := h.restream.GetProcessMetadata(id, ""); err == nil { info.Metadata = api.NewMetadata(data) } diff --git a/restream/restream.go b/restream/restream.go index 37301ef5..20857361 100644 --- a/restream/restream.go +++ b/restream/restream.go @@ -1218,58 +1218,60 @@ func (r *restream) UpdateProcess(id app.ProcessID, config *app.Config) error { } func (r *restream) GetProcessIDs(idpattern, refpattern, ownerpattern, domainpattern string) []app.ProcessID { + ids := []app.ProcessID{} + + count := 0 + + var idglob glob.Glob + var refglob glob.Glob + var ownerglob glob.Glob + var domainglob glob.Glob + + if len(idpattern) != 0 { + count++ + idglob, _ = glob.Compile(idpattern) + } + + if len(refpattern) != 0 { + count++ + refglob, _ = glob.Compile(refpattern) + } + + if len(ownerpattern) != 0 { + count++ + ownerglob, _ = glob.Compile(ownerpattern) + } + + if len(domainpattern) != 0 { + count++ + domainglob, _ = glob.Compile(domainpattern) + } + r.lock.RLock() defer r.lock.RUnlock() - ids := []app.ProcessID{} - for _, t := range r.tasks { - count := 0 matches := 0 - if len(idpattern) != 0 { - count++ - match, err := glob.Match(idpattern, t.id) - if err != nil { - return nil - } - - if match { + if idglob != nil { + if match := idglob.Match(t.id); match { matches++ } } - if len(refpattern) != 0 { - count++ - match, err := glob.Match(refpattern, t.reference) - if err != nil { - return nil - } - - if match { + if refglob != nil { + if match := refglob.Match(t.reference); match { matches++ } } - if len(ownerpattern) != 0 { - count++ - match, err := glob.Match(ownerpattern, t.owner) - if err != nil { - return nil - } - - if match { + if ownerglob != nil { + if match := 
ownerglob.Match(t.owner); match { matches++ } } - if len(domainpattern) != 0 { - count++ - match, err := glob.Match(domainpattern, t.domain) - if err != nil { - return nil - } - - if match { + if domainglob != nil { + if match := domainglob.Match(t.domain); match { matches++ } }