Add filesystem events

This commit is contained in:
Ingo Oppermann 2025-09-18 22:28:07 +02:00
parent a6e806fd31
commit 062204eca9
No known key found for this signature in database
GPG Key ID: 2AB32426E9DD229E
2 changed files with 159 additions and 0 deletions

132
io/fs/event.go Normal file
View File

@ -0,0 +1,132 @@
package fs
import (
"context"
"fmt"
"sync"
"github.com/lithammer/shortuuid/v4"
)
type Event struct {
Action string
Name string
}
func (e Event) clone() Event {
return Event{
Action: e.Action,
Name: e.Name,
}
}
type EventsCancelFunc func()
type EventFilesystem interface {
Events() (<-chan Event, EventsCancelFunc)
}
type EventWriter struct {
publisher chan Event
publisherClosed bool
publisherLock sync.Mutex
ctx context.Context
cancel context.CancelFunc
subscriber map[string]chan Event
subscriberLock sync.Mutex
}
func NewEventWriter() *EventWriter {
w := &EventWriter{
publisher: make(chan Event, 1024),
publisherClosed: false,
subscriber: make(map[string]chan Event),
}
w.ctx, w.cancel = context.WithCancel(context.Background())
go w.broadcast()
return w
}
func (w *EventWriter) Publish(e Event) error {
event := e.clone()
w.publisherLock.Lock()
defer w.publisherLock.Unlock()
if w.publisherClosed {
return fmt.Errorf("writer is closed")
}
select {
case w.publisher <- event:
default:
return fmt.Errorf("publisher queue full")
}
return nil
}
func (w *EventWriter) Close() {
w.cancel()
w.publisherLock.Lock()
close(w.publisher)
w.publisherClosed = true
w.publisherLock.Unlock()
w.subscriberLock.Lock()
for _, c := range w.subscriber {
close(c)
}
w.subscriber = make(map[string]chan Event)
w.subscriberLock.Unlock()
}
func (w *EventWriter) Subscribe() (<-chan Event, EventsCancelFunc) {
l := make(chan Event, 1024)
var id string = ""
w.subscriberLock.Lock()
for {
id = shortuuid.New()
if _, ok := w.subscriber[id]; !ok {
w.subscriber[id] = l
break
}
}
w.subscriberLock.Unlock()
unsubscribe := func() {
w.subscriberLock.Lock()
delete(w.subscriber, id)
w.subscriberLock.Unlock()
}
return l, unsubscribe
}
func (w *EventWriter) broadcast() {
for {
select {
case <-w.ctx.Done():
return
case e := <-w.publisher:
w.subscriberLock.Lock()
for _, c := range w.subscriber {
pp := e.clone()
select {
case c <- pp:
default:
}
}
w.subscriberLock.Unlock()
}
}
}

View File

@ -135,6 +135,8 @@ type memFilesystem struct {
// Storage backend
storage memStorage
dirs *dirStorage
events *EventWriter
}
type dirStorage struct {
@ -222,6 +224,8 @@ func NewMemFilesystem(config MemConfig) (Filesystem, error) {
fs.logger.Debug().Log("Created")
fs.events = NewEventWriter()
return fs, nil
}
@ -462,8 +466,10 @@ func (fs *memFilesystem) WriteFileReader(path string, r io.Reader, sizeHint int)
})
if replace {
fs.events.Publish(Event{Action: "update", Name: newFile.name})
logger.Debug().Log("Replaced file")
} else {
fs.events.Publish(Event{Action: "create", Name: newFile.name})
logger.Debug().Log("Added file")
}
@ -520,6 +526,8 @@ func (fs *memFilesystem) AppendFileReader(path string, r io.Reader, sizeHint int
"size_bytes": fs.currentSize,
}).Log("Appended to file")
fs.events.Publish(Event{Action: "update", Name: file.name})
return size, nil
}
@ -556,6 +564,8 @@ func (fs *memFilesystem) Purge(size int64) int64 {
"size_bytes": fs.currentSize,
}).Debug().Log("Purged file")
fs.events.Publish(Event{Action: "remove", Name: f.name})
if size <= 0 {
break
}
@ -599,15 +609,21 @@ func (fs *memFilesystem) Rename(src, dst string) error {
dstFile, replace := fs.storage.Store(dst, srcFile)
fs.storage.Delete(src)
fs.events.Publish(Event{Action: "remove", Name: src})
fs.dirs.Remove(src)
if !replace {
fs.dirs.Add(dst)
fs.events.Publish(Event{Action: "create", Name: dst})
}
fs.sizeLock.Lock()
defer fs.sizeLock.Unlock()
if replace {
fs.events.Publish(Event{Action: "update", Name: dst})
dstFile.Close()
fs.currentSize -= dstFile.size
@ -643,12 +659,15 @@ func (fs *memFilesystem) Copy(src, dst string) error {
if !replace {
fs.dirs.Add(dst)
fs.events.Publish(Event{Action: "create", Name: dst})
}
fs.sizeLock.Lock()
defer fs.sizeLock.Unlock()
if replace {
fs.events.Publish(Event{Action: "update", Name: dst})
replacedFile.Close()
fs.currentSize -= replacedFile.size
}
@ -735,6 +754,8 @@ func (fs *memFilesystem) remove(path string) int64 {
"size_bytes": fs.currentSize,
}).Debug().Log("Removed file")
fs.events.Publish(Event{Action: "remove", Name: file.name})
return file.size
}
@ -808,6 +829,8 @@ func (fs *memFilesystem) RemoveList(path string, options ListOptions) ([]string,
fs.dirs.Remove(file.name)
file.Close()
fs.events.Publish(Event{Action: "remove", Name: file.name})
}
fs.sizeLock.Lock()
@ -924,3 +947,7 @@ func (fs *memFilesystem) cleanPath(path string) string {
return filepath.Join("/", filepath.Clean(path))
}
func (fs *memFilesystem) Events() (<-chan Event, EventsCancelFunc) {
return fs.events.Subscribe()
}