diff --git a/io/fs/disk.go b/io/fs/disk.go index e714d0ed..99662993 100644 --- a/io/fs/disk.go +++ b/io/fs/disk.go @@ -371,9 +371,9 @@ func (fs *diskFilesystem) WriteFile(path string, data []byte) (int64, bool, erro func (fs *diskFilesystem) WriteFileSafe(path string, data []byte) (int64, bool, error) { path = fs.cleanPath(path) - dir, filename := filepath.Split(path) + _, filename := filepath.Split(path) - tmpfile, err := os.CreateTemp(dir, filename) + tmpfile, err := os.CreateTemp("", filename) if err != nil { return -1, false, err } @@ -415,6 +415,11 @@ func (fs *diskFilesystem) rename(src, dst string) error { return nil } + dir, _ := filepath.Split(dst) + if err := os.MkdirAll(dir, 0755); err != nil { + return fmt.Errorf("failed to create destination directory: %s: %w", dir, err) + } + // First try to rename the file if err := os.Rename(src, dst); err == nil { return nil @@ -447,6 +452,12 @@ func (fs *diskFilesystem) copy(src, dst string) error { return fmt.Errorf("failed to open source file: %w", err) } + dir, _ := filepath.Split(dst) + if err := os.MkdirAll(dir, 0755); err != nil { + source.Close() + return fmt.Errorf("failed to create destination directory: %s: %w", dir, err) + } + destination, err := os.Create(dst) if err != nil { source.Close() diff --git a/session/collector.go b/session/collector.go index a3ac6de1..9c1739eb 100644 --- a/session/collector.go +++ b/session/collector.go @@ -15,20 +15,20 @@ import ( // Session represents an active session type Session struct { - Collector string - ID string - Reference string - CreatedAt time.Time - ClosedAt time.Time - Location string - Peer string - Extra map[string]interface{} - RxBytes uint64 - RxBitrate float64 // bit/s - TopRxBitrate float64 // bit/s - TxBytes uint64 - TxBitrate float64 // bit/s - TopTxBitrate float64 // bit/s + Collector string `json:"collector"` + ID string `json:"id"` + Reference string `json:"reference"` + CreatedAt time.Time `json:"created_at"` + ClosedAt time.Time `json:"closed_at"` + Location string `json:"local"` + Peer string `json:"remote"` + Extra map[string]interface{} `json:"extra"` + RxBytes uint64 `json:"rx_bytes"` + RxBitrate float64 `json:"rx_bitrate"` // bit/s + TopRxBitrate float64 `json:"rx_top_bitrate"` // bit/s + TxBytes uint64 `json:"tx_bytes"` + TxBitrate float64 `json:"tx_bitrate"` // bit/s + TopTxBitrate float64 `json:"tx_top_bitrate"` // bit/s } // Summary is a summary over all current and past sessions. diff --git a/session/registry.go b/session/registry.go index bf7ad014..21910e86 100644 --- a/session/registry.go +++ b/session/registry.go @@ -198,6 +198,12 @@ func (r *registry) sessionPersister(pattern *strftime.Strftime, bufferDuration t buffer := &bytes.Buffer{} path := pattern.FormatString(time.Now()) + file := r.persist.fs.Open(path) + if file != nil { + buffer.ReadFrom(file) + file.Close() + } + enc := json.NewEncoder(buffer) ticker := time.NewTicker(bufferDuration) @@ -214,9 +220,15 @@ loop: if currentPath != path { if buffer.Len() > 0 { _, _, err := r.persist.fs.WriteFileSafe(path, buffer.Bytes()) - r.logger.Error().WithError(err).WithField("path", path).Log("") + if err != nil { + r.logger.Error().WithError(err).WithField("path", path).Log("") + } } buffer.Reset() + r.logger.Info().WithFields(log.Fields{ + "previous": path, + "current": currentPath, + }).Log("Creating new session log file") path = currentPath } @@ -224,11 +236,19 @@ loop: case t := <-ticker.C: if buffer.Len() > 0 { _, _, err := r.persist.fs.WriteFileSafe(path, buffer.Bytes()) - r.logger.Error().WithError(err).WithField("path", path).Log("") + if err != nil { + r.logger.Error().WithError(err).WithField("path", path).Log("") + } else { + r.logger.Debug().WithField("path", path).Log("Persisted session log") + } } currentPath := pattern.FormatString(t) if currentPath != path { buffer.Reset() + r.logger.Info().WithFields(log.Fields{ + "previous": path, + "current": currentPath, + }).Log("Creating new session log file") path = currentPath } } @@ -236,7 +256,12 @@ loop: if buffer.Len() > 0 { _, _, err := r.persist.fs.WriteFileSafe(path, buffer.Bytes()) - r.logger.Error().WithError(err).WithField("path", path).Log("") + if err != nil { + r.logger.Error().WithError(err).WithField("path", path).Log("") + } else { + r.logger.Debug().WithField("path", path).Log("Persisted session log") + } + buffer.Reset() } buffer = nil