diff --git a/cluster/proxy/node.go b/cluster/proxy/node.go index 8ff5a94b..f860f2a4 100644 --- a/cluster/proxy/node.go +++ b/cluster/proxy/node.go @@ -2,6 +2,7 @@ package proxy import ( "context" + "errors" "fmt" "io" "net" @@ -285,7 +286,7 @@ func (n *node) Connect() error { n.resources.throttling = true n.resources.cpu = 100 n.resources.ncpu = 1 - n.resources.cpuLimit = 0 + n.resources.cpuLimit = 100 n.resources.mem = 0 n.resources.memLimit = 0 n.state = stateDisconnected @@ -486,6 +487,11 @@ func (n *node) About() NodeAbout { }, } + if state == stateDisconnected { + nodeAbout.Uptime = 0 + nodeAbout.Latency = 0 + } + return nodeAbout } @@ -535,124 +541,147 @@ func (n *node) Files() NodeFiles { return state } +var errNoPeer = errors.New("no peer") + func (n *node) files() { + errorsChan := make(chan error, 8) filesChan := make(chan string, 1024) filesList := []string{} + errorList := []error{} + + ctx, cancel := context.WithCancel(context.Background()) wgList := sync.WaitGroup{} wgList.Add(1) - go func() { + go func(ctx context.Context) { defer wgList.Done() - for file := range filesChan { - filesList = append(filesList, file) + for { + select { + case <-ctx.Done(): + return + case file := <-filesChan: + filesList = append(filesList, file) + case err := <-errorsChan: + errorList = append(errorList, err) + } } - }() + }(ctx) wg := sync.WaitGroup{} wg.Add(2) - go func(f chan<- string) { + go func(f chan<- string, e chan<- error) { defer wg.Done() n.peerLock.RLock() defer n.peerLock.RUnlock() if n.peer == nil { + e <- errNoPeer return } files, err := n.peer.MemFSList("name", "asc") if err != nil { + e <- err return } for _, file := range files { f <- "mem:" + file.Name } - }(filesChan) + }(filesChan, errorsChan) - go func(f chan<- string) { + go func(f chan<- string, e chan<- error) { defer wg.Done() n.peerLock.RLock() defer n.peerLock.RUnlock() if n.peer == nil { + e <- errNoPeer return } files, err := n.peer.DiskFSList("name", "asc") if err != nil { + e <- err return } for _, file := range files { f <- "disk:" + file.Name } - }(filesChan) + }(filesChan, errorsChan) if n.hasRTMP { wg.Add(1) - go func(f chan<- string) { + go func(f chan<- string, e chan<- error) { defer wg.Done() n.peerLock.RLock() defer n.peerLock.RUnlock() if n.peer == nil { + e <- errNoPeer return } files, err := n.peer.RTMPChannels() if err != nil { + e <- err return } for _, file := range files { f <- "rtmp:" + file.Name } - }(filesChan) + }(filesChan, errorsChan) } if n.hasSRT { wg.Add(1) - go func(f chan<- string) { + go func(f chan<- string, e chan<- error) { defer wg.Done() n.peerLock.RLock() defer n.peerLock.RUnlock() if n.peer == nil { + e <- errNoPeer return } files, err := n.peer.SRTChannels() if err != nil { + e <- err return } for _, file := range files { f <- "srt:" + file.Name } - }(filesChan) + }(filesChan, errorsChan) } wg.Wait() - close(filesChan) + cancel() wgList.Wait() n.stateLock.Lock() - n.filesList = make([]string, len(filesList)) - copy(n.filesList, filesList) - n.lastUpdate = time.Now() - n.lastContact = time.Now() + if len(errorList) == 0 { + n.filesList = make([]string, len(filesList)) + copy(n.filesList, filesList) + n.lastUpdate = time.Now() + n.lastContact = n.lastUpdate + } n.stateLock.Unlock() }