Fix buffer re-use, use swiss-map backend
This commit is contained in:
parent
92decc7111
commit
519f0cfb2b
@ -847,6 +847,7 @@ func (a *api) start(ctx context.Context) error {
|
||||
"type": "mem",
|
||||
"name": "mem",
|
||||
}),
|
||||
Storage: "swiss",
|
||||
}
|
||||
var memfs fs.Filesystem = nil
|
||||
if len(cfg.Storage.Memory.Backup.Dir) != 0 {
|
||||
|
||||
@ -105,8 +105,10 @@ func (f *memFile) Seek(offset int64, whence int) (int64, error) {
|
||||
}
|
||||
|
||||
func (f *memFile) Close() error {
|
||||
var err error = nil
|
||||
|
||||
if f.r == nil {
|
||||
return io.EOF
|
||||
err = io.EOF
|
||||
}
|
||||
|
||||
f.r = nil
|
||||
@ -116,7 +118,7 @@ func (f *memFile) Close() error {
|
||||
f.data = nil
|
||||
}
|
||||
|
||||
return nil
|
||||
return err
|
||||
}
|
||||
|
||||
type memFilesystem struct {
|
||||
|
||||
@ -29,8 +29,9 @@ func main() {
|
||||
oFiles := 15
|
||||
oInterval := 1 // seconds
|
||||
oSize := 2 // megabytes
|
||||
oLimit := false
|
||||
oLimit := -1
|
||||
oGC := 0
|
||||
oFree := 0
|
||||
|
||||
flag.StringVar(&oStorage, "storage", "mapof", "type of mem storage implementation (mapof, map, swiss)")
|
||||
flag.IntVar(&oWriters, "writers", 500, "number of concurrent writers")
|
||||
@ -38,8 +39,9 @@ func main() {
|
||||
flag.IntVar(&oFiles, "files", 15, "number of files to keep per writer")
|
||||
flag.IntVar(&oInterval, "interval", 1, "interval for writing files in seconds")
|
||||
flag.IntVar(&oSize, "size", 2048, "size of files to write in kilobytes")
|
||||
flag.BoolVar(&oLimit, "limit", false, "set memory limit")
|
||||
flag.IntVar(&oGC, "gc", 0, "force garbage collector")
|
||||
flag.IntVar(&oLimit, "limit", -1, "set memory limit, 0 for automatic, otherwise memory in MB")
|
||||
flag.IntVar(&oGC, "gc", 100, "GC percentage")
|
||||
flag.IntVar(&oFree, "free", 0, "force freeing memory")
|
||||
|
||||
flag.Parse()
|
||||
|
||||
@ -47,9 +49,14 @@ func main() {
|
||||
|
||||
fmt.Printf("Expecting effective memory consumption of %.1fGB\n", estimatedSize)
|
||||
|
||||
if oLimit {
|
||||
fmt.Printf("Setting memory limit to %.1fGB\n", estimatedSize*1.5)
|
||||
debug.SetMemoryLimit(int64(estimatedSize * 1.5))
|
||||
if oLimit >= 0 {
|
||||
limitSize := estimatedSize * 1.5 * 1024 * 1024 * 1024
|
||||
if oLimit > 0 {
|
||||
limitSize = float64(oLimit) * 1024 * 1024
|
||||
}
|
||||
|
||||
fmt.Printf("Setting memory limit to %.1fGB\n", limitSize/1024/1024/1024)
|
||||
debug.SetMemoryLimit(int64(limitSize))
|
||||
}
|
||||
|
||||
memfs, err := fs.NewMemFilesystem(fs.MemConfig{
|
||||
@ -206,9 +213,11 @@ func main() {
|
||||
}
|
||||
}(ctx, memfs)
|
||||
|
||||
if oGC > 0 {
|
||||
debug.SetGCPercent(oGC)
|
||||
|
||||
if oFree > 0 {
|
||||
go func(ctx context.Context) {
|
||||
ticker := time.NewTicker(time.Duration(oGC) * time.Second)
|
||||
ticker := time.NewTicker(time.Duration(oFree) * time.Second)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
|
||||
94
mem/pool_test.go
Normal file
94
mem/pool_test.go
Normal file
@ -0,0 +1,94 @@
|
||||
package mem
|
||||
|
||||
import (
|
||||
"math/rand"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestIndex(t *testing.T) {
|
||||
testIndex(t, 0, 0)
|
||||
testIndex(t, 1, 0)
|
||||
|
||||
testIndex(t, minSize-1, 0)
|
||||
testIndex(t, minSize, 0)
|
||||
testIndex(t, minSize+1, 1)
|
||||
|
||||
testIndex(t, 2*minSize-1, 1)
|
||||
testIndex(t, 2*minSize, 1)
|
||||
testIndex(t, 2*minSize+1, 2)
|
||||
|
||||
testIndex(t, maxSize-1, steps-1)
|
||||
testIndex(t, maxSize, steps-1)
|
||||
testIndex(t, maxSize+1, steps-1)
|
||||
}
|
||||
|
||||
func testIndex(t *testing.T, n, expectedIdx int) {
|
||||
idx := index(n)
|
||||
if idx != expectedIdx {
|
||||
t.Fatalf("unexpected idx for n=%d: %d. Expecting %d", n, idx, expectedIdx)
|
||||
}
|
||||
}
|
||||
|
||||
func TestPoolCalibrate(t *testing.T) {
|
||||
for i := 0; i < steps*calibrateCallsThreshold; i++ {
|
||||
n := 1004
|
||||
if i%15 == 0 {
|
||||
n = rand.Intn(15234)
|
||||
}
|
||||
testGetPut(t, n)
|
||||
}
|
||||
}
|
||||
|
||||
func TestPoolVariousSizesSerial(t *testing.T) {
|
||||
testPoolVariousSizes(t)
|
||||
}
|
||||
|
||||
func TestPoolVariousSizesConcurrent(t *testing.T) {
|
||||
concurrency := 5
|
||||
ch := make(chan struct{})
|
||||
for i := 0; i < concurrency; i++ {
|
||||
go func() {
|
||||
testPoolVariousSizes(t)
|
||||
ch <- struct{}{}
|
||||
}()
|
||||
}
|
||||
for i := 0; i < concurrency; i++ {
|
||||
select {
|
||||
case <-ch:
|
||||
case <-time.After(3 * time.Second):
|
||||
t.Fatalf("timeout")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func testPoolVariousSizes(t *testing.T) {
|
||||
for i := 0; i < steps+1; i++ {
|
||||
n := (1 << uint32(i))
|
||||
|
||||
testGetPut(t, n)
|
||||
testGetPut(t, n+1)
|
||||
testGetPut(t, n-1)
|
||||
|
||||
for j := 0; j < 10; j++ {
|
||||
testGetPut(t, j+n)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func testGetPut(t *testing.T, n int) {
|
||||
bb := Get()
|
||||
if len(bb.data) > 0 {
|
||||
t.Fatalf("non-empty byte buffer returned from acquire")
|
||||
}
|
||||
bb.data = allocNBytes(bb.data, n)
|
||||
Put(bb)
|
||||
}
|
||||
|
||||
func allocNBytes(dst []byte, n int) []byte {
|
||||
diff := n - cap(dst)
|
||||
if diff <= 0 {
|
||||
return dst[:n]
|
||||
}
|
||||
return append(dst, make([]byte, diff)...)
|
||||
}
|
||||
Loading…
x
Reference in New Issue
Block a user