305 lines
8.6 KiB
TypeScript
305 lines
8.6 KiB
TypeScript
/**
|
|
* Web server setup for HTTP-based MCP communication using Hono
|
|
*/
|
|
import { Hono } from 'hono';
|
|
import { cors } from 'hono/cors';
|
|
import { serve } from '@hono/node-server';
|
|
import { streamSSE } from 'hono/streaming';
|
|
import { v4 as uuid } from 'uuid';
|
|
import { Server } from '@modelcontextprotocol/sdk/server/index.js';
|
|
import { JSONRPCMessage, JSONRPCMessageSchema } from '@modelcontextprotocol/sdk/types.js';
|
|
import type { Context } from 'hono';
|
|
import type { SSEStreamingApi } from 'hono/streaming';
|
|
import type { Transport } from '@modelcontextprotocol/sdk/shared/transport.js';
|
|
|
|
// Import server configuration constants
|
|
import { SERVER_NAME, SERVER_VERSION } from './index.js';
|
|
|
|
/**
|
|
* Custom SSE Transport implementation using Hono's streaming API
|
|
*/
|
|
class SSETransport implements Transport {
|
|
private _sessionId: string;
|
|
private stream: SSEStreamingApi;
|
|
private messageUrl: string;
|
|
|
|
onclose?: () => void;
|
|
onerror?: (error: Error) => void;
|
|
onmessage?: (message: JSONRPCMessage) => void;
|
|
|
|
constructor(messageUrl: string, stream: SSEStreamingApi) {
|
|
this._sessionId = uuid();
|
|
this.stream = stream;
|
|
this.messageUrl = messageUrl;
|
|
|
|
// Set up stream abort handler
|
|
this.stream.onAbort(() => {
|
|
console.error(`SSE connection aborted for session ${this._sessionId}`);
|
|
this.close();
|
|
});
|
|
}
|
|
|
|
get sessionId(): string {
|
|
return this._sessionId;
|
|
}
|
|
|
|
async start(): Promise<void> {
|
|
if (this.stream.closed) {
|
|
throw new Error('SSE transport already closed!');
|
|
}
|
|
|
|
// Send the endpoint information
|
|
await this.stream.writeSSE({
|
|
event: 'endpoint',
|
|
data: `${this.messageUrl}?sessionId=${this._sessionId}`,
|
|
});
|
|
|
|
// Send session ID and connection info in a format the client can understand
|
|
await this.stream.writeSSE({
|
|
event: 'session',
|
|
data: JSON.stringify({
|
|
type: 'session_id',
|
|
session_id: this._sessionId,
|
|
}),
|
|
});
|
|
|
|
// Send a welcome notification
|
|
await this.send({
|
|
jsonrpc: '2.0',
|
|
method: 'notification',
|
|
params: {
|
|
type: 'welcome',
|
|
clientInfo: {
|
|
sessionId: this._sessionId,
|
|
serverName: SERVER_NAME,
|
|
serverVersion: SERVER_VERSION,
|
|
},
|
|
},
|
|
});
|
|
}
|
|
|
|
async handlePostMessage(c: Context): Promise<Response> {
|
|
if (this.stream?.closed) {
|
|
return c.text('SSE connection closed', 400);
|
|
}
|
|
|
|
try {
|
|
// Parse and validate the message
|
|
const body = await c.req.json();
|
|
|
|
try {
|
|
// Parse and validate the message
|
|
const parsedMessage = JSONRPCMessageSchema.parse(body);
|
|
|
|
// Forward to the message handler
|
|
if (this.onmessage) {
|
|
this.onmessage(parsedMessage);
|
|
return c.text('Accepted', 202);
|
|
} else {
|
|
return c.text('No message handler defined', 500);
|
|
}
|
|
} catch (error) {
|
|
if (this.onerror) {
|
|
this.onerror(error instanceof Error ? error : new Error(String(error)));
|
|
}
|
|
console.error('Error parsing message:', error);
|
|
return c.text('Invalid message format', 400);
|
|
}
|
|
} catch (error) {
|
|
if (this.onerror) {
|
|
this.onerror(error instanceof Error ? error : new Error(String(error)));
|
|
}
|
|
console.error('Error processing request:', error);
|
|
return c.text('Error processing message', 400);
|
|
}
|
|
}
|
|
|
|
async close(): Promise<void> {
|
|
if (this.stream && !this.stream.closed) {
|
|
this.stream.abort();
|
|
}
|
|
|
|
if (this.onclose) {
|
|
this.onclose();
|
|
}
|
|
}
|
|
|
|
async send(message: JSONRPCMessage): Promise<void> {
|
|
if (this.stream.closed) {
|
|
throw new Error('Not connected');
|
|
}
|
|
|
|
await this.stream.writeSSE({
|
|
event: 'message',
|
|
data: JSON.stringify(message),
|
|
});
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Sets up a web server for the MCP server using Server-Sent Events (SSE)
|
|
*
|
|
* @param server The MCP Server instance
|
|
* @param port The port to listen on (default: 3000)
|
|
* @returns The Hono app instance
|
|
*/
|
|
export async function setupWebServer(server: Server, port = 3000) {
|
|
// Create Hono app
|
|
const app = new Hono();
|
|
|
|
// Enable CORS
|
|
app.use('*', cors());
|
|
|
|
// Store active SSE transports by session ID
|
|
const transports: { [sessionId: string]: SSETransport } = {};
|
|
|
|
// Add a simple health check endpoint
|
|
app.get('/health', (c) => {
|
|
return c.json({ status: 'OK', server: SERVER_NAME, version: SERVER_VERSION });
|
|
});
|
|
|
|
// SSE endpoint for clients to connect to
|
|
app.get('/sse', (c) => {
|
|
return streamSSE(c, async (stream) => {
|
|
// Create SSE transport
|
|
const transport = new SSETransport('/api/messages', stream);
|
|
const sessionId = transport.sessionId;
|
|
|
|
console.error(`New SSE connection established: ${sessionId}`);
|
|
|
|
// Store the transport
|
|
transports[sessionId] = transport;
|
|
|
|
// Set up cleanup on transport close
|
|
transport.onclose = () => {
|
|
console.error(`SSE connection closed for session ${sessionId}`);
|
|
delete transports[sessionId];
|
|
};
|
|
|
|
// Make the transport available to the MCP server
|
|
try {
|
|
transport.onmessage = async (message: JSONRPCMessage) => {
|
|
try {
|
|
// The server will automatically send a response via the transport
|
|
// if the message has an ID (i.e., it's a request, not a notification)
|
|
} catch (error) {
|
|
console.error('Error handling MCP message:', error);
|
|
}
|
|
};
|
|
|
|
// Connect to the MCP server
|
|
await server.connect(transport);
|
|
} catch (error) {
|
|
console.error(`Error connecting transport for session ${sessionId}:`, error);
|
|
}
|
|
|
|
// Keep the stream open until aborted
|
|
while (!stream.closed) {
|
|
await stream.sleep(1000);
|
|
}
|
|
});
|
|
});
|
|
|
|
// API endpoint for clients to send messages
|
|
app.post('/api/messages', async (c) => {
|
|
const sessionId = c.req.query('sessionId');
|
|
|
|
if (!sessionId) {
|
|
return c.json({ error: 'Missing sessionId query parameter' }, 400);
|
|
}
|
|
|
|
const transport = transports[sessionId];
|
|
|
|
if (!transport) {
|
|
return c.json({ error: 'No active session found with the provided sessionId' }, 404);
|
|
}
|
|
|
|
return transport.handlePostMessage(c);
|
|
});
|
|
|
|
// Static files for the web client (if any)
|
|
app.get('/*', async (c) => {
|
|
const filePath = c.req.path === '/' ? '/index.html' : c.req.path;
|
|
try {
|
|
// Use Node.js fs to serve static files
|
|
const fs = await import('fs');
|
|
const path = await import('path');
|
|
const { fileURLToPath } = await import('url');
|
|
|
|
const __dirname = path.dirname(fileURLToPath(import.meta.url));
|
|
const publicPath = path.join(__dirname, '..', '..', 'public');
|
|
const fullPath = path.join(publicPath, filePath);
|
|
|
|
// Simple security check to prevent directory traversal
|
|
if (!fullPath.startsWith(publicPath)) {
|
|
return c.text('Forbidden', 403);
|
|
}
|
|
|
|
try {
|
|
const stat = fs.statSync(fullPath);
|
|
if (stat.isFile()) {
|
|
const content = fs.readFileSync(fullPath);
|
|
|
|
// Set content type based on file extension
|
|
const ext = path.extname(fullPath).toLowerCase();
|
|
let contentType = 'text/plain';
|
|
|
|
switch (ext) {
|
|
case '.html':
|
|
contentType = 'text/html';
|
|
break;
|
|
case '.css':
|
|
contentType = 'text/css';
|
|
break;
|
|
case '.js':
|
|
contentType = 'text/javascript';
|
|
break;
|
|
case '.json':
|
|
contentType = 'application/json';
|
|
break;
|
|
case '.png':
|
|
contentType = 'image/png';
|
|
break;
|
|
case '.jpg':
|
|
contentType = 'image/jpeg';
|
|
break;
|
|
case '.svg':
|
|
contentType = 'image/svg+xml';
|
|
break;
|
|
}
|
|
|
|
return new Response(content, {
|
|
headers: { 'Content-Type': contentType },
|
|
});
|
|
}
|
|
} catch (err) {
|
|
// File not found or other error
|
|
return c.text('Not Found', 404);
|
|
}
|
|
} catch (err) {
|
|
console.error('Error serving static file:', err);
|
|
return c.text('Internal Server Error', 500);
|
|
}
|
|
|
|
return c.text('Not Found', 404);
|
|
});
|
|
|
|
// Start the server
|
|
serve(
|
|
{
|
|
fetch: app.fetch,
|
|
port,
|
|
},
|
|
(info) => {
|
|
console.error(`MCP Web Server running at http://localhost:${info.port}`);
|
|
console.error(`- SSE Endpoint: http://localhost:${info.port}/sse`);
|
|
console.error(
|
|
`- Messages Endpoint: http://localhost:${info.port}/api/messages?sessionId=YOUR_SESSION_ID`
|
|
);
|
|
console.error(`- Health Check: http://localhost:${info.port}/health`);
|
|
}
|
|
);
|
|
|
|
return app;
|
|
}
|