diff --git a/cluster/proxy/node.go b/cluster/proxy/node.go index 7cf87ddd..b4897dbd 100644 --- a/cluster/proxy/node.go +++ b/cluster/proxy/node.go @@ -48,6 +48,8 @@ type NodeReader interface { GetFile(prefix, path string, offset int64) (io.ReadCloser, error) GetFileInfo(prefix, path string) (int64, time.Time, error) + GetResourceInfo(prefix, path string) (int64, time.Time, error) + ProcessList(ProcessListOptions) ([]clientapi.Process, error) ProxyProcessList() ([]Process, error) } @@ -125,11 +127,9 @@ type node struct { config *config.Config - state nodeState - latency float64 // Seconds - stateLock sync.RWMutex - filesList []string - lastUpdate time.Time + state nodeState + latency float64 // Seconds + stateLock sync.RWMutex secure bool httpAddress *url.URL @@ -163,7 +163,7 @@ func NewNode(id, address string, config *config.Config) Node { n.peerErr = err } - n.peerWg.Add(4) + n.peerWg.Add(3) go func(ctx context.Context) { // This tries to reconnect to the core API. If everything's @@ -188,7 +188,6 @@ func NewNode(id, address string, config *config.Config) Node { go n.pingPeer(ctx, &n.peerWg) go n.updateResources(ctx, &n.peerWg) - go n.updateFiles(ctx, &n.peerWg) return n } @@ -462,21 +461,6 @@ func (n *node) updateResources(ctx context.Context, wg *sync.WaitGroup) { } } -func (n *node) updateFiles(ctx context.Context, wg *sync.WaitGroup) { - ticker := time.NewTicker(time.Second) - defer ticker.Stop() - defer wg.Done() - - for { - select { - case <-ticker.C: - n.files() - case <-ctx.Done(): - return - } - } -} - func (n *node) Ping() (time.Duration, error) { n.peerLock.RLock() defer n.peerLock.RUnlock() @@ -604,23 +588,21 @@ func (n *node) Version() NodeVersion { func (n *node) Files() NodeFiles { id := n.About().ID - n.stateLock.RLock() - defer n.stateLock.RUnlock() - files := NodeFiles{ ID: id, - LastUpdate: n.lastUpdate, + LastUpdate: time.Now(), } - if n.state != stateDisconnected && time.Since(n.lastUpdate) <= 2*time.Second { - files.Files = make([]string, len(n.filesList)) - copy(files.Files, n.filesList) + if ok, _ := n.IsConnected(); !ok { + return files } + files.Files = n.files() + return files } -func (n *node) files() { +func (n *node) files() []string { errorsChan := make(chan error, 8) filesChan := make(chan string, 1024) filesList := []string{} @@ -747,16 +729,7 @@ func (n *node) files() { wgList.Wait() - n.stateLock.Lock() - - if len(errorList) == 0 { - n.filesList = make([]string, len(filesList)) - copy(n.filesList, filesList) - n.lastUpdate = time.Now() - n.lastContact = n.lastUpdate - } - - n.stateLock.Unlock() + return filesList } func cloneURL(src *url.URL) *url.URL { @@ -840,6 +813,40 @@ func (n *node) GetFileInfo(prefix, path string) (int64, time.Time, error) { return info[0].Size, time.Unix(info[0].LastMod, 0), nil } +func (n *node) GetResourceInfo(prefix, path string) (int64, time.Time, error) { + if prefix == "disk" || prefix == "mem" { + return n.GetFileInfo(prefix, path) + } else if prefix == "rtmp" { + files, err := n.peer.RTMPChannels() + if err != nil { + return 0, time.Time{}, err + } + + for _, file := range files { + if path == file.Name { + return 0, time.Now(), nil + } + } + + return 0, time.Time{}, fmt.Errorf("resource not found") + } else if prefix == "srt" { + files, err := n.peer.SRTChannels() + if err != nil { + return 0, time.Time{}, err + } + + for _, file := range files { + if path == file.Name { + return 0, time.Now(), nil + } + } + + return 0, time.Time{}, fmt.Errorf("resource not found") + } + + return 0, time.Time{}, fmt.Errorf("unknown prefix: %s", prefix) +} + func (n *node) ProcessList(options ProcessListOptions) ([]clientapi.Process, error) { n.peerLock.RLock() defer n.peerLock.RUnlock() diff --git a/cluster/proxy/proxy.go b/cluster/proxy/proxy.go index 17813258..3d1ea2df 100644 --- a/cluster/proxy/proxy.go +++ b/cluster/proxy/proxy.go @@ -1,7 +1,6 @@ package proxy import ( - "context" "errors" "fmt" "io" @@ -59,15 +58,7 @@ type proxy struct { nodes map[string]Node // List of known nodes nodesLock sync.RWMutex - idfiles map[string][]string // Map from nodeid to list of files - idupdate map[string]time.Time // Map from nodeid to time of last update - fileid map[string]string // Map from file name to nodeid - filesLock sync.RWMutex - - updates chan NodeFiles - - lock sync.RWMutex - cancel context.CancelFunc + lock sync.RWMutex running bool @@ -78,13 +69,9 @@ var ErrNodeNotFound = errors.New("node not found") func NewProxy(config ProxyConfig) (Proxy, error) { p := &proxy{ - id: config.ID, - nodes: map[string]Node{}, - idfiles: map[string][]string{}, - idupdate: map[string]time.Time{}, - fileid: map[string]string{}, - updates: make(chan NodeFiles, 64), - logger: config.Logger, + id: config.ID, + nodes: map[string]Node{}, + logger: config.Logger, } if p.logger == nil { @@ -105,64 +92,6 @@ func (p *proxy) Start() { p.running = true p.logger.Debug().Log("Starting proxy") - - ctx, cancel := context.WithCancel(context.Background()) - p.cancel = cancel - - go func(ctx context.Context) { - ticker := time.NewTicker(time.Second) - defer ticker.Stop() - - for { - select { - case <-ctx.Done(): - return - case <-ticker.C: - p.nodesLock.RLock() - for _, node := range p.nodes { - p.updates <- node.Files() - } - p.nodesLock.RUnlock() - } - } - }(ctx) - - go func(ctx context.Context) { - for { - select { - case <-ctx.Done(): - return - case update := <-p.updates: - p.logger.Debug().WithFields(log.Fields{ - "node": update.ID, - "files": len(update.Files), - }).Log("Got update") - - if p.id == update.ID { - continue - } - - p.filesLock.Lock() - - // Cleanup - files := p.idfiles[update.ID] - for _, file := range files { - delete(p.fileid, file) - } - delete(p.idfiles, update.ID) - delete(p.idupdate, update.ID) - - // Add files - for _, file := range update.Files { - p.fileid[file] = update.ID - } - p.idfiles[update.ID] = update.Files - p.idupdate[update.ID] = update.LastUpdate - - p.filesLock.Unlock() - } - } - }(ctx) } func (p *proxy) Stop() { @@ -177,9 +106,6 @@ func (p *proxy) Stop() { p.logger.Debug().Log("Stopping proxy") - p.cancel() - p.cancel = nil - p.nodes = map[string]Node{} } @@ -345,24 +271,55 @@ func (p *proxy) GetFileInfo(prefix, path string) (int64, time.Time, error) { } func (p *proxy) getNodeIDForFile(prefix, path string) (string, error) { - p.filesLock.RLock() - defer p.filesLock.RUnlock() + // this is only for mem and disk prefixes + nodesChan := make(chan string, 16) + nodeids := []string{} - id, ok := p.fileid[prefix+":"+path] - if !ok { + wgList := sync.WaitGroup{} + wgList.Add(1) + + go func() { + defer wgList.Done() + + for nodeid := range nodesChan { + if len(nodeid) == 0 { + continue + } + + nodeids = append(nodeids, nodeid) + } + }() + + wg := sync.WaitGroup{} + + p.nodesLock.RLock() + for id, node := range p.nodes { + wg.Add(1) + + go func(nodeid string, node Node, p chan<- string) { + defer wg.Done() + + _, _, err := node.GetResourceInfo(prefix, path) + if err != nil { + nodeid = "" + } + + p <- nodeid + }(id, node, nodesChan) + } + p.nodesLock.RUnlock() + + wg.Wait() + + close(nodesChan) + + wgList.Wait() + + if len(nodeids) == 0 { return "", fmt.Errorf("file not found") } - ts, ok := p.idupdate[id] - if !ok { - return "", fmt.Errorf("no age information found") - } - - if time.Since(ts) > 2*time.Second { - return "", fmt.Errorf("file too old") - } - - return id, nil + return nodeids[0], nil } func (p *proxy) getNodeForFile(prefix, path string) (Node, error) {