diff --git a/mcp-server/src/PluginBridge.ts b/mcp-server/src/PluginBridge.ts new file mode 100644 index 0000000..0582e9b --- /dev/null +++ b/mcp-server/src/PluginBridge.ts @@ -0,0 +1,372 @@ +#!/usr/bin/env node + +import { Server } from "@modelcontextprotocol/sdk/server/index.js"; +import { CallToolRequestSchema, CallToolResult, ListToolsRequestSchema } from "@modelcontextprotocol/sdk/types.js"; +import { WebSocket, WebSocketServer } from "ws"; + +import { ToolInterface } from "./Tool"; +import { HelloWorldTool } from "./tools/HelloWorldTool"; +import { PrintTextTool } from "./tools/PrintTextTool"; +import { PluginTask } from "./PluginTask"; +import { PluginTaskResponse, PluginTaskResult } from '@penpot-mcp/common'; + +/** + * Penpot MCP server implementation with HTTP and SSE Transport Support + */ +export class PenpotMcpServer { + private readonly server: Server; + private readonly tools: Map; + private readonly wsServer: WebSocketServer; + private readonly connectedClients: Set = new Set(); + private readonly pendingTasks: Map> = new Map(); + private readonly taskTimeouts: Map = new Map(); + private app: any; // Express app + private readonly port: number; + + // Store transports for each session type + private readonly transports = { + streamable: {} as Record, // StreamableHTTPServerTransport + sse: {} as Record, // SSEServerTransport + }; + + /** + * Creates a new Penpot MCP server instance. + * + * @param port - The port number for the HTTP/SSE server + */ + constructor(port: number = 4401) { + this.port = port; + this.server = new Server( + { + name: "penpot-mcp-server", + version: "1.0.0", + }, + { + capabilities: { + tools: {}, + }, + } + ); + + this.tools = new Map(); + this.wsServer = new WebSocketServer({ port: 8080 }); + + this.setupMcpHandlers(); + this.setupWebSocketHandlers(); + this.registerTools(); + } + + /** + * Registers all available tools with the server. + * + * This method instantiates tool implementations and adds them to + * the internal registry for later execution. + */ + private registerTools(): void { + const toolInstances: ToolInterface[] = [new HelloWorldTool(this), new PrintTextTool(this)]; + + for (const tool of toolInstances) { + this.tools.set(tool.definition.name, tool); + } + } + + /** + * Sets up the MCP protocol request handlers. + * + * Configures handlers for tool listing and execution requests + * according to the MCP specification. + */ + private setupMcpHandlers(): void { + this.server.setRequestHandler(ListToolsRequestSchema, async () => { + return { + tools: Array.from(this.tools.values()).map((tool) => tool.definition), + }; + }); + + this.server.setRequestHandler(CallToolRequestSchema, async (request): Promise => { + const { name, arguments: args } = request.params; + + const tool = this.tools.get(name); + if (!tool) { + throw new Error(`Tool "${name}" not found`); + } + + try { + return await tool.execute(args); + } catch (error) { + const errorMessage = error instanceof Error ? error.message : "Unknown error"; + throw new Error(`Tool execution failed: ${errorMessage}`); + } + }); + } + + /** + * Sets up HTTP endpoints for modern Streamable HTTP and legacy SSE transports. + * + * Provides backwards compatibility by supporting both transport mechanisms + * for different client capabilities. + */ + private setupHttpEndpoints(): void { + // Modern Streamable HTTP endpoint + this.app.all("/mcp", async (req: any, res: any) => { + await this.handleStreamableHttpRequest(req, res); + }); + + // Legacy SSE endpoint for older clients + this.app.get("/sse", async (req: any, res: any) => { + await this.handleSseConnection(req, res); + }); + + // Legacy message endpoint for older clients + this.app.post("/messages", async (req: any, res: any) => { + await this.handleSseMessage(req, res); + }); + } + + /** + * Handles Streamable HTTP requests for modern MCP clients. + * + * Provides session management and request routing for the new + * streamable HTTP transport protocol. + */ + private async handleStreamableHttpRequest(req: any, res: any): Promise { + const { StreamableHTTPServerTransport } = await import("@modelcontextprotocol/sdk/server/streamableHttp.js"); + const { randomUUID } = await import("node:crypto"); + const { isInitializeRequest } = await import("@modelcontextprotocol/sdk/types.js"); + + // Check for existing session ID + const sessionId = req.headers["mcp-session-id"] as string | undefined; + let transport: any; + + if (sessionId && this.transports.streamable[sessionId]) { + // Reuse existing transport + transport = this.transports.streamable[sessionId]; + } else if (!sessionId && isInitializeRequest(req.body)) { + // New initialization request + transport = new StreamableHTTPServerTransport({ + sessionIdGenerator: () => randomUUID(), + onsessioninitialized: (sessionId: string) => { + // Store the transport by session ID + this.transports.streamable[sessionId] = transport; + }, + // DNS rebinding protection is disabled by default for backwards compatibility + // If running locally, consider enabling: + // enableDnsRebindingProtection: true, + // allowedHosts: ['127.0.0.1'], + }); + + // Clean up transport when closed + transport.onclose = () => { + if (transport.sessionId) { + delete this.transports.streamable[transport.sessionId]; + } + }; + + // Connect to the MCP server + await this.server.connect(transport); + } else { + // Invalid request + res.status(400).json({ + jsonrpc: "2.0", + error: { + code: -32000, + message: "Bad Request: No valid session ID provided", + }, + id: null, + }); + return; + } + + // Handle the request + await transport.handleRequest(req, res, req.body); + } + + /** + * Handles SSE connection establishment for legacy MCP clients. + * + * Creates and manages Server-Sent Events transport for older + * clients that don't support the streamable HTTP protocol. + */ + private async handleSseConnection(req: any, res: any): Promise { + const { SSEServerTransport } = await import("@modelcontextprotocol/sdk/server/sse.js"); + + // Create SSE transport for legacy clients + const transport = new SSEServerTransport("/messages", res); + this.transports.sse[transport.sessionId] = transport; + + res.on("close", () => { + delete this.transports.sse[transport.sessionId]; + }); + + await this.server.connect(transport); + } + + /** + * Handles POST message requests for legacy SSE clients. + * + * Routes messages to the appropriate SSE transport based on + * the provided session identifier. + */ + private async handleSseMessage(req: any, res: any): Promise { + const sessionId = req.query.sessionId as string; + const transport = this.transports.sse[sessionId]; + + if (transport) { + await transport.handlePostMessage(req, res, req.body); + } else { + res.status(400).send("No transport found for sessionId"); + } + } + + /** + * Sets up WebSocket connection handlers for plugin communication. + * + * Manages client connections and provides bidirectional communication + * channel between the MCP server and Penpot plugin instances. + */ + private setupWebSocketHandlers(): void { + this.wsServer.on("connection", (ws: WebSocket) => { + console.error("New WebSocket connection established"); + this.connectedClients.add(ws); + + ws.on("message", (data: Buffer) => { + console.error("Received WebSocket message:", data.toString()); + try { + const response: PluginTaskResponse = JSON.parse(data.toString()); + this.handlePluginTaskResponse(response); + } catch (error) { + console.error("Failed to parse WebSocket message:", error); + } + }); + + ws.on("close", () => { + console.error("WebSocket connection closed"); + this.connectedClients.delete(ws); + }); + + ws.on("error", (error) => { + console.error("WebSocket connection error:", error); + this.connectedClients.delete(ws); + }); + }); + + console.error("WebSocket server started on port 8080"); + } + + /** + * Handles responses from the plugin for completed tasks. + * + * Finds the pending task by ID and resolves or rejects its promise + * based on the execution result. + * + * @param response - The plugin task response containing ID and result + */ + private handlePluginTaskResponse(response: PluginTaskResponse): void { + const task = this.pendingTasks.get(response.id); + if (!task) { + console.error(`Received response for unknown task ID: ${response.id}`); + return; + } + + // Clear the timeout and remove the task from pending tasks + const timeoutHandle = this.taskTimeouts.get(response.id); + if (timeoutHandle) { + clearTimeout(timeoutHandle); + this.taskTimeouts.delete(response.id); + } + this.pendingTasks.delete(response.id); + + // Resolve or reject the task's promise based on the result + if (response.result.success) { + task.resolveWithResult(response.result); + } else { + const error = new Error(response.result.error || 'Task execution failed'); + task.rejectWithError(error); + } + + console.error(`Task ${response.id} completed with success: ${response.result.success}`); + } + + /** + * Executes a plugin task by sending it to connected clients. + * + * Registers the task for result correlation and returns a promise + * that resolves when the plugin responds with the execution result. + * + * @param task - The plugin task to execute + * @throws Error if no plugin instances are connected or available + */ + public async executePluginTask( + task: PluginTask + ): Promise { + // Check if there are connected clients + if (this.connectedClients.size === 0) { + throw new Error( + `No Penpot plugin instances are currently connected. Please ensure the plugin is running and connected.` + ); + } + + // Register the task for result correlation + this.pendingTasks.set(task.id, task); + + // Send task to all connected clients using the new request format + const requestMessage = JSON.stringify(task.toRequest()); + let sentCount = 0; + this.connectedClients.forEach((client) => { + if (client.readyState === 1) { + // WebSocket.OPEN + client.send(requestMessage); + sentCount++; + } + }); + + if (sentCount === 0) { + // Clean up the pending task and timeout since we couldn't send it + this.pendingTasks.delete(task.id); + const timeoutHandle = this.taskTimeouts.get(task.id); + if (timeoutHandle) { + clearTimeout(timeoutHandle); + this.taskTimeouts.delete(task.id); + } + throw new Error(`All connected plugin instances appear to be disconnected. Task could not be sent.`); + } + + // Set up a timeout to reject the task if no response is received + const timeoutHandle = setTimeout(() => { + const pendingTask = this.pendingTasks.get(task.id); + if (pendingTask) { + this.pendingTasks.delete(task.id); + this.taskTimeouts.delete(task.id); + pendingTask.rejectWithError(new Error(`Task ${task.id} timed out after 30 seconds`)); + } + }, 30000); // 30 second timeout + + this.taskTimeouts.set(task.id, timeoutHandle); + console.error(`Sent task ${task.id} to ${sentCount} connected clients`); + } + + /** + * Starts the MCP server using HTTP and SSE transports. + * + * This method establishes the HTTP server and begins listening + * for both modern and legacy MCP protocol connections. + */ + async start(): Promise { + // Import express as ES module and setup HTTP endpoints + const { default: express } = await import("express"); + this.app = express(); + this.app.use(express.json()); + + this.setupHttpEndpoints(); + + return new Promise((resolve) => { + this.app.listen(this.port, () => { + console.error(`Penpot MCP Server started successfully on port ${this.port}`); + console.error(`Modern Streamable HTTP endpoint: http://localhost:${this.port}/mcp`); + console.error(`Legacy SSE endpoint: http://localhost:${this.port}/sse`); + console.error("WebSocket server is listening on ws://localhost:8080"); + resolve(); + }); + }); + } +}