From 46950372bea0088073c1025436d4c7f773db5d7f Mon Sep 17 00:00:00 2001 From: Ingo Oppermann Date: Wed, 24 Jul 2024 15:40:28 +0200 Subject: [PATCH] WIP: Optimize copy from io.Reader, allow to suggest file size --- app/api/api.go | 2 +- app/import/main_test.go | 4 +- config/config_test.go | 4 +- http/handler/api/config_test.go | 4 +- http/handler/api/filesystems.go | 2 +- http/handler/api/filesystems_test.go | 40 ++++++------ http/handler/filesystem.go | 34 ++-------- io/fs/disk.go | 4 +- io/fs/fs.go | 5 +- io/fs/fs_test.go | 92 ++++++++++++++-------------- io/fs/mem.go | 52 ++++++++++++++-- io/fs/mem_test.go | 34 ++++++++++ io/fs/readonly.go | 2 +- io/fs/readonly_test.go | 2 +- io/fs/s3.go | 6 +- io/fs/sized.go | 10 +-- io/fs/sized_test.go | 56 ++++++++--------- restream/fs/fs_test.go | 26 ++++---- 18 files changed, 215 insertions(+), 164 deletions(-) diff --git a/app/api/api.go b/app/api/api.go index 25b5875a..90ad6ea5 100644 --- a/app/api/api.go +++ b/app/api/api.go @@ -1988,7 +1988,7 @@ func backupMemFS(target, source fs.Filesystem, patterns []string) error { continue } - target.WriteFileReader(name, file) + target.WriteFileReader(name, file, -1) file.Close() } diff --git a/app/import/main_test.go b/app/import/main_test.go index 305110f9..0939b79b 100644 --- a/app/import/main_test.go +++ b/app/import/main_test.go @@ -14,8 +14,8 @@ func TestImport(t *testing.T) { memfs, err := fs.NewMemFilesystem(fs.MemConfig{}) require.NoError(t, err) - memfs.WriteFileReader("/mime.types", strings.NewReader("foobar")) - memfs.WriteFileReader("/bin/ffmpeg", strings.NewReader("foobar")) + memfs.WriteFileReader("/mime.types", strings.NewReader("foobar"), -1) + memfs.WriteFileReader("/bin/ffmpeg", strings.NewReader("foobar"), -1) configstore, err := store.NewJSON(memfs, "/config.json", nil) require.NoError(t, err) diff --git a/config/config_test.go b/config/config_test.go index 132857fe..8cf89fcf 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -60,12 +60,12 @@ func TestValidateDefault(t *testing.T) { fs, err := fs.NewMemFilesystem(fs.MemConfig{}) require.NoError(t, err) - size, fresh, err := fs.WriteFileReader("./mime.types", strings.NewReader("xxxxx")) + size, fresh, err := fs.WriteFileReader("./mime.types", strings.NewReader("xxxxx"), -1) require.Equal(t, int64(5), size) require.Equal(t, true, fresh) require.NoError(t, err) - _, _, err = fs.WriteFileReader("/bin/ffmpeg", strings.NewReader("xxxxx")) + _, _, err = fs.WriteFileReader("/bin/ffmpeg", strings.NewReader("xxxxx"), -1) require.NoError(t, err) cfg := New(fs) diff --git a/http/handler/api/config_test.go b/http/handler/api/config_test.go index 59aca767..0757cfb0 100644 --- a/http/handler/api/config_test.go +++ b/http/handler/api/config_test.go @@ -22,10 +22,10 @@ func getDummyConfigRouter(t *testing.T) (*echo.Echo, store.Store) { memfs, err := fs.NewMemFilesystem(fs.MemConfig{}) require.NoError(t, err) - _, _, err = memfs.WriteFileReader("./mime.types", strings.NewReader("xxxxx")) + _, _, err = memfs.WriteFileReader("./mime.types", strings.NewReader("xxxxx"), -1) require.NoError(t, err) - _, _, err = memfs.WriteFileReader("/bin/ffmpeg", strings.NewReader("xxxxx")) + _, _, err = memfs.WriteFileReader("/bin/ffmpeg", strings.NewReader("xxxxx"), -1) require.NoError(t, err) config, err := store.NewJSON(memfs, "/config.json", nil) diff --git a/http/handler/api/filesystems.go b/http/handler/api/filesystems.go index f14c4f8d..fa43ba4d 100644 --- a/http/handler/api/filesystems.go +++ b/http/handler/api/filesystems.go @@ -263,7 +263,7 @@ func (h *FSHandler) FileOperation(c echo.Context) error { // In case the target is S3, allow it to determine the size of the file sizer := fs.NewReadSizer(reader, fromFileStat.Size()) - _, _, err = toFS.Handler.FS.Filesystem.WriteFileReader(toPath, sizer) + _, _, err = toFS.Handler.FS.Filesystem.WriteFileReader(toPath, sizer, int(sizer.Size())) if err != nil { toFS.Handler.FS.Filesystem.Remove(toPath) return api.Err(http.StatusBadRequest, "", "writing target file failed: %s", err) diff --git a/http/handler/api/filesystems_test.go b/http/handler/api/filesystems_test.go index 35de0314..edfbec86 100644 --- a/http/handler/api/filesystems_test.go +++ b/http/handler/api/filesystems_test.go @@ -149,10 +149,10 @@ func TestFilesystemsListSize(t *testing.T) { memfs, err := fs.NewMemFilesystem(fs.MemConfig{}) require.NoError(t, err) - memfs.WriteFileReader("/a", strings.NewReader("a")) - memfs.WriteFileReader("/aa", strings.NewReader("aa")) - memfs.WriteFileReader("/aaa", strings.NewReader("aaa")) - memfs.WriteFileReader("/aaaa", strings.NewReader("aaaa")) + memfs.WriteFileReader("/a", strings.NewReader("a"), -1) + memfs.WriteFileReader("/aa", strings.NewReader("aa"), -1) + memfs.WriteFileReader("/aaa", strings.NewReader("aaa"), -1) + memfs.WriteFileReader("/aaaa", strings.NewReader("aaaa"), -1) filesystems := []httpfs.FS{ { @@ -207,13 +207,13 @@ func TestFilesystemsListLastmod(t *testing.T) { memfs, err := fs.NewMemFilesystem(fs.MemConfig{}) require.NoError(t, err) - memfs.WriteFileReader("/a", strings.NewReader("a")) + memfs.WriteFileReader("/a", strings.NewReader("a"), -1) time.Sleep(1 * time.Second) - memfs.WriteFileReader("/b", strings.NewReader("b")) + memfs.WriteFileReader("/b", strings.NewReader("b"), -1) time.Sleep(1 * time.Second) - memfs.WriteFileReader("/c", strings.NewReader("c")) + memfs.WriteFileReader("/c", strings.NewReader("c"), -1) time.Sleep(1 * time.Second) - memfs.WriteFileReader("/d", strings.NewReader("d")) + memfs.WriteFileReader("/d", strings.NewReader("d"), -1) var a, b, c, d time.Time @@ -274,10 +274,10 @@ func TestFilesystemsDeleteFiles(t *testing.T) { memfs, err := fs.NewMemFilesystem(fs.MemConfig{}) require.NoError(t, err) - memfs.WriteFileReader("/a", strings.NewReader("a")) - memfs.WriteFileReader("/aa", strings.NewReader("aa")) - memfs.WriteFileReader("/aaa", strings.NewReader("aaa")) - memfs.WriteFileReader("/aaaa", strings.NewReader("aaaa")) + memfs.WriteFileReader("/a", strings.NewReader("a"), -1) + memfs.WriteFileReader("/aa", strings.NewReader("aa"), -1) + memfs.WriteFileReader("/aaa", strings.NewReader("aaa"), -1) + memfs.WriteFileReader("/aaaa", strings.NewReader("aaaa"), -1) filesystems := []httpfs.FS{ { @@ -312,10 +312,10 @@ func TestFilesystemsDeleteFilesSize(t *testing.T) { memfs, err := fs.NewMemFilesystem(fs.MemConfig{}) require.NoError(t, err) - memfs.WriteFileReader("/a", strings.NewReader("a")) - memfs.WriteFileReader("/aa", strings.NewReader("aa")) - memfs.WriteFileReader("/aaa", strings.NewReader("aaa")) - memfs.WriteFileReader("/aaaa", strings.NewReader("aaaa")) + memfs.WriteFileReader("/a", strings.NewReader("a"), -1) + memfs.WriteFileReader("/aa", strings.NewReader("aa"), -1) + memfs.WriteFileReader("/aaa", strings.NewReader("aaa"), -1) + memfs.WriteFileReader("/aaaa", strings.NewReader("aaaa"), -1) filesystems := []httpfs.FS{ { @@ -348,13 +348,13 @@ func TestFilesystemsDeleteFilesLastmod(t *testing.T) { memfs, err := fs.NewMemFilesystem(fs.MemConfig{}) require.NoError(t, err) - memfs.WriteFileReader("/a", strings.NewReader("a")) + memfs.WriteFileReader("/a", strings.NewReader("a"), -1) time.Sleep(1 * time.Second) - memfs.WriteFileReader("/b", strings.NewReader("b")) + memfs.WriteFileReader("/b", strings.NewReader("b"), -1) time.Sleep(1 * time.Second) - memfs.WriteFileReader("/c", strings.NewReader("c")) + memfs.WriteFileReader("/c", strings.NewReader("c"), -1) time.Sleep(1 * time.Second) - memfs.WriteFileReader("/d", strings.NewReader("d")) + memfs.WriteFileReader("/d", strings.NewReader("d"), -1) var b, c time.Time diff --git a/http/handler/filesystem.go b/http/handler/filesystem.go index 0700e58d..88d5c151 100644 --- a/http/handler/filesystem.go +++ b/http/handler/filesystem.go @@ -115,9 +115,9 @@ func (h *FSHandler) GetFile(c echo.Context) error { } c.Response().Header().Set("Content-Range", ranges[0].contentRange(stat.Size())) - streamFile = &limitReader{ - r: streamFile, - size: int(ranges[0].length), + streamFile = &io.LimitedReader{ + R: streamFile, + N: ranges[0].length, } status = http.StatusPartialContent @@ -134,7 +134,7 @@ func (h *FSHandler) PutFile(c echo.Context) error { req := c.Request() - _, created, err := h.FS.Filesystem.WriteFileReader(path, req.Body) + _, created, err := h.FS.Filesystem.WriteFileReader(path, req.Body, -1) if err != nil { return api.Err(http.StatusBadRequest, "", "%s", err.Error()) } @@ -330,32 +330,6 @@ func (h *FSHandler) ListFiles(c echo.Context) error { return c.JSON(http.StatusOK, fileinfos) } -type limitReader struct { - r io.Reader - size int -} - -func (l *limitReader) Read(p []byte) (int, error) { - if l.size == 0 { - return 0, io.EOF - } - - len := len(p) - - if len > l.size { - p = p[:l.size] - } - - i, err := l.r.Read(p) - if err != nil { - return i, err - } - - l.size -= i - - return i, nil -} - // From: github.com/golang/go/net/http/fs.go@7dc9fcb // errNoOverlap is returned by serveContent's parseRange if first-byte-pos of diff --git a/io/fs/disk.go b/io/fs/disk.go index d4f11cc0..6cac9d40 100644 --- a/io/fs/disk.go +++ b/io/fs/disk.go @@ -330,7 +330,7 @@ func (fs *diskFilesystem) ReadFile(path string) ([]byte, error) { return os.ReadFile(path) } -func (fs *diskFilesystem) WriteFileReader(path string, r io.Reader) (int64, bool, error) { +func (fs *diskFilesystem) WriteFileReader(path string, r io.Reader, sizeHint int) (int64, bool, error) { path = fs.cleanPath(path) replace := true @@ -366,7 +366,7 @@ func (fs *diskFilesystem) WriteFileReader(path string, r io.Reader) (int64, bool } func (fs *diskFilesystem) WriteFile(path string, data []byte) (int64, bool, error) { - return fs.WriteFileReader(path, bytes.NewReader(data)) + return fs.WriteFileReader(path, bytes.NewReader(data), len(data)) } func (fs *diskFilesystem) WriteFileSafe(path string, data []byte) (int64, bool, error) { diff --git a/io/fs/fs.go b/io/fs/fs.go index 3d162533..4cc8e201 100644 --- a/io/fs/fs.go +++ b/io/fs/fs.go @@ -93,8 +93,9 @@ type WriteFilesystem interface { // WriteFileReader adds a file to the filesystem. Returns the size of the data that has been // stored in bytes and whether the file is new. The size is negative if there was - // an error adding the file and error is not nil. - WriteFileReader(path string, r io.Reader) (int64, bool, error) + // an error adding the file and error is not nil. The size parameter is to suggest a size + // for the file to write. Use a negative value if the size is unknown. + WriteFileReader(path string, r io.Reader, size int) (int64, bool, error) // WriteFile adds a file to the filesystem. Returns the size of the data that has been // stored in bytes and whether the file is new. The size is negative if there was diff --git a/io/fs/fs_test.go b/io/fs/fs_test.go index 20a14662..9d52dddb 100644 --- a/io/fs/fs_test.go +++ b/io/fs/fs_test.go @@ -186,7 +186,7 @@ func testWriteFileSafe(t *testing.T, fs Filesystem) { func testWriteFileReader(t *testing.T, fs Filesystem) { data := strings.NewReader("xxxxx") - size, created, err := fs.WriteFileReader("/foobar", data) + size, created, err := fs.WriteFileReader("/foobar", data, -1) require.Nil(t, err) require.Equal(t, int64(5), size) @@ -211,7 +211,7 @@ func testOpen(t *testing.T, fs Filesystem) { file := fs.Open("/foobar") require.Nil(t, file) - _, _, err := fs.WriteFileReader("/foobar", strings.NewReader("xxxxx")) + _, _, err := fs.WriteFileReader("/foobar", strings.NewReader("xxxxx"), -1) require.NoError(t, err) file = fs.Open("/foobar") @@ -232,7 +232,7 @@ func testRemove(t *testing.T, fs Filesystem) { data := strings.NewReader("xxxxx") - fs.WriteFileReader("/foobar", data) + fs.WriteFileReader("/foobar", data, -1) size = fs.Remove("/foobar") @@ -251,7 +251,7 @@ func testRemove(t *testing.T, fs Filesystem) { func testFiles(t *testing.T, fs Filesystem) { require.Equal(t, int64(0), fs.Files()) - fs.WriteFileReader("/foobar.txt", strings.NewReader("bar")) + fs.WriteFileReader("/foobar.txt", strings.NewReader("bar"), -1) require.Equal(t, int64(1), fs.Files()) @@ -267,7 +267,7 @@ func testFiles(t *testing.T, fs Filesystem) { func testReplace(t *testing.T, fs Filesystem) { data := strings.NewReader("xxxxx") - size, created, err := fs.WriteFileReader("/foobar", data) + size, created, err := fs.WriteFileReader("/foobar", data, -1) require.Nil(t, err) require.Equal(t, int64(5), size) @@ -284,7 +284,7 @@ func testReplace(t *testing.T, fs Filesystem) { data = strings.NewReader("yyy") - size, created, err = fs.WriteFileReader("/foobar", data) + size, created, err = fs.WriteFileReader("/foobar", data, -1) require.Nil(t, err) require.Equal(t, int64(3), size) @@ -301,12 +301,12 @@ func testReplace(t *testing.T, fs Filesystem) { } func testList(t *testing.T, fs Filesystem) { - fs.WriteFileReader("/foobar1", strings.NewReader("a")) - fs.WriteFileReader("/foobar2", strings.NewReader("bb")) - fs.WriteFileReader("/foobar3", strings.NewReader("ccc")) - fs.WriteFileReader("/foobar4", strings.NewReader("dddd")) - fs.WriteFileReader("/path/foobar3", strings.NewReader("ccc")) - fs.WriteFileReader("/path/to/foobar4", strings.NewReader("dddd")) + fs.WriteFileReader("/foobar1", strings.NewReader("a"), -1) + fs.WriteFileReader("/foobar2", strings.NewReader("bb"), -1) + fs.WriteFileReader("/foobar3", strings.NewReader("ccc"), -1) + fs.WriteFileReader("/foobar4", strings.NewReader("dddd"), -1) + fs.WriteFileReader("/path/foobar3", strings.NewReader("ccc"), -1) + fs.WriteFileReader("/path/to/foobar4", strings.NewReader("dddd"), -1) cur, max := fs.Size() @@ -337,10 +337,10 @@ func testList(t *testing.T, fs Filesystem) { } func testListGlob(t *testing.T, fs Filesystem) { - fs.WriteFileReader("/foobar1", strings.NewReader("a")) - fs.WriteFileReader("/path/foobar2", strings.NewReader("a")) - fs.WriteFileReader("/path/to/foobar3", strings.NewReader("a")) - fs.WriteFileReader("/foobar4", strings.NewReader("a")) + fs.WriteFileReader("/foobar1", strings.NewReader("a"), -1) + fs.WriteFileReader("/path/foobar2", strings.NewReader("a"), -1) + fs.WriteFileReader("/path/to/foobar3", strings.NewReader("a"), -1) + fs.WriteFileReader("/foobar4", strings.NewReader("a"), -1) cur := fs.Files() @@ -376,10 +376,10 @@ func testListGlob(t *testing.T, fs Filesystem) { } func testListSize(t *testing.T, fs Filesystem) { - fs.WriteFileReader("/a", strings.NewReader("a")) - fs.WriteFileReader("/aa", strings.NewReader("aa")) - fs.WriteFileReader("/aaa", strings.NewReader("aaa")) - fs.WriteFileReader("/aaaa", strings.NewReader("aaaa")) + fs.WriteFileReader("/a", strings.NewReader("a"), -1) + fs.WriteFileReader("/aa", strings.NewReader("aa"), -1) + fs.WriteFileReader("/aaa", strings.NewReader("aaa"), -1) + fs.WriteFileReader("/aaaa", strings.NewReader("aaaa"), -1) cur := fs.Files() @@ -411,13 +411,13 @@ func testListSize(t *testing.T, fs Filesystem) { } func testListModified(t *testing.T, fs Filesystem) { - fs.WriteFileReader("/a", strings.NewReader("a")) + fs.WriteFileReader("/a", strings.NewReader("a"), -1) time.Sleep(500 * time.Millisecond) - fs.WriteFileReader("/b", strings.NewReader("b")) + fs.WriteFileReader("/b", strings.NewReader("b"), -1) time.Sleep(500 * time.Millisecond) - fs.WriteFileReader("/c", strings.NewReader("c")) + fs.WriteFileReader("/c", strings.NewReader("c"), -1) time.Sleep(500 * time.Millisecond) - fs.WriteFileReader("/d", strings.NewReader("d")) + fs.WriteFileReader("/d", strings.NewReader("d"), -1) cur := fs.Files() @@ -463,10 +463,10 @@ func testListModified(t *testing.T, fs Filesystem) { } func testRemoveAll(t *testing.T, fs Filesystem) { - fs.WriteFileReader("/foobar1", strings.NewReader("abc")) - fs.WriteFileReader("/path/foobar2", strings.NewReader("abc")) - fs.WriteFileReader("/path/to/foobar3", strings.NewReader("abc")) - fs.WriteFileReader("/foobar4", strings.NewReader("abc")) + fs.WriteFileReader("/foobar1", strings.NewReader("abc"), -1) + fs.WriteFileReader("/path/foobar2", strings.NewReader("abc"), -1) + fs.WriteFileReader("/path/to/foobar3", strings.NewReader("abc"), -1) + fs.WriteFileReader("/foobar4", strings.NewReader("abc"), -1) cur := fs.Files() @@ -483,10 +483,10 @@ func testRemoveAll(t *testing.T, fs Filesystem) { } func testRemoveList(t *testing.T, fs Filesystem) { - fs.WriteFileReader("/foobar1", strings.NewReader("abc")) - fs.WriteFileReader("/path/foobar2", strings.NewReader("abc")) - fs.WriteFileReader("/path/to/foobar3", strings.NewReader("abc")) - fs.WriteFileReader("/foobar4", strings.NewReader("abc")) + fs.WriteFileReader("/foobar1", strings.NewReader("abc"), -1) + fs.WriteFileReader("/path/foobar2", strings.NewReader("abc"), -1) + fs.WriteFileReader("/path/to/foobar3", strings.NewReader("abc"), -1) + fs.WriteFileReader("/foobar4", strings.NewReader("abc"), -1) cur := fs.Files() @@ -513,7 +513,7 @@ func testData(t *testing.T, fs Filesystem) { data1 := strings.NewReader(data) - _, _, err = fs.WriteFileReader("/foobar", data1) + _, _, err = fs.WriteFileReader("/foobar", data1, -1) require.NoError(t, err) file = fs.Open("/foobar") @@ -542,7 +542,7 @@ func testStatDir(t *testing.T, fs Filesystem) { require.NotNil(t, info) require.Equal(t, true, info.IsDir()) - fs.WriteFileReader("/these/are/some/directories/foobar", strings.NewReader("gduwotoxqb")) + fs.WriteFileReader("/these/are/some/directories/foobar", strings.NewReader("gduwotoxqb"), -1) info, err = fs.Stat("/foobar") require.Error(t, err) @@ -614,7 +614,7 @@ func testMkdirAll(t *testing.T, fs Filesystem) { require.Equal(t, int64(0), info.Size()) require.Equal(t, true, info.IsDir()) - _, _, err = fs.WriteFileReader("/foobar", strings.NewReader("gduwotoxqb")) + _, _, err = fs.WriteFileReader("/foobar", strings.NewReader("gduwotoxqb"), -1) require.NoError(t, err) err = fs.MkdirAll("/foobar", 0755) @@ -631,7 +631,7 @@ func testRename(t *testing.T, fs Filesystem) { _, err = fs.Stat("/foobaz") require.Error(t, err) - _, _, err = fs.WriteFileReader("/foobar", strings.NewReader("gduwotoxqb")) + _, _, err = fs.WriteFileReader("/foobar", strings.NewReader("gduwotoxqb"), -1) require.NoError(t, err) _, err = fs.Stat("/foobar") @@ -654,10 +654,10 @@ func testRenameOverwrite(t *testing.T, fs Filesystem) { _, err = fs.Stat("/foobaz") require.Error(t, err) - _, _, err = fs.WriteFileReader("/foobar", strings.NewReader("foobar")) + _, _, err = fs.WriteFileReader("/foobar", strings.NewReader("foobar"), -1) require.NoError(t, err) - _, _, err = fs.WriteFileReader("/foobaz", strings.NewReader("foobaz")) + _, _, err = fs.WriteFileReader("/foobaz", strings.NewReader("foobaz"), -1) require.NoError(t, err) _, err = fs.Stat("/foobar") @@ -688,7 +688,7 @@ func testSymlink(t *testing.T, fs Filesystem) { err := fs.Symlink("/foobar", "/foobaz") require.Error(t, err) - _, _, err = fs.WriteFileReader("/foobar", strings.NewReader("foobar")) + _, _, err = fs.WriteFileReader("/foobar", strings.NewReader("foobar"), -1) require.NoError(t, err) err = fs.Symlink("/foobar", "/foobaz") @@ -729,7 +729,7 @@ func testSymlinkOpenStat(t *testing.T, fs Filesystem) { return } - _, _, err := fs.WriteFileReader("/foobar", strings.NewReader("foobar")) + _, _, err := fs.WriteFileReader("/foobar", strings.NewReader("foobar"), -1) require.NoError(t, err) err = fs.Symlink("/foobar", "/foobaz") @@ -756,7 +756,7 @@ func testSymlinkOpenStat(t *testing.T, fs Filesystem) { } func testStat(t *testing.T, fs Filesystem) { - _, _, err := fs.WriteFileReader("/foobar", strings.NewReader("foobar")) + _, _, err := fs.WriteFileReader("/foobar", strings.NewReader("foobar"), -1) require.NoError(t, err) file := fs.Open("/foobar") @@ -781,7 +781,7 @@ func testCopy(t *testing.T, fs Filesystem) { _, err = fs.Stat("/foobaz") require.Error(t, err) - _, _, err = fs.WriteFileReader("/foobar", strings.NewReader("gduwotoxqb")) + _, _, err = fs.WriteFileReader("/foobar", strings.NewReader("gduwotoxqb"), -1) require.NoError(t, err) _, err = fs.Stat("/foobar") @@ -804,10 +804,10 @@ func testCopyOverwrite(t *testing.T, fs Filesystem) { _, err = fs.Stat("/foobaz") require.Error(t, err) - _, _, err = fs.WriteFileReader("/foobar", strings.NewReader("foobar")) + _, _, err = fs.WriteFileReader("/foobar", strings.NewReader("foobar"), -1) require.NoError(t, err) - _, _, err = fs.WriteFileReader("/foobaz", strings.NewReader("foobaz")) + _, _, err = fs.WriteFileReader("/foobaz", strings.NewReader("foobaz"), -1) require.NoError(t, err) _, err = fs.Stat("/foobar") @@ -838,10 +838,10 @@ func testSymlinkErrors(t *testing.T, fs Filesystem) { err := fs.Symlink("/foobar", "/foobaz") require.Error(t, err) - _, _, err = fs.WriteFileReader("/foobar", strings.NewReader("foobar")) + _, _, err = fs.WriteFileReader("/foobar", strings.NewReader("foobar"), -1) require.NoError(t, err) - _, _, err = fs.WriteFileReader("/foobaz", strings.NewReader("foobaz")) + _, _, err = fs.WriteFileReader("/foobaz", strings.NewReader("foobaz"), -1) require.NoError(t, err) err = fs.Symlink("/foobar", "/foobaz") diff --git a/io/fs/mem.go b/io/fs/mem.go index 444222ed..2033d349 100644 --- a/io/fs/mem.go +++ b/io/fs/mem.go @@ -2,6 +2,7 @@ package fs import ( "bytes" + "errors" "fmt" "io" "io/fs" @@ -249,7 +250,7 @@ func NewMemFilesystemFromDir(dir string, config MemConfig) (Filesystem, error) { defer file.Close() - _, _, err = mem.WriteFileReader(strings.TrimPrefix(path, dir), file) + _, _, err = mem.WriteFileReader(strings.TrimPrefix(path, dir), file, int(info.Size())) if err != nil { return fmt.Errorf("can't copy %s", path) } @@ -408,7 +409,44 @@ func (fs *memFilesystem) Symlink(oldname, newname string) error { return nil } -func (fs *memFilesystem) WriteFileReader(path string, r io.Reader) (int64, bool, error) { +var chunkPool = sync.Pool{ + New: func() interface{} { + chunk := make([]byte, 128*1024) + return &chunk + }, +} + +func copyToBufferFromReader(buf *bytes.Buffer, r io.Reader, _ int) (int64, error) { + chunkPtr := chunkPool.Get().(*[]byte) + chunk := *chunkPtr + defer chunkPool.Put(chunkPtr) + + size := int64(0) + + for { + n, err := r.Read(chunk) + if n != 0 { + buf.Write(chunk[:n]) + size += int64(n) + } + + if err != nil { + if errors.Is(err, io.EOF) { + return size, nil + } + + return size, err + } + + if n == 0 { + break + } + } + + return size, nil +} + +func (fs *memFilesystem) WriteFileReader(path string, r io.Reader, sizeHint int) (int64, bool, error) { path = fs.cleanPath(path) isdir := fs.isDir(path) @@ -426,7 +464,11 @@ func (fs *memFilesystem) WriteFileReader(path string, r io.Reader) (int64, bool, data: &bytes.Buffer{}, } - size, err := newFile.data.ReadFrom(r) + if sizeHint > 0 { + newFile.data.Grow(sizeHint) + } + + size, err := copyToBufferFromReader(newFile.data, r, 8*1024) if err != nil { fs.logger.WithFields(log.Fields{ "path": path, @@ -474,11 +516,11 @@ func (fs *memFilesystem) WriteFileReader(path string, r io.Reader) (int64, bool, } func (fs *memFilesystem) WriteFile(path string, data []byte) (int64, bool, error) { - return fs.WriteFileReader(path, bytes.NewReader(data)) + return fs.WriteFileReader(path, bytes.NewReader(data), len(data)) } func (fs *memFilesystem) WriteFileSafe(path string, data []byte) (int64, bool, error) { - return fs.WriteFileReader(path, bytes.NewReader(data)) + return fs.WriteFileReader(path, bytes.NewReader(data), len(data)) } func (fs *memFilesystem) Purge(size int64) int64 { diff --git a/io/fs/mem_test.go b/io/fs/mem_test.go index 37152975..0f8c1758 100644 --- a/io/fs/mem_test.go +++ b/io/fs/mem_test.go @@ -1,6 +1,7 @@ package fs import ( + "bytes" "context" "fmt" "io" @@ -190,3 +191,36 @@ func BenchmarkMemReadFileWhileWriting(b *testing.B) { readerWg.Wait() } + +func BenchmarkBufferReadFrom(b *testing.B) { + data := []byte(rand.StringAlphanumeric(1024 * 1024)) + + for i := 0; i < b.N; i++ { + r := bytes.NewReader(data) + buf := &bytes.Buffer{} + buf.ReadFrom(r) + } +} + +func TestBufferReadChunks(t *testing.T) { + data := []byte(rand.StringAlphanumeric(1024 * 1024)) + + r := bytes.NewReader(data) + buf := &bytes.Buffer{} + + copyToBufferFromReader(buf, r, 32*1024) + + res := bytes.Compare(data, buf.Bytes()) + require.Equal(t, 0, res) +} + +func BenchmarkBufferReadChunks(b *testing.B) { + data := []byte(rand.StringAlphanumeric(1024 * 1024)) + + for i := 0; i < b.N; i++ { + r := bytes.NewReader(data) + buf := &bytes.Buffer{} + + copyToBufferFromReader(buf, r, 32*1024) + } +} diff --git a/io/fs/readonly.go b/io/fs/readonly.go index 2991944c..cf42d911 100644 --- a/io/fs/readonly.go +++ b/io/fs/readonly.go @@ -21,7 +21,7 @@ func (r *readOnlyFilesystem) Symlink(oldname, newname string) error { return os.ErrPermission } -func (r *readOnlyFilesystem) WriteFileReader(path string, rd io.Reader) (int64, bool, error) { +func (r *readOnlyFilesystem) WriteFileReader(path string, rd io.Reader, size int) (int64, bool, error) { return -1, false, os.ErrPermission } diff --git a/io/fs/readonly_test.go b/io/fs/readonly_test.go index 3b3b3dfb..6a784299 100644 --- a/io/fs/readonly_test.go +++ b/io/fs/readonly_test.go @@ -20,7 +20,7 @@ func TestReadOnly(t *testing.T) { _, _, err = ro.WriteFile("/readonly.go", []byte("foobar")) require.Error(t, err) - _, _, err = ro.WriteFileReader("/readonly.go", strings.NewReader("foobar")) + _, _, err = ro.WriteFileReader("/readonly.go", strings.NewReader("foobar"), -1) require.Error(t, err) _, _, err = ro.WriteFileSafe("/readonly.go", []byte("foobar")) diff --git a/io/fs/s3.go b/io/fs/s3.go index 6d7807d8..c75162ed 100644 --- a/io/fs/s3.go +++ b/io/fs/s3.go @@ -347,17 +347,17 @@ func (fs *s3Filesystem) write(path string, r io.Reader) (int64, bool, error) { return info.Size, !overwrite, nil } -func (fs *s3Filesystem) WriteFileReader(path string, r io.Reader) (int64, bool, error) { +func (fs *s3Filesystem) WriteFileReader(path string, r io.Reader, size int) (int64, bool, error) { path = fs.cleanPath(path) return fs.write(path, r) } func (fs *s3Filesystem) WriteFile(path string, data []byte) (int64, bool, error) { - return fs.WriteFileReader(path, bytes.NewReader(data)) + return fs.WriteFileReader(path, bytes.NewReader(data), len(data)) } func (fs *s3Filesystem) WriteFileSafe(path string, data []byte) (int64, bool, error) { - return fs.WriteFileReader(path, bytes.NewReader(data)) + return fs.WriteFileReader(path, bytes.NewReader(data), len(data)) } func (fs *s3Filesystem) Rename(src, dst string) error { diff --git a/io/fs/sized.go b/io/fs/sized.go index 366ef6f5..aa6b552b 100644 --- a/io/fs/sized.go +++ b/io/fs/sized.go @@ -65,14 +65,14 @@ func (r *sizedFilesystem) Resize(size int64) error { return nil } -func (r *sizedFilesystem) WriteFileReader(path string, rd io.Reader) (int64, bool, error) { +func (r *sizedFilesystem) WriteFileReader(path string, rd io.Reader, sizeHint int) (int64, bool, error) { currentSize, maxSize := r.Size() if maxSize <= 0 { - return r.Filesystem.WriteFileReader(path, rd) + return r.Filesystem.WriteFileReader(path, rd, sizeHint) } data := bytes.Buffer{} - size, err := data.ReadFrom(rd) + size, err := copyToBufferFromReader(&data, rd, 8*1024) if err != nil { return -1, false, err } @@ -97,11 +97,11 @@ func (r *sizedFilesystem) WriteFileReader(path string, rd io.Reader) (int64, boo } } - return r.Filesystem.WriteFileReader(path, &data) + return r.Filesystem.WriteFileReader(path, &data, int(size)) } func (r *sizedFilesystem) WriteFile(path string, data []byte) (int64, bool, error) { - return r.WriteFileReader(path, bytes.NewBuffer(data)) + return r.WriteFileReader(path, bytes.NewBuffer(data), len(data)) } func (r *sizedFilesystem) WriteFileSafe(path string, data []byte) (int64, bool, error) { diff --git a/io/fs/sized_test.go b/io/fs/sized_test.go index e158c422..b5eebe6e 100644 --- a/io/fs/sized_test.go +++ b/io/fs/sized_test.go @@ -52,7 +52,7 @@ func TestSizedResizePurge(t *testing.T) { require.Equal(t, int64(0), cur) require.Equal(t, int64(10), max) - fs.WriteFileReader("/foobar", strings.NewReader("xxxxxxxxxx")) + fs.WriteFileReader("/foobar", strings.NewReader("xxxxxxxxxx"), -1) cur, max = fs.Size() @@ -76,7 +76,7 @@ func TestSizedWrite(t *testing.T) { require.Equal(t, int64(0), cur) require.Equal(t, int64(10), max) - size, created, err := fs.WriteFileReader("/foobar", strings.NewReader("xxxxx")) + size, created, err := fs.WriteFileReader("/foobar", strings.NewReader("xxxxx"), -1) require.NoError(t, err) require.Equal(t, int64(5), size) require.Equal(t, true, created) @@ -89,7 +89,7 @@ func TestSizedWrite(t *testing.T) { _, _, err = fs.WriteFile("/foobaz", []byte("xxxxxx")) require.Error(t, err) - _, _, err = fs.WriteFileReader("/foobaz", strings.NewReader("xxxxxx")) + _, _, err = fs.WriteFileReader("/foobaz", strings.NewReader("xxxxxx"), -1) require.Error(t, err) _, _, err = fs.WriteFileSafe("/foobaz", []byte("xxxxxx")) @@ -101,7 +101,7 @@ func TestSizedReplaceNoPurge(t *testing.T) { data := strings.NewReader("xxxxx") - size, created, err := fs.WriteFileReader("/foobar", data) + size, created, err := fs.WriteFileReader("/foobar", data, -1) require.Nil(t, err) require.Equal(t, int64(5), size) @@ -118,7 +118,7 @@ func TestSizedReplaceNoPurge(t *testing.T) { data = strings.NewReader("yyy") - size, created, err = fs.WriteFileReader("/foobar", data) + size, created, err = fs.WriteFileReader("/foobar", data, -1) require.Nil(t, err) require.Equal(t, int64(3), size) @@ -141,9 +141,9 @@ func TestSizedReplacePurge(t *testing.T) { data2 := strings.NewReader("yyy") data3 := strings.NewReader("zzz") - fs.WriteFileReader("/foobar1", data1) - fs.WriteFileReader("/foobar2", data2) - fs.WriteFileReader("/foobar3", data3) + fs.WriteFileReader("/foobar1", data1, -1) + fs.WriteFileReader("/foobar2", data2, -1) + fs.WriteFileReader("/foobar3", data3, -1) cur, max := fs.Size() @@ -156,7 +156,7 @@ func TestSizedReplacePurge(t *testing.T) { data4 := strings.NewReader("zzzzz") - size, _, _ := fs.WriteFileReader("/foobar1", data4) + size, _, _ := fs.WriteFileReader("/foobar1", data4, -1) require.Equal(t, int64(5), size) @@ -175,7 +175,7 @@ func TestSizedReplaceUnlimited(t *testing.T) { data := strings.NewReader("xxxxx") - size, created, err := fs.WriteFileReader("/foobar", data) + size, created, err := fs.WriteFileReader("/foobar", data, -1) require.Nil(t, err) require.Equal(t, int64(5), size) @@ -192,7 +192,7 @@ func TestSizedReplaceUnlimited(t *testing.T) { data = strings.NewReader("yyy") - size, created, err = fs.WriteFileReader("/foobar", data) + size, created, err = fs.WriteFileReader("/foobar", data, -1) require.Nil(t, err) require.Equal(t, int64(3), size) @@ -213,7 +213,7 @@ func TestSizedTooBigNoPurge(t *testing.T) { data := strings.NewReader("xxxxxyyyyyz") - size, _, err := fs.WriteFileReader("/foobar", data) + size, _, err := fs.WriteFileReader("/foobar", data, -1) require.Error(t, err) require.Equal(t, int64(-1), size) } @@ -224,12 +224,12 @@ func TestSizedTooBigPurge(t *testing.T) { data1 := strings.NewReader("xxxxx") data2 := strings.NewReader("yyyyy") - fs.WriteFileReader("/foobar1", data1) - fs.WriteFileReader("/foobar2", data2) + fs.WriteFileReader("/foobar1", data1, -1) + fs.WriteFileReader("/foobar2", data2, -1) data := strings.NewReader("xxxxxyyyyyz") - size, _, err := fs.WriteFileReader("/foobar", data) + size, _, err := fs.WriteFileReader("/foobar", data, -1) require.Error(t, err) require.Equal(t, int64(-1), size) @@ -242,8 +242,8 @@ func TestSizedFullSpaceNoPurge(t *testing.T) { data1 := strings.NewReader("xxxxx") data2 := strings.NewReader("yyyyy") - fs.WriteFileReader("/foobar1", data1) - fs.WriteFileReader("/foobar2", data2) + fs.WriteFileReader("/foobar1", data1, -1) + fs.WriteFileReader("/foobar2", data2, -1) cur, max := fs.Size() @@ -256,7 +256,7 @@ func TestSizedFullSpaceNoPurge(t *testing.T) { data3 := strings.NewReader("zzzzz") - size, _, err := fs.WriteFileReader("/foobar3", data3) + size, _, err := fs.WriteFileReader("/foobar3", data3, -1) require.Error(t, err) require.Equal(t, int64(-1), size) } @@ -267,8 +267,8 @@ func TestSizedFullSpacePurge(t *testing.T) { data1 := strings.NewReader("xxxxx") data2 := strings.NewReader("yyyyy") - fs.WriteFileReader("/foobar1", data1) - fs.WriteFileReader("/foobar2", data2) + fs.WriteFileReader("/foobar1", data1, -1) + fs.WriteFileReader("/foobar2", data2, -1) cur, max := fs.Size() @@ -281,7 +281,7 @@ func TestSizedFullSpacePurge(t *testing.T) { data3 := strings.NewReader("zzzzz") - size, _, _ := fs.WriteFileReader("/foobar3", data3) + size, _, _ := fs.WriteFileReader("/foobar3", data3, -1) require.Equal(t, int64(5), size) @@ -302,9 +302,9 @@ func TestSizedFullSpacePurgeMulti(t *testing.T) { data2 := strings.NewReader("yyy") data3 := strings.NewReader("zzz") - fs.WriteFileReader("/foobar1", data1) - fs.WriteFileReader("/foobar2", data2) - fs.WriteFileReader("/foobar3", data3) + fs.WriteFileReader("/foobar1", data1, -1) + fs.WriteFileReader("/foobar2", data2, -1) + fs.WriteFileReader("/foobar3", data3, -1) cur, max := fs.Size() @@ -317,7 +317,7 @@ func TestSizedFullSpacePurgeMulti(t *testing.T) { data4 := strings.NewReader("zzzzz") - size, _, _ := fs.WriteFileReader("/foobar4", data4) + size, _, _ := fs.WriteFileReader("/foobar4", data4, -1) require.Equal(t, int64(5), size) @@ -338,11 +338,11 @@ func TestSizedPurgeOrder(t *testing.T) { data2 := strings.NewReader("yyyyy") data3 := strings.NewReader("zzzzz") - fs.WriteFileReader("/foobar1", data1) + fs.WriteFileReader("/foobar1", data1, -1) time.Sleep(1 * time.Second) - fs.WriteFileReader("/foobar2", data2) + fs.WriteFileReader("/foobar2", data2, -1) time.Sleep(1 * time.Second) - fs.WriteFileReader("/foobar3", data3) + fs.WriteFileReader("/foobar3", data3, -1) file := fs.Open("/foobar1") diff --git a/restream/fs/fs_test.go b/restream/fs/fs_test.go index a4b7923a..46d3a964 100644 --- a/restream/fs/fs_test.go +++ b/restream/fs/fs_test.go @@ -32,15 +32,15 @@ func TestMaxFiles(t *testing.T) { }, }) - cleanfs.WriteFileReader("/chunk_0.ts", strings.NewReader("chunk_0")) - cleanfs.WriteFileReader("/chunk_1.ts", strings.NewReader("chunk_1")) - cleanfs.WriteFileReader("/chunk_2.ts", strings.NewReader("chunk_2")) + cleanfs.WriteFileReader("/chunk_0.ts", strings.NewReader("chunk_0"), -1) + cleanfs.WriteFileReader("/chunk_1.ts", strings.NewReader("chunk_1"), -1) + cleanfs.WriteFileReader("/chunk_2.ts", strings.NewReader("chunk_2"), -1) require.Eventually(t, func() bool { return cleanfs.Files() == 3 }, 3*time.Second, time.Second) - cleanfs.WriteFileReader("/chunk_3.ts", strings.NewReader("chunk_3")) + cleanfs.WriteFileReader("/chunk_3.ts", strings.NewReader("chunk_3"), -1) require.Eventually(t, func() bool { if cleanfs.Files() != 3 { @@ -81,15 +81,15 @@ func TestMaxAge(t *testing.T) { }, }) - cleanfs.WriteFileReader("/chunk_0.ts", strings.NewReader("chunk_0")) - cleanfs.WriteFileReader("/chunk_1.ts", strings.NewReader("chunk_1")) - cleanfs.WriteFileReader("/chunk_2.ts", strings.NewReader("chunk_2")) + cleanfs.WriteFileReader("/chunk_0.ts", strings.NewReader("chunk_0"), -1) + cleanfs.WriteFileReader("/chunk_1.ts", strings.NewReader("chunk_1"), -1) + cleanfs.WriteFileReader("/chunk_2.ts", strings.NewReader("chunk_2"), -1) require.Eventually(t, func() bool { return cleanfs.Files() == 0 }, 10*time.Second, time.Second) - cleanfs.WriteFileReader("/chunk_3.ts", strings.NewReader("chunk_3")) + cleanfs.WriteFileReader("/chunk_3.ts", strings.NewReader("chunk_3"), -1) require.Eventually(t, func() bool { if cleanfs.Files() != 1 { @@ -130,15 +130,15 @@ func TestUnsetCleanup(t *testing.T) { }, }) - cleanfs.WriteFileReader("/chunk_0.ts", strings.NewReader("chunk_0")) - cleanfs.WriteFileReader("/chunk_1.ts", strings.NewReader("chunk_1")) - cleanfs.WriteFileReader("/chunk_2.ts", strings.NewReader("chunk_2")) + cleanfs.WriteFileReader("/chunk_0.ts", strings.NewReader("chunk_0"), -1) + cleanfs.WriteFileReader("/chunk_1.ts", strings.NewReader("chunk_1"), -1) + cleanfs.WriteFileReader("/chunk_2.ts", strings.NewReader("chunk_2"), -1) require.Eventually(t, func() bool { return cleanfs.Files() == 3 }, 3*time.Second, time.Second) - cleanfs.WriteFileReader("/chunk_3.ts", strings.NewReader("chunk_3")) + cleanfs.WriteFileReader("/chunk_3.ts", strings.NewReader("chunk_3"), -1) require.Eventually(t, func() bool { if cleanfs.Files() != 3 { @@ -158,7 +158,7 @@ func TestUnsetCleanup(t *testing.T) { cleanfs.UnsetCleanup("foobar") - cleanfs.WriteFileReader("/chunk_4.ts", strings.NewReader("chunk_4")) + cleanfs.WriteFileReader("/chunk_4.ts", strings.NewReader("chunk_4"), -1) require.Eventually(t, func() bool { if cleanfs.Files() != 4 {