From e6391b0b7b60723b5305e9cf543314761197cd54 Mon Sep 17 00:00:00 2001 From: Ingo Oppermann Date: Thu, 20 Jul 2023 15:17:03 +0200 Subject: [PATCH] Replace polling with asking other nodes for files in proxy Polling has the drawback, that it's usually delayed and can lead to 404 responses even though the files would be available on another node. With asking, this problem doesn't exist but it may lead to more internal requests and a bit higher latency. --- cluster/proxy/node.go | 87 +++++++++++++------------ cluster/proxy/proxy.go | 141 ++++++++++++++--------------------------- 2 files changed, 96 insertions(+), 132 deletions(-) diff --git a/cluster/proxy/node.go b/cluster/proxy/node.go index 7cf87ddd..b4897dbd 100644 --- a/cluster/proxy/node.go +++ b/cluster/proxy/node.go @@ -48,6 +48,8 @@ type NodeReader interface { GetFile(prefix, path string, offset int64) (io.ReadCloser, error) GetFileInfo(prefix, path string) (int64, time.Time, error) + GetResourceInfo(prefix, path string) (int64, time.Time, error) + ProcessList(ProcessListOptions) ([]clientapi.Process, error) ProxyProcessList() ([]Process, error) } @@ -125,11 +127,9 @@ type node struct { config *config.Config - state nodeState - latency float64 // Seconds - stateLock sync.RWMutex - filesList []string - lastUpdate time.Time + state nodeState + latency float64 // Seconds + stateLock sync.RWMutex secure bool httpAddress *url.URL @@ -163,7 +163,7 @@ func NewNode(id, address string, config *config.Config) Node { n.peerErr = err } - n.peerWg.Add(4) + n.peerWg.Add(3) go func(ctx context.Context) { // This tries to reconnect to the core API. If everything's @@ -188,7 +188,6 @@ func NewNode(id, address string, config *config.Config) Node { go n.pingPeer(ctx, &n.peerWg) go n.updateResources(ctx, &n.peerWg) - go n.updateFiles(ctx, &n.peerWg) return n } @@ -462,21 +461,6 @@ func (n *node) updateResources(ctx context.Context, wg *sync.WaitGroup) { } } -func (n *node) updateFiles(ctx context.Context, wg *sync.WaitGroup) { - ticker := time.NewTicker(time.Second) - defer ticker.Stop() - defer wg.Done() - - for { - select { - case <-ticker.C: - n.files() - case <-ctx.Done(): - return - } - } -} - func (n *node) Ping() (time.Duration, error) { n.peerLock.RLock() defer n.peerLock.RUnlock() @@ -604,23 +588,21 @@ func (n *node) Version() NodeVersion { func (n *node) Files() NodeFiles { id := n.About().ID - n.stateLock.RLock() - defer n.stateLock.RUnlock() - files := NodeFiles{ ID: id, - LastUpdate: n.lastUpdate, + LastUpdate: time.Now(), } - if n.state != stateDisconnected && time.Since(n.lastUpdate) <= 2*time.Second { - files.Files = make([]string, len(n.filesList)) - copy(files.Files, n.filesList) + if ok, _ := n.IsConnected(); !ok { + return files } + files.Files = n.files() + return files } -func (n *node) files() { +func (n *node) files() []string { errorsChan := make(chan error, 8) filesChan := make(chan string, 1024) filesList := []string{} @@ -747,16 +729,7 @@ func (n *node) files() { wgList.Wait() - n.stateLock.Lock() - - if len(errorList) == 0 { - n.filesList = make([]string, len(filesList)) - copy(n.filesList, filesList) - n.lastUpdate = time.Now() - n.lastContact = n.lastUpdate - } - - n.stateLock.Unlock() + return filesList } func cloneURL(src *url.URL) *url.URL { @@ -840,6 +813,40 @@ func (n *node) GetFileInfo(prefix, path string) (int64, time.Time, error) { return info[0].Size, time.Unix(info[0].LastMod, 0), nil } +func (n *node) GetResourceInfo(prefix, path string) (int64, time.Time, error) { + if prefix == "disk" || prefix == "mem" { + return n.GetFileInfo(prefix, path) + } else if prefix == "rtmp" { + files, err := n.peer.RTMPChannels() + if err != nil { + return 0, time.Time{}, err + } + + for _, file := range files { + if path == file.Name { + return 0, time.Now(), nil + } + } + + return 0, time.Time{}, fmt.Errorf("resource not found") + } else if prefix == "srt" { + files, err := n.peer.SRTChannels() + if err != nil { + return 0, time.Time{}, err + } + + for _, file := range files { + if path == file.Name { + return 0, time.Now(), nil + } + } + + return 0, time.Time{}, fmt.Errorf("resource not found") + } + + return 0, time.Time{}, fmt.Errorf("unknown prefix: %s", prefix) +} + func (n *node) ProcessList(options ProcessListOptions) ([]clientapi.Process, error) { n.peerLock.RLock() defer n.peerLock.RUnlock() diff --git a/cluster/proxy/proxy.go b/cluster/proxy/proxy.go index 17813258..3d1ea2df 100644 --- a/cluster/proxy/proxy.go +++ b/cluster/proxy/proxy.go @@ -1,7 +1,6 @@ package proxy import ( - "context" "errors" "fmt" "io" @@ -59,15 +58,7 @@ type proxy struct { nodes map[string]Node // List of known nodes nodesLock sync.RWMutex - idfiles map[string][]string // Map from nodeid to list of files - idupdate map[string]time.Time // Map from nodeid to time of last update - fileid map[string]string // Map from file name to nodeid - filesLock sync.RWMutex - - updates chan NodeFiles - - lock sync.RWMutex - cancel context.CancelFunc + lock sync.RWMutex running bool @@ -78,13 +69,9 @@ var ErrNodeNotFound = errors.New("node not found") func NewProxy(config ProxyConfig) (Proxy, error) { p := &proxy{ - id: config.ID, - nodes: map[string]Node{}, - idfiles: map[string][]string{}, - idupdate: map[string]time.Time{}, - fileid: map[string]string{}, - updates: make(chan NodeFiles, 64), - logger: config.Logger, + id: config.ID, + nodes: map[string]Node{}, + logger: config.Logger, } if p.logger == nil { @@ -105,64 +92,6 @@ func (p *proxy) Start() { p.running = true p.logger.Debug().Log("Starting proxy") - - ctx, cancel := context.WithCancel(context.Background()) - p.cancel = cancel - - go func(ctx context.Context) { - ticker := time.NewTicker(time.Second) - defer ticker.Stop() - - for { - select { - case <-ctx.Done(): - return - case <-ticker.C: - p.nodesLock.RLock() - for _, node := range p.nodes { - p.updates <- node.Files() - } - p.nodesLock.RUnlock() - } - } - }(ctx) - - go func(ctx context.Context) { - for { - select { - case <-ctx.Done(): - return - case update := <-p.updates: - p.logger.Debug().WithFields(log.Fields{ - "node": update.ID, - "files": len(update.Files), - }).Log("Got update") - - if p.id == update.ID { - continue - } - - p.filesLock.Lock() - - // Cleanup - files := p.idfiles[update.ID] - for _, file := range files { - delete(p.fileid, file) - } - delete(p.idfiles, update.ID) - delete(p.idupdate, update.ID) - - // Add files - for _, file := range update.Files { - p.fileid[file] = update.ID - } - p.idfiles[update.ID] = update.Files - p.idupdate[update.ID] = update.LastUpdate - - p.filesLock.Unlock() - } - } - }(ctx) } func (p *proxy) Stop() { @@ -177,9 +106,6 @@ func (p *proxy) Stop() { p.logger.Debug().Log("Stopping proxy") - p.cancel() - p.cancel = nil - p.nodes = map[string]Node{} } @@ -345,24 +271,55 @@ func (p *proxy) GetFileInfo(prefix, path string) (int64, time.Time, error) { } func (p *proxy) getNodeIDForFile(prefix, path string) (string, error) { - p.filesLock.RLock() - defer p.filesLock.RUnlock() + // this is only for mem and disk prefixes + nodesChan := make(chan string, 16) + nodeids := []string{} - id, ok := p.fileid[prefix+":"+path] - if !ok { + wgList := sync.WaitGroup{} + wgList.Add(1) + + go func() { + defer wgList.Done() + + for nodeid := range nodesChan { + if len(nodeid) == 0 { + continue + } + + nodeids = append(nodeids, nodeid) + } + }() + + wg := sync.WaitGroup{} + + p.nodesLock.RLock() + for id, node := range p.nodes { + wg.Add(1) + + go func(nodeid string, node Node, p chan<- string) { + defer wg.Done() + + _, _, err := node.GetResourceInfo(prefix, path) + if err != nil { + nodeid = "" + } + + p <- nodeid + }(id, node, nodesChan) + } + p.nodesLock.RUnlock() + + wg.Wait() + + close(nodesChan) + + wgList.Wait() + + if len(nodeids) == 0 { return "", fmt.Errorf("file not found") } - ts, ok := p.idupdate[id] - if !ok { - return "", fmt.Errorf("no age information found") - } - - if time.Since(ts) > 2*time.Second { - return "", fmt.Errorf("file too old") - } - - return id, nil + return nodeids[0], nil } func (p *proxy) getNodeForFile(prefix, path string) (Node, error) {