diff --git a/cluster/node/manager.go b/cluster/node/manager.go index b211baa0..926d480c 100644 --- a/cluster/node/manager.go +++ b/cluster/node/manager.go @@ -297,10 +297,16 @@ func (p *Manager) FilesystemGetFileInfo(prefix, path string) (int64, time.Time, return size, lastModified, nil } +type mediaInfo struct { + nodeid string + lastModified time.Time +} + func (p *Manager) getNodeIDForMedia(prefix, path string) (string, error) { // this is only for mem and disk prefixes - nodesChan := make(chan string, 16) - nodeids := []string{} + mediaChan := make(chan mediaInfo, 16) + nodeid := "" + lastModified := time.Time{} wgList := sync.WaitGroup{} wgList.Add(1) @@ -308,12 +314,17 @@ func (p *Manager) getNodeIDForMedia(prefix, path string) (string, error) { go func() { defer wgList.Done() - for nodeid := range nodesChan { - if len(nodeid) == 0 { + for media := range mediaChan { + if len(media.nodeid) == 0 { continue } - nodeids = append(nodeids, nodeid) + if !media.lastModified.After(lastModified) { + continue + } + + nodeid = media.nodeid + lastModified = media.lastModified } }() @@ -323,30 +334,33 @@ func (p *Manager) getNodeIDForMedia(prefix, path string) (string, error) { for id, n := range p.nodes { wg.Add(1) - go func(nodeid string, node *Node, p chan<- string) { + go func(nodeid string, node *Node, p chan<- mediaInfo) { defer wg.Done() - _, _, err := node.Core().MediaGetInfo(prefix, path) + _, lastModified, err := node.Core().MediaGetInfo(prefix, path) if err != nil { nodeid = "" } - p <- nodeid - }(id, n, nodesChan) + p <- mediaInfo{ + nodeid: nodeid, + lastModified: lastModified, + } + }(id, n, mediaChan) } p.lock.RUnlock() wg.Wait() - close(nodesChan) + close(mediaChan) wgList.Wait() - if len(nodeids) == 0 { + if len(nodeid) == 0 { return "", fmt.Errorf("file not found") } - return nodeids[0], nil + return nodeid, nil } func (p *Manager) getNodeForMedia(prefix, path string) (*Node, error) { @@ -363,7 +377,7 @@ func (p *Manager) getNodeForMedia(prefix, path string) (*Node, error) { return nil, err } - p.cache.Put(prefix+":"+path, id, 5*time.Second) + p.cache.Put(prefix+":"+path, id, 2*time.Second) return p.NodeGet(id) } diff --git a/ffmpeg/skills/skills_test.go b/ffmpeg/skills/skills_test.go index f8f67bfd..9b014b16 100644 --- a/ffmpeg/skills/skills_test.go +++ b/ffmpeg/skills/skills_test.go @@ -42,7 +42,7 @@ func TestNew(t *testing.T) { require.NoError(t, err) require.Equal(t, Skills{ FFmpeg: ffmpeg{ - Version: "4.4.1", + Version: "7.1.1", Compiler: "gcc 10.3.1 (Alpine 10.3.1_git20211027) 20211027", Configuration: "--extra-version=datarhei --prefix=/usr --extra-libs='-lpthread -lm -lz -lsupc++ -lstdc++ -lssl -lcrypto -lz -lc -ldl' --enable-nonfree --enable-gpl --enable-version3 --enable-postproc --enable-static --enable-openssl --enable-omx --enable-omx-rpi --enable-mmal --enable-v4l2_m2m --enable-libfreetype --enable-libsrt --enable-libx264 --enable-libx265 --enable-libvpx --enable-libmp3lame --enable-libopus --enable-libvorbis --disable-ffplay --disable-debug --disable-doc --disable-shared", Libraries: []Library{ diff --git a/http/fs/cluster.go b/http/fs/cluster.go index d8c1b109..fba458d7 100644 --- a/http/fs/cluster.go +++ b/http/fs/cluster.go @@ -37,9 +37,9 @@ func NewClusterFS(name string, fs fs.Filesystem, proxy *node.Manager) Filesystem func (fs *filesystem) Open(path string) fs.File { // Check if the file is locally available - if file := fs.Filesystem.Open(path); file != nil { - return file - } + //if file := fs.Filesystem.Open(path); file != nil { + // return file + //} // Check if the file is available in the cluster size, lastModified, err := fs.proxy.FilesystemGetFileInfo(fs.name, path) @@ -51,9 +51,9 @@ func (fs *filesystem) Open(path string) fs.File { getFile: func(offset int64) (io.ReadCloser, error) { return fs.proxy.FilesystemGetFile(fs.name, path, offset) }, - name: path, - size: size, - lastModiefied: lastModified, + name: path, + size: size, + lastModified: lastModified, } return file @@ -62,10 +62,10 @@ func (fs *filesystem) Open(path string) fs.File { type file struct { io.ReadCloser - getFile func(offset int64) (io.ReadCloser, error) - name string - size int64 - lastModiefied time.Time + getFile func(offset int64) (io.ReadCloser, error) + name string + size int64 + lastModified time.Time } func (f *file) Read(p []byte) (int, error) { @@ -128,7 +128,7 @@ func (f *file) Size() int64 { } func (f *file) ModTime() time.Time { - return f.lastModiefied + return f.lastModified } func (f *file) IsLink() (string, bool) { diff --git a/restream/core_test.go b/restream/core_test.go index 513e5afe..a1587018 100644 --- a/restream/core_test.go +++ b/restream/core_test.go @@ -1534,7 +1534,7 @@ func TestProcessReplacer(t *testing.T) { process = &app.Config{ ID: "314159265359", Reference: "refref", - FFVersion: "^4.4.1", + FFVersion: "^7.1.1", Input: []app.ConfigIO{ { ID: "in_314159265359_refref", diff --git a/restream/fs/fs_test.go b/restream/fs/fs_test.go index 55e9938e..cf66a494 100644 --- a/restream/fs/fs_test.go +++ b/restream/fs/fs_test.go @@ -34,7 +34,7 @@ func TestUpdateCleanup(t *testing.T) { }, } - cleanfs.UpdateCleanup("foobar", patterns, false) + cleanfs.UpdateCleanup("foobar", patterns, true) require.Equal(t, cleanfs.cleanupPatterns["foobar"], patterns) @@ -44,21 +44,21 @@ func TestUpdateCleanup(t *testing.T) { MaxFileAge: 0, }) - cleanfs.UpdateCleanup("foobar", patterns, false) + cleanfs.UpdateCleanup("foobar", patterns, true) require.Equal(t, cleanfs.cleanupPatterns["foobar"], patterns) patterns[0].MaxFiles = 42 - cleanfs.UpdateCleanup("foobar", patterns, false) + cleanfs.UpdateCleanup("foobar", patterns, true) require.Equal(t, cleanfs.cleanupPatterns["foobar"], patterns) - cleanfs.UpdateCleanup("foobar", patterns[1:], false) + cleanfs.UpdateCleanup("foobar", patterns[1:], true) require.Equal(t, cleanfs.cleanupPatterns["foobar"], patterns[1:]) - cleanfs.UpdateCleanup("foobar", nil, false) + cleanfs.UpdateCleanup("foobar", nil, true) require.Empty(t, cleanfs.cleanupPatterns["foobar"]) } @@ -81,7 +81,7 @@ func TestMaxFiles(t *testing.T) { MaxFiles: 3, MaxFileAge: 0, }, - }, false) + }, true) cleanfs.WriteFileReader("/chunk_0.ts", strings.NewReader("chunk_0"), -1) cleanfs.WriteFileReader("/chunk_1.ts", strings.NewReader("chunk_1"), -1) @@ -130,7 +130,7 @@ func TestMaxAge(t *testing.T) { MaxFiles: 0, MaxFileAge: 3 * time.Second, }, - }, false) + }, true) cleanfs.WriteFileReader("/chunk_0.ts", strings.NewReader("chunk_0"), -1) cleanfs.WriteFileReader("/chunk_1.ts", strings.NewReader("chunk_1"), -1) @@ -179,7 +179,7 @@ func TestUnsetCleanup(t *testing.T) { MaxFiles: 3, MaxFileAge: 0, }, - }, false) + }, true) cleanfs.WriteFileReader("/chunk_0.ts", strings.NewReader("chunk_0"), -1) cleanfs.WriteFileReader("/chunk_1.ts", strings.NewReader("chunk_1"), -1) @@ -207,7 +207,7 @@ func TestUnsetCleanup(t *testing.T) { return true }, 3*time.Second, time.Second) - cleanfs.UpdateCleanup("foobar", nil, false) + cleanfs.UpdateCleanup("foobar", nil, true) cleanfs.WriteFileReader("/chunk_4.ts", strings.NewReader("chunk_4"), -1) @@ -249,7 +249,7 @@ func TestPurge(t *testing.T) { MaxFileAge: 0, PurgeOnDelete: true, }, - }, false) + }, true) cleanfs.WriteFileReader("/chunk_0.ts", strings.NewReader("chunk_0"), -1) cleanfs.WriteFileReader("/chunk_1.ts", strings.NewReader("chunk_1"), -1) @@ -277,7 +277,7 @@ func TestPurge(t *testing.T) { return true }, 3*time.Second, time.Second) - cleanfs.UpdateCleanup("foobar", nil, false) + cleanfs.UpdateCleanup("foobar", nil, true) cleanfs.WriteFileReader("/chunk_4.ts", strings.NewReader("chunk_4"), -1) @@ -350,7 +350,7 @@ func BenchmarkCleanup(b *testing.B) { }, } - cleanfs.UpdateCleanup(id, patterns, false) + cleanfs.UpdateCleanup(id, patterns, true) ids[i] = id }