From 1650b17e0505ca93654c4e459968d82ae1a5559f Mon Sep 17 00:00:00 2001 From: Ingo Oppermann Date: Mon, 19 Aug 2024 15:21:24 +0200 Subject: [PATCH 1/8] Simply return error as-is, check process list length --- cluster/node/manager.go | 22 +++++++++++++--------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/cluster/node/manager.go b/cluster/node/manager.go index 5518a59e..cc1072ea 100644 --- a/cluster/node/manager.go +++ b/cluster/node/manager.go @@ -154,7 +154,7 @@ func (p *Manager) NodeGet(id string) (*Node, error) { node, ok := p.nodes[id] if !ok { - return nil, fmt.Errorf("node not found") + return nil, fmt.Errorf("node not found: %s", id) } return node, nil @@ -538,7 +538,7 @@ func (p *Manager) ProcessList(options client.ProcessListOptions) []api.Process { func (p *Manager) ProcessGet(nodeid string, id app.ProcessID, filter []string) (api.Process, error) { node, err := p.NodeGet(nodeid) if err != nil { - return api.Process{}, fmt.Errorf("node not found: %w", err) + return api.Process{}, err } list, err := node.Core().ProcessList(client.ProcessListOptions{ @@ -550,13 +550,17 @@ func (p *Manager) ProcessGet(nodeid string, id app.ProcessID, filter []string) ( return api.Process{}, err } + if len(list) == 0 { + return api.Process{}, fmt.Errorf("process not found") + } + return list[0], nil } func (p *Manager) ProcessAdd(nodeid string, config *app.Config, metadata map[string]interface{}) error { node, err := p.NodeGet(nodeid) if err != nil { - return fmt.Errorf("node not found: %w", err) + return err } return node.Core().ProcessAdd(config, metadata) @@ -565,7 +569,7 @@ func (p *Manager) ProcessAdd(nodeid string, config *app.Config, metadata map[str func (p *Manager) ProcessDelete(nodeid string, id app.ProcessID) error { node, err := p.NodeGet(nodeid) if err != nil { - return fmt.Errorf("node not found: %w", err) + return err } return node.Core().ProcessDelete(id) @@ -574,7 +578,7 @@ func (p *Manager) ProcessDelete(nodeid string, id app.ProcessID) error { func (p *Manager) ProcessUpdate(nodeid string, id app.ProcessID, config *app.Config, metadata map[string]interface{}) error { node, err := p.NodeGet(nodeid) if err != nil { - return fmt.Errorf("node not found: %w", err) + return err } return node.Core().ProcessUpdate(id, config, metadata) @@ -583,7 +587,7 @@ func (p *Manager) ProcessUpdate(nodeid string, id app.ProcessID, config *app.Con func (p *Manager) ProcessReportSet(nodeid string, id app.ProcessID, report *app.Report) error { node, err := p.NodeGet(nodeid) if err != nil { - return fmt.Errorf("node not found: %w", err) + return err } return node.Core().ProcessReportSet(id, report) @@ -592,7 +596,7 @@ func (p *Manager) ProcessReportSet(nodeid string, id app.ProcessID, report *app. func (p *Manager) ProcessCommand(nodeid string, id app.ProcessID, command string) error { node, err := p.NodeGet(nodeid) if err != nil { - return fmt.Errorf("node not found: %w", err) + return err } return node.Core().ProcessCommand(id, command) @@ -604,7 +608,7 @@ func (p *Manager) ProcessProbe(nodeid string, id app.ProcessID) (api.Probe, erro probe := api.Probe{ Log: []string{fmt.Sprintf("the node %s where the process %s should reside on, doesn't exist", nodeid, id.String())}, } - return probe, fmt.Errorf("node not found: %w", err) + return probe, err } return node.Core().ProcessProbe(id) @@ -616,7 +620,7 @@ func (p *Manager) ProcessProbeConfig(nodeid string, config *app.Config) (api.Pro probe := api.Probe{ Log: []string{fmt.Sprintf("the node %s where the process config should be probed on, doesn't exist", nodeid)}, } - return probe, fmt.Errorf("node not found: %w", err) + return probe, err } return node.Core().ProcessProbeConfig(config) From 0b1601542d698fc81f163d31d461eaf97004e554 Mon Sep 17 00:00:00 2001 From: Ingo Oppermann Date: Mon, 19 Aug 2024 15:22:24 +0200 Subject: [PATCH 2/8] Wait for follower and leader loops to finish --- cluster/cluster.go | 4 ---- cluster/leader.go | 16 ++++++++++++++++ 2 files changed, 16 insertions(+), 4 deletions(-) diff --git a/cluster/cluster.go b/cluster/cluster.go index 590b8c06..69551277 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -648,10 +648,6 @@ func (c *cluster) Shutdown() error { c.raft.Shutdown() } - // TODO: here might some situations, where the manager is still need from the synchronize loop and will run into a panic - c.manager = nil - c.raft = nil - return nil } diff --git a/cluster/leader.go b/cluster/leader.go index 60ab84d4..9d555f0e 100644 --- a/cluster/leader.go +++ b/cluster/leader.go @@ -180,6 +180,22 @@ func (c *cluster) monitorLeadership() { c.leaderLock.Unlock() } case <-c.shutdownCh: + if weAreFollowerCh != nil { + close(weAreFollowerCh) + } + + if weAreLeaderCh != nil { + close(weAreLeaderCh) + } + + if weAreEmergencyLeaderCh != nil { + close(weAreEmergencyLeaderCh) + } + + leaderLoop.Wait() + emergencyLeaderLoop.Wait() + followerLoop.Wait() + return } } From 68607ed932c091ba802abd53bc04f3b97d17babf Mon Sep 17 00:00:00 2001 From: Ingo Oppermann Date: Tue, 20 Aug 2024 11:55:08 +0200 Subject: [PATCH 3/8] Add basic resource information in about response --- app/api/api.go | 1 + http/api/about.go | 44 +++++++++++++++++++++----------- http/handler/api/about.go | 30 +++++++++++++++++----- http/handler/api/about_test.go | 2 +- http/handler/api/cluster_node.go | 2 +- http/server.go | 3 +++ 6 files changed, 58 insertions(+), 24 deletions(-) diff --git a/app/api/api.go b/app/api/api.go index 90ad6ea5..da5aeeac 100644 --- a/app/api/api.go +++ b/app/api/api.go @@ -1500,6 +1500,7 @@ func (a *api) start(ctx context.Context) error { return false }, + Resources: a.resources, } mainserverhandler, err := http.NewServer(serverConfig) diff --git a/http/api/about.go b/http/api/about.go index 0203ae1d..5887423a 100644 --- a/http/api/about.go +++ b/http/api/about.go @@ -2,17 +2,18 @@ package api // About is some general information about the API type About struct { - App string `json:"app"` - Auths []string `json:"auths"` - Name string `json:"name"` - ID string `json:"id"` - CreatedAt string `json:"created_at"` // RFC3339 - Uptime uint64 `json:"uptime_seconds"` - Version Version `json:"version"` + App string `json:"app"` + Auths []string `json:"auths"` + Name string `json:"name"` + ID string `json:"id"` + CreatedAt string `json:"created_at"` // RFC3339 + Uptime uint64 `json:"uptime_seconds"` + Version AboutVersion `json:"version"` + Resources AboutResources `json:"resources"` } -// Version is some information about the binary -type Version struct { +// AboutVersion is some information about the binary +type AboutVersion struct { Number string `json:"number"` Commit string `json:"repository_commit"` Branch string `json:"repository_branch"` @@ -21,13 +22,26 @@ type Version struct { Compiler string `json:"compiler"` } -// MinimalAbout is the minimal information about the API -type MinimalAbout struct { - App string `json:"app"` - Auths []string `json:"auths"` - Version VersionMinimal `json:"version"` +// AboutResources holds information about the current resource usage +type AboutResources struct { + IsThrottling bool // Whether this core is currently throttling + NCPU float64 // Number of CPU on this node + CPU float64 // Current CPU load, 0-100*ncpu + CPULimit float64 // Defined CPU load limit, 0-100*ncpu + CPUCore float64 // Current CPU load of the core itself, 0-100*ncpu + Mem uint64 // Currently used memory in bytes + MemLimit uint64 // Defined memory limit in bytes + MemTotal uint64 // Total available memory in bytes + MemCore uint64 // Current used memory of the core itself in bytes } -type VersionMinimal struct { +// MinimalAbout is the minimal information about the API +type MinimalAbout struct { + App string `json:"app"` + Auths []string `json:"auths"` + Version AboutVersionMinimal `json:"version"` +} + +type AboutVersionMinimal struct { Number string `json:"number"` } diff --git a/http/handler/api/about.go b/http/handler/api/about.go index a9add8ca..cba674dd 100644 --- a/http/handler/api/about.go +++ b/http/handler/api/about.go @@ -6,6 +6,7 @@ import ( "github.com/datarhei/core/v16/app" "github.com/datarhei/core/v16/http/api" + "github.com/datarhei/core/v16/resources" "github.com/datarhei/core/v16/restream" "github.com/labstack/echo/v4" @@ -14,15 +15,17 @@ import ( // The AboutHandler type provides handler functions for retrieving details // about the API version and build infos. type AboutHandler struct { - restream restream.Restreamer - auths func() []string + restream restream.Restreamer + resources resources.Resources + auths func() []string } // NewAbout returns a new About type -func NewAbout(restream restream.Restreamer, auths func() []string) *AboutHandler { +func NewAbout(restream restream.Restreamer, resources resources.Resources, auths func() []string) *AboutHandler { return &AboutHandler{ - restream: restream, - auths: auths, + restream: restream, + resources: resources, + auths: auths, } } @@ -41,7 +44,7 @@ func (p *AboutHandler) About(c echo.Context) error { return c.JSON(http.StatusOK, api.MinimalAbout{ App: app.Name, Auths: p.auths(), - Version: api.VersionMinimal{ + Version: api.AboutVersionMinimal{ Number: app.Version.MajorString(), }, }) @@ -56,7 +59,7 @@ func (p *AboutHandler) About(c echo.Context) error { ID: p.restream.ID(), CreatedAt: createdAt.Format(time.RFC3339), Uptime: uint64(time.Since(createdAt).Seconds()), - Version: api.Version{ + Version: api.AboutVersion{ Number: app.Version.String(), Commit: app.Commit, Branch: app.Branch, @@ -66,5 +69,18 @@ func (p *AboutHandler) About(c echo.Context) error { }, } + if p.resources != nil { + res := p.resources.Info() + about.Resources.IsThrottling = res.CPU.Throttling + about.Resources.NCPU = res.CPU.NCPU + about.Resources.CPU = (100 - res.CPU.Idle) * res.CPU.NCPU + about.Resources.CPULimit = res.CPU.Limit * res.CPU.NCPU + about.Resources.CPUCore = res.CPU.Core * res.CPU.NCPU + about.Resources.Mem = res.Mem.Total - res.Mem.Available + about.Resources.MemLimit = res.Mem.Limit + about.Resources.MemTotal = res.Mem.Total + about.Resources.MemCore = res.Mem.Core + } + return c.JSON(http.StatusOK, about) } diff --git a/http/handler/api/about_test.go b/http/handler/api/about_test.go index a2f039b5..192b5e98 100644 --- a/http/handler/api/about_test.go +++ b/http/handler/api/about_test.go @@ -19,7 +19,7 @@ func getDummyAboutRouter() (*echo.Echo, error) { return nil, err } - handler := NewAbout(rs, func() []string { return []string{} }) + handler := NewAbout(rs, nil, func() []string { return []string{} }) router.Add("GET", "/", handler.About) diff --git a/http/handler/api/cluster_node.go b/http/handler/api/cluster_node.go index c3bfee05..51f4c463 100644 --- a/http/handler/api/cluster_node.go +++ b/http/handler/api/cluster_node.go @@ -100,7 +100,7 @@ func (h *ClusterHandler) NodeGetVersion(c echo.Context) error { v := peer.CoreAbout() - version := api.Version{ + version := api.AboutVersion{ Number: v.Version.Number, Commit: v.Version.Commit, Branch: v.Version.Branch, diff --git a/http/server.go b/http/server.go index adde251a..959165bd 100644 --- a/http/server.go +++ b/http/server.go @@ -51,6 +51,7 @@ import ( "github.com/datarhei/core/v16/monitor" "github.com/datarhei/core/v16/net" "github.com/datarhei/core/v16/prometheus" + "github.com/datarhei/core/v16/resources" "github.com/datarhei/core/v16/restream" "github.com/datarhei/core/v16/rtmp" "github.com/datarhei/core/v16/session" @@ -100,6 +101,7 @@ type Config struct { Cluster cluster.Cluster IAM iam.IAM IAMSkipper func(ip string) bool + Resources resources.Resources } type CorsConfig struct { @@ -251,6 +253,7 @@ func NewServer(config Config) (serverhandler.Server, error) { s.handler.about = api.NewAbout( config.Restream, + config.Resources, func() []string { return config.IAM.Validators() }, ) From 1327fd6e2114304a1acc8415efe069520fa6ca86 Mon Sep 17 00:00:00 2001 From: Ingo Oppermann Date: Tue, 20 Aug 2024 14:14:47 +0200 Subject: [PATCH 4/8] Add memfs storage based on dolthub's swiss maps --- go.mod | 2 + go.sum | 4 + io/fs/fs_test.go | 10 +- io/fs/mem.go | 2 + io/fs/mem_storage.go | 89 +++++ io/fs/mem_test.go | 156 ++++---- vendor/github.com/dolthub/maphash/.gitignore | 2 + vendor/github.com/dolthub/maphash/LICENSE | 201 ++++++++++ vendor/github.com/dolthub/maphash/README.md | 4 + vendor/github.com/dolthub/maphash/hasher.go | 48 +++ vendor/github.com/dolthub/maphash/runtime.go | 111 ++++++ vendor/github.com/dolthub/swiss/.gitignore | 5 + vendor/github.com/dolthub/swiss/LICENSE | 201 ++++++++++ vendor/github.com/dolthub/swiss/README.md | 54 +++ vendor/github.com/dolthub/swiss/bits.go | 58 +++ vendor/github.com/dolthub/swiss/bits_amd64.go | 50 +++ vendor/github.com/dolthub/swiss/map.go | 359 ++++++++++++++++++ vendor/github.com/dolthub/swiss/simd/match.s | 19 + .../dolthub/swiss/simd/match_amd64.go | 9 + vendor/modules.txt | 7 + 20 files changed, 1317 insertions(+), 74 deletions(-) create mode 100644 vendor/github.com/dolthub/maphash/.gitignore create mode 100644 vendor/github.com/dolthub/maphash/LICENSE create mode 100644 vendor/github.com/dolthub/maphash/README.md create mode 100644 vendor/github.com/dolthub/maphash/hasher.go create mode 100644 vendor/github.com/dolthub/maphash/runtime.go create mode 100644 vendor/github.com/dolthub/swiss/.gitignore create mode 100644 vendor/github.com/dolthub/swiss/LICENSE create mode 100644 vendor/github.com/dolthub/swiss/README.md create mode 100644 vendor/github.com/dolthub/swiss/bits.go create mode 100644 vendor/github.com/dolthub/swiss/bits_amd64.go create mode 100644 vendor/github.com/dolthub/swiss/map.go create mode 100644 vendor/github.com/dolthub/swiss/simd/match.s create mode 100644 vendor/github.com/dolthub/swiss/simd/match_amd64.go diff --git a/go.mod b/go.mod index 50e55ef3..c9b6f090 100644 --- a/go.mod +++ b/go.mod @@ -13,6 +13,7 @@ require ( github.com/caddyserver/certmagic v0.21.3 github.com/datarhei/gosrt v0.7.0 github.com/datarhei/joy4 v0.0.0-20240603190808-b1407345907e + github.com/dolthub/swiss v0.2.1 github.com/fujiwara/shapeio v1.0.0 github.com/go-playground/validator/v10 v10.22.0 github.com/gobwas/glob v0.2.3 @@ -61,6 +62,7 @@ require ( github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/cpuguy83/go-md2man/v2 v2.0.4 // indirect github.com/davecgh/go-spew v1.1.1 // indirect + github.com/dolthub/maphash v0.1.0 // indirect github.com/dustin/go-humanize v1.0.1 // indirect github.com/fatih/color v1.17.0 // indirect github.com/gabriel-vasile/mimetype v1.4.5 // indirect diff --git a/go.sum b/go.sum index 4213378d..381e7d01 100644 --- a/go.sum +++ b/go.sum @@ -55,6 +55,10 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dgryski/trifles v0.0.0-20200323201526-dd97f9abfb48 h1:fRzb/w+pyskVMQ+UbP35JkH8yB7MYb4q/qhBarqZE6g= github.com/dgryski/trifles v0.0.0-20200323201526-dd97f9abfb48/go.mod h1:if7Fbed8SFyPtHLHbg49SI7NAdJiC5WIA09pe59rfAA= +github.com/dolthub/maphash v0.1.0 h1:bsQ7JsF4FkkWyrP3oCnFJgrCUAFbFf3kOl4L/QxPDyQ= +github.com/dolthub/maphash v0.1.0/go.mod h1:gkg4Ch4CdCDu5h6PMriVLawB7koZ+5ijb9puGMV50a4= +github.com/dolthub/swiss v0.2.1 h1:gs2osYs5SJkAaH5/ggVJqXQxRXtWshF6uE0lgR/Y3Gw= +github.com/dolthub/swiss v0.2.1/go.mod h1:8AhKZZ1HK7g18j7v7k6c5cYIGEZJcPn0ARsai8cUrh0= github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= diff --git a/io/fs/fs_test.go b/io/fs/fs_test.go index 9d52dddb..7e7c2362 100644 --- a/io/fs/fs_test.go +++ b/io/fs/fs_test.go @@ -59,8 +59,14 @@ func TestFilesystem(t *testing.T) { os.RemoveAll("./testing/") filesystems := map[string]func(string) (Filesystem, error){ - "memfs": func(name string) (Filesystem, error) { - return NewMemFilesystem(MemConfig{}) + "memfs-map": func(name string) (Filesystem, error) { + return NewMemFilesystem(MemConfig{Storage: "map"}) + }, + "memfs-xsync": func(name string) (Filesystem, error) { + return NewMemFilesystem(MemConfig{Storage: "xsync"}) + }, + "memfs-swiss": func(name string) (Filesystem, error) { + return NewMemFilesystem(MemConfig{Storage: "swiss"}) }, "diskfs": func(name string) (Filesystem, error) { return NewRootedDiskFilesystem(RootedDiskConfig{ diff --git a/io/fs/mem.go b/io/fs/mem.go index 2033d349..dee83c78 100644 --- a/io/fs/mem.go +++ b/io/fs/mem.go @@ -208,6 +208,8 @@ func NewMemFilesystem(config MemConfig) (Filesystem, error) { if config.Storage == "map" { fs.storage = newMapStorage() + } else if config.Storage == "swiss" { + fs.storage = newSwissMapStorage() } else { fs.storage = newMapOfStorage() } diff --git a/io/fs/mem_storage.go b/io/fs/mem_storage.go index e6bc2362..5df8f946 100644 --- a/io/fs/mem_storage.go +++ b/io/fs/mem_storage.go @@ -4,6 +4,7 @@ import ( "bytes" "sync" + "github.com/dolthub/swiss" "github.com/puzpuzpuz/xsync/v3" ) @@ -182,3 +183,91 @@ func (m *mapStorage) Range(f func(key string, value *memFile) bool) { } } } + +type swissMapStorage struct { + lock *xsync.RBMutex + files *swiss.Map[string, *memFile] +} + +func newSwissMapStorage() memStorage { + m := &swissMapStorage{ + lock: xsync.NewRBMutex(), + files: swiss.NewMap[string, *memFile](128), + } + + return m +} + +func (m *swissMapStorage) Delete(key string) (*memFile, bool) { + m.lock.Lock() + defer m.lock.Unlock() + + file, hasFile := m.files.Get(key) + if !hasFile { + return nil, false + } + + m.files.Delete(key) + + return file, true +} + +func (m *swissMapStorage) Store(key string, value *memFile) (*memFile, bool) { + m.lock.Lock() + defer m.lock.Unlock() + + file, hasFile := m.files.Get(key) + m.files.Put(key, value) + + return file, hasFile +} + +func (m *swissMapStorage) Load(key string) (*memFile, bool) { + token := m.lock.RLock() + defer m.lock.RUnlock(token) + + return m.files.Get(key) +} + +func (m *swissMapStorage) LoadAndCopy(key string) (*memFile, bool) { + token := m.lock.RLock() + defer m.lock.RUnlock(token) + + v, ok := m.files.Get(key) + if !ok { + return nil, false + } + + f := &memFile{ + memFileInfo: memFileInfo{ + name: v.name, + size: v.size, + dir: v.dir, + lastMod: v.lastMod, + linkTo: v.linkTo, + }, + r: nil, + } + + if v.data != nil { + f.data = bytes.NewBuffer(v.data.Bytes()) + } + + return f, true +} + +func (m *swissMapStorage) Has(key string) bool { + token := m.lock.RLock() + defer m.lock.RUnlock(token) + + return m.files.Has(key) +} + +func (m *swissMapStorage) Range(f func(key string, value *memFile) bool) { + token := m.lock.RLock() + defer m.lock.RUnlock(token) + + m.files.Iter(func(key string, value *memFile) bool { + return !f(key, value) + }) +} diff --git a/io/fs/mem_test.go b/io/fs/mem_test.go index 0f8c1758..1e9f43fb 100644 --- a/io/fs/mem_test.go +++ b/io/fs/mem_test.go @@ -30,66 +30,6 @@ func TestMemFromDir(t *testing.T) { }, names) } -func BenchmarkMemList(b *testing.B) { - mem, err := NewMemFilesystem(MemConfig{}) - require.NoError(b, err) - - for i := 0; i < 1000; i++ { - id := rand.StringAlphanumeric(8) - path := fmt.Sprintf("/%d/%s.dat", i, id) - mem.WriteFile(path, []byte("foobar")) - } - - b.ResetTimer() - - for i := 0; i < b.N; i++ { - mem.List("/", ListOptions{ - Pattern: "/5/**", - }) - } -} - -func BenchmarkMemRemoveList(b *testing.B) { - mem, err := NewMemFilesystem(MemConfig{}) - require.NoError(b, err) - - for i := 0; i < 1000; i++ { - id := rand.StringAlphanumeric(8) - path := fmt.Sprintf("/%d/%s.dat", i, id) - mem.WriteFile(path, []byte("foobar")) - } - - b.ResetTimer() - - for i := 0; i < b.N; i++ { - mem.RemoveList("/", ListOptions{ - Pattern: "/5/**", - }) - } -} - -func BenchmarkMemReadFile(b *testing.B) { - mem, err := NewMemFilesystem(MemConfig{}) - require.NoError(b, err) - - nFiles := 1000 - - for i := 0; i < nFiles; i++ { - path := fmt.Sprintf("/%d.dat", i) - mem.WriteFile(path, []byte(rand.StringAlphanumeric(2*1024))) - } - - r := gorand.New(gorand.NewSource(42)) - - b.ResetTimer() - - for i := 0; i < b.N; i++ { - num := r.Intn(nFiles) - f := mem.Open("/" + strconv.Itoa(num) + ".dat") - f.Close() - } -} - func TestWriteWhileRead(t *testing.T) { fs, err := NewMemFilesystem(MemConfig{}) require.NoError(t, err) @@ -108,29 +48,101 @@ func TestWriteWhileRead(t *testing.T) { require.Equal(t, []byte("xxxxx"), data) } -func BenchmarkMemWriteFile(b *testing.B) { - mem, err := NewMemFilesystem(MemConfig{}) - require.NoError(b, err) +func BenchmarkMemStorages(b *testing.B) { + storages := []string{ + "map", + "xsync", + "swiss", + } + benchmarks := map[string]func(*testing.B, Filesystem){ + "list": benchmarkMemList, + "removeList": benchmarkMemRemoveList, + "readFile": benchmarkMemReadFile, + "writeFile": benchmarkMemWriteFile, + "readWhileWrite": benchmarkMemReadFileWhileWriting, + } + + for name, fn := range benchmarks { + for _, storage := range storages { + mem, err := NewMemFilesystem(MemConfig{Storage: storage}) + require.NoError(b, err) + + b.Run(name+"-"+storage, func(b *testing.B) { + fn(b, mem) + }) + } + } +} + +func benchmarkMemList(b *testing.B, fs Filesystem) { + for i := 0; i < 1000; i++ { + id := rand.StringAlphanumeric(8) + path := fmt.Sprintf("/%d/%s.dat", i, id) + fs.WriteFile(path, []byte("foobar")) + } + + b.ResetTimer() + + for i := 0; i < b.N; i++ { + fs.List("/", ListOptions{ + Pattern: "/5/**", + }) + } +} + +func benchmarkMemRemoveList(b *testing.B, fs Filesystem) { + for i := 0; i < 1000; i++ { + id := rand.StringAlphanumeric(8) + path := fmt.Sprintf("/%d/%s.dat", i, id) + fs.WriteFile(path, []byte("foobar")) + } + + b.ResetTimer() + + for i := 0; i < b.N; i++ { + fs.RemoveList("/", ListOptions{ + Pattern: "/5/**", + }) + } +} + +func benchmarkMemReadFile(b *testing.B, fs Filesystem) { + nFiles := 1000 + + for i := 0; i < nFiles; i++ { + path := fmt.Sprintf("/%d.dat", i) + fs.WriteFile(path, []byte(rand.StringAlphanumeric(2*1024))) + } + + r := gorand.New(gorand.NewSource(42)) + + b.ResetTimer() + + for i := 0; i < b.N; i++ { + num := r.Intn(nFiles) + f := fs.Open("/" + strconv.Itoa(num) + ".dat") + f.Close() + } +} + +func benchmarkMemWriteFile(b *testing.B, fs Filesystem) { nFiles := 50000 for i := 0; i < nFiles; i++ { path := fmt.Sprintf("/%d.dat", i) - mem.WriteFile(path, []byte(rand.StringAlphanumeric(1))) + fs.WriteFile(path, []byte(rand.StringAlphanumeric(1))) } b.ResetTimer() for i := 0; i < b.N; i++ { path := fmt.Sprintf("/%d.dat", i%nFiles) - mem.WriteFile(path, []byte(rand.StringAlphanumeric(1))) + fs.WriteFile(path, []byte(rand.StringAlphanumeric(1))) } } -func BenchmarkMemReadFileWhileWriting(b *testing.B) { - mem, err := NewMemFilesystem(MemConfig{}) - require.NoError(b, err) - +func benchmarkMemReadFileWhileWriting(b *testing.B, fs Filesystem) { nReaders := 500 nWriters := 1000 nFiles := 30 @@ -148,7 +160,7 @@ func BenchmarkMemReadFileWhileWriting(b *testing.B) { go func(ctx context.Context, from int) { for i := 0; i < nFiles; i++ { path := fmt.Sprintf("/%d.dat", from+i) - mem.WriteFile(path, data) + fs.WriteFile(path, data) } ticker := time.NewTicker(40 * time.Millisecond) @@ -163,7 +175,7 @@ func BenchmarkMemReadFileWhileWriting(b *testing.B) { case <-ticker.C: num := gorand.Intn(nFiles) + from path := fmt.Sprintf("/%d.dat", num) - mem.WriteFile(path, data) + fs.WriteFile(path, data) } } }(ctx, i*nFiles) @@ -183,7 +195,7 @@ func BenchmarkMemReadFileWhileWriting(b *testing.B) { for i := 0; i < b.N; i++ { num := gorand.Intn(nWriters * nFiles) - f := mem.Open("/" + strconv.Itoa(num) + ".dat") + f := fs.Open("/" + strconv.Itoa(num) + ".dat") f.Close() } }() diff --git a/vendor/github.com/dolthub/maphash/.gitignore b/vendor/github.com/dolthub/maphash/.gitignore new file mode 100644 index 00000000..977a7cad --- /dev/null +++ b/vendor/github.com/dolthub/maphash/.gitignore @@ -0,0 +1,2 @@ +*.idea +*.test \ No newline at end of file diff --git a/vendor/github.com/dolthub/maphash/LICENSE b/vendor/github.com/dolthub/maphash/LICENSE new file mode 100644 index 00000000..261eeb9e --- /dev/null +++ b/vendor/github.com/dolthub/maphash/LICENSE @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/vendor/github.com/dolthub/maphash/README.md b/vendor/github.com/dolthub/maphash/README.md new file mode 100644 index 00000000..d91530f9 --- /dev/null +++ b/vendor/github.com/dolthub/maphash/README.md @@ -0,0 +1,4 @@ +# maphash + +Hash any `comparable` type using Golang's fast runtime hash. +Uses [AES](https://en.wikipedia.org/wiki/AES_instruction_set) instructions when available. \ No newline at end of file diff --git a/vendor/github.com/dolthub/maphash/hasher.go b/vendor/github.com/dolthub/maphash/hasher.go new file mode 100644 index 00000000..ef53596a --- /dev/null +++ b/vendor/github.com/dolthub/maphash/hasher.go @@ -0,0 +1,48 @@ +// Copyright 2022 Dolthub, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package maphash + +import "unsafe" + +// Hasher hashes values of type K. +// Uses runtime AES-based hashing. +type Hasher[K comparable] struct { + hash hashfn + seed uintptr +} + +// NewHasher creates a new Hasher[K] with a random seed. +func NewHasher[K comparable]() Hasher[K] { + return Hasher[K]{ + hash: getRuntimeHasher[K](), + seed: newHashSeed(), + } +} + +// NewSeed returns a copy of |h| with a new hash seed. +func NewSeed[K comparable](h Hasher[K]) Hasher[K] { + return Hasher[K]{ + hash: h.hash, + seed: newHashSeed(), + } +} + +// Hash hashes |key|. +func (h Hasher[K]) Hash(key K) uint64 { + // promise to the compiler that pointer + // |p| does not escape the stack. + p := noescape(unsafe.Pointer(&key)) + return uint64(h.hash(p, h.seed)) +} diff --git a/vendor/github.com/dolthub/maphash/runtime.go b/vendor/github.com/dolthub/maphash/runtime.go new file mode 100644 index 00000000..29cd6a8e --- /dev/null +++ b/vendor/github.com/dolthub/maphash/runtime.go @@ -0,0 +1,111 @@ +// Copyright 2022 Dolthub, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// This file incorporates work covered by the following copyright and +// permission notice: +// +// Copyright 2022 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +//go:build go1.18 || go1.19 +// +build go1.18 go1.19 + +package maphash + +import ( + "math/rand" + "unsafe" +) + +type hashfn func(unsafe.Pointer, uintptr) uintptr + +func getRuntimeHasher[K comparable]() (h hashfn) { + a := any(make(map[K]struct{})) + i := (*mapiface)(unsafe.Pointer(&a)) + h = i.typ.hasher + return +} + +func newHashSeed() uintptr { + return uintptr(rand.Int()) +} + +// noescape hides a pointer from escape analysis. It is the identity function +// but escape analysis doesn't think the output depends on the input. +// noescape is inlined and currently compiles down to zero instructions. +// USE CAREFULLY! +// This was copied from the runtime (via pkg "strings"); see issues 23382 and 7921. +// +//go:nosplit +//go:nocheckptr +func noescape(p unsafe.Pointer) unsafe.Pointer { + x := uintptr(p) + return unsafe.Pointer(x ^ 0) +} + +type mapiface struct { + typ *maptype + val *hmap +} + +// go/src/runtime/type.go +type maptype struct { + typ _type + key *_type + elem *_type + bucket *_type + // function for hashing keys (ptr to key, seed) -> hash + hasher func(unsafe.Pointer, uintptr) uintptr + keysize uint8 + elemsize uint8 + bucketsize uint16 + flags uint32 +} + +// go/src/runtime/map.go +type hmap struct { + count int + flags uint8 + B uint8 + noverflow uint16 + // hash seed + hash0 uint32 + buckets unsafe.Pointer + oldbuckets unsafe.Pointer + nevacuate uintptr + // true type is *mapextra + // but we don't need this data + extra unsafe.Pointer +} + +// go/src/runtime/type.go +type tflag uint8 +type nameOff int32 +type typeOff int32 + +// go/src/runtime/type.go +type _type struct { + size uintptr + ptrdata uintptr + hash uint32 + tflag tflag + align uint8 + fieldAlign uint8 + kind uint8 + equal func(unsafe.Pointer, unsafe.Pointer) bool + gcdata *byte + str nameOff + ptrToThis typeOff +} diff --git a/vendor/github.com/dolthub/swiss/.gitignore b/vendor/github.com/dolthub/swiss/.gitignore new file mode 100644 index 00000000..1f9adf93 --- /dev/null +++ b/vendor/github.com/dolthub/swiss/.gitignore @@ -0,0 +1,5 @@ +**/.idea/ +.vscode +.run +venv +.DS_Store \ No newline at end of file diff --git a/vendor/github.com/dolthub/swiss/LICENSE b/vendor/github.com/dolthub/swiss/LICENSE new file mode 100644 index 00000000..261eeb9e --- /dev/null +++ b/vendor/github.com/dolthub/swiss/LICENSE @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/vendor/github.com/dolthub/swiss/README.md b/vendor/github.com/dolthub/swiss/README.md new file mode 100644 index 00000000..71c6f7dd --- /dev/null +++ b/vendor/github.com/dolthub/swiss/README.md @@ -0,0 +1,54 @@ +# SwissMap + +SwissMap is a hash table adapated from the "SwissTable" family of hash tables from [Abseil](https://abseil.io/blog/20180927-swisstables). It uses [AES](https://github.com/dolthub/maphash) instructions for fast-hashing and performs key lookups in parallel using [SSE](https://en.wikipedia.org/wiki/Streaming_SIMD_Extensions) instructions. Because of these optimizations, SwissMap is faster and more memory efficient than Golang's built-in `map`. If you'd like to learn more about its design and implementation, check out this [blog post](https://www.dolthub.com/blog/2023-03-28-swiss-map/) announcing its release. + + +## Example + +SwissMap exposes the same interface as the built-in `map`. Give it a try using this [Go playground](https://go.dev/play/p/JPDC5WhYN7g). + +```go +package main + +import "github.com/dolthub/swiss" + +func main() { + m := swiss.NewMap[string, int](42) + + m.Put("foo", 1) + m.Put("bar", 2) + + m.Iter(func(k string, v int) (stop bool) { + println("iter", k, v) + return false // continue + }) + + if x, ok := m.Get("foo"); ok { + println(x) + } + if m.Has("bar") { + x, _ := m.Get("bar") + println(x) + } + + m.Put("foo", -1) + m.Delete("bar") + + if x, ok := m.Get("foo"); ok { + println(x) + } + if m.Has("bar") { + x, _ := m.Get("bar") + println(x) + } + + m.Clear() + + // Output: + // iter foo 1 + // iter bar 2 + // 1 + // 2 + // -1 +} +``` diff --git a/vendor/github.com/dolthub/swiss/bits.go b/vendor/github.com/dolthub/swiss/bits.go new file mode 100644 index 00000000..f435b6dc --- /dev/null +++ b/vendor/github.com/dolthub/swiss/bits.go @@ -0,0 +1,58 @@ +// Copyright 2023 Dolthub, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build !amd64 || nosimd + +package swiss + +import ( + "math/bits" + "unsafe" +) + +const ( + groupSize = 8 + maxAvgGroupLoad = 7 + + loBits uint64 = 0x0101010101010101 + hiBits uint64 = 0x8080808080808080 +) + +type bitset uint64 + +func metaMatchH2(m *metadata, h h2) bitset { + // https://graphics.stanford.edu/~seander/bithacks.html##ValueInWord + return hasZeroByte(castUint64(m) ^ (loBits * uint64(h))) +} + +func metaMatchEmpty(m *metadata) bitset { + return hasZeroByte(castUint64(m) ^ hiBits) +} + +func nextMatch(b *bitset) uint32 { + s := uint32(bits.TrailingZeros64(uint64(*b))) + *b &= ^(1 << s) // clear bit |s| + return s >> 3 // div by 8 +} + +func hasZeroByte(x uint64) bitset { + return bitset(((x - loBits) & ^(x)) & hiBits) +} + +func castUint64(m *metadata) uint64 { + return *(*uint64)((unsafe.Pointer)(m)) +} + +//go:linkname fastrand runtime.fastrand +func fastrand() uint32 diff --git a/vendor/github.com/dolthub/swiss/bits_amd64.go b/vendor/github.com/dolthub/swiss/bits_amd64.go new file mode 100644 index 00000000..8b91f57c --- /dev/null +++ b/vendor/github.com/dolthub/swiss/bits_amd64.go @@ -0,0 +1,50 @@ +// Copyright 2023 Dolthub, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build amd64 && !nosimd + +package swiss + +import ( + "math/bits" + _ "unsafe" + + "github.com/dolthub/swiss/simd" +) + +const ( + groupSize = 16 + maxAvgGroupLoad = 14 +) + +type bitset uint16 + +func metaMatchH2(m *metadata, h h2) bitset { + b := simd.MatchMetadata((*[16]int8)(m), int8(h)) + return bitset(b) +} + +func metaMatchEmpty(m *metadata) bitset { + b := simd.MatchMetadata((*[16]int8)(m), empty) + return bitset(b) +} + +func nextMatch(b *bitset) (s uint32) { + s = uint32(bits.TrailingZeros16(uint16(*b))) + *b &= ^(1 << s) // clear bit |s| + return +} + +//go:linkname fastrand runtime.fastrand +func fastrand() uint32 diff --git a/vendor/github.com/dolthub/swiss/map.go b/vendor/github.com/dolthub/swiss/map.go new file mode 100644 index 00000000..e5ad2038 --- /dev/null +++ b/vendor/github.com/dolthub/swiss/map.go @@ -0,0 +1,359 @@ +// Copyright 2023 Dolthub, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package swiss + +import ( + "github.com/dolthub/maphash" +) + +const ( + maxLoadFactor = float32(maxAvgGroupLoad) / float32(groupSize) +) + +// Map is an open-addressing hash map +// based on Abseil's flat_hash_map. +type Map[K comparable, V any] struct { + ctrl []metadata + groups []group[K, V] + hash maphash.Hasher[K] + resident uint32 + dead uint32 + limit uint32 +} + +// metadata is the h2 metadata array for a group. +// find operations first probe the controls bytes +// to filter candidates before matching keys +type metadata [groupSize]int8 + +// group is a group of 16 key-value pairs +type group[K comparable, V any] struct { + keys [groupSize]K + values [groupSize]V +} + +const ( + h1Mask uint64 = 0xffff_ffff_ffff_ff80 + h2Mask uint64 = 0x0000_0000_0000_007f + empty int8 = -128 // 0b1000_0000 + tombstone int8 = -2 // 0b1111_1110 +) + +// h1 is a 57 bit hash prefix +type h1 uint64 + +// h2 is a 7 bit hash suffix +type h2 int8 + +// NewMap constructs a Map. +func NewMap[K comparable, V any](sz uint32) (m *Map[K, V]) { + groups := numGroups(sz) + m = &Map[K, V]{ + ctrl: make([]metadata, groups), + groups: make([]group[K, V], groups), + hash: maphash.NewHasher[K](), + limit: groups * maxAvgGroupLoad, + } + for i := range m.ctrl { + m.ctrl[i] = newEmptyMetadata() + } + return +} + +// Has returns true if |key| is present in |m|. +func (m *Map[K, V]) Has(key K) (ok bool) { + hi, lo := splitHash(m.hash.Hash(key)) + g := probeStart(hi, len(m.groups)) + for { // inlined find loop + matches := metaMatchH2(&m.ctrl[g], lo) + for matches != 0 { + s := nextMatch(&matches) + if key == m.groups[g].keys[s] { + ok = true + return + } + } + // |key| is not in group |g|, + // stop probing if we see an empty slot + matches = metaMatchEmpty(&m.ctrl[g]) + if matches != 0 { + ok = false + return + } + g += 1 // linear probing + if g >= uint32(len(m.groups)) { + g = 0 + } + } +} + +// Get returns the |value| mapped by |key| if one exists. +func (m *Map[K, V]) Get(key K) (value V, ok bool) { + hi, lo := splitHash(m.hash.Hash(key)) + g := probeStart(hi, len(m.groups)) + for { // inlined find loop + matches := metaMatchH2(&m.ctrl[g], lo) + for matches != 0 { + s := nextMatch(&matches) + if key == m.groups[g].keys[s] { + value, ok = m.groups[g].values[s], true + return + } + } + // |key| is not in group |g|, + // stop probing if we see an empty slot + matches = metaMatchEmpty(&m.ctrl[g]) + if matches != 0 { + ok = false + return + } + g += 1 // linear probing + if g >= uint32(len(m.groups)) { + g = 0 + } + } +} + +// Put attempts to insert |key| and |value| +func (m *Map[K, V]) Put(key K, value V) { + if m.resident >= m.limit { + m.rehash(m.nextSize()) + } + hi, lo := splitHash(m.hash.Hash(key)) + g := probeStart(hi, len(m.groups)) + for { // inlined find loop + matches := metaMatchH2(&m.ctrl[g], lo) + for matches != 0 { + s := nextMatch(&matches) + if key == m.groups[g].keys[s] { // update + m.groups[g].keys[s] = key + m.groups[g].values[s] = value + return + } + } + // |key| is not in group |g|, + // stop probing if we see an empty slot + matches = metaMatchEmpty(&m.ctrl[g]) + if matches != 0 { // insert + s := nextMatch(&matches) + m.groups[g].keys[s] = key + m.groups[g].values[s] = value + m.ctrl[g][s] = int8(lo) + m.resident++ + return + } + g += 1 // linear probing + if g >= uint32(len(m.groups)) { + g = 0 + } + } +} + +// Delete attempts to remove |key|, returns true successful. +func (m *Map[K, V]) Delete(key K) (ok bool) { + hi, lo := splitHash(m.hash.Hash(key)) + g := probeStart(hi, len(m.groups)) + for { + matches := metaMatchH2(&m.ctrl[g], lo) + for matches != 0 { + s := nextMatch(&matches) + if key == m.groups[g].keys[s] { + ok = true + // optimization: if |m.ctrl[g]| contains any empty + // metadata bytes, we can physically delete |key| + // rather than placing a tombstone. + // The observation is that any probes into group |g| + // would already be terminated by the existing empty + // slot, and therefore reclaiming slot |s| will not + // cause premature termination of probes into |g|. + if metaMatchEmpty(&m.ctrl[g]) != 0 { + m.ctrl[g][s] = empty + m.resident-- + } else { + m.ctrl[g][s] = tombstone + m.dead++ + } + var k K + var v V + m.groups[g].keys[s] = k + m.groups[g].values[s] = v + return + } + } + // |key| is not in group |g|, + // stop probing if we see an empty slot + matches = metaMatchEmpty(&m.ctrl[g]) + if matches != 0 { // |key| absent + ok = false + return + } + g += 1 // linear probing + if g >= uint32(len(m.groups)) { + g = 0 + } + } +} + +// Iter iterates the elements of the Map, passing them to the callback. +// It guarantees that any key in the Map will be visited only once, and +// for un-mutated Maps, every key will be visited once. If the Map is +// Mutated during iteration, mutations will be reflected on return from +// Iter, but the set of keys visited by Iter is non-deterministic. +func (m *Map[K, V]) Iter(cb func(k K, v V) (stop bool)) { + // take a consistent view of the table in case + // we rehash during iteration + ctrl, groups := m.ctrl, m.groups + // pick a random starting group + g := randIntN(len(groups)) + for n := 0; n < len(groups); n++ { + for s, c := range ctrl[g] { + if c == empty || c == tombstone { + continue + } + k, v := groups[g].keys[s], groups[g].values[s] + if stop := cb(k, v); stop { + return + } + } + g++ + if g >= uint32(len(groups)) { + g = 0 + } + } +} + +// Clear removes all elements from the Map. +func (m *Map[K, V]) Clear() { + for i, c := range m.ctrl { + for j := range c { + m.ctrl[i][j] = empty + } + } + var k K + var v V + for i := range m.groups { + g := &m.groups[i] + for i := range g.keys { + g.keys[i] = k + g.values[i] = v + } + } + m.resident, m.dead = 0, 0 +} + +// Count returns the number of elements in the Map. +func (m *Map[K, V]) Count() int { + return int(m.resident - m.dead) +} + +// Capacity returns the number of additional elements +// the can be added to the Map before resizing. +func (m *Map[K, V]) Capacity() int { + return int(m.limit - m.resident) +} + +// find returns the location of |key| if present, or its insertion location if absent. +// for performance, find is manually inlined into public methods. +func (m *Map[K, V]) find(key K, hi h1, lo h2) (g, s uint32, ok bool) { + g = probeStart(hi, len(m.groups)) + for { + matches := metaMatchH2(&m.ctrl[g], lo) + for matches != 0 { + s = nextMatch(&matches) + if key == m.groups[g].keys[s] { + return g, s, true + } + } + // |key| is not in group |g|, + // stop probing if we see an empty slot + matches = metaMatchEmpty(&m.ctrl[g]) + if matches != 0 { + s = nextMatch(&matches) + return g, s, false + } + g += 1 // linear probing + if g >= uint32(len(m.groups)) { + g = 0 + } + } +} + +func (m *Map[K, V]) nextSize() (n uint32) { + n = uint32(len(m.groups)) * 2 + if m.dead >= (m.resident / 2) { + n = uint32(len(m.groups)) + } + return +} + +func (m *Map[K, V]) rehash(n uint32) { + groups, ctrl := m.groups, m.ctrl + m.groups = make([]group[K, V], n) + m.ctrl = make([]metadata, n) + for i := range m.ctrl { + m.ctrl[i] = newEmptyMetadata() + } + m.hash = maphash.NewSeed(m.hash) + m.limit = n * maxAvgGroupLoad + m.resident, m.dead = 0, 0 + for g := range ctrl { + for s := range ctrl[g] { + c := ctrl[g][s] + if c == empty || c == tombstone { + continue + } + m.Put(groups[g].keys[s], groups[g].values[s]) + } + } +} + +func (m *Map[K, V]) loadFactor() float32 { + slots := float32(len(m.groups) * groupSize) + return float32(m.resident-m.dead) / slots +} + +// numGroups returns the minimum number of groups needed to store |n| elems. +func numGroups(n uint32) (groups uint32) { + groups = (n + maxAvgGroupLoad - 1) / maxAvgGroupLoad + if groups == 0 { + groups = 1 + } + return +} + +func newEmptyMetadata() (meta metadata) { + for i := range meta { + meta[i] = empty + } + return +} + +func splitHash(h uint64) (h1, h2) { + return h1((h & h1Mask) >> 7), h2(h & h2Mask) +} + +func probeStart(hi h1, groups int) uint32 { + return fastModN(uint32(hi), uint32(groups)) +} + +// lemire.me/blog/2016/06/27/a-fast-alternative-to-the-modulo-reduction/ +func fastModN(x, n uint32) uint32 { + return uint32((uint64(x) * uint64(n)) >> 32) +} + +// randIntN returns a random number in the interval [0, n). +func randIntN(n int) uint32 { + return fastModN(fastrand(), uint32(n)) +} diff --git a/vendor/github.com/dolthub/swiss/simd/match.s b/vendor/github.com/dolthub/swiss/simd/match.s new file mode 100644 index 00000000..4ae29e77 --- /dev/null +++ b/vendor/github.com/dolthub/swiss/simd/match.s @@ -0,0 +1,19 @@ +// Code generated by command: go run asm.go -out match.s -stubs match_amd64.go. DO NOT EDIT. + +//go:build amd64 + +#include "textflag.h" + +// func MatchMetadata(metadata *[16]int8, hash int8) uint16 +// Requires: SSE2, SSSE3 +TEXT ·MatchMetadata(SB), NOSPLIT, $0-18 + MOVQ metadata+0(FP), AX + MOVBLSX hash+8(FP), CX + MOVD CX, X0 + PXOR X1, X1 + PSHUFB X1, X0 + MOVOU (AX), X1 + PCMPEQB X1, X0 + PMOVMSKB X0, AX + MOVW AX, ret+16(FP) + RET diff --git a/vendor/github.com/dolthub/swiss/simd/match_amd64.go b/vendor/github.com/dolthub/swiss/simd/match_amd64.go new file mode 100644 index 00000000..538c8e12 --- /dev/null +++ b/vendor/github.com/dolthub/swiss/simd/match_amd64.go @@ -0,0 +1,9 @@ +// Code generated by command: go run asm.go -out match.s -stubs match_amd64.go. DO NOT EDIT. + +//go:build amd64 + +package simd + +// MatchMetadata performs a 16-way probe of |metadata| using SSE instructions +// nb: |metadata| must be an aligned pointer +func MatchMetadata(metadata *[16]int8, hash int8) uint16 diff --git a/vendor/modules.txt b/vendor/modules.txt index 8fb33cf5..b7461958 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -106,6 +106,13 @@ github.com/datarhei/joy4/utils/bits/pio # github.com/davecgh/go-spew v1.1.1 ## explicit github.com/davecgh/go-spew/spew +# github.com/dolthub/maphash v0.1.0 +## explicit; go 1.18 +github.com/dolthub/maphash +# github.com/dolthub/swiss v0.2.1 +## explicit; go 1.18 +github.com/dolthub/swiss +github.com/dolthub/swiss/simd # github.com/dustin/go-humanize v1.0.1 ## explicit; go 1.16 github.com/dustin/go-humanize From 3756ce4977372b2495ec435a0a97404a10a8b5ae Mon Sep 17 00:00:00 2001 From: Ingo Oppermann Date: Tue, 20 Aug 2024 17:34:50 +0200 Subject: [PATCH 5/8] Add AppendFileReader to filesystem, allows session logging with less I/O --- io/fs/disk.go | 25 ++++++++++++++++++++ io/fs/fs.go | 4 ++++ io/fs/fs_test.go | 32 ++++++++++++++++++++++++++ io/fs/mem.go | 40 ++++++++++++++++++++++++++++++++ io/fs/mem_storage.go | 16 +++++++++++++ io/fs/s3.go | 19 ++++++++++++++++ io/fs/sized.go | 54 ++++++++++++++++++++++++-------------------- session/registry.go | 12 +++------- 8 files changed, 169 insertions(+), 33 deletions(-) diff --git a/io/fs/disk.go b/io/fs/disk.go index 6cac9d40..b9f76b77 100644 --- a/io/fs/disk.go +++ b/io/fs/disk.go @@ -403,6 +403,31 @@ func (fs *diskFilesystem) WriteFileSafe(path string, data []byte) (int64, bool, return int64(size), !replace, nil } +func (fs *diskFilesystem) AppendFileReader(path string, r io.Reader, sizeHint int) (int64, error) { + path = fs.cleanPath(path) + + dir := filepath.Dir(path) + if err := os.MkdirAll(dir, 0755); err != nil { + return -1, fmt.Errorf("creating file failed: %w", err) + } + + f, err := os.OpenFile(path, os.O_APPEND|os.O_WRONLY|os.O_CREATE, 0644) + if err != nil { + return -1, err + } + + defer f.Close() + + size, err := f.ReadFrom(r) + if err != nil { + return -1, fmt.Errorf("reading data failed: %w", err) + } + + fs.lastSizeCheck = time.Time{} + + return size, nil +} + func (fs *diskFilesystem) Rename(src, dst string) error { src = fs.cleanPath(src) dst = fs.cleanPath(dst) diff --git a/io/fs/fs.go b/io/fs/fs.go index 4cc8e201..973be194 100644 --- a/io/fs/fs.go +++ b/io/fs/fs.go @@ -108,6 +108,10 @@ type WriteFilesystem interface { // an error adding the file and error is not nil. WriteFileSafe(path string, data []byte) (int64, bool, error) + // AppendFileReader appends the contents from reader to the file at path. If the file doesn't + // exist, it will be created. The number of written bytes will be returned, -1 otherwise. + AppendFileReader(path string, r io.Reader, size int) (int64, error) + // MkdirAll creates a directory named path, along with any necessary parents, and returns nil, // or else returns an error. The permission bits perm (before umask) are used for all directories // that MkdirAll creates. If path is already a directory, MkdirAll does nothing and returns nil. diff --git a/io/fs/fs_test.go b/io/fs/fs_test.go index 7e7c2362..17d79e3c 100644 --- a/io/fs/fs_test.go +++ b/io/fs/fs_test.go @@ -115,6 +115,8 @@ func TestFilesystem(t *testing.T) { "symlinkErrors": testSymlinkErrors, "symlinkOpenStat": testSymlinkOpenStat, "open": testOpen, + "append": testAppend, + "appendCreate": testAppendCreate, } for fsname, fs := range filesystems { @@ -125,6 +127,11 @@ func TestFilesystem(t *testing.T) { } filesystem, err := fs(name) require.NoError(t, err) + + if fsname == "s3fs" { + filesystem.RemoveList("/", ListOptions{Pattern: "/**"}) + } + test(t, filesystem) }) } @@ -859,3 +866,28 @@ func testSymlinkErrors(t *testing.T, fs Filesystem) { err = fs.Symlink("/bazfoo", "/barfoo") require.Error(t, err) } + +func testAppend(t *testing.T, fs Filesystem) { + _, _, err := fs.WriteFileReader("/foobar", strings.NewReader("part1"), -1) + require.NoError(t, err) + + _, err = fs.AppendFileReader("/foobar", strings.NewReader("part2"), -1) + require.NoError(t, err) + + file := fs.Open("/foobar") + require.NotNil(t, file) + + data, err := io.ReadAll(file) + require.Equal(t, []byte("part1part2"), data) +} + +func testAppendCreate(t *testing.T, fs Filesystem) { + _, err := fs.AppendFileReader("/foobar", strings.NewReader("part1"), -1) + require.NoError(t, err) + + file := fs.Open("/foobar") + require.NotNil(t, file) + + data, err := io.ReadAll(file) + require.Equal(t, []byte("part1"), data) +} diff --git a/io/fs/mem.go b/io/fs/mem.go index dee83c78..de8b3cb7 100644 --- a/io/fs/mem.go +++ b/io/fs/mem.go @@ -525,6 +525,46 @@ func (fs *memFilesystem) WriteFileSafe(path string, data []byte) (int64, bool, e return fs.WriteFileReader(path, bytes.NewReader(data), len(data)) } +func (fs *memFilesystem) AppendFileReader(path string, r io.Reader, sizeHint int) (int64, error) { + path = fs.cleanPath(path) + + file, hasFile := fs.storage.LoadAndCopy(path) + if !hasFile { + size, _, err := fs.WriteFileReader(path, r, sizeHint) + return size, err + } + + size, err := copyToBufferFromReader(file.data, r, 8*1024) + if err != nil { + fs.logger.WithFields(log.Fields{ + "path": path, + "filesize_bytes": size, + "error": err, + }).Warn().Log("Incomplete file") + + file.Close() + + return -1, fmt.Errorf("incomplete file") + } + + file.size += size + + fs.storage.Store(path, file) + + fs.sizeLock.Lock() + defer fs.sizeLock.Unlock() + + fs.currentSize += size + + fs.logger.Debug().WithFields(log.Fields{ + "path": file.name, + "filesize_bytes": file.size, + "size_bytes": fs.currentSize, + }).Log("Appended to file") + + return size, nil +} + func (fs *memFilesystem) Purge(size int64) int64 { files := []*memFile{} diff --git a/io/fs/mem_storage.go b/io/fs/mem_storage.go index 5df8f946..044b375e 100644 --- a/io/fs/mem_storage.go +++ b/io/fs/mem_storage.go @@ -9,11 +9,27 @@ import ( ) type memStorage interface { + // Delete deletes a file from the storage. Delete(key string) (*memFile, bool) + + // Store stores a file to the storage. If there's already a file with + // the same key, that value will be returned and replaced with the + // new file. Store(key string, value *memFile) (*memFile, bool) + + // Load loads a file from the storage. This is a references to the file, + // i.e. all changes to the file will be reflected on the storage. Load(key string) (value *memFile, ok bool) + + // LoadAndCopy loads a file from the storage. The returned file is a copy + // and can be modified without modifying the file on the storage. LoadAndCopy(key string) (value *memFile, ok bool) + + // Has checks whether a file exists at path. Has(key string) bool + + // Range ranges over all files on the storage. The callback needs to return + // false in order to stop the iteration. Range(f func(key string, value *memFile) bool) } diff --git a/io/fs/s3.go b/io/fs/s3.go index c75162ed..ed9d1ade 100644 --- a/io/fs/s3.go +++ b/io/fs/s3.go @@ -360,6 +360,25 @@ func (fs *s3Filesystem) WriteFileSafe(path string, data []byte) (int64, bool, er return fs.WriteFileReader(path, bytes.NewReader(data), len(data)) } +func (fs *s3Filesystem) AppendFileReader(path string, r io.Reader, sizeHint int) (int64, error) { + path = fs.cleanPath(path) + + ctx := context.Background() + + object, err := fs.client.GetObject(ctx, fs.bucket, path, minio.GetObjectOptions{}) + if err != nil { + size, _, err := fs.write(path, r) + return size, err + } + + buffer := bytes.Buffer{} + buffer.ReadFrom(object) + buffer.ReadFrom(r) + + size, _, err := fs.write(path, &buffer) + return size, err +} + func (fs *s3Filesystem) Rename(src, dst string) error { src = fs.cleanPath(src) dst = fs.cleanPath(dst) diff --git a/io/fs/sized.go b/io/fs/sized.go index aa6b552b..cfa159eb 100644 --- a/io/fs/sized.go +++ b/io/fs/sized.go @@ -135,34 +135,40 @@ func (r *sizedFilesystem) WriteFileSafe(path string, data []byte) (int64, bool, return r.Filesystem.WriteFileSafe(path, data) } +func (r *sizedFilesystem) AppendFileReader(path string, rd io.Reader, sizeHint int) (int64, error) { + currentSize, maxSize := r.Size() + if maxSize <= 0 { + return r.Filesystem.AppendFileReader(path, rd, sizeHint) + } + + data := bytes.Buffer{} + size, err := copyToBufferFromReader(&data, rd, 8*1024) + if err != nil { + return -1, err + } + + // Calculate the new size of the filesystem + newSize := currentSize + size + + // If the the new size is larger than the allowed size, we have to free + // some space. + if newSize > maxSize { + if !r.purge { + return -1, fmt.Errorf("not enough space on device") + } + + if r.Purge(size) < size { + return -1, fmt.Errorf("not enough space on device") + } + } + + return r.Filesystem.AppendFileReader(path, &data, int(size)) +} + func (r *sizedFilesystem) Purge(size int64) int64 { if purger, ok := r.Filesystem.(PurgeFilesystem); ok { return purger.Purge(size) } return 0 - /* - files := r.Filesystem.List("/", "") - - sort.Slice(files, func(i, j int) bool { - return files[i].ModTime().Before(files[j].ModTime()) - }) - - var freed int64 = 0 - - for _, f := range files { - r.Filesystem.Remove(f.Name()) - size -= f.Size() - freed += f.Size() - r.currentSize -= f.Size() - - if size <= 0 { - break - } - } - - files = nil - - return freed - */ } diff --git a/session/registry.go b/session/registry.go index 0dd54af6..1a08b3f0 100644 --- a/session/registry.go +++ b/session/registry.go @@ -199,12 +199,6 @@ func (r *registry) sessionPersister(pattern *strftime.Strftime, bufferDuration t buffer := &bytes.Buffer{} path := pattern.FormatString(time.Now()) - file := r.persist.fs.Open(path) - if file != nil { - buffer.ReadFrom(file) - file.Close() - } - enc := json.NewEncoder(buffer) ticker := time.NewTicker(bufferDuration) @@ -222,7 +216,7 @@ loop: currentPath := pattern.FormatString(session.ClosedAt) if currentPath != path && session.ClosedAt.After(splitTime) { if buffer.Len() > 0 { - _, _, err := r.persist.fs.WriteFileSafe(path, buffer.Bytes()) + _, err := r.persist.fs.AppendFileReader(path, buffer, -1) if err != nil { r.logger.Error().WithError(err).WithField("path", path).Log("") } @@ -239,7 +233,7 @@ loop: enc.Encode(&session) case t := <-ticker.C: if buffer.Len() > 0 { - _, _, err := r.persist.fs.WriteFileSafe(path, buffer.Bytes()) + _, err := r.persist.fs.AppendFileReader(path, buffer, -1) if err != nil { r.logger.Error().WithError(err).WithField("path", path).Log("") } else { @@ -260,7 +254,7 @@ loop: } if buffer.Len() > 0 { - _, _, err := r.persist.fs.WriteFileSafe(path, buffer.Bytes()) + _, err := r.persist.fs.AppendFileReader(path, buffer, -1) if err != nil { r.logger.Error().WithError(err).WithField("path", path).Log("") } else { From 1c56d53a6bfbe65cab272212df0d5eb859bad016 Mon Sep 17 00:00:00 2001 From: Ingo Oppermann Date: Wed, 21 Aug 2024 17:02:22 +0200 Subject: [PATCH 6/8] Adjust comments --- process/limiter.go | 1 + process/process.go | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/process/limiter.go b/process/limiter.go index 79f791e3..ea5df9c2 100644 --- a/process/limiter.go +++ b/process/limiter.go @@ -425,6 +425,7 @@ func (l *limiter) Limit(cpu, memory bool) error { // limitCPU will limit the CPU usage of this process. The limit is the max. CPU usage // normed to 0-1. The interval defines how long a time slot is that will be splitted // into sleeping and working. +// Inspired by https://github.com/opsengine/cpulimit func (l *limiter) limitCPU(ctx context.Context, limit float64, interval time.Duration) { defer func() { l.lock.Lock() diff --git a/process/process.go b/process/process.go index c6bc01c8..0fe0d45f 100644 --- a/process/process.go +++ b/process/process.go @@ -43,7 +43,7 @@ type Process interface { // running or not. IsRunning() bool - // Limit enabled or disables CPU and memory limiting. CPU will be throttled + // Limit enables or disables CPU and memory limiting. CPU will be throttled // into the configured limit. If memory consumption is above the configured // limit, the process will be killed. Limit(cpu, memory bool) error From 9947ba822b31ec2e999ed4f2e30e0fe471a1b9f7 Mon Sep 17 00:00:00 2001 From: Ingo Oppermann Date: Wed, 21 Aug 2024 20:31:22 +0200 Subject: [PATCH 7/8] Add missing JSON tags --- http/api/about.go | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/http/api/about.go b/http/api/about.go index 5887423a..332420a9 100644 --- a/http/api/about.go +++ b/http/api/about.go @@ -24,15 +24,15 @@ type AboutVersion struct { // AboutResources holds information about the current resource usage type AboutResources struct { - IsThrottling bool // Whether this core is currently throttling - NCPU float64 // Number of CPU on this node - CPU float64 // Current CPU load, 0-100*ncpu - CPULimit float64 // Defined CPU load limit, 0-100*ncpu - CPUCore float64 // Current CPU load of the core itself, 0-100*ncpu - Mem uint64 // Currently used memory in bytes - MemLimit uint64 // Defined memory limit in bytes - MemTotal uint64 // Total available memory in bytes - MemCore uint64 // Current used memory of the core itself in bytes + IsThrottling bool `json:"is_throttling"` // Whether this core is currently throttling + NCPU float64 `json:"ncpu"` // Number of CPU on this node + CPU float64 `json:"cpu_used"` // Current CPU load, 0-100*ncpu + CPULimit float64 `json:"cpu_limit"` // Defined CPU load limit, 0-100*ncpu + CPUCore float64 `json:"cpu_core"` // Current CPU load of the core itself, 0-100*ncpu + Mem uint64 `json:"memory_used_bytes"` // Currently used memory in bytes + MemLimit uint64 `json:"memory_limit_bytes"` // Defined memory limit in bytes + MemTotal uint64 `json:"memory_total_bytes"` // Total available memory in bytes + MemCore uint64 `json:"memory_core_bytes"` // Current used memory of the core itself in bytes } // MinimalAbout is the minimal information about the API From bebef61e55866bbc177dfb647b4fdcfac0720737 Mon Sep 17 00:00:00 2001 From: Ingo Oppermann Date: Thu, 22 Aug 2024 13:40:38 +0200 Subject: [PATCH 8/8] Add /v3/cluster/events endpoint to gather events from all nodes --- cluster/node/core.go | 16 +++- cluster/node/manager.go | 23 +++++ http/api/event.go | 20 +++- http/client/client.go | 60 ++++++------ http/handler/api/cluster_events.go | 144 +++++++++++++++++++++++++++++ http/server.go | 2 + 6 files changed, 228 insertions(+), 37 deletions(-) create mode 100644 http/handler/api/cluster_events.go diff --git a/cluster/node/core.go b/cluster/node/core.go index 2af52762..5341db06 100644 --- a/cluster/node/core.go +++ b/cluster/node/core.go @@ -164,8 +164,9 @@ func (n *Core) connect() error { Address: u.String(), Client: &http.Client{ Transport: tr, - Timeout: 5 * time.Second, + Timeout: 0, }, + Timeout: 5 * time.Second, }) if err != nil { return fmt.Errorf("creating client failed (%s): %w", address, err) @@ -267,7 +268,6 @@ type CoreVersion struct { } func (n *Core) About() (CoreAbout, error) { - n.lock.RLock() client := n.client n.lock.RUnlock() @@ -808,3 +808,15 @@ func (n *Core) ClusterProcessList() ([]Process, error) { return processes, nil } + +func (n *Core) Events(ctx context.Context, filters api.EventFilters) (<-chan api.Event, error) { + n.lock.RLock() + client := n.client + n.lock.RUnlock() + + if client == nil { + return nil, ErrNoPeer + } + + return client.Events(ctx, filters) +} diff --git a/cluster/node/manager.go b/cluster/node/manager.go index cc1072ea..d9eefc3e 100644 --- a/cluster/node/manager.go +++ b/cluster/node/manager.go @@ -1,6 +1,7 @@ package node import ( + "context" "errors" "fmt" "io" @@ -625,3 +626,25 @@ func (p *Manager) ProcessProbeConfig(nodeid string, config *app.Config) (api.Pro return node.Core().ProcessProbeConfig(config) } + +func (p *Manager) Events(ctx context.Context, filters api.EventFilters) (<-chan api.Event, error) { + eventChan := make(chan api.Event, 128) + + p.lock.RLock() + for _, n := range p.nodes { + go func(node *Node, e chan<- api.Event) { + eventChan, err := node.Core().Events(ctx, filters) + if err != nil { + return + } + + for event := range eventChan { + event.CoreID = node.id + e <- event + } + }(n, eventChan) + } + p.lock.RUnlock() + + return eventChan, nil +} diff --git a/http/api/event.go b/http/api/event.go index 416e37d5..0685cb82 100644 --- a/http/api/event.go +++ b/http/api/event.go @@ -15,6 +15,7 @@ type Event struct { Component string `json:"event"` Message string `json:"message"` Caller string `json:"caller"` + CoreID string `json:"core_id,omitempty"` Data map[string]string `json:"data"` } @@ -66,12 +67,18 @@ func (e *Event) Filter(ef *EventFilter) bool { } } - if ef.reCaller != nil { + if len(e.Caller) != 0 && ef.reCaller != nil { if !ef.reCaller.MatchString(e.Caller) { return false } } + if len(e.CoreID) != 0 && ef.reCoreID != nil { + if !ef.reCoreID.MatchString(e.CoreID) { + return false + } + } + for k, r := range ef.reData { v, ok := e.Data[k] if !ok { @@ -91,11 +98,13 @@ type EventFilter struct { Message string `json:"message"` Level string `json:"level"` Caller string `json:"caller"` + CoreID string `json:"core_id"` Data map[string]string `json:"data"` reMessage *regexp.Regexp reLevel *regexp.Regexp reCaller *regexp.Regexp + reCoreID *regexp.Regexp reData map[string]*regexp.Regexp } @@ -131,6 +140,15 @@ func (ef *EventFilter) Compile() error { ef.reCaller = r } + if len(ef.CoreID) != 0 { + r, err := regexp.Compile("(?i)" + ef.CoreID) + if err != nil { + return err + } + + ef.reCoreID = r + } + ef.reData = make(map[string]*regexp.Regexp) for k, v := range ef.Data { diff --git a/http/client/client.go b/http/client/client.go index d73fab98..ab235c0d 100644 --- a/http/client/client.go +++ b/http/client/client.go @@ -167,8 +167,13 @@ type Config struct { // Auth0Token is a valid Auth0 token to authorize access to the API. Auth0Token string - // Client is a HTTPClient that will be used for the API calls. Optional. + // Client is a HTTPClient that will be used for the API calls. Optional. Don't + // set a timeout in the client if you want to use the timeout in this config. Client HTTPClient + + // Timeout is the timeout for the whole connection. Don't set a timeout in + // the optional HTTPClient as it will override this timeout. + Timeout time.Duration } type apiconstraint struct { @@ -178,16 +183,17 @@ type apiconstraint struct { // restclient implements the RestClient interface. type restclient struct { - address string - prefix string - accessToken Token - refreshToken Token - username string - password string - auth0Token string - client HTTPClient - about api.About - aboutLock sync.RWMutex + address string + prefix string + accessToken Token + refreshToken Token + username string + password string + auth0Token string + client HTTPClient + clientTimeout time.Duration + about api.About + aboutLock sync.RWMutex version struct { connectedCore *semver.Version @@ -199,12 +205,13 @@ type restclient struct { // in case of an error. func New(config Config) (RestClient, error) { r := &restclient{ - address: config.Address, - prefix: "/api", - username: config.Username, - password: config.Password, - auth0Token: config.Auth0Token, - client: config.Client, + address: config.Address, + prefix: "/api", + username: config.Username, + password: config.Password, + auth0Token: config.Auth0Token, + client: config.Client, + clientTimeout: config.Timeout, } if len(config.AccessToken) != 0 { @@ -806,26 +813,11 @@ func (r *restclient) info() (api.About, error) { } func (r *restclient) request(req *http.Request) (int, io.ReadCloser, error) { - /* - fmt.Printf("%s %s\n", req.Method, req.URL) - for key, value := range req.Header { - for _, v := range value { - fmt.Printf("%s: %s\n", key, v) - } - } - fmt.Printf("\n") - */ resp, err := r.client.Do(req) if err != nil { return -1, nil, err } - /* - for key, value := range resp.Header { - for _, v := range value { - fmt.Printf("%s: %s\n", key, v) - } - } - */ + reader := resp.Body contentEncoding := resp.Header.Get("Content-Encoding") @@ -923,7 +915,7 @@ func (r *restclient) stream(ctx context.Context, method, path string, query *url } func (r *restclient) call(method, path string, query *url.Values, header http.Header, contentType string, data io.Reader) ([]byte, error) { - ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) + ctx, cancel := context.WithTimeout(context.Background(), r.clientTimeout) defer cancel() body, err := r.stream(ctx, method, path, query, header, contentType, data) diff --git a/http/handler/api/cluster_events.go b/http/handler/api/cluster_events.go new file mode 100644 index 00000000..7072a44c --- /dev/null +++ b/http/handler/api/cluster_events.go @@ -0,0 +1,144 @@ +package api + +import ( + "context" + "net/http" + "strings" + "time" + + "github.com/datarhei/core/v16/encoding/json" + "github.com/datarhei/core/v16/http/api" + "github.com/datarhei/core/v16/http/handler/util" + + "github.com/labstack/echo/v4" +) + +// Events returns a stream of event +// @Summary Stream of events +// @Description Stream of events of whats happening on each node in the cluster +// @ID cluster-3-events +// @Tags v16.?.? +// @Accept json +// @Produce text/event-stream +// @Produce json-stream +// @Param filters body api.EventFilters false "Event filters" +// @Success 200 {object} api.Event +// @Security ApiKeyAuth +// @Router /api/v3/cluster/events [post] +func (h *ClusterHandler) Events(c echo.Context) error { + filters := api.EventFilters{} + + if err := util.ShouldBindJSON(c, &filters); err != nil { + return api.Err(http.StatusBadRequest, "", "invalid JSON: %s", err.Error()) + } + + filter := map[string]*api.EventFilter{} + + for _, f := range filters.Filters { + f := f + + if err := f.Compile(); err != nil { + return api.Err(http.StatusBadRequest, "", "invalid filter: %s: %s", f.Component, err.Error()) + } + + component := strings.ToLower(f.Component) + filter[component] = &f + } + + ticker := time.NewTicker(5 * time.Second) + defer ticker.Stop() + + req := c.Request() + reqctx := req.Context() + + contentType := "text/event-stream" + accept := req.Header.Get(echo.HeaderAccept) + if strings.Contains(accept, "application/x-json-stream") { + contentType = "application/x-json-stream" + } + + res := c.Response() + + res.Header().Set(echo.HeaderContentType, contentType+"; charset=UTF-8") + res.Header().Set(echo.HeaderCacheControl, "no-store") + res.Header().Set(echo.HeaderConnection, "close") + res.WriteHeader(http.StatusOK) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + evts, err := h.proxy.Events(ctx, filters) + if err != nil { + return api.Err(http.StatusInternalServerError, "", "%s", err.Error()) + } + + enc := json.NewEncoder(res) + enc.SetIndent("", "") + + done := make(chan error, 1) + + filterEvent := func(event *api.Event) bool { + if len(filter) == 0 { + return true + } + + f, ok := filter[event.Component] + if !ok { + return false + } + + return event.Filter(f) + } + + if contentType == "text/event-stream" { + res.Write([]byte(":keepalive\n\n")) + res.Flush() + + for { + select { + case err := <-done: + return err + case <-reqctx.Done(): + done <- nil + case <-ticker.C: + res.Write([]byte(":keepalive\n\n")) + res.Flush() + case event := <-evts: + if !filterEvent(&event) { + continue + } + + res.Write([]byte("event: " + event.Component + "\ndata: ")) + if err := enc.Encode(event); err != nil { + done <- err + } + res.Write([]byte("\n")) + res.Flush() + } + } + } else { + res.Write([]byte("{\"event\": \"keepalive\"}\n")) + res.Flush() + + for { + select { + case err := <-done: + return err + case <-reqctx.Done(): + done <- nil + case <-ticker.C: + res.Write([]byte("{\"event\": \"keepalive\"}\n")) + res.Flush() + case event := <-evts: + if !filterEvent(&event) { + continue + } + + if err := enc.Encode(event); err != nil { + done <- err + } + res.Flush() + } + } + } +} diff --git a/http/server.go b/http/server.go index 959165bd..bbfb2726 100644 --- a/http/server.go +++ b/http/server.go @@ -762,6 +762,8 @@ func (s *server) setRoutesV3(v3 *echo.Group) { v3.GET("/cluster/fs/:storage", s.v3handler.cluster.FilesystemListFiles) + v3.POST("/cluster/events", s.v3handler.cluster.Events) + if !s.readOnly { v3.PUT("/cluster/transfer/:id", s.v3handler.cluster.TransferLeadership) v3.PUT("/cluster/leave", s.v3handler.cluster.Leave)