diff --git a/io/fs/disk.go b/io/fs/disk.go index 6cac9d40..b9f76b77 100644 --- a/io/fs/disk.go +++ b/io/fs/disk.go @@ -403,6 +403,31 @@ func (fs *diskFilesystem) WriteFileSafe(path string, data []byte) (int64, bool, return int64(size), !replace, nil } +func (fs *diskFilesystem) AppendFileReader(path string, r io.Reader, sizeHint int) (int64, error) { + path = fs.cleanPath(path) + + dir := filepath.Dir(path) + if err := os.MkdirAll(dir, 0755); err != nil { + return -1, fmt.Errorf("creating file failed: %w", err) + } + + f, err := os.OpenFile(path, os.O_APPEND|os.O_WRONLY|os.O_CREATE, 0644) + if err != nil { + return -1, err + } + + defer f.Close() + + size, err := f.ReadFrom(r) + if err != nil { + return -1, fmt.Errorf("reading data failed: %w", err) + } + + fs.lastSizeCheck = time.Time{} + + return size, nil +} + func (fs *diskFilesystem) Rename(src, dst string) error { src = fs.cleanPath(src) dst = fs.cleanPath(dst) diff --git a/io/fs/fs.go b/io/fs/fs.go index 4cc8e201..973be194 100644 --- a/io/fs/fs.go +++ b/io/fs/fs.go @@ -108,6 +108,10 @@ type WriteFilesystem interface { // an error adding the file and error is not nil. WriteFileSafe(path string, data []byte) (int64, bool, error) + // AppendFileReader appends the contents from reader to the file at path. If the file doesn't + // exist, it will be created. The number of written bytes will be returned, -1 otherwise. + AppendFileReader(path string, r io.Reader, size int) (int64, error) + // MkdirAll creates a directory named path, along with any necessary parents, and returns nil, // or else returns an error. The permission bits perm (before umask) are used for all directories // that MkdirAll creates. If path is already a directory, MkdirAll does nothing and returns nil. diff --git a/io/fs/fs_test.go b/io/fs/fs_test.go index 7e7c2362..17d79e3c 100644 --- a/io/fs/fs_test.go +++ b/io/fs/fs_test.go @@ -115,6 +115,8 @@ func TestFilesystem(t *testing.T) { "symlinkErrors": testSymlinkErrors, "symlinkOpenStat": testSymlinkOpenStat, "open": testOpen, + "append": testAppend, + "appendCreate": testAppendCreate, } for fsname, fs := range filesystems { @@ -125,6 +127,11 @@ func TestFilesystem(t *testing.T) { } filesystem, err := fs(name) require.NoError(t, err) + + if fsname == "s3fs" { + filesystem.RemoveList("/", ListOptions{Pattern: "/**"}) + } + test(t, filesystem) }) } @@ -859,3 +866,28 @@ func testSymlinkErrors(t *testing.T, fs Filesystem) { err = fs.Symlink("/bazfoo", "/barfoo") require.Error(t, err) } + +func testAppend(t *testing.T, fs Filesystem) { + _, _, err := fs.WriteFileReader("/foobar", strings.NewReader("part1"), -1) + require.NoError(t, err) + + _, err = fs.AppendFileReader("/foobar", strings.NewReader("part2"), -1) + require.NoError(t, err) + + file := fs.Open("/foobar") + require.NotNil(t, file) + + data, err := io.ReadAll(file) + require.Equal(t, []byte("part1part2"), data) +} + +func testAppendCreate(t *testing.T, fs Filesystem) { + _, err := fs.AppendFileReader("/foobar", strings.NewReader("part1"), -1) + require.NoError(t, err) + + file := fs.Open("/foobar") + require.NotNil(t, file) + + data, err := io.ReadAll(file) + require.Equal(t, []byte("part1"), data) +} diff --git a/io/fs/mem.go b/io/fs/mem.go index dee83c78..de8b3cb7 100644 --- a/io/fs/mem.go +++ b/io/fs/mem.go @@ -525,6 +525,46 @@ func (fs *memFilesystem) WriteFileSafe(path string, data []byte) (int64, bool, e return fs.WriteFileReader(path, bytes.NewReader(data), len(data)) } +func (fs *memFilesystem) AppendFileReader(path string, r io.Reader, sizeHint int) (int64, error) { + path = fs.cleanPath(path) + + file, hasFile := fs.storage.LoadAndCopy(path) + if !hasFile { + size, _, err := fs.WriteFileReader(path, r, sizeHint) + return size, err + } + + size, err := copyToBufferFromReader(file.data, r, 8*1024) + if err != nil { + fs.logger.WithFields(log.Fields{ + "path": path, + "filesize_bytes": size, + "error": err, + }).Warn().Log("Incomplete file") + + file.Close() + + return -1, fmt.Errorf("incomplete file") + } + + file.size += size + + fs.storage.Store(path, file) + + fs.sizeLock.Lock() + defer fs.sizeLock.Unlock() + + fs.currentSize += size + + fs.logger.Debug().WithFields(log.Fields{ + "path": file.name, + "filesize_bytes": file.size, + "size_bytes": fs.currentSize, + }).Log("Appended to file") + + return size, nil +} + func (fs *memFilesystem) Purge(size int64) int64 { files := []*memFile{} diff --git a/io/fs/mem_storage.go b/io/fs/mem_storage.go index 5df8f946..044b375e 100644 --- a/io/fs/mem_storage.go +++ b/io/fs/mem_storage.go @@ -9,11 +9,27 @@ import ( ) type memStorage interface { + // Delete deletes a file from the storage. Delete(key string) (*memFile, bool) + + // Store stores a file to the storage. If there's already a file with + // the same key, that value will be returned and replaced with the + // new file. Store(key string, value *memFile) (*memFile, bool) + + // Load loads a file from the storage. This is a references to the file, + // i.e. all changes to the file will be reflected on the storage. Load(key string) (value *memFile, ok bool) + + // LoadAndCopy loads a file from the storage. The returned file is a copy + // and can be modified without modifying the file on the storage. LoadAndCopy(key string) (value *memFile, ok bool) + + // Has checks whether a file exists at path. Has(key string) bool + + // Range ranges over all files on the storage. The callback needs to return + // false in order to stop the iteration. Range(f func(key string, value *memFile) bool) } diff --git a/io/fs/s3.go b/io/fs/s3.go index c75162ed..ed9d1ade 100644 --- a/io/fs/s3.go +++ b/io/fs/s3.go @@ -360,6 +360,25 @@ func (fs *s3Filesystem) WriteFileSafe(path string, data []byte) (int64, bool, er return fs.WriteFileReader(path, bytes.NewReader(data), len(data)) } +func (fs *s3Filesystem) AppendFileReader(path string, r io.Reader, sizeHint int) (int64, error) { + path = fs.cleanPath(path) + + ctx := context.Background() + + object, err := fs.client.GetObject(ctx, fs.bucket, path, minio.GetObjectOptions{}) + if err != nil { + size, _, err := fs.write(path, r) + return size, err + } + + buffer := bytes.Buffer{} + buffer.ReadFrom(object) + buffer.ReadFrom(r) + + size, _, err := fs.write(path, &buffer) + return size, err +} + func (fs *s3Filesystem) Rename(src, dst string) error { src = fs.cleanPath(src) dst = fs.cleanPath(dst) diff --git a/io/fs/sized.go b/io/fs/sized.go index aa6b552b..cfa159eb 100644 --- a/io/fs/sized.go +++ b/io/fs/sized.go @@ -135,34 +135,40 @@ func (r *sizedFilesystem) WriteFileSafe(path string, data []byte) (int64, bool, return r.Filesystem.WriteFileSafe(path, data) } +func (r *sizedFilesystem) AppendFileReader(path string, rd io.Reader, sizeHint int) (int64, error) { + currentSize, maxSize := r.Size() + if maxSize <= 0 { + return r.Filesystem.AppendFileReader(path, rd, sizeHint) + } + + data := bytes.Buffer{} + size, err := copyToBufferFromReader(&data, rd, 8*1024) + if err != nil { + return -1, err + } + + // Calculate the new size of the filesystem + newSize := currentSize + size + + // If the the new size is larger than the allowed size, we have to free + // some space. + if newSize > maxSize { + if !r.purge { + return -1, fmt.Errorf("not enough space on device") + } + + if r.Purge(size) < size { + return -1, fmt.Errorf("not enough space on device") + } + } + + return r.Filesystem.AppendFileReader(path, &data, int(size)) +} + func (r *sizedFilesystem) Purge(size int64) int64 { if purger, ok := r.Filesystem.(PurgeFilesystem); ok { return purger.Purge(size) } return 0 - /* - files := r.Filesystem.List("/", "") - - sort.Slice(files, func(i, j int) bool { - return files[i].ModTime().Before(files[j].ModTime()) - }) - - var freed int64 = 0 - - for _, f := range files { - r.Filesystem.Remove(f.Name()) - size -= f.Size() - freed += f.Size() - r.currentSize -= f.Size() - - if size <= 0 { - break - } - } - - files = nil - - return freed - */ } diff --git a/session/registry.go b/session/registry.go index 0dd54af6..1a08b3f0 100644 --- a/session/registry.go +++ b/session/registry.go @@ -199,12 +199,6 @@ func (r *registry) sessionPersister(pattern *strftime.Strftime, bufferDuration t buffer := &bytes.Buffer{} path := pattern.FormatString(time.Now()) - file := r.persist.fs.Open(path) - if file != nil { - buffer.ReadFrom(file) - file.Close() - } - enc := json.NewEncoder(buffer) ticker := time.NewTicker(bufferDuration) @@ -222,7 +216,7 @@ loop: currentPath := pattern.FormatString(session.ClosedAt) if currentPath != path && session.ClosedAt.After(splitTime) { if buffer.Len() > 0 { - _, _, err := r.persist.fs.WriteFileSafe(path, buffer.Bytes()) + _, err := r.persist.fs.AppendFileReader(path, buffer, -1) if err != nil { r.logger.Error().WithError(err).WithField("path", path).Log("") } @@ -239,7 +233,7 @@ loop: enc.Encode(&session) case t := <-ticker.C: if buffer.Len() > 0 { - _, _, err := r.persist.fs.WriteFileSafe(path, buffer.Bytes()) + _, err := r.persist.fs.AppendFileReader(path, buffer, -1) if err != nil { r.logger.Error().WithError(err).WithField("path", path).Log("") } else { @@ -260,7 +254,7 @@ loop: } if buffer.Len() > 0 { - _, _, err := r.persist.fs.WriteFileSafe(path, buffer.Bytes()) + _, err := r.persist.fs.AppendFileReader(path, buffer, -1) if err != nil { r.logger.Error().WithError(err).WithField("path", path).Log("") } else {