Allow negative persist interval in order to avoid collecting and storing session history
This commit is contained in:
parent
5d39620f6f
commit
68cb1b1bda
@ -271,8 +271,8 @@ func (d *Config) init() {
|
||||
d.vars.Register(value.NewInt(&d.Sessions.SessionTimeout, 30), "sessions.session_timeout_sec", "CORE_SESSIONS_SESSION_TIMEOUT_SEC", nil, "Timeout for an idle session", false, false)
|
||||
d.vars.Register(value.NewStrftime(&d.Sessions.SessionLogPathPattern, ""), "sessions.session_log_path_pattern", "CORE_SESSIONS_SESSION_LOG_PATH_PATTERN", nil, "Path to where the sessions will be logged, may contain strftime-patterns, leave empty for no session logging, persist must be enabled", false, false)
|
||||
d.vars.Register(value.NewInt(&d.Sessions.SessionLogBuffer, 15), "sessions.session_log_buffer_sec", "CORE_SESSIONS_SESSION_LOG_BUFFER_SEC", nil, "Maximum duration to buffer session logs in memory before persisting on disk", false, false)
|
||||
d.vars.Register(value.NewBool(&d.Sessions.Persist, false), "sessions.persist", "CORE_SESSIONS_PERSIST", nil, "Whether to persist session history. Will be stored in /sessions/[collector].json in db.dir", false, false)
|
||||
d.vars.Register(value.NewInt(&d.Sessions.PersistInterval, 300), "sessions.persist_interval_sec", "CORE_SESSIONS_PERSIST_INTERVAL_SEC", nil, "Interval in seconds in which to persist the current session history", false, false)
|
||||
d.vars.Register(value.NewBool(&d.Sessions.Persist, false), "sessions.persist", "CORE_SESSIONS_PERSIST", nil, "Whether to persist session history or logs. Will be stored in /sessions in db.dir", false, false)
|
||||
d.vars.Register(value.NewInt(&d.Sessions.PersistInterval, 300), "sessions.persist_interval_sec", "CORE_SESSIONS_PERSIST_INTERVAL_SEC", nil, "Interval in seconds in which to persist the current session history, 0 only on shutdown, negative never", false, false)
|
||||
d.vars.Register(value.NewUint64(&d.Sessions.MaxBitrate, 0), "sessions.max_bitrate_mbit", "CORE_SESSIONS_MAXBITRATE_MBIT", nil, "Max. allowed outgoing bitrate in mbit/s, 0 for unlimited", false, false)
|
||||
d.vars.Register(value.NewUint64(&d.Sessions.MaxSessions, 0), "sessions.max_sessions", "CORE_SESSIONS_MAX_SESSIONS", []string{"CORE_SESSIONS_MAXSESSIONS"}, "Max. allowed number of simultaneous sessions, 0 for unlimited", false, false)
|
||||
|
||||
@ -448,11 +448,6 @@ func (d *Config) Validate(resetLogs bool) {
|
||||
d.vars.Log("error", "stats.session_timeout_sec", "must be equal or greater than 1")
|
||||
}
|
||||
|
||||
// If the stats and their persistence are enabled, the persist interval has to be set to a useful value
|
||||
if d.Sessions.Enable && d.Sessions.PersistInterval < 0 {
|
||||
d.vars.Log("error", "stats.persist_interval_sec", "must be at equal or greater than 0")
|
||||
}
|
||||
|
||||
// If the service is enabled, the token and enpoint have to be defined
|
||||
if d.Service.Enable {
|
||||
if len(d.Service.Token) == 0 {
|
||||
|
||||
@ -242,7 +242,8 @@ type collector struct {
|
||||
rxBitrate *average.SlidingWindow
|
||||
txBitrate *average.SlidingWindow
|
||||
|
||||
history history
|
||||
collectHistory bool
|
||||
history history
|
||||
|
||||
inactiveTimeout time.Duration
|
||||
sessionTimeout time.Duration
|
||||
@ -269,7 +270,7 @@ const (
|
||||
// NewCollector returns a new collector according to the provided configuration. If such a
|
||||
// collector can't be created, a NullCollector is returned.
|
||||
func NewCollector(config CollectorConfig) Collector {
|
||||
collector, err := newCollector("", nil, nil, config)
|
||||
collector, err := newCollector("", nil, nil, false, config)
|
||||
if err != nil {
|
||||
return NewNullCollector()
|
||||
}
|
||||
@ -279,7 +280,7 @@ func NewCollector(config CollectorConfig) Collector {
|
||||
return collector
|
||||
}
|
||||
|
||||
func newCollector(id string, sessionsCh chan<- Session, logger log.Logger, config CollectorConfig) (*collector, error) {
|
||||
func newCollector(id string, sessionsCh chan<- Session, logger log.Logger, history bool, config CollectorConfig) (*collector, error) {
|
||||
c := &collector{
|
||||
id: id,
|
||||
logger: logger,
|
||||
@ -290,6 +291,7 @@ func newCollector(id string, sessionsCh chan<- Session, logger log.Logger, confi
|
||||
inactiveTimeout: config.InactiveTimeout,
|
||||
sessionTimeout: config.SessionTimeout,
|
||||
limiter: config.Limiter,
|
||||
collectHistory: history,
|
||||
}
|
||||
|
||||
if c.logger == nil {
|
||||
@ -347,26 +349,28 @@ func newCollector(id string, sessionsCh chan<- Session, logger log.Logger, confi
|
||||
// Only log session that have been active
|
||||
logger.Info().Log("Closed")
|
||||
|
||||
c.lock.history.Lock()
|
||||
if c.collectHistory {
|
||||
c.lock.history.Lock()
|
||||
|
||||
key := sess.location + ":" + sess.peer + ":" + sess.reference
|
||||
key := sess.location + ":" + sess.peer + ":" + sess.reference
|
||||
|
||||
// Update history totals per key
|
||||
t, ok := c.history.Sessions[key]
|
||||
t.TotalSessions++
|
||||
t.TotalRxBytes += sess.rxBytes
|
||||
t.TotalTxBytes += sess.txBytes
|
||||
// Update history totals per key
|
||||
t, ok := c.history.Sessions[key]
|
||||
t.TotalSessions++
|
||||
t.TotalRxBytes += sess.rxBytes
|
||||
t.TotalTxBytes += sess.txBytes
|
||||
|
||||
if !ok {
|
||||
t.Location = sess.location
|
||||
t.Peer = sess.peer
|
||||
t.Reference = sess.reference
|
||||
if !ok {
|
||||
t.Location = sess.location
|
||||
t.Peer = sess.peer
|
||||
t.Reference = sess.reference
|
||||
}
|
||||
|
||||
c.history.Sessions[key] = t
|
||||
|
||||
c.lock.history.Unlock()
|
||||
}
|
||||
|
||||
c.history.Sessions[key] = t
|
||||
|
||||
c.lock.history.Unlock()
|
||||
|
||||
if c.sessionsCh != nil {
|
||||
c.sessionsCh <- Session{
|
||||
Collector: c.id,
|
||||
@ -458,6 +462,11 @@ func (s *historySnapshot) Release() {
|
||||
func (c *collector) Snapshot() (Snapshot, error) {
|
||||
c.logger.Debug().Log("Creating history snapshot")
|
||||
|
||||
if !c.collectHistory {
|
||||
c.logger.Debug().Log("Not creating history snapshot because collector history is disabled")
|
||||
return nil, fmt.Errorf("collecting history is disabled")
|
||||
}
|
||||
|
||||
c.lock.history.Lock()
|
||||
defer c.lock.history.Unlock()
|
||||
|
||||
@ -478,6 +487,11 @@ func (c *collector) Restore(snapshot io.ReadCloser) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
if !c.collectHistory {
|
||||
c.logger.Debug().Log("Not restoring history snapshot because collector history is disabled")
|
||||
return nil
|
||||
}
|
||||
|
||||
defer snapshot.Close()
|
||||
|
||||
c.logger.Debug().Log("Restoring history snapshot")
|
||||
|
||||
@ -9,7 +9,7 @@ import (
|
||||
)
|
||||
|
||||
func createCollector(inactive, session time.Duration, sessionsCh chan<- Session) (*collector, error) {
|
||||
return newCollector("test", sessionsCh, nil, CollectorConfig{
|
||||
return newCollector("test", sessionsCh, nil, true, CollectorConfig{
|
||||
InactiveTimeout: inactive,
|
||||
SessionTimeout: session,
|
||||
})
|
||||
|
||||
@ -21,12 +21,12 @@ type Config struct {
|
||||
// history will not be persisted.
|
||||
PersistFS fs.Filesystem
|
||||
|
||||
// PersistInterval is the duration between persisting the history. Can be 0. Then the history will
|
||||
// only be persisted at stopping the collector.
|
||||
// PersistInterval is the duration between persisting the history. If 0 the history will
|
||||
// only be persisted at stopping the collector. If negative, the history will not be persisted.
|
||||
PersistInterval time.Duration
|
||||
|
||||
// SessionLogPattern is a path inside the PersistFS where the individual sessions will
|
||||
// be logged. The path can contain strftime-plateholders in order to split the log files.
|
||||
// be logged. The path can contain strftime-placeholders in order to split the log files.
|
||||
// If this string is empty or PersistFS is nil, the sessions will not be logged.
|
||||
LogPattern string
|
||||
|
||||
@ -319,7 +319,9 @@ func (r *registry) Register(id string, conf CollectorConfig) (Collector, error)
|
||||
return nil, fmt.Errorf("a collector with the ID '%s' already exists", id)
|
||||
}
|
||||
|
||||
m, err := newCollector(id, r.persist.sessionsCh, r.logger, conf)
|
||||
collectHistory := r.persist.fs != nil && r.persist.interval >= 0
|
||||
|
||||
m, err := newCollector(id, r.persist.sessionsCh, r.logger, collectHistory, conf)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -360,7 +362,7 @@ func (r *registry) unregister(id string) error {
|
||||
|
||||
delete(r.collector, id)
|
||||
|
||||
if r.persist.fs != nil {
|
||||
if r.persist.fs != nil && r.persist.interval >= 0 {
|
||||
s, err := m.Snapshot()
|
||||
if err != nil {
|
||||
return err
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user