Return map of nodes
This commit is contained in:
parent
bc04bb2df8
commit
34db225eb0
@ -38,7 +38,7 @@ type ProxyReader interface {
|
||||
FindNodeFromProcess(id app.ProcessID) (string, error)
|
||||
|
||||
Resources() map[string]NodeResources
|
||||
ListProcesses(ProcessListOptions) []clientapi.Process
|
||||
ListProcesses(ProcessListOptions) map[string][]clientapi.Process
|
||||
ListProxyProcesses() []Process
|
||||
ProbeProcess(nodeid string, id app.ProcessID) (clientapi.Probe, error)
|
||||
|
||||
@ -463,9 +463,14 @@ func (p *proxy) FindNodeFromProcess(id app.ProcessID) (string, error) {
|
||||
return nodeid, nil
|
||||
}
|
||||
|
||||
func (p *proxy) ListProcesses(options ProcessListOptions) []clientapi.Process {
|
||||
processChan := make(chan clientapi.Process, 64)
|
||||
processList := []clientapi.Process{}
|
||||
type processList struct {
|
||||
nodeid string
|
||||
processes []clientapi.Process
|
||||
}
|
||||
|
||||
func (p *proxy) ListProcesses(options ProcessListOptions) map[string][]clientapi.Process {
|
||||
processChan := make(chan processList, 64)
|
||||
processMap := map[string][]clientapi.Process{}
|
||||
|
||||
wgList := sync.WaitGroup{}
|
||||
wgList.Add(1)
|
||||
@ -473,8 +478,8 @@ func (p *proxy) ListProcesses(options ProcessListOptions) []clientapi.Process {
|
||||
go func() {
|
||||
defer wgList.Done()
|
||||
|
||||
for process := range processChan {
|
||||
processList = append(processList, process)
|
||||
for list := range processChan {
|
||||
processMap[list.nodeid] = list.processes
|
||||
}
|
||||
}()
|
||||
|
||||
@ -484,7 +489,7 @@ func (p *proxy) ListProcesses(options ProcessListOptions) []clientapi.Process {
|
||||
for _, node := range p.nodes {
|
||||
wg.Add(1)
|
||||
|
||||
go func(node Node, p chan<- clientapi.Process) {
|
||||
go func(node Node, p chan<- processList) {
|
||||
defer wg.Done()
|
||||
|
||||
processes, err := node.ProcessList(options)
|
||||
@ -492,9 +497,11 @@ func (p *proxy) ListProcesses(options ProcessListOptions) []clientapi.Process {
|
||||
return
|
||||
}
|
||||
|
||||
for _, process := range processes {
|
||||
p <- process
|
||||
p <- processList{
|
||||
nodeid: node.About().ID,
|
||||
processes: processes,
|
||||
}
|
||||
|
||||
}(node, processChan)
|
||||
}
|
||||
p.nodesLock.RUnlock()
|
||||
@ -505,7 +512,7 @@ func (p *proxy) ListProcesses(options ProcessListOptions) []clientapi.Process {
|
||||
|
||||
wgList.Wait()
|
||||
|
||||
return processList
|
||||
return processMap
|
||||
}
|
||||
|
||||
func (p *proxy) AddProcess(nodeid string, config *app.Config, metadata map[string]interface{}) error {
|
||||
|
||||
@ -42,7 +42,7 @@ type ClusterNodeResources struct {
|
||||
type ClusterRaft struct {
|
||||
Address string `json:"address"`
|
||||
State string `json:"state"`
|
||||
LastContact float64 `json:"last_contact_ms"`
|
||||
LastContact float64 `json:"last_contact_ms"` // milliseconds
|
||||
NumPeers uint64 `json:"num_peers"`
|
||||
LogTerm uint64 `json:"log_term"`
|
||||
LogIndex uint64 `json:"log_index"`
|
||||
|
||||
@ -50,7 +50,7 @@ func (h *ClusterHandler) GetAllProcesses(c echo.Context) error {
|
||||
ownerpattern := util.DefaultQuery(c, "ownerpattern", "")
|
||||
domainpattern := util.DefaultQuery(c, "domainpattern", "")
|
||||
|
||||
procs := h.proxy.ListProcesses(proxy.ProcessListOptions{
|
||||
procsMap := h.proxy.ListProcesses(proxy.ProcessListOptions{
|
||||
ID: wantids,
|
||||
Filter: filter.Slice(),
|
||||
Domain: domain,
|
||||
@ -64,13 +64,15 @@ func (h *ClusterHandler) GetAllProcesses(c echo.Context) error {
|
||||
processes := []clientapi.Process{}
|
||||
pmap := map[app.ProcessID]struct{}{}
|
||||
|
||||
for _, p := range procs {
|
||||
if !h.iam.Enforce(ctxuser, domain, "process:"+p.ID, "read") {
|
||||
continue
|
||||
}
|
||||
for _, procs := range procsMap {
|
||||
for _, p := range procs {
|
||||
if !h.iam.Enforce(ctxuser, domain, "process:"+p.ID, "read") {
|
||||
continue
|
||||
}
|
||||
|
||||
processes = append(processes, p)
|
||||
pmap[app.NewProcessID(p.ID, p.Domain)] = struct{}{}
|
||||
processes = append(processes, p)
|
||||
pmap[app.NewProcessID(p.ID, p.Domain)] = struct{}{}
|
||||
}
|
||||
}
|
||||
|
||||
missing := []api.Process{}
|
||||
@ -317,12 +319,18 @@ func (h *ClusterHandler) GetProcess(c echo.Context) error {
|
||||
return api.Err(http.StatusForbidden, "")
|
||||
}
|
||||
|
||||
procs := h.proxy.ListProcesses(proxy.ProcessListOptions{
|
||||
procsMap := h.proxy.ListProcesses(proxy.ProcessListOptions{
|
||||
ID: []string{id},
|
||||
Filter: filter.Slice(),
|
||||
Domain: domain,
|
||||
})
|
||||
|
||||
procs := []clientapi.Process{}
|
||||
|
||||
for _, processes := range procsMap {
|
||||
procs = append(procs, processes...)
|
||||
}
|
||||
|
||||
if len(procs) == 0 {
|
||||
// Check the store in the store for an undeployed process
|
||||
p, err := h.cluster.GetProcess(app.NewProcessID(id, domain))
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user