Cleanup of cluster proxy and proxy node

This commit is contained in:
Ingo Oppermann 2023-07-07 22:51:53 +02:00
parent 4aec1d9817
commit ba9227dc96
No known key found for this signature in database
GPG Key ID: 2AB32426E9DD229E
6 changed files with 188 additions and 347 deletions

View File

@ -114,10 +114,6 @@ func (n *node) start(id string) error {
n.lastCoreContactErr = err
}
n.pingLock.Unlock()
if err == nil {
return
}
case <-ctx.Done():
return
}

View File

@ -24,15 +24,6 @@ type Node interface {
IsConnected() (bool, error)
Disconnect()
Config() *config.Config
StartFiles(updates chan<- NodeFiles) error
StopFiles()
GetURL(prefix, path string) (*url.URL, error)
GetFile(prefix, path string, offset int64) (io.ReadCloser, error)
GetFileInfo(prefix, path string) (int64, time.Time, error)
AddProcess(config *app.Config, metadata map[string]interface{}) error
StartProcess(id app.ProcessID) error
StopProcess(id app.ProcessID) error
@ -52,6 +43,11 @@ type NodeReader interface {
Resources() NodeResources
Files() NodeFiles
GetURL(prefix, path string) (*url.URL, error)
GetFile(prefix, path string, offset int64) (io.ReadCloser, error)
GetFileInfo(prefix, path string) (int64, time.Time, error)
ProcessList(ProcessListOptions) ([]clientapi.Process, error)
ProxyProcessList() ([]Process, error)
}
@ -76,6 +72,7 @@ type NodeAbout struct {
Name string
Address string
State string
Error error
CreatedAt time.Time
Uptime time.Duration
LastContact time.Time
@ -103,6 +100,8 @@ const (
stateConnected nodeState = "connected"
)
var ErrNoPeer = errors.New("not connected to the core API: client not available")
type node struct {
id string
address string
@ -126,16 +125,11 @@ type node struct {
config *config.Config
state nodeState
latency float64 // Seconds
stateLock sync.RWMutex
updates chan<- NodeFiles
filesList []string
lastUpdate time.Time
cancelFiles context.CancelFunc
runningLock sync.Mutex
running bool
state nodeState
latency float64 // Seconds
stateLock sync.RWMutex
filesList []string
lastUpdate time.Time
secure bool
httpAddress *url.URL
@ -164,31 +158,38 @@ func NewNode(id, address string, config *config.Config) Node {
ctx, cancel := context.WithCancel(context.Background())
n.disconnect = cancel
err := n.connect(ctx)
err := n.connect()
if err != nil {
n.peerErr = err
go func(ctx context.Context) {
ticker := time.NewTicker(3 * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
err := n.connect(ctx)
if err == nil {
n.peerErr = nil
return
} else {
n.peerErr = err
}
case <-ctx.Done():
return
}
}
}(ctx)
}
n.peerWg.Add(4)
go func(ctx context.Context) {
// This tries to reconnect to the core API. If everything's
// fine, this is a no-op.
ticker := time.NewTicker(3 * time.Second)
defer ticker.Stop()
defer n.peerWg.Done()
for {
select {
case <-ticker.C:
err := n.connect()
n.peerLock.Lock()
n.peerErr = err
n.peerLock.Unlock()
case <-ctx.Done():
return
}
}
}(ctx)
go n.pingPeer(ctx, &n.peerWg)
go n.updateResources(ctx, &n.peerWg)
go n.updateFiles(ctx, &n.peerWg)
return n
}
@ -196,15 +197,27 @@ func (n *node) SetEssentials(address string, config *config.Config) {
n.peerLock.Lock()
defer n.peerLock.Unlock()
n.address = address
n.config = config
if address != n.address {
n.address = address
n.peer = nil // force reconnet
}
if n.config == nil && config != nil {
n.config = config
n.peer = nil // force reconnect
}
if n.config.UpdatedAt != config.UpdatedAt {
n.config = config
n.peer = nil // force reconnect
}
}
func (n *node) connect(ctx context.Context) error {
func (n *node) connect() error {
n.peerLock.Lock()
defer n.peerLock.Unlock()
if n.peer != nil {
if n.peer != nil && n.state != stateDisconnected {
return nil
}
@ -293,11 +306,6 @@ func (n *node) connect(ctx context.Context) error {
n.peer = peer
n.peerWg.Add(2)
go n.pingPeer(ctx, &n.peerWg)
go n.updateResources(ctx, &n.peerWg)
return nil
}
@ -306,7 +314,11 @@ func (n *node) IsConnected() (bool, error) {
defer n.peerLock.RUnlock()
if n.peer == nil {
return false, fmt.Errorf("not connected: %w", n.peerErr)
return false, ErrNoPeer
}
if n.peerErr != nil {
return false, n.peerErr
}
return true, nil
@ -367,7 +379,7 @@ func (n *node) updateResources(ctx context.Context, wg *sync.WaitGroup) {
select {
case <-ticker.C:
// Metrics
metrics, err := n.peer.Metrics(clientapi.MetricsQuery{
metrics, err := n.Metrics(clientapi.MetricsQuery{
Metrics: []clientapi.MetricsQueryMetric{
{Name: "cpu_ncpu"},
{Name: "cpu_idle"},
@ -450,11 +462,19 @@ func (n *node) updateResources(ctx context.Context, wg *sync.WaitGroup) {
}
}
func (n *node) Config() *config.Config {
n.peerLock.RLock()
defer n.peerLock.RUnlock()
func (n *node) updateFiles(ctx context.Context, wg *sync.WaitGroup) {
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
defer wg.Done()
return n.config
for {
select {
case <-ticker.C:
n.files()
case <-ctx.Done():
return
}
}
}
func (n *node) Ping() (time.Duration, error) {
@ -462,7 +482,7 @@ func (n *node) Ping() (time.Duration, error) {
defer n.peerLock.RUnlock()
if n.peer == nil {
return 0, fmt.Errorf("not connected")
return 0, ErrNoPeer
}
latency, err := n.peer.Ping()
@ -475,7 +495,7 @@ func (n *node) Metrics(query clientapi.MetricsQuery) (clientapi.MetricsResponse,
defer n.peerLock.RUnlock()
if n.peer == nil {
return clientapi.MetricsResponse{}, fmt.Errorf("not connected")
return clientapi.MetricsResponse{}, ErrNoPeer
}
return n.peer.Metrics(query)
@ -486,61 +506,12 @@ func (n *node) AboutPeer() (clientapi.About, error) {
defer n.peerLock.RUnlock()
if n.peer == nil {
return clientapi.About{}, fmt.Errorf("not connected")
return clientapi.About{}, ErrNoPeer
}
return n.peer.About(false)
}
func (n *node) StartFiles(updates chan<- NodeFiles) error {
n.runningLock.Lock()
defer n.runningLock.Unlock()
if n.running {
return nil
}
n.running = true
n.updates = updates
ctx, cancel := context.WithCancel(context.Background())
n.cancelFiles = cancel
go func(ctx context.Context) {
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
n.files()
select {
case n.updates <- n.Files():
default:
}
}
}
}(ctx)
return nil
}
func (n *node) StopFiles() {
n.runningLock.Lock()
defer n.runningLock.Unlock()
if !n.running {
return
}
n.running = false
n.cancelFiles()
}
func (n *node) About() NodeAbout {
about, err := n.AboutPeer()
if err != nil {
@ -548,6 +519,7 @@ func (n *node) About() NodeAbout {
ID: n.id,
Address: n.address,
State: stateDisconnected.String(),
Error: err,
}
}
@ -636,21 +608,19 @@ func (n *node) Files() NodeFiles {
n.stateLock.RLock()
defer n.stateLock.RUnlock()
state := NodeFiles{
files := NodeFiles{
ID: id,
LastUpdate: n.lastUpdate,
}
if n.state != stateDisconnected && time.Since(n.lastUpdate) <= 2*time.Second {
state.Files = make([]string, len(n.filesList))
copy(state.Files, n.filesList)
files.Files = make([]string, len(n.filesList))
copy(files.Files, n.filesList)
}
return state
return files
}
var errNoPeer = errors.New("no peer")
func (n *node) files() {
errorsChan := make(chan error, 8)
filesChan := make(chan string, 1024)
@ -682,7 +652,7 @@ func (n *node) files() {
defer n.peerLock.RUnlock()
if n.peer == nil {
e <- errNoPeer
e <- ErrNoPeer
return
}
@ -704,7 +674,7 @@ func (n *node) files() {
defer n.peerLock.RUnlock()
if n.peer == nil {
e <- errNoPeer
e <- ErrNoPeer
return
}
@ -729,7 +699,7 @@ func (n *node) files() {
defer n.peerLock.RUnlock()
if n.peer == nil {
e <- errNoPeer
e <- ErrNoPeer
return
}
@ -755,7 +725,7 @@ func (n *node) files() {
defer n.peerLock.RUnlock()
if n.peer == nil {
e <- errNoPeer
e <- ErrNoPeer
return
}
@ -845,7 +815,7 @@ func (n *node) GetFile(prefix, path string, offset int64) (io.ReadCloser, error)
defer n.peerLock.RUnlock()
if n.peer == nil {
return nil, fmt.Errorf("not connected")
return nil, ErrNoPeer
}
return n.peer.FilesystemGetFileOffset(prefix, path, offset)
@ -856,12 +826,12 @@ func (n *node) GetFileInfo(prefix, path string) (int64, time.Time, error) {
defer n.peerLock.RUnlock()
if n.peer == nil {
return 0, time.Time{}, fmt.Errorf("not connected")
return 0, time.Time{}, ErrNoPeer
}
info, err := n.peer.FilesystemList(prefix, path, "", "")
if err != nil {
return 0, time.Time{}, fmt.Errorf("not found: %w", err)
return 0, time.Time{}, fmt.Errorf("file not found: %w", err)
}
if len(info) != 1 {
@ -876,7 +846,7 @@ func (n *node) ProcessList(options ProcessListOptions) ([]clientapi.Process, err
defer n.peerLock.RUnlock()
if n.peer == nil {
return nil, fmt.Errorf("not connected")
return nil, ErrNoPeer
}
return n.peer.ProcessList(client.ProcessListOptions{
@ -976,7 +946,7 @@ func (n *node) AddProcess(config *app.Config, metadata map[string]interface{}) e
defer n.peerLock.RUnlock()
if n.peer == nil {
return fmt.Errorf("not connected")
return ErrNoPeer
}
cfg := convertConfig(config, metadata)
@ -1045,7 +1015,7 @@ func (n *node) StartProcess(id app.ProcessID) error {
defer n.peerLock.RUnlock()
if n.peer == nil {
return fmt.Errorf("not connected")
return ErrNoPeer
}
return n.peer.ProcessCommand(client.NewProcessID(id.ID, id.Domain), "start")
@ -1056,7 +1026,7 @@ func (n *node) StopProcess(id app.ProcessID) error {
defer n.peerLock.RUnlock()
if n.peer == nil {
return fmt.Errorf("not connected")
return ErrNoPeer
}
return n.peer.ProcessCommand(client.NewProcessID(id.ID, id.Domain), "stop")
@ -1067,7 +1037,7 @@ func (n *node) RestartProcess(id app.ProcessID) error {
defer n.peerLock.RUnlock()
if n.peer == nil {
return fmt.Errorf("not connected")
return ErrNoPeer
}
return n.peer.ProcessCommand(client.NewProcessID(id.ID, id.Domain), "restart")
@ -1078,7 +1048,7 @@ func (n *node) ReloadProcess(id app.ProcessID) error {
defer n.peerLock.RUnlock()
if n.peer == nil {
return fmt.Errorf("not connected")
return ErrNoPeer
}
return n.peer.ProcessCommand(client.NewProcessID(id.ID, id.Domain), "reload")
@ -1089,7 +1059,7 @@ func (n *node) DeleteProcess(id app.ProcessID) error {
defer n.peerLock.RUnlock()
if n.peer == nil {
return fmt.Errorf("not connected")
return ErrNoPeer
}
return n.peer.ProcessDelete(client.NewProcessID(id.ID, id.Domain))
@ -1100,7 +1070,7 @@ func (n *node) UpdateProcess(id app.ProcessID, config *app.Config, metadata map[
defer n.peerLock.RUnlock()
if n.peer == nil {
return fmt.Errorf("not connected")
return ErrNoPeer
}
cfg := convertConfig(config, metadata)
@ -1116,7 +1086,7 @@ func (n *node) ProbeProcess(id app.ProcessID) (clientapi.Probe, error) {
probe := clientapi.Probe{
Log: []string{fmt.Sprintf("the node %s where the process %s resides, is not connected", n.id, id.String())},
}
return probe, fmt.Errorf("not connected")
return probe, ErrNoPeer
}
probe, err := n.peer.ProcessProbe(client.NewProcessID(id.ID, id.Domain))

View File

@ -33,7 +33,7 @@ type Proxy interface {
type ProxyReader interface {
ListNodes() []NodeReader
GetNode(id string) (NodeReader, error)
GetNodeReader(id string) (NodeReader, error)
FindNodeFromProcess(id app.ProcessID) (string, error)
@ -47,96 +47,6 @@ type ProxyReader interface {
GetFileInfo(prefix, path string) (int64, time.Time, error)
}
func NewNullProxyReader() ProxyReader {
return &proxyReader{}
}
type proxyReader struct {
proxy *proxy
}
func (p *proxyReader) ListNodes() []NodeReader {
if p.proxy == nil {
return nil
}
return p.proxy.ListNodes()
}
func (p *proxyReader) GetNode(id string) (NodeReader, error) {
if p.proxy == nil {
return nil, fmt.Errorf("no proxy provided")
}
return p.proxy.GetNode(id)
}
func (p *proxyReader) FindNodeFromProcess(id app.ProcessID) (string, error) {
if p.proxy == nil {
return "", fmt.Errorf("no proxy provided")
}
return p.proxy.FindNodeFromProcess(id)
}
func (p *proxyReader) Resources() map[string]NodeResources {
if p.proxy == nil {
return nil
}
return p.proxy.Resources()
}
func (p *proxyReader) ListProcesses(options ProcessListOptions) []clientapi.Process {
if p.proxy == nil {
return nil
}
return p.proxy.ListProcesses(options)
}
func (p *proxyReader) ListProxyProcesses() []Process {
if p.proxy == nil {
return nil
}
return p.proxy.ListProxyProcesses()
}
func (p *proxyReader) ProbeProcess(nodeid string, id app.ProcessID) (clientapi.Probe, error) {
if p.proxy == nil {
return clientapi.Probe{
Log: []string{fmt.Sprintf("no proxy for node %s provided", nodeid)},
}, fmt.Errorf("no proxy provided")
}
return p.proxy.ProbeProcess(nodeid, id)
}
func (p *proxyReader) GetURL(prefix, path string) (*url.URL, error) {
if p.proxy == nil {
return nil, fmt.Errorf("no proxy provided")
}
return p.proxy.GetURL(prefix, path)
}
func (p *proxyReader) GetFile(prefix, path string, offset int64) (io.ReadCloser, error) {
if p.proxy == nil {
return nil, fmt.Errorf("no proxy provided")
}
return p.proxy.GetFile(prefix, path, offset)
}
func (p *proxyReader) GetFileInfo(prefix, path string) (int64, time.Time, error) {
if p.proxy == nil {
return 0, time.Time{}, fmt.Errorf("no proxy provided")
}
return p.proxy.GetFileInfo(prefix, path)
}
type ProxyConfig struct {
ID string // ID of the node
@ -146,10 +56,13 @@ type ProxyConfig struct {
type proxy struct {
id string
nodes map[string]Node // List of known nodes
idfiles map[string][]string // Map from nodeid to list of files
idupdate map[string]time.Time // Map from nodeid to time of last update
fileid map[string]string // Map from file name to nodeid
nodes map[string]Node // List of known nodes
nodesLock sync.RWMutex
idfiles map[string][]string // Map from nodeid to list of files
idupdate map[string]time.Time // Map from nodeid to time of last update
fileid map[string]string // Map from file name to nodeid
filesLock sync.RWMutex
updates chan NodeFiles
@ -196,6 +109,24 @@ func (p *proxy) Start() {
ctx, cancel := context.WithCancel(context.Background())
p.cancel = cancel
go func(ctx context.Context) {
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
p.nodesLock.RLock()
for _, node := range p.nodes {
p.updates <- node.Files()
}
p.nodesLock.RUnlock()
}
}
}(ctx)
go func(ctx context.Context) {
for {
select {
@ -211,7 +142,7 @@ func (p *proxy) Start() {
continue
}
p.lock.Lock()
p.filesLock.Lock()
// Cleanup
files := p.idfiles[update.ID]
@ -228,7 +159,7 @@ func (p *proxy) Start() {
p.idfiles[update.ID] = update.Files
p.idupdate[update.ID] = update.LastUpdate
p.lock.Unlock()
p.filesLock.Unlock()
}
}
}(ctx)
@ -249,24 +180,18 @@ func (p *proxy) Stop() {
p.cancel()
p.cancel = nil
for _, node := range p.nodes {
node.StopFiles()
}
p.nodes = map[string]Node{}
}
func (p *proxy) Reader() ProxyReader {
return &proxyReader{
proxy: p,
}
return p
}
func (p *proxy) Resources() map[string]NodeResources {
resources := map[string]NodeResources{}
p.lock.RLock()
defer p.lock.RUnlock()
p.nodesLock.RLock()
defer p.nodesLock.RUnlock()
for id, node := range p.nodes {
resources[id] = node.Resources()
@ -282,19 +207,16 @@ func (p *proxy) AddNode(id string, node Node) (string, error) {
// return "", fmt.Errorf("the provided (%s) and retrieved (%s) ID's don't match", id, about.ID)
//}
p.lock.Lock()
defer p.lock.Unlock()
p.nodesLock.Lock()
defer p.nodesLock.Unlock()
if n, ok := p.nodes[id]; ok {
n.StopFiles()
n.Disconnect()
delete(p.nodes, id)
}
p.nodes[id] = node
node.StartFiles(p.updates)
p.logger.Info().WithFields(log.Fields{
"address": about.Address,
"name": about.Name,
@ -305,15 +227,15 @@ func (p *proxy) AddNode(id string, node Node) (string, error) {
}
func (p *proxy) RemoveNode(id string) error {
p.lock.Lock()
defer p.lock.Unlock()
p.nodesLock.Lock()
defer p.nodesLock.Unlock()
node, ok := p.nodes[id]
if !ok {
return ErrNodeNotFound
}
node.StopFiles()
node.Disconnect()
delete(p.nodes, id)
@ -327,8 +249,8 @@ func (p *proxy) RemoveNode(id string) error {
func (p *proxy) ListNodes() []NodeReader {
list := []NodeReader{}
p.lock.RLock()
defer p.lock.RUnlock()
p.nodesLock.RLock()
defer p.nodesLock.RUnlock()
for _, node := range p.nodes {
list = append(list, node)
@ -337,9 +259,9 @@ func (p *proxy) ListNodes() []NodeReader {
return list
}
func (p *proxy) GetNode(id string) (NodeReader, error) {
p.lock.RLock()
defer p.lock.RUnlock()
func (p *proxy) GetNode(id string) (Node, error) {
p.nodesLock.RLock()
defer p.nodesLock.RUnlock()
node, ok := p.nodes[id]
if !ok {
@ -349,36 +271,20 @@ func (p *proxy) GetNode(id string) (NodeReader, error) {
return node, nil
}
func (p *proxy) GetURL(prefix, path string) (*url.URL, error) {
p.lock.RLock()
defer p.lock.RUnlock()
func (p *proxy) GetNodeReader(id string) (NodeReader, error) {
return p.GetNode(id)
}
func (p *proxy) GetURL(prefix, path string) (*url.URL, error) {
logger := p.logger.WithFields(log.Fields{
"path": path,
"prefix": prefix,
})
id, ok := p.fileid[prefix+":"+path]
if !ok {
logger.Debug().Log("Not found")
return nil, fmt.Errorf("file not found")
}
ts, ok := p.idupdate[id]
if !ok {
logger.Debug().Log("No age information found")
return nil, fmt.Errorf("file not found")
}
if time.Since(ts) > 2*time.Second {
logger.Debug().Log("File too old")
return nil, fmt.Errorf("file not found")
}
node, ok := p.nodes[id]
if !ok {
logger.Debug().Log("Unknown node")
return nil, fmt.Errorf("file not found")
node, err := p.getNodeForFile(prefix, path)
if err != nil {
logger.Debug().WithError(err).Log("Unknown node")
return nil, fmt.Errorf("file not found: %w", err)
}
url, err := node.GetURL(prefix, path)
@ -438,30 +344,34 @@ func (p *proxy) GetFileInfo(prefix, path string) (int64, time.Time, error) {
return size, lastModified, nil
}
func (p *proxy) getNodeForFile(prefix, path string) (Node, error) {
p.lock.RLock()
defer p.lock.RUnlock()
func (p *proxy) getNodeIDForFile(prefix, path string) (string, error) {
p.filesLock.RLock()
defer p.filesLock.RUnlock()
id, ok := p.fileid[prefix+":"+path]
if !ok {
return nil, fmt.Errorf("file not found")
return "", fmt.Errorf("file not found")
}
ts, ok := p.idupdate[id]
if !ok {
return nil, fmt.Errorf("no age information found")
return "", fmt.Errorf("no age information found")
}
if time.Since(ts) > 2*time.Second {
return nil, fmt.Errorf("file too old")
return "", fmt.Errorf("file too old")
}
node, ok := p.nodes[id]
if !ok {
return nil, fmt.Errorf("unknown node")
return id, nil
}
func (p *proxy) getNodeForFile(prefix, path string) (Node, error) {
id, err := p.getNodeIDForFile(prefix, path)
if err != nil {
return nil, err
}
return node, nil
return p.GetNode(id)
}
type Process struct {
@ -504,7 +414,7 @@ func (p *proxy) ListProxyProcesses() []Process {
wg := sync.WaitGroup{}
p.lock.RLock()
p.nodesLock.RLock()
for _, node := range p.nodes {
wg.Add(1)
@ -521,7 +431,7 @@ func (p *proxy) ListProxyProcesses() []Process {
}
}(node, processChan)
}
p.lock.RUnlock()
p.nodesLock.RUnlock()
wg.Wait()
@ -570,7 +480,7 @@ func (p *proxy) ListProcesses(options ProcessListOptions) []clientapi.Process {
wg := sync.WaitGroup{}
p.lock.RLock()
p.nodesLock.RLock()
for _, node := range p.nodes {
wg.Add(1)
@ -587,7 +497,7 @@ func (p *proxy) ListProcesses(options ProcessListOptions) []clientapi.Process {
}
}(node, processChan)
}
p.lock.RUnlock()
p.nodesLock.RUnlock()
wg.Wait()
@ -599,62 +509,38 @@ func (p *proxy) ListProcesses(options ProcessListOptions) []clientapi.Process {
}
func (p *proxy) AddProcess(nodeid string, config *app.Config, metadata map[string]interface{}) error {
p.lock.RLock()
defer p.lock.RUnlock()
node, ok := p.nodes[nodeid]
if !ok {
return fmt.Errorf("node not found")
}
err := node.AddProcess(config, metadata)
node, err := p.GetNode(nodeid)
if err != nil {
return err
return fmt.Errorf("node not found: %w", err)
}
return nil
return node.AddProcess(config, metadata)
}
func (p *proxy) DeleteProcess(nodeid string, id app.ProcessID) error {
p.lock.RLock()
defer p.lock.RUnlock()
node, ok := p.nodes[nodeid]
if !ok {
return fmt.Errorf("node not found")
}
err := node.DeleteProcess(id)
node, err := p.GetNode(nodeid)
if err != nil {
return err
return fmt.Errorf("node not found: %w", err)
}
return nil
return node.DeleteProcess(id)
}
func (p *proxy) UpdateProcess(nodeid string, id app.ProcessID, config *app.Config, metadata map[string]interface{}) error {
p.lock.RLock()
defer p.lock.RUnlock()
node, ok := p.nodes[nodeid]
if !ok {
return fmt.Errorf("node not found")
node, err := p.GetNode(nodeid)
if err != nil {
return fmt.Errorf("node not found: %w", err)
}
return node.UpdateProcess(id, config, metadata)
}
func (p *proxy) CommandProcess(nodeid string, id app.ProcessID, command string) error {
p.lock.RLock()
defer p.lock.RUnlock()
node, ok := p.nodes[nodeid]
if !ok {
return fmt.Errorf("node not found")
node, err := p.GetNode(nodeid)
if err != nil {
return fmt.Errorf("node not found: %w", err)
}
var err error = nil
switch command {
case "start":
err = node.StartProcess(id)
@ -672,15 +558,12 @@ func (p *proxy) CommandProcess(nodeid string, id app.ProcessID, command string)
}
func (p *proxy) ProbeProcess(nodeid string, id app.ProcessID) (clientapi.Probe, error) {
p.lock.RLock()
defer p.lock.RUnlock()
node, ok := p.nodes[nodeid]
if !ok {
node, err := p.GetNode(nodeid)
if err != nil {
probe := clientapi.Probe{
Log: []string{fmt.Sprintf("the node %s where the process %s should reside on, doesn't exist", nodeid, id.String())},
}
return probe, fmt.Errorf("node not found")
return probe, fmt.Errorf("node not found: %w", err)
}
return node.ProbeProcess(id)

View File

@ -271,7 +271,7 @@ func (h *ClusterHandler) GetNodes(c echo.Context) error {
func (h *ClusterHandler) GetNode(c echo.Context) error {
id := util.PathParam(c, "id")
peer, err := h.proxy.GetNode(id)
peer, err := h.proxy.GetNodeReader(id)
if err != nil {
return api.Err(http.StatusNotFound, "", "node not found: %s", err.Error())
}
@ -307,7 +307,7 @@ func (h *ClusterHandler) GetNode(c echo.Context) error {
func (h *ClusterHandler) GetNodeVersion(c echo.Context) error {
id := util.PathParam(c, "id")
peer, err := h.proxy.GetNode(id)
peer, err := h.proxy.GetNodeReader(id)
if err != nil {
return api.Err(http.StatusNotFound, "", "node not found: %s", err.Error())
}
@ -340,7 +340,7 @@ func (h *ClusterHandler) GetNodeVersion(c echo.Context) error {
func (h *ClusterHandler) GetNodeFiles(c echo.Context) error {
id := util.PathParam(c, "id")
peer, err := h.proxy.GetNode(id)
peer, err := h.proxy.GetNodeReader(id)
if err != nil {
return api.Err(http.StatusNotFound, "", "node not found: %s", err.Error())
}
@ -403,7 +403,7 @@ func (h *ClusterHandler) ListNodeProcesses(c echo.Context) error {
ownerpattern := util.DefaultQuery(c, "ownerpattern", "")
domainpattern := util.DefaultQuery(c, "domainpattern", "")
peer, err := h.proxy.GetNode(id)
peer, err := h.proxy.GetNodeReader(id)
if err != nil {
return api.Err(http.StatusNotFound, "", "node not found: %s", err.Error())
}

View File

@ -126,10 +126,6 @@ func New(config Config) (Server, error) {
s.collector = session.NewNullCollector()
}
if s.proxy == nil {
s.proxy = proxy.NewNullProxyReader()
}
s.server = &rtmp.Server{
Addr: config.Addr,
HandlePlay: s.handlePlay,
@ -276,7 +272,7 @@ func (s *server) handlePlay(conn *rtmp.Conn) {
ch := s.channels[playpath]
s.lock.RUnlock()
if ch == nil {
if ch == nil && s.proxy != nil {
// Check in the cluster for that stream
url, err := s.proxy.GetURL("rtmp", playpath)
if err != nil {

View File

@ -102,10 +102,6 @@ func New(config Config) (Server, error) {
s.collector = session.NewNullCollector()
}
if s.proxy == nil {
s.proxy = proxy.NewNullProxyReader()
}
if s.logger == nil {
s.logger = log.New("")
}
@ -408,7 +404,7 @@ func (s *server) handleSubscribe(conn srt.Conn) {
ch := s.channels[si.Resource]
s.lock.RUnlock()
if ch == nil {
if ch == nil && s.proxy != nil {
// Check in the cluster for the stream and proxy it
srturl, err := s.proxy.GetURL("srt", si.Resource)
if err != nil {