Use global buffer pool where feasible

This commit is contained in:
Ingo Oppermann 2024-10-14 10:51:35 +02:00
parent 719449a4c8
commit 92decc7111
No known key found for this signature in database
GPG Key ID: 2AB32426E9DD229E
17 changed files with 407 additions and 192 deletions

View File

@ -199,8 +199,6 @@ type restclient struct {
connectedCore *semver.Version
methods map[string][]apiconstraint
}
pool *mem.BufferPool
}
// New returns a new REST API client for the given config. The error is non-nil
@ -214,7 +212,6 @@ func New(config Config) (RestClient, error) {
auth0Token: config.Auth0Token,
client: config.Client,
clientTimeout: config.Timeout,
pool: mem.NewBufferPool(),
}
if len(config.AccessToken) != 0 {
@ -655,8 +652,8 @@ func (r *restclient) login() error {
login.Password = r.password
}
buf := r.pool.Get()
defer r.pool.Put(buf)
buf := mem.Get()
defer mem.Put(buf)
e := json.NewEncoder(buf)
e.Encode(login)

View File

@ -7,11 +7,12 @@ import (
"github.com/datarhei/core/v16/encoding/json"
"github.com/datarhei/core/v16/http/api"
"github.com/datarhei/core/v16/mem"
)
func (r *restclient) Events(ctx context.Context, filters api.EventFilters) (<-chan api.Event, error) {
buf := r.pool.Get()
defer r.pool.Put(buf)
buf := mem.Get()
defer mem.Put(buf)
e := json.NewEncoder(buf)
e.Encode(filters)

View File

@ -6,6 +6,7 @@ import (
"github.com/datarhei/core/v16/encoding/json"
"github.com/datarhei/core/v16/http/api"
"github.com/datarhei/core/v16/mem"
"github.com/datarhei/core/v16/restream/app"
)
@ -65,8 +66,8 @@ func (r *restclient) Process(id app.ProcessID, filter []string) (api.Process, er
}
func (r *restclient) ProcessAdd(p *app.Config, metadata map[string]interface{}) error {
buf := r.pool.Get()
defer r.pool.Put(buf)
buf := mem.Get()
defer mem.Put(buf)
config := api.ProcessConfig{}
config.Unmarshal(p, metadata)
@ -83,8 +84,8 @@ func (r *restclient) ProcessAdd(p *app.Config, metadata map[string]interface{})
}
func (r *restclient) ProcessUpdate(id app.ProcessID, p *app.Config, metadata map[string]interface{}) error {
buf := r.pool.Get()
defer r.pool.Put(buf)
buf := mem.Get()
defer mem.Put(buf)
config := api.ProcessConfig{}
config.Unmarshal(p, metadata)
@ -104,8 +105,8 @@ func (r *restclient) ProcessUpdate(id app.ProcessID, p *app.Config, metadata map
}
func (r *restclient) ProcessReportSet(id app.ProcessID, report *app.Report) error {
buf := r.pool.Get()
defer r.pool.Put(buf)
buf := mem.Get()
defer mem.Put(buf)
data := api.ProcessReport{}
data.Unmarshal(report)
@ -134,8 +135,8 @@ func (r *restclient) ProcessDelete(id app.ProcessID) error {
}
func (r *restclient) ProcessCommand(id app.ProcessID, command string) error {
buf := r.pool.Get()
defer r.pool.Put(buf)
buf := mem.Get()
defer mem.Put(buf)
e := json.NewEncoder(buf)
e.Encode(api.Command{
@ -173,8 +174,8 @@ func (r *restclient) ProcessMetadata(id app.ProcessID, key string) (api.Metadata
}
func (r *restclient) ProcessMetadataSet(id app.ProcessID, key string, metadata api.Metadata) error {
buf := r.pool.Get()
defer r.pool.Put(buf)
buf := mem.Get()
defer mem.Put(buf)
e := json.NewEncoder(buf)
e.Encode(metadata)
@ -206,8 +207,8 @@ func (r *restclient) ProcessProbe(id app.ProcessID) (api.Probe, error) {
func (r *restclient) ProcessProbeConfig(p *app.Config) (api.Probe, error) {
var probe api.Probe
buf := r.pool.Get()
defer r.pool.Put(buf)
buf := mem.Get()
defer mem.Put(buf)
config := api.ProcessConfig{}
config.Unmarshal(p, nil)

View File

@ -2,13 +2,13 @@
package cache
import (
"bytes"
"fmt"
"net/http"
"path"
"strings"
"github.com/datarhei/core/v16/http/cache"
"github.com/datarhei/core/v16/mem"
"github.com/labstack/echo/v4"
"github.com/labstack/echo/v4/middleware"
@ -78,6 +78,7 @@ func NewWithConfig(config Config) echo.MiddlewareFunc {
w := &cacheWriter{
header: writer.Header().Clone(),
body: mem.Get(),
}
res.Writer = w
@ -170,7 +171,7 @@ type cacheObject struct {
type cacheWriter struct {
code int
header http.Header
body bytes.Buffer
body *mem.Buffer
}
func (w *cacheWriter) Header() http.Header {

View File

@ -138,8 +138,6 @@ func NewWithConfig(config Config) echo.MiddlewareFunc {
zstdCompressor = NewZstd(config.Level)
}
bufferPool := mem.NewBufferPool()
return func(next echo.HandlerFunc) echo.HandlerFunc {
return func(c echo.Context) error {
if config.Skipper(c) {
@ -171,7 +169,7 @@ func NewWithConfig(config Config) echo.MiddlewareFunc {
rw := res.Writer
compressor.Reset(rw)
buffer := bufferPool.Get()
buffer := mem.Get()
grw := &compressResponseWriter{
Compressor: compressor,
@ -208,7 +206,7 @@ func NewWithConfig(config Config) echo.MiddlewareFunc {
}
}
compressor.Close()
bufferPool.Put(buffer)
mem.Put(buffer)
compress.Release(compressor)
}()

View File

@ -39,7 +39,7 @@ func (h *handler) handleHLSIngress(c echo.Context, _ string, data map[string]int
reader := req.Body
r := &segmentReader{
reader: req.Body,
buffer: h.bufferPool.Get(),
buffer: mem.Get(),
}
req.Body = r
@ -47,7 +47,7 @@ func (h *handler) handleHLSIngress(c echo.Context, _ string, data map[string]int
req.Body = reader
if r.size == 0 {
h.bufferPool.Put(r.buffer)
mem.Put(r.buffer)
return
}
@ -60,10 +60,10 @@ func (h *handler) handleHLSIngress(c echo.Context, _ string, data map[string]int
h.hlsIngressCollector.Extra(path, data)
}
buffer := h.bufferPool.Get()
buffer := mem.Get()
h.hlsIngressCollector.Ingress(path, headerSize(req.Header, buffer))
h.hlsIngressCollector.Ingress(path, r.size)
h.bufferPool.Put(buffer)
mem.Put(buffer)
segments := r.getSegments(urlpath.Dir(path))
@ -79,7 +79,7 @@ func (h *handler) handleHLSIngress(c echo.Context, _ string, data map[string]int
h.lock.Unlock()
}
h.bufferPool.Put(r.buffer)
mem.Put(r.buffer)
}()
} else if strings.HasSuffix(path, ".ts") {
// Get the size of the .ts file and store it in the ts-map for later use.
@ -93,11 +93,11 @@ func (h *handler) handleHLSIngress(c echo.Context, _ string, data map[string]int
req.Body = reader
if r.size != 0 {
buffer := h.bufferPool.Get()
buffer := mem.Get()
h.lock.Lock()
h.rxsegments[path] = r.size + headerSize(req.Header, buffer)
h.lock.Unlock()
h.bufferPool.Put(buffer)
mem.Put(buffer)
}
}()
}
@ -179,7 +179,7 @@ func (h *handler) handleHLSEgress(c echo.Context, _ string, data map[string]inte
// the data that we need to rewrite.
rewriter = &sessionRewriter{
ResponseWriter: res.Writer,
buffer: h.bufferPool.Get(),
buffer: mem.Get(),
}
res.Writer = rewriter
@ -197,11 +197,11 @@ func (h *handler) handleHLSEgress(c echo.Context, _ string, data map[string]inte
if rewrite {
if res.Status < 200 || res.Status >= 300 {
res.Write(rewriter.buffer.Bytes())
h.bufferPool.Put(rewriter.buffer)
mem.Put(rewriter.buffer)
return nil
}
buffer := h.bufferPool.Get()
buffer := mem.Get()
// Rewrite the data before sending it to the client
rewriter.rewriteHLS(sessionID, c.Request().URL, buffer)
@ -209,17 +209,17 @@ func (h *handler) handleHLSEgress(c echo.Context, _ string, data map[string]inte
res.Header().Set("Cache-Control", "private")
res.Write(buffer.Bytes())
h.bufferPool.Put(buffer)
h.bufferPool.Put(rewriter.buffer)
mem.Put(buffer)
mem.Put(rewriter.buffer)
}
if isM3U8 || isTS {
if res.Status >= 200 && res.Status < 300 {
// Collect how many bytes we've written in this session
buffer := h.bufferPool.Get()
buffer := mem.Get()
h.hlsEgressCollector.Egress(sessionID, headerSize(res.Header(), buffer))
h.hlsEgressCollector.Egress(sessionID, res.Size)
h.bufferPool.Put(buffer)
mem.Put(buffer)
if isTS {
// Activate the session. If the session is already active, this is a noop

View File

@ -39,8 +39,6 @@ func TestHLSSegmentReader(t *testing.T) {
}
func BenchmarkHLSSegmentReader(b *testing.B) {
pool := mem.NewBufferPool()
data, err := os.ReadFile("./fixtures/segments.txt")
require.NoError(b, err)
@ -51,13 +49,13 @@ func BenchmarkHLSSegmentReader(b *testing.B) {
rd.Reset(data)
br := &segmentReader{
reader: io.NopCloser(r),
buffer: pool.Get(),
buffer: mem.Get(),
}
_, err := io.ReadAll(br)
require.NoError(b, err)
pool.Put(br.buffer)
mem.Put(br.buffer)
}
}
@ -86,8 +84,6 @@ func TestHLSRewrite(t *testing.T) {
}
func BenchmarkHLSRewrite(b *testing.B) {
pool := mem.NewBufferPool()
data, err := os.ReadFile("./fixtures/segments.txt")
require.NoError(b, err)
@ -96,17 +92,17 @@ func BenchmarkHLSRewrite(b *testing.B) {
for i := 0; i < b.N; i++ {
br := &sessionRewriter{
buffer: pool.Get(),
buffer: mem.Get(),
}
_, err = br.Write(data)
require.NoError(b, err)
buffer := pool.Get()
buffer := mem.Get()
br.rewriteHLS("oT5GV8eWBbRAh4aib5egoK", u, buffer)
pool.Put(br.buffer)
pool.Put(buffer)
mem.Put(br.buffer)
mem.Put(buffer)
}
}

View File

@ -3,6 +3,7 @@ package session
import (
"net/url"
"github.com/datarhei/core/v16/mem"
"github.com/labstack/echo/v4"
"github.com/lithammer/shortuuid/v4"
)
@ -45,7 +46,7 @@ func (h *handler) handleHTTP(c echo.Context, _ string, data map[string]interface
h.httpCollector.Extra(id, data)
defer func() {
buffer := h.bufferPool.Get()
buffer := mem.Get()
req.Body = reader
h.httpCollector.Ingress(id, r.size+headerSize(req.Header, buffer))
@ -58,7 +59,7 @@ func (h *handler) handleHTTP(c echo.Context, _ string, data map[string]interface
h.httpCollector.Close(id)
h.bufferPool.Put(buffer)
mem.Put(buffer)
}()
return next(c)

View File

@ -44,8 +44,6 @@ type handler struct {
rxsegments map[string]int64
lock sync.Mutex
bufferPool *mem.BufferPool
}
// New returns a new session middleware with default config
@ -77,7 +75,6 @@ func NewWithConfig(config Config) echo.MiddlewareFunc {
hlsIngressCollector: config.HLSIngressCollector,
reSessionID: regexp.MustCompile(`^[` + regexp.QuoteMeta(shortuuid.DefaultAlphabet) + `]{22}$`),
rxsegments: make(map[string]int64),
bufferPool: mem.NewBufferPool(),
}
return func(next echo.HandlerFunc) echo.HandlerFunc {

View File

@ -111,25 +111,12 @@ func (f *memFile) Close() error {
f.r = nil
return nil
}
func (f *memFile) free() {
f.Close()
if f.data == nil {
return
if f.data != nil {
mem.Put(f.data)
f.data = nil
}
pool.Put(f.data)
f.data = nil
}
var pool *mem.BufferPool = nil
func init() {
pool = mem.NewBufferPool()
return nil
}
type memFilesystem struct {
@ -331,24 +318,13 @@ func (fs *memFilesystem) Files() int64 {
func (fs *memFilesystem) Open(path string) File {
path = fs.cleanPath(path)
file, ok := fs.storage.Load(path)
newFile, ok := fs.storage.LoadAndCopy(path)
if !ok {
return nil
}
newFile := &memFile{
memFileInfo: memFileInfo{
name: file.name,
size: file.size,
dir: file.dir,
lastMod: file.lastMod,
linkTo: file.linkTo,
},
data: file.data,
}
if len(file.linkTo) != 0 {
file, ok := fs.storage.Load(file.linkTo)
if len(newFile.linkTo) != 0 {
file, ok := fs.storage.LoadAndCopy(newFile.linkTo)
if !ok {
return nil
}
@ -358,7 +334,7 @@ func (fs *memFilesystem) Open(path string) File {
newFile.size = file.size
}
newFile.r = bytes.NewReader(newFile.data.Bytes())
newFile.r = newFile.data.Reader()
return newFile
}
@ -366,22 +342,19 @@ func (fs *memFilesystem) Open(path string) File {
func (fs *memFilesystem) ReadFile(path string) ([]byte, error) {
path = fs.cleanPath(path)
file, ok := fs.storage.Load(path)
file, ok := fs.storage.LoadAndCopy(path)
if !ok {
return nil, ErrNotExist
}
if len(file.linkTo) != 0 {
file, ok = fs.storage.Load(file.linkTo)
file, ok = fs.storage.LoadAndCopy(file.linkTo)
if !ok {
return nil, ErrNotExist
}
}
data := pool.Get()
file.data.WriteTo(data)
return data.Bytes(), nil
return file.data.Bytes(), nil
}
func (fs *memFilesystem) Symlink(oldname, newname string) error {
@ -421,7 +394,7 @@ func (fs *memFilesystem) Symlink(oldname, newname string) error {
defer fs.sizeLock.Unlock()
if replaced {
oldFile.free()
oldFile.Close()
fs.currentSize -= oldFile.size
}
@ -445,7 +418,7 @@ func (fs *memFilesystem) WriteFileReader(path string, r io.Reader, sizeHint int)
size: 0,
lastMod: time.Now(),
},
data: pool.Get(),
data: mem.Get(),
}
size, err := newFile.data.ReadFrom(r)
@ -456,7 +429,7 @@ func (fs *memFilesystem) WriteFileReader(path string, r io.Reader, sizeHint int)
"error": err,
}).Warn().Log("Incomplete file")
newFile.free()
newFile.Close()
return -1, false, fmt.Errorf("incomplete file")
}
@ -473,7 +446,7 @@ func (fs *memFilesystem) WriteFileReader(path string, r io.Reader, sizeHint int)
defer fs.sizeLock.Unlock()
if replace {
oldFile.free()
oldFile.Close()
fs.currentSize -= oldFile.size
}
@ -506,25 +479,13 @@ func (fs *memFilesystem) WriteFileSafe(path string, data []byte) (int64, bool, e
func (fs *memFilesystem) AppendFileReader(path string, r io.Reader, sizeHint int) (int64, error) {
path = fs.cleanPath(path)
file, hasFile := fs.storage.Load(path)
file, hasFile := fs.storage.LoadAndCopy(path)
if !hasFile {
size, _, err := fs.WriteFileReader(path, r, sizeHint)
return size, err
}
newFile := &memFile{
memFileInfo: memFileInfo{
name: path,
dir: false,
size: 0,
lastMod: time.Now(),
},
data: pool.Get(),
}
file.data.WriteTo(newFile.data)
size, err := newFile.data.ReadFrom(r)
size, err := file.data.ReadFrom(r)
if err != nil {
fs.logger.WithFields(log.Fields{
"path": path,
@ -532,20 +493,21 @@ func (fs *memFilesystem) AppendFileReader(path string, r io.Reader, sizeHint int
"error": err,
}).Warn().Log("Incomplete file")
newFile.free()
file.Close()
return -1, fmt.Errorf("incomplete file")
}
file.size += size
file.lastMod = time.Now()
oldFile, replace := fs.storage.Store(path, newFile)
oldFile, replace := fs.storage.Store(path, file)
fs.sizeLock.Lock()
defer fs.sizeLock.Unlock()
if replace {
oldFile.free()
oldFile.Close()
}
fs.currentSize += size
@ -584,7 +546,7 @@ func (fs *memFilesystem) Purge(size int64) int64 {
fs.currentSize -= f.size
fs.sizeLock.Unlock()
f.free()
f.Close()
fs.logger.WithFields(log.Fields{
"path": f.name,
@ -644,7 +606,7 @@ func (fs *memFilesystem) Rename(src, dst string) error {
defer fs.sizeLock.Unlock()
if replace {
dstFile.free()
dstFile.Close()
fs.currentSize -= dstFile.size
}
@ -664,28 +626,18 @@ func (fs *memFilesystem) Copy(src, dst string) error {
return os.ErrInvalid
}
srcFile, ok := fs.storage.Load(src)
file, ok := fs.storage.LoadAndCopy(src)
if !ok {
return ErrNotExist
}
if srcFile.dir {
if file.dir {
return ErrNotExist
}
dstFile := &memFile{
memFileInfo: memFileInfo{
name: dst,
dir: false,
size: srcFile.size,
lastMod: time.Now(),
},
data: pool.Get(),
}
file.lastMod = time.Now()
srcFile.data.WriteTo(dstFile.data)
f, replace := fs.storage.Store(dst, dstFile)
replacedFile, replace := fs.storage.Store(dst, file)
if !replace {
fs.dirs.Add(dst)
@ -695,11 +647,11 @@ func (fs *memFilesystem) Copy(src, dst string) error {
defer fs.sizeLock.Unlock()
if replace {
f.free()
fs.currentSize -= f.size
replacedFile.Close()
fs.currentSize -= replacedFile.size
}
fs.currentSize += dstFile.size
fs.currentSize += file.size
return nil
}
@ -763,7 +715,7 @@ func (fs *memFilesystem) Remove(path string) int64 {
func (fs *memFilesystem) remove(path string) int64 {
file, ok := fs.storage.Delete(path)
if ok {
file.free()
file.Close()
fs.dirs.Remove(path)
@ -853,7 +805,7 @@ func (fs *memFilesystem) RemoveList(path string, options ListOptions) ([]string,
fs.dirs.Remove(file.name)
file.free()
file.Close()
}
fs.sizeLock.Lock()

View File

@ -3,29 +3,34 @@ package fs
import (
"sync"
"github.com/datarhei/core/v16/mem"
"github.com/dolthub/swiss"
"github.com/puzpuzpuz/xsync/v3"
)
type memStorage interface {
// Delete deletes a file from the storage.
Delete(key string) (*memFile, bool)
Delete(key string) (file *memFile, ok bool)
// Store stores a file to the storage. If there's already a file with
// the same key, that value will be returned and replaced with the
// new file.
Store(key string, value *memFile) (*memFile, bool)
Store(key string, file *memFile) (oldfile *memFile, ok bool)
// Load loads a file from the storage. This is a reference to the file,
// i.e. all changes to the file will be reflected on the storage.
Load(key string) (value *memFile, ok bool)
Load(key string) (file *memFile, ok bool)
// LoadAndCopy loads a file from the storage. This is a copy of the
// file's metadata and content.
LoadAndCopy(key string) (file *memFile, ok bool)
// Has checks whether a file exists at path.
Has(key string) bool
// Range ranges over all files on the storage. The callback needs to return
// false in order to stop the iteration.
Range(f func(key string, value *memFile) bool)
Range(f func(key string, file *memFile) bool)
}
type mapOfStorage struct {
@ -63,6 +68,35 @@ func (m *mapOfStorage) Load(key string) (*memFile, bool) {
return m.files.Load(key)
}
// LoadAndCopy returns a deep copy of the file stored at key. The file
// metadata is duplicated and the content is written into a fresh buffer
// obtained from the global pool, so the caller owns the returned file and
// modifications to it are not reflected in the storage. It returns
// (nil, false) if no file exists at key.
func (m *mapOfStorage) LoadAndCopy(key string) (*memFile, bool) {
	token := m.lock.RLock()
	defer m.lock.RUnlock(token)

	file, ok := m.files.Load(key)
	if !ok {
		return nil, false
	}

	newFile := &memFile{
		memFileInfo: memFileInfo{
			name:    file.name,
			size:    file.size,
			dir:     file.dir,
			lastMod: file.lastMod,
			linkTo:  file.linkTo,
		},
		data: nil,
		r:    nil,
	}

	// Copy the content only when the stored file actually has a buffer.
	if file.data != nil {
		newFile.data = mem.Get()
		file.data.WriteTo(newFile.data)
	}

	return newFile, true
}
func (m *mapOfStorage) Has(key string) bool {
token := m.lock.RLock()
defer m.lock.RUnlock(token)
@ -121,6 +155,35 @@ func (m *mapStorage) Load(key string) (*memFile, bool) {
return v, ok
}
// LoadAndCopy returns a deep copy of the file stored at key. The file
// metadata is duplicated and the content is written into a fresh buffer
// obtained from the global pool, so the caller owns the returned file and
// modifications to it are not reflected in the storage. It returns
// (nil, false) if no file exists at key.
func (m *mapStorage) LoadAndCopy(key string) (*memFile, bool) {
	m.lock.RLock()
	defer m.lock.RUnlock()

	v, ok := m.files[key]
	if !ok {
		return nil, false
	}

	newFile := &memFile{
		memFileInfo: memFileInfo{
			name:    v.name,
			size:    v.size,
			dir:     v.dir,
			lastMod: v.lastMod,
			linkTo:  v.linkTo,
		},
		data: nil,
		r:    nil,
	}

	// Copy the content only when the stored file actually has a buffer.
	if v.data != nil {
		newFile.data = mem.Get()
		v.data.WriteTo(newFile.data)
	}

	return newFile, true
}
func (m *mapStorage) Has(key string) bool {
m.lock.RLock()
defer m.lock.RUnlock()
@ -186,6 +249,35 @@ func (m *swissMapStorage) Load(key string) (*memFile, bool) {
return m.files.Get(key)
}
// LoadAndCopy returns a deep copy of the file stored at key. The file
// metadata is duplicated and the content is written into a fresh buffer
// obtained from the global pool, so the caller owns the returned file and
// modifications to it are not reflected in the storage. It returns
// (nil, false) if no file exists at key.
func (m *swissMapStorage) LoadAndCopy(key string) (*memFile, bool) {
	token := m.lock.RLock()
	defer m.lock.RUnlock(token)

	file, ok := m.files.Get(key)
	if !ok {
		return nil, false
	}

	newFile := &memFile{
		memFileInfo: memFileInfo{
			name:    file.name,
			size:    file.size,
			dir:     file.dir,
			lastMod: file.lastMod,
			linkTo:  file.linkTo,
		},
		data: nil,
		r:    nil,
	}

	// Copy the content only when the stored file actually has a buffer.
	if file.data != nil {
		newFile.data = mem.Get()
		file.data.WriteTo(newFile.data)
	}

	return newFile, true
}
func (m *swissMapStorage) Has(key string) bool {
token := m.lock.RLock()
defer m.lock.RUnlock(token)

View File

@ -4,6 +4,8 @@ import (
"bytes"
"fmt"
"io"
"github.com/datarhei/core/v16/mem"
)
type SizedFilesystem interface {
@ -71,8 +73,8 @@ func (r *sizedFilesystem) WriteFileReader(path string, rd io.Reader, sizeHint in
return r.Filesystem.WriteFileReader(path, rd, sizeHint)
}
data := pool.Get()
defer pool.Put(data)
data := mem.Get()
defer mem.Put(data)
size, err := data.ReadFrom(rd)
if err != nil {
@ -143,8 +145,8 @@ func (r *sizedFilesystem) AppendFileReader(path string, rd io.Reader, sizeHint i
return r.Filesystem.AppendFileReader(path, rd, sizeHint)
}
data := pool.Get()
defer pool.Put(data)
data := mem.Get()
defer mem.Put(data)
size, err := data.ReadFrom(rd)
if err != nil {

View File

@ -1,5 +1,7 @@
package mem
// Based on github.com/valyala/bytebufferpool
import (
"bytes"
"errors"
@ -7,85 +9,155 @@ import (
)
type Buffer struct {
data bytes.Buffer
data []byte
}
// Len returns the length of the buffer.
func (b *Buffer) Len() int {
return b.data.Len()
return len(b.data)
}
// Bytes returns the buffer, but keeps ownership.
func (b *Buffer) Bytes() []byte {
return b.data.Bytes()
return b.data
}
// WriteTo writes the bytes to the writer.
func (b *Buffer) WriteTo(w io.Writer) (int64, error) {
n, err := w.Write(b.data.Bytes())
n, err := w.Write(b.data)
return int64(n), err
}
// Reset empties the buffer and keeps its capacity.
func (b *Buffer) Reset() {
b.data.Reset()
b.data = b.data[:0]
}
// Write appends to the buffer.
func (b *Buffer) Write(p []byte) (int, error) {
return b.data.Write(p)
b.data = append(b.data, p...)
return len(p), nil
}
// ReadFrom reads from the reader and appends to the buffer.
func (b *Buffer) ReadFrom(r io.Reader) (int64, error) {
if br, ok := r.(*bytes.Reader); ok {
b.data.Grow(br.Len())
}
/*
chunkData := [128 * 1024]byte{}
chunk := chunkData[0:]
chunkData := [128 * 1024]byte{}
chunk := chunkData[0:]
size := int64(0)
size := int64(0)
for {
n, err := r.Read(chunk)
if n != 0 {
b.data.Write(chunk[:n])
size += int64(n)
}
if err != nil {
if errors.Is(err, io.EOF) {
return size, nil
for {
n, err := r.Read(chunk)
if n != 0 {
b.data = append(b.data, chunk[:n]...)
size += int64(n)
}
return size, err
if err != nil {
if errors.Is(err, io.EOF) {
return size, nil
}
return size, err
}
if n == 0 {
break
}
}
if n == 0 {
break
return size, nil
*/
p := b.data
nStart := int64(len(p))
nMax := int64(cap(p))
n := nStart
if nMax == 0 {
nMax = 64
p = make([]byte, nMax)
} else {
p = p[:nMax]
}
for {
if n == nMax {
nMax *= 2
bNew := make([]byte, nMax)
copy(bNew, p)
p = bNew
}
nn, err := r.Read(p[n:])
n += int64(nn)
if err != nil {
b.data = p[:n]
n -= nStart
if errors.Is(err, io.EOF) {
return n, nil
}
return n, err
}
}
/*
if br, ok := r.(*bytes.Reader); ok {
if cap(b.data) < br.Len() {
data := make([]byte, br.Len())
copy(data, b.data)
b.data = data
}
}
return size, nil
chunkData := [128 * 1024]byte{}
chunk := chunkData[0:]
size := int64(0)
for {
n, err := r.Read(chunk)
if n != 0 {
if cap(b.data) < len(b.data)+n {
data := make([]byte, cap(b.data)+1024*1024)
copy(data, b.data)
b.data = data
}
b.data = append(b.data, chunk[:n]...)
size += int64(n)
}
if err != nil {
if errors.Is(err, io.EOF) {
return size, nil
}
return size, err
}
if n == 0 {
break
}
}
return size, nil
*/
}
// WriteByte appends a byte to the buffer.
func (b *Buffer) WriteByte(c byte) error {
return b.data.WriteByte(c)
b.data = append(b.data, c)
return nil
}
// WriteString appends a string to the buffer.
func (b *Buffer) WriteString(s string) (n int, err error) {
return b.data.WriteString(s)
b.data = append(b.data, s...)
return len(s), nil
}
// Reader returns a bytes.Reader based on the data in the buffer.
func (b *Buffer) Reader() *bytes.Reader {
return bytes.NewReader(b.Bytes())
return bytes.NewReader(b.data)
}
// String returns the data in the buffer as a string.
func (b *Buffer) String() string {
return b.data.String()
return string(b.data)
}

View File

@ -1,34 +1,137 @@
package mem
// Based on github.com/valyala/bytebufferpool
import (
"sort"
"sync"
"sync/atomic"
)
const (
minBitSize = 6 // 2**6=64 is a CPU cache line size
steps = 20
minSize = 1 << minBitSize
maxSize = 1 << (minBitSize + steps - 1)
calibrateCallsThreshold = 42000
maxPercentile = 0.95
)
// BufferPool is a self-calibrating pool of Buffers, based on
// github.com/valyala/bytebufferpool. It records the size of each buffer
// returned via Put and periodically calibrates a default capacity for
// newly allocated buffers and a maximum capacity above which buffers
// are dropped instead of being pooled.
type BufferPool struct {
	calls       [steps]uint64 // per size-class counters of Put calls, reset on calibration
	calibrating uint64        // 1 while a calibration is in progress (CAS guard)
	defaultSize uint64        // capacity hint for buffers allocated in Get
	maxSize     uint64        // buffers with a larger capacity are not returned to the pool

	pool sync.Pool
}
func NewBufferPool() *BufferPool {
p := &BufferPool{
pool: sync.Pool{
New: func() any {
return &Buffer{}
},
},
pool: sync.Pool{},
}
return p
}
func (p *BufferPool) Get() *Buffer {
buf := p.pool.Get().(*Buffer)
buf.Reset()
v := p.pool.Get()
if v != nil {
return v.(*Buffer)
}
return buf
return &Buffer{
data: make([]byte, 0, atomic.LoadUint64(&p.defaultSize)),
}
}
func (p *BufferPool) Put(buf *Buffer) {
p.pool.Put(buf)
idx := index(len(buf.data))
if atomic.AddUint64(&p.calls[idx], 1) > calibrateCallsThreshold {
p.calibrate()
}
maxSize := int(atomic.LoadUint64(&p.maxSize))
if maxSize == 0 || cap(buf.data) <= maxSize {
buf.Reset()
p.pool.Put(buf)
}
}
// calibrate recomputes defaultSize and maxSize from the Put call
// statistics accumulated in p.calls. At most one calibration runs at a
// time; concurrent callers return immediately without doing any work.
func (p *BufferPool) calibrate() {
	// Ensure a single calibrator at a time; the flag is released at the end.
	if !atomic.CompareAndSwapUint64(&p.calibrating, 0, 1) {
		return
	}

	// Snapshot and reset the per size-class counters.
	a := make(callSizes, 0, steps)
	var callsSum uint64
	for i := uint64(0); i < steps; i++ {
		calls := atomic.SwapUint64(&p.calls[i], 0)
		callsSum += calls
		a = append(a, callSize{
			calls: calls,
			size:  minSize << i,
		})
	}
	// Order the size classes by popularity, most-used first.
	sort.Sort(a)

	// The most popular size class becomes the default allocation size.
	defaultSize := a[0].size
	maxSize := defaultSize

	// maxSize is grown until the covered classes account for the
	// maxPercentile share of all calls, so that rare, very large buffers
	// are not kept in the pool.
	maxSum := uint64(float64(callsSum) * maxPercentile)
	callsSum = 0
	for i := 0; i < steps; i++ {
		if callsSum > maxSum {
			break
		}
		callsSum += a[i].calls
		size := a[i].size
		if size > maxSize {
			maxSize = size
		}
	}

	atomic.StoreUint64(&p.defaultSize, defaultSize)
	atomic.StoreUint64(&p.maxSize, maxSize)

	// Release the calibration guard.
	atomic.StoreUint64(&p.calibrating, 0)
}
}
// callSize pairs a buffer size class with the number of Put calls
// recorded for it during the current calibration window.
type callSize struct {
	calls uint64
	size  uint64
}

// callSizes implements sort.Interface, ordering size classes by call
// count in descending order (most frequently used class first).
type callSizes []callSize

func (ci callSizes) Len() int {
	return len(ci)
}

func (ci callSizes) Less(i, j int) bool {
	// Note the inverted comparison: higher call counts sort first.
	return ci[i].calls > ci[j].calls
}

func (ci callSizes) Swap(i, j int) {
	ci[i], ci[j] = ci[j], ci[i]
}
// index maps a buffer length n to its size-class slot in the calls
// array. Lengths up to minSize fall into slot 0; every further doubling
// of the length moves one slot up, capped at the last slot (steps-1).
func index(n int) int {
	v := (n - 1) >> minBitSize

	slot := 0
	for ; v > 0; v >>= 1 {
		slot++
	}

	if slot >= steps {
		return steps - 1
	}

	return slot
}
var DefaultBufferPool *BufferPool

View File

@ -9,6 +9,7 @@ import (
"sync"
"github.com/datarhei/core/v16/ffmpeg/parse"
"github.com/datarhei/core/v16/mem"
"github.com/datarhei/core/v16/process"
)
@ -156,7 +157,8 @@ func (config *Config) String() string {
}
func (config *Config) Hash() []byte {
b := bytes.Buffer{}
b := mem.Get()
defer mem.Put(b)
b.WriteString(config.ID)
b.WriteString(config.Reference)

View File

@ -1,7 +1,6 @@
package api
import (
"bytes"
"errors"
"fmt"
"io"
@ -88,13 +87,13 @@ func (e statusError) Is(target error) bool {
type copyReader struct {
reader io.Reader
copy *bytes.Buffer
copy *mem.Buffer
}
func newCopyReader(r io.Reader) io.Reader {
c := &copyReader{
reader: r,
copy: new(bytes.Buffer),
copy: mem.Get(),
}
return c
@ -106,8 +105,8 @@ func (c *copyReader) Read(p []byte) (int, error) {
c.copy.Write(p)
if err == io.EOF {
c.reader = c.copy
c.copy = &bytes.Buffer{}
c.reader = c.copy.Reader()
c.copy = mem.Get()
}
return i, err

View File

@ -1,7 +1,6 @@
package update
import (
"bytes"
"context"
"fmt"
"io"
@ -12,6 +11,7 @@ import (
"github.com/datarhei/core/v16/encoding/json"
"github.com/datarhei/core/v16/log"
"github.com/datarhei/core/v16/mem"
"github.com/datarhei/core/v16/monitor/metric"
"golang.org/x/mod/semver"
)
@ -156,15 +156,16 @@ func (s *checker) check() error {
Timeout: 5 * time.Second,
}
var data bytes.Buffer
encoder := json.NewEncoder(&data)
data := mem.Get()
defer mem.Put(data)
encoder := json.NewEncoder(data)
if err := encoder.Encode(&request); err != nil {
return err
}
s.logger.Debug().WithField("request", data.String()).Log("")
req, err := http.NewRequest(http.MethodPut, "https://service.datarhei.com/api/v1/app_version", &data)
req, err := http.NewRequest(http.MethodPut, "https://service.datarhei.com/api/v1/app_version", data.Reader())
if err != nil {
return err
}