From 92decc7111edc383dd854fad671e8844596e3ba2 Mon Sep 17 00:00:00 2001 From: Ingo Oppermann Date: Mon, 14 Oct 2024 10:51:35 +0200 Subject: [PATCH] Use global buffer pool where feasible --- http/client/client.go | 7 +- http/client/events.go | 5 +- http/client/process.go | 25 ++--- http/middleware/cache/cache.go | 5 +- http/middleware/compress/compress.go | 6 +- http/middleware/session/HLS.go | 28 +++--- http/middleware/session/HLS_test.go | 16 ++-- http/middleware/session/HTTP.go | 5 +- http/middleware/session/session.go | 3 - io/fs/mem.go | 112 +++++++--------------- io/fs/mem_storage.go | 100 +++++++++++++++++++- io/fs/sized.go | 10 +- mem/buffer.go | 134 ++++++++++++++++++++------- mem/pool.go | 121 ++++++++++++++++++++++-- restream/app/process.go | 4 +- service/api/api.go | 9 +- update/update.go | 9 +- 17 files changed, 407 insertions(+), 192 deletions(-) diff --git a/http/client/client.go b/http/client/client.go index a4d2f65c..d877076b 100644 --- a/http/client/client.go +++ b/http/client/client.go @@ -199,8 +199,6 @@ type restclient struct { connectedCore *semver.Version methods map[string][]apiconstraint } - - pool *mem.BufferPool } // New returns a new REST API client for the given config. The error is non-nil @@ -214,7 +212,6 @@ func New(config Config) (RestClient, error) { auth0Token: config.Auth0Token, client: config.Client, clientTimeout: config.Timeout, - pool: mem.NewBufferPool(), } if len(config.AccessToken) != 0 { @@ -655,8 +652,8 @@ func (r *restclient) login() error { login.Password = r.password } - buf := r.pool.Get() - defer r.pool.Put(buf) + buf := mem.Get() + defer mem.Put(buf) e := json.NewEncoder(buf) e.Encode(login) diff --git a/http/client/events.go b/http/client/events.go index 22780e1e..97909edb 100644 --- a/http/client/events.go +++ b/http/client/events.go @@ -7,11 +7,12 @@ import ( "github.com/datarhei/core/v16/encoding/json" "github.com/datarhei/core/v16/http/api" + "github.com/datarhei/core/v16/mem" ) func (r *restclient) Events(ctx context.Context, filters api.EventFilters) (<-chan api.Event, error) { - buf := r.pool.Get() - defer r.pool.Put(buf) + buf := mem.Get() + defer mem.Put(buf) e := json.NewEncoder(buf) e.Encode(filters) diff --git a/http/client/process.go b/http/client/process.go index 6a5f5e11..8de34aa8 100644 --- a/http/client/process.go +++ b/http/client/process.go @@ -6,6 +6,7 @@ import ( "github.com/datarhei/core/v16/encoding/json" "github.com/datarhei/core/v16/http/api" + "github.com/datarhei/core/v16/mem" "github.com/datarhei/core/v16/restream/app" ) @@ -65,8 +66,8 @@ func (r *restclient) Process(id app.ProcessID, filter []string) (api.Process, er } func (r *restclient) ProcessAdd(p *app.Config, metadata map[string]interface{}) error { - buf := r.pool.Get() - defer r.pool.Put(buf) + buf := mem.Get() + defer mem.Put(buf) config := api.ProcessConfig{} config.Unmarshal(p, metadata) @@ -83,8 +84,8 @@ func (r *restclient) ProcessAdd(p *app.Config, metadata map[string]interface{}) } func (r *restclient) ProcessUpdate(id app.ProcessID, p *app.Config, metadata map[string]interface{}) error { - buf := r.pool.Get() - defer r.pool.Put(buf) + buf := mem.Get() + defer mem.Put(buf) config := api.ProcessConfig{} config.Unmarshal(p, metadata) @@ -104,8 +105,8 @@ func (r *restclient) ProcessUpdate(id app.ProcessID, p *app.Config, metadata map } func (r *restclient) ProcessReportSet(id app.ProcessID, report *app.Report) error { - buf := r.pool.Get() - defer r.pool.Put(buf) + buf := mem.Get() + defer mem.Put(buf) data := api.ProcessReport{} data.Unmarshal(report) @@ -134,8 +135,8 @@ func (r *restclient) ProcessDelete(id app.ProcessID) error { } func (r *restclient) ProcessCommand(id app.ProcessID, command string) error { - buf := r.pool.Get() - defer r.pool.Put(buf) + buf := mem.Get() + defer mem.Put(buf) e := json.NewEncoder(buf) e.Encode(api.Command{ @@ -173,8 +174,8 @@ func (r *restclient) ProcessMetadata(id app.ProcessID, key string) (api.Metadata } func (r *restclient) ProcessMetadataSet(id app.ProcessID, key string, metadata api.Metadata) error { - buf := r.pool.Get() - defer r.pool.Put(buf) + buf := mem.Get() + defer mem.Put(buf) e := json.NewEncoder(buf) e.Encode(metadata) @@ -206,8 +207,8 @@ func (r *restclient) ProcessProbe(id app.ProcessID) (api.Probe, error) { func (r *restclient) ProcessProbeConfig(p *app.Config) (api.Probe, error) { var probe api.Probe - buf := r.pool.Get() - defer r.pool.Put(buf) + buf := mem.Get() + defer mem.Put(buf) config := api.ProcessConfig{} config.Unmarshal(p, nil) diff --git a/http/middleware/cache/cache.go b/http/middleware/cache/cache.go index 3b25fc17..f7d12aa9 100644 --- a/http/middleware/cache/cache.go +++ b/http/middleware/cache/cache.go @@ -2,13 +2,13 @@ package cache import ( - "bytes" "fmt" "net/http" "path" "strings" "github.com/datarhei/core/v16/http/cache" + "github.com/datarhei/core/v16/mem" "github.com/labstack/echo/v4" "github.com/labstack/echo/v4/middleware" @@ -78,6 +78,7 @@ func NewWithConfig(config Config) echo.MiddlewareFunc { w := &cacheWriter{ header: writer.Header().Clone(), + body: mem.Get(), } res.Writer = w @@ -170,7 +171,7 @@ type cacheObject struct { type cacheWriter struct { code int header http.Header - body bytes.Buffer + body *mem.Buffer } func (w *cacheWriter) Header() http.Header { diff --git a/http/middleware/compress/compress.go b/http/middleware/compress/compress.go index c4244503..b595f47b 100644 --- a/http/middleware/compress/compress.go +++ b/http/middleware/compress/compress.go @@ -138,8 +138,6 @@ func NewWithConfig(config Config) echo.MiddlewareFunc { zstdCompressor = NewZstd(config.Level) } - bufferPool := mem.NewBufferPool() - return func(next echo.HandlerFunc) echo.HandlerFunc { return func(c echo.Context) error { if config.Skipper(c) { @@ -171,7 +169,7 @@ func NewWithConfig(config Config) echo.MiddlewareFunc { rw := res.Writer compressor.Reset(rw) - buffer := bufferPool.Get() + buffer := mem.Get() grw := &compressResponseWriter{ Compressor: compressor, @@ -208,7 +206,7 @@ func NewWithConfig(config Config) echo.MiddlewareFunc { } } compressor.Close() - bufferPool.Put(buffer) + mem.Put(buffer) compress.Release(compressor) }() diff --git a/http/middleware/session/HLS.go b/http/middleware/session/HLS.go index f8654bd6..fa03d4e2 100644 --- a/http/middleware/session/HLS.go +++ b/http/middleware/session/HLS.go @@ -39,7 +39,7 @@ func (h *handler) handleHLSIngress(c echo.Context, _ string, data map[string]int reader := req.Body r := &segmentReader{ reader: req.Body, - buffer: h.bufferPool.Get(), + buffer: mem.Get(), } req.Body = r @@ -47,7 +47,7 @@ func (h *handler) handleHLSIngress(c echo.Context, _ string, data map[string]int req.Body = reader if r.size == 0 { - h.bufferPool.Put(r.buffer) + mem.Put(r.buffer) return } @@ -60,10 +60,10 @@ func (h *handler) handleHLSIngress(c echo.Context, _ string, data map[string]int h.hlsIngressCollector.Extra(path, data) } - buffer := h.bufferPool.Get() + buffer := mem.Get() h.hlsIngressCollector.Ingress(path, headerSize(req.Header, buffer)) h.hlsIngressCollector.Ingress(path, r.size) - h.bufferPool.Put(buffer) + mem.Put(buffer) segments := r.getSegments(urlpath.Dir(path)) @@ -79,7 +79,7 @@ func (h *handler) handleHLSIngress(c echo.Context, _ string, data map[string]int h.lock.Unlock() } - h.bufferPool.Put(r.buffer) + mem.Put(r.buffer) }() } else if strings.HasSuffix(path, ".ts") { // Get the size of the .ts file and store it in the ts-map for later use. @@ -93,11 +93,11 @@ func (h *handler) handleHLSIngress(c echo.Context, _ string, data map[string]int req.Body = reader if r.size != 0 { - buffer := h.bufferPool.Get() + buffer := mem.Get() h.lock.Lock() h.rxsegments[path] = r.size + headerSize(req.Header, buffer) h.lock.Unlock() - h.bufferPool.Put(buffer) + mem.Put(buffer) } }() } @@ -179,7 +179,7 @@ func (h *handler) handleHLSEgress(c echo.Context, _ string, data map[string]inte // the data that we need to rewrite. rewriter = &sessionRewriter{ ResponseWriter: res.Writer, - buffer: h.bufferPool.Get(), + buffer: mem.Get(), } res.Writer = rewriter @@ -197,11 +197,11 @@ func (h *handler) handleHLSEgress(c echo.Context, _ string, data map[string]inte if rewrite { if res.Status < 200 || res.Status >= 300 { res.Write(rewriter.buffer.Bytes()) - h.bufferPool.Put(rewriter.buffer) + mem.Put(rewriter.buffer) return nil } - buffer := h.bufferPool.Get() + buffer := mem.Get() // Rewrite the data befor sending it to the client rewriter.rewriteHLS(sessionID, c.Request().URL, buffer) @@ -209,17 +209,17 @@ func (h *handler) handleHLSEgress(c echo.Context, _ string, data map[string]inte res.Header().Set("Cache-Control", "private") res.Write(buffer.Bytes()) - h.bufferPool.Put(buffer) - h.bufferPool.Put(rewriter.buffer) + mem.Put(buffer) + mem.Put(rewriter.buffer) } if isM3U8 || isTS { if res.Status >= 200 && res.Status < 300 { // Collect how many bytes we've written in this session - buffer := h.bufferPool.Get() + buffer := mem.Get() h.hlsEgressCollector.Egress(sessionID, headerSize(res.Header(), buffer)) h.hlsEgressCollector.Egress(sessionID, res.Size) - h.bufferPool.Put(buffer) + mem.Put(buffer) if isTS { // Activate the session. If the session is already active, this is a noop diff --git a/http/middleware/session/HLS_test.go b/http/middleware/session/HLS_test.go index b7bc6564..5d27103b 100644 --- a/http/middleware/session/HLS_test.go +++ b/http/middleware/session/HLS_test.go @@ -39,8 +39,6 @@ func TestHLSSegmentReader(t *testing.T) { } func BenchmarkHLSSegmentReader(b *testing.B) { - pool := mem.NewBufferPool() - data, err := os.ReadFile("./fixtures/segments.txt") require.NoError(b, err) @@ -51,13 +49,13 @@ func BenchmarkHLSSegmentReader(b *testing.B) { rd.Reset(data) br := &segmentReader{ reader: io.NopCloser(r), - buffer: pool.Get(), + buffer: mem.Get(), } _, err := io.ReadAll(br) require.NoError(b, err) - pool.Put(br.buffer) + mem.Put(br.buffer) } } @@ -86,8 +84,6 @@ func TestHLSRewrite(t *testing.T) { } func BenchmarkHLSRewrite(b *testing.B) { - pool := mem.NewBufferPool() - data, err := os.ReadFile("./fixtures/segments.txt") require.NoError(b, err) @@ -96,17 +92,17 @@ func BenchmarkHLSRewrite(b *testing.B) { for i := 0; i < b.N; i++ { br := &sessionRewriter{ - buffer: pool.Get(), + buffer: mem.Get(), } _, err = br.Write(data) require.NoError(b, err) - buffer := pool.Get() + buffer := mem.Get() br.rewriteHLS("oT5GV8eWBbRAh4aib5egoK", u, buffer) - pool.Put(br.buffer) - pool.Put(buffer) + mem.Put(br.buffer) + mem.Put(buffer) } } diff --git a/http/middleware/session/HTTP.go b/http/middleware/session/HTTP.go index 4913171f..ccd50127 100644 --- a/http/middleware/session/HTTP.go +++ b/http/middleware/session/HTTP.go @@ -3,6 +3,7 @@ package session import ( "net/url" + "github.com/datarhei/core/v16/mem" "github.com/labstack/echo/v4" "github.com/lithammer/shortuuid/v4" ) @@ -45,7 +46,7 @@ func (h *handler) handleHTTP(c echo.Context, _ string, data map[string]interface h.httpCollector.Extra(id, data) defer func() { - buffer := h.bufferPool.Get() + buffer := mem.Get() req.Body = reader h.httpCollector.Ingress(id, r.size+headerSize(req.Header, buffer)) @@ -58,7 +59,7 @@ func (h *handler) handleHTTP(c echo.Context, _ string, data map[string]interface h.httpCollector.Close(id) - h.bufferPool.Put(buffer) + mem.Put(buffer) }() return next(c) diff --git a/http/middleware/session/session.go b/http/middleware/session/session.go index d30eae08..9cd77955 100644 --- a/http/middleware/session/session.go +++ b/http/middleware/session/session.go @@ -44,8 +44,6 @@ type handler struct { rxsegments map[string]int64 lock sync.Mutex - - bufferPool *mem.BufferPool } // New returns a new session middleware with default config @@ -77,7 +75,6 @@ func NewWithConfig(config Config) echo.MiddlewareFunc { hlsIngressCollector: config.HLSIngressCollector, reSessionID: regexp.MustCompile(`^[` + regexp.QuoteMeta(shortuuid.DefaultAlphabet) + `]{22}$`), rxsegments: make(map[string]int64), - bufferPool: mem.NewBufferPool(), } return func(next echo.HandlerFunc) echo.HandlerFunc { diff --git a/io/fs/mem.go b/io/fs/mem.go index 237d486e..2cc51637 100644 --- a/io/fs/mem.go +++ b/io/fs/mem.go @@ -111,25 +111,12 @@ func (f *memFile) Close() error { f.r = nil - return nil -} - -func (f *memFile) free() { - f.Close() - - if f.data == nil { - return + if f.data != nil { + mem.Put(f.data) + f.data = nil } - pool.Put(f.data) - - f.data = nil -} - -var pool *mem.BufferPool = nil - -func init() { - pool = mem.NewBufferPool() + return nil } type memFilesystem struct { @@ -331,24 +318,13 @@ func (fs *memFilesystem) Files() int64 { func (fs *memFilesystem) Open(path string) File { path = fs.cleanPath(path) - file, ok := fs.storage.Load(path) + newFile, ok := fs.storage.LoadAndCopy(path) if !ok { return nil } - newFile := &memFile{ - memFileInfo: memFileInfo{ - name: file.name, - size: file.size, - dir: file.dir, - lastMod: file.lastMod, - linkTo: file.linkTo, - }, - data: file.data, - } - - if len(file.linkTo) != 0 { - file, ok := fs.storage.Load(file.linkTo) + if len(newFile.linkTo) != 0 { + file, ok := fs.storage.LoadAndCopy(newFile.linkTo) if !ok { return nil } @@ -358,7 +334,7 @@ func (fs *memFilesystem) Open(path string) File { newFile.size = file.size } - newFile.r = bytes.NewReader(newFile.data.Bytes()) + newFile.r = newFile.data.Reader() return newFile } @@ -366,22 +342,19 @@ func (fs *memFilesystem) Open(path string) File { func (fs *memFilesystem) ReadFile(path string) ([]byte, error) { path = fs.cleanPath(path) - file, ok := fs.storage.Load(path) + file, ok := fs.storage.LoadAndCopy(path) if !ok { return nil, ErrNotExist } if len(file.linkTo) != 0 { - file, ok = fs.storage.Load(file.linkTo) + file, ok = fs.storage.LoadAndCopy(file.linkTo) if !ok { return nil, ErrNotExist } } - data := pool.Get() - file.data.WriteTo(data) - - return data.Bytes(), nil + return file.data.Bytes(), nil } func (fs *memFilesystem) Symlink(oldname, newname string) error { @@ -421,7 +394,7 @@ func (fs *memFilesystem) Symlink(oldname, newname string) error { defer fs.sizeLock.Unlock() if replaced { - oldFile.free() + oldFile.Close() fs.currentSize -= oldFile.size } @@ -445,7 +418,7 @@ func (fs *memFilesystem) WriteFileReader(path string, r io.Reader, sizeHint int) size: 0, lastMod: time.Now(), }, - data: pool.Get(), + data: mem.Get(), } size, err := newFile.data.ReadFrom(r) @@ -456,7 +429,7 @@ func (fs *memFilesystem) WriteFileReader(path string, r io.Reader, sizeHint int) "error": err, }).Warn().Log("Incomplete file") - newFile.free() + newFile.Close() return -1, false, fmt.Errorf("incomplete file") } @@ -473,7 +446,7 @@ func (fs *memFilesystem) WriteFileReader(path string, r io.Reader, sizeHint int) defer fs.sizeLock.Unlock() if replace { - oldFile.free() + oldFile.Close() fs.currentSize -= oldFile.size } @@ -506,25 +479,13 @@ func (fs *memFilesystem) WriteFileSafe(path string, data []byte) (int64, bool, e func (fs *memFilesystem) AppendFileReader(path string, r io.Reader, sizeHint int) (int64, error) { path = fs.cleanPath(path) - file, hasFile := fs.storage.Load(path) + file, hasFile := fs.storage.LoadAndCopy(path) if !hasFile { size, _, err := fs.WriteFileReader(path, r, sizeHint) return size, err } - newFile := &memFile{ - memFileInfo: memFileInfo{ - name: path, - dir: false, - size: 0, - lastMod: time.Now(), - }, - data: pool.Get(), - } - - file.data.WriteTo(newFile.data) - - size, err := newFile.data.ReadFrom(r) + size, err := file.data.ReadFrom(r) if err != nil { fs.logger.WithFields(log.Fields{ "path": path, @@ -532,20 +493,21 @@ func (fs *memFilesystem) AppendFileReader(path string, r io.Reader, sizeHint int "error": err, }).Warn().Log("Incomplete file") - newFile.free() + file.Close() return -1, fmt.Errorf("incomplete file") } file.size += size + file.lastMod = time.Now() - oldFile, replace := fs.storage.Store(path, newFile) + oldFile, replace := fs.storage.Store(path, file) fs.sizeLock.Lock() defer fs.sizeLock.Unlock() if replace { - oldFile.free() + oldFile.Close() } fs.currentSize += size @@ -584,7 +546,7 @@ func (fs *memFilesystem) Purge(size int64) int64 { fs.currentSize -= f.size fs.sizeLock.Unlock() - f.free() + f.Close() fs.logger.WithFields(log.Fields{ "path": f.name, @@ -644,7 +606,7 @@ func (fs *memFilesystem) Rename(src, dst string) error { defer fs.sizeLock.Unlock() if replace { - dstFile.free() + dstFile.Close() fs.currentSize -= dstFile.size } @@ -664,28 +626,18 @@ func (fs *memFilesystem) Copy(src, dst string) error { return os.ErrInvalid } - srcFile, ok := fs.storage.Load(src) + file, ok := fs.storage.LoadAndCopy(src) if !ok { return ErrNotExist } - if srcFile.dir { + if file.dir { return ErrNotExist } - dstFile := &memFile{ - memFileInfo: memFileInfo{ - name: dst, - dir: false, - size: srcFile.size, - lastMod: time.Now(), - }, - data: pool.Get(), - } + file.lastMod = time.Now() - srcFile.data.WriteTo(dstFile.data) - - f, replace := fs.storage.Store(dst, dstFile) + replacedFile, replace := fs.storage.Store(dst, file) if !replace { fs.dirs.Add(dst) @@ -695,11 +647,11 @@ func (fs *memFilesystem) Copy(src, dst string) error { defer fs.sizeLock.Unlock() if replace { - f.free() - fs.currentSize -= f.size + replacedFile.Close() + fs.currentSize -= replacedFile.size } - fs.currentSize += dstFile.size + fs.currentSize += file.size return nil } @@ -763,7 +715,7 @@ func (fs *memFilesystem) Remove(path string) int64 { func (fs *memFilesystem) remove(path string) int64 { file, ok := fs.storage.Delete(path) if ok { - file.free() + file.Close() fs.dirs.Remove(path) @@ -853,7 +805,7 @@ func (fs *memFilesystem) RemoveList(path string, options ListOptions) ([]string, fs.dirs.Remove(file.name) - file.free() + file.Close() } fs.sizeLock.Lock() diff --git a/io/fs/mem_storage.go b/io/fs/mem_storage.go index 18b447ba..18d8f156 100644 --- a/io/fs/mem_storage.go +++ b/io/fs/mem_storage.go @@ -3,29 +3,34 @@ package fs import ( "sync" + "github.com/datarhei/core/v16/mem" "github.com/dolthub/swiss" "github.com/puzpuzpuz/xsync/v3" ) type memStorage interface { // Delete deletes a file from the storage. - Delete(key string) (*memFile, bool) + Delete(key string) (file *memFile, ok bool) // Store stores a file to the storage. If there's already a file with // the same key, that value will be returned and replaced with the // new file. - Store(key string, value *memFile) (*memFile, bool) + Store(key string, file *memFile) (oldfile *memFile, ok bool) // Load loads a file from the storage. This is a references to the file, // i.e. all changes to the file will be reflected on the storage. - Load(key string) (value *memFile, ok bool) + Load(key string) (file *memFile, ok bool) + + // LoadAndCopy loads a file from the storage. This is a copy of file + // metadata and content. + LoadAndCopy(key string) (file *memFile, ok bool) // Has checks whether a file exists at path. Has(key string) bool // Range ranges over all files on the storage. The callback needs to return // false in order to stop the iteration. - Range(f func(key string, value *memFile) bool) + Range(f func(key string, file *memFile) bool) } type mapOfStorage struct { @@ -63,6 +68,35 @@ func (m *mapOfStorage) Load(key string) (*memFile, bool) { return m.files.Load(key) } +func (m *mapOfStorage) LoadAndCopy(key string) (*memFile, bool) { + token := m.lock.RLock() + defer m.lock.RUnlock(token) + + file, ok := m.files.Load(key) + if !ok { + return nil, false + } + + newFile := &memFile{ + memFileInfo: memFileInfo{ + name: file.name, + size: file.size, + dir: file.dir, + lastMod: file.lastMod, + linkTo: file.linkTo, + }, + data: nil, + r: nil, + } + + if file.data != nil { + newFile.data = mem.Get() + file.data.WriteTo(newFile.data) + } + + return newFile, true +} + func (m *mapOfStorage) Has(key string) bool { token := m.lock.RLock() defer m.lock.RUnlock(token) @@ -121,6 +155,35 @@ func (m *mapStorage) Load(key string) (*memFile, bool) { return v, ok } +func (m *mapStorage) LoadAndCopy(key string) (*memFile, bool) { + m.lock.RLock() + defer m.lock.RUnlock() + + v, ok := m.files[key] + if !ok { + return nil, false + } + + newFile := &memFile{ + memFileInfo: memFileInfo{ + name: v.name, + size: v.size, + dir: v.dir, + lastMod: v.lastMod, + linkTo: v.linkTo, + }, + data: nil, + r: nil, + } + + if v.data != nil { + newFile.data = mem.Get() + v.data.WriteTo(newFile.data) + } + + return newFile, true +} + func (m *mapStorage) Has(key string) bool { m.lock.RLock() defer m.lock.RUnlock() @@ -186,6 +249,35 @@ func (m *swissMapStorage) Load(key string) (*memFile, bool) { return m.files.Get(key) } +func (m *swissMapStorage) LoadAndCopy(key string) (*memFile, bool) { + token := m.lock.RLock() + defer m.lock.RUnlock(token) + + file, ok := m.files.Get(key) + if !ok { + return nil, false + } + + newFile := &memFile{ + memFileInfo: memFileInfo{ + name: file.name, + size: file.size, + dir: file.dir, + lastMod: file.lastMod, + linkTo: file.linkTo, + }, + data: nil, + r: nil, + } + + if file.data != nil { + newFile.data = mem.Get() + file.data.WriteTo(newFile.data) + } + + return newFile, true +} + func (m *swissMapStorage) Has(key string) bool { token := m.lock.RLock() defer m.lock.RUnlock(token) diff --git a/io/fs/sized.go b/io/fs/sized.go index bd2c3fc6..9e64729f 100644 --- a/io/fs/sized.go +++ b/io/fs/sized.go @@ -4,6 +4,8 @@ import ( "bytes" "fmt" "io" + + "github.com/datarhei/core/v16/mem" ) type SizedFilesystem interface { @@ -71,8 +73,8 @@ func (r *sizedFilesystem) WriteFileReader(path string, rd io.Reader, sizeHint in return r.Filesystem.WriteFileReader(path, rd, sizeHint) } - data := pool.Get() - defer pool.Put(data) + data := mem.Get() + defer mem.Put(data) size, err := data.ReadFrom(rd) if err != nil { @@ -143,8 +145,8 @@ func (r *sizedFilesystem) AppendFileReader(path string, rd io.Reader, sizeHint i return r.Filesystem.AppendFileReader(path, rd, sizeHint) } - data := pool.Get() - defer pool.Put(data) + data := mem.Get() + defer mem.Put(data) size, err := data.ReadFrom(rd) if err != nil { diff --git a/mem/buffer.go b/mem/buffer.go index e3a39fe9..808676fa 100644 --- a/mem/buffer.go +++ b/mem/buffer.go @@ -1,5 +1,7 @@ package mem +// Based on github.com/valyala/bytebufferpool + import ( "bytes" "errors" @@ -7,85 +9,155 @@ import ( ) type Buffer struct { - data bytes.Buffer + data []byte } // Len returns the length of the buffer. func (b *Buffer) Len() int { - return b.data.Len() + return len(b.data) } // Bytes returns the buffer, but keeps ownership. func (b *Buffer) Bytes() []byte { - return b.data.Bytes() + return b.data } // WriteTo writes the bytes to the writer. func (b *Buffer) WriteTo(w io.Writer) (int64, error) { - n, err := w.Write(b.data.Bytes()) + n, err := w.Write(b.data) return int64(n), err } // Reset empties the buffer and keeps it's capacity. func (b *Buffer) Reset() { - b.data.Reset() + b.data = b.data[:0] } // Write appends to the buffer. func (b *Buffer) Write(p []byte) (int, error) { - return b.data.Write(p) + b.data = append(b.data, p...) + return len(p), nil } // ReadFrom reads from the reader and appends to the buffer. func (b *Buffer) ReadFrom(r io.Reader) (int64, error) { - if br, ok := r.(*bytes.Reader); ok { - b.data.Grow(br.Len()) - } + /* + chunkData := [128 * 1024]byte{} + chunk := chunkData[0:] - chunkData := [128 * 1024]byte{} - chunk := chunkData[0:] + size := int64(0) - size := int64(0) - - for { - n, err := r.Read(chunk) - if n != 0 { - b.data.Write(chunk[:n]) - size += int64(n) - } - - if err != nil { - if errors.Is(err, io.EOF) { - return size, nil + for { + n, err := r.Read(chunk) + if n != 0 { + b.data = append(b.data, chunk[:n]...) + size += int64(n) } - return size, err + if err != nil { + if errors.Is(err, io.EOF) { + return size, nil + } + + return size, err + } + + if n == 0 { + break + } } - if n == 0 { - break + return size, nil + */ + p := b.data + nStart := int64(len(p)) + nMax := int64(cap(p)) + n := nStart + if nMax == 0 { + nMax = 64 + p = make([]byte, nMax) + } else { + p = p[:nMax] + } + for { + if n == nMax { + nMax *= 2 + bNew := make([]byte, nMax) + copy(bNew, p) + p = bNew + } + nn, err := r.Read(p[n:]) + n += int64(nn) + if err != nil { + b.data = p[:n] + n -= nStart + if errors.Is(err, io.EOF) { + return n, nil + } + return n, err } } + /* + if br, ok := r.(*bytes.Reader); ok { + if cap(b.data) < br.Len() { + data := make([]byte, br.Len()) + copy(data, b.data) + b.data = data + } + } - return size, nil + chunkData := [128 * 1024]byte{} + chunk := chunkData[0:] + + size := int64(0) + + for { + n, err := r.Read(chunk) + if n != 0 { + if cap(b.data) < len(b.data)+n { + data := make([]byte, cap(b.data)+1024*1024) + copy(data, b.data) + b.data = data + } + b.data = append(b.data, chunk[:n]...) + size += int64(n) + } + + if err != nil { + if errors.Is(err, io.EOF) { + return size, nil + } + + return size, err + } + + if n == 0 { + break + } + } + + return size, nil + */ } // WriteByte appends a byte to the buffer. func (b *Buffer) WriteByte(c byte) error { - return b.data.WriteByte(c) + b.data = append(b.data, c) + return nil } // WriteString appends a string to the buffer. func (b *Buffer) WriteString(s string) (n int, err error) { - return b.data.WriteString(s) + b.data = append(b.data, s...) + return len(s), nil } // Reader returns a bytes.Reader based on the data in the buffer. func (b *Buffer) Reader() *bytes.Reader { - return bytes.NewReader(b.Bytes()) + return bytes.NewReader(b.data) } // String returns the data in the buffer a string. func (b *Buffer) String() string { - return b.data.String() + return string(b.data) } diff --git a/mem/pool.go b/mem/pool.go index 641791f3..b2817d83 100644 --- a/mem/pool.go +++ b/mem/pool.go @@ -1,34 +1,137 @@ package mem +// Based on github.com/valyala/bytebufferpool + import ( + "sort" "sync" + "sync/atomic" +) + +const ( + minBitSize = 6 // 2**6=64 is a CPU cache line size + steps = 20 + + minSize = 1 << minBitSize + maxSize = 1 << (minBitSize + steps - 1) + + calibrateCallsThreshold = 42000 + maxPercentile = 0.95 ) type BufferPool struct { + calls [steps]uint64 + calibrating uint64 + + defaultSize uint64 + maxSize uint64 + pool sync.Pool } func NewBufferPool() *BufferPool { p := &BufferPool{ - pool: sync.Pool{ - New: func() any { - return &Buffer{} - }, - }, + pool: sync.Pool{}, } return p } func (p *BufferPool) Get() *Buffer { - buf := p.pool.Get().(*Buffer) - buf.Reset() + v := p.pool.Get() + if v != nil { + return v.(*Buffer) + } - return buf + return &Buffer{ + data: make([]byte, 0, atomic.LoadUint64(&p.defaultSize)), + } } func (p *BufferPool) Put(buf *Buffer) { - p.pool.Put(buf) + idx := index(len(buf.data)) + + if atomic.AddUint64(&p.calls[idx], 1) > calibrateCallsThreshold { + p.calibrate() + } + + maxSize := int(atomic.LoadUint64(&p.maxSize)) + if maxSize == 0 || cap(buf.data) <= maxSize { + buf.Reset() + p.pool.Put(buf) + } +} + +func (p *BufferPool) calibrate() { + if !atomic.CompareAndSwapUint64(&p.calibrating, 0, 1) { + return + } + + a := make(callSizes, 0, steps) + var callsSum uint64 + for i := uint64(0); i < steps; i++ { + calls := atomic.SwapUint64(&p.calls[i], 0) + callsSum += calls + a = append(a, callSize{ + calls: calls, + size: minSize << i, + }) + } + sort.Sort(a) + + defaultSize := a[0].size + maxSize := defaultSize + + maxSum := uint64(float64(callsSum) * maxPercentile) + callsSum = 0 + for i := 0; i < steps; i++ { + if callsSum > maxSum { + break + } + callsSum += a[i].calls + size := a[i].size + if size > maxSize { + maxSize = size + } + } + + atomic.StoreUint64(&p.defaultSize, defaultSize) + atomic.StoreUint64(&p.maxSize, maxSize) + + atomic.StoreUint64(&p.calibrating, 0) +} + +type callSize struct { + calls uint64 + size uint64 +} + +type callSizes []callSize + +func (ci callSizes) Len() int { + return len(ci) +} + +func (ci callSizes) Less(i, j int) bool { + return ci[i].calls > ci[j].calls +} + +func (ci callSizes) Swap(i, j int) { + ci[i], ci[j] = ci[j], ci[i] +} + +func index(n int) int { + n-- + n >>= minBitSize + idx := 0 + for n > 0 { + n >>= 1 + idx++ + } + if idx >= steps { + idx = steps - 1 + } + return idx } var DefaultBufferPool *BufferPool diff --git a/restream/app/process.go b/restream/app/process.go index 8d58d746..974309cb 100644 --- a/restream/app/process.go +++ b/restream/app/process.go @@ -9,6 +9,7 @@ import ( "sync" "github.com/datarhei/core/v16/ffmpeg/parse" + "github.com/datarhei/core/v16/mem" "github.com/datarhei/core/v16/process" ) @@ -156,7 +157,8 @@ func (config *Config) String() string { } func (config *Config) Hash() []byte { - b := bytes.Buffer{} + b := mem.Get() + defer mem.Put(b) b.WriteString(config.ID) b.WriteString(config.Reference) diff --git a/service/api/api.go b/service/api/api.go index b34f589a..266824c4 100644 --- a/service/api/api.go +++ b/service/api/api.go @@ -1,7 +1,6 @@ package api import ( - "bytes" "errors" "fmt" "io" @@ -88,13 +87,13 @@ func (e statusError) Is(target error) bool { type copyReader struct { reader io.Reader - copy *bytes.Buffer + copy *mem.Buffer } func newCopyReader(r io.Reader) io.Reader { c := ©Reader{ reader: r, - copy: new(bytes.Buffer), + copy: mem.Get(), } return c @@ -106,8 +105,8 @@ func (c *copyReader) Read(p []byte) (int, error) { c.copy.Write(p) if err == io.EOF { - c.reader = c.copy - c.copy = &bytes.Buffer{} + c.reader = c.copy.Reader() + c.copy = mem.Get() } return i, err diff --git a/update/update.go b/update/update.go index 331f6da0..24a88e1f 100644 --- a/update/update.go +++ b/update/update.go @@ -1,7 +1,6 @@ package update import ( - "bytes" "context" "fmt" "io" @@ -12,6 +11,7 @@ import ( "github.com/datarhei/core/v16/encoding/json" "github.com/datarhei/core/v16/log" + "github.com/datarhei/core/v16/mem" "github.com/datarhei/core/v16/monitor/metric" "golang.org/x/mod/semver" ) @@ -156,15 +156,16 @@ func (s *checker) check() error { Timeout: 5 * time.Second, } - var data bytes.Buffer - encoder := json.NewEncoder(&data) + data := mem.Get() + defer mem.Put(data) + encoder := json.NewEncoder(data) if err := encoder.Encode(&request); err != nil { return err } s.logger.Debug().WithField("request", data.String()).Log("") - req, err := http.NewRequest(http.MethodPut, "https://service.datarhei.com/api/v1/app_version", &data) + req, err := http.NewRequest(http.MethodPut, "https://service.datarhei.com/api/v1/app_version", data.Reader()) if err != nil { return err }