'use strict'; const express = require('express'); const cors = require('cors'); const path = require('path'); const fs = require('fs'); const https = require('https'); const http = require('http'); const crypto = require('crypto'); const { spawn } = require('child_process'); const WebSocket = require('ws'); const { WebSocketServer } = require('ws'); const { AccessToken, RoomServiceClient } = require('livekit-server-sdk'); const PORT = parseInt(process.env.FB_SERVER_PORT || '3002', 10); const DATA_DIR = process.env.FB_DATA_DIR ? path.resolve(process.env.FB_DATA_DIR) : path.resolve(__dirname, 'data'); const CFG_PATH = path.join(DATA_DIR, 'config.json'); const ENCRYPTION_SECRET = process.env.FB_ENCRYPTION_SECRET || 'restreamer-ui-fb-secret-key-32x!'; // ── LiveKit config ───────────────────────────────────────────────────────────── const LK_API_KEY = process.env.LIVEKIT_API_KEY || ''; const LK_API_SECRET = process.env.LIVEKIT_API_SECRET || ''; const LK_WS_URL = process.env.LIVEKIT_WS_URL || ''; // HTTP URL para RoomServiceClient (wss:// → https://) const LK_HTTP_URL = LK_WS_URL.replace(/^wss:\/\//, 'https://').replace(/^ws:\/\//, 'http://'); // ── RTMP relay config (relay → Restreamer Core) ─────────────────────────────── const RTMP_HOST = process.env.RTMP_HOST || '127.0.0.1'; const RTMP_PORT = process.env.RTMP_PORT || '1935'; const RTMP_APP = process.env.RTMP_APP || 'live'; // ───────────────────────────────────────────────────────────────────────────── // Schema unificado de config.json // ───────────────────────────────────────────────────────────────────────────── /** * { * "__fb_config": { * "app_id": string, * "app_secret": string ← AES-256-GCM encrypted * }, * "__yt_config": { * "client_id": string, * "client_secret": string ← AES-256-GCM encrypted * }, * "fb__": { * "fb_user_id": string, // PK * "name": string, * "token_type": "USER", * "access_token": string, // AES-256-GCM encrypted long-lived token (60 días) * "expires_at": number, // Unix ms * "scope_granted": string[], * "pages": [{ * "id": string, // Page ID (PK) * "name": string, * "category": string, * "token_type": "PAGE", * "access_token": string, // AES-256-GCM encrypted long-lived page token * "tasks": string[] * }], * "updated_at": number * }, * "yt__": { * "account_key": string, // PK = channel_id o "yt_" * "label": string, // Nombre del canal * "channel_title": string, * "channel_id": string, * "access_token": string, // AES-256-GCM encrypted * "refresh_token": string, // AES-256-GCM encrypted * "token_expiry": number, // Unix ms * "updated_at": number * } * } */ // ── Encryption helpers (AES-256-GCM) ───────────────────────────────────────── function deriveKey(secret) { return crypto.createHash('sha256').update(secret).digest(); } function encrypt(text) { if (!text) return ''; try { const key = deriveKey(ENCRYPTION_SECRET); const iv = crypto.randomBytes(12); const cipher = crypto.createCipheriv('aes-256-gcm', key, iv); const enc = Buffer.concat([cipher.update(text, 'utf8'), cipher.final()]); const tag = cipher.getAuthTag(); return iv.toString('hex') + ':' + tag.toString('hex') + ':' + enc.toString('hex'); } catch (_) { return text; } } function decrypt(data) { if (!data) return ''; try { if (!data.includes(':')) return data; const parts = data.split(':'); if (parts.length < 3) return data; const [ivHex, tagHex, ...encParts] = parts; const encHex = encParts.join(':'); const key = deriveKey(ENCRYPTION_SECRET); const iv = Buffer.from(ivHex, 'hex'); const tag = Buffer.from(tagHex, 'hex'); const encBuf = Buffer.from(encHex, 'hex'); const decipher = crypto.createDecipheriv('aes-256-gcm', key, iv); decipher.setAuthTag(tag); return decipher.update(encBuf, undefined, 'utf8') + decipher.final('utf8'); } catch (_) { return data; } } // ── config.json I/O ─────────────────────────────────────────────────────────── function loadCfg() { if (!fs.existsSync(CFG_PATH)) return {}; try { return JSON.parse(fs.readFileSync(CFG_PATH, 'utf8')); } catch (_) { return {}; } } function saveCfg(data) { if (!fs.existsSync(DATA_DIR)) fs.mkdirSync(DATA_DIR, { recursive: true }); fs.writeFileSync(CFG_PATH, JSON.stringify(data, null, 2), 'utf8'); } // ── Facebook serialization ──────────────────────────────────────────────────── function serializeFbAccount(acc) { return { ...acc, access_token: encrypt(acc.access_token || ''), pages: (acc.pages || []).map((p) => ({ ...p, access_token: encrypt(p.access_token || ''), })), }; } function deserializeFbAccount(acc) { if (!acc) return null; return { ...acc, access_token: decrypt(acc.access_token || ''), pages: (acc.pages || []).map((p) => ({ ...p, access_token: decrypt(p.access_token || ''), })), }; } function publicFbAccount(acc) { const { access_token, pages, ...rest } = acc; return { ...rest, pages: (pages || []).map(({ access_token: _t, ...p }) => p), restreamer_channel_id: acc.restreamer_channel_id || '', restreamer_publication_id: acc.restreamer_publication_id || '', }; } // ── YouTube serialization ───────────────────────────────────────────────────── function serializeYtAccount(acc) { return { ...acc, access_token: encrypt(acc.access_token || ''), refresh_token: encrypt(acc.refresh_token || ''), }; } function deserializeYtAccount(acc) { if (!acc) return null; return { ...acc, access_token: decrypt(acc.access_token || ''), refresh_token: decrypt(acc.refresh_token || ''), }; } function publicYtAccount(acc) { const { access_token, refresh_token, ...rest } = acc; return { ...rest, has_refresh_token: !!(refresh_token), has_access_token: !!(access_token), restreamer_channel_id: acc.restreamer_channel_id || '', restreamer_publication_id: acc.restreamer_publication_id || '', }; } // ── Facebook Graph API helpers ──────────────────────────────────────────────── function fbGet(url) { return new Promise((resolve, reject) => { const lib = url.startsWith('https') ? https : http; lib.get(url, (res) => { let body = ''; res.on('data', (c) => { body += c; }); res.on('end', () => { try { resolve(JSON.parse(body)); } catch (e) { reject(new Error('Invalid JSON from Facebook: ' + body.slice(0, 200))); } }); }).on('error', reject); }); } async function fbExchangeToLongLived(appId, appSecret, shortToken) { const data = await fbGet( `https://graph.facebook.com/v19.0/oauth/access_token` + `?grant_type=fb_exchange_token` + `&client_id=${encodeURIComponent(appId)}` + `&client_secret=${encodeURIComponent(appSecret)}` + `&fb_exchange_token=${encodeURIComponent(shortToken)}` ); if (data.error) throw new Error(`FB exchange error: ${data.error.message}`); return data; // { access_token, token_type, expires_in } } async function fbExchangeCodeToLongLived(appId, appSecret, code, redirectUri) { // Step 1: code → short-lived user token const step1 = await fbGet( `https://graph.facebook.com/v19.0/oauth/access_token` + `?client_id=${encodeURIComponent(appId)}` + `&client_secret=${encodeURIComponent(appSecret)}` + `&redirect_uri=${encodeURIComponent(redirectUri)}` + `&code=${encodeURIComponent(code)}` ); if (step1.error) throw new Error(`Code exchange error: ${step1.error.message}`); // Step 2: short-lived → long-lived (~60 days) const step2 = await fbExchangeToLongLived(appId, appSecret, step1.access_token); return { shortToken: step1.access_token, ...step2 }; } async function fbFetchPages(longLivedUserToken) { const data = await fbGet( `https://graph.facebook.com/v19.0/me/accounts` + `?fields=id,name,access_token,category,tasks` + `&access_token=${encodeURIComponent(longLivedUserToken)}` ); if (data.error) throw new Error(`fetchPages error: ${data.error.message}`); return (data.data || []).map((p) => ({ id: p.id, name: p.name, category: p.category || '', tasks: p.tasks || [], access_token: p.access_token, token_type: 'PAGE', })); } async function fbFetchUserInfo(token) { const data = await fbGet( `https://graph.facebook.com/v19.0/me?fields=id,name&access_token=${encodeURIComponent(token)}` ); if (data.error) throw new Error(data.error.message); return data; } async function fbDebugToken(appId, appSecret, token) { const data = await fbGet( `https://graph.facebook.com/v19.0/debug_token` + `?input_token=${encodeURIComponent(token)}` + `&access_token=${encodeURIComponent(appId + '|' + appSecret)}` ); if (data.error) return { scopes: [], expires_at: 0 }; const d = data.data || {}; return { scopes: d.scopes || [], expires_at: d.expires_at ? d.expires_at * 1000 : 0, }; } // ── YouTube OAuth2 helpers ──────────────────────────────────────────────────── async function ytHttpsPost(url, body) { return new Promise((resolve, reject) => { const bodyStr = typeof body === 'string' ? body : new URLSearchParams(body).toString(); const urlObj = new URL(url); const options = { hostname: urlObj.hostname, path: urlObj.pathname + urlObj.search, method: 'POST', headers: { 'Content-Type': 'application/x-www-form-urlencoded', 'Content-Length': Buffer.byteLength(bodyStr), }, }; const req = https.request(options, (res) => { let buf = ''; res.on('data', (c) => { buf += c; }); res.on('end', () => { try { resolve(JSON.parse(buf)); } catch (e) { reject(new Error('Invalid JSON from Google: ' + buf.slice(0, 200))); } }); }); req.on('error', reject); req.write(bodyStr); req.end(); }); } async function ytFetchChannelInfo(accessToken) { return new Promise((resolve, reject) => { const url = 'https://www.googleapis.com/youtube/v3/channels?part=snippet&mine=true'; const req = https.request(url, { headers: { Authorization: 'Bearer ' + accessToken }, }, (res) => { let buf = ''; res.on('data', (c) => { buf += c; }); res.on('end', () => { try { const data = JSON.parse(buf); if (data.items && data.items.length > 0) { resolve({ channel_title: data.items[0].snippet.title, channel_id: data.items[0].id, }); } else { resolve({}); } } catch (e) { reject(e); } }); }); req.on('error', reject); req.end(); }); } // ── Express app ─────────────────────────────────────────────────────────────── const app = express(); app.use(cors({ origin: true, methods: ['GET','POST','PUT','DELETE','OPTIONS'], allowedHeaders: ['Content-Type'] })); app.use(express.json()); // ── Health ──────────────────────────────────────────────────────────────────── app.get('/health', (_, res) => { res.json({ ok: true, config: CFG_PATH, port: PORT, ts: new Date().toISOString() }); }); // ═════════════════════════════════════════════════════════════════════════════ // LIVEKIT // ═════════════════════════════════════════════════════════════════════════════ /** * GET /livekit/config * Devuelve la wsUrl pública del servidor LiveKit (sin secretos). */ app.get('/livekit/config', (_, res) => { if (!LK_WS_URL) return res.status(503).json({ error: 'LiveKit not configured' }); res.json({ wsUrl: LK_WS_URL }); }); /** * POST /livekit/token * Body: { roomName, participantName?, canPublish?, canSubscribe? } * Genera un AccessToken JWT firmado para que el browser se conecte a LiveKit. */ app.post('/livekit/token', async (req, res) => { if (!LK_API_KEY || !LK_API_SECRET) { return res.status(503).json({ error: 'LiveKit API credentials not configured' }); } const { roomName, participantName, canPublish = true, canSubscribe = false } = req.body || {}; if (!roomName) return res.status(400).json({ error: 'roomName is required' }); const identity = participantName || ('presenter-' + Date.now()); try { const at = new AccessToken(LK_API_KEY, LK_API_SECRET, { identity, ttl: '4h' }); at.addGrant({ roomJoin: true, room: roomName, canPublish, canSubscribe, canPublishData: true }); const token = await at.toJwt(); res.json({ token, identity, wsUrl: LK_WS_URL }); } catch (err) { console.error('[livekit] token error:', err.message); res.status(500).json({ error: 'Failed to generate token' }); } }); // ── Relay sessions: roomName → { ffmpeg, rtmpUrl, streamName } ──────────────── const lkRelaySessions = new Map(); /** * POST /livekit/relay/start * Body: { roomName: string, streamName?: string } * * Inicia un proceso FFmpeg que: * 1. Escucha en un puerto RTMP local temporal (127.0.0.1:19350+) * 2. Lee el stream y lo reenvía al Core de Restreamer via RTMP: * rtmp://RTMP_HOST:RTMP_PORT/RTMP_APP/.stream * * El browser (webrtc-room) publica en LiveKit. * Para hacer el bridge LiveKit → RTMP, el webrtc-room también * llama a este endpoint para que el operador sepa que el relay está activo. * * NOTA: El Core Restreamer escucha el input como {rtmp,name=.stream} * que es equivalente a recibir RTMP push en rtmp://localhost:1935/live/.stream */ app.post('/livekit/relay/start', async (req, res) => { if (!LK_API_KEY || !LK_API_SECRET) { return res.status(503).json({ error: 'LiveKit not configured' }); } const { roomName, streamName } = req.body || {}; if (!roomName) return res.status(400).json({ error: 'roomName is required' }); // Stop existing relay for this room if (lkRelaySessions.has(roomName)) { const old = lkRelaySessions.get(roomName); try { old.ffmpeg.kill('SIGTERM'); } catch(_) {} lkRelaySessions.delete(roomName); await new Promise(r => setTimeout(r, 500)); } // Stream name for RTMP push to Core: .stream const sName = (streamName || roomName).replace(/\.stream$/, '') + '.stream'; const rtmpUrl = `rtmp://${RTMP_HOST}:${RTMP_PORT}/${RTMP_APP}/${sName}`; // Verify LiveKit room exists via RoomServiceClient try { const rsc = new RoomServiceClient(LK_HTTP_URL, LK_API_KEY, LK_API_SECRET); const rooms = await rsc.listRooms([roomName]); if (!rooms || rooms.length === 0) { return res.status(404).json({ error: `LiveKit room "${roomName}" not found or empty` }); } } catch (err) { console.warn('[livekit-relay] RoomService check failed:', err.message, '(continuing anyway)'); } // Build RTMP pull URL for FFmpeg from LiveKit via its built-in RTMP ingress // Since LiveKit doesn't expose direct RTMP pull, we use the token + wsUrl // and instruct the webrtc-room page to also push via MediaRecorder → FFmpeg stdin // The simplest and most reliable approach: confirm relay intent and return RTMP push URL // The actual video bridge happens in the browser via the WebRTC→RTMP bridge below. // For the Core process input side, return the stream registration info console.log(`[livekit-relay] ▶ Relay registered: room="${roomName}" → ${rtmpUrl}`); lkRelaySessions.set(roomName, { rtmpUrl, streamName: sName, ffmpeg: null }); res.json({ ok: true, rtmpUrl, streamName: sName, roomName }); }); /** * POST /livekit/relay/stop * Body: { roomName: string } */ app.post('/livekit/relay/stop', (req, res) => { const { roomName } = req.body || {}; if (!roomName) return res.status(400).json({ error: 'roomName is required' }); const session = lkRelaySessions.get(roomName); if (!session) return res.json({ ok: true, message: 'No active relay' }); if (session.ffmpeg) { try { session.ffmpeg.kill('SIGTERM'); } catch(_) {} } lkRelaySessions.delete(roomName); console.log(`[livekit-relay] ⏹ Relay stopped: room="${roomName}"`); res.json({ ok: true }); }); /** * GET /livekit/relay/status */ app.get('/livekit/relay/status', (_, res) => { const active = []; lkRelaySessions.forEach((s, roomName) => active.push({ roomName, rtmpUrl: s.rtmpUrl, streamName: s.streamName })); res.json({ ok: true, active }); }); // ═════════════════════════════════════════════════════════════════════════════ // FACEBOOK // ═════════════════════════════════════════════════════════════════════════════ // ── FB App config (app_id + app_secret) ────────────────────────────────────── app.get('/fb/config', (_, res) => { const cfg = loadCfg(); const c = cfg.__fb_config || {}; // Soporte retrocompatibilidad con clave vieja __config const old = cfg.__config || {}; res.json({ app_id: c.app_id || old.app_id || '', has_secret: !!(c.app_secret || old.app_secret), }); }); app.put('/fb/config', (req, res) => { const { app_id, app_secret } = req.body || {}; const cfg = loadCfg(); // Migrar de __config a __fb_config si es necesario if (cfg.__config && !cfg.__fb_config) { cfg.__fb_config = { ...cfg.__config }; delete cfg.__config; } cfg.__fb_config = { ...(cfg.__fb_config || {}), ...(app_id !== undefined ? { app_id: String(app_id) } : {}), ...(app_secret !== undefined ? { app_secret: encrypt(String(app_secret)) } : {}), }; saveCfg(cfg); res.json({ ok: true }); }); // ── FB Accounts ─────────────────────────────────────────────────────────────── app.get('/fb/accounts', (_, res) => { const cfg = loadCfg(); const accounts = Object.entries(cfg) .filter(([k, v]) => k.startsWith('fb__') && v && v.fb_user_id) .map(([, v]) => publicFbAccount(deserializeFbAccount(v))); res.json(accounts); }); app.get('/fb/accounts/:id/token', (req, res) => { const cfg = loadCfg(); const raw = cfg['fb__' + req.params.id]; if (!raw) return res.status(404).json({ error: 'Account not found' }); const acc = deserializeFbAccount(raw); res.json({ fb_user_id: acc.fb_user_id, name: acc.name, token_type: acc.token_type, access_token: acc.access_token, expires_at: acc.expires_at, scope_granted: acc.scope_granted || [], pages: acc.pages || [], }); }); app.delete('/fb/accounts/:id', (req, res) => { const cfg = loadCfg(); const key = 'fb__' + req.params.id; if (!cfg[key]) return res.status(404).json({ error: 'Account not found' }); delete cfg[key]; saveCfg(cfg); res.json({ ok: true }); }); // Asociar una cuenta FB existente con un canal/publicación de Restreamer app.put('/fb/accounts/:id/context', (req, res) => { const { restreamer_channel_id, restreamer_publication_id } = req.body || {}; const cfg = loadCfg(); const key = 'fb__' + req.params.id; if (!cfg[key]) return res.status(404).json({ error: 'Account not found' }); cfg[key] = { ...cfg[key], restreamer_channel_id: restreamer_channel_id || '', restreamer_publication_id: restreamer_publication_id || '', }; saveCfg(cfg); console.log(`[fb/context] ✅ ${req.params.id} → channel:${restreamer_channel_id || '-'} pub:${restreamer_publication_id || '-'}`); res.json({ ok: true }); }); // ── FB Exchange: Auth Code → Short-lived → Long-lived (60 días) ────────────── /** * POST /fb/exchange * Body: { code, redirect_uri } * * Flujo completo: * 1. code → short-lived user token (Graph API /oauth/access_token) * 2. short-lived → long-lived token (~60 días) (grant_type=fb_exchange_token) * 3. Fetch user info (id, name) * 4. Debug token → scopes, expires_at * 5. Fetch páginas del usuario (tokens de página ya son long-lived) * 6. Guardar todo cifrado en config.json * 7. Retornar cuenta pública (sin tokens) */ app.post('/fb/exchange', async (req, res) => { const { code, redirect_uri, restreamer_channel_id, restreamer_publication_id } = req.body || {}; if (!code || !redirect_uri) { return res.status(400).json({ error: 'code and redirect_uri are required' }); } const cfg = loadCfg(); const c = cfg.__fb_config || cfg.__config || {}; if (!c.app_id) { return res.status(400).json({ error: 'Facebook App ID not configured. Go to Settings → Integrations.' }); } const appSecret = c.app_secret ? decrypt(c.app_secret) : ''; if (!appSecret) { return res.status(400).json({ error: 'Facebook App Secret not configured. Go to Settings → Integrations.' }); } try { const { access_token: longToken, expires_in } = await fbExchangeCodeToLongLived( c.app_id, appSecret, code, redirect_uri ); const expires_at = expires_in ? Date.now() + parseInt(expires_in, 10) * 1000 : Date.now() + 60 * 24 * 60 * 60 * 1000; const userInfo = await fbFetchUserInfo(longToken); const { scopes } = await fbDebugToken(c.app_id, appSecret, longToken); let pages = []; try { pages = await fbFetchPages(longToken); } catch (_) {} const account = { fb_user_id: userInfo.id, name: userInfo.name, token_type: 'USER', access_token: longToken, expires_at, scope_granted: scopes, pages, restreamer_channel_id: restreamer_channel_id || '', restreamer_publication_id: restreamer_publication_id || '', updated_at: Date.now(), }; cfg['fb__' + userInfo.id] = serializeFbAccount(account); saveCfg(cfg); console.log(`[fb/exchange] ✅ ${userInfo.name} (${userInfo.id}) — expires ${new Date(expires_at).toISOString()} → channel:${restreamer_channel_id || '-'} pub:${restreamer_publication_id || '-'}`); res.json({ ok: true, account: publicFbAccount(account) }); } catch (err) { console.error('[fb/exchange] Error:', err.message); res.status(500).json({ error: err.message }); } }); // ── FB Upgrade: short-lived token → long-lived (implicit flow fallback) ─────── app.post('/fb/upgrade', async (req, res) => { const { access_token: shortToken, restreamer_channel_id, restreamer_publication_id } = req.body || {}; if (!shortToken) return res.status(400).json({ error: 'access_token is required' }); const cfg = loadCfg(); const c = cfg.__fb_config || cfg.__config || {}; const appSecret = c.app_secret ? decrypt(c.app_secret) : c.app_secret_plain || ''; if (!c.app_id || !appSecret) { return res.status(400).json({ error: 'App ID / Secret not configured. Cannot upgrade token.' }); } try { const { access_token: longToken, expires_in } = await fbExchangeToLongLived(c.app_id, appSecret, shortToken); const expires_at = expires_in ? Date.now() + parseInt(expires_in, 10) * 1000 : Date.now() + 60 * 24 * 60 * 60 * 1000; const userInfo = await fbFetchUserInfo(longToken); const { scopes } = await fbDebugToken(c.app_id, appSecret, longToken); let pages = []; try { pages = await fbFetchPages(longToken); } catch (_) {} const account = { fb_user_id: userInfo.id, name: userInfo.name, token_type: 'USER', access_token: longToken, expires_at, scope_granted: scopes, pages, restreamer_channel_id: restreamer_channel_id || '', restreamer_publication_id: restreamer_publication_id || '', updated_at: Date.now(), }; cfg['fb__' + userInfo.id] = serializeFbAccount(account); saveCfg(cfg); console.log(`[fb/upgrade] ✅ Token upgraded: ${userInfo.name} (${userInfo.id})`); res.json({ ok: true, account: publicFbAccount(account) }); } catch (err) { console.error('[fb/upgrade] Error:', err.message); res.status(500).json({ error: err.message }); } }); // ── FB Refresh: renovar long-lived token existente ──────────────────────────── app.post('/fb/refresh/:id', async (req, res) => { const cfg = loadCfg(); const raw = cfg['fb__' + req.params.id]; if (!raw) return res.status(404).json({ error: 'Account not found' }); const c = cfg.__fb_config || cfg.__config || {}; const appSecret = c.app_secret ? decrypt(c.app_secret) : ''; if (!c.app_id || !appSecret) { return res.status(400).json({ error: 'App ID / Secret not configured' }); } const acc = deserializeFbAccount(raw); try { const { access_token: newToken, expires_in } = await fbExchangeToLongLived(c.app_id, appSecret, acc.access_token); const expires_at = expires_in ? Date.now() + parseInt(expires_in, 10) * 1000 : Date.now() + 60 * 24 * 60 * 60 * 1000; const { scopes } = await fbDebugToken(c.app_id, appSecret, newToken); let pages = acc.pages || []; try { pages = await fbFetchPages(newToken); } catch (_) {} const updated = { ...acc, access_token: newToken, expires_at, scope_granted: scopes, pages, updated_at: Date.now() }; cfg['fb__' + req.params.id] = serializeFbAccount(updated); saveCfg(cfg); console.log(`[fb/refresh] ✅ Token refreshed: ${acc.name} (${req.params.id})`); res.json({ ok: true, account: publicFbAccount(updated) }); } catch (err) { console.error('[fb/refresh] Error:', err.message); res.status(500).json({ error: err.message }); } }); // ═════════════════════════════════════════════════════════════════════════════ // YOUTUBE // ═════════════════════════════════════════════════════════════════════════════ // ── YT Credentials (client_id + client_secret) ─────────────────────────────── app.get('/yt/config', (_, res) => { const cfg = loadCfg(); const c = cfg.__yt_config || {}; res.json({ client_id: c.client_id || '', has_secret: !!(c.client_secret) }); }); app.put('/yt/config', (req, res) => { const { client_id, client_secret } = req.body || {}; const cfg = loadCfg(); cfg.__yt_config = { ...(cfg.__yt_config || {}), ...(client_id !== undefined ? { client_id: String(client_id) } : {}), ...(client_secret !== undefined ? { client_secret: encrypt(String(client_secret)) } : {}), }; saveCfg(cfg); console.log(`[yt/config] ✅ Credentials saved (client_id: ${client_id || '(unchanged)'})`); res.json({ ok: true }); }); // Versión completa con secreto descifrado (uso interno — intercambio de tokens) app.get('/yt/config/full', (_, res) => { const cfg = loadCfg(); const c = cfg.__yt_config || {}; res.json({ client_id: c.client_id || '', client_secret: c.client_secret ? decrypt(c.client_secret) : '', }); }); // ── YT Accounts ─────────────────────────────────────────────────────────────── app.get('/yt/accounts', (_, res) => { const cfg = loadCfg(); const accounts = Object.entries(cfg) .filter(([k, v]) => k.startsWith('yt__') && v && v.account_key) .map(([, v]) => publicYtAccount(deserializeYtAccount(v))); res.json(accounts); }); app.get('/yt/accounts/:key/token', (req, res) => { const cfg = loadCfg(); const raw = cfg['yt__' + req.params.key]; if (!raw) return res.status(404).json({ error: 'YT account not found' }); const acc = deserializeYtAccount(raw); res.json({ account_key: acc.account_key, label: acc.label, channel_title: acc.channel_title || '', channel_id: acc.channel_id || '', access_token: acc.access_token, refresh_token: acc.refresh_token, token_expiry: acc.token_expiry, }); }); app.delete('/yt/accounts/:key', (req, res) => { const cfg = loadCfg(); const key = 'yt__' + req.params.key; if (!cfg[key]) return res.status(404).json({ error: 'YT account not found' }); delete cfg[key]; saveCfg(cfg); res.json({ ok: true }); }); // Asociar una cuenta YT existente con un canal/publicación de Restreamer app.put('/yt/accounts/:key/context', (req, res) => { const { restreamer_channel_id, restreamer_publication_id } = req.body || {}; const cfg = loadCfg(); const key = 'yt__' + req.params.key; if (!cfg[key]) return res.status(404).json({ error: 'YT account not found' }); cfg[key] = { ...cfg[key], restreamer_channel_id: restreamer_channel_id || '', restreamer_publication_id: restreamer_publication_id || '', }; saveCfg(cfg); console.log(`[yt/context] ✅ ${req.params.key} → channel:${restreamer_channel_id || '-'} pub:${restreamer_publication_id || '-'}`); res.json({ ok: true }); }); // ── YT Save account (tras el intercambio de código en el browser) ───────────── /** * POST /yt/accounts * Body: { account_key, label, channel_title, channel_id, access_token, refresh_token, token_expiry } * * El browser ya hizo el intercambio code→token con Google. * Este endpoint solo persiste el resultado cifrado en config.json. */ app.post('/yt/accounts', (req, res) => { const { account_key, label, channel_title, channel_id, access_token, refresh_token, token_expiry, restreamer_channel_id, restreamer_publication_id, } = req.body || {}; if (!account_key || !access_token) { return res.status(400).json({ error: 'account_key and access_token are required' }); } const cfg = loadCfg(); const existing = cfg['yt__' + account_key] ? deserializeYtAccount(cfg['yt__' + account_key]) : {}; const account = { account_key, label: label || existing.label || account_key, channel_title: channel_title || existing.channel_title || '', channel_id: channel_id || existing.channel_id || '', access_token, refresh_token: refresh_token || existing.refresh_token || '', token_expiry: token_expiry || existing.token_expiry || 0, restreamer_channel_id: restreamer_channel_id || existing.restreamer_channel_id || '', restreamer_publication_id: restreamer_publication_id || existing.restreamer_publication_id || '', updated_at: Date.now(), }; cfg['yt__' + account_key] = serializeYtAccount(account); saveCfg(cfg); console.log(`[yt/accounts] ✅ Account saved: ${label || account_key} (${channel_id || account_key}) → channel:${restreamer_channel_id || '-'} pub:${restreamer_publication_id || '-'}`); res.json({ ok: true, account: publicYtAccount(account) }); }); // ── YT Refresh token: usar refresh_token para obtener nuevo access_token ────── /** * POST /yt/accounts/:key/refresh * Usa el refresh_token almacenado + credentials para obtener un nuevo access_token. */ app.post('/yt/accounts/:key/refresh', async (req, res) => { const cfg = loadCfg(); const raw = cfg['yt__' + req.params.key]; if (!raw) return res.status(404).json({ error: 'YT account not found' }); const c = cfg.__yt_config || {}; const clientSecret = c.client_secret ? decrypt(c.client_secret) : ''; if (!c.client_id || !clientSecret) { return res.status(400).json({ error: 'YouTube OAuth2 credentials not configured in Settings → Integrations' }); } const acc = deserializeYtAccount(raw); if (!acc.refresh_token) { return res.status(400).json({ error: 'No refresh token stored for this account' }); } try { const data = await ytHttpsPost('https://oauth2.googleapis.com/token', { client_id: c.client_id, client_secret: clientSecret, refresh_token: acc.refresh_token, grant_type: 'refresh_token', }); if (data.error) throw new Error(data.error_description || data.error); const updated = { ...acc, access_token: data.access_token, token_expiry: Date.now() + (data.expires_in || 3600) * 1000, updated_at: Date.now(), }; cfg['yt__' + req.params.key] = serializeYtAccount(updated); saveCfg(cfg); console.log(`[yt/refresh] ✅ Token refreshed: ${acc.label || req.params.key}`); res.json({ ok: true, access_token: data.access_token, token_expiry: updated.token_expiry }); } catch (err) { console.error('[yt/refresh] Error:', err.message); res.status(500).json({ error: err.message }); } }); // ── YT Exchange: Auth Code → tokens (server-side, sin exponer client_secret) ── /** * POST /yt/exchange * Body: { code, redirect_uri } * * Flujo: * 1. Intercambia code → access_token + refresh_token (Google /token) * 2. Obtiene info del canal * 3. Guarda cifrado en config.json */ app.post('/yt/exchange', async (req, res) => { const { code, redirect_uri, restreamer_channel_id, restreamer_publication_id } = req.body || {}; if (!code || !redirect_uri) { return res.status(400).json({ error: 'code and redirect_uri are required' }); } const cfg = loadCfg(); const c = cfg.__yt_config || {}; const clientSecret = c.client_secret ? decrypt(c.client_secret) : ''; if (!c.client_id || !clientSecret) { return res.status(400).json({ error: 'YouTube OAuth2 credentials not configured in Settings → Integrations' }); } try { const tokenData = await ytHttpsPost('https://oauth2.googleapis.com/token', { code, client_id: c.client_id, client_secret: clientSecret, redirect_uri, grant_type: 'authorization_code', }); if (tokenData.error) throw new Error(tokenData.error_description || tokenData.error); let channelInfo = {}; try { channelInfo = await ytFetchChannelInfo(tokenData.access_token); } catch (_) {} const accountKey = channelInfo.channel_id || ('yt_' + Date.now()); const channelName = channelInfo.channel_title || accountKey; const account = { account_key: accountKey, label: channelName, channel_title: channelInfo.channel_title || '', channel_id: channelInfo.channel_id || '', access_token: tokenData.access_token, refresh_token: tokenData.refresh_token || '', token_expiry: Date.now() + (tokenData.expires_in || 3600) * 1000, restreamer_channel_id: restreamer_channel_id || '', restreamer_publication_id: restreamer_publication_id || '', updated_at: Date.now(), }; cfg['yt__' + accountKey] = serializeYtAccount(account); saveCfg(cfg); console.log(`[yt/exchange] ✅ Account saved: ${channelName} (${accountKey}) → channel:${restreamer_channel_id || '-'} pub:${restreamer_publication_id || '-'}`); res.json({ ok: true, account: publicYtAccount(account) }); } catch (err) { console.error('[yt/exchange] Error:', err.message); res.status(500).json({ error: err.message }); } }); // ═════════════════════════════════════════════════════════════════════════════ // WEBRTC RELAY (WebM MediaRecorder → FFmpeg → RTMP Core) // ═════════════════════════════════════════════════════════════════════════════ /** * Cada conexión WebSocket abre un proceso FFmpeg que lee WebM desde stdin * y lo empuja como RTMP al Core de Restreamer en localhost. * * WS URL: ws://:/webrtc-relay/ * * El roomId corresponde al channelid de Restreamer (p.ej. "external" o * el nombre que se configure en el source). * * El cliente WebRTC (browser) envía: * 1. Un JSON de config → { type:'config', room, videoBitrate, audioBitrate, mimeType } * 2. Chunks binarios → fragmentos WebM del MediaRecorder */ const FFMPEG_BIN = process.env.FFMPEG_BIN || 'ffmpeg'; // Track active relay sessions for status endpoint const relaySessions = new Map(); // roomId → { ws, ffmpeg, startedAt } function buildRtmpUrl(roomId) { // Formato compatible con Restreamer Core: rtmp://localhost:1935/live/{roomId}.stream return `rtmp://${RTMP_HOST}:${RTMP_PORT}/${RTMP_APP}/${roomId}.stream`; } function startFfmpegRelay(roomId, videoBitrate, audioBitrate) { const rtmpUrl = buildRtmpUrl(roomId); const vbr = Math.floor((videoBitrate || 1500000) / 1000) + 'k'; const abr = Math.floor((audioBitrate || 128000) / 1000) + 'k'; console.log(`[webrtc-relay] 🎬 Starting FFmpeg relay for room="${roomId}" → ${rtmpUrl}`); // FFmpeg reads WebM from stdin, re-encodes to H.264+AAC, pushes RTMP const args = [ '-loglevel', 'warning', '-re', '-fflags', '+nobuffer+genpts', '-analyzeduration', '1000000', '-probesize', '512000', '-i', 'pipe:0', // stdin = WebM stream // Video: transcode to H.264 (most compatible with RTMP/HLS) '-c:v', 'libx264', '-preset', 'ultrafast', '-tune', 'zerolatency', '-b:v', vbr, '-maxrate', vbr, '-bufsize', String(parseInt(vbr) * 2) + 'k', '-g', '60', // GOP = 2s @ 30fps '-keyint_min', '60', '-sc_threshold', '0', '-pix_fmt', 'yuv420p', // Audio: transcode to AAC '-c:a', 'aac', '-b:a', abr, '-ar', '44100', '-ac', '2', // Output: RTMP '-f', 'flv', rtmpUrl, ]; const ffmpeg = spawn(FFMPEG_BIN, args, { stdio: ['pipe', 'pipe', 'pipe'] }); ffmpeg.stderr.on('data', (data) => { const msg = data.toString().trim(); if (msg) console.log(`[ffmpeg/${roomId}] ${msg}`); }); ffmpeg.on('error', (err) => { console.error(`[webrtc-relay] FFmpeg spawn error (room=${roomId}): ${err.message}`); if (err.code === 'ENOENT') { console.error(`[webrtc-relay] ⚠️ ffmpeg not found. Set FFMPEG_BIN env or install ffmpeg.`); } }); ffmpeg.on('close', (code) => { console.log(`[webrtc-relay] FFmpeg closed (room=${roomId}) code=${code}`); relaySessions.delete(roomId); }); return ffmpeg; } // ── Status endpoint ─────────────────────────────────────────────────────────── app.get('/webrtc-relay/status', (_, res) => { const sessions = []; relaySessions.forEach((sess, roomId) => { sessions.push({ roomId, startedAt: sess.startedAt, uptime: Math.floor((Date.now() - sess.startedAt) / 1000), }); }); res.json({ ok: true, sessions }); }); // ───────────────────────────────────────────────────────────────────────────── // HTTP server (wraps Express) + WebSocket server // ───────────────────────────────────────────────────────────────────────────── const httpServer = http.createServer(app); // Create WSS without 'path' or 'server' — we handle the upgrade event manually // so that /webrtc-relay/ (prefix) is supported instead of exact match. const wss = new WebSocketServer({ noServer: true }); // Intercept HTTP upgrade requests and forward only /webrtc-relay/* to the WSS httpServer.on('upgrade', (req, socket, head) => { const url = req.url || ''; if (url.startsWith('/webrtc-relay/') || url === '/webrtc-relay') { wss.handleUpgrade(req, socket, head, (ws) => { wss.emit('connection', ws, req); }); } else { socket.destroy(); } }); wss.on('connection', (ws, req) => { // Extract roomId from path: /webrtc-relay/ const url = req.url || ''; const parts = url.split('/').filter(Boolean); const roomId = parts[1] ? decodeURIComponent(parts[1]) : 'default'; console.log(`[webrtc-relay] ✅ New connection room="${roomId}" from ${req.socket.remoteAddress}`); let ffmpeg = null; let configured = false; let headerSent = false; ws.on('message', (data, isBinary) => { // First message: JSON config if (!configured) { try { const cfg = JSON.parse(data.toString()); if (cfg.type === 'config') { configured = true; // Kill any existing session for this room const existing = relaySessions.get(roomId); if (existing) { try { existing.ffmpeg.stdin.destroy(); } catch(_) {} try { existing.ffmpeg.kill('SIGKILL'); } catch(_) {} relaySessions.delete(roomId); } ffmpeg = startFfmpegRelay(roomId, cfg.videoBitrate, cfg.audioBitrate); relaySessions.set(roomId, { ws, ffmpeg, startedAt: Date.now() }); ffmpeg.on('spawn', () => { ws.send(JSON.stringify({ type: 'ready', message: 'FFmpeg relay started' })); }); ffmpeg.on('close', () => { if (ws.readyState === WebSocket.OPEN) { ws.send(JSON.stringify({ type: 'info', message: 'FFmpeg process ended' })); } }); ws.send(JSON.stringify({ type: 'info', message: `Relay configurado → ${buildRtmpUrl(roomId)}` })); return; } } catch(_) { // Not JSON → treat as binary from the start configured = true; } } // Binary data: forward to FFmpeg stdin if (ffmpeg && ffmpeg.stdin.writable) { if (!headerSent) { headerSent = true; console.log(`[webrtc-relay] First binary chunk (${isBinary ? 'binary' : 'text'}) for room="${roomId}"`); } try { ffmpeg.stdin.write(data); } catch(e) { console.warn(`[webrtc-relay] stdin write error (room=${roomId}): ${e.message}`); } } }); ws.on('close', (code, reason) => { console.log(`[webrtc-relay] 🔌 Client disconnected room="${roomId}" code=${code}`); if (ffmpeg) { try { ffmpeg.stdin.end(); } catch(_) {} setTimeout(() => { try { ffmpeg.kill('SIGTERM'); } catch(_) {} }, 2000); } relaySessions.delete(roomId); }); ws.on('error', (err) => { console.error(`[webrtc-relay] WS error (room=${roomId}): ${err.message}`); }); }); // ── Start server ────────────────────────────────────────────────────────────── httpServer.listen(PORT, '0.0.0.0', () => { console.log(`\n[server] ✅ http://0.0.0.0:${PORT}`); console.log(`[server] 💾 Config: ${CFG_PATH}`); console.log(`[server] 🔐 Encryption: AES-256-GCM`); console.log(`[server] 🎬 WebRTC Relay (legacy): ws://0.0.0.0:${PORT}/webrtc-relay/`); console.log(`[server] 🔴 LiveKit: ${LK_WS_URL || '(not configured)'}`); console.log(`[server] 📡 RTMP target: rtmp://${RTMP_HOST}:${RTMP_PORT}/${RTMP_APP}/.stream\n`); console.log(' GET /health'); console.log(' ── LiveKit ───────────────────────────────────────────'); console.log(' GET /livekit/config'); console.log(' POST /livekit/token { roomName, participantName?, canPublish?, canSubscribe? }'); console.log(' POST /livekit/relay/start { roomName, streamName? }'); console.log(' POST /livekit/relay/stop { roomName }'); console.log(' GET /livekit/relay/status'); console.log(' GET /webrtc-relay/status (legacy WebSocket relay)'); console.log(' ── Facebook ──────────────────────────────────────────'); console.log(' GET /fb/config'); console.log(' PUT /fb/config { app_id, app_secret }'); console.log(' GET /fb/accounts'); console.log(' GET /fb/accounts/:id/token'); console.log(' DELETE /fb/accounts/:id'); console.log(' PUT /fb/accounts/:id/context { restreamer_channel_id, restreamer_publication_id }'); console.log(' POST /fb/exchange { code, redirect_uri, restreamer_channel_id, restreamer_publication_id } ← Auth Code→Long-lived'); console.log(' POST /fb/upgrade { access_token, restreamer_channel_id, restreamer_publication_id } ← Short-lived→Long-lived'); console.log(' POST /fb/refresh/:id ← Renew long-lived token'); console.log(' ── YouTube ───────────────────────────────────────────'); console.log(' GET /yt/config'); console.log(' PUT /yt/config { client_id, client_secret }'); console.log(' GET /yt/config/full'); console.log(' GET /yt/accounts'); console.log(' GET /yt/accounts/:key/token'); console.log(' DELETE /yt/accounts/:key'); console.log(' PUT /yt/accounts/:key/context { restreamer_channel_id, restreamer_publication_id }'); console.log(' POST /yt/accounts { account_key, access_token, restreamer_channel_id, restreamer_publication_id, ... }'); console.log(' POST /yt/exchange { code, redirect_uri, restreamer_channel_id, restreamer_publication_id } ← Auth Code flow'); console.log(' POST /yt/accounts/:key/refresh ← Refresh access_token\n'); }); process.on('SIGINT', () => process.exit(0)); process.on('SIGTERM', () => process.exit(0));