From 062204eca94d5b3ea2fe78b9aba1f49b76803dfc Mon Sep 17 00:00:00 2001 From: Ingo Oppermann Date: Thu, 18 Sep 2025 22:28:07 +0200 Subject: [PATCH] Add filesystem events --- io/fs/event.go | 132 +++++++++++++++++++++++++++++++++++++++++++++++++ io/fs/mem.go | 27 ++++++++++ 2 files changed, 159 insertions(+) create mode 100644 io/fs/event.go diff --git a/io/fs/event.go b/io/fs/event.go new file mode 100644 index 00000000..cadf08f5 --- /dev/null +++ b/io/fs/event.go @@ -0,0 +1,132 @@ +package fs + +import ( + "context" + "fmt" + "sync" + + "github.com/lithammer/shortuuid/v4" +) + +type Event struct { + Action string + Name string +} + +func (e Event) clone() Event { + return Event{ + Action: e.Action, + Name: e.Name, + } +} + +type EventsCancelFunc func() + +type EventFilesystem interface { + Events() (<-chan Event, EventsCancelFunc) +} + +type EventWriter struct { + publisher chan Event + publisherClosed bool + publisherLock sync.Mutex + + ctx context.Context + cancel context.CancelFunc + + subscriber map[string]chan Event + subscriberLock sync.Mutex +} + +func NewEventWriter() *EventWriter { + w := &EventWriter{ + publisher: make(chan Event, 1024), + publisherClosed: false, + subscriber: make(map[string]chan Event), + } + + w.ctx, w.cancel = context.WithCancel(context.Background()) + + go w.broadcast() + + return w +} + +func (w *EventWriter) Publish(e Event) error { + event := e.clone() + + w.publisherLock.Lock() + defer w.publisherLock.Unlock() + + if w.publisherClosed { + return fmt.Errorf("writer is closed") + } + + select { + case w.publisher <- event: + default: + return fmt.Errorf("publisher queue full") + } + + return nil +} + +func (w *EventWriter) Close() { + w.cancel() + + w.publisherLock.Lock() + close(w.publisher) + w.publisherClosed = true + w.publisherLock.Unlock() + + w.subscriberLock.Lock() + for _, c := range w.subscriber { + close(c) + } + w.subscriber = make(map[string]chan Event) + w.subscriberLock.Unlock() +} + +func (w *EventWriter) Subscribe() (<-chan Event, EventsCancelFunc) { + l := make(chan Event, 1024) + + var id string = "" + + w.subscriberLock.Lock() + for { + id = shortuuid.New() + if _, ok := w.subscriber[id]; !ok { + w.subscriber[id] = l + break + } + } + w.subscriberLock.Unlock() + + unsubscribe := func() { + w.subscriberLock.Lock() + delete(w.subscriber, id) + w.subscriberLock.Unlock() + } + + return l, unsubscribe +} + +func (w *EventWriter) broadcast() { + for { + select { + case <-w.ctx.Done(): + return + case e := <-w.publisher: + w.subscriberLock.Lock() + for _, c := range w.subscriber { + pp := e.clone() + + select { + case c <- pp: + default: + } + } + w.subscriberLock.Unlock() + } + } +} diff --git a/io/fs/mem.go b/io/fs/mem.go index e1d33aae..0a028c6c 100644 --- a/io/fs/mem.go +++ b/io/fs/mem.go @@ -135,6 +135,8 @@ type memFilesystem struct { // Storage backend storage memStorage dirs *dirStorage + + events *EventWriter } type dirStorage struct { @@ -222,6 +224,8 @@ func NewMemFilesystem(config MemConfig) (Filesystem, error) { fs.logger.Debug().Log("Created") + fs.events = NewEventWriter() + return fs, nil } @@ -462,8 +466,10 @@ func (fs *memFilesystem) WriteFileReader(path string, r io.Reader, sizeHint int) }) if replace { + fs.events.Publish(Event{Action: "update", Name: newFile.name}) logger.Debug().Log("Replaced file") } else { + fs.events.Publish(Event{Action: "create", Name: newFile.name}) logger.Debug().Log("Added file") } @@ -520,6 +526,8 @@ func (fs *memFilesystem) AppendFileReader(path string, r io.Reader, sizeHint int "size_bytes": fs.currentSize, }).Log("Appended to file") + fs.events.Publish(Event{Action: "update", Name: file.name}) + return size, nil } @@ -556,6 +564,8 @@ func (fs *memFilesystem) Purge(size int64) int64 { "size_bytes": fs.currentSize, }).Debug().Log("Purged file") + fs.events.Publish(Event{Action: "remove", Name: f.name}) + if size <= 0 { break } @@ -599,15 +609,21 @@ func (fs *memFilesystem) Rename(src, dst string) error { dstFile, replace := fs.storage.Store(dst, srcFile) fs.storage.Delete(src) + fs.events.Publish(Event{Action: "remove", Name: src}) + fs.dirs.Remove(src) if !replace { fs.dirs.Add(dst) + + fs.events.Publish(Event{Action: "create", Name: dst}) } fs.sizeLock.Lock() defer fs.sizeLock.Unlock() if replace { + fs.events.Publish(Event{Action: "update", Name: dst}) + dstFile.Close() fs.currentSize -= dstFile.size @@ -643,12 +659,15 @@ func (fs *memFilesystem) Copy(src, dst string) error { if !replace { fs.dirs.Add(dst) + + fs.events.Publish(Event{Action: "create", Name: dst}) } fs.sizeLock.Lock() defer fs.sizeLock.Unlock() if replace { + fs.events.Publish(Event{Action: "update", Name: dst}) replacedFile.Close() fs.currentSize -= replacedFile.size } @@ -735,6 +754,8 @@ func (fs *memFilesystem) remove(path string) int64 { "size_bytes": fs.currentSize, }).Debug().Log("Removed file") + fs.events.Publish(Event{Action: "remove", Name: file.name}) + return file.size } @@ -808,6 +829,8 @@ func (fs *memFilesystem) RemoveList(path string, options ListOptions) ([]string, fs.dirs.Remove(file.name) file.Close() + + fs.events.Publish(Event{Action: "remove", Name: file.name}) } fs.sizeLock.Lock() @@ -924,3 +947,7 @@ func (fs *memFilesystem) cleanPath(path string) string { return filepath.Join("/", filepath.Clean(path)) } + +func (fs *memFilesystem) Events() (<-chan Event, EventsCancelFunc) { + return fs.events.Subscribe() +}