From 0febae3242a1f3233c91c2076eab69bd8f76b3e8 Mon Sep 17 00:00:00 2001 From: Ingo Oppermann Date: Thu, 18 Aug 2022 12:00:37 +0300 Subject: [PATCH 1/3] Return number of purged files --- restream/fs/fs.go | 7 +++++-- restream/restream.go | 1 - 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/restream/fs/fs.go b/restream/fs/fs.go index f4952ebf..4769597f 100644 --- a/restream/fs/fs.go +++ b/restream/fs/fs.go @@ -111,6 +111,7 @@ func (fs *filesystem) SetCleanup(id string, patterns []Pattern) { fs.cleanupPatterns[id] = append(fs.cleanupPatterns[id], patterns...) } + func (fs *filesystem) UnsetCleanup(id string) { fs.logger.Debug().WithField("id", id).Log("Remove pattern group") @@ -118,7 +119,6 @@ func (fs *filesystem) UnsetCleanup(id string) { defer fs.cleanupLock.Unlock() patterns := fs.cleanupPatterns[id] - delete(fs.cleanupPatterns, id) fs.purge(patterns) @@ -155,7 +155,7 @@ func (fs *filesystem) cleanup() { } } -func (fs *filesystem) purge(patterns []Pattern) { +func (fs *filesystem) purge(patterns []Pattern) (nfiles uint64) { for _, pattern := range patterns { if !pattern.PurgeOnDelete { continue @@ -165,8 +165,11 @@ func (fs *filesystem) purge(patterns []Pattern) { for _, f := range files { fs.logger.Debug().WithField("path", f.Name()).Log("Purging file") fs.Filesystem.Delete(f.Name()) + nfiles++ } } + + return } func (fs *filesystem) cleanupTicker(ctx context.Context, interval time.Duration) { diff --git a/restream/restream.go b/restream/restream.go index 3793a787..137e0acb 100644 --- a/restream/restream.go +++ b/restream/restream.go @@ -913,7 +913,6 @@ func (r *restream) deleteProcess(id string) error { } r.unsetPlayoutPorts(task) - r.unsetCleanup(id) delete(r.tasks, id) From 9cd132650e6ae5c1432a468757941b9954f16a0e Mon Sep 17 00:00:00 2001 From: Ingo Oppermann Date: Fri, 19 Aug 2022 11:24:44 +0300 Subject: [PATCH 2/3] Use path without app as session reference --- rtmp/rtmp.go | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/rtmp/rtmp.go b/rtmp/rtmp.go index 17809a49..fafb466b 100644 --- a/rtmp/rtmp.go +++ b/rtmp/rtmp.go @@ -101,15 +101,17 @@ type channel struct { collector session.Collector path string + reference string publisher *client subscriber map[string]*client lock sync.RWMutex } -func newChannel(conn *rtmp.Conn, collector session.Collector) *channel { +func newChannel(conn *rtmp.Conn, reference string, collector session.Collector) *channel { ch := &channel{ path: conn.URL.Path, + reference: reference, publisher: newClient(conn, conn.URL.Path, collector), subscriber: make(map[string]*client), collector: collector, @@ -119,8 +121,7 @@ func newChannel(conn *rtmp.Conn, collector session.Collector) *channel { ip, _, _ := net.SplitHostPort(addr) if collector.IsCollectableIP(ip) { - reference := strings.TrimSuffix(filepath.Base(conn.URL.Path), filepath.Ext(conn.URL.Path)) - collector.RegisterAndActivate(conn.URL.Path, reference, "publish:"+conn.URL.Path, addr) + collector.RegisterAndActivate(ch.path, ch.reference, "publish:"+ch.path, addr) } return ch @@ -144,8 +145,7 @@ func (ch *channel) AddSubscriber(conn *rtmp.Conn) string { client := newClient(conn, addr, ch.collector) if ch.collector.IsCollectableIP(ip) { - reference := strings.TrimSuffix(filepath.Base(conn.URL.Path), filepath.Ext(conn.URL.Path)) - ch.collector.RegisterAndActivate(addr, reference, "play:"+conn.URL.Path, addr) + ch.collector.RegisterAndActivate(addr, ch.reference, "play:"+ch.path, addr) } ch.lock.Lock() @@ -437,8 +437,10 @@ func (s *server) handlePublish(conn *rtmp.Conn) { ch := s.channels[conn.URL.Path] if ch == nil { + reference := strings.TrimPrefix(strings.TrimSuffix(conn.URL.Path, filepath.Ext(conn.URL.Path)), s.app+"/") + // Create a new channel - ch = newChannel(conn, s.collector) + ch = newChannel(conn, reference, s.collector) ch.metadata = conn.GetMetaData() ch.queue = pubsub.NewQueue() ch.queue.WriteHeader(streams) From f60d09963c94a1bf7b0d786f2241f823e3c04784 Mon Sep 17 00:00:00 2001 From: Ingo Oppermann Date: Fri, 19 Aug 2022 11:46:30 +0300 Subject: [PATCH 3/3] Add RegistryReader interface for read-only registry --- http/handler/api/session.go | 4 ++-- http/handler/api/widget.go | 4 ++-- http/server.go | 2 +- monitor/session.go | 4 ++-- session/registry.go | 26 +++++++++++++++----------- 5 files changed, 22 insertions(+), 18 deletions(-) diff --git a/http/handler/api/session.go b/http/handler/api/session.go index c0257a99..de297cd9 100644 --- a/http/handler/api/session.go +++ b/http/handler/api/session.go @@ -13,11 +13,11 @@ import ( // The SessionHandler type provides handlers to retrieve session information type SessionHandler struct { - registry session.Registry + registry session.RegistryReader } // NewSession returns a new Session type. You have to provide a session registry. -func NewSession(registry session.Registry) *SessionHandler { +func NewSession(registry session.RegistryReader) *SessionHandler { return &SessionHandler{ registry: registry, } diff --git a/http/handler/api/widget.go b/http/handler/api/widget.go index 6eba3c09..7bd22602 100644 --- a/http/handler/api/widget.go +++ b/http/handler/api/widget.go @@ -13,13 +13,13 @@ import ( type WidgetConfig struct { Restream restream.Restreamer - Registry session.Registry + Registry session.RegistryReader } // The WidgetHandler type provides handlers for the widget API type WidgetHandler struct { restream restream.Restreamer - registry session.Registry + registry session.RegistryReader } // NewWidget return a new Widget type diff --git a/http/server.go b/http/server.go index 94c4b026..439df8ab 100644 --- a/http/server.go +++ b/http/server.go @@ -89,7 +89,7 @@ type Config struct { JWT jwt.JWT Config config.Store Cache cache.Cacher - Sessions session.Registry + Sessions session.RegistryReader Router router.Router ReadOnly bool } diff --git a/monitor/session.go b/monitor/session.go index 118a5ea4..d2ac15b9 100644 --- a/monitor/session.go +++ b/monitor/session.go @@ -7,7 +7,7 @@ import ( type sessionCollector struct { prefix string - r session.Registry + r session.RegistryReader collectors []string totalDescr *metric.Description limitDescr *metric.Description @@ -20,7 +20,7 @@ type sessionCollector struct { maxRxBitrateDescr *metric.Description } -func NewSessionCollector(r session.Registry, collectors []string) metric.Collector { +func NewSessionCollector(r session.RegistryReader, collectors []string) metric.Collector { c := &sessionCollector{ prefix: "session", r: r, diff --git a/session/registry.go b/session/registry.go index 3e2570f2..adf40530 100644 --- a/session/registry.go +++ b/session/registry.go @@ -22,6 +22,20 @@ type Config struct { Logger log.Logger } +type RegistryReader interface { + // Collectors returns an array of all registered IDs + Collectors() []string + + // Collector returns the collector with the ID, or nil if the ID is not registered + Collector(id string) Collector + + // Summary returns the summary from a collector with the ID, or an empty summary if the ID is not registered + Summary(id string) Summary + + // Active returns the active sessions from a collector with the ID, or an empty list if the ID is not registered + Active(id string) []Session +} + // The Registry interface type Registry interface { // Register returns a new collector from conf and registers it under the id and error is nil. In case of error @@ -34,17 +48,7 @@ type Registry interface { // UnregisterAll unregisters al registered collectors UnregisterAll() - // Collectors returns an array of all registered IDs - Collectors() []string - - // Collector returns the collector with the ID, or nil if the ID is not registered - Collector(id string) Collector - - // Summary returns the summary from a collector with the ID, or an empty summary if the ID is not registered - Summary(id string) Summary - - // Active returns the active sessions from a collector with the ID, or an empty list if the ID is not registered - Active(id string) []Session + RegistryReader } type registry struct {