diff --git a/http/handler/api/session.go b/http/handler/api/session.go index c0257a99..de297cd9 100644 --- a/http/handler/api/session.go +++ b/http/handler/api/session.go @@ -13,11 +13,11 @@ import ( // The SessionHandler type provides handlers to retrieve session information type SessionHandler struct { - registry session.Registry + registry session.RegistryReader } // NewSession returns a new Session type. You have to provide a session registry. -func NewSession(registry session.Registry) *SessionHandler { +func NewSession(registry session.RegistryReader) *SessionHandler { return &SessionHandler{ registry: registry, } diff --git a/http/handler/api/widget.go b/http/handler/api/widget.go index 6eba3c09..7bd22602 100644 --- a/http/handler/api/widget.go +++ b/http/handler/api/widget.go @@ -13,13 +13,13 @@ import ( type WidgetConfig struct { Restream restream.Restreamer - Registry session.Registry + Registry session.RegistryReader } // The WidgetHandler type provides handlers for the widget API type WidgetHandler struct { restream restream.Restreamer - registry session.Registry + registry session.RegistryReader } // NewWidget return a new Widget type diff --git a/http/server.go b/http/server.go index 2f4fec9a..e0a46c60 100644 --- a/http/server.go +++ b/http/server.go @@ -91,7 +91,7 @@ type Config struct { JWT jwt.JWT Config config.Store Cache cache.Cacher - Sessions session.Registry + Sessions session.RegistryReader Router router.Router ReadOnly bool Cluster cluster.Cluster diff --git a/monitor/session.go b/monitor/session.go index 118a5ea4..d2ac15b9 100644 --- a/monitor/session.go +++ b/monitor/session.go @@ -7,7 +7,7 @@ import ( type sessionCollector struct { prefix string - r session.Registry + r session.RegistryReader collectors []string totalDescr *metric.Description limitDescr *metric.Description @@ -20,7 +20,7 @@ type sessionCollector struct { maxRxBitrateDescr *metric.Description } -func NewSessionCollector(r session.Registry, collectors []string) metric.Collector { +func NewSessionCollector(r session.RegistryReader, collectors []string) metric.Collector { c := &sessionCollector{ prefix: "session", r: r, diff --git a/restream/fs/fs.go b/restream/fs/fs.go index f4952ebf..4769597f 100644 --- a/restream/fs/fs.go +++ b/restream/fs/fs.go @@ -111,6 +111,7 @@ func (fs *filesystem) SetCleanup(id string, patterns []Pattern) { fs.cleanupPatterns[id] = append(fs.cleanupPatterns[id], patterns...) } + func (fs *filesystem) UnsetCleanup(id string) { fs.logger.Debug().WithField("id", id).Log("Remove pattern group") @@ -118,7 +119,6 @@ func (fs *filesystem) UnsetCleanup(id string) { defer fs.cleanupLock.Unlock() patterns := fs.cleanupPatterns[id] - delete(fs.cleanupPatterns, id) fs.purge(patterns) @@ -155,7 +155,7 @@ func (fs *filesystem) cleanup() { } } -func (fs *filesystem) purge(patterns []Pattern) { +func (fs *filesystem) purge(patterns []Pattern) (nfiles uint64) { for _, pattern := range patterns { if !pattern.PurgeOnDelete { continue @@ -165,8 +165,11 @@ func (fs *filesystem) purge(patterns []Pattern) { for _, f := range files { fs.logger.Debug().WithField("path", f.Name()).Log("Purging file") fs.Filesystem.Delete(f.Name()) + nfiles++ } } + + return } func (fs *filesystem) cleanupTicker(ctx context.Context, interval time.Duration) { diff --git a/restream/restream.go b/restream/restream.go index 3793a787..137e0acb 100644 --- a/restream/restream.go +++ b/restream/restream.go @@ -913,7 +913,6 @@ func (r *restream) deleteProcess(id string) error { } r.unsetPlayoutPorts(task) - r.unsetCleanup(id) delete(r.tasks, id) diff --git a/rtmp/channel.go b/rtmp/channel.go index 52c1db84..9ea68fe3 100644 --- a/rtmp/channel.go +++ b/rtmp/channel.go @@ -4,8 +4,6 @@ import ( "context" "net" "net/url" - "path/filepath" - "strings" "sync" "time" @@ -87,6 +85,7 @@ type channel struct { collector session.Collector path string + reference string publisher *client subscriber map[string]*client @@ -95,9 +94,10 @@ type channel struct { isProxy bool } -func newChannel(conn connection, u *url.URL, remote net.Addr, streams []av.CodecData, isProxy bool, collector session.Collector) *channel { +func newChannel(conn connection, u *url.URL, reference string, remote net.Addr, streams []av.CodecData, isProxy bool, collector session.Collector) *channel { ch := &channel{ path: u.Path, + reference: reference, publisher: newClient(conn, u.Path, collector), subscriber: make(map[string]*client), collector: collector, @@ -106,12 +106,13 @@ func newChannel(conn connection, u *url.URL, remote net.Addr, streams []av.Codec isProxy: isProxy, } + ch.queue.WriteHeader(streams) + addr := remote.String() ip, _, _ := net.SplitHostPort(addr) if collector.IsCollectableIP(ip) { - reference := strings.TrimSuffix(filepath.Base(u.Path), filepath.Ext(u.Path)) - collector.RegisterAndActivate(u.Path, reference, "publish:"+u.Path, addr) + collector.RegisterAndActivate(ch.path, ch.reference, "publish:"+ch.path, addr) } return ch @@ -135,8 +136,7 @@ func (ch *channel) AddSubscriber(conn *rtmp.Conn) string { client := newClient(conn, addr, ch.collector) if ch.collector.IsCollectableIP(ip) { - reference := strings.TrimSuffix(filepath.Base(conn.URL.Path), filepath.Ext(conn.URL.Path)) - ch.collector.RegisterAndActivate(addr, reference, "play:"+conn.URL.Path, addr) + ch.collector.RegisterAndActivate(addr, ch.reference, "play:"+conn.URL.Path, addr) } ch.lock.Lock() diff --git a/rtmp/rtmp.go b/rtmp/rtmp.go index 6267ace8..8366b60c 100644 --- a/rtmp/rtmp.go +++ b/rtmp/rtmp.go @@ -6,6 +6,7 @@ import ( "fmt" "net" "net/url" + "path/filepath" "strings" "sync" "time" @@ -349,9 +350,10 @@ func (s *server) publish(src connection, u *url.URL, remote net.Addr, isProxy bo ch := s.channels[u.Path] if ch == nil { + reference := strings.TrimPrefix(strings.TrimSuffix(u.Path, filepath.Ext(u.Path)), s.app+"/") + // Create a new channel - ch = newChannel(src, u, remote, streams, isProxy, s.collector) - ch.queue.WriteHeader(streams) + ch = newChannel(src, u, reference, remote, streams, isProxy, s.collector) for _, stream := range streams { typ := stream.Type() diff --git a/session/registry.go b/session/registry.go index 3e2570f2..adf40530 100644 --- a/session/registry.go +++ b/session/registry.go @@ -22,6 +22,20 @@ type Config struct { Logger log.Logger } +type RegistryReader interface { + // Collectors returns an array of all registered IDs + Collectors() []string + + // Collector returns the collector with the ID, or nil if the ID is not registered + Collector(id string) Collector + + // Summary returns the summary from a collector with the ID, or an empty summary if the ID is not registered + Summary(id string) Summary + + // Active returns the active sessions from a collector with the ID, or an empty list if the ID is not registered + Active(id string) []Session +} + // The Registry interface type Registry interface { // Register returns a new collector from conf and registers it under the id and error is nil. In case of error @@ -34,17 +48,7 @@ type Registry interface { // UnregisterAll unregisters al registered collectors UnregisterAll() - // Collectors returns an array of all registered IDs - Collectors() []string - - // Collector returns the collector with the ID, or nil if the ID is not registered - Collector(id string) Collector - - // Summary returns the summary from a collector with the ID, or an empty summary if the ID is not registered - Summary(id string) Summary - - // Active returns the active sessions from a collector with the ID, or an empty list if the ID is not registered - Active(id string) []Session + RegistryReader } type registry struct {