From 4344f0bdc9f10d04ca91e6a4448682504d92a162 Mon Sep 17 00:00:00 2001 From: Jan Stabenow Date: Wed, 6 Jul 2022 16:37:41 +0200 Subject: [PATCH] Add stream distribution across multiple internal servers --- src/misc/controls/RTMP.js | 54 ++++++++++++++ src/misc/controls/SRT.js | 54 ++++++++++++++ src/utils/restreamer.js | 148 +++++++++++++++++++++++++++++++------- src/views/Edit/index.js | 24 +++++++ src/views/Main/index.js | 12 +++- 5 files changed, 266 insertions(+), 26 deletions(-) create mode 100644 src/misc/controls/RTMP.js create mode 100644 src/misc/controls/SRT.js diff --git a/src/misc/controls/RTMP.js b/src/misc/controls/RTMP.js new file mode 100644 index 0000000..fb68399 --- /dev/null +++ b/src/misc/controls/RTMP.js @@ -0,0 +1,54 @@ +import React from 'react'; + +import { Trans } from '@lingui/macro'; +import Grid from '@mui/material/Grid'; +import Typography from '@mui/material/Typography'; + +import Checkbox from '../Checkbox'; + +function init(settings) { + const initSettings = { + enable: false, + ...settings, + }; + + return initSettings; +} + +export default function Control(props) { + const settings = init(props.settings); + + // Set the defaults + React.useEffect(() => { + props.onChange(settings, true); + // eslint-disable-next-line react-hooks/exhaustive-deps + }, []); + + const handleChange = (what) => (event) => { + const value = event.target.value; + + if (['enable'].includes(what)) { + settings[what] = !settings[what]; + } else { + settings[what] = value; + } + + props.onChange(settings, false); + }; + return ( + + + {/* Todo: Check availability */} + Enable} checked={settings.enable} onChange={handleChange('enable')} /> + + It makes the channel available as an RTMP stream. + + + + ); +} + +Control.defaulProps = { + settings: {}, + onChange: function (settings, automatic) {}, +}; diff --git a/src/misc/controls/SRT.js b/src/misc/controls/SRT.js new file mode 100644 index 0000000..48189a7 --- /dev/null +++ b/src/misc/controls/SRT.js @@ -0,0 +1,54 @@ +import React from 'react'; + +import { Trans } from '@lingui/macro'; +import Grid from '@mui/material/Grid'; +import Typography from '@mui/material/Typography'; + +import Checkbox from '../Checkbox'; + +function init(settings) { + const initSettings = { + enable: false, + ...settings, + }; + + return initSettings; +} + +export default function Control(props) { + const settings = init(props.settings); + + // Set the defaults + React.useEffect(() => { + props.onChange(settings, true); + // eslint-disable-next-line react-hooks/exhaustive-deps + }, []); + + const handleChange = (what) => (event) => { + const value = event.target.value; + + if (['enable'].includes(what)) { + settings[what] = !settings[what]; + } else { + settings[what] = value; + } + + props.onChange(settings, false); + }; + return ( + + + {/* Todo: Check availability */} + Enable} checked={settings.enable} onChange={handleChange('enable')} /> + + It makes the channel available as an SRT stream. + + + + ); +} + +Control.defaulProps = { + settings: {}, + onChange: function (settings, automatic) {}, +}; diff --git a/src/utils/restreamer.js b/src/utils/restreamer.js index ded7d11..0f7406d 100644 --- a/src/utils/restreamer.js +++ b/src/utils/restreamer.js @@ -922,6 +922,57 @@ class Restreamer { return [address]; } + // Get all RTMP/SRT/SNAPSHOT+MEMFS/HLS+MEMFS addresses + GetAddresses(what, channelId) { + const config = this.ConfigActive(); + const host = new URL(this.Address()).hostname; + + let address = ''; + + function getPort(servicePort) { + let port = servicePort.split(/:([0-9]+)$/)[1]; + if (port && !port.includes(':')) { + port = `:${port}`; + } + if (port) { + return port; + } else { + return ''; + } + } + + // rtmp/s + if (what && what === 'rtmp') { + const port = getPort(config.source.network.rtmp.host); + + if (config.source.network.rtmp.secure) { + address = `rtmps://${host}${port}/` + (config.source.network.rtmp.app.length !== 0 ? config.source.network.rtmp.app : '') + channelId + '.stream' + (config.source.network.rtmp.token.length !== 0 ? `?token=${config.source.network.rtmp.token}` : ''); + } else { + address = `rtmp://${host}${port}/` + (config.source.network.rtmp.app.length !== 0 ? config.source.network.rtmp.app : '') + channelId + '.stream' + (config.source.network.rtmp.token.length !== 0 ? `?token=${config.source.network.rtmp.token}` : ''); + } + + // srt + } else if (what && what === 'srt') { + const port = getPort(config.source.network.srt.host); + + address = `srt://${host}${port}/?mode=caller&streamid=#!:m=request,r=${channelId}` + (config.source.network.srt.token.length !== 0 ? `,token=${config.source.network.srt.token}` : '') + '&transtype=live' + (config.source.network.srt.passphrase.length !== 0 ? `&passphrase=${config.source.network.srt.passphrase}` : ''); + + // snapshot+memfs + } else if (what && what === 'snapshotMemFs') { + const port = getPort(config.source.network.hls.host); + + address = (config.http.secure === true ? 'https://' : 'http://') + `${host}${port}/memfs/${channelId}.jpg`; + + // hls+memfs + } else { + const port = getPort(config.source.network.hls.host); + + address = (config.http.secure === true ? 'https://' : 'http://') + `${host}${port}/memfs/${channelId}.m3u8`; + } + + return [address]; + } + // Channels async _discoverChannels() { @@ -1482,19 +1533,22 @@ class Restreamer { }); } + // set hls storage endpoint + let hlsStore = 'memfs'; + const output = { id: 'output_0', - address: `{memfs}/${channel.channelid}.m3u8`, + address: `{` + hlsStore + `}/${channel.channelid}.m3u8`, options: ['-dn', '-sn', ...outputs[0].options.map((o) => '' + o)], cleanup: [ { - pattern: control.hls.version >= 7 ? `memfs:/${channel.channelid}_*.mp4` : `memfs:/${channel.channelid}_*.ts`, + pattern: control.hls.version >= 7 ? hlsStore + `:/${channel.channelid}_*.mp4` : hlsStore + `:/${channel.channelid}_*.ts`, max_files: parseInt(control.hls.listSize) + 6, max_file_age_seconds: control.hls.cleanup ? parseInt(control.hls.segmentDuration) * (parseInt(control.hls.listSize) + 6) : 0, purge_on_delete: true, }, { - pattern: `memfs:/${channel.channelid}.m3u8`, + pattern: hlsStore + `:/${channel.channelid}.m3u8`, max_file_age_seconds: control.hls.cleanup ? parseInt(control.hls.segmentDuration) * (parseInt(control.hls.listSize) + 6) : 0, purge_on_delete: true, }, @@ -1507,6 +1561,30 @@ class Restreamer { output.options.push(...metadata_options); + // fetch core config + const core_config = this.ConfigActive(); + + // fetch rtmp settings + const rtmp_config = core_config.source.network.rtmp; + let rtmp_enabled = false; + if (control.rtmp && control.rtmp.enable && rtmp_config.enabled) { + rtmp_enabled = true; + } + + // fetch srt settings + const srt_config = core_config.source.network.srt; + let srt_enabled = false; + if (control.srt && control.srt.enable && srt_config.enabled) { + srt_enabled = true; + } + + // 'tee_muxer' is required for the delivery of one output to multiple endpoints without processing the input for each output + // http://ffmpeg.org/ffmpeg-all.html#tee-1 + let tee_muxer = false; + if (rtmp_enabled || srt_enabled) { + tee_muxer = true; + } + // Manifest versions // https://developer.apple.com/documentation/http_live_streaming/about_the_ext-x-version_tag // https://ffmpeg.org/ffmpeg-all.html#Options-53 @@ -1544,16 +1622,22 @@ class Restreamer { ['hls_list_size', '' + parseInt(control.hls.listSize)], ['hls_flags', 'append_list+delete_segments+program_date_time+independent_segments'], ['hls_delete_threshold', '4'], - ['hls_segment_filename', `{memfs}/${channel.channelid}_%04d.ts`], - ['segment_format_options', 'mpegts_flags=mpegts_copyts=1'], - ['max_muxing_queue_size', '400'], + ['hls_segment_filename', tee_muxer ? `{` + hlsStore + `^:}/${channel.channelid}_%04d.ts` : `{` + hlsStore + `}/${channel.channelid}_%04d.ts`], ['method', 'PUT'], ]; case 7: // fix Malformed AAC bitstream detected for hls version 7 - if (control.hls.version === 7 && output.options.includes('-codec:a') && output.options.includes('copy')) { + if (output.options.includes('-codec:a') && output.options.includes('copy')) { output.options.push('-bsf:a', 'aac_adtstoasc'); } + // mp4 manifest cleanup + output.cleanup.push( + { + pattern: hlsStore + `:/${channel.channelid}.mp4`, + max_file_age_seconds: control.hls.cleanup ? parseInt(control.hls.segmentDuration) * (parseInt(control.hls.listSize) + 6) : 0, + purge_on_delete: true, + } + ) return [ ['f', 'hls'], ['start_number', '0'], @@ -1562,10 +1646,8 @@ class Restreamer { ['hls_flags', 'append_list+delete_segments+program_date_time+independent_segments'], ['hls_delete_threshold', '4'], ['hls_segment_type', 'fmp4'], - ['hls_fmp4_init_filename', `${channel.channelid}_init.mp4`], - ['hls_segment_filename', `{memfs}/${channel.channelid}_%04d.mp4`], - ['segment_format_options', 'mpegts_flags=mpegts_copyts=1'], - ['max_muxing_queue_size', '400'], + ['hls_fmp4_init_filename', `${channel.channelid}.mp4`], + ['hls_segment_filename', tee_muxer ? `{` + hlsStore + `^:}/${channel.channelid}_%04d.mp4` : `{` + hlsStore + `}/${channel.channelid}_%04d.mp4`], ['method', 'PUT'], ]; // case 3 @@ -1577,9 +1659,7 @@ class Restreamer { ['hls_list_size', '' + parseInt(control.hls.listSize)], ['hls_flags', 'append_list+delete_segments+program_date_time'], ['hls_delete_threshold', '4'], - ['hls_segment_filename', `{memfs}/${channel.channelid}_%04d.ts`], - ['segment_format_options', 'mpegts_flags=mpegts_copyts=1'], - ['max_muxing_queue_size', '400'], + ['hls_segment_filename', tee_muxer ? `{` + hlsStore + `^:}/${channel.channelid}_%04d.ts` : `{` + hlsStore + `}/${channel.channelid}_%04d.ts`], ['method', 'PUT'], ]; } @@ -1587,28 +1667,48 @@ class Restreamer { }; const hls_params_raw = getHLSParams(control.hls.lhls, control.hls.version); - // 'tee_muxer' is required for the delivery of one output to multiple endpoints without processing the input for each output - // http://ffmpeg.org/ffmpeg-all.html#tee-1 - const tee_muxer = false; + // push -y + proc.options.push('-y'); + + // build tee_muxer rtmp endpoint + let rtmp_params = ''; + if (rtmp_enabled) { + rtmp_params = '|[f=flv]rtmp' + (rtmp_config.secure ? 's' : '') + '://' + rtmp_config.local + rtmp_config.app + `/${channel.channelid}.stream`; + if (rtmp_config.token.length !== 0) { + rtmp_params += '?token=' + encodeURIComponent(rtmp_config.token); + } + } + + // build tee_muxer srt endpoint + let srt_params = ''; + if (srt_enabled) { + // mode=caller&streamid=#\!:m=publish,r=12345,token=foobarfoobar&passphrase=foobarfoobar&transtype=live + srt_params = '|[f=mpegts]srt://' + srt_config.local + `?mode=caller&streamid=#!:m=publish,r=${channel.channelid}`; + if (srt_config.token.length !== 0) { + srt_params += ',token=' + encodeURIComponent(srt_config.token); + } + if (srt_config.passphrase.length !== 0) { + srt_params += '&passphrase=' + encodeURIComponent(srt_config.passphrase); + } + srt_params += '&transtype=live' + } // Returns the l/hls parameters with or without tee_muxer if (tee_muxer) { // f=hls:start_number=0... const hls_params = hls_params_raw .filter((o) => { - if (o[0] === 'segment_format_options' || o[0] === 'max_muxing_queue_size') { - return false; - } - - return true; + // unsupported in tee_muxer + return !(o[0] === 'segment_format_options' || o[0] === 'max_muxing_queue_size'); }) .map((o) => o[0] + '=' + o[1]) .join(':'); - output.options.push('-tag:v', '7', '-tag:a', '10', '-f', 'tee'); + output.options.push('-flags', '+global_header', '-tag:v', '7', '-tag:a', '10', '-f', 'tee'); // WARN: It is a magic function. Returns 'Invalid process config' and the process.id is lost (Core v16.8.0) <= this is not the case anymore with the latest dev branch // ['f=hls:start_number=0...]address.m3u8 - output.address = `[` + hls_params + `]{memfs}/${channel.channelid}.m3u8`; + + output.address = `[` + hls_params + `]{` + hlsStore + `}/${channel.channelid}.m3u8` + rtmp_params + srt_params; } else { // ['-f', 'hls', '-start_number', '0', ...] // adding the '-' in front of the first option, then flatten everything diff --git a/src/views/Edit/index.js b/src/views/Edit/index.js index 430a3c0..a024a45 100644 --- a/src/views/Edit/index.js +++ b/src/views/Edit/index.js @@ -31,7 +31,9 @@ import PaperThumb from '../../misc/PaperThumb'; import ProcessControl from '../../misc/controls/Process'; import Profile from './Profile'; import ProfileSummary from './ProfileSummary'; +import RTMPControl from '../../misc/controls/RTMP'; import SnapshotControl from '../../misc/controls/Snapshot'; +import SRTControl from '../../misc/controls/SRT'; import TabPanel from '../../misc/TabPanel'; import TabsVerticalGrid from '../../misc/TabsVerticalGrid'; @@ -477,6 +479,28 @@ export default function Edit(props) { + + + RTMP + + + + + + + + + + + SRT + + + + + + + + Snapshot diff --git a/src/views/Main/index.js b/src/views/Main/index.js index 99d5001..8a37aea 100644 --- a/src/views/Main/index.js +++ b/src/views/Main/index.js @@ -365,10 +365,18 @@ export default function Main(props) { Content URL - + HLS - + {/* Todo: Check availability */} + + RTMP + + {/* Todo: Check availability */} + + SRT + + Snapshot