From 34db225eb0b6f489b2d3e38111ebe4383d157a1f Mon Sep 17 00:00:00 2001 From: Ingo Oppermann Date: Mon, 17 Jul 2023 15:39:03 +0200 Subject: [PATCH] Return map of nodes --- cluster/proxy/proxy.go | 27 +++++++++++++++++---------- http/api/cluster.go | 2 +- http/handler/api/cluster_process.go | 24 ++++++++++++++++-------- 3 files changed, 34 insertions(+), 19 deletions(-) diff --git a/cluster/proxy/proxy.go b/cluster/proxy/proxy.go index a761dc9b..17813258 100644 --- a/cluster/proxy/proxy.go +++ b/cluster/proxy/proxy.go @@ -38,7 +38,7 @@ type ProxyReader interface { FindNodeFromProcess(id app.ProcessID) (string, error) Resources() map[string]NodeResources - ListProcesses(ProcessListOptions) []clientapi.Process + ListProcesses(ProcessListOptions) map[string][]clientapi.Process ListProxyProcesses() []Process ProbeProcess(nodeid string, id app.ProcessID) (clientapi.Probe, error) @@ -463,9 +463,14 @@ func (p *proxy) FindNodeFromProcess(id app.ProcessID) (string, error) { return nodeid, nil } -func (p *proxy) ListProcesses(options ProcessListOptions) []clientapi.Process { - processChan := make(chan clientapi.Process, 64) - processList := []clientapi.Process{} +type processList struct { + nodeid string + processes []clientapi.Process +} + +func (p *proxy) ListProcesses(options ProcessListOptions) map[string][]clientapi.Process { + processChan := make(chan processList, 64) + processMap := map[string][]clientapi.Process{} wgList := sync.WaitGroup{} wgList.Add(1) @@ -473,8 +478,8 @@ func (p *proxy) ListProcesses(options ProcessListOptions) []clientapi.Process { go func() { defer wgList.Done() - for process := range processChan { - processList = append(processList, process) + for list := range processChan { + processMap[list.nodeid] = list.processes } }() @@ -484,7 +489,7 @@ func (p *proxy) ListProcesses(options ProcessListOptions) []clientapi.Process { for _, node := range p.nodes { wg.Add(1) - go func(node Node, p chan<- clientapi.Process) { + go func(node Node, p chan<- processList) { defer wg.Done() processes, err := node.ProcessList(options) @@ -492,9 +497,11 @@ func (p *proxy) ListProcesses(options ProcessListOptions) []clientapi.Process { return } - for _, process := range processes { - p <- process + p <- processList{ + nodeid: node.About().ID, + processes: processes, } + }(node, processChan) } p.nodesLock.RUnlock() @@ -505,7 +512,7 @@ func (p *proxy) ListProcesses(options ProcessListOptions) []clientapi.Process { wgList.Wait() - return processList + return processMap } func (p *proxy) AddProcess(nodeid string, config *app.Config, metadata map[string]interface{}) error { diff --git a/http/api/cluster.go b/http/api/cluster.go index d4d64301..9f271de7 100644 --- a/http/api/cluster.go +++ b/http/api/cluster.go @@ -42,7 +42,7 @@ type ClusterNodeResources struct { type ClusterRaft struct { Address string `json:"address"` State string `json:"state"` - LastContact float64 `json:"last_contact_ms"` + LastContact float64 `json:"last_contact_ms"` // milliseconds NumPeers uint64 `json:"num_peers"` LogTerm uint64 `json:"log_term"` LogIndex uint64 `json:"log_index"` diff --git a/http/handler/api/cluster_process.go b/http/handler/api/cluster_process.go index 6ab4e14b..fc73a19a 100644 --- a/http/handler/api/cluster_process.go +++ b/http/handler/api/cluster_process.go @@ -50,7 +50,7 @@ func (h *ClusterHandler) GetAllProcesses(c echo.Context) error { ownerpattern := util.DefaultQuery(c, "ownerpattern", "") domainpattern := util.DefaultQuery(c, "domainpattern", "") - procs := h.proxy.ListProcesses(proxy.ProcessListOptions{ + procsMap := h.proxy.ListProcesses(proxy.ProcessListOptions{ ID: wantids, Filter: filter.Slice(), Domain: domain, @@ -64,13 +64,15 @@ func (h *ClusterHandler) GetAllProcesses(c echo.Context) error { processes := []clientapi.Process{} pmap := map[app.ProcessID]struct{}{} - for _, p := range procs { - if !h.iam.Enforce(ctxuser, domain, "process:"+p.ID, "read") { - continue - } + for _, procs := range procsMap { + for _, p := range procs { + if !h.iam.Enforce(ctxuser, domain, "process:"+p.ID, "read") { + continue + } - processes = append(processes, p) - pmap[app.NewProcessID(p.ID, p.Domain)] = struct{}{} + processes = append(processes, p) + pmap[app.NewProcessID(p.ID, p.Domain)] = struct{}{} + } } missing := []api.Process{} @@ -317,12 +319,18 @@ func (h *ClusterHandler) GetProcess(c echo.Context) error { return api.Err(http.StatusForbidden, "") } - procs := h.proxy.ListProcesses(proxy.ProcessListOptions{ + procsMap := h.proxy.ListProcesses(proxy.ProcessListOptions{ ID: []string{id}, Filter: filter.Slice(), Domain: domain, }) + procs := []clientapi.Process{} + + for _, processes := range procsMap { + procs = append(procs, processes...) + } + if len(procs) == 0 { // Check the store in the store for an undeployed process p, err := h.cluster.GetProcess(app.NewProcessID(id, domain))