diff --git a/cluster/raft/raft.go b/cluster/raft/raft.go index 996345e0..347a7012 100644 --- a/cluster/raft/raft.go +++ b/cluster/raft/raft.go @@ -1,6 +1,7 @@ package raft import ( + "bytes" "encoding/base64" "fmt" "io" @@ -15,7 +16,6 @@ import ( "github.com/datarhei/core/v16/cluster/store" "github.com/datarhei/core/v16/encoding/json" "github.com/datarhei/core/v16/log" - "github.com/datarhei/core/v16/mem" "go.etcd.io/bbolt" "github.com/hashicorp/go-hclog" @@ -318,18 +318,6 @@ func (r *raft) LeadershipTransfer(id string) error { return nil } -type readCloserWrapper struct { - io.Reader -} - -func (rcw *readCloserWrapper) Read(p []byte) (int, error) { - return rcw.Reader.Read(p) -} - -func (rcw *readCloserWrapper) Close() error { - return nil -} - type Snapshot struct { Metadata *hcraft.SnapshotMeta Data string @@ -359,14 +347,14 @@ func (r *raft) Snapshot() (io.ReadCloser, error) { Data: base64.StdEncoding.EncodeToString(data), } - buffer := mem.Get() + buffer := &bytes.Buffer{} enc := json.NewEncoder(buffer) err = enc.Encode(snapshot) if err != nil { return nil, err } - return &readCloserWrapper{buffer}, nil + return io.NopCloser(buffer), nil } func (r *raft) start(fsm hcraft.FSM, peers []Peer, inmem bool) error { diff --git a/http/client/client.go b/http/client/client.go index ae25779e..a4d2f65c 100644 --- a/http/client/client.go +++ b/http/client/client.go @@ -661,7 +661,7 @@ func (r *restclient) login() error { e := json.NewEncoder(buf) e.Encode(login) - req, err := http.NewRequest("POST", r.address+r.prefix+"/login", buf) + req, err := http.NewRequest("POST", r.address+r.prefix+"/login", buf.Reader()) if err != nil { return err } diff --git a/http/client/events.go b/http/client/events.go index e2f25279..22780e1e 100644 --- a/http/client/events.go +++ b/http/client/events.go @@ -19,7 +19,7 @@ func (r *restclient) Events(ctx context.Context, filters api.EventFilters) (<-ch header := make(http.Header) header.Set("Accept", "application/x-json-stream") - stream, err := r.stream(ctx, "POST", "/v3/events", nil, header, "application/json", buf) + stream, err := r.stream(ctx, "POST", "/v3/events", nil, header, "application/json", buf.Reader()) if err != nil { return nil, err } diff --git a/http/client/process.go b/http/client/process.go index 485ab313..6a5f5e11 100644 --- a/http/client/process.go +++ b/http/client/process.go @@ -74,7 +74,7 @@ func (r *restclient) ProcessAdd(p *app.Config, metadata map[string]interface{}) e := json.NewEncoder(buf) e.Encode(config) - _, err := r.call("POST", "/v3/process", nil, nil, "application/json", buf) + _, err := r.call("POST", "/v3/process", nil, nil, "application/json", buf.Reader()) if err != nil { return err } @@ -95,7 +95,7 @@ func (r *restclient) ProcessUpdate(id app.ProcessID, p *app.Config, metadata map query := &url.Values{} query.Set("domain", id.Domain) - _, err := r.call("PUT", "/v3/process/"+url.PathEscape(id.ID), query, nil, "application/json", buf) + _, err := r.call("PUT", "/v3/process/"+url.PathEscape(id.ID), query, nil, "application/json", buf.Reader()) if err != nil { return err } @@ -116,7 +116,7 @@ func (r *restclient) ProcessReportSet(id app.ProcessID, report *app.Report) erro query := &url.Values{} query.Set("domain", id.Domain) - _, err := r.call("PUT", "/v3/process/"+url.PathEscape(id.ID)+"/report", query, nil, "application/json", buf) + _, err := r.call("PUT", "/v3/process/"+url.PathEscape(id.ID)+"/report", query, nil, "application/json", buf.Reader()) if err != nil { return err } @@ -145,7 +145,7 @@ func (r *restclient) ProcessCommand(id app.ProcessID, command string) error { query := &url.Values{} query.Set("domain", id.Domain) - _, err := r.call("PUT", "/v3/process/"+url.PathEscape(id.ID)+"/command", query, nil, "application/json", buf) + _, err := r.call("PUT", "/v3/process/"+url.PathEscape(id.ID)+"/command", query, nil, "application/json", buf.Reader()) return err } @@ -182,7 +182,7 @@ func (r *restclient) ProcessMetadataSet(id app.ProcessID, key string, metadata a query := &url.Values{} query.Set("domain", id.Domain) - _, err := r.call("PUT", "/v3/process/"+url.PathEscape(id.ID)+"/metadata/"+url.PathEscape(key), query, nil, "application/json", buf) + _, err := r.call("PUT", "/v3/process/"+url.PathEscape(id.ID)+"/metadata/"+url.PathEscape(key), query, nil, "application/json", buf.Reader()) return err } @@ -215,7 +215,7 @@ func (r *restclient) ProcessProbeConfig(p *app.Config) (api.Probe, error) { e := json.NewEncoder(buf) e.Encode(config) - data, err := r.call("POST", "/v3/process/probe", nil, nil, "application/json", buf) + data, err := r.call("POST", "/v3/process/probe", nil, nil, "application/json", buf.Reader()) if err != nil { return probe, err } diff --git a/http/middleware/compress/compress.go b/http/middleware/compress/compress.go index 43688d2f..c4244503 100644 --- a/http/middleware/compress/compress.go +++ b/http/middleware/compress/compress.go @@ -2,7 +2,6 @@ package compress import ( "bufio" - "bytes" "fmt" "io" "net" @@ -55,7 +54,7 @@ type compressResponseWriter struct { wroteBody bool minLength int minLengthExceeded bool - buffer *bytes.Buffer + buffer *mem.Buffer code int headerContentLength string scheme string diff --git a/http/middleware/hlsrewrite/hlsrewrite.go b/http/middleware/hlsrewrite/hlsrewrite.go index 02f21f2c..bfc6fd47 100644 --- a/http/middleware/hlsrewrite/hlsrewrite.go +++ b/http/middleware/hlsrewrite/hlsrewrite.go @@ -127,7 +127,7 @@ func (h *hlsrewrite) rewrite(c echo.Context, next echo.HandlerFunc) error { type hlsRewriter struct { http.ResponseWriter - buffer *bytes.Buffer + buffer *mem.Buffer } func (g *hlsRewriter) Write(data []byte) (int, error) { @@ -137,9 +137,9 @@ func (g *hlsRewriter) Write(data []byte) (int, error) { return w, err } -func (g *hlsRewriter) rewrite(pathPrefix []byte, buffer *bytes.Buffer) { +func (g *hlsRewriter) rewrite(pathPrefix []byte, buffer *mem.Buffer) { // Find all URLS in the .m3u8 and add the session ID to the query string - scanner := bufio.NewScanner(g.buffer) + scanner := bufio.NewScanner(g.buffer.Reader()) for scanner.Scan() { line := scanner.Bytes() diff --git a/http/middleware/hlsrewrite/hlsrewrite_test.go b/http/middleware/hlsrewrite/hlsrewrite_test.go index 7fb5341e..a149feaa 100644 --- a/http/middleware/hlsrewrite/hlsrewrite_test.go +++ b/http/middleware/hlsrewrite/hlsrewrite_test.go @@ -1,10 +1,10 @@ package hlsrewrite import ( - "bytes" "os" "testing" + "github.com/datarhei/core/v16/mem" "github.com/stretchr/testify/require" ) @@ -16,12 +16,12 @@ func TestRewrite(t *testing.T) { require.NoError(t, err) r := &hlsRewriter{ - buffer: &bytes.Buffer{}, + buffer: &mem.Buffer{}, } r.Write(data) - buffer := &bytes.Buffer{} + buffer := &mem.Buffer{} prefix := []byte("/path/to/foobar/") r.rewrite(prefix, buffer) @@ -33,10 +33,10 @@ func BenchmarkRewrite(b *testing.B) { require.NoError(b, err) r := &hlsRewriter{ - buffer: &bytes.Buffer{}, + buffer: &mem.Buffer{}, } - buffer := &bytes.Buffer{} + buffer := &mem.Buffer{} prefix := []byte("/path/to/foobar/") for i := 0; i < b.N; i++ { diff --git a/http/middleware/session/HLS.go b/http/middleware/session/HLS.go index c76dd4d2..f8654bd6 100644 --- a/http/middleware/session/HLS.go +++ b/http/middleware/session/HLS.go @@ -3,7 +3,6 @@ package session import ( "bufio" - "bytes" "io" "net/http" "net/url" @@ -11,6 +10,7 @@ import ( "path/filepath" "strings" + "github.com/datarhei/core/v16/mem" "github.com/datarhei/core/v16/net" "github.com/lithammer/shortuuid/v4" @@ -233,7 +233,7 @@ func (h *handler) handleHLSEgress(c echo.Context, _ string, data map[string]inte type segmentReader struct { reader io.ReadCloser - buffer *bytes.Buffer + buffer *mem.Buffer size int64 } @@ -255,7 +255,7 @@ func (r *segmentReader) getSegments(dir string) []string { segments := []string{} // Find all segment URLs in the .m3u8 - scanner := bufio.NewScanner(r.buffer) + scanner := bufio.NewScanner(r.buffer.Reader()) for scanner.Scan() { line := scanner.Text() @@ -299,7 +299,7 @@ func (r *segmentReader) getSegments(dir string) []string { type sessionRewriter struct { http.ResponseWriter - buffer *bytes.Buffer + buffer *mem.Buffer } func (g *sessionRewriter) Write(data []byte) (int, error) { @@ -307,11 +307,11 @@ func (g *sessionRewriter) Write(data []byte) (int, error) { return g.buffer.Write(data) } -func (g *sessionRewriter) rewriteHLS(sessionID string, requestURL *url.URL, buffer *bytes.Buffer) { +func (g *sessionRewriter) rewriteHLS(sessionID string, requestURL *url.URL, buffer *mem.Buffer) { isMaster := false // Find all URLS in the .m3u8 and add the session ID to the query string - scanner := bufio.NewScanner(g.buffer) + scanner := bufio.NewScanner(g.buffer.Reader()) for scanner.Scan() { byteline := scanner.Bytes() diff --git a/http/middleware/session/HLS_test.go b/http/middleware/session/HLS_test.go index e5cede1a..b7bc6564 100644 --- a/http/middleware/session/HLS_test.go +++ b/http/middleware/session/HLS_test.go @@ -19,7 +19,7 @@ func TestHLSSegmentReader(t *testing.T) { br := &segmentReader{ reader: io.NopCloser(r), - buffer: &bytes.Buffer{}, + buffer: &mem.Buffer{}, } _, err = io.ReadAll(br) @@ -66,7 +66,7 @@ func TestHLSRewrite(t *testing.T) { require.NoError(t, err) br := &sessionRewriter{ - buffer: &bytes.Buffer{}, + buffer: &mem.Buffer{}, } _, err = br.Write(data) @@ -75,7 +75,7 @@ func TestHLSRewrite(t *testing.T) { u, err := url.Parse("http://example.com/test.m3u8") require.NoError(t, err) - buffer := &bytes.Buffer{} + buffer := &mem.Buffer{} br.rewriteHLS("oT5GV8eWBbRAh4aib5egoK", u, buffer) diff --git a/http/middleware/session/session.go b/http/middleware/session/session.go index 3f12f405..d30eae08 100644 --- a/http/middleware/session/session.go +++ b/http/middleware/session/session.go @@ -1,7 +1,6 @@ package session import ( - "bytes" "fmt" "io" "net/http" @@ -177,7 +176,7 @@ func verifySession(raw interface{}, path, referrer string) (map[string]interface return data, nil } -func headerSize(header http.Header, buffer *bytes.Buffer) int64 { +func headerSize(header http.Header, buffer *mem.Buffer) int64 { buffer.Reset() header.Write(buffer) diff --git a/http/middleware/session/session_test.go b/http/middleware/session/session_test.go index b8633b19..f5c0c597 100644 --- a/http/middleware/session/session_test.go +++ b/http/middleware/session/session_test.go @@ -1,11 +1,11 @@ package session import ( - "bytes" "net/http" "testing" "github.com/datarhei/core/v16/encoding/json" + "github.com/datarhei/core/v16/mem" "github.com/stretchr/testify/require" ) @@ -143,7 +143,7 @@ func TestHeaderSize(t *testing.T) { header.Add("Content-Type", "application/json") header.Add("Content-Encoding", "gzip") - buffer := &bytes.Buffer{} + buffer := &mem.Buffer{} size := headerSize(header, buffer) require.Equal(t, "Content-Encoding: gzip\r\nContent-Type: application/json\r\n", buffer.String()) @@ -156,7 +156,7 @@ func BenchmarkHeaderSize(b *testing.B) { header.Add("Content-Type", "application/json") header.Add("Content-Encoding", "gzip") - buffer := &bytes.Buffer{} + buffer := &mem.Buffer{} for i := 0; i < b.N; i++ { headerSize(header, buffer) diff --git a/io/fs/mem.go b/io/fs/mem.go index 2eb68410..237d486e 100644 --- a/io/fs/mem.go +++ b/io/fs/mem.go @@ -2,7 +2,6 @@ package fs import ( "bytes" - "errors" "fmt" "io" "io/fs" @@ -69,8 +68,8 @@ func (f *memFileInfo) IsDir() bool { type memFile struct { memFileInfo - data *bytes.Buffer // Contents of the file - r *bytes.Reader + data *mem.Buffer // Contents of the file + r io.ReadSeeker } func (f *memFile) Name() string { @@ -380,9 +379,7 @@ func (fs *memFilesystem) ReadFile(path string) ([]byte, error) { } data := pool.Get() - - data.Grow(file.data.Len()) - data.Write(file.data.Bytes()) + file.data.WriteTo(data) return data.Bytes(), nil } @@ -433,35 +430,6 @@ func (fs *memFilesystem) Symlink(oldname, newname string) error { return nil } -func copyToBufferFromReader(buf *bytes.Buffer, r io.Reader, _ int) (int64, error) { - chunkData := [128 * 1024]byte{} - chunk := chunkData[0:] - - size := int64(0) - - for { - n, err := r.Read(chunk) - if n != 0 { - buf.Write(chunk[:n]) - size += int64(n) - } - - if err != nil { - if errors.Is(err, io.EOF) { - return size, nil - } - - return size, err - } - - if n == 0 { - break - } - } - - return size, nil -} - func (fs *memFilesystem) WriteFileReader(path string, r io.Reader, sizeHint int) (int64, bool, error) { path = fs.cleanPath(path) @@ -480,11 +448,7 @@ func (fs *memFilesystem) WriteFileReader(path string, r io.Reader, sizeHint int) data: pool.Get(), } - if sizeHint > 0 && sizeHint < 5*1024*1024 { - newFile.data.Grow(sizeHint) - } - - size, err := copyToBufferFromReader(newFile.data, r, 8*1024) + size, err := newFile.data.ReadFrom(r) if err != nil { fs.logger.WithFields(log.Fields{ "path": path, @@ -558,10 +522,9 @@ func (fs *memFilesystem) AppendFileReader(path string, r io.Reader, sizeHint int data: pool.Get(), } - newFile.data.Grow(file.data.Len()) - newFile.data.Write(file.data.Bytes()) + file.data.WriteTo(newFile.data) - size, err := copyToBufferFromReader(newFile.data, r, 8*1024) + size, err := newFile.data.ReadFrom(r) if err != nil { fs.logger.WithFields(log.Fields{ "path": path, @@ -720,8 +683,7 @@ func (fs *memFilesystem) Copy(src, dst string) error { data: pool.Get(), } - dstFile.data.Grow(srcFile.data.Len()) - dstFile.data.Write(srcFile.data.Bytes()) + srcFile.data.WriteTo(dstFile.data) f, replace := fs.storage.Store(dst, dstFile) diff --git a/io/fs/mem_storage.go b/io/fs/mem_storage.go index 8267660a..18b447ba 100644 --- a/io/fs/mem_storage.go +++ b/io/fs/mem_storage.go @@ -1,7 +1,6 @@ package fs import ( - "bytes" "sync" "github.com/dolthub/swiss" @@ -122,33 +121,6 @@ func (m *mapStorage) Load(key string) (*memFile, bool) { return v, ok } -func (m *mapStorage) LoadAndCopy(key string) (*memFile, bool) { - m.lock.RLock() - defer m.lock.RUnlock() - - v, ok := m.files[key] - if !ok { - return nil, false - } - - f := &memFile{ - memFileInfo: memFileInfo{ - name: v.name, - size: v.size, - dir: v.dir, - lastMod: v.lastMod, - linkTo: v.linkTo, - }, - r: nil, - } - - if v.data != nil { - f.data = bytes.NewBuffer(v.data.Bytes()) - } - - return f, true -} - func (m *mapStorage) Has(key string) bool { m.lock.RLock() defer m.lock.RUnlock() @@ -214,33 +186,6 @@ func (m *swissMapStorage) Load(key string) (*memFile, bool) { return m.files.Get(key) } -func (m *swissMapStorage) LoadAndCopy(key string) (*memFile, bool) { - token := m.lock.RLock() - defer m.lock.RUnlock(token) - - v, ok := m.files.Get(key) - if !ok { - return nil, false - } - - f := &memFile{ - memFileInfo: memFileInfo{ - name: v.name, - size: v.size, - dir: v.dir, - lastMod: v.lastMod, - linkTo: v.linkTo, - }, - r: nil, - } - - if v.data != nil { - f.data = bytes.NewBuffer(v.data.Bytes()) - } - - return f, true -} - func (m *swissMapStorage) Has(key string) bool { token := m.lock.RLock() defer m.lock.RUnlock(token) diff --git a/io/fs/mem_test.go b/io/fs/mem_test.go index 63a8f0f3..be179d5c 100644 --- a/io/fs/mem_test.go +++ b/io/fs/mem_test.go @@ -1,7 +1,6 @@ package fs import ( - "bytes" "context" "fmt" "io" @@ -231,36 +230,3 @@ func benchmarkMemReadFileWhileWriting(b *testing.B, fs Filesystem) { readerWg.Wait() } - -func BenchmarkBufferReadFrom(b *testing.B) { - data := []byte(rand.StringAlphanumeric(1024 * 1024)) - - for i := 0; i < b.N; i++ { - r := bytes.NewReader(data) - buf := &bytes.Buffer{} - buf.ReadFrom(r) - } -} - -func TestBufferReadChunks(t *testing.T) { - data := []byte(rand.StringAlphanumeric(1024 * 1024)) - - r := bytes.NewReader(data) - buf := &bytes.Buffer{} - - copyToBufferFromReader(buf, r, 32*1024) - - res := bytes.Compare(data, buf.Bytes()) - require.Equal(t, 0, res) -} - -func BenchmarkBufferReadChunks(b *testing.B) { - data := []byte(rand.StringAlphanumeric(1024 * 1024)) - - for i := 0; i < b.N; i++ { - r := bytes.NewReader(data) - buf := &bytes.Buffer{} - - copyToBufferFromReader(buf, r, 32*1024) - } -} diff --git a/io/fs/s3.go b/io/fs/s3.go index c1659aef..316fd7ca 100644 --- a/io/fs/s3.go +++ b/io/fs/s3.go @@ -378,7 +378,7 @@ func (fs *s3Filesystem) AppendFileReader(path string, r io.Reader, sizeHint int) buffer.ReadFrom(object) buffer.ReadFrom(r) - size, _, err := fs.write(path, buffer) + size, _, err := fs.write(path, buffer.Reader()) return size, err } diff --git a/io/fs/sized.go b/io/fs/sized.go index cfa159eb..bd2c3fc6 100644 --- a/io/fs/sized.go +++ b/io/fs/sized.go @@ -71,8 +71,10 @@ func (r *sizedFilesystem) WriteFileReader(path string, rd io.Reader, sizeHint in return r.Filesystem.WriteFileReader(path, rd, sizeHint) } - data := bytes.Buffer{} - size, err := copyToBufferFromReader(&data, rd, 8*1024) + data := pool.Get() + defer pool.Put(data) + + size, err := data.ReadFrom(rd) if err != nil { return -1, false, err } @@ -97,7 +99,7 @@ func (r *sizedFilesystem) WriteFileReader(path string, rd io.Reader, sizeHint in } } - return r.Filesystem.WriteFileReader(path, &data, int(size)) + return r.Filesystem.WriteFileReader(path, data.Reader(), int(size)) } func (r *sizedFilesystem) WriteFile(path string, data []byte) (int64, bool, error) { @@ -141,8 +143,10 @@ func (r *sizedFilesystem) AppendFileReader(path string, rd io.Reader, sizeHint i return r.Filesystem.AppendFileReader(path, rd, sizeHint) } - data := bytes.Buffer{} - size, err := copyToBufferFromReader(&data, rd, 8*1024) + data := pool.Get() + defer pool.Put(data) + + size, err := data.ReadFrom(rd) if err != nil { return -1, err } @@ -162,7 +166,7 @@ func (r *sizedFilesystem) AppendFileReader(path string, rd io.Reader, sizeHint i } } - return r.Filesystem.AppendFileReader(path, &data, int(size)) + return r.Filesystem.AppendFileReader(path, data.Reader(), int(size)) } func (r *sizedFilesystem) Purge(size int64) int64 { diff --git a/mem/buffer.go b/mem/buffer.go index af676c10..e3a39fe9 100644 --- a/mem/buffer.go +++ b/mem/buffer.go @@ -2,46 +2,90 @@ package mem import ( "bytes" - "sync" + "errors" + "io" ) -type BufferPool struct { - pool sync.Pool +type Buffer struct { + data bytes.Buffer } -func NewBufferPool() *BufferPool { - p := &BufferPool{ - pool: sync.Pool{ - New: func() any { - return &bytes.Buffer{} - }, - }, +// Len returns the length of the buffer. +func (b *Buffer) Len() int { + return b.data.Len() +} + +// Bytes returns the buffer, but keeps ownership. +func (b *Buffer) Bytes() []byte { + return b.data.Bytes() +} + +// WriteTo writes the bytes to the writer. +func (b *Buffer) WriteTo(w io.Writer) (int64, error) { + n, err := w.Write(b.data.Bytes()) + return int64(n), err +} + +// Reset empties the buffer and keeps it's capacity. +func (b *Buffer) Reset() { + b.data.Reset() +} + +// Write appends to the buffer. +func (b *Buffer) Write(p []byte) (int, error) { + return b.data.Write(p) +} + +// ReadFrom reads from the reader and appends to the buffer. +func (b *Buffer) ReadFrom(r io.Reader) (int64, error) { + if br, ok := r.(*bytes.Reader); ok { + b.data.Grow(br.Len()) } - return p + chunkData := [128 * 1024]byte{} + chunk := chunkData[0:] + + size := int64(0) + + for { + n, err := r.Read(chunk) + if n != 0 { + b.data.Write(chunk[:n]) + size += int64(n) + } + + if err != nil { + if errors.Is(err, io.EOF) { + return size, nil + } + + return size, err + } + + if n == 0 { + break + } + } + + return size, nil } -func (p *BufferPool) Get() *bytes.Buffer { - buf := p.pool.Get().(*bytes.Buffer) - buf.Reset() - - return buf +// WriteByte appends a byte to the buffer. +func (b *Buffer) WriteByte(c byte) error { + return b.data.WriteByte(c) } -func (p *BufferPool) Put(buf *bytes.Buffer) { - p.pool.Put(buf) +// WriteString appends a string to the buffer. +func (b *Buffer) WriteString(s string) (n int, err error) { + return b.data.WriteString(s) } -var DefaultBufferPool *BufferPool - -func init() { - DefaultBufferPool = NewBufferPool() +// Reader returns a bytes.Reader based on the data in the buffer. +func (b *Buffer) Reader() *bytes.Reader { + return bytes.NewReader(b.Bytes()) } -func Get() *bytes.Buffer { - return DefaultBufferPool.Get() -} - -func Put(buf *bytes.Buffer) { - DefaultBufferPool.Put(buf) +// String returns the data in the buffer a string. +func (b *Buffer) String() string { + return b.data.String() } diff --git a/mem/buffer_test.go b/mem/buffer_test.go new file mode 100644 index 00000000..a19c1a72 --- /dev/null +++ b/mem/buffer_test.go @@ -0,0 +1,47 @@ +package mem + +import ( + "bytes" + "io" + "testing" + + "github.com/datarhei/core/v16/math/rand" + + "github.com/stretchr/testify/require" +) + +func TestBufferReadChunks(t *testing.T) { + data := []byte(rand.StringAlphanumeric(1024 * 1024)) + + r := bytes.NewReader(data) + buf := &Buffer{} + + buf.ReadFrom(r) + + res := bytes.Compare(data, buf.Bytes()) + require.Equal(t, 0, res) +} + +func BenchmarkBufferReadFrom(b *testing.B) { + data := []byte(rand.StringAlphanumeric(1024 * 1024)) + + r := bytes.NewReader(data) + + for i := 0; i < b.N; i++ { + r.Seek(0, io.SeekStart) + buf := &Buffer{} + buf.ReadFrom(r) + } +} + +func BenchmarkBytesBufferReadFrom(b *testing.B) { + data := []byte(rand.StringAlphanumeric(1024 * 1024)) + + r := bytes.NewReader(data) + + for i := 0; i < b.N; i++ { + r.Seek(0, io.SeekStart) + buf := &bytes.Buffer{} + buf.ReadFrom(r) + } +} diff --git a/mem/pool.go b/mem/pool.go new file mode 100644 index 00000000..641791f3 --- /dev/null +++ b/mem/pool.go @@ -0,0 +1,46 @@ +package mem + +import ( + "sync" +) + +type BufferPool struct { + pool sync.Pool +} + +func NewBufferPool() *BufferPool { + p := &BufferPool{ + pool: sync.Pool{ + New: func() any { + return &Buffer{} + }, + }, + } + + return p +} + +func (p *BufferPool) Get() *Buffer { + buf := p.pool.Get().(*Buffer) + buf.Reset() + + return buf +} + +func (p *BufferPool) Put(buf *Buffer) { + p.pool.Put(buf) +} + +var DefaultBufferPool *BufferPool + +func init() { + DefaultBufferPool = NewBufferPool() +} + +func Get() *Buffer { + return DefaultBufferPool.Get() +} + +func Put(buf *Buffer) { + DefaultBufferPool.Put(buf) +} diff --git a/service/api/api.go b/service/api/api.go index bae2cbb1..b34f589a 100644 --- a/service/api/api.go +++ b/service/api/api.go @@ -243,7 +243,7 @@ func (a *api) Monitor(id string, monitordata MonitorData) (MonitorResponse, erro } */ - response, err := a.callWithRetry(http.MethodPut, "api/v1/core/monitor/"+id, data) + response, err := a.callWithRetry(http.MethodPut, "api/v1/core/monitor/"+id, data.Reader()) if err != nil { return MonitorResponse{}, fmt.Errorf("error sending request: %w", err) }