From 69e264f22c7b7b099d346711cad2d832eec19245 Mon Sep 17 00:00:00 2001 From: Ingo Oppermann Date: Wed, 21 Jun 2023 13:21:25 +0200 Subject: [PATCH] Don't expose Stop() on collector, allow to close a session explicitely --- session/collector.go | 23 +++++++++++++++++++---- session/collector_test.go | 6 +++--- session/registry.go | 2 +- 3 files changed, 23 insertions(+), 8 deletions(-) diff --git a/session/collector.go b/session/collector.go index 03baf662..fed0adc8 100644 --- a/session/collector.go +++ b/session/collector.go @@ -127,6 +127,9 @@ type Collector interface { // IsKnowsession returns whether a session with the given id exists. IsKnownSession(id string) bool + // Close closes the session with the id. + Close(id string) bool + // IsAllowedIP returns whether traffic from/to the given IP should be considered. IsCollectableIP(ip string) bool @@ -165,9 +168,6 @@ type Collector interface { // TopEgressBitrate returns the summed current top bitrates of all egress sessions. CompanionTopEgressBitrate() float64 - // Stop stops the collector to calculate rates - Stop() - // Snapshot returns the current snapshot of the history Snapshot() (Snapshot, error) @@ -410,7 +410,7 @@ func (c *collector) start() { c.txBitrate, _ = average.New(averageWindow, averageGranularity) } -func (c *collector) Stop() { +func (c *collector) stop() { c.lock.run.Lock() defer c.lock.run.Unlock() @@ -578,6 +578,20 @@ func (c *collector) Activate(id string) bool { return false } +func (c *collector) Close(id string) bool { + c.lock.session.RLock() + sess, ok := c.sessions[id] + c.lock.session.RUnlock() + + if !ok { + return false + } + + sess.Cancel() + + return true +} + func (c *collector) Extra(id string, extra map[string]interface{}) { c.lock.session.RLock() sess, ok := c.sessions[id] @@ -921,6 +935,7 @@ type nullCollector struct{} func NewNullCollector() Collector { return &nullCollector{} } func (n *nullCollector) Register(id, reference, location, peer string) {} func (n *nullCollector) Activate(id string) bool { return false } +func (n *nullCollector) Close(id string) bool { return true } func (n *nullCollector) RegisterAndActivate(id, reference, location, peer string) {} func (n *nullCollector) Extra(id string, extra map[string]interface{}) {} func (n *nullCollector) Unregister(id string) {} diff --git a/session/collector_test.go b/session/collector_test.go index abff46b4..3e4e3ca8 100644 --- a/session/collector_test.go +++ b/session/collector_test.go @@ -83,7 +83,7 @@ func TestIngress(t *testing.T) { require.Equal(t, 1, len(sessions)) require.Equal(t, uint64(1024), sessions[0].RxBytes) - c.Stop() + c.stop() } func TestEgress(t *testing.T) { @@ -99,7 +99,7 @@ func TestEgress(t *testing.T) { require.Equal(t, 1, len(sessions)) require.Equal(t, uint64(1024), sessions[0].TxBytes) - c.Stop() + c.stop() } func TestNbSessions(t *testing.T) { @@ -124,7 +124,7 @@ func TestNbSessions(t *testing.T) { nsessions = c.Sessions() require.Equal(t, uint64(2), nsessions) - c.Stop() + c.stop() time.Sleep(2 * time.Second) diff --git a/session/registry.go b/session/registry.go index 21910e86..8317453c 100644 --- a/session/registry.go +++ b/session/registry.go @@ -350,7 +350,7 @@ func (r *registry) unregister(id string) error { return fmt.Errorf("a collector with the ID '%s' doesn't exist", id) } - m.Stop() + m.stop() delete(r.collector, id)