From 92ec93dff03e8bc68b2cb486f1bc4c8ebe94909e Mon Sep 17 00:00:00 2001 From: Ingo Oppermann Date: Thu, 7 Jul 2022 14:01:31 +0200 Subject: [PATCH] Add stream distribution across multiple internal servers The core provides an internal RTMP and SRT server. Pulling in a stream results by default in a HLS output. Now, this stream can also be pushed additionaly to the internal RTMP and SRT servers, given they are enabled. This results in a lower latency when you play the stream from one of those servers. It was necessary to modify the RTMP configuration a bit, such that there's always a non-TLS RTMP server available for internal use. If you enable RTMPS, it will require now that there's also a RTMP server running. Both need to run on different ports. Please check your RTMP settings after the update. The RTMPS server will run on port 1936 by default. The RTMP server will run on port 1935 by default. The UI requires now core version 16.9.0 --- src/misc/controls/RTMP.js | 55 ++++++++++++++ src/misc/controls/SRT.js | 55 ++++++++++++++ src/utils/metadata.js | 6 ++ src/utils/restreamer.js | 154 +++++++++++++++++++++++++++++++------- src/version.js | 2 +- src/views/Edit/index.js | 32 ++++++++ src/views/Main/index.js | 49 +++++++++++- src/views/Settings.js | 106 +++++++++++++------------- 8 files changed, 378 insertions(+), 81 deletions(-) create mode 100644 src/misc/controls/RTMP.js create mode 100644 src/misc/controls/SRT.js diff --git a/src/misc/controls/RTMP.js b/src/misc/controls/RTMP.js new file mode 100644 index 0000000..22aaf1f --- /dev/null +++ b/src/misc/controls/RTMP.js @@ -0,0 +1,55 @@ +import React from 'react'; + +import { Trans } from '@lingui/macro'; +import Grid from '@mui/material/Grid'; +import Typography from '@mui/material/Typography'; + +import Checkbox from '../Checkbox'; + +function init(settings) { + const initSettings = { + enable: false, + ...settings, + }; + + return initSettings; +} + +export default function Control(props) { + const settings = init(props.settings); + + // Set the defaults + React.useEffect(() => { + props.onChange(settings, true); + // eslint-disable-next-line react-hooks/exhaustive-deps + }, []); + + const handleChange = (what) => (event) => { + const value = event.target.value; + + if (['enable'].includes(what)) { + settings[what] = !settings[what]; + } else { + settings[what] = value; + } + + props.onChange(settings, false); + }; + return ( + + + {/* Todo: Check availability with props.enabled */} + Enable} checked={settings.enable} onChange={handleChange('enable')} /> + + Make the channel available as an RTMP stream. + + + + ); +} + +Control.defaulProps = { + settings: {}, + enabled: false, + onChange: function (settings, automatic) {}, +}; diff --git a/src/misc/controls/SRT.js b/src/misc/controls/SRT.js new file mode 100644 index 0000000..ace64a3 --- /dev/null +++ b/src/misc/controls/SRT.js @@ -0,0 +1,55 @@ +import React from 'react'; + +import { Trans } from '@lingui/macro'; +import Grid from '@mui/material/Grid'; +import Typography from '@mui/material/Typography'; + +import Checkbox from '../Checkbox'; + +function init(settings) { + const initSettings = { + enable: false, + ...settings, + }; + + return initSettings; +} + +export default function Control(props) { + const settings = init(props.settings); + + // Set the defaults + React.useEffect(() => { + props.onChange(settings, true); + // eslint-disable-next-line react-hooks/exhaustive-deps + }, []); + + const handleChange = (what) => (event) => { + const value = event.target.value; + + if (['enable'].includes(what)) { + settings[what] = !settings[what]; + } else { + settings[what] = value; + } + + props.onChange(settings, false); + }; + return ( + + + {/* Todo: Check availability with props.enabled */} + Enable} checked={settings.enable} onChange={handleChange('enable')} /> + + Make the channel available as an SRT stream. + + + + ); +} + +Control.defaulProps = { + settings: {}, + enabled: false, + onChange: function (settings, automatic) {}, +}; diff --git a/src/utils/metadata.js b/src/utils/metadata.js index d8b1154..4f7fc93 100644 --- a/src/utils/metadata.js +++ b/src/utils/metadata.js @@ -249,6 +249,12 @@ const defaultIngestMetadata = { segmentDuration: 2, listSize: 6, }, + rtmp: { + enable: false, + }, + srt: { + enable: false, + }, process: { autostart: true, reconnect: true, diff --git a/src/utils/restreamer.js b/src/utils/restreamer.js index ded7d11..8416054 100644 --- a/src/utils/restreamer.js +++ b/src/utils/restreamer.js @@ -922,6 +922,71 @@ class Restreamer { return [address]; } + // Get all RTMP/SRT/SNAPSHOT+MEMFS/HLS+MEMFS addresses + GetAddresses(what, channelId) { + const config = this.ConfigActive(); + const host = new URL(this.Address()).hostname; + + let address = ''; + + function getPort(servicePort) { + let port = servicePort.split(/:([0-9]+)$/)[1]; + if (port && !port.includes(':')) { + port = `:${port}`; + } + if (port) { + return port; + } else { + return ''; + } + } + + // rtmp/s + if (what && what === 'rtmp') { + const port = getPort(config.source.network.rtmp.host); + + if (config.source.network.rtmp.secure) { + address = + `rtmps://${host}${port}/` + + (config.source.network.rtmp.app.length !== 0 ? config.source.network.rtmp.app : '') + + channelId + + '.stream' + + (config.source.network.rtmp.token.length !== 0 ? `?token=${config.source.network.rtmp.token}` : ''); + } else { + address = + `rtmp://${host}${port}/` + + (config.source.network.rtmp.app.length !== 0 ? config.source.network.rtmp.app : '') + + channelId + + '.stream' + + (config.source.network.rtmp.token.length !== 0 ? `?token=${config.source.network.rtmp.token}` : ''); + } + + // srt + } else if (what && what === 'srt') { + const port = getPort(config.source.network.srt.host); + + address = + `srt://${host}${port}/?mode=caller&streamid=#!:m=request,r=${channelId}` + + (config.source.network.srt.token.length !== 0 ? `,token=${config.source.network.srt.token}` : '') + + '&transtype=live' + + (config.source.network.srt.passphrase.length !== 0 ? `&passphrase=${config.source.network.srt.passphrase}` : ''); + + // snapshot+memfs + } else if (what && what === 'snapshotMemFs') { + const port = getPort(config.source.network.hls.host); + + address = (config.http.secure === true ? 'https://' : 'http://') + `${host}${port}/memfs/${channelId}.jpg`; + + // hls+memfs + } else { + const port = getPort(config.source.network.hls.host); + + address = (config.http.secure === true ? 'https://' : 'http://') + `${host}${port}/memfs/${channelId}.m3u8`; + } + + return [address]; + } + // Channels async _discoverChannels() { @@ -1482,19 +1547,22 @@ class Restreamer { }); } + // set hls storage endpoint + let hlsStore = 'memfs'; + const output = { id: 'output_0', - address: `{memfs}/${channel.channelid}.m3u8`, + address: `{` + hlsStore + `}/${channel.channelid}.m3u8`, options: ['-dn', '-sn', ...outputs[0].options.map((o) => '' + o)], cleanup: [ { - pattern: control.hls.version >= 7 ? `memfs:/${channel.channelid}_*.mp4` : `memfs:/${channel.channelid}_*.ts`, + pattern: control.hls.version >= 7 ? hlsStore + `:/${channel.channelid}_*.mp4` : hlsStore + `:/${channel.channelid}_*.ts`, max_files: parseInt(control.hls.listSize) + 6, max_file_age_seconds: control.hls.cleanup ? parseInt(control.hls.segmentDuration) * (parseInt(control.hls.listSize) + 6) : 0, purge_on_delete: true, }, { - pattern: `memfs:/${channel.channelid}.m3u8`, + pattern: hlsStore + `:/${channel.channelid}.m3u8`, max_file_age_seconds: control.hls.cleanup ? parseInt(control.hls.segmentDuration) * (parseInt(control.hls.listSize) + 6) : 0, purge_on_delete: true, }, @@ -1507,6 +1575,30 @@ class Restreamer { output.options.push(...metadata_options); + // fetch core config + const core_config = this.ConfigActive(); + + // fetch rtmp settings + const rtmp_config = core_config.source.network.rtmp; + let rtmp_enabled = false; + if (control.rtmp && control.rtmp.enable && rtmp_config.enabled) { + rtmp_enabled = true; + } + + // fetch srt settings + const srt_config = core_config.source.network.srt; + let srt_enabled = false; + if (control.srt && control.srt.enable && srt_config.enabled) { + srt_enabled = true; + } + + // 'tee_muxer' is required for the delivery of one output to multiple endpoints without processing the input for each output + // http://ffmpeg.org/ffmpeg-all.html#tee-1 + let tee_muxer = false; + if (rtmp_enabled || srt_enabled) { + tee_muxer = true; + } + // Manifest versions // https://developer.apple.com/documentation/http_live_streaming/about_the_ext-x-version_tag // https://ffmpeg.org/ffmpeg-all.html#Options-53 @@ -1544,16 +1636,23 @@ class Restreamer { ['hls_list_size', '' + parseInt(control.hls.listSize)], ['hls_flags', 'append_list+delete_segments+program_date_time+independent_segments'], ['hls_delete_threshold', '4'], - ['hls_segment_filename', `{memfs}/${channel.channelid}_%04d.ts`], - ['segment_format_options', 'mpegts_flags=mpegts_copyts=1'], - ['max_muxing_queue_size', '400'], + [ + 'hls_segment_filename', + tee_muxer ? `{` + hlsStore + `^:}/${channel.channelid}_%04d.ts` : `{` + hlsStore + `}/${channel.channelid}_%04d.ts`, + ], ['method', 'PUT'], ]; case 7: // fix Malformed AAC bitstream detected for hls version 7 - if (control.hls.version === 7 && output.options.includes('-codec:a') && output.options.includes('copy')) { + if (output.options.includes('-codec:a') && output.options.includes('copy')) { output.options.push('-bsf:a', 'aac_adtstoasc'); } + // mp4 manifest cleanup + output.cleanup.push({ + pattern: hlsStore + `:/${channel.channelid}.mp4`, + max_file_age_seconds: control.hls.cleanup ? parseInt(control.hls.segmentDuration) * (parseInt(control.hls.listSize) + 6) : 0, + purge_on_delete: true, + }); return [ ['f', 'hls'], ['start_number', '0'], @@ -1562,10 +1661,11 @@ class Restreamer { ['hls_flags', 'append_list+delete_segments+program_date_time+independent_segments'], ['hls_delete_threshold', '4'], ['hls_segment_type', 'fmp4'], - ['hls_fmp4_init_filename', `${channel.channelid}_init.mp4`], - ['hls_segment_filename', `{memfs}/${channel.channelid}_%04d.mp4`], - ['segment_format_options', 'mpegts_flags=mpegts_copyts=1'], - ['max_muxing_queue_size', '400'], + ['hls_fmp4_init_filename', `${channel.channelid}.mp4`], + [ + 'hls_segment_filename', + tee_muxer ? `{` + hlsStore + `^:}/${channel.channelid}_%04d.mp4` : `{` + hlsStore + `}/${channel.channelid}_%04d.mp4`, + ], ['method', 'PUT'], ]; // case 3 @@ -1577,9 +1677,10 @@ class Restreamer { ['hls_list_size', '' + parseInt(control.hls.listSize)], ['hls_flags', 'append_list+delete_segments+program_date_time'], ['hls_delete_threshold', '4'], - ['hls_segment_filename', `{memfs}/${channel.channelid}_%04d.ts`], - ['segment_format_options', 'mpegts_flags=mpegts_copyts=1'], - ['max_muxing_queue_size', '400'], + [ + 'hls_segment_filename', + tee_muxer ? `{` + hlsStore + `^:}/${channel.channelid}_%04d.ts` : `{` + hlsStore + `}/${channel.channelid}_%04d.ts`, + ], ['method', 'PUT'], ]; } @@ -1587,28 +1688,31 @@ class Restreamer { }; const hls_params_raw = getHLSParams(control.hls.lhls, control.hls.version); - // 'tee_muxer' is required for the delivery of one output to multiple endpoints without processing the input for each output - // http://ffmpeg.org/ffmpeg-all.html#tee-1 - const tee_muxer = false; + // push -y + proc.options.push('-y'); // Returns the l/hls parameters with or without tee_muxer if (tee_muxer) { // f=hls:start_number=0... const hls_params = hls_params_raw .filter((o) => { - if (o[0] === 'segment_format_options' || o[0] === 'max_muxing_queue_size') { - return false; - } - - return true; + // unsupported in tee_muxer + return !(o[0] === 'segment_format_options' || o[0] === 'max_muxing_queue_size'); }) .map((o) => o[0] + '=' + o[1]) .join(':'); - output.options.push('-tag:v', '7', '-tag:a', '10', '-f', 'tee'); - // WARN: It is a magic function. Returns 'Invalid process config' and the process.id is lost (Core v16.8.0) <= this is not the case anymore with the latest dev branch + output.options.push('-flags', '+global_header', '-tag:v', '7', '-tag:a', '10', '-f', 'tee'); // ['f=hls:start_number=0...]address.m3u8 - output.address = `[` + hls_params + `]{memfs}/${channel.channelid}.m3u8`; + // use tee_muxer formatting + output.address = + `[` + + hls_params + + `]{` + + hlsStore + + `}/${channel.channelid}.m3u8` + + (rtmp_enabled ? `|[f=flv]{rtmp,name=${channel.channelid}.stream}` : '') + + (srt_enabled ? `|[f=mpegts]{srt,name=${channel.channelid},mode=publish}` : ''); } else { // ['-f', 'hls', '-start_number', '0', ...] // adding the '-' in front of the first option, then flatten everything diff --git a/src/version.js b/src/version.js index a72eac1..1dedd3d 100644 --- a/src/version.js +++ b/src/version.js @@ -1,6 +1,6 @@ import { name, version, bundle } from '../package.json'; -const Core = '^15.0.0 || ^16.0.0'; +const Core = '^16.9.0'; const FFmpeg = '^4.1.0 || ^5.0.0'; const UI = bundle ? bundle : name + ' v' + version; diff --git a/src/views/Edit/index.js b/src/views/Edit/index.js index 430a3c0..f1a5e30 100644 --- a/src/views/Edit/index.js +++ b/src/views/Edit/index.js @@ -31,7 +31,9 @@ import PaperThumb from '../../misc/PaperThumb'; import ProcessControl from '../../misc/controls/Process'; import Profile from './Profile'; import ProfileSummary from './ProfileSummary'; +import RTMPControl from '../../misc/controls/RTMP'; import SnapshotControl from '../../misc/controls/Snapshot'; +import SRTControl from '../../misc/controls/SRT'; import TabPanel from '../../misc/TabPanel'; import TabsVerticalGrid from '../../misc/TabsVerticalGrid'; @@ -477,6 +479,36 @@ export default function Edit(props) { + + + RTMP + + + + + + + + + + + SRT + + + + + + + + Snapshot diff --git a/src/views/Main/index.js b/src/views/Main/index.js index 99d5001..c43ad83 100644 --- a/src/views/Main/index.js +++ b/src/views/Main/index.js @@ -10,6 +10,7 @@ import Stack from '@mui/material/Stack'; import Typography from '@mui/material/Typography'; import WarningIcon from '@mui/icons-material/Warning'; +import * as M from '../../utils/metadata'; import useInterval from '../../hooks/useInterval'; import ActionButton from '../../misc/ActionButton'; import CopyButton from '../../misc/CopyButton'; @@ -65,6 +66,7 @@ export default function Main(props) { state: 'disconnected', onConnect: null, }); + const [$metadata, setMetadata] = React.useState(M.getDefaultIngestMetadata()); const [$processDetails, setProcessDetails] = React.useState({ open: false, data: { @@ -87,11 +89,24 @@ export default function Main(props) { React.useEffect(() => { (async () => { + await load(); await update(); })(); // eslint-disable-next-line react-hooks/exhaustive-deps }, []); + const load = async () => { + let metadata = await props.restreamer.GetIngestMetadata(_channelid); + if (metadata.version && metadata.version === 1) { + setMetadata({ + ...$metadata, + ...metadata, + }); + } + + await update(); + }; + const update = async () => { const channelid = props.restreamer.SelectChannel(_channelid); if (channelid === '' || channelid !== _channelid) { @@ -365,10 +380,40 @@ export default function Main(props) { Content URL - + HLS - + {$metadata.control.rtmp.enable && ( + + RTMP + + )} + {$metadata.control.srt.enable && ( + + SRT + + )} + Snapshot diff --git a/src/views/Settings.js b/src/views/Settings.js index 54d7c44..e164cc0 100644 --- a/src/views/Settings.js +++ b/src/views/Settings.js @@ -767,7 +767,12 @@ export default function Settings(props) { config.address = config.address.split(':').join(''); config.tls.address = config.tls.address.split(':').join(''); config.rtmp.address = config.rtmp.address.split(':').join(''); - config.rtmp.address_tls = config.rtmp.address_tls.split(':').join(''); + // fix: Cannot read properties of undefined + if (config.rtmp.address_tls) { + config.rtmp.address_tls = config.rtmp.address_tls.split(':').join(''); + } else { + config.rtmp.address_tls = '1936'; + } config.srt.address = config.srt.address.split(':').join(''); if (config.tls.auto === true) { @@ -1836,53 +1841,6 @@ export default function Settings(props) { />{' '} {env('rtmp.enable') && } - - - - - - Port} - env={env('rtmp.address')} - disabled={env('rtmp.address') || (!config.rtmp.enable && !config.rtmp.enable_tls)} - value={config.rtmp.address} - onChange={handleChange('rtmp.address')} - /> - - - RTMP server listen address. - - - - App} - env={env('rtmp.app')} - disabled={env('rtmp.app') || (!config.rtmp.enable && !config.rtmp.enable_tls)} - value={config.rtmp.app} - onChange={handleChange('rtmp.app')} - /> - - - RTMP app for publishing. - - - - Token} - env={env('rtmp.token')} - disabled={env('rtmp.token') || (!config.rtmp.enable && !config.rtmp.enable_tls)} - value={config.rtmp.token} - onChange={handleChange('rtmp.token')} - /> - - - RTMP token for publishing and playing. The token is the value of the URL query parameter 'token.' - - - - - - RTMPS server} checked={config.rtmp.enable_tls} @@ -1896,9 +1854,9 @@ export default function Settings(props) { Requires activation{' '} { - setTab('auth'); + setTab('network'); }} > TLS/HTTPS @@ -1907,11 +1865,27 @@ export default function Settings(props) { )} - + + + + Port} + label={RTMP Port} + env={env('rtmp.address')} + disabled={env('rtmp.address') || (!config.rtmp.enable && !config.rtmp.enable_tls)} + value={config.rtmp.address} + onChange={handleChange('rtmp.address')} + /> + + + RTMP server listen address. + + + + RTMPS Port} env={env('rtmp.address_tls')} - disabled={env('rtmp.address_tls') || (!config.rtmp.enable && !config.rtmp.enable_tls)} + disabled={env('rtmp.address_tls') || (!config.rtmp.enable_tls) || (!config.tls.auto)} value={config.rtmp.address_tls} onChange={handleChange('rtmp.address_tls')} /> @@ -1920,6 +1894,32 @@ export default function Settings(props) { RTMPS server listen address. + + App} + env={env('rtmp.app')} + disabled={env('rtmp.app') || (!config.rtmp.enable)} + value={config.rtmp.app} + onChange={handleChange('rtmp.app')} + /> + + + RTMP app for publishing. + + + + Token} + env={env('rtmp.token')} + disabled={env('rtmp.token') || (!config.rtmp.enable)} + value={config.rtmp.token} + onChange={handleChange('rtmp.token')} + /> + + + RTMP token for publishing and playing. The token is the value of the URL query parameter 'token.' + +