diff --git a/go.mod b/go.mod index c1e1be51..15be9165 100644 --- a/go.mod +++ b/go.mod @@ -10,7 +10,7 @@ require ( github.com/atrox/haikunatorgo/v2 v2.0.1 github.com/caddyserver/certmagic v0.21.2 github.com/datarhei/gosrt v0.6.0 - github.com/datarhei/joy4 v0.0.0-20240529142948-6b449f526167 + github.com/datarhei/joy4 v0.0.0-20240530204135-9c6cb8a1c911 github.com/go-playground/validator/v10 v10.20.0 github.com/gobwas/glob v0.2.3 github.com/golang-jwt/jwt/v5 v5.2.1 diff --git a/go.sum b/go.sum index c3fe1f3d..63446921 100644 --- a/go.sum +++ b/go.sum @@ -30,8 +30,8 @@ github.com/cpuguy83/go-md2man/v2 v2.0.4 h1:wfIWP927BUkWJb2NmU/kNDYIBTh/ziUX91+lV github.com/cpuguy83/go-md2man/v2 v2.0.4/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= github.com/datarhei/gosrt v0.6.0 h1:HrrXAw90V78ok4WMIhX6se1aTHPCn82Sg2hj+PhdmGc= github.com/datarhei/gosrt v0.6.0/go.mod h1:fsOWdLSHUHShHjgi/46h6wjtdQrtnSdAQFnlas8ONxs= -github.com/datarhei/joy4 v0.0.0-20240529142948-6b449f526167 h1:hkMdYcoj4H0TQ2rSfN5uU164+Qm27P2qN04D37ISKo0= -github.com/datarhei/joy4 v0.0.0-20240529142948-6b449f526167/go.mod h1:Jcw/6jZDQQmPx8A7INEkXmuEF7E9jjBbSTfVSLwmiQw= +github.com/datarhei/joy4 v0.0.0-20240530204135-9c6cb8a1c911 h1:kDvbQ49kbq6vY0UpflEMkqJwjYpll/JI553Ry+1XCmo= +github.com/datarhei/joy4 v0.0.0-20240530204135-9c6cb8a1c911/go.mod h1:Jcw/6jZDQQmPx8A7INEkXmuEF7E9jjBbSTfVSLwmiQw= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= diff --git a/vendor/github.com/datarhei/joy4/format/rtmp/rtmp.go b/vendor/github.com/datarhei/joy4/format/rtmp/rtmp.go index ea1454ac..7700d239 100644 --- a/vendor/github.com/datarhei/joy4/format/rtmp/rtmp.go +++ b/vendor/github.com/datarhei/joy4/format/rtmp/rtmp.go @@ -35,11 +35,16 @@ func ParseURL(uri string) (u *url.URL, err error) { return } -func Dial(uri string) (conn *Conn, err error) { - return DialTimeout(uri, 0) +type DialOptions struct { + MaxProbePacketCount int + DebugChunks func(conn net.Conn) bool } -func DialTimeout(uri string, timeout time.Duration) (conn *Conn, err error) { +func Dial(uri string, options DialOptions) (conn *Conn, err error) { + return DialTimeout(uri, 0, options) +} + +func DialTimeout(uri string, timeout time.Duration, options DialOptions) (conn *Conn, err error) { var u *url.URL if u, err = ParseURL(uri); err != nil { return @@ -53,6 +58,10 @@ func DialTimeout(uri string, timeout time.Duration) (conn *Conn, err error) { conn = NewConn(netconn) conn.URL = u + conn.prober = flv.NewProber(options.MaxProbePacketCount) + if options.DebugChunks != nil { + conn.debugChunks = options.DebugChunks(netconn) + } return } @@ -374,20 +383,20 @@ func NewConn(netconn net.Conn) *Conn { } type chunkStream struct { - timenow uint32 - prevtimenow uint32 - tscount int - gentimenow bool - timedelta uint32 - hastimeext bool - timeext uint32 - msgsid uint32 - msgtypeid uint8 - msgdatalen uint32 - msgdataleft uint32 - msghdrtype uint8 - msgdata []byte - msgcount int + timenow uint32 + prevtimenow uint32 + sametscount int + genwallclocktime bool + timedelta uint32 + hastimeext bool + timeext uint32 + msgsid uint32 + msgtypeid uint8 + msgdatalen uint32 + msgdataleft uint32 + msghdrtype uint8 + msgdata []byte + msgcount uint64 } func (cs *chunkStream) Start() { @@ -421,6 +430,14 @@ func (conn *Conn) NetConn() net.Conn { return conn.netconn.Conn } +func (conn *Conn) LocalAddr() net.Addr { + return conn.netconn.LocalAddr() +} + +func (conn *Conn) RemoteAddr() net.Addr { + return conn.netconn.RemoteAddr() +} + func (conn *Conn) SetReadIdleTimeout(d time.Duration) error { return conn.netconn.SetReadIdleTimeout(d) } @@ -1659,8 +1676,12 @@ func (conn *Conn) readChunk() (err error) { } if conn.debugChunks { - data := fmt.Sprintf("rtmp: chunk id=%d msgsid=%d msgtypeid=%d msghdrtype=%d timestamp=%d ext=%v len=%d left=%d max=%d", - csid, cs.msgsid, cs.msgtypeid, msghdrtype, cs.timenow, cs.hastimeext, cs.msgdatalen, cs.msgdataleft, conn.readMaxChunkSize) + path := "***" + if conn.URL != nil { + path = conn.URL.Path + } + data := fmt.Sprintf("%s: chunk id=%d msgsid=%d msgtypeid=%d msghdrtype=%d timestamp=%d ext=%v wallclock=%v len=%d left=%d max=%d", + path, csid, cs.msgsid, cs.msgtypeid, msghdrtype, cs.timenow, cs.hastimeext, cs.genwallclocktime, cs.msgdatalen, cs.msgdataleft, conn.readMaxChunkSize) if cs.msgtypeid != msgtypeidVideoMsg && cs.msgtypeid != msgtypeidAudioMsg { if len(cs.msgdata) > 1024 { @@ -1686,26 +1707,28 @@ func (conn *Conn) readChunk() (err error) { timestamp = cs.timenow if cs.msgtypeid == msgtypeidVideoMsg || cs.msgtypeid == msgtypeidAudioMsg { - if cs.msgcount < 20 { // only consider the first video and audio messages - if !cs.gentimenow { - if cs.prevtimenow >= cs.timenow { - cs.tscount++ - } else { - cs.tscount = 0 - } - - // if the previous timestamp is the same as the current for too often in a row, assume defect timestamps - if cs.tscount > 10 { - cs.gentimenow = true - } - - cs.prevtimenow = cs.timenow + if !cs.genwallclocktime { + if cs.prevtimenow >= cs.timenow { + cs.sametscount++ + } else { + cs.sametscount = 0 } + // if the previous timestamp is the same as the current for too often in a row, assume defect timestamps + if cs.sametscount > 10 { + if cs.msgcount < 20 { // only consider the first video and audio messages + cs.genwallclocktime = true + } else { // otherwise bail out + err = fmt.Errorf("detected sequence of non-changing timestamps: %d (msgtypeid %d)", cs.timenow, cs.msgtypeid) + return + } + } + + cs.prevtimenow = cs.timenow cs.msgcount++ } - if cs.gentimenow { + if cs.genwallclocktime { timestamp = uint32(time.Since(conn.start).Milliseconds() % 0xFFFFFFFF) } } @@ -2104,7 +2127,7 @@ func Handler(h *avutil.RegisterHandler) { return } ok = true - demuxer, err = Dial(uri) + demuxer, err = Dial(uri, DialOptions{}) return } @@ -2113,7 +2136,7 @@ func Handler(h *avutil.RegisterHandler) { return } ok = true - muxer, err = Dial(uri) + muxer, err = Dial(uri, DialOptions{}) return } diff --git a/vendor/modules.txt b/vendor/modules.txt index 15228159..25512975 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -64,7 +64,7 @@ github.com/datarhei/gosrt/crypto github.com/datarhei/gosrt/net github.com/datarhei/gosrt/packet github.com/datarhei/gosrt/rand -# github.com/datarhei/joy4 v0.0.0-20240529142948-6b449f526167 +# github.com/datarhei/joy4 v0.0.0-20240530204135-9c6cb8a1c911 ## explicit; go 1.14 github.com/datarhei/joy4/av github.com/datarhei/joy4/av/avutil