Fix register session after close
This commit is contained in:
parent
3ff5251eba
commit
352289f759
@ -431,6 +431,13 @@ func (c *collector) stop() {
|
||||
c.sessionsWG.Wait()
|
||||
}
|
||||
|
||||
func (c *collector) isRunning() bool {
|
||||
c.lock.run.Lock()
|
||||
defer c.lock.run.Unlock()
|
||||
|
||||
return c.running
|
||||
}
|
||||
|
||||
type historySnapshot struct {
|
||||
data []byte
|
||||
}
|
||||
@ -507,11 +514,19 @@ func (c *collector) IsKnownSession(id string) bool {
|
||||
}
|
||||
|
||||
func (c *collector) RegisterAndActivate(id, reference, location, peer string) {
|
||||
if !c.isRunning() {
|
||||
return
|
||||
}
|
||||
|
||||
c.Register(id, reference, location, peer)
|
||||
c.Activate(id)
|
||||
}
|
||||
|
||||
func (c *collector) Register(id, reference, location, peer string) {
|
||||
if !c.isRunning() {
|
||||
return
|
||||
}
|
||||
|
||||
c.lock.session.Lock()
|
||||
defer c.lock.session.Unlock()
|
||||
|
||||
|
||||
@ -407,8 +407,6 @@ func (r *registry) UnregisterAll() {
|
||||
for id := range r.collector {
|
||||
r.unregister(id)
|
||||
}
|
||||
|
||||
r.collector = make(map[string]*collector)
|
||||
}
|
||||
|
||||
func (r *registry) Summary(id string) Summary {
|
||||
|
||||
@ -1,6 +1,7 @@
|
||||
package session
|
||||
|
||||
import (
|
||||
"strconv"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
@ -300,3 +301,39 @@ func TestPersistSessionBuffer(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
require.Greater(t, info.Size(), int64(0))
|
||||
}
|
||||
|
||||
func TestRegisterAfterCloseWithPersist(t *testing.T) {
|
||||
memfs, err := fs.NewMemFilesystem(fs.MemConfig{})
|
||||
require.NoError(t, err)
|
||||
|
||||
pattern := "/log/%Y-%m-%d.log"
|
||||
|
||||
r, err := New(Config{
|
||||
PersistFS: memfs,
|
||||
LogPattern: pattern,
|
||||
LogBufferDuration: 5 * time.Second,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
t.Cleanup(func() {
|
||||
r.Close()
|
||||
})
|
||||
|
||||
c, err := r.Register("foobar", CollectorConfig{
|
||||
SessionTimeout: 3 * time.Second,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
for i := 0; i < 1000; i++ {
|
||||
c.RegisterAndActivate("foo_"+strconv.Itoa(i), "ref", "location", "peer")
|
||||
c.Egress("foo", int64(i))
|
||||
}
|
||||
|
||||
r.Close()
|
||||
|
||||
c.RegisterAndActivate("foo_XXX", "ref", "location", "peer")
|
||||
c.Egress("foo", 42)
|
||||
|
||||
time.Sleep(5 * time.Second)
|
||||
|
||||
require.Equal(t, int64(2), memfs.Files())
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user