Register proxied srt as normal publisher

This commit is contained in:
Ingo Oppermann 2022-08-15 11:13:42 +03:00
parent 02fec74457
commit 2b05d5fb31
No known key found for this signature in database
GPG Key ID: 2AB32426E9DD229E
2 changed files with 231 additions and 226 deletions

145
srt/channel.go Normal file
View File

@ -0,0 +1,145 @@
package srt
import (
"context"
"net"
"sync"
"time"
"github.com/datarhei/core/v16/session"
srt "github.com/datarhei/gosrt"
)
type client struct {
conn srt.Conn
id string
createdAt time.Time
txbytes uint64
rxbytes uint64
collector session.Collector
cancel context.CancelFunc
}
func newClient(conn srt.Conn, id string, collector session.Collector) *client {
c := &client{
conn: conn,
id: id,
createdAt: time.Now(),
collector: collector,
}
var ctx context.Context
ctx, c.cancel = context.WithCancel(context.Background())
go c.ticker(ctx)
return c
}
func (c *client) ticker(ctx context.Context) {
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
stats := c.conn.Stats()
rxbytes := stats.ByteRecv
txbytes := stats.ByteSent
c.collector.Ingress(c.id, int64(rxbytes-c.rxbytes))
c.collector.Egress(c.id, int64(txbytes-c.txbytes))
c.txbytes = txbytes
c.rxbytes = rxbytes
}
}
}
func (c *client) Close() {
c.cancel()
c.conn.Close()
}
// channel represents a stream that is sent to the server
type channel struct {
pubsub srt.PubSub
collector session.Collector
path string
publisher *client
subscriber map[string]*client
lock sync.RWMutex
isProxy bool
}
func newChannel(conn srt.Conn, resource string, isProxy bool, collector session.Collector) *channel {
ch := &channel{
pubsub: srt.NewPubSub(srt.PubSubConfig{}),
path: resource,
publisher: newClient(conn, resource, collector),
subscriber: make(map[string]*client),
collector: collector,
isProxy: isProxy,
}
addr := conn.RemoteAddr().String()
ip, _, _ := net.SplitHostPort(addr)
if collector.IsCollectableIP(ip) {
collector.RegisterAndActivate(resource, resource, "publish:"+resource, addr)
}
return ch
}
func (ch *channel) Close() {
if ch.publisher == nil {
return
}
ch.publisher.Close()
ch.publisher = nil
}
func (ch *channel) AddSubscriber(conn srt.Conn, resource string) string {
addr := conn.RemoteAddr().String()
ip, _, _ := net.SplitHostPort(addr)
client := newClient(conn, addr, ch.collector)
if ch.collector.IsCollectableIP(ip) {
ch.collector.RegisterAndActivate(addr, resource, "play:"+resource, addr)
}
ch.lock.Lock()
ch.subscriber[addr] = client
ch.lock.Unlock()
return addr
}
func (ch *channel) RemoveSubscriber(id string) {
ch.lock.Lock()
defer ch.lock.Unlock()
client := ch.subscriber[id]
if client != nil {
delete(ch.subscriber, id)
client.Close()
}
// If this is a proxied channel and the last subscriber leaves,
// close the channel.
if len(ch.subscriber) == 0 && ch.isProxy {
ch.Close()
}
}

View File

@ -4,7 +4,6 @@ import (
"container/ring"
"context"
"fmt"
"io"
"net"
"strings"
"sync"
@ -20,130 +19,6 @@ import (
// has been closed regularly with the Close() function.
var ErrServerClosed = srt.ErrServerClosed
type client struct {
conn srt.Conn
id string
createdAt time.Time
txbytes uint64
rxbytes uint64
collector session.Collector
cancel context.CancelFunc
}
func newClient(conn srt.Conn, id string, collector session.Collector) *client {
c := &client{
conn: conn,
id: id,
createdAt: time.Now(),
collector: collector,
}
var ctx context.Context
ctx, c.cancel = context.WithCancel(context.Background())
go c.ticker(ctx)
return c
}
func (c *client) ticker(ctx context.Context) {
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
stats := c.conn.Stats()
rxbytes := stats.ByteRecv
txbytes := stats.ByteSent
c.collector.Ingress(c.id, int64(rxbytes-c.rxbytes))
c.collector.Egress(c.id, int64(txbytes-c.txbytes))
c.txbytes = txbytes
c.rxbytes = rxbytes
}
}
}
func (c *client) Close() {
c.cancel()
}
// channel represents a stream that is sent to the server
type channel struct {
pubsub srt.PubSub
collector session.Collector
path string
publisher *client
subscriber map[string]*client
lock sync.RWMutex
}
func newChannel(conn srt.Conn, resource string, collector session.Collector) *channel {
ch := &channel{
pubsub: srt.NewPubSub(srt.PubSubConfig{}),
path: resource,
publisher: newClient(conn, resource, collector),
subscriber: make(map[string]*client),
collector: collector,
}
addr := conn.RemoteAddr().String()
ip, _, _ := net.SplitHostPort(addr)
if collector.IsCollectableIP(ip) {
collector.RegisterAndActivate(resource, resource, "publish:"+resource, addr)
}
return ch
}
func (ch *channel) Close() {
if ch.publisher == nil {
return
}
ch.publisher.Close()
ch.publisher = nil
}
func (ch *channel) AddSubscriber(conn srt.Conn, resource string) string {
addr := conn.RemoteAddr().String()
ip, _, _ := net.SplitHostPort(addr)
client := newClient(conn, addr, ch.collector)
if ch.collector.IsCollectableIP(ip) {
ch.collector.RegisterAndActivate(addr, resource, "play:"+resource, addr)
}
ch.lock.Lock()
ch.subscriber[addr] = client
ch.lock.Unlock()
return addr
}
func (ch *channel) RemoveSubscriber(id string) {
ch.lock.Lock()
defer ch.lock.Unlock()
client := ch.subscriber[id]
if client != nil {
delete(ch.subscriber, id)
client.Close()
}
}
// Config for a new SRT server
type Config struct {
// The address the SRT server should listen on, e.g. ":1935"
@ -192,7 +67,6 @@ type server struct {
// Map of publishing channels and a lock to serialize access to the map. The map
// index is the name of the resource.
channels map[string]*channel
proxy map[string]*proxy
lock sync.RWMutex
logger log.Logger
@ -234,7 +108,6 @@ func New(config Config) (Server, error) {
s.srtlogLock.Unlock()
s.channels = make(map[string]*channel)
s.proxy = make(map[string]*proxy)
srtconfig := srt.DefaultConfig()
@ -492,51 +365,6 @@ func (s *server) handleConnect(req srt.ConnRequest) srt.ConnType {
return mode
}
func (s *server) handlePublish(conn srt.Conn) {
streamId := conn.StreamId()
client := conn.RemoteAddr()
si, _ := parseStreamId(streamId)
// Look for the stream
s.lock.Lock()
ch := s.channels[si.resource]
if ch == nil {
ch = newChannel(conn, si.resource, s.collector)
s.channels[si.resource] = ch
} else {
ch = nil
}
s.lock.Unlock()
if ch == nil {
s.log("PUBLISH", "CONFLICT", si.resource, "already publishing", client)
conn.Close()
return
}
s.log("PUBLISH", "START", si.resource, "", client)
// Blocks until connection closes
ch.pubsub.Publish(conn)
s.lock.Lock()
delete(s.channels, si.resource)
s.lock.Unlock()
ch.Close()
s.log("PUBLISH", "STOP", si.resource, "", client)
conn.Close()
}
type proxy struct {
listeners uint64
pubsub srt.PubSub
publisher srt.Conn
}
func (s *server) handleSubscribe(conn srt.Conn) {
defer conn.Close()
@ -550,26 +378,7 @@ func (s *server) handleSubscribe(conn srt.Conn) {
ch := s.channels[si.resource]
s.lock.RUnlock()
if ch != nil {
s.log("SUBSCRIBE", "START", si.resource, "", client)
id := ch.AddSubscriber(conn, si.resource)
// Blocks until connection closes
ch.pubsub.Subscribe(conn)
s.log("SUBSCRIBE", "STOP", si.resource, "", client)
ch.RemoveSubscriber(id)
return
}
// Check if the stream is already proxied
s.lock.Lock()
p := s.proxy[si.resource]
if p == nil {
if ch == nil {
// Check in the cluster for the stream and proxy it
srturl, err := s.cluster.GetURL("srt:" + si.resource)
if err != nil {
@ -592,49 +401,100 @@ func (s *server) handleSubscribe(conn srt.Conn) {
return
}
s.log("SUBSCRIBE", "PROXYPUBLISHSTART", srturl, "", client)
wg := sync.WaitGroup{}
wg.Add(1)
p = &proxy{
pubsub: srt.NewPubSub(srt.PubSubConfig{}),
publisher: src,
}
go func() {
s.log("SUBSCRIBE", "PROXYSTART", srturl, "", client)
wg.Done()
err := s.publish(src, true)
if err != nil {
s.logger.Error().WithField("address", srturl).WithError(err).Log("Proxying address failed")
}
s.log("SUBSCRIBE", "PROXYPUBLISHSTOP", srturl, "", client)
}()
go func(resource string, p *proxy) {
err := p.pubsub.Publish(p.publisher)
if err != io.EOF {
s.logger.Error().WithField("address", srturl).WithError(err).Log("Publish failed")
// Wait for the goroutine to start
wg.Wait()
// Wait for channel to become available
ticker := time.NewTicker(200 * time.Millisecond)
tickerStart := time.Now()
for range ticker.C {
s.lock.RLock()
ch = s.channels[si.resource]
s.lock.RUnlock()
if ch != nil {
break
}
p.publisher.Close()
if time.Since(tickerStart) > 2*time.Second {
break
}
}
s.log("SUBSCRIBE", "PROXYPUBLISHSTOP", srturl, "", client)
ticker.Stop()
}
s.lock.Lock()
delete(s.proxy, resource)
s.lock.Unlock()
}(si.resource, p)
if ch != nil {
s.log("SUBSCRIBE", "START", si.resource, "", client)
s.proxy[si.resource] = p
id := ch.AddSubscriber(conn, si.resource)
// Blocks until connection closes
ch.pubsub.Subscribe(conn)
s.log("SUBSCRIBE", "STOP", si.resource, "", client)
ch.RemoveSubscriber(id)
return
}
}
func (s *server) handlePublish(conn srt.Conn) {
s.publish(conn, false)
}
func (s *server) publish(conn srt.Conn, isProxy bool) error {
streamId := conn.StreamId()
client := conn.RemoteAddr()
si, _ := parseStreamId(streamId)
// Look for the stream
s.lock.Lock()
ch := s.channels[si.resource]
if ch == nil {
ch = newChannel(conn, si.resource, isProxy, s.collector)
s.channels[si.resource] = ch
} else {
ch = nil
}
s.lock.Unlock()
if p != nil {
p.listeners++
s.log("SUBSCRIBE", "PROXYSTART", conn.StreamId(), "", client)
// Blocks until connection closes
err := p.pubsub.Subscribe(conn)
if err != io.EOF {
s.logger.Error().WithField("streamid", conn.StreamId()).WithError(err).Log("Subscribe failed")
}
s.log("SUBSCRIBE", "PROXYSTOP", conn.StreamId(), "", client)
p.listeners--
if p.listeners <= 0 {
p.publisher.Close()
}
if ch == nil {
s.log("PUBLISH", "CONFLICT", si.resource, "already publishing", client)
conn.Close()
return fmt.Errorf("already publishing this resource")
}
s.log("PUBLISH", "START", si.resource, "", client)
// Blocks until connection closes
ch.pubsub.Publish(conn)
s.lock.Lock()
delete(s.channels, si.resource)
s.lock.Unlock()
ch.Close()
s.log("PUBLISH", "STOP", si.resource, "", client)
conn.Close()
return nil
}