Merge branch 'dev' into cluster
This commit is contained in:
commit
5cdef36750
@ -13,11 +13,11 @@ import (
|
||||
|
||||
// The SessionHandler type provides handlers to retrieve session information
|
||||
type SessionHandler struct {
|
||||
registry session.Registry
|
||||
registry session.RegistryReader
|
||||
}
|
||||
|
||||
// NewSession returns a new Session type. You have to provide a session registry.
|
||||
func NewSession(registry session.Registry) *SessionHandler {
|
||||
func NewSession(registry session.RegistryReader) *SessionHandler {
|
||||
return &SessionHandler{
|
||||
registry: registry,
|
||||
}
|
||||
|
||||
@ -13,13 +13,13 @@ import (
|
||||
|
||||
type WidgetConfig struct {
|
||||
Restream restream.Restreamer
|
||||
Registry session.Registry
|
||||
Registry session.RegistryReader
|
||||
}
|
||||
|
||||
// The WidgetHandler type provides handlers for the widget API
|
||||
type WidgetHandler struct {
|
||||
restream restream.Restreamer
|
||||
registry session.Registry
|
||||
registry session.RegistryReader
|
||||
}
|
||||
|
||||
// NewWidget return a new Widget type
|
||||
|
||||
@ -91,7 +91,7 @@ type Config struct {
|
||||
JWT jwt.JWT
|
||||
Config config.Store
|
||||
Cache cache.Cacher
|
||||
Sessions session.Registry
|
||||
Sessions session.RegistryReader
|
||||
Router router.Router
|
||||
ReadOnly bool
|
||||
Cluster cluster.Cluster
|
||||
|
||||
@ -7,7 +7,7 @@ import (
|
||||
|
||||
type sessionCollector struct {
|
||||
prefix string
|
||||
r session.Registry
|
||||
r session.RegistryReader
|
||||
collectors []string
|
||||
totalDescr *metric.Description
|
||||
limitDescr *metric.Description
|
||||
@ -20,7 +20,7 @@ type sessionCollector struct {
|
||||
maxRxBitrateDescr *metric.Description
|
||||
}
|
||||
|
||||
func NewSessionCollector(r session.Registry, collectors []string) metric.Collector {
|
||||
func NewSessionCollector(r session.RegistryReader, collectors []string) metric.Collector {
|
||||
c := &sessionCollector{
|
||||
prefix: "session",
|
||||
r: r,
|
||||
|
||||
@ -111,6 +111,7 @@ func (fs *filesystem) SetCleanup(id string, patterns []Pattern) {
|
||||
|
||||
fs.cleanupPatterns[id] = append(fs.cleanupPatterns[id], patterns...)
|
||||
}
|
||||
|
||||
func (fs *filesystem) UnsetCleanup(id string) {
|
||||
fs.logger.Debug().WithField("id", id).Log("Remove pattern group")
|
||||
|
||||
@ -118,7 +119,6 @@ func (fs *filesystem) UnsetCleanup(id string) {
|
||||
defer fs.cleanupLock.Unlock()
|
||||
|
||||
patterns := fs.cleanupPatterns[id]
|
||||
|
||||
delete(fs.cleanupPatterns, id)
|
||||
|
||||
fs.purge(patterns)
|
||||
@ -155,7 +155,7 @@ func (fs *filesystem) cleanup() {
|
||||
}
|
||||
}
|
||||
|
||||
func (fs *filesystem) purge(patterns []Pattern) {
|
||||
func (fs *filesystem) purge(patterns []Pattern) (nfiles uint64) {
|
||||
for _, pattern := range patterns {
|
||||
if !pattern.PurgeOnDelete {
|
||||
continue
|
||||
@ -165,8 +165,11 @@ func (fs *filesystem) purge(patterns []Pattern) {
|
||||
for _, f := range files {
|
||||
fs.logger.Debug().WithField("path", f.Name()).Log("Purging file")
|
||||
fs.Filesystem.Delete(f.Name())
|
||||
nfiles++
|
||||
}
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (fs *filesystem) cleanupTicker(ctx context.Context, interval time.Duration) {
|
||||
|
||||
@ -913,7 +913,6 @@ func (r *restream) deleteProcess(id string) error {
|
||||
}
|
||||
|
||||
r.unsetPlayoutPorts(task)
|
||||
|
||||
r.unsetCleanup(id)
|
||||
|
||||
delete(r.tasks, id)
|
||||
|
||||
@ -4,8 +4,6 @@ import (
|
||||
"context"
|
||||
"net"
|
||||
"net/url"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
@ -87,6 +85,7 @@ type channel struct {
|
||||
|
||||
collector session.Collector
|
||||
path string
|
||||
reference string
|
||||
|
||||
publisher *client
|
||||
subscriber map[string]*client
|
||||
@ -95,9 +94,10 @@ type channel struct {
|
||||
isProxy bool
|
||||
}
|
||||
|
||||
func newChannel(conn connection, u *url.URL, remote net.Addr, streams []av.CodecData, isProxy bool, collector session.Collector) *channel {
|
||||
func newChannel(conn connection, u *url.URL, reference string, remote net.Addr, streams []av.CodecData, isProxy bool, collector session.Collector) *channel {
|
||||
ch := &channel{
|
||||
path: u.Path,
|
||||
reference: reference,
|
||||
publisher: newClient(conn, u.Path, collector),
|
||||
subscriber: make(map[string]*client),
|
||||
collector: collector,
|
||||
@ -106,12 +106,13 @@ func newChannel(conn connection, u *url.URL, remote net.Addr, streams []av.Codec
|
||||
isProxy: isProxy,
|
||||
}
|
||||
|
||||
ch.queue.WriteHeader(streams)
|
||||
|
||||
addr := remote.String()
|
||||
ip, _, _ := net.SplitHostPort(addr)
|
||||
|
||||
if collector.IsCollectableIP(ip) {
|
||||
reference := strings.TrimSuffix(filepath.Base(u.Path), filepath.Ext(u.Path))
|
||||
collector.RegisterAndActivate(u.Path, reference, "publish:"+u.Path, addr)
|
||||
collector.RegisterAndActivate(ch.path, ch.reference, "publish:"+ch.path, addr)
|
||||
}
|
||||
|
||||
return ch
|
||||
@ -135,8 +136,7 @@ func (ch *channel) AddSubscriber(conn *rtmp.Conn) string {
|
||||
client := newClient(conn, addr, ch.collector)
|
||||
|
||||
if ch.collector.IsCollectableIP(ip) {
|
||||
reference := strings.TrimSuffix(filepath.Base(conn.URL.Path), filepath.Ext(conn.URL.Path))
|
||||
ch.collector.RegisterAndActivate(addr, reference, "play:"+conn.URL.Path, addr)
|
||||
ch.collector.RegisterAndActivate(addr, ch.reference, "play:"+conn.URL.Path, addr)
|
||||
}
|
||||
|
||||
ch.lock.Lock()
|
||||
|
||||
@ -6,6 +6,7 @@ import (
|
||||
"fmt"
|
||||
"net"
|
||||
"net/url"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
@ -349,9 +350,10 @@ func (s *server) publish(src connection, u *url.URL, remote net.Addr, isProxy bo
|
||||
|
||||
ch := s.channels[u.Path]
|
||||
if ch == nil {
|
||||
reference := strings.TrimPrefix(strings.TrimSuffix(u.Path, filepath.Ext(u.Path)), s.app+"/")
|
||||
|
||||
// Create a new channel
|
||||
ch = newChannel(src, u, remote, streams, isProxy, s.collector)
|
||||
ch.queue.WriteHeader(streams)
|
||||
ch = newChannel(src, u, reference, remote, streams, isProxy, s.collector)
|
||||
|
||||
for _, stream := range streams {
|
||||
typ := stream.Type()
|
||||
|
||||
@ -22,6 +22,20 @@ type Config struct {
|
||||
Logger log.Logger
|
||||
}
|
||||
|
||||
type RegistryReader interface {
|
||||
// Collectors returns an array of all registered IDs
|
||||
Collectors() []string
|
||||
|
||||
// Collector returns the collector with the ID, or nil if the ID is not registered
|
||||
Collector(id string) Collector
|
||||
|
||||
// Summary returns the summary from a collector with the ID, or an empty summary if the ID is not registered
|
||||
Summary(id string) Summary
|
||||
|
||||
// Active returns the active sessions from a collector with the ID, or an empty list if the ID is not registered
|
||||
Active(id string) []Session
|
||||
}
|
||||
|
||||
// The Registry interface
|
||||
type Registry interface {
|
||||
// Register returns a new collector from conf and registers it under the id and error is nil. In case of error
|
||||
@ -34,17 +48,7 @@ type Registry interface {
|
||||
// UnregisterAll unregisters al registered collectors
|
||||
UnregisterAll()
|
||||
|
||||
// Collectors returns an array of all registered IDs
|
||||
Collectors() []string
|
||||
|
||||
// Collector returns the collector with the ID, or nil if the ID is not registered
|
||||
Collector(id string) Collector
|
||||
|
||||
// Summary returns the summary from a collector with the ID, or an empty summary if the ID is not registered
|
||||
Summary(id string) Summary
|
||||
|
||||
// Active returns the active sessions from a collector with the ID, or an empty list if the ID is not registered
|
||||
Active(id string) []Session
|
||||
RegistryReader
|
||||
}
|
||||
|
||||
type registry struct {
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user