core/srt/srt.go
Ingo Oppermann 35c5c9f077
Add alternative streamid format for SRT
The streamid format that starts with #!: is recommended in the SRT
specs but it usually causes trouble where you're limited in the
use of such characters. Some hardware devices will not accept such
streamids.

The alternative format is simpler and has the form
[resource](,token:[token])?(,mode:[mode])?

token and mode are optional. mode can have the values "publish" or
"request". If mode is not provided, a value of "request" is
assumed.
2022-10-25 14:00:27 +02:00

592 lines
12 KiB
Go

package srt
import (
"container/ring"
"context"
"fmt"
"net"
"regexp"
"strings"
"sync"
"time"
"github.com/datarhei/core/v16/log"
"github.com/datarhei/core/v16/session"
srt "github.com/datarhei/gosrt"
)
// ErrServerClosed is returned by ListenAndServe if the server
// has been closed regularly with the Close() function.
var ErrServerClosed = srt.ErrServerClosed
type client struct {
conn srt.Conn
id string
createdAt time.Time
txbytes uint64
rxbytes uint64
collector session.Collector
cancel context.CancelFunc
}
func newClient(conn srt.Conn, id string, collector session.Collector) *client {
c := &client{
conn: conn,
id: id,
createdAt: time.Now(),
collector: collector,
}
var ctx context.Context
ctx, c.cancel = context.WithCancel(context.Background())
go c.ticker(ctx)
return c
}
func (c *client) ticker(ctx context.Context) {
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
stats := c.conn.Stats()
rxbytes := stats.ByteRecv
txbytes := stats.ByteSent
c.collector.Ingress(c.id, int64(rxbytes-c.rxbytes))
c.collector.Egress(c.id, int64(txbytes-c.txbytes))
c.txbytes = txbytes
c.rxbytes = rxbytes
}
}
}
func (c *client) Close() {
c.cancel()
}
// channel represents a stream that is sent to the server
type channel struct {
pubsub srt.PubSub
collector session.Collector
path string
publisher *client
subscriber map[string]*client
lock sync.RWMutex
}
func newChannel(conn srt.Conn, resource string, collector session.Collector) *channel {
ch := &channel{
pubsub: srt.NewPubSub(srt.PubSubConfig{}),
path: resource,
publisher: newClient(conn, resource, collector),
subscriber: make(map[string]*client),
collector: collector,
}
addr := conn.RemoteAddr().String()
ip, _, _ := net.SplitHostPort(addr)
if collector.IsCollectableIP(ip) {
collector.RegisterAndActivate(resource, resource, "publish:"+resource, addr)
}
return ch
}
func (ch *channel) Close() {
if ch.publisher == nil {
return
}
ch.publisher.Close()
ch.publisher = nil
}
func (ch *channel) AddSubscriber(conn srt.Conn, resource string) string {
addr := conn.RemoteAddr().String()
ip, _, _ := net.SplitHostPort(addr)
client := newClient(conn, addr, ch.collector)
if ch.collector.IsCollectableIP(ip) {
ch.collector.RegisterAndActivate(addr, resource, "play:"+resource, addr)
}
ch.lock.Lock()
ch.subscriber[addr] = client
ch.lock.Unlock()
return addr
}
func (ch *channel) RemoveSubscriber(id string) {
ch.lock.Lock()
defer ch.lock.Unlock()
client := ch.subscriber[id]
if client != nil {
delete(ch.subscriber, id)
client.Close()
}
}
// Config for a new SRT server
type Config struct {
// The address the SRT server should listen on, e.g. ":1935"
Addr string
// A token that needs to be added to the URL as query string
// in order push or pull a stream. The key for the query
// parameter is "token". Optional. By default no token is
// required.
Token string
Passphrase string
// Logger. Optional.
Logger log.Logger
Collector session.Collector
SRTLogTopics []string
}
// Server represents a SRT server
type Server interface {
// ListenAndServe starts the SRT server
ListenAndServe() error
// Close stops the RTMP server and closes all connections
Close()
// Channels return a list of currently publishing streams
Channels() Channels
}
// server implements the Server interface
type server struct {
addr string
token string
passphrase string
collector session.Collector
server srt.Server
// Map of publishing channels and a lock to serialize
// access to the map.
channels map[string]*channel
lock sync.RWMutex
logger log.Logger
srtlogger srt.Logger
srtloggerCancel context.CancelFunc
srtlog map[string]*ring.Ring
srtlogLock sync.RWMutex
}
func New(config Config) (Server, error) {
s := &server{
addr: config.Addr,
token: config.Token,
passphrase: config.Passphrase,
collector: config.Collector,
logger: config.Logger,
}
if s.collector == nil {
s.collector = session.NewNullCollector()
}
if s.logger == nil {
s.logger = log.New("")
}
s.srtlogger = srt.NewLogger(config.SRTLogTopics)
s.srtlogLock.Lock()
s.srtlog = make(map[string]*ring.Ring)
s.srtlogLock.Unlock()
s.channels = make(map[string]*channel)
srtconfig := srt.DefaultConfig()
srtconfig.KMPreAnnounce = 200
srtconfig.KMRefreshRate = 10000
srtconfig.Passphrase = config.Passphrase
srtconfig.Logger = s.srtlogger
s.server.Addr = config.Addr
s.server.Config = &srtconfig
s.server.HandleConnect = s.handleConnect
s.server.HandlePublish = s.handlePublish
s.server.HandleSubscribe = s.handleSubscribe
return s, nil
}
func (s *server) ListenAndServe() error {
var ctx context.Context
ctx, s.srtloggerCancel = context.WithCancel(context.Background())
go s.srtlogListener(ctx)
return s.server.ListenAndServe()
}
func (s *server) Close() {
s.server.Shutdown()
s.srtloggerCancel()
}
type Log struct {
Timestamp time.Time
Message []string
}
type Connection struct {
Log map[string][]Log
Stats srt.Statistics
}
type Channels struct {
Publisher map[string]uint32
Subscriber map[string][]uint32
Connections map[uint32]Connection
Log map[string][]Log
}
func (s *server) Channels() Channels {
st := Channels{
Publisher: map[string]uint32{},
Subscriber: map[string][]uint32{},
Connections: map[uint32]Connection{},
Log: map[string][]Log{},
}
s.lock.RLock()
for id, ch := range s.channels {
socketId := ch.publisher.conn.SocketId()
st.Publisher[id] = socketId
st.Connections[socketId] = Connection{
Stats: ch.publisher.conn.Stats(),
Log: map[string][]Log{},
}
for _, c := range ch.subscriber {
socketId := c.conn.SocketId()
st.Subscriber[id] = append(st.Subscriber[id], socketId)
st.Connections[socketId] = Connection{
Stats: c.conn.Stats(),
Log: map[string][]Log{},
}
}
}
s.lock.RUnlock()
s.srtlogLock.RLock()
for topic, buf := range s.srtlog {
buf.Do(func(l interface{}) {
if l == nil {
return
}
ll := l.(srt.Log)
log := Log{
Timestamp: ll.Time,
Message: strings.Split(ll.Message, "\n"),
}
if ll.SocketId != 0 {
if _, ok := st.Connections[ll.SocketId]; ok {
st.Connections[ll.SocketId].Log[topic] = append(st.Connections[ll.SocketId].Log[topic], log)
}
} else {
st.Log[topic] = append(st.Log[topic], log)
}
})
}
s.srtlogLock.RUnlock()
return st
}
func (s *server) srtlogListener(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case l := <-s.srtlogger.Listen():
s.srtlogLock.Lock()
if buf := s.srtlog[l.Topic]; buf == nil {
s.srtlog[l.Topic] = ring.New(100)
}
s.srtlog[l.Topic].Value = l
s.srtlog[l.Topic] = s.srtlog[l.Topic].Next()
s.srtlogLock.Unlock()
}
}
}
func (s *server) log(handler, action, resource, message string, client net.Addr) {
s.logger.Info().WithFields(log.Fields{
"handler": handler,
"status": action,
"resource": resource,
"client": client.String(),
}).Log(message)
}
type streamInfo struct {
mode string
resource string
token string
}
func parseStreamId(streamid string) (streamInfo, error) {
si := streamInfo{}
if strings.HasPrefix(streamid, "#!:") {
return parseOldStreamId(streamid)
}
re := regexp.MustCompile(`,(token|mode):(.+)`)
results := map[string]string{}
idEnd := -1
value := streamid
key := ""
for {
matches := re.FindStringSubmatchIndex(value)
if matches == nil {
break
}
if idEnd < 0 {
idEnd = matches[2] - 1
}
if len(key) != 0 {
results[key] = value[:matches[2]-1]
}
key = value[matches[2]:matches[3]]
value = value[matches[4]:matches[5]]
results[key] = value
}
if idEnd < 0 {
idEnd = len(streamid)
}
si.resource = streamid[:idEnd]
if token, ok := results["token"]; ok {
si.token = token
}
if mode, ok := results["mode"]; ok {
si.mode = mode
} else {
si.mode = "request"
}
return si, nil
}
func parseOldStreamId(streamid string) (streamInfo, error) {
si := streamInfo{}
if !strings.HasPrefix(streamid, "#!:") {
return si, fmt.Errorf("unknown streamid format")
}
streamid = strings.TrimPrefix(streamid, "#!:")
kvs := strings.Split(streamid, ",")
splitFn := func(s, sep string) (string, string, error) {
splitted := strings.SplitN(s, sep, 2)
if len(splitted) != 2 {
return "", "", fmt.Errorf("invalid key/value pair")
}
return splitted[0], splitted[1], nil
}
for _, kv := range kvs {
key, value, err := splitFn(kv, "=")
if err != nil {
continue
}
switch key {
case "m":
si.mode = value
case "r":
si.resource = value
case "token":
si.token = value
default:
}
}
return si, nil
}
func (s *server) handleConnect(req srt.ConnRequest) srt.ConnType {
mode := srt.REJECT
client := req.RemoteAddr()
streamId := req.StreamId()
si, err := parseStreamId(streamId)
if err != nil {
s.log("CONNECT", "INVALID", "", err.Error(), client)
return srt.REJECT
}
if len(si.resource) == 0 {
s.log("CONNECT", "INVALID", "", "stream resource not provided", client)
return srt.REJECT
}
if si.mode == "publish" {
mode = srt.PUBLISH
} else if si.mode == "request" {
mode = srt.SUBSCRIBE
} else {
s.log("CONNECT", "INVALID", si.resource, "invalid connection mode", client)
return srt.REJECT
}
if len(s.passphrase) != 0 {
if !req.IsEncrypted() {
s.log("CONNECT", "FORBIDDEN", si.resource, "connection has to be encrypted", client)
return srt.REJECT
}
if err := req.SetPassphrase(s.passphrase); err != nil {
s.log("CONNECT", "FORBIDDEN", si.resource, err.Error(), client)
return srt.REJECT
}
} else {
if req.IsEncrypted() {
s.log("CONNECT", "INVALID", si.resource, "connection must not be encrypted", client)
return srt.REJECT
}
}
// Check the token
if len(s.token) != 0 && s.token != si.token {
s.log("CONNECT", "FORBIDDEN", si.resource, "invalid token ("+si.token+")", client)
return srt.REJECT
}
s.lock.RLock()
ch := s.channels[si.resource]
s.lock.RUnlock()
if mode == srt.PUBLISH && ch != nil {
s.log("CONNECT", "CONFLICT", si.resource, "already publishing", client)
return srt.REJECT
}
if mode == srt.SUBSCRIBE && ch == nil {
s.log("CONNECT", "NOTFOUND", si.resource, "no publisher for this resource found", client)
return srt.REJECT
}
return mode
}
func (s *server) handlePublish(conn srt.Conn) {
streamId := conn.StreamId()
client := conn.RemoteAddr()
si, _ := parseStreamId(streamId)
// Look for the stream
s.lock.Lock()
ch := s.channels[si.resource]
if ch == nil {
ch = newChannel(conn, si.resource, s.collector)
s.channels[si.resource] = ch
} else {
ch = nil
}
s.lock.Unlock()
if ch == nil {
s.log("PUBLISH", "CONFLICT", si.resource, "already publishing", client)
conn.Close()
return
}
s.log("PUBLISH", "START", si.resource, "", client)
ch.pubsub.Publish(conn)
s.lock.Lock()
delete(s.channels, si.resource)
s.lock.Unlock()
ch.Close()
s.log("PUBLISH", "STOP", si.resource, "", client)
conn.Close()
}
func (s *server) handleSubscribe(conn srt.Conn) {
streamId := conn.StreamId()
client := conn.RemoteAddr()
si, _ := parseStreamId(streamId)
// Look for the stream
s.lock.RLock()
ch := s.channels[si.resource]
s.lock.RUnlock()
if ch == nil {
s.log("SUBSCRIBE", "NOTFOUND", si.resource, "no publisher for this resource found", client)
conn.Close()
return
}
s.log("SUBSCRIBE", "START", si.resource, "", client)
id := ch.AddSubscriber(conn, si.resource)
ch.pubsub.Subscribe(conn)
s.log("SUBSCRIBE", "STOP", si.resource, "", client)
ch.RemoveSubscriber(id)
conn.Close()
}