Fix erroring out on defect session history file
Instead of throwing an error and refusing to register a collector the defect history file is renamed and the session history is started from scratch.
This commit is contained in:
parent
9ece518525
commit
24be6a4bc5
@ -2,6 +2,7 @@ package session
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"sort"
|
||||
"sync"
|
||||
@ -481,15 +482,12 @@ func (c *collector) Restore(snapshot io.ReadCloser) error {
|
||||
|
||||
c.logger.Debug().Log("Restoring history snapshot")
|
||||
|
||||
jsondata, err := io.ReadAll(snapshot)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
decoder := json.NewDecoder(snapshot)
|
||||
|
||||
data := history{}
|
||||
|
||||
if err = json.Unmarshal(jsondata, &data); err != nil {
|
||||
return err
|
||||
if err := decoder.Decode(&data); err != nil {
|
||||
return fmt.Errorf("%s: failed to read snapshot: %w", c.id, err)
|
||||
}
|
||||
|
||||
c.lock.history.Lock()
|
||||
|
||||
@ -9,7 +9,7 @@ import (
|
||||
)
|
||||
|
||||
func createCollector(inactive, session time.Duration, sessionsCh chan<- Session) (*collector, error) {
|
||||
return newCollector("", sessionsCh, nil, CollectorConfig{
|
||||
return newCollector("test", sessionsCh, nil, CollectorConfig{
|
||||
InactiveTimeout: inactive,
|
||||
SessionTimeout: session,
|
||||
})
|
||||
@ -156,3 +156,27 @@ func TestHistoryRestore(t *testing.T) {
|
||||
|
||||
<-sessions
|
||||
}
|
||||
|
||||
func TestHistoryRestoreInvalid(t *testing.T) {
|
||||
sessions := make(chan Session, 1)
|
||||
|
||||
c, err := createCollector(time.Hour, time.Hour, sessions)
|
||||
require.Equal(t, nil, err)
|
||||
|
||||
memfs, err := fs.NewMemFilesystem(fs.MemConfig{})
|
||||
require.NoError(t, err)
|
||||
|
||||
_, _, err = memfs.WriteFile("/foobar.json", []byte(""))
|
||||
require.NoError(t, err)
|
||||
|
||||
snapshot, err := NewHistorySource(memfs, "/foobar.json")
|
||||
require.NoError(t, err)
|
||||
|
||||
err = c.Restore(snapshot)
|
||||
require.Error(t, err)
|
||||
|
||||
c.RegisterAndActivate("foo", "", "", "")
|
||||
c.Close("foo")
|
||||
|
||||
<-sessions
|
||||
}
|
||||
|
||||
@ -6,6 +6,7 @@ import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"regexp"
|
||||
"strconv"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
@ -321,12 +322,13 @@ func (r *registry) Register(id string, conf CollectorConfig) (Collector, error)
|
||||
|
||||
if r.persist.fs != nil {
|
||||
s, err := NewHistorySource(r.persist.fs, "/"+id+".json")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
if err == nil {
|
||||
err = m.Restore(s)
|
||||
}
|
||||
|
||||
if err := m.Restore(s); err != nil {
|
||||
return nil, err
|
||||
if err != nil {
|
||||
r.logger.Warn().WithError(err).WithField("file", "/"+id+".json").Log("Can't restore history")
|
||||
r.persist.fs.Rename("/"+id+".json", "/"+id+"."+strconv.FormatInt(time.Now().Unix(), 10)+".json")
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -115,6 +115,59 @@ func TestUnregisterAll(t *testing.T) {
|
||||
require.Equal(t, []string{}, c)
|
||||
}
|
||||
|
||||
func TestRestoreHistory(t *testing.T) {
|
||||
memfs, err := fs.NewMemFilesystem(fs.MemConfig{})
|
||||
require.NoError(t, err)
|
||||
|
||||
_, _, err = memfs.WriteFile("/foobar.json", []byte("{}"))
|
||||
require.NoError(t, err)
|
||||
|
||||
r, err := New(Config{
|
||||
PersistFS: memfs,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
t.Cleanup(func() {
|
||||
r.Close()
|
||||
})
|
||||
|
||||
c, err := r.Register("foobar", CollectorConfig{})
|
||||
require.NoError(t, err)
|
||||
|
||||
c.RegisterAndActivate("foo", "ref", "location", "peer")
|
||||
c.Egress("foo", 42)
|
||||
|
||||
err = r.Unregister("foobar")
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
func TestRestoreInvalidHistory(t *testing.T) {
|
||||
memfs, err := fs.NewMemFilesystem(fs.MemConfig{})
|
||||
require.NoError(t, err)
|
||||
|
||||
_, _, err = memfs.WriteFile("/foobar.json", []byte(""))
|
||||
require.NoError(t, err)
|
||||
|
||||
r, err := New(Config{
|
||||
PersistFS: memfs,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
t.Cleanup(func() {
|
||||
r.Close()
|
||||
})
|
||||
|
||||
c, err := r.Register("foobar", CollectorConfig{})
|
||||
require.NoError(t, err)
|
||||
|
||||
c.RegisterAndActivate("foo", "ref", "location", "peer")
|
||||
c.Egress("foo", 42)
|
||||
|
||||
err = r.Unregister("foobar")
|
||||
require.NoError(t, err)
|
||||
|
||||
files := memfs.List("/", fs.ListOptions{})
|
||||
require.Equal(t, 2, len(files))
|
||||
}
|
||||
|
||||
func TestPersistHistory(t *testing.T) {
|
||||
memfs, err := fs.NewMemFilesystem(fs.MemConfig{})
|
||||
require.NoError(t, err)
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user