Add stream distribution across multiple internal servers

The core provides an internal RTMP and SRT server. Pulling in a stream
results by default in a HLS output. Now, this stream can also be pushed
additionaly to the internal RTMP and SRT servers, given they are enabled.
This results in a lower latency when you play the stream from one of
those servers.

It was necessary to modify the RTMP configuration a bit, such that there's
always a non-TLS RTMP server available for internal use. If you enable
RTMPS, it will require now that there's also a RTMP server running. Both need
to run on different ports. Please check your RTMP settings after the update.
The RTMPS server will run on port 1936 by default. The RTMP server will run
on port 1935 by default.

The UI requires now core version 16.9.0
This commit is contained in:
Ingo Oppermann 2022-07-07 14:01:31 +02:00
parent 0af3a66687
commit 92ec93dff0
No known key found for this signature in database
GPG Key ID: 2AB32426E9DD229E
8 changed files with 378 additions and 81 deletions

55
src/misc/controls/RTMP.js Normal file
View File

@ -0,0 +1,55 @@
import React from 'react';
import { Trans } from '@lingui/macro';
import Grid from '@mui/material/Grid';
import Typography from '@mui/material/Typography';
import Checkbox from '../Checkbox';
function init(settings) {
const initSettings = {
enable: false,
...settings,
};
return initSettings;
}
export default function Control(props) {
const settings = init(props.settings);
// Set the defaults
React.useEffect(() => {
props.onChange(settings, true);
// eslint-disable-next-line react-hooks/exhaustive-deps
}, []);
const handleChange = (what) => (event) => {
const value = event.target.value;
if (['enable'].includes(what)) {
settings[what] = !settings[what];
} else {
settings[what] = value;
}
props.onChange(settings, false);
};
return (
<Grid container spacing={2}>
<Grid item xs={12}>
{/* Todo: Check availability with props.enabled */}
<Checkbox label={<Trans>Enable</Trans>} checked={settings.enable} onChange={handleChange('enable')} />
<Typography variant="caption">
<Trans>Make the channel available as an RTMP stream.</Trans>
</Typography>
</Grid>
</Grid>
);
}
Control.defaulProps = {
settings: {},
enabled: false,
onChange: function (settings, automatic) {},
};

55
src/misc/controls/SRT.js Normal file
View File

@ -0,0 +1,55 @@
import React from 'react';
import { Trans } from '@lingui/macro';
import Grid from '@mui/material/Grid';
import Typography from '@mui/material/Typography';
import Checkbox from '../Checkbox';
function init(settings) {
const initSettings = {
enable: false,
...settings,
};
return initSettings;
}
export default function Control(props) {
const settings = init(props.settings);
// Set the defaults
React.useEffect(() => {
props.onChange(settings, true);
// eslint-disable-next-line react-hooks/exhaustive-deps
}, []);
const handleChange = (what) => (event) => {
const value = event.target.value;
if (['enable'].includes(what)) {
settings[what] = !settings[what];
} else {
settings[what] = value;
}
props.onChange(settings, false);
};
return (
<Grid container spacing={2}>
<Grid item xs={12}>
{/* Todo: Check availability with props.enabled */}
<Checkbox label={<Trans>Enable</Trans>} checked={settings.enable} onChange={handleChange('enable')} />
<Typography variant="caption">
<Trans>Make the channel available as an SRT stream.</Trans>
</Typography>
</Grid>
</Grid>
);
}
Control.defaulProps = {
settings: {},
enabled: false,
onChange: function (settings, automatic) {},
};

View File

@ -249,6 +249,12 @@ const defaultIngestMetadata = {
segmentDuration: 2,
listSize: 6,
},
rtmp: {
enable: false,
},
srt: {
enable: false,
},
process: {
autostart: true,
reconnect: true,

View File

@ -922,6 +922,71 @@ class Restreamer {
return [address];
}
// Get all RTMP/SRT/SNAPSHOT+MEMFS/HLS+MEMFS addresses
GetAddresses(what, channelId) {
const config = this.ConfigActive();
const host = new URL(this.Address()).hostname;
let address = '';
function getPort(servicePort) {
let port = servicePort.split(/:([0-9]+)$/)[1];
if (port && !port.includes(':')) {
port = `:${port}`;
}
if (port) {
return port;
} else {
return '';
}
}
// rtmp/s
if (what && what === 'rtmp') {
const port = getPort(config.source.network.rtmp.host);
if (config.source.network.rtmp.secure) {
address =
`rtmps://${host}${port}/` +
(config.source.network.rtmp.app.length !== 0 ? config.source.network.rtmp.app : '') +
channelId +
'.stream' +
(config.source.network.rtmp.token.length !== 0 ? `?token=${config.source.network.rtmp.token}` : '');
} else {
address =
`rtmp://${host}${port}/` +
(config.source.network.rtmp.app.length !== 0 ? config.source.network.rtmp.app : '') +
channelId +
'.stream' +
(config.source.network.rtmp.token.length !== 0 ? `?token=${config.source.network.rtmp.token}` : '');
}
// srt
} else if (what && what === 'srt') {
const port = getPort(config.source.network.srt.host);
address =
`srt://${host}${port}/?mode=caller&streamid=#!:m=request,r=${channelId}` +
(config.source.network.srt.token.length !== 0 ? `,token=${config.source.network.srt.token}` : '') +
'&transtype=live' +
(config.source.network.srt.passphrase.length !== 0 ? `&passphrase=${config.source.network.srt.passphrase}` : '');
// snapshot+memfs
} else if (what && what === 'snapshotMemFs') {
const port = getPort(config.source.network.hls.host);
address = (config.http.secure === true ? 'https://' : 'http://') + `${host}${port}/memfs/${channelId}.jpg`;
// hls+memfs
} else {
const port = getPort(config.source.network.hls.host);
address = (config.http.secure === true ? 'https://' : 'http://') + `${host}${port}/memfs/${channelId}.m3u8`;
}
return [address];
}
// Channels
async _discoverChannels() {
@ -1482,19 +1547,22 @@ class Restreamer {
});
}
// set hls storage endpoint
let hlsStore = 'memfs';
const output = {
id: 'output_0',
address: `{memfs}/${channel.channelid}.m3u8`,
address: `{` + hlsStore + `}/${channel.channelid}.m3u8`,
options: ['-dn', '-sn', ...outputs[0].options.map((o) => '' + o)],
cleanup: [
{
pattern: control.hls.version >= 7 ? `memfs:/${channel.channelid}_*.mp4` : `memfs:/${channel.channelid}_*.ts`,
pattern: control.hls.version >= 7 ? hlsStore + `:/${channel.channelid}_*.mp4` : hlsStore + `:/${channel.channelid}_*.ts`,
max_files: parseInt(control.hls.listSize) + 6,
max_file_age_seconds: control.hls.cleanup ? parseInt(control.hls.segmentDuration) * (parseInt(control.hls.listSize) + 6) : 0,
purge_on_delete: true,
},
{
pattern: `memfs:/${channel.channelid}.m3u8`,
pattern: hlsStore + `:/${channel.channelid}.m3u8`,
max_file_age_seconds: control.hls.cleanup ? parseInt(control.hls.segmentDuration) * (parseInt(control.hls.listSize) + 6) : 0,
purge_on_delete: true,
},
@ -1507,6 +1575,30 @@ class Restreamer {
output.options.push(...metadata_options);
// fetch core config
const core_config = this.ConfigActive();
// fetch rtmp settings
const rtmp_config = core_config.source.network.rtmp;
let rtmp_enabled = false;
if (control.rtmp && control.rtmp.enable && rtmp_config.enabled) {
rtmp_enabled = true;
}
// fetch srt settings
const srt_config = core_config.source.network.srt;
let srt_enabled = false;
if (control.srt && control.srt.enable && srt_config.enabled) {
srt_enabled = true;
}
// 'tee_muxer' is required for the delivery of one output to multiple endpoints without processing the input for each output
// http://ffmpeg.org/ffmpeg-all.html#tee-1
let tee_muxer = false;
if (rtmp_enabled || srt_enabled) {
tee_muxer = true;
}
// Manifest versions
// https://developer.apple.com/documentation/http_live_streaming/about_the_ext-x-version_tag
// https://ffmpeg.org/ffmpeg-all.html#Options-53
@ -1544,16 +1636,23 @@ class Restreamer {
['hls_list_size', '' + parseInt(control.hls.listSize)],
['hls_flags', 'append_list+delete_segments+program_date_time+independent_segments'],
['hls_delete_threshold', '4'],
['hls_segment_filename', `{memfs}/${channel.channelid}_%04d.ts`],
['segment_format_options', 'mpegts_flags=mpegts_copyts=1'],
['max_muxing_queue_size', '400'],
[
'hls_segment_filename',
tee_muxer ? `{` + hlsStore + `^:}/${channel.channelid}_%04d.ts` : `{` + hlsStore + `}/${channel.channelid}_%04d.ts`,
],
['method', 'PUT'],
];
case 7:
// fix Malformed AAC bitstream detected for hls version 7
if (control.hls.version === 7 && output.options.includes('-codec:a') && output.options.includes('copy')) {
if (output.options.includes('-codec:a') && output.options.includes('copy')) {
output.options.push('-bsf:a', 'aac_adtstoasc');
}
// mp4 manifest cleanup
output.cleanup.push({
pattern: hlsStore + `:/${channel.channelid}.mp4`,
max_file_age_seconds: control.hls.cleanup ? parseInt(control.hls.segmentDuration) * (parseInt(control.hls.listSize) + 6) : 0,
purge_on_delete: true,
});
return [
['f', 'hls'],
['start_number', '0'],
@ -1562,10 +1661,11 @@ class Restreamer {
['hls_flags', 'append_list+delete_segments+program_date_time+independent_segments'],
['hls_delete_threshold', '4'],
['hls_segment_type', 'fmp4'],
['hls_fmp4_init_filename', `${channel.channelid}_init.mp4`],
['hls_segment_filename', `{memfs}/${channel.channelid}_%04d.mp4`],
['segment_format_options', 'mpegts_flags=mpegts_copyts=1'],
['max_muxing_queue_size', '400'],
['hls_fmp4_init_filename', `${channel.channelid}.mp4`],
[
'hls_segment_filename',
tee_muxer ? `{` + hlsStore + `^:}/${channel.channelid}_%04d.mp4` : `{` + hlsStore + `}/${channel.channelid}_%04d.mp4`,
],
['method', 'PUT'],
];
// case 3
@ -1577,9 +1677,10 @@ class Restreamer {
['hls_list_size', '' + parseInt(control.hls.listSize)],
['hls_flags', 'append_list+delete_segments+program_date_time'],
['hls_delete_threshold', '4'],
['hls_segment_filename', `{memfs}/${channel.channelid}_%04d.ts`],
['segment_format_options', 'mpegts_flags=mpegts_copyts=1'],
['max_muxing_queue_size', '400'],
[
'hls_segment_filename',
tee_muxer ? `{` + hlsStore + `^:}/${channel.channelid}_%04d.ts` : `{` + hlsStore + `}/${channel.channelid}_%04d.ts`,
],
['method', 'PUT'],
];
}
@ -1587,28 +1688,31 @@ class Restreamer {
};
const hls_params_raw = getHLSParams(control.hls.lhls, control.hls.version);
// 'tee_muxer' is required for the delivery of one output to multiple endpoints without processing the input for each output
// http://ffmpeg.org/ffmpeg-all.html#tee-1
const tee_muxer = false;
// push -y
proc.options.push('-y');
// Returns the l/hls parameters with or without tee_muxer
if (tee_muxer) {
// f=hls:start_number=0...
const hls_params = hls_params_raw
.filter((o) => {
if (o[0] === 'segment_format_options' || o[0] === 'max_muxing_queue_size') {
return false;
}
return true;
// unsupported in tee_muxer
return !(o[0] === 'segment_format_options' || o[0] === 'max_muxing_queue_size');
})
.map((o) => o[0] + '=' + o[1])
.join(':');
output.options.push('-tag:v', '7', '-tag:a', '10', '-f', 'tee');
// WARN: It is a magic function. Returns 'Invalid process config' and the process.id is lost (Core v16.8.0) <= this is not the case anymore with the latest dev branch
output.options.push('-flags', '+global_header', '-tag:v', '7', '-tag:a', '10', '-f', 'tee');
// ['f=hls:start_number=0...]address.m3u8
output.address = `[` + hls_params + `]{memfs}/${channel.channelid}.m3u8`;
// use tee_muxer formatting
output.address =
`[` +
hls_params +
`]{` +
hlsStore +
`}/${channel.channelid}.m3u8` +
(rtmp_enabled ? `|[f=flv]{rtmp,name=${channel.channelid}.stream}` : '') +
(srt_enabled ? `|[f=mpegts]{srt,name=${channel.channelid},mode=publish}` : '');
} else {
// ['-f', 'hls', '-start_number', '0', ...]
// adding the '-' in front of the first option, then flatten everything

View File

@ -1,6 +1,6 @@
import { name, version, bundle } from '../package.json';
const Core = '^15.0.0 || ^16.0.0';
const Core = '^16.9.0';
const FFmpeg = '^4.1.0 || ^5.0.0';
const UI = bundle ? bundle : name + ' v' + version;

View File

@ -31,7 +31,9 @@ import PaperThumb from '../../misc/PaperThumb';
import ProcessControl from '../../misc/controls/Process';
import Profile from './Profile';
import ProfileSummary from './ProfileSummary';
import RTMPControl from '../../misc/controls/RTMP';
import SnapshotControl from '../../misc/controls/Snapshot';
import SRTControl from '../../misc/controls/SRT';
import TabPanel from '../../misc/TabPanel';
import TabsVerticalGrid from '../../misc/TabsVerticalGrid';
@ -477,6 +479,36 @@ export default function Edit(props) {
<Grid item xs={12}>
<Divider />
</Grid>
<Grid item xs={12}>
<Typography variant="h3">
<Trans>RTMP</Trans>
</Typography>
</Grid>
<Grid item xs={12}>
<RTMPControl
settings={$data.control.rtmp}
enabled={$config.source.network.rtmp.enabled}
onChange={handleControlChange('rtmp')}
/>
</Grid>
<Grid item xs={12}>
<Divider />
</Grid>
<Grid item xs={12}>
<Typography variant="h3">
<Trans>SRT</Trans>
</Typography>
</Grid>
<Grid item xs={12}>
<SRTControl
settings={$data.control.srt}
enabled={$config.source.network.srt.enabled}
onChange={handleControlChange('srt')}
/>
</Grid>
<Grid item xs={12}>
<Divider />
</Grid>
<Grid item xs={12}>
<Typography variant="h3">
<Trans>Snapshot</Trans>

View File

@ -10,6 +10,7 @@ import Stack from '@mui/material/Stack';
import Typography from '@mui/material/Typography';
import WarningIcon from '@mui/icons-material/Warning';
import * as M from '../../utils/metadata';
import useInterval from '../../hooks/useInterval';
import ActionButton from '../../misc/ActionButton';
import CopyButton from '../../misc/CopyButton';
@ -65,6 +66,7 @@ export default function Main(props) {
state: 'disconnected',
onConnect: null,
});
const [$metadata, setMetadata] = React.useState(M.getDefaultIngestMetadata());
const [$processDetails, setProcessDetails] = React.useState({
open: false,
data: {
@ -87,11 +89,24 @@ export default function Main(props) {
React.useEffect(() => {
(async () => {
await load();
await update();
})();
// eslint-disable-next-line react-hooks/exhaustive-deps
}, []);
const load = async () => {
let metadata = await props.restreamer.GetIngestMetadata(_channelid);
if (metadata.version && metadata.version === 1) {
setMetadata({
...$metadata,
...metadata,
});
}
await update();
};
const update = async () => {
const channelid = props.restreamer.SelectChannel(_channelid);
if (channelid === '' || channelid !== _channelid) {
@ -365,10 +380,40 @@ export default function Main(props) {
<Trans>Content URL</Trans>
</Typography>
<Stack direction="row" justifyContent="flex-end" alignItems="center" spacing={0.5}>
<CopyButton variant="outlined" color="default" size="small" value={address + manifest}>
<CopyButton
variant="outlined"
color="default"
size="small"
value={props.restreamer.GetAddresses('hlsMemFs', _channelid)}
>
<Trans>HLS</Trans>
</CopyButton>
<CopyButton variant="outlined" color="default" size="small" value={address + poster}>
{$metadata.control.rtmp.enable && (
<CopyButton
variant="outlined"
color="default"
size="small"
value={props.restreamer.GetAddresses('rtmp', _channelid)}
>
<Trans>RTMP</Trans>
</CopyButton>
)}
{$metadata.control.srt.enable && (
<CopyButton
variant="outlined"
color="default"
size="small"
value={props.restreamer.GetAddresses('srt', _channelid)}
>
<Trans>SRT</Trans>
</CopyButton>
)}
<CopyButton
variant="outlined"
color="default"
size="small"
value={props.restreamer.GetAddresses('snapshotMemFs', _channelid)}
>
<Trans>Snapshot</Trans>
</CopyButton>
</Stack>

View File

@ -767,7 +767,12 @@ export default function Settings(props) {
config.address = config.address.split(':').join('');
config.tls.address = config.tls.address.split(':').join('');
config.rtmp.address = config.rtmp.address.split(':').join('');
config.rtmp.address_tls = config.rtmp.address_tls.split(':').join('');
// fix: Cannot read properties of undefined
if (config.rtmp.address_tls) {
config.rtmp.address_tls = config.rtmp.address_tls.split(':').join('');
} else {
config.rtmp.address_tls = '1936';
}
config.srt.address = config.srt.address.split(':').join('');
if (config.tls.auto === true) {
@ -1836,53 +1841,6 @@ export default function Settings(props) {
/>{' '}
{env('rtmp.enable') && <Env style={{ marginRight: '2em' }} />}
<ErrorBox configvalue="rtmp.enable" messages={$tabs.rtmp.messages} />
</Grid>
<Grid item xs={12}>
<Divider />
</Grid>
<Grid item xs={6} md={4}>
<TextField
label={<Trans>Port</Trans>}
env={env('rtmp.address')}
disabled={env('rtmp.address') || (!config.rtmp.enable && !config.rtmp.enable_tls)}
value={config.rtmp.address}
onChange={handleChange('rtmp.address')}
/>
<ErrorBox configvalue="rtmp.address" messages={$tabs.rtmp.messages} />
<Typography variant="caption">
<Trans>RTMP server listen address.</Trans>
</Typography>
</Grid>
<Grid item xs={6} md={8}>
<TextField
label={<Trans>App</Trans>}
env={env('rtmp.app')}
disabled={env('rtmp.app') || (!config.rtmp.enable && !config.rtmp.enable_tls)}
value={config.rtmp.app}
onChange={handleChange('rtmp.app')}
/>
<ErrorBox configvalue="rtmp.app" messages={$tabs.rtmp.messages} />
<Typography variant="caption">
<Trans>RTMP app for publishing.</Trans>
</Typography>
</Grid>
<Grid item xs={12}>
<Password
label={<Trans>Token</Trans>}
env={env('rtmp.token')}
disabled={env('rtmp.token') || (!config.rtmp.enable && !config.rtmp.enable_tls)}
value={config.rtmp.token}
onChange={handleChange('rtmp.token')}
/>
<ErrorBox configvalue="rtmp.token" messages={$tabs.rtmp.messages} />
<Typography variant="caption">
<Trans>RTMP token for publishing and playing. The token is the value of the URL query parameter 'token.'</Trans>
</Typography>
</Grid>
<Grid item xs={12}>
<Divider />
</Grid>
<Grid item xs={12}>
<Checkbox
label={<Trans>RTMPS server</Trans>}
checked={config.rtmp.enable_tls}
@ -1896,9 +1854,9 @@ export default function Settings(props) {
<Trans>Requires activation</Trans>{' '}
<Link
color="secondary"
href="#/settings/auth"
href="#/settings/network"
onClick={() => {
setTab('auth');
setTab('network');
}}
>
TLS/HTTPS
@ -1907,11 +1865,27 @@ export default function Settings(props) {
</Typography>
)}
</Grid>
<Grid item xs={6} md={4}>
<Grid item xs={12}>
<Divider />
</Grid>
<Grid item xs={6} md={3}>
<TextField
label={<Trans>Port</Trans>}
label={<Trans>RTMP Port</Trans>}
env={env('rtmp.address')}
disabled={env('rtmp.address') || (!config.rtmp.enable && !config.rtmp.enable_tls)}
value={config.rtmp.address}
onChange={handleChange('rtmp.address')}
/>
<ErrorBox configvalue="rtmp.address" messages={$tabs.rtmp.messages} />
<Typography variant="caption">
<Trans>RTMP server listen address.</Trans>
</Typography>
</Grid>
<Grid item xs={6} md={3}>
<TextField
label={<Trans>RTMPS Port</Trans>}
env={env('rtmp.address_tls')}
disabled={env('rtmp.address_tls') || (!config.rtmp.enable && !config.rtmp.enable_tls)}
disabled={env('rtmp.address_tls') || (!config.rtmp.enable_tls) || (!config.tls.auto)}
value={config.rtmp.address_tls}
onChange={handleChange('rtmp.address_tls')}
/>
@ -1920,6 +1894,32 @@ export default function Settings(props) {
<Trans>RTMPS server listen address.</Trans>
</Typography>
</Grid>
<Grid item xs={12} md={6}>
<TextField
label={<Trans>App</Trans>}
env={env('rtmp.app')}
disabled={env('rtmp.app') || (!config.rtmp.enable)}
value={config.rtmp.app}
onChange={handleChange('rtmp.app')}
/>
<ErrorBox configvalue="rtmp.app" messages={$tabs.rtmp.messages} />
<Typography variant="caption">
<Trans>RTMP app for publishing.</Trans>
</Typography>
</Grid>
<Grid item xs={12}>
<Password
label={<Trans>Token</Trans>}
env={env('rtmp.token')}
disabled={env('rtmp.token') || (!config.rtmp.enable)}
value={config.rtmp.token}
onChange={handleChange('rtmp.token')}
/>
<ErrorBox configvalue="rtmp.token" messages={$tabs.rtmp.messages} />
<Typography variant="caption">
<Trans>RTMP token for publishing and playing. The token is the value of the URL query parameter 'token.'</Trans>
</Typography>
</Grid>
</Grid>
</TabPanel>
<TabPanel value={$tab} index="srt" className="panel">