diff --git a/app/api/api.go b/app/api/api.go index 73e48d8b..bd5b7cf5 100644 --- a/app/api/api.go +++ b/app/api/api.go @@ -847,6 +847,7 @@ func (a *api) start(ctx context.Context) error { "type": "mem", "name": "mem", }), + Storage: "swiss", } var memfs fs.Filesystem = nil if len(cfg.Storage.Memory.Backup.Dir) != 0 { diff --git a/io/fs/mem.go b/io/fs/mem.go index 2cc51637..e1d33aae 100644 --- a/io/fs/mem.go +++ b/io/fs/mem.go @@ -105,8 +105,10 @@ func (f *memFile) Seek(offset int64, whence int) (int64, error) { } func (f *memFile) Close() error { + var err error = nil + if f.r == nil { - return io.EOF + err = io.EOF } f.r = nil @@ -116,7 +118,7 @@ func (f *memFile) Close() error { f.data = nil } - return nil + return err } type memFilesystem struct { diff --git a/io/fs/memtest/memtest.go b/io/fs/memtest/memtest.go index 771eb454..73828e6e 100644 --- a/io/fs/memtest/memtest.go +++ b/io/fs/memtest/memtest.go @@ -29,8 +29,9 @@ func main() { oFiles := 15 oInterval := 1 // seconds oSize := 2 // megabytes - oLimit := false + oLimit := -1 oGC := 0 + oFree := 0 flag.StringVar(&oStorage, "storage", "mapof", "type of mem storage implementation (mapof, map, swiss)") flag.IntVar(&oWriters, "writers", 500, "number of concurrent writers") @@ -38,8 +39,9 @@ func main() { flag.IntVar(&oFiles, "files", 15, "number of files to keep per writer") flag.IntVar(&oInterval, "interval", 1, "interval for writing files in seconds") flag.IntVar(&oSize, "size", 2048, "size of files to write in kilobytes") - flag.BoolVar(&oLimit, "limit", false, "set memory limit") - flag.IntVar(&oGC, "gc", 0, "force garbage collector") + flag.IntVar(&oLimit, "limit", -1, "set memory limit, 0 for automatic, otherwise memory in MB") + flag.IntVar(&oGC, "gc", 100, "GC percentage") + flag.IntVar(&oFree, "free", 0, "force freeing memory") flag.Parse() @@ -47,9 +49,14 @@ func main() { fmt.Printf("Expecting effective memory consumption of %.1fGB\n", estimatedSize) - if oLimit { - fmt.Printf("Setting memory limit to %.1fGB\n", estimatedSize*1.5) - debug.SetMemoryLimit(int64(estimatedSize * 1.5)) + if oLimit >= 0 { + limitSize := estimatedSize * 1.5 * 1024 * 1024 * 1024 + if oLimit > 0 { + limitSize = float64(oLimit) * 1024 * 1024 + } + + fmt.Printf("Setting memory limit to %.1fGB\n", limitSize/1024/1024/1024) + debug.SetMemoryLimit(int64(limitSize)) } memfs, err := fs.NewMemFilesystem(fs.MemConfig{ @@ -206,9 +213,11 @@ func main() { } }(ctx, memfs) - if oGC > 0 { + debug.SetGCPercent(oGC) + + if oFree > 0 { go func(ctx context.Context) { - ticker := time.NewTicker(time.Duration(oGC) * time.Second) + ticker := time.NewTicker(time.Duration(oFree) * time.Second) defer ticker.Stop() for { diff --git a/mem/pool_test.go b/mem/pool_test.go new file mode 100644 index 00000000..c194536c --- /dev/null +++ b/mem/pool_test.go @@ -0,0 +1,94 @@ +package mem + +import ( + "math/rand" + "testing" + "time" +) + +func TestIndex(t *testing.T) { + testIndex(t, 0, 0) + testIndex(t, 1, 0) + + testIndex(t, minSize-1, 0) + testIndex(t, minSize, 0) + testIndex(t, minSize+1, 1) + + testIndex(t, 2*minSize-1, 1) + testIndex(t, 2*minSize, 1) + testIndex(t, 2*minSize+1, 2) + + testIndex(t, maxSize-1, steps-1) + testIndex(t, maxSize, steps-1) + testIndex(t, maxSize+1, steps-1) +} + +func testIndex(t *testing.T, n, expectedIdx int) { + idx := index(n) + if idx != expectedIdx { + t.Fatalf("unexpected idx for n=%d: %d. Expecting %d", n, idx, expectedIdx) + } +} + +func TestPoolCalibrate(t *testing.T) { + for i := 0; i < steps*calibrateCallsThreshold; i++ { + n := 1004 + if i%15 == 0 { + n = rand.Intn(15234) + } + testGetPut(t, n) + } +} + +func TestPoolVariousSizesSerial(t *testing.T) { + testPoolVariousSizes(t) +} + +func TestPoolVariousSizesConcurrent(t *testing.T) { + concurrency := 5 + ch := make(chan struct{}) + for i := 0; i < concurrency; i++ { + go func() { + testPoolVariousSizes(t) + ch <- struct{}{} + }() + } + for i := 0; i < concurrency; i++ { + select { + case <-ch: + case <-time.After(3 * time.Second): + t.Fatalf("timeout") + } + } +} + +func testPoolVariousSizes(t *testing.T) { + for i := 0; i < steps+1; i++ { + n := (1 << uint32(i)) + + testGetPut(t, n) + testGetPut(t, n+1) + testGetPut(t, n-1) + + for j := 0; j < 10; j++ { + testGetPut(t, j+n) + } + } +} + +func testGetPut(t *testing.T, n int) { + bb := Get() + if len(bb.data) > 0 { + t.Fatalf("non-empty byte buffer returned from acquire") + } + bb.data = allocNBytes(bb.data, n) + Put(bb) +} + +func allocNBytes(dst []byte, n int) []byte { + diff := n - cap(dst) + if diff <= 0 { + return dst[:n] + } + return append(dst, make([]byte, diff)...) +}