diff --git a/cluster/node/node.go b/cluster/node/node.go index 9579f910..847b6914 100644 --- a/cluster/node/node.go +++ b/cluster/node/node.go @@ -114,10 +114,6 @@ func (n *node) start(id string) error { n.lastCoreContactErr = err } n.pingLock.Unlock() - - if err == nil { - return - } case <-ctx.Done(): return } diff --git a/cluster/proxy/node.go b/cluster/proxy/node.go index d0c2862d..c5c9ac87 100644 --- a/cluster/proxy/node.go +++ b/cluster/proxy/node.go @@ -24,15 +24,6 @@ type Node interface { IsConnected() (bool, error) Disconnect() - Config() *config.Config - - StartFiles(updates chan<- NodeFiles) error - StopFiles() - - GetURL(prefix, path string) (*url.URL, error) - GetFile(prefix, path string, offset int64) (io.ReadCloser, error) - GetFileInfo(prefix, path string) (int64, time.Time, error) - AddProcess(config *app.Config, metadata map[string]interface{}) error StartProcess(id app.ProcessID) error StopProcess(id app.ProcessID) error @@ -52,6 +43,11 @@ type NodeReader interface { Resources() NodeResources Files() NodeFiles + + GetURL(prefix, path string) (*url.URL, error) + GetFile(prefix, path string, offset int64) (io.ReadCloser, error) + GetFileInfo(prefix, path string) (int64, time.Time, error) + ProcessList(ProcessListOptions) ([]clientapi.Process, error) ProxyProcessList() ([]Process, error) } @@ -76,6 +72,7 @@ type NodeAbout struct { Name string Address string State string + Error error CreatedAt time.Time Uptime time.Duration LastContact time.Time @@ -103,6 +100,8 @@ const ( stateConnected nodeState = "connected" ) +var ErrNoPeer = errors.New("not connected to the core API: client not available") + type node struct { id string address string @@ -126,16 +125,11 @@ type node struct { config *config.Config - state nodeState - latency float64 // Seconds - stateLock sync.RWMutex - updates chan<- NodeFiles - filesList []string - lastUpdate time.Time - cancelFiles context.CancelFunc - - runningLock sync.Mutex - running bool + state nodeState + latency float64 // Seconds + stateLock sync.RWMutex + filesList []string + lastUpdate time.Time secure bool httpAddress *url.URL @@ -164,31 +158,38 @@ func NewNode(id, address string, config *config.Config) Node { ctx, cancel := context.WithCancel(context.Background()) n.disconnect = cancel - err := n.connect(ctx) + err := n.connect() if err != nil { n.peerErr = err - - go func(ctx context.Context) { - ticker := time.NewTicker(3 * time.Second) - defer ticker.Stop() - - for { - select { - case <-ticker.C: - err := n.connect(ctx) - if err == nil { - n.peerErr = nil - return - } else { - n.peerErr = err - } - case <-ctx.Done(): - return - } - } - }(ctx) } + n.peerWg.Add(4) + + go func(ctx context.Context) { + // This tries to reconnect to the core API. If everything's + // fine, this is a no-op. + ticker := time.NewTicker(3 * time.Second) + defer ticker.Stop() + defer n.peerWg.Done() + + for { + select { + case <-ticker.C: + err := n.connect() + + n.peerLock.Lock() + n.peerErr = err + n.peerLock.Unlock() + case <-ctx.Done(): + return + } + } + }(ctx) + + go n.pingPeer(ctx, &n.peerWg) + go n.updateResources(ctx, &n.peerWg) + go n.updateFiles(ctx, &n.peerWg) + return n } @@ -196,15 +197,27 @@ func (n *node) SetEssentials(address string, config *config.Config) { n.peerLock.Lock() defer n.peerLock.Unlock() - n.address = address - n.config = config + if address != n.address { + n.address = address + n.peer = nil // force reconnet + } + + if n.config == nil && config != nil { + n.config = config + n.peer = nil // force reconnect + } + + if n.config.UpdatedAt != config.UpdatedAt { + n.config = config + n.peer = nil // force reconnect + } } -func (n *node) connect(ctx context.Context) error { +func (n *node) connect() error { n.peerLock.Lock() defer n.peerLock.Unlock() - if n.peer != nil { + if n.peer != nil && n.state != stateDisconnected { return nil } @@ -293,11 +306,6 @@ func (n *node) connect(ctx context.Context) error { n.peer = peer - n.peerWg.Add(2) - - go n.pingPeer(ctx, &n.peerWg) - go n.updateResources(ctx, &n.peerWg) - return nil } @@ -306,7 +314,11 @@ func (n *node) IsConnected() (bool, error) { defer n.peerLock.RUnlock() if n.peer == nil { - return false, fmt.Errorf("not connected: %w", n.peerErr) + return false, ErrNoPeer + } + + if n.peerErr != nil { + return false, n.peerErr } return true, nil @@ -367,7 +379,7 @@ func (n *node) updateResources(ctx context.Context, wg *sync.WaitGroup) { select { case <-ticker.C: // Metrics - metrics, err := n.peer.Metrics(clientapi.MetricsQuery{ + metrics, err := n.Metrics(clientapi.MetricsQuery{ Metrics: []clientapi.MetricsQueryMetric{ {Name: "cpu_ncpu"}, {Name: "cpu_idle"}, @@ -450,11 +462,19 @@ func (n *node) updateResources(ctx context.Context, wg *sync.WaitGroup) { } } -func (n *node) Config() *config.Config { - n.peerLock.RLock() - defer n.peerLock.RUnlock() +func (n *node) updateFiles(ctx context.Context, wg *sync.WaitGroup) { + ticker := time.NewTicker(time.Second) + defer ticker.Stop() + defer wg.Done() - return n.config + for { + select { + case <-ticker.C: + n.files() + case <-ctx.Done(): + return + } + } } func (n *node) Ping() (time.Duration, error) { @@ -462,7 +482,7 @@ func (n *node) Ping() (time.Duration, error) { defer n.peerLock.RUnlock() if n.peer == nil { - return 0, fmt.Errorf("not connected") + return 0, ErrNoPeer } latency, err := n.peer.Ping() @@ -475,7 +495,7 @@ func (n *node) Metrics(query clientapi.MetricsQuery) (clientapi.MetricsResponse, defer n.peerLock.RUnlock() if n.peer == nil { - return clientapi.MetricsResponse{}, fmt.Errorf("not connected") + return clientapi.MetricsResponse{}, ErrNoPeer } return n.peer.Metrics(query) @@ -486,61 +506,12 @@ func (n *node) AboutPeer() (clientapi.About, error) { defer n.peerLock.RUnlock() if n.peer == nil { - return clientapi.About{}, fmt.Errorf("not connected") + return clientapi.About{}, ErrNoPeer } return n.peer.About(false) } -func (n *node) StartFiles(updates chan<- NodeFiles) error { - n.runningLock.Lock() - defer n.runningLock.Unlock() - - if n.running { - return nil - } - - n.running = true - n.updates = updates - - ctx, cancel := context.WithCancel(context.Background()) - n.cancelFiles = cancel - - go func(ctx context.Context) { - ticker := time.NewTicker(time.Second) - defer ticker.Stop() - - for { - select { - case <-ctx.Done(): - return - case <-ticker.C: - n.files() - - select { - case n.updates <- n.Files(): - default: - } - } - } - }(ctx) - - return nil -} - -func (n *node) StopFiles() { - n.runningLock.Lock() - defer n.runningLock.Unlock() - - if !n.running { - return - } - - n.running = false - - n.cancelFiles() -} - func (n *node) About() NodeAbout { about, err := n.AboutPeer() if err != nil { @@ -548,6 +519,7 @@ func (n *node) About() NodeAbout { ID: n.id, Address: n.address, State: stateDisconnected.String(), + Error: err, } } @@ -636,21 +608,19 @@ func (n *node) Files() NodeFiles { n.stateLock.RLock() defer n.stateLock.RUnlock() - state := NodeFiles{ + files := NodeFiles{ ID: id, LastUpdate: n.lastUpdate, } if n.state != stateDisconnected && time.Since(n.lastUpdate) <= 2*time.Second { - state.Files = make([]string, len(n.filesList)) - copy(state.Files, n.filesList) + files.Files = make([]string, len(n.filesList)) + copy(files.Files, n.filesList) } - return state + return files } -var errNoPeer = errors.New("no peer") - func (n *node) files() { errorsChan := make(chan error, 8) filesChan := make(chan string, 1024) @@ -682,7 +652,7 @@ func (n *node) files() { defer n.peerLock.RUnlock() if n.peer == nil { - e <- errNoPeer + e <- ErrNoPeer return } @@ -704,7 +674,7 @@ func (n *node) files() { defer n.peerLock.RUnlock() if n.peer == nil { - e <- errNoPeer + e <- ErrNoPeer return } @@ -729,7 +699,7 @@ func (n *node) files() { defer n.peerLock.RUnlock() if n.peer == nil { - e <- errNoPeer + e <- ErrNoPeer return } @@ -755,7 +725,7 @@ func (n *node) files() { defer n.peerLock.RUnlock() if n.peer == nil { - e <- errNoPeer + e <- ErrNoPeer return } @@ -845,7 +815,7 @@ func (n *node) GetFile(prefix, path string, offset int64) (io.ReadCloser, error) defer n.peerLock.RUnlock() if n.peer == nil { - return nil, fmt.Errorf("not connected") + return nil, ErrNoPeer } return n.peer.FilesystemGetFileOffset(prefix, path, offset) @@ -856,12 +826,12 @@ func (n *node) GetFileInfo(prefix, path string) (int64, time.Time, error) { defer n.peerLock.RUnlock() if n.peer == nil { - return 0, time.Time{}, fmt.Errorf("not connected") + return 0, time.Time{}, ErrNoPeer } info, err := n.peer.FilesystemList(prefix, path, "", "") if err != nil { - return 0, time.Time{}, fmt.Errorf("not found: %w", err) + return 0, time.Time{}, fmt.Errorf("file not found: %w", err) } if len(info) != 1 { @@ -876,7 +846,7 @@ func (n *node) ProcessList(options ProcessListOptions) ([]clientapi.Process, err defer n.peerLock.RUnlock() if n.peer == nil { - return nil, fmt.Errorf("not connected") + return nil, ErrNoPeer } return n.peer.ProcessList(client.ProcessListOptions{ @@ -976,7 +946,7 @@ func (n *node) AddProcess(config *app.Config, metadata map[string]interface{}) e defer n.peerLock.RUnlock() if n.peer == nil { - return fmt.Errorf("not connected") + return ErrNoPeer } cfg := convertConfig(config, metadata) @@ -1045,7 +1015,7 @@ func (n *node) StartProcess(id app.ProcessID) error { defer n.peerLock.RUnlock() if n.peer == nil { - return fmt.Errorf("not connected") + return ErrNoPeer } return n.peer.ProcessCommand(client.NewProcessID(id.ID, id.Domain), "start") @@ -1056,7 +1026,7 @@ func (n *node) StopProcess(id app.ProcessID) error { defer n.peerLock.RUnlock() if n.peer == nil { - return fmt.Errorf("not connected") + return ErrNoPeer } return n.peer.ProcessCommand(client.NewProcessID(id.ID, id.Domain), "stop") @@ -1067,7 +1037,7 @@ func (n *node) RestartProcess(id app.ProcessID) error { defer n.peerLock.RUnlock() if n.peer == nil { - return fmt.Errorf("not connected") + return ErrNoPeer } return n.peer.ProcessCommand(client.NewProcessID(id.ID, id.Domain), "restart") @@ -1078,7 +1048,7 @@ func (n *node) ReloadProcess(id app.ProcessID) error { defer n.peerLock.RUnlock() if n.peer == nil { - return fmt.Errorf("not connected") + return ErrNoPeer } return n.peer.ProcessCommand(client.NewProcessID(id.ID, id.Domain), "reload") @@ -1089,7 +1059,7 @@ func (n *node) DeleteProcess(id app.ProcessID) error { defer n.peerLock.RUnlock() if n.peer == nil { - return fmt.Errorf("not connected") + return ErrNoPeer } return n.peer.ProcessDelete(client.NewProcessID(id.ID, id.Domain)) @@ -1100,7 +1070,7 @@ func (n *node) UpdateProcess(id app.ProcessID, config *app.Config, metadata map[ defer n.peerLock.RUnlock() if n.peer == nil { - return fmt.Errorf("not connected") + return ErrNoPeer } cfg := convertConfig(config, metadata) @@ -1116,7 +1086,7 @@ func (n *node) ProbeProcess(id app.ProcessID) (clientapi.Probe, error) { probe := clientapi.Probe{ Log: []string{fmt.Sprintf("the node %s where the process %s resides, is not connected", n.id, id.String())}, } - return probe, fmt.Errorf("not connected") + return probe, ErrNoPeer } probe, err := n.peer.ProcessProbe(client.NewProcessID(id.ID, id.Domain)) diff --git a/cluster/proxy/proxy.go b/cluster/proxy/proxy.go index a90de881..a761dc9b 100644 --- a/cluster/proxy/proxy.go +++ b/cluster/proxy/proxy.go @@ -33,7 +33,7 @@ type Proxy interface { type ProxyReader interface { ListNodes() []NodeReader - GetNode(id string) (NodeReader, error) + GetNodeReader(id string) (NodeReader, error) FindNodeFromProcess(id app.ProcessID) (string, error) @@ -47,96 +47,6 @@ type ProxyReader interface { GetFileInfo(prefix, path string) (int64, time.Time, error) } -func NewNullProxyReader() ProxyReader { - return &proxyReader{} -} - -type proxyReader struct { - proxy *proxy -} - -func (p *proxyReader) ListNodes() []NodeReader { - if p.proxy == nil { - return nil - } - - return p.proxy.ListNodes() -} - -func (p *proxyReader) GetNode(id string) (NodeReader, error) { - if p.proxy == nil { - return nil, fmt.Errorf("no proxy provided") - } - - return p.proxy.GetNode(id) -} - -func (p *proxyReader) FindNodeFromProcess(id app.ProcessID) (string, error) { - if p.proxy == nil { - return "", fmt.Errorf("no proxy provided") - } - - return p.proxy.FindNodeFromProcess(id) -} - -func (p *proxyReader) Resources() map[string]NodeResources { - if p.proxy == nil { - return nil - } - - return p.proxy.Resources() -} - -func (p *proxyReader) ListProcesses(options ProcessListOptions) []clientapi.Process { - if p.proxy == nil { - return nil - } - - return p.proxy.ListProcesses(options) -} - -func (p *proxyReader) ListProxyProcesses() []Process { - if p.proxy == nil { - return nil - } - - return p.proxy.ListProxyProcesses() -} - -func (p *proxyReader) ProbeProcess(nodeid string, id app.ProcessID) (clientapi.Probe, error) { - if p.proxy == nil { - return clientapi.Probe{ - Log: []string{fmt.Sprintf("no proxy for node %s provided", nodeid)}, - }, fmt.Errorf("no proxy provided") - } - - return p.proxy.ProbeProcess(nodeid, id) -} - -func (p *proxyReader) GetURL(prefix, path string) (*url.URL, error) { - if p.proxy == nil { - return nil, fmt.Errorf("no proxy provided") - } - - return p.proxy.GetURL(prefix, path) -} - -func (p *proxyReader) GetFile(prefix, path string, offset int64) (io.ReadCloser, error) { - if p.proxy == nil { - return nil, fmt.Errorf("no proxy provided") - } - - return p.proxy.GetFile(prefix, path, offset) -} - -func (p *proxyReader) GetFileInfo(prefix, path string) (int64, time.Time, error) { - if p.proxy == nil { - return 0, time.Time{}, fmt.Errorf("no proxy provided") - } - - return p.proxy.GetFileInfo(prefix, path) -} - type ProxyConfig struct { ID string // ID of the node @@ -146,10 +56,13 @@ type ProxyConfig struct { type proxy struct { id string - nodes map[string]Node // List of known nodes - idfiles map[string][]string // Map from nodeid to list of files - idupdate map[string]time.Time // Map from nodeid to time of last update - fileid map[string]string // Map from file name to nodeid + nodes map[string]Node // List of known nodes + nodesLock sync.RWMutex + + idfiles map[string][]string // Map from nodeid to list of files + idupdate map[string]time.Time // Map from nodeid to time of last update + fileid map[string]string // Map from file name to nodeid + filesLock sync.RWMutex updates chan NodeFiles @@ -196,6 +109,24 @@ func (p *proxy) Start() { ctx, cancel := context.WithCancel(context.Background()) p.cancel = cancel + go func(ctx context.Context) { + ticker := time.NewTicker(time.Second) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + p.nodesLock.RLock() + for _, node := range p.nodes { + p.updates <- node.Files() + } + p.nodesLock.RUnlock() + } + } + }(ctx) + go func(ctx context.Context) { for { select { @@ -211,7 +142,7 @@ func (p *proxy) Start() { continue } - p.lock.Lock() + p.filesLock.Lock() // Cleanup files := p.idfiles[update.ID] @@ -228,7 +159,7 @@ func (p *proxy) Start() { p.idfiles[update.ID] = update.Files p.idupdate[update.ID] = update.LastUpdate - p.lock.Unlock() + p.filesLock.Unlock() } } }(ctx) @@ -249,24 +180,18 @@ func (p *proxy) Stop() { p.cancel() p.cancel = nil - for _, node := range p.nodes { - node.StopFiles() - } - p.nodes = map[string]Node{} } func (p *proxy) Reader() ProxyReader { - return &proxyReader{ - proxy: p, - } + return p } func (p *proxy) Resources() map[string]NodeResources { resources := map[string]NodeResources{} - p.lock.RLock() - defer p.lock.RUnlock() + p.nodesLock.RLock() + defer p.nodesLock.RUnlock() for id, node := range p.nodes { resources[id] = node.Resources() @@ -282,19 +207,16 @@ func (p *proxy) AddNode(id string, node Node) (string, error) { // return "", fmt.Errorf("the provided (%s) and retrieved (%s) ID's don't match", id, about.ID) //} - p.lock.Lock() - defer p.lock.Unlock() + p.nodesLock.Lock() + defer p.nodesLock.Unlock() if n, ok := p.nodes[id]; ok { - n.StopFiles() - + n.Disconnect() delete(p.nodes, id) } p.nodes[id] = node - node.StartFiles(p.updates) - p.logger.Info().WithFields(log.Fields{ "address": about.Address, "name": about.Name, @@ -305,15 +227,15 @@ func (p *proxy) AddNode(id string, node Node) (string, error) { } func (p *proxy) RemoveNode(id string) error { - p.lock.Lock() - defer p.lock.Unlock() + p.nodesLock.Lock() + defer p.nodesLock.Unlock() node, ok := p.nodes[id] if !ok { return ErrNodeNotFound } - node.StopFiles() + node.Disconnect() delete(p.nodes, id) @@ -327,8 +249,8 @@ func (p *proxy) RemoveNode(id string) error { func (p *proxy) ListNodes() []NodeReader { list := []NodeReader{} - p.lock.RLock() - defer p.lock.RUnlock() + p.nodesLock.RLock() + defer p.nodesLock.RUnlock() for _, node := range p.nodes { list = append(list, node) @@ -337,9 +259,9 @@ func (p *proxy) ListNodes() []NodeReader { return list } -func (p *proxy) GetNode(id string) (NodeReader, error) { - p.lock.RLock() - defer p.lock.RUnlock() +func (p *proxy) GetNode(id string) (Node, error) { + p.nodesLock.RLock() + defer p.nodesLock.RUnlock() node, ok := p.nodes[id] if !ok { @@ -349,36 +271,20 @@ func (p *proxy) GetNode(id string) (NodeReader, error) { return node, nil } -func (p *proxy) GetURL(prefix, path string) (*url.URL, error) { - p.lock.RLock() - defer p.lock.RUnlock() +func (p *proxy) GetNodeReader(id string) (NodeReader, error) { + return p.GetNode(id) +} +func (p *proxy) GetURL(prefix, path string) (*url.URL, error) { logger := p.logger.WithFields(log.Fields{ "path": path, "prefix": prefix, }) - id, ok := p.fileid[prefix+":"+path] - if !ok { - logger.Debug().Log("Not found") - return nil, fmt.Errorf("file not found") - } - - ts, ok := p.idupdate[id] - if !ok { - logger.Debug().Log("No age information found") - return nil, fmt.Errorf("file not found") - } - - if time.Since(ts) > 2*time.Second { - logger.Debug().Log("File too old") - return nil, fmt.Errorf("file not found") - } - - node, ok := p.nodes[id] - if !ok { - logger.Debug().Log("Unknown node") - return nil, fmt.Errorf("file not found") + node, err := p.getNodeForFile(prefix, path) + if err != nil { + logger.Debug().WithError(err).Log("Unknown node") + return nil, fmt.Errorf("file not found: %w", err) } url, err := node.GetURL(prefix, path) @@ -438,30 +344,34 @@ func (p *proxy) GetFileInfo(prefix, path string) (int64, time.Time, error) { return size, lastModified, nil } -func (p *proxy) getNodeForFile(prefix, path string) (Node, error) { - p.lock.RLock() - defer p.lock.RUnlock() +func (p *proxy) getNodeIDForFile(prefix, path string) (string, error) { + p.filesLock.RLock() + defer p.filesLock.RUnlock() id, ok := p.fileid[prefix+":"+path] if !ok { - return nil, fmt.Errorf("file not found") + return "", fmt.Errorf("file not found") } ts, ok := p.idupdate[id] if !ok { - return nil, fmt.Errorf("no age information found") + return "", fmt.Errorf("no age information found") } if time.Since(ts) > 2*time.Second { - return nil, fmt.Errorf("file too old") + return "", fmt.Errorf("file too old") } - node, ok := p.nodes[id] - if !ok { - return nil, fmt.Errorf("unknown node") + return id, nil +} + +func (p *proxy) getNodeForFile(prefix, path string) (Node, error) { + id, err := p.getNodeIDForFile(prefix, path) + if err != nil { + return nil, err } - return node, nil + return p.GetNode(id) } type Process struct { @@ -504,7 +414,7 @@ func (p *proxy) ListProxyProcesses() []Process { wg := sync.WaitGroup{} - p.lock.RLock() + p.nodesLock.RLock() for _, node := range p.nodes { wg.Add(1) @@ -521,7 +431,7 @@ func (p *proxy) ListProxyProcesses() []Process { } }(node, processChan) } - p.lock.RUnlock() + p.nodesLock.RUnlock() wg.Wait() @@ -570,7 +480,7 @@ func (p *proxy) ListProcesses(options ProcessListOptions) []clientapi.Process { wg := sync.WaitGroup{} - p.lock.RLock() + p.nodesLock.RLock() for _, node := range p.nodes { wg.Add(1) @@ -587,7 +497,7 @@ func (p *proxy) ListProcesses(options ProcessListOptions) []clientapi.Process { } }(node, processChan) } - p.lock.RUnlock() + p.nodesLock.RUnlock() wg.Wait() @@ -599,62 +509,38 @@ func (p *proxy) ListProcesses(options ProcessListOptions) []clientapi.Process { } func (p *proxy) AddProcess(nodeid string, config *app.Config, metadata map[string]interface{}) error { - p.lock.RLock() - defer p.lock.RUnlock() - - node, ok := p.nodes[nodeid] - if !ok { - return fmt.Errorf("node not found") - } - - err := node.AddProcess(config, metadata) + node, err := p.GetNode(nodeid) if err != nil { - return err + return fmt.Errorf("node not found: %w", err) } - return nil + return node.AddProcess(config, metadata) } func (p *proxy) DeleteProcess(nodeid string, id app.ProcessID) error { - p.lock.RLock() - defer p.lock.RUnlock() - - node, ok := p.nodes[nodeid] - if !ok { - return fmt.Errorf("node not found") - } - - err := node.DeleteProcess(id) + node, err := p.GetNode(nodeid) if err != nil { - return err + return fmt.Errorf("node not found: %w", err) } - return nil + return node.DeleteProcess(id) } func (p *proxy) UpdateProcess(nodeid string, id app.ProcessID, config *app.Config, metadata map[string]interface{}) error { - p.lock.RLock() - defer p.lock.RUnlock() - - node, ok := p.nodes[nodeid] - if !ok { - return fmt.Errorf("node not found") + node, err := p.GetNode(nodeid) + if err != nil { + return fmt.Errorf("node not found: %w", err) } return node.UpdateProcess(id, config, metadata) } func (p *proxy) CommandProcess(nodeid string, id app.ProcessID, command string) error { - p.lock.RLock() - defer p.lock.RUnlock() - - node, ok := p.nodes[nodeid] - if !ok { - return fmt.Errorf("node not found") + node, err := p.GetNode(nodeid) + if err != nil { + return fmt.Errorf("node not found: %w", err) } - var err error = nil - switch command { case "start": err = node.StartProcess(id) @@ -672,15 +558,12 @@ func (p *proxy) CommandProcess(nodeid string, id app.ProcessID, command string) } func (p *proxy) ProbeProcess(nodeid string, id app.ProcessID) (clientapi.Probe, error) { - p.lock.RLock() - defer p.lock.RUnlock() - - node, ok := p.nodes[nodeid] - if !ok { + node, err := p.GetNode(nodeid) + if err != nil { probe := clientapi.Probe{ Log: []string{fmt.Sprintf("the node %s where the process %s should reside on, doesn't exist", nodeid, id.String())}, } - return probe, fmt.Errorf("node not found") + return probe, fmt.Errorf("node not found: %w", err) } return node.ProbeProcess(id) diff --git a/http/handler/api/cluster.go b/http/handler/api/cluster.go index bea1520a..a5f27e1e 100644 --- a/http/handler/api/cluster.go +++ b/http/handler/api/cluster.go @@ -271,7 +271,7 @@ func (h *ClusterHandler) GetNodes(c echo.Context) error { func (h *ClusterHandler) GetNode(c echo.Context) error { id := util.PathParam(c, "id") - peer, err := h.proxy.GetNode(id) + peer, err := h.proxy.GetNodeReader(id) if err != nil { return api.Err(http.StatusNotFound, "", "node not found: %s", err.Error()) } @@ -307,7 +307,7 @@ func (h *ClusterHandler) GetNode(c echo.Context) error { func (h *ClusterHandler) GetNodeVersion(c echo.Context) error { id := util.PathParam(c, "id") - peer, err := h.proxy.GetNode(id) + peer, err := h.proxy.GetNodeReader(id) if err != nil { return api.Err(http.StatusNotFound, "", "node not found: %s", err.Error()) } @@ -340,7 +340,7 @@ func (h *ClusterHandler) GetNodeVersion(c echo.Context) error { func (h *ClusterHandler) GetNodeFiles(c echo.Context) error { id := util.PathParam(c, "id") - peer, err := h.proxy.GetNode(id) + peer, err := h.proxy.GetNodeReader(id) if err != nil { return api.Err(http.StatusNotFound, "", "node not found: %s", err.Error()) } @@ -403,7 +403,7 @@ func (h *ClusterHandler) ListNodeProcesses(c echo.Context) error { ownerpattern := util.DefaultQuery(c, "ownerpattern", "") domainpattern := util.DefaultQuery(c, "domainpattern", "") - peer, err := h.proxy.GetNode(id) + peer, err := h.proxy.GetNodeReader(id) if err != nil { return api.Err(http.StatusNotFound, "", "node not found: %s", err.Error()) } diff --git a/rtmp/rtmp.go b/rtmp/rtmp.go index b5cf16ae..4202a9c3 100644 --- a/rtmp/rtmp.go +++ b/rtmp/rtmp.go @@ -126,10 +126,6 @@ func New(config Config) (Server, error) { s.collector = session.NewNullCollector() } - if s.proxy == nil { - s.proxy = proxy.NewNullProxyReader() - } - s.server = &rtmp.Server{ Addr: config.Addr, HandlePlay: s.handlePlay, @@ -276,7 +272,7 @@ func (s *server) handlePlay(conn *rtmp.Conn) { ch := s.channels[playpath] s.lock.RUnlock() - if ch == nil { + if ch == nil && s.proxy != nil { // Check in the cluster for that stream url, err := s.proxy.GetURL("rtmp", playpath) if err != nil { diff --git a/srt/srt.go b/srt/srt.go index d39ba1d6..b1516c88 100644 --- a/srt/srt.go +++ b/srt/srt.go @@ -102,10 +102,6 @@ func New(config Config) (Server, error) { s.collector = session.NewNullCollector() } - if s.proxy == nil { - s.proxy = proxy.NewNullProxyReader() - } - if s.logger == nil { s.logger = log.New("") } @@ -408,7 +404,7 @@ func (s *server) handleSubscribe(conn srt.Conn) { ch := s.channels[si.Resource] s.lock.RUnlock() - if ch == nil { + if ch == nil && s.proxy != nil { // Check in the cluster for the stream and proxy it srturl, err := s.proxy.GetURL("srt", si.Resource) if err != nil {