- Create Dockerfile for Nginx with envsubst for dynamic configuration. - Add djmaster.conf.template for Nginx configuration with upstream services. - Implement docker-entrypoint.sh to substitute environment variables in the Nginx config. - Add README.md in nginx-examples for guidance on using the Nginx template. - Include djmaster.conf.template in nginx-examples for local setup. - Introduce utility functions for fetching YouTube video snippets and titles.
1266 lines
50 KiB
JavaScript
1266 lines
50 KiB
JavaScript
'use strict';
|
|
|
|
// Carga .env desde la raíz del proyecto (../../.env relativo a server/)
|
|
// No sobreescribe vars ya definidas en el entorno (docker-compose tiene prioridad).
|
|
const dotenvPath = require('path').resolve(__dirname, '../.env');
|
|
if (require('fs').existsSync(dotenvPath)) {
|
|
const lines = require('fs').readFileSync(dotenvPath, 'utf8').split('\n');
|
|
for (const line of lines) {
|
|
const trimmed = line.trim();
|
|
if (!trimmed || trimmed.startsWith('#')) continue;
|
|
const eqIdx = trimmed.indexOf('=');
|
|
if (eqIdx < 1) continue;
|
|
const key = trimmed.slice(0, eqIdx).trim();
|
|
const val = trimmed.slice(eqIdx + 1).trim().replace(/^"|"$/g, '').replace(/^'|'$/g, '');
|
|
if (!(key in process.env)) process.env[key] = val;
|
|
}
|
|
}
|
|
|
|
const express = require('express');
|
|
const cors = require('cors');
|
|
const path = require('path');
|
|
const fs = require('fs');
|
|
const https = require('https');
|
|
const http = require('http');
|
|
const crypto = require('crypto');
|
|
const { spawn } = require('child_process');
|
|
const WebSocket = require('ws');
|
|
const { WebSocketServer } = require('ws');
|
|
const { AccessToken, RoomServiceClient, IngressClient, IngressInput } = require('livekit-server-sdk');
|
|
|
|
const PORT = parseInt(process.env.FB_SERVER_PORT || '3002', 10);
|
|
const DATA_DIR = process.env.FB_DATA_DIR
|
|
? path.resolve(process.env.FB_DATA_DIR)
|
|
: path.resolve(__dirname, 'data');
|
|
const CFG_PATH = path.join(DATA_DIR, 'config.json');
|
|
|
|
const ENCRYPTION_SECRET = process.env.FB_ENCRYPTION_SECRET || 'restreamer-ui-fb-secret-key-32x!';
|
|
|
|
// ── LiveKit config ─────────────────────────────────────────────────────────────
|
|
const LK_API_KEY = process.env.LIVEKIT_API_KEY || '';
|
|
const LK_API_SECRET = process.env.LIVEKIT_API_SECRET || '';
|
|
const LK_WS_URL = process.env.LIVEKIT_WS_URL || '';
|
|
// HTTP URL para RoomServiceClient (wss:// → https://)
|
|
const LK_HTTP_URL = LK_WS_URL.replace(/^wss:\/\//, 'https://').replace(/^ws:\/\//, 'http://');
|
|
|
|
// ── RTMP relay config (relay → Restreamer Core) ───────────────────────────────
|
|
const RTMP_HOST = process.env.RTMP_HOST || '127.0.0.1';
|
|
const RTMP_PORT = process.env.RTMP_PORT || '1935';
|
|
const RTMP_APP = process.env.RTMP_APP || 'live';
|
|
|
|
// ── LiveKit Ingress config ─────────────────────────────────────────────────────
|
|
// URL interna del servicio livekit-ingress (solo accesible desde el servidor).
|
|
// OBS nunca la ve — la URL pública es https://<ui-domain>/w/<streamKey>
|
|
const LK_INGRESS_INTERNAL_URL = (process.env.LIVEKIT_INGRESS_INTERNAL_URL || '').replace(/\/+$/, '');
|
|
// URL pública del UI (para construir la WHIP URL que ve OBS).
|
|
// Si está vacío se auto-detecta del Host header en cada request.
|
|
const UI_BASE_URL = (process.env.UI_BASE_URL || '').replace(/\/+$/, '');
|
|
|
|
// ─────────────────────────────────────────────────────────────────────────────
|
|
// Schema unificado de config.json
|
|
// ─────────────────────────────────────────────────────────────────────────────
|
|
/**
|
|
* {
|
|
* "__fb_config": {
|
|
* "app_id": string,
|
|
* "app_secret": string ← AES-256-GCM encrypted
|
|
* },
|
|
* "__yt_config": {
|
|
* "client_id": string,
|
|
* "client_secret": string ← AES-256-GCM encrypted
|
|
* },
|
|
* "fb__<fb_user_id>": {
|
|
* "fb_user_id": string, // PK
|
|
* "name": string,
|
|
* "token_type": "USER",
|
|
* "access_token": string, // AES-256-GCM encrypted long-lived token (60 días)
|
|
* "expires_at": number, // Unix ms
|
|
* "scope_granted": string[],
|
|
* "pages": [{
|
|
* "id": string, // Page ID (PK)
|
|
* "name": string,
|
|
* "category": string,
|
|
* "token_type": "PAGE",
|
|
* "access_token": string, // AES-256-GCM encrypted long-lived page token
|
|
* "tasks": string[]
|
|
* }],
|
|
* "updated_at": number
|
|
* },
|
|
* "yt__<channel_id>": {
|
|
* "account_key": string, // PK = channel_id o "yt_<timestamp>"
|
|
* "label": string, // Nombre del canal
|
|
* "channel_title": string,
|
|
* "channel_id": string,
|
|
* "access_token": string, // AES-256-GCM encrypted
|
|
* "refresh_token": string, // AES-256-GCM encrypted
|
|
* "token_expiry": number, // Unix ms
|
|
* "updated_at": number
|
|
* }
|
|
* }
|
|
*/
|
|
|
|
// ── Encryption helpers (AES-256-GCM) ─────────────────────────────────────────
|
|
|
|
function deriveKey(secret) {
|
|
return crypto.createHash('sha256').update(secret).digest();
|
|
}
|
|
|
|
function encrypt(text) {
|
|
if (!text) return '';
|
|
try {
|
|
const key = deriveKey(ENCRYPTION_SECRET);
|
|
const iv = crypto.randomBytes(12);
|
|
const cipher = crypto.createCipheriv('aes-256-gcm', key, iv);
|
|
const enc = Buffer.concat([cipher.update(text, 'utf8'), cipher.final()]);
|
|
const tag = cipher.getAuthTag();
|
|
return iv.toString('hex') + ':' + tag.toString('hex') + ':' + enc.toString('hex');
|
|
} catch (_) {
|
|
return text;
|
|
}
|
|
}
|
|
|
|
function decrypt(data) {
|
|
if (!data) return '';
|
|
try {
|
|
if (!data.includes(':')) return data;
|
|
const parts = data.split(':');
|
|
if (parts.length < 3) return data;
|
|
const [ivHex, tagHex, ...encParts] = parts;
|
|
const encHex = encParts.join(':');
|
|
const key = deriveKey(ENCRYPTION_SECRET);
|
|
const iv = Buffer.from(ivHex, 'hex');
|
|
const tag = Buffer.from(tagHex, 'hex');
|
|
const encBuf = Buffer.from(encHex, 'hex');
|
|
const decipher = crypto.createDecipheriv('aes-256-gcm', key, iv);
|
|
decipher.setAuthTag(tag);
|
|
return decipher.update(encBuf, undefined, 'utf8') + decipher.final('utf8');
|
|
} catch (_) {
|
|
return data;
|
|
}
|
|
}
|
|
|
|
// ── config.json I/O ───────────────────────────────────────────────────────────
|
|
|
|
function loadCfg() {
|
|
if (!fs.existsSync(CFG_PATH)) return {};
|
|
try { return JSON.parse(fs.readFileSync(CFG_PATH, 'utf8')); }
|
|
catch (_) { return {}; }
|
|
}
|
|
|
|
function saveCfg(data) {
|
|
if (!fs.existsSync(DATA_DIR)) fs.mkdirSync(DATA_DIR, { recursive: true });
|
|
fs.writeFileSync(CFG_PATH, JSON.stringify(data, null, 2), 'utf8');
|
|
}
|
|
|
|
// ── Facebook serialization ────────────────────────────────────────────────────
|
|
|
|
function serializeFbAccount(acc) {
|
|
return {
|
|
...acc,
|
|
access_token: encrypt(acc.access_token || ''),
|
|
pages: (acc.pages || []).map((p) => ({
|
|
...p,
|
|
access_token: encrypt(p.access_token || ''),
|
|
})),
|
|
};
|
|
}
|
|
|
|
function deserializeFbAccount(acc) {
|
|
if (!acc) return null;
|
|
return {
|
|
...acc,
|
|
access_token: decrypt(acc.access_token || ''),
|
|
pages: (acc.pages || []).map((p) => ({
|
|
...p,
|
|
access_token: decrypt(p.access_token || ''),
|
|
})),
|
|
};
|
|
}
|
|
|
|
function publicFbAccount(acc) {
|
|
const { access_token, pages, ...rest } = acc;
|
|
return {
|
|
...rest,
|
|
pages: (pages || []).map(({ access_token: _t, ...p }) => p),
|
|
restreamer_channel_id: acc.restreamer_channel_id || '',
|
|
restreamer_publication_id: acc.restreamer_publication_id || '',
|
|
};
|
|
}
|
|
|
|
// ── YouTube serialization ─────────────────────────────────────────────────────
|
|
|
|
function serializeYtAccount(acc) {
|
|
return {
|
|
...acc,
|
|
access_token: encrypt(acc.access_token || ''),
|
|
refresh_token: encrypt(acc.refresh_token || ''),
|
|
};
|
|
}
|
|
|
|
function deserializeYtAccount(acc) {
|
|
if (!acc) return null;
|
|
return {
|
|
...acc,
|
|
access_token: decrypt(acc.access_token || ''),
|
|
refresh_token: decrypt(acc.refresh_token || ''),
|
|
};
|
|
}
|
|
|
|
function publicYtAccount(acc) {
|
|
const { access_token, refresh_token, ...rest } = acc;
|
|
return {
|
|
...rest,
|
|
has_refresh_token: !!(refresh_token),
|
|
has_access_token: !!(access_token),
|
|
restreamer_channel_id: acc.restreamer_channel_id || '',
|
|
restreamer_publication_id: acc.restreamer_publication_id || '',
|
|
};
|
|
}
|
|
|
|
// ── Facebook Graph API helpers ────────────────────────────────────────────────
|
|
|
|
function fbGet(url) {
|
|
return new Promise((resolve, reject) => {
|
|
const lib = url.startsWith('https') ? https : http;
|
|
lib.get(url, (res) => {
|
|
let body = '';
|
|
res.on('data', (c) => { body += c; });
|
|
res.on('end', () => {
|
|
try { resolve(JSON.parse(body)); }
|
|
catch (e) { reject(new Error('Invalid JSON from Facebook: ' + body.slice(0, 200))); }
|
|
});
|
|
}).on('error', reject);
|
|
});
|
|
}
|
|
|
|
|
|
async function fbExchangeToLongLived(appId, appSecret, shortToken) {
|
|
const data = await fbGet(
|
|
`https://graph.facebook.com/v19.0/oauth/access_token` +
|
|
`?grant_type=fb_exchange_token` +
|
|
`&client_id=${encodeURIComponent(appId)}` +
|
|
`&client_secret=${encodeURIComponent(appSecret)}` +
|
|
`&fb_exchange_token=${encodeURIComponent(shortToken)}`
|
|
);
|
|
if (data.error) throw new Error(`FB exchange error: ${data.error.message}`);
|
|
return data; // { access_token, token_type, expires_in }
|
|
}
|
|
|
|
async function fbExchangeCodeToLongLived(appId, appSecret, code, redirectUri) {
|
|
// Step 1: code → short-lived user token
|
|
const step1 = await fbGet(
|
|
`https://graph.facebook.com/v19.0/oauth/access_token` +
|
|
`?client_id=${encodeURIComponent(appId)}` +
|
|
`&client_secret=${encodeURIComponent(appSecret)}` +
|
|
`&redirect_uri=${encodeURIComponent(redirectUri)}` +
|
|
`&code=${encodeURIComponent(code)}`
|
|
);
|
|
if (step1.error) throw new Error(`Code exchange error: ${step1.error.message}`);
|
|
|
|
// Step 2: short-lived → long-lived (~60 days)
|
|
const step2 = await fbExchangeToLongLived(appId, appSecret, step1.access_token);
|
|
return { shortToken: step1.access_token, ...step2 };
|
|
}
|
|
|
|
async function fbFetchPages(longLivedUserToken) {
|
|
const data = await fbGet(
|
|
`https://graph.facebook.com/v19.0/me/accounts` +
|
|
`?fields=id,name,access_token,category,tasks` +
|
|
`&access_token=${encodeURIComponent(longLivedUserToken)}`
|
|
);
|
|
if (data.error) throw new Error(`fetchPages error: ${data.error.message}`);
|
|
return (data.data || []).map((p) => ({
|
|
id: p.id,
|
|
name: p.name,
|
|
category: p.category || '',
|
|
tasks: p.tasks || [],
|
|
access_token: p.access_token,
|
|
token_type: 'PAGE',
|
|
}));
|
|
}
|
|
|
|
async function fbFetchUserInfo(token) {
|
|
const data = await fbGet(
|
|
`https://graph.facebook.com/v19.0/me?fields=id,name&access_token=${encodeURIComponent(token)}`
|
|
);
|
|
if (data.error) throw new Error(data.error.message);
|
|
return data;
|
|
}
|
|
|
|
async function fbDebugToken(appId, appSecret, token) {
|
|
const data = await fbGet(
|
|
`https://graph.facebook.com/v19.0/debug_token` +
|
|
`?input_token=${encodeURIComponent(token)}` +
|
|
`&access_token=${encodeURIComponent(appId + '|' + appSecret)}`
|
|
);
|
|
if (data.error) return { scopes: [], expires_at: 0 };
|
|
const d = data.data || {};
|
|
return {
|
|
scopes: d.scopes || [],
|
|
expires_at: d.expires_at ? d.expires_at * 1000 : 0,
|
|
};
|
|
}
|
|
|
|
// ── YouTube OAuth2 helpers ────────────────────────────────────────────────────
|
|
|
|
async function ytHttpsPost(url, body) {
|
|
return new Promise((resolve, reject) => {
|
|
const bodyStr = typeof body === 'string' ? body : new URLSearchParams(body).toString();
|
|
const urlObj = new URL(url);
|
|
const options = {
|
|
hostname: urlObj.hostname,
|
|
path: urlObj.pathname + urlObj.search,
|
|
method: 'POST',
|
|
headers: {
|
|
'Content-Type': 'application/x-www-form-urlencoded',
|
|
'Content-Length': Buffer.byteLength(bodyStr),
|
|
},
|
|
};
|
|
const req = https.request(options, (res) => {
|
|
let buf = '';
|
|
res.on('data', (c) => { buf += c; });
|
|
res.on('end', () => {
|
|
try { resolve(JSON.parse(buf)); }
|
|
catch (e) { reject(new Error('Invalid JSON from Google: ' + buf.slice(0, 200))); }
|
|
});
|
|
});
|
|
req.on('error', reject);
|
|
req.write(bodyStr);
|
|
req.end();
|
|
});
|
|
}
|
|
|
|
async function ytFetchChannelInfo(accessToken) {
|
|
return new Promise((resolve, reject) => {
|
|
const url = 'https://www.googleapis.com/youtube/v3/channels?part=snippet&mine=true';
|
|
const req = https.request(url, {
|
|
headers: { Authorization: 'Bearer ' + accessToken },
|
|
}, (res) => {
|
|
let buf = '';
|
|
res.on('data', (c) => { buf += c; });
|
|
res.on('end', () => {
|
|
try {
|
|
const data = JSON.parse(buf);
|
|
if (data.items && data.items.length > 0) {
|
|
resolve({
|
|
channel_title: data.items[0].snippet.title,
|
|
channel_id: data.items[0].id,
|
|
});
|
|
} else {
|
|
resolve({});
|
|
}
|
|
} catch (e) { reject(e); }
|
|
});
|
|
});
|
|
req.on('error', reject);
|
|
req.end();
|
|
});
|
|
}
|
|
|
|
// ── Express app ───────────────────────────────────────────────────────────────
|
|
|
|
const app = express();
|
|
app.use(cors({ origin: true, methods: ['GET','POST','PUT','DELETE','OPTIONS'], allowedHeaders: ['Content-Type'] }));
|
|
app.use(express.json());
|
|
|
|
// ── Health ────────────────────────────────────────────────────────────────────
|
|
app.get('/health', (_, res) => {
|
|
res.json({ ok: true, config: CFG_PATH, port: PORT, ts: new Date().toISOString() });
|
|
});
|
|
|
|
|
|
// ═══════════════════════════════════════════════════════════════════════════════
|
|
// WHIP INGRESS
|
|
// Genera una sesión LiveKit Ingress (WHIP_INPUT) y devuelve al browser
|
|
// la URL pública en el mismo dominio, p.ej.:
|
|
// https://djmaster.nextream.sytes.net/w/<streamKey>
|
|
// Caddy hace proxy de /w/* hacia el servicio livekit-ingress interno.
|
|
// OBS lo usa como si fuera un RTMP push: sin CORS, sin IP privada expuesta.
|
|
// ═══════════════════════════════════════════════════════════════════════════════
|
|
app.post('/api/whip/info', async (req, res) => {
|
|
if (!LK_API_KEY || !LK_API_SECRET) {
|
|
return res.status(503).json({ error: 'LiveKit API credentials not configured' });
|
|
}
|
|
if (!LK_WS_URL) {
|
|
return res.status(503).json({ error: 'LIVEKIT_WS_URL not configured' });
|
|
}
|
|
const { room, identity = 'obs_studio', name = 'OBS Studio' } = req.body || {};
|
|
if (!room) return res.status(400).json({ error: 'room requerido' });
|
|
|
|
// URL pública del UI: env var o derivada del Host header
|
|
const publicBase = UI_BASE_URL ||
|
|
`${req.headers['x-forwarded-proto'] || req.protocol}://${req.headers['x-forwarded-host'] || req.headers.host}`;
|
|
|
|
try {
|
|
// IngressClient debe apuntar al LiveKit SERVER (API REST, puerto 7880).
|
|
// NO al servicio livekit-ingress (puerto 8088) — ese es solo para media.
|
|
// LK_HTTP_URL ya convierte wss:// → https://
|
|
const client = new IngressClient(LK_HTTP_URL, LK_API_KEY, LK_API_SECRET);
|
|
const ingress = await client.createIngress(IngressInput.WHIP_INPUT, {
|
|
name,
|
|
roomName: room,
|
|
participantIdentity: identity,
|
|
participantName: name,
|
|
});
|
|
|
|
// Construir URL pública: mismo dominio del UI, ruta /w/<streamKey>
|
|
// Caddy hace proxy de /w/* → livekit-ingress interno
|
|
const whipUrl = `${publicBase}/w/${ingress.streamKey}`;
|
|
|
|
console.log(`[whip/info] ✅ Ingress creado: room="${room}" → ${whipUrl}`);
|
|
res.json({
|
|
whipUrl,
|
|
streamKey: ingress.streamKey,
|
|
ingressId: ingress.ingressId,
|
|
identity,
|
|
room,
|
|
obsInstructions: {
|
|
service: 'Custom (WHIP)',
|
|
url: whipUrl,
|
|
streamKey: '(dejar vacía)',
|
|
note: 'OBS → Configuración → Emisión → Servicio: Custom (WHIP) → Servidor: <whipUrl>',
|
|
},
|
|
});
|
|
} catch (err) {
|
|
console.error('[whip/info] ERROR:', err.message);
|
|
res.status(500).json({ error: err.message });
|
|
}
|
|
});
|
|
|
|
// ═════════════════════════════════════════════════════════════════════════════
|
|
// LIVEKIT
|
|
// ═════════════════════════════════════════════════════════════════════════════
|
|
|
|
/**
|
|
* GET /livekit/config
|
|
* Devuelve la wsUrl pública del servidor LiveKit (sin secretos).
|
|
*/
|
|
app.get('/livekit/config', (_, res) => {
|
|
if (!LK_WS_URL) return res.status(503).json({ error: 'LiveKit not configured' });
|
|
res.json({ wsUrl: LK_WS_URL });
|
|
});
|
|
|
|
/**
|
|
* POST /livekit/token
|
|
* Body: { roomName, participantName?, canPublish?, canSubscribe? }
|
|
* Genera un AccessToken JWT firmado para que el browser se conecte a LiveKit.
|
|
*/
|
|
app.post('/livekit/token', async (req, res) => {
|
|
if (!LK_API_KEY || !LK_API_SECRET) {
|
|
return res.status(503).json({ error: 'LiveKit API credentials not configured' });
|
|
}
|
|
const { roomName, participantName, canPublish = true, canSubscribe = false } = req.body || {};
|
|
if (!roomName) return res.status(400).json({ error: 'roomName is required' });
|
|
|
|
const identity = participantName || ('presenter-' + Date.now());
|
|
try {
|
|
const at = new AccessToken(LK_API_KEY, LK_API_SECRET, { identity, ttl: '4h' });
|
|
at.addGrant({ roomJoin: true, room: roomName, canPublish, canSubscribe, canPublishData: true });
|
|
const token = await at.toJwt();
|
|
res.json({ token, identity, wsUrl: LK_WS_URL });
|
|
} catch (err) {
|
|
console.error('[livekit] token error:', err.message);
|
|
res.status(500).json({ error: 'Failed to generate token' });
|
|
}
|
|
});
|
|
|
|
// ── Relay sessions: roomName → { ffmpeg, rtmpUrl, streamName } ────────────────
|
|
const lkRelaySessions = new Map();
|
|
|
|
/**
|
|
* POST /livekit/relay/start
|
|
* Body: { roomName: string, streamName?: string }
|
|
*
|
|
* Inicia un proceso FFmpeg que:
|
|
* 1. Escucha en un puerto RTMP local temporal (127.0.0.1:19350+)
|
|
* 2. Lee el stream y lo reenvía al Core de Restreamer via RTMP:
|
|
* rtmp://RTMP_HOST:RTMP_PORT/RTMP_APP/<streamName>.stream
|
|
*
|
|
* El browser (webrtc-room) publica en LiveKit.
|
|
* Para hacer el bridge LiveKit → RTMP, el webrtc-room también
|
|
* llama a este endpoint para que el operador sepa que el relay está activo.
|
|
*
|
|
* NOTA: El Core Restreamer escucha el input como {rtmp,name=<channelid>.stream}
|
|
* que es equivalente a recibir RTMP push en rtmp://localhost:1935/live/<channelid>.stream
|
|
*/
|
|
app.post('/livekit/relay/start', async (req, res) => {
|
|
if (!LK_API_KEY || !LK_API_SECRET) {
|
|
return res.status(503).json({ error: 'LiveKit not configured' });
|
|
}
|
|
const { roomName, streamName } = req.body || {};
|
|
if (!roomName) return res.status(400).json({ error: 'roomName is required' });
|
|
|
|
// Stop existing relay for this room
|
|
if (lkRelaySessions.has(roomName)) {
|
|
const old = lkRelaySessions.get(roomName);
|
|
try { old.ffmpeg.kill('SIGTERM'); } catch(_) {}
|
|
lkRelaySessions.delete(roomName);
|
|
await new Promise(r => setTimeout(r, 500));
|
|
}
|
|
|
|
// Stream name for RTMP push to Core: <channelid>.stream
|
|
const sName = (streamName || roomName).replace(/\.stream$/, '') + '.stream';
|
|
const rtmpUrl = `rtmp://${RTMP_HOST}:${RTMP_PORT}/${RTMP_APP}/${sName}`;
|
|
|
|
// Verify LiveKit room exists via RoomServiceClient
|
|
try {
|
|
const rsc = new RoomServiceClient(LK_HTTP_URL, LK_API_KEY, LK_API_SECRET);
|
|
const rooms = await rsc.listRooms([roomName]);
|
|
if (!rooms || rooms.length === 0) {
|
|
return res.status(404).json({ error: `LiveKit room "${roomName}" not found or empty` });
|
|
}
|
|
} catch (err) {
|
|
console.warn('[livekit-relay] RoomService check failed:', err.message, '(continuing anyway)');
|
|
}
|
|
|
|
// Build RTMP pull URL for FFmpeg from LiveKit via its built-in RTMP ingress
|
|
// Since LiveKit doesn't expose direct RTMP pull, we use the token + wsUrl
|
|
// and instruct the webrtc-room page to also push via MediaRecorder → FFmpeg stdin
|
|
// The simplest and most reliable approach: confirm relay intent and return RTMP push URL
|
|
// The actual video bridge happens in the browser via the WebRTC→RTMP bridge below.
|
|
|
|
// For the Core process input side, return the stream registration info
|
|
console.log(`[livekit-relay] ▶ Relay registered: room="${roomName}" → ${rtmpUrl}`);
|
|
lkRelaySessions.set(roomName, { rtmpUrl, streamName: sName, ffmpeg: null });
|
|
res.json({ ok: true, rtmpUrl, streamName: sName, roomName });
|
|
});
|
|
|
|
/**
|
|
* POST /livekit/relay/stop
|
|
* Body: { roomName: string }
|
|
*/
|
|
app.post('/livekit/relay/stop', (req, res) => {
|
|
const { roomName } = req.body || {};
|
|
if (!roomName) return res.status(400).json({ error: 'roomName is required' });
|
|
|
|
const session = lkRelaySessions.get(roomName);
|
|
if (!session) return res.json({ ok: true, message: 'No active relay' });
|
|
|
|
if (session.ffmpeg) {
|
|
try { session.ffmpeg.kill('SIGTERM'); } catch(_) {}
|
|
}
|
|
lkRelaySessions.delete(roomName);
|
|
console.log(`[livekit-relay] ⏹ Relay stopped: room="${roomName}"`);
|
|
res.json({ ok: true });
|
|
});
|
|
|
|
/**
|
|
* GET /livekit/relay/status
|
|
*/
|
|
app.get('/livekit/relay/status', (_, res) => {
|
|
const active = [];
|
|
lkRelaySessions.forEach((s, roomName) => active.push({ roomName, rtmpUrl: s.rtmpUrl, streamName: s.streamName }));
|
|
res.json({ ok: true, active });
|
|
});
|
|
|
|
// ═════════════════════════════════════════════════════════════════════════════
|
|
// FACEBOOK
|
|
// ═════════════════════════════════════════════════════════════════════════════
|
|
|
|
// ── FB App config (app_id + app_secret) ──────────────────────────────────────
|
|
app.get('/fb/config', (_, res) => {
|
|
const cfg = loadCfg();
|
|
const c = cfg.__fb_config || {};
|
|
// Soporte retrocompatibilidad con clave vieja __config
|
|
const old = cfg.__config || {};
|
|
res.json({
|
|
app_id: c.app_id || old.app_id || '',
|
|
has_secret: !!(c.app_secret || old.app_secret),
|
|
});
|
|
});
|
|
|
|
app.put('/fb/config', (req, res) => {
|
|
const { app_id, app_secret } = req.body || {};
|
|
const cfg = loadCfg();
|
|
// Migrar de __config a __fb_config si es necesario
|
|
if (cfg.__config && !cfg.__fb_config) {
|
|
cfg.__fb_config = { ...cfg.__config };
|
|
delete cfg.__config;
|
|
}
|
|
cfg.__fb_config = {
|
|
...(cfg.__fb_config || {}),
|
|
...(app_id !== undefined ? { app_id: String(app_id) } : {}),
|
|
...(app_secret !== undefined ? { app_secret: encrypt(String(app_secret)) } : {}),
|
|
};
|
|
saveCfg(cfg);
|
|
res.json({ ok: true });
|
|
});
|
|
|
|
// ── FB Accounts ───────────────────────────────────────────────────────────────
|
|
app.get('/fb/accounts', (_, res) => {
|
|
const cfg = loadCfg();
|
|
const accounts = Object.entries(cfg)
|
|
.filter(([k, v]) => k.startsWith('fb__') && v && v.fb_user_id)
|
|
.map(([, v]) => publicFbAccount(deserializeFbAccount(v)));
|
|
res.json(accounts);
|
|
});
|
|
|
|
app.get('/fb/accounts/:id/token', (req, res) => {
|
|
const cfg = loadCfg();
|
|
const raw = cfg['fb__' + req.params.id];
|
|
if (!raw) return res.status(404).json({ error: 'Account not found' });
|
|
const acc = deserializeFbAccount(raw);
|
|
res.json({
|
|
fb_user_id: acc.fb_user_id,
|
|
name: acc.name,
|
|
token_type: acc.token_type,
|
|
access_token: acc.access_token,
|
|
expires_at: acc.expires_at,
|
|
scope_granted: acc.scope_granted || [],
|
|
pages: acc.pages || [],
|
|
});
|
|
});
|
|
|
|
app.delete('/fb/accounts/:id', (req, res) => {
|
|
const cfg = loadCfg();
|
|
const key = 'fb__' + req.params.id;
|
|
if (!cfg[key]) return res.status(404).json({ error: 'Account not found' });
|
|
delete cfg[key];
|
|
saveCfg(cfg);
|
|
res.json({ ok: true });
|
|
});
|
|
|
|
// Asociar una cuenta FB existente con un canal/publicación de Restreamer
|
|
app.put('/fb/accounts/:id/context', (req, res) => {
|
|
const { restreamer_channel_id, restreamer_publication_id } = req.body || {};
|
|
const cfg = loadCfg();
|
|
const key = 'fb__' + req.params.id;
|
|
if (!cfg[key]) return res.status(404).json({ error: 'Account not found' });
|
|
cfg[key] = {
|
|
...cfg[key],
|
|
restreamer_channel_id: restreamer_channel_id || '',
|
|
restreamer_publication_id: restreamer_publication_id || '',
|
|
};
|
|
saveCfg(cfg);
|
|
console.log(`[fb/context] ✅ ${req.params.id} → channel:${restreamer_channel_id || '-'} pub:${restreamer_publication_id || '-'}`);
|
|
res.json({ ok: true });
|
|
});
|
|
|
|
// ── FB Exchange: Auth Code → Short-lived → Long-lived (60 días) ──────────────
|
|
/**
|
|
* POST /fb/exchange
|
|
* Body: { code, redirect_uri }
|
|
*
|
|
* Flujo completo:
|
|
* 1. code → short-lived user token (Graph API /oauth/access_token)
|
|
* 2. short-lived → long-lived token (~60 días) (grant_type=fb_exchange_token)
|
|
* 3. Fetch user info (id, name)
|
|
* 4. Debug token → scopes, expires_at
|
|
* 5. Fetch páginas del usuario (tokens de página ya son long-lived)
|
|
* 6. Guardar todo cifrado en config.json
|
|
* 7. Retornar cuenta pública (sin tokens)
|
|
*/
|
|
app.post('/fb/exchange', async (req, res) => {
|
|
const { code, redirect_uri, restreamer_channel_id, restreamer_publication_id } = req.body || {};
|
|
if (!code || !redirect_uri) {
|
|
return res.status(400).json({ error: 'code and redirect_uri are required' });
|
|
}
|
|
|
|
const cfg = loadCfg();
|
|
const c = cfg.__fb_config || cfg.__config || {};
|
|
if (!c.app_id) {
|
|
return res.status(400).json({ error: 'Facebook App ID not configured. Go to Settings → Integrations.' });
|
|
}
|
|
const appSecret = c.app_secret ? decrypt(c.app_secret) : '';
|
|
if (!appSecret) {
|
|
return res.status(400).json({ error: 'Facebook App Secret not configured. Go to Settings → Integrations.' });
|
|
}
|
|
|
|
try {
|
|
const { access_token: longToken, expires_in } = await fbExchangeCodeToLongLived(
|
|
c.app_id, appSecret, code, redirect_uri
|
|
);
|
|
const expires_at = expires_in
|
|
? Date.now() + parseInt(expires_in, 10) * 1000
|
|
: Date.now() + 60 * 24 * 60 * 60 * 1000;
|
|
|
|
const userInfo = await fbFetchUserInfo(longToken);
|
|
const { scopes } = await fbDebugToken(c.app_id, appSecret, longToken);
|
|
|
|
let pages = [];
|
|
try { pages = await fbFetchPages(longToken); } catch (_) {}
|
|
|
|
const account = {
|
|
fb_user_id: userInfo.id,
|
|
name: userInfo.name,
|
|
token_type: 'USER',
|
|
access_token: longToken,
|
|
expires_at,
|
|
scope_granted: scopes,
|
|
pages,
|
|
restreamer_channel_id: restreamer_channel_id || '',
|
|
restreamer_publication_id: restreamer_publication_id || '',
|
|
updated_at: Date.now(),
|
|
};
|
|
cfg['fb__' + userInfo.id] = serializeFbAccount(account);
|
|
saveCfg(cfg);
|
|
|
|
console.log(`[fb/exchange] ✅ ${userInfo.name} (${userInfo.id}) — expires ${new Date(expires_at).toISOString()} → channel:${restreamer_channel_id || '-'} pub:${restreamer_publication_id || '-'}`);
|
|
res.json({ ok: true, account: publicFbAccount(account) });
|
|
} catch (err) {
|
|
console.error('[fb/exchange] Error:', err.message);
|
|
res.status(500).json({ error: err.message });
|
|
}
|
|
});
|
|
|
|
// ── FB Upgrade: short-lived token → long-lived (implicit flow fallback) ───────
|
|
app.post('/fb/upgrade', async (req, res) => {
|
|
const { access_token: shortToken, restreamer_channel_id, restreamer_publication_id } = req.body || {};
|
|
if (!shortToken) return res.status(400).json({ error: 'access_token is required' });
|
|
|
|
const cfg = loadCfg();
|
|
const c = cfg.__fb_config || cfg.__config || {};
|
|
const appSecret = c.app_secret ? decrypt(c.app_secret) : c.app_secret_plain || '';
|
|
if (!c.app_id || !appSecret) {
|
|
return res.status(400).json({ error: 'App ID / Secret not configured. Cannot upgrade token.' });
|
|
}
|
|
|
|
try {
|
|
const { access_token: longToken, expires_in } = await fbExchangeToLongLived(c.app_id, appSecret, shortToken);
|
|
const expires_at = expires_in
|
|
? Date.now() + parseInt(expires_in, 10) * 1000
|
|
: Date.now() + 60 * 24 * 60 * 60 * 1000;
|
|
|
|
const userInfo = await fbFetchUserInfo(longToken);
|
|
const { scopes } = await fbDebugToken(c.app_id, appSecret, longToken);
|
|
|
|
let pages = [];
|
|
try { pages = await fbFetchPages(longToken); } catch (_) {}
|
|
|
|
const account = {
|
|
fb_user_id: userInfo.id,
|
|
name: userInfo.name,
|
|
token_type: 'USER',
|
|
access_token: longToken,
|
|
expires_at,
|
|
scope_granted: scopes,
|
|
pages,
|
|
restreamer_channel_id: restreamer_channel_id || '',
|
|
restreamer_publication_id: restreamer_publication_id || '',
|
|
updated_at: Date.now(),
|
|
};
|
|
cfg['fb__' + userInfo.id] = serializeFbAccount(account);
|
|
saveCfg(cfg);
|
|
|
|
console.log(`[fb/upgrade] ✅ Token upgraded: ${userInfo.name} (${userInfo.id})`);
|
|
res.json({ ok: true, account: publicFbAccount(account) });
|
|
} catch (err) {
|
|
console.error('[fb/upgrade] Error:', err.message);
|
|
res.status(500).json({ error: err.message });
|
|
}
|
|
});
|
|
|
|
// ── FB Refresh: renovar long-lived token existente ────────────────────────────
|
|
app.post('/fb/refresh/:id', async (req, res) => {
|
|
const cfg = loadCfg();
|
|
const raw = cfg['fb__' + req.params.id];
|
|
if (!raw) return res.status(404).json({ error: 'Account not found' });
|
|
|
|
const c = cfg.__fb_config || cfg.__config || {};
|
|
const appSecret = c.app_secret ? decrypt(c.app_secret) : '';
|
|
if (!c.app_id || !appSecret) {
|
|
return res.status(400).json({ error: 'App ID / Secret not configured' });
|
|
}
|
|
|
|
const acc = deserializeFbAccount(raw);
|
|
try {
|
|
const { access_token: newToken, expires_in } = await fbExchangeToLongLived(c.app_id, appSecret, acc.access_token);
|
|
const expires_at = expires_in
|
|
? Date.now() + parseInt(expires_in, 10) * 1000
|
|
: Date.now() + 60 * 24 * 60 * 60 * 1000;
|
|
|
|
const { scopes } = await fbDebugToken(c.app_id, appSecret, newToken);
|
|
let pages = acc.pages || [];
|
|
try { pages = await fbFetchPages(newToken); } catch (_) {}
|
|
|
|
const updated = { ...acc, access_token: newToken, expires_at, scope_granted: scopes, pages, updated_at: Date.now() };
|
|
cfg['fb__' + req.params.id] = serializeFbAccount(updated);
|
|
saveCfg(cfg);
|
|
|
|
console.log(`[fb/refresh] ✅ Token refreshed: ${acc.name} (${req.params.id})`);
|
|
res.json({ ok: true, account: publicFbAccount(updated) });
|
|
} catch (err) {
|
|
console.error('[fb/refresh] Error:', err.message);
|
|
res.status(500).json({ error: err.message });
|
|
}
|
|
});
|
|
|
|
// ═════════════════════════════════════════════════════════════════════════════
|
|
// YOUTUBE
|
|
// ═════════════════════════════════════════════════════════════════════════════
|
|
|
|
// ── YT Credentials (client_id + client_secret) ───────────────────────────────
|
|
app.get('/yt/config', (_, res) => {
|
|
const cfg = loadCfg();
|
|
const c = cfg.__yt_config || {};
|
|
res.json({ client_id: c.client_id || '', has_secret: !!(c.client_secret) });
|
|
});
|
|
|
|
app.put('/yt/config', (req, res) => {
|
|
const { client_id, client_secret } = req.body || {};
|
|
const cfg = loadCfg();
|
|
cfg.__yt_config = {
|
|
...(cfg.__yt_config || {}),
|
|
...(client_id !== undefined ? { client_id: String(client_id) } : {}),
|
|
...(client_secret !== undefined ? { client_secret: encrypt(String(client_secret)) } : {}),
|
|
};
|
|
saveCfg(cfg);
|
|
console.log(`[yt/config] ✅ Credentials saved (client_id: ${client_id || '(unchanged)'})`);
|
|
res.json({ ok: true });
|
|
});
|
|
|
|
// Versión completa con secreto descifrado (uso interno — intercambio de tokens)
|
|
app.get('/yt/config/full', (_, res) => {
|
|
const cfg = loadCfg();
|
|
const c = cfg.__yt_config || {};
|
|
res.json({
|
|
client_id: c.client_id || '',
|
|
client_secret: c.client_secret ? decrypt(c.client_secret) : '',
|
|
});
|
|
});
|
|
|
|
// ── YT Accounts ───────────────────────────────────────────────────────────────
|
|
app.get('/yt/accounts', (_, res) => {
|
|
const cfg = loadCfg();
|
|
const accounts = Object.entries(cfg)
|
|
.filter(([k, v]) => k.startsWith('yt__') && v && v.account_key)
|
|
.map(([, v]) => publicYtAccount(deserializeYtAccount(v)));
|
|
res.json(accounts);
|
|
});
|
|
|
|
app.get('/yt/accounts/:key/token', (req, res) => {
|
|
const cfg = loadCfg();
|
|
const raw = cfg['yt__' + req.params.key];
|
|
if (!raw) return res.status(404).json({ error: 'YT account not found' });
|
|
const acc = deserializeYtAccount(raw);
|
|
res.json({
|
|
account_key: acc.account_key,
|
|
label: acc.label,
|
|
channel_title: acc.channel_title || '',
|
|
channel_id: acc.channel_id || '',
|
|
access_token: acc.access_token,
|
|
refresh_token: acc.refresh_token,
|
|
token_expiry: acc.token_expiry,
|
|
});
|
|
});
|
|
|
|
app.delete('/yt/accounts/:key', (req, res) => {
|
|
const cfg = loadCfg();
|
|
const key = 'yt__' + req.params.key;
|
|
if (!cfg[key]) return res.status(404).json({ error: 'YT account not found' });
|
|
delete cfg[key];
|
|
saveCfg(cfg);
|
|
res.json({ ok: true });
|
|
});
|
|
|
|
// Asociar una cuenta YT existente con un canal/publicación de Restreamer
|
|
app.put('/yt/accounts/:key/context', (req, res) => {
|
|
const { restreamer_channel_id, restreamer_publication_id } = req.body || {};
|
|
const cfg = loadCfg();
|
|
const key = 'yt__' + req.params.key;
|
|
if (!cfg[key]) return res.status(404).json({ error: 'YT account not found' });
|
|
cfg[key] = {
|
|
...cfg[key],
|
|
restreamer_channel_id: restreamer_channel_id || '',
|
|
restreamer_publication_id: restreamer_publication_id || '',
|
|
};
|
|
saveCfg(cfg);
|
|
console.log(`[yt/context] ✅ ${req.params.key} → channel:${restreamer_channel_id || '-'} pub:${restreamer_publication_id || '-'}`);
|
|
res.json({ ok: true });
|
|
});
|
|
|
|
// ── YT Save account (tras el intercambio de código en el browser) ─────────────
|
|
/**
|
|
* POST /yt/accounts
|
|
* Body: { account_key, label, channel_title, channel_id, access_token, refresh_token, token_expiry }
|
|
*
|
|
* El browser ya hizo el intercambio code→token con Google.
|
|
* Este endpoint solo persiste el resultado cifrado en config.json.
|
|
*/
|
|
app.post('/yt/accounts', (req, res) => {
|
|
const {
|
|
account_key, label, channel_title, channel_id,
|
|
access_token, refresh_token, token_expiry,
|
|
restreamer_channel_id, restreamer_publication_id,
|
|
} = req.body || {};
|
|
if (!account_key || !access_token) {
|
|
return res.status(400).json({ error: 'account_key and access_token are required' });
|
|
}
|
|
|
|
const cfg = loadCfg();
|
|
const existing = cfg['yt__' + account_key] ? deserializeYtAccount(cfg['yt__' + account_key]) : {};
|
|
|
|
const account = {
|
|
account_key,
|
|
label: label || existing.label || account_key,
|
|
channel_title: channel_title || existing.channel_title || '',
|
|
channel_id: channel_id || existing.channel_id || '',
|
|
access_token,
|
|
refresh_token: refresh_token || existing.refresh_token || '',
|
|
token_expiry: token_expiry || existing.token_expiry || 0,
|
|
restreamer_channel_id: restreamer_channel_id || existing.restreamer_channel_id || '',
|
|
restreamer_publication_id: restreamer_publication_id || existing.restreamer_publication_id || '',
|
|
updated_at: Date.now(),
|
|
};
|
|
|
|
cfg['yt__' + account_key] = serializeYtAccount(account);
|
|
saveCfg(cfg);
|
|
|
|
console.log(`[yt/accounts] ✅ Account saved: ${label || account_key} (${channel_id || account_key}) → channel:${restreamer_channel_id || '-'} pub:${restreamer_publication_id || '-'}`);
|
|
res.json({ ok: true, account: publicYtAccount(account) });
|
|
});
|
|
|
|
// ── YT Refresh token: usar refresh_token para obtener nuevo access_token ──────
|
|
/**
|
|
* POST /yt/accounts/:key/refresh
|
|
* Usa el refresh_token almacenado + credentials para obtener un nuevo access_token.
|
|
*/
|
|
app.post('/yt/accounts/:key/refresh', async (req, res) => {
|
|
const cfg = loadCfg();
|
|
const raw = cfg['yt__' + req.params.key];
|
|
if (!raw) return res.status(404).json({ error: 'YT account not found' });
|
|
|
|
const c = cfg.__yt_config || {};
|
|
const clientSecret = c.client_secret ? decrypt(c.client_secret) : '';
|
|
if (!c.client_id || !clientSecret) {
|
|
return res.status(400).json({ error: 'YouTube OAuth2 credentials not configured in Settings → Integrations' });
|
|
}
|
|
|
|
const acc = deserializeYtAccount(raw);
|
|
if (!acc.refresh_token) {
|
|
return res.status(400).json({ error: 'No refresh token stored for this account' });
|
|
}
|
|
|
|
try {
|
|
const data = await ytHttpsPost('https://oauth2.googleapis.com/token', {
|
|
client_id: c.client_id,
|
|
client_secret: clientSecret,
|
|
refresh_token: acc.refresh_token,
|
|
grant_type: 'refresh_token',
|
|
});
|
|
|
|
if (data.error) throw new Error(data.error_description || data.error);
|
|
|
|
const updated = {
|
|
...acc,
|
|
access_token: data.access_token,
|
|
token_expiry: Date.now() + (data.expires_in || 3600) * 1000,
|
|
updated_at: Date.now(),
|
|
};
|
|
cfg['yt__' + req.params.key] = serializeYtAccount(updated);
|
|
saveCfg(cfg);
|
|
|
|
console.log(`[yt/refresh] ✅ Token refreshed: ${acc.label || req.params.key}`);
|
|
res.json({ ok: true, access_token: data.access_token, token_expiry: updated.token_expiry });
|
|
} catch (err) {
|
|
console.error('[yt/refresh] Error:', err.message);
|
|
res.status(500).json({ error: err.message });
|
|
}
|
|
});
|
|
|
|
// ── YT Exchange: Auth Code → tokens (server-side, sin exponer client_secret) ──
|
|
/**
|
|
* POST /yt/exchange
|
|
* Body: { code, redirect_uri }
|
|
*
|
|
* Flujo:
|
|
* 1. Intercambia code → access_token + refresh_token (Google /token)
|
|
* 2. Obtiene info del canal
|
|
* 3. Guarda cifrado en config.json
|
|
*/
|
|
app.post('/yt/exchange', async (req, res) => {
|
|
const { code, redirect_uri, restreamer_channel_id, restreamer_publication_id } = req.body || {};
|
|
if (!code || !redirect_uri) {
|
|
return res.status(400).json({ error: 'code and redirect_uri are required' });
|
|
}
|
|
|
|
const cfg = loadCfg();
|
|
const c = cfg.__yt_config || {};
|
|
const clientSecret = c.client_secret ? decrypt(c.client_secret) : '';
|
|
if (!c.client_id || !clientSecret) {
|
|
return res.status(400).json({ error: 'YouTube OAuth2 credentials not configured in Settings → Integrations' });
|
|
}
|
|
|
|
try {
|
|
const tokenData = await ytHttpsPost('https://oauth2.googleapis.com/token', {
|
|
code,
|
|
client_id: c.client_id,
|
|
client_secret: clientSecret,
|
|
redirect_uri,
|
|
grant_type: 'authorization_code',
|
|
});
|
|
if (tokenData.error) throw new Error(tokenData.error_description || tokenData.error);
|
|
|
|
let channelInfo = {};
|
|
try { channelInfo = await ytFetchChannelInfo(tokenData.access_token); } catch (_) {}
|
|
|
|
const accountKey = channelInfo.channel_id || ('yt_' + Date.now());
|
|
const channelName = channelInfo.channel_title || accountKey;
|
|
|
|
const account = {
|
|
account_key: accountKey,
|
|
label: channelName,
|
|
channel_title: channelInfo.channel_title || '',
|
|
channel_id: channelInfo.channel_id || '',
|
|
access_token: tokenData.access_token,
|
|
refresh_token: tokenData.refresh_token || '',
|
|
token_expiry: Date.now() + (tokenData.expires_in || 3600) * 1000,
|
|
restreamer_channel_id: restreamer_channel_id || '',
|
|
restreamer_publication_id: restreamer_publication_id || '',
|
|
updated_at: Date.now(),
|
|
};
|
|
cfg['yt__' + accountKey] = serializeYtAccount(account);
|
|
saveCfg(cfg);
|
|
|
|
console.log(`[yt/exchange] ✅ Account saved: ${channelName} (${accountKey}) → channel:${restreamer_channel_id || '-'} pub:${restreamer_publication_id || '-'}`);
|
|
res.json({ ok: true, account: publicYtAccount(account) });
|
|
} catch (err) {
|
|
console.error('[yt/exchange] Error:', err.message);
|
|
res.status(500).json({ error: err.message });
|
|
}
|
|
});
|
|
|
|
// ═════════════════════════════════════════════════════════════════════════════
|
|
// WEBRTC RELAY (WebM MediaRecorder → FFmpeg → RTMP Core)
|
|
// ═════════════════════════════════════════════════════════════════════════════
|
|
|
|
/**
|
|
* Cada conexión WebSocket abre un proceso FFmpeg que lee WebM desde stdin
|
|
* y lo empuja como RTMP al Core de Restreamer en localhost.
|
|
*
|
|
* WS URL: ws://<host>:<port>/webrtc-relay/<roomId>
|
|
*
|
|
* El roomId corresponde al channelid de Restreamer (p.ej. "external" o
|
|
* el nombre que se configure en el source).
|
|
*
|
|
* El cliente WebRTC (browser) envía:
|
|
* 1. Un JSON de config → { type:'config', room, videoBitrate, audioBitrate, mimeType }
|
|
* 2. Chunks binarios → fragmentos WebM del MediaRecorder
|
|
*/
|
|
|
|
const FFMPEG_BIN = process.env.FFMPEG_BIN || 'ffmpeg';
|
|
|
|
// Track active relay sessions for status endpoint
|
|
const relaySessions = new Map(); // roomId → { ws, ffmpeg, startedAt }
|
|
|
|
function buildRtmpUrl(roomId) {
|
|
// Formato compatible con Restreamer Core: rtmp://localhost:1935/live/{roomId}.stream
|
|
return `rtmp://${RTMP_HOST}:${RTMP_PORT}/${RTMP_APP}/${roomId}.stream`;
|
|
}
|
|
|
|
function startFfmpegRelay(roomId, videoBitrate, audioBitrate) {
|
|
const rtmpUrl = buildRtmpUrl(roomId);
|
|
const vbr = Math.floor((videoBitrate || 1500000) / 1000) + 'k';
|
|
const abr = Math.floor((audioBitrate || 128000) / 1000) + 'k';
|
|
|
|
console.log(`[webrtc-relay] 🎬 Starting FFmpeg relay for room="${roomId}" → ${rtmpUrl}`);
|
|
|
|
// FFmpeg reads WebM from stdin, re-encodes to H.264+AAC, pushes RTMP
|
|
const args = [
|
|
'-loglevel', 'warning',
|
|
'-re',
|
|
'-fflags', '+nobuffer+genpts',
|
|
'-analyzeduration', '1000000',
|
|
'-probesize', '512000',
|
|
'-i', 'pipe:0', // stdin = WebM stream
|
|
// Video: transcode to H.264 (most compatible with RTMP/HLS)
|
|
'-c:v', 'libx264',
|
|
'-preset', 'ultrafast',
|
|
'-tune', 'zerolatency',
|
|
'-b:v', vbr,
|
|
'-maxrate', vbr,
|
|
'-bufsize', String(parseInt(vbr) * 2) + 'k',
|
|
'-g', '60', // GOP = 2s @ 30fps
|
|
'-keyint_min', '60',
|
|
'-sc_threshold', '0',
|
|
'-pix_fmt', 'yuv420p',
|
|
// Audio: transcode to AAC
|
|
'-c:a', 'aac',
|
|
'-b:a', abr,
|
|
'-ar', '44100',
|
|
'-ac', '2',
|
|
// Output: RTMP
|
|
'-f', 'flv',
|
|
rtmpUrl,
|
|
];
|
|
|
|
const ffmpeg = spawn(FFMPEG_BIN, args, { stdio: ['pipe', 'pipe', 'pipe'] });
|
|
|
|
ffmpeg.stderr.on('data', (data) => {
|
|
const msg = data.toString().trim();
|
|
if (msg) console.log(`[ffmpeg/${roomId}] ${msg}`);
|
|
});
|
|
|
|
ffmpeg.on('error', (err) => {
|
|
console.error(`[webrtc-relay] FFmpeg spawn error (room=${roomId}): ${err.message}`);
|
|
if (err.code === 'ENOENT') {
|
|
console.error(`[webrtc-relay] ⚠️ ffmpeg not found. Set FFMPEG_BIN env or install ffmpeg.`);
|
|
}
|
|
});
|
|
|
|
ffmpeg.on('close', (code) => {
|
|
console.log(`[webrtc-relay] FFmpeg closed (room=${roomId}) code=${code}`);
|
|
relaySessions.delete(roomId);
|
|
});
|
|
|
|
return ffmpeg;
|
|
}
|
|
|
|
// ── Status endpoint ───────────────────────────────────────────────────────────
|
|
app.get('/webrtc-relay/status', (_, res) => {
|
|
const sessions = [];
|
|
relaySessions.forEach((sess, roomId) => {
|
|
sessions.push({
|
|
roomId,
|
|
startedAt: sess.startedAt,
|
|
uptime: Math.floor((Date.now() - sess.startedAt) / 1000),
|
|
});
|
|
});
|
|
res.json({ ok: true, sessions });
|
|
});
|
|
|
|
// ─────────────────────────────────────────────────────────────────────────────
|
|
// HTTP server (wraps Express) + WebSocket server
|
|
// ─────────────────────────────────────────────────────────────────────────────
|
|
const httpServer = http.createServer(app);
|
|
|
|
// Create WSS without 'path' or 'server' — we handle the upgrade event manually
|
|
// so that /webrtc-relay/<roomId> (prefix) is supported instead of exact match.
|
|
const wss = new WebSocketServer({ noServer: true });
|
|
|
|
// Intercept HTTP upgrade requests and forward only /webrtc-relay/* to the WSS
|
|
httpServer.on('upgrade', (req, socket, head) => {
|
|
const url = req.url || '';
|
|
if (url.startsWith('/webrtc-relay/') || url === '/webrtc-relay') {
|
|
wss.handleUpgrade(req, socket, head, (ws) => {
|
|
wss.emit('connection', ws, req);
|
|
});
|
|
} else {
|
|
socket.destroy();
|
|
}
|
|
});
|
|
|
|
wss.on('connection', (ws, req) => {
|
|
// Extract roomId from path: /webrtc-relay/<roomId>
|
|
const url = req.url || '';
|
|
const parts = url.split('/').filter(Boolean);
|
|
const roomId = parts[1] ? decodeURIComponent(parts[1]) : 'default';
|
|
|
|
console.log(`[webrtc-relay] ✅ New connection room="${roomId}" from ${req.socket.remoteAddress}`);
|
|
|
|
let ffmpeg = null;
|
|
let configured = false;
|
|
let headerSent = false;
|
|
|
|
ws.on('message', (data, isBinary) => {
|
|
// First message: JSON config
|
|
if (!configured) {
|
|
try {
|
|
const cfg = JSON.parse(data.toString());
|
|
if (cfg.type === 'config') {
|
|
configured = true;
|
|
|
|
// Kill any existing session for this room
|
|
const existing = relaySessions.get(roomId);
|
|
if (existing) {
|
|
try { existing.ffmpeg.stdin.destroy(); } catch(_) {}
|
|
try { existing.ffmpeg.kill('SIGKILL'); } catch(_) {}
|
|
relaySessions.delete(roomId);
|
|
}
|
|
|
|
ffmpeg = startFfmpegRelay(roomId, cfg.videoBitrate, cfg.audioBitrate);
|
|
|
|
relaySessions.set(roomId, { ws, ffmpeg, startedAt: Date.now() });
|
|
|
|
ffmpeg.on('spawn', () => {
|
|
ws.send(JSON.stringify({ type: 'ready', message: 'FFmpeg relay started' }));
|
|
});
|
|
|
|
ffmpeg.on('close', () => {
|
|
if (ws.readyState === WebSocket.OPEN) {
|
|
ws.send(JSON.stringify({ type: 'info', message: 'FFmpeg process ended' }));
|
|
}
|
|
});
|
|
|
|
ws.send(JSON.stringify({ type: 'info', message: `Relay configurado → ${buildRtmpUrl(roomId)}` }));
|
|
return;
|
|
}
|
|
} catch(_) {
|
|
// Not JSON → treat as binary from the start
|
|
configured = true;
|
|
}
|
|
}
|
|
|
|
// Binary data: forward to FFmpeg stdin
|
|
if (ffmpeg && ffmpeg.stdin.writable) {
|
|
if (!headerSent) {
|
|
headerSent = true;
|
|
console.log(`[webrtc-relay] First binary chunk (${isBinary ? 'binary' : 'text'}) for room="${roomId}"`);
|
|
}
|
|
try {
|
|
ffmpeg.stdin.write(data);
|
|
} catch(e) {
|
|
console.warn(`[webrtc-relay] stdin write error (room=${roomId}): ${e.message}`);
|
|
}
|
|
}
|
|
});
|
|
|
|
ws.on('close', (code, reason) => {
|
|
console.log(`[webrtc-relay] 🔌 Client disconnected room="${roomId}" code=${code}`);
|
|
if (ffmpeg) {
|
|
try { ffmpeg.stdin.end(); } catch(_) {}
|
|
setTimeout(() => {
|
|
try { ffmpeg.kill('SIGTERM'); } catch(_) {}
|
|
}, 2000);
|
|
}
|
|
relaySessions.delete(roomId);
|
|
});
|
|
|
|
ws.on('error', (err) => {
|
|
console.error(`[webrtc-relay] WS error (room=${roomId}): ${err.message}`);
|
|
});
|
|
});
|
|
|
|
// ── Start server ──────────────────────────────────────────────────────────────
|
|
httpServer.listen(PORT, '0.0.0.0', () => {
|
|
console.log(`\n[server] ✅ http://0.0.0.0:${PORT}`);
|
|
console.log(`[server] 💾 Config: ${CFG_PATH}`);
|
|
console.log(`[server] 🔐 Encryption: AES-256-GCM`);
|
|
console.log(`[server] 🎬 WebRTC Relay (legacy): ws://0.0.0.0:${PORT}/webrtc-relay/<roomId>`);
|
|
console.log(`[server] 🔴 LiveKit: ${LK_WS_URL || '(not configured)'}`);
|
|
console.log(`[server] 📡 RTMP target: rtmp://${RTMP_HOST}:${RTMP_PORT}/${RTMP_APP}/<stream>.stream\n`);
|
|
console.log(' GET /health');
|
|
console.log(' ── LiveKit ───────────────────────────────────────────');
|
|
console.log(' GET /livekit/config');
|
|
console.log(' POST /livekit/token { roomName, participantName?, canPublish?, canSubscribe? }');
|
|
console.log(' POST /livekit/relay/start { roomName, streamName? }');
|
|
console.log(' POST /livekit/relay/stop { roomName }');
|
|
console.log(' GET /livekit/relay/status');
|
|
console.log(' GET /webrtc-relay/status (legacy WebSocket relay)');
|
|
console.log(' ── Facebook ──────────────────────────────────────────');
|
|
console.log(' GET /fb/config');
|
|
console.log(' PUT /fb/config { app_id, app_secret }');
|
|
console.log(' GET /fb/accounts');
|
|
console.log(' GET /fb/accounts/:id/token');
|
|
console.log(' DELETE /fb/accounts/:id');
|
|
console.log(' PUT /fb/accounts/:id/context { restreamer_channel_id, restreamer_publication_id }');
|
|
console.log(' POST /fb/exchange { code, redirect_uri, restreamer_channel_id, restreamer_publication_id } ← Auth Code→Long-lived');
|
|
console.log(' POST /fb/upgrade { access_token, restreamer_channel_id, restreamer_publication_id } ← Short-lived→Long-lived');
|
|
console.log(' POST /fb/refresh/:id ← Renew long-lived token');
|
|
console.log(' ── YouTube ───────────────────────────────────────────');
|
|
console.log(' GET /yt/config');
|
|
console.log(' PUT /yt/config { client_id, client_secret }');
|
|
console.log(' GET /yt/config/full');
|
|
console.log(' GET /yt/accounts');
|
|
console.log(' GET /yt/accounts/:key/token');
|
|
console.log(' DELETE /yt/accounts/:key');
|
|
console.log(' PUT /yt/accounts/:key/context { restreamer_channel_id, restreamer_publication_id }');
|
|
console.log(' POST /yt/accounts { account_key, access_token, restreamer_channel_id, restreamer_publication_id, ... }');
|
|
console.log(' POST /yt/exchange { code, redirect_uri, restreamer_channel_id, restreamer_publication_id } ← Auth Code flow');
|
|
console.log(' POST /yt/accounts/:key/refresh ← Refresh access_token\n');
|
|
});
|
|
|
|
process.on('SIGINT', () => process.exit(0));
|
|
process.on('SIGTERM', () => process.exit(0));
|
|
|