Add AppendFileReader to filesystem, allows session logging with less I/O

This commit is contained in:
Ingo Oppermann 2024-08-20 17:34:50 +02:00
parent 1327fd6e21
commit 3756ce4977
No known key found for this signature in database
GPG Key ID: 2AB32426E9DD229E
8 changed files with 169 additions and 33 deletions

View File

@ -403,6 +403,31 @@ func (fs *diskFilesystem) WriteFileSafe(path string, data []byte) (int64, bool,
return int64(size), !replace, nil
}
func (fs *diskFilesystem) AppendFileReader(path string, r io.Reader, sizeHint int) (int64, error) {
path = fs.cleanPath(path)
dir := filepath.Dir(path)
if err := os.MkdirAll(dir, 0755); err != nil {
return -1, fmt.Errorf("creating file failed: %w", err)
}
f, err := os.OpenFile(path, os.O_APPEND|os.O_WRONLY|os.O_CREATE, 0644)
if err != nil {
return -1, err
}
defer f.Close()
size, err := f.ReadFrom(r)
if err != nil {
return -1, fmt.Errorf("reading data failed: %w", err)
}
fs.lastSizeCheck = time.Time{}
return size, nil
}
func (fs *diskFilesystem) Rename(src, dst string) error {
src = fs.cleanPath(src)
dst = fs.cleanPath(dst)

View File

@ -108,6 +108,10 @@ type WriteFilesystem interface {
// an error adding the file and error is not nil.
WriteFileSafe(path string, data []byte) (int64, bool, error)
// AppendFileReader appends the contents from reader to the file at path. If the file doesn't
// exist, it will be created. The number of written bytes will be returned, -1 otherwise.
AppendFileReader(path string, r io.Reader, size int) (int64, error)
// MkdirAll creates a directory named path, along with any necessary parents, and returns nil,
// or else returns an error. The permission bits perm (before umask) are used for all directories
// that MkdirAll creates. If path is already a directory, MkdirAll does nothing and returns nil.

View File

@ -115,6 +115,8 @@ func TestFilesystem(t *testing.T) {
"symlinkErrors": testSymlinkErrors,
"symlinkOpenStat": testSymlinkOpenStat,
"open": testOpen,
"append": testAppend,
"appendCreate": testAppendCreate,
}
for fsname, fs := range filesystems {
@ -125,6 +127,11 @@ func TestFilesystem(t *testing.T) {
}
filesystem, err := fs(name)
require.NoError(t, err)
if fsname == "s3fs" {
filesystem.RemoveList("/", ListOptions{Pattern: "/**"})
}
test(t, filesystem)
})
}
@ -859,3 +866,28 @@ func testSymlinkErrors(t *testing.T, fs Filesystem) {
err = fs.Symlink("/bazfoo", "/barfoo")
require.Error(t, err)
}
func testAppend(t *testing.T, fs Filesystem) {
_, _, err := fs.WriteFileReader("/foobar", strings.NewReader("part1"), -1)
require.NoError(t, err)
_, err = fs.AppendFileReader("/foobar", strings.NewReader("part2"), -1)
require.NoError(t, err)
file := fs.Open("/foobar")
require.NotNil(t, file)
data, err := io.ReadAll(file)
require.Equal(t, []byte("part1part2"), data)
}
func testAppendCreate(t *testing.T, fs Filesystem) {
_, err := fs.AppendFileReader("/foobar", strings.NewReader("part1"), -1)
require.NoError(t, err)
file := fs.Open("/foobar")
require.NotNil(t, file)
data, err := io.ReadAll(file)
require.Equal(t, []byte("part1"), data)
}

View File

@ -525,6 +525,46 @@ func (fs *memFilesystem) WriteFileSafe(path string, data []byte) (int64, bool, e
return fs.WriteFileReader(path, bytes.NewReader(data), len(data))
}
func (fs *memFilesystem) AppendFileReader(path string, r io.Reader, sizeHint int) (int64, error) {
path = fs.cleanPath(path)
file, hasFile := fs.storage.LoadAndCopy(path)
if !hasFile {
size, _, err := fs.WriteFileReader(path, r, sizeHint)
return size, err
}
size, err := copyToBufferFromReader(file.data, r, 8*1024)
if err != nil {
fs.logger.WithFields(log.Fields{
"path": path,
"filesize_bytes": size,
"error": err,
}).Warn().Log("Incomplete file")
file.Close()
return -1, fmt.Errorf("incomplete file")
}
file.size += size
fs.storage.Store(path, file)
fs.sizeLock.Lock()
defer fs.sizeLock.Unlock()
fs.currentSize += size
fs.logger.Debug().WithFields(log.Fields{
"path": file.name,
"filesize_bytes": file.size,
"size_bytes": fs.currentSize,
}).Log("Appended to file")
return size, nil
}
func (fs *memFilesystem) Purge(size int64) int64 {
files := []*memFile{}

View File

@ -9,11 +9,27 @@ import (
)
type memStorage interface {
// Delete deletes a file from the storage.
Delete(key string) (*memFile, bool)
// Store stores a file to the storage. If there's already a file with
// the same key, that value will be returned and replaced with the
// new file.
Store(key string, value *memFile) (*memFile, bool)
// Load loads a file from the storage. This is a references to the file,
// i.e. all changes to the file will be reflected on the storage.
Load(key string) (value *memFile, ok bool)
// LoadAndCopy loads a file from the storage. The returned file is a copy
// and can be modified without modifying the file on the storage.
LoadAndCopy(key string) (value *memFile, ok bool)
// Has checks whether a file exists at path.
Has(key string) bool
// Range ranges over all files on the storage. The callback needs to return
// false in order to stop the iteration.
Range(f func(key string, value *memFile) bool)
}

View File

@ -360,6 +360,25 @@ func (fs *s3Filesystem) WriteFileSafe(path string, data []byte) (int64, bool, er
return fs.WriteFileReader(path, bytes.NewReader(data), len(data))
}
func (fs *s3Filesystem) AppendFileReader(path string, r io.Reader, sizeHint int) (int64, error) {
path = fs.cleanPath(path)
ctx := context.Background()
object, err := fs.client.GetObject(ctx, fs.bucket, path, minio.GetObjectOptions{})
if err != nil {
size, _, err := fs.write(path, r)
return size, err
}
buffer := bytes.Buffer{}
buffer.ReadFrom(object)
buffer.ReadFrom(r)
size, _, err := fs.write(path, &buffer)
return size, err
}
func (fs *s3Filesystem) Rename(src, dst string) error {
src = fs.cleanPath(src)
dst = fs.cleanPath(dst)

View File

@ -135,34 +135,40 @@ func (r *sizedFilesystem) WriteFileSafe(path string, data []byte) (int64, bool,
return r.Filesystem.WriteFileSafe(path, data)
}
func (r *sizedFilesystem) AppendFileReader(path string, rd io.Reader, sizeHint int) (int64, error) {
currentSize, maxSize := r.Size()
if maxSize <= 0 {
return r.Filesystem.AppendFileReader(path, rd, sizeHint)
}
data := bytes.Buffer{}
size, err := copyToBufferFromReader(&data, rd, 8*1024)
if err != nil {
return -1, err
}
// Calculate the new size of the filesystem
newSize := currentSize + size
// If the the new size is larger than the allowed size, we have to free
// some space.
if newSize > maxSize {
if !r.purge {
return -1, fmt.Errorf("not enough space on device")
}
if r.Purge(size) < size {
return -1, fmt.Errorf("not enough space on device")
}
}
return r.Filesystem.AppendFileReader(path, &data, int(size))
}
func (r *sizedFilesystem) Purge(size int64) int64 {
if purger, ok := r.Filesystem.(PurgeFilesystem); ok {
return purger.Purge(size)
}
return 0
/*
files := r.Filesystem.List("/", "")
sort.Slice(files, func(i, j int) bool {
return files[i].ModTime().Before(files[j].ModTime())
})
var freed int64 = 0
for _, f := range files {
r.Filesystem.Remove(f.Name())
size -= f.Size()
freed += f.Size()
r.currentSize -= f.Size()
if size <= 0 {
break
}
}
files = nil
return freed
*/
}

View File

@ -199,12 +199,6 @@ func (r *registry) sessionPersister(pattern *strftime.Strftime, bufferDuration t
buffer := &bytes.Buffer{}
path := pattern.FormatString(time.Now())
file := r.persist.fs.Open(path)
if file != nil {
buffer.ReadFrom(file)
file.Close()
}
enc := json.NewEncoder(buffer)
ticker := time.NewTicker(bufferDuration)
@ -222,7 +216,7 @@ loop:
currentPath := pattern.FormatString(session.ClosedAt)
if currentPath != path && session.ClosedAt.After(splitTime) {
if buffer.Len() > 0 {
_, _, err := r.persist.fs.WriteFileSafe(path, buffer.Bytes())
_, err := r.persist.fs.AppendFileReader(path, buffer, -1)
if err != nil {
r.logger.Error().WithError(err).WithField("path", path).Log("")
}
@ -239,7 +233,7 @@ loop:
enc.Encode(&session)
case t := <-ticker.C:
if buffer.Len() > 0 {
_, _, err := r.persist.fs.WriteFileSafe(path, buffer.Bytes())
_, err := r.persist.fs.AppendFileReader(path, buffer, -1)
if err != nil {
r.logger.Error().WithError(err).WithField("path", path).Log("")
} else {
@ -260,7 +254,7 @@ loop:
}
if buffer.Len() > 0 {
_, _, err := r.persist.fs.WriteFileSafe(path, buffer.Bytes())
_, err := r.persist.fs.AppendFileReader(path, buffer, -1)
if err != nil {
r.logger.Error().WithError(err).WithField("path", path).Log("")
} else {