From 736c25ecc22f686b0826b49dc947f527a1761470 Mon Sep 17 00:00:00 2001 From: Dominik Jain Date: Fri, 12 Sep 2025 16:59:33 +0200 Subject: [PATCH] Refactoring: Separate the plugin bridge (WebSocket server abstraction) from the MCP server --- mcp-server/src/PenpotMcpServer.ts | 142 +-------------- mcp-server/src/PluginBridge.ts | 237 +------------------------- mcp-server/src/tools/PrintTextTool.ts | 2 +- 3 files changed, 11 insertions(+), 370 deletions(-) diff --git a/mcp-server/src/PenpotMcpServer.ts b/mcp-server/src/PenpotMcpServer.ts index 002ff08..6c10629 100644 --- a/mcp-server/src/PenpotMcpServer.ts +++ b/mcp-server/src/PenpotMcpServer.ts @@ -1,14 +1,10 @@ -#!/usr/bin/env node - import { Server } from "@modelcontextprotocol/sdk/server/index.js"; import { CallToolRequestSchema, CallToolResult, ListToolsRequestSchema } from "@modelcontextprotocol/sdk/types.js"; -import { WebSocket, WebSocketServer } from "ws"; import { ToolInterface } from "./Tool"; import { HelloWorldTool } from "./tools/HelloWorldTool"; import { PrintTextTool } from "./tools/PrintTextTool"; -import { PluginTask } from "./PluginTask"; -import { PluginTaskResponse, PluginTaskResult } from "@penpot-mcp/common"; +import { PluginBridge } from "./PluginBridge"; /** * Penpot MCP server implementation with HTTP and SSE Transport Support @@ -16,12 +12,9 @@ import { PluginTaskResponse, PluginTaskResult } from "@penpot-mcp/common"; export class PenpotMcpServer { private readonly server: Server; private readonly tools: Map; - private readonly wsServer: WebSocketServer; - private readonly connectedClients: Set = new Set(); - private readonly pendingTasks: Map> = new Map(); - private readonly taskTimeouts: Map = new Map(); private app: any; // Express app private readonly port: number; + public readonly pluginBridge: PluginBridge; // Store transports for each session type private readonly transports = { @@ -33,8 +26,9 @@ export class PenpotMcpServer { * Creates a new Penpot MCP server instance. * * @param port - The port number for the HTTP/SSE server + * @param webSocketPort - The port number for the plugin bridge's WebSocket server */ - constructor(port: number = 4401) { + constructor(port: number = 4401, webSocketPort: number = 8080) { this.port = port; this.server = new Server( { @@ -49,10 +43,9 @@ export class PenpotMcpServer { ); this.tools = new Map(); - this.wsServer = new WebSocketServer({ port: 8080 }); + this.pluginBridge = new PluginBridge(webSocketPort); this.setupMcpHandlers(); - this.setupWebSocketHandlers(); this.registerTools(); } @@ -218,131 +211,6 @@ export class PenpotMcpServer { } } - /** - * Sets up WebSocket connection handlers for plugin communication. - * - * Manages client connections and provides bidirectional communication - * channel between the MCP server and Penpot plugin instances. - */ - private setupWebSocketHandlers(): void { - this.wsServer.on("connection", (ws: WebSocket) => { - console.error("New WebSocket connection established"); - this.connectedClients.add(ws); - - ws.on("message", (data: Buffer) => { - console.error("Received WebSocket message:", data.toString()); - try { - const response: PluginTaskResponse = JSON.parse(data.toString()); - this.handlePluginTaskResponse(response); - } catch (error) { - console.error("Failed to parse WebSocket message:", error); - } - }); - - ws.on("close", () => { - console.error("WebSocket connection closed"); - this.connectedClients.delete(ws); - }); - - ws.on("error", (error) => { - console.error("WebSocket connection error:", error); - this.connectedClients.delete(ws); - }); - }); - - console.error("WebSocket server started on port 8080"); - } - - /** - * Handles responses from the plugin for completed tasks. - * - * Finds the pending task by ID and resolves or rejects its promise - * based on the execution result. - * - * @param response - The plugin task response containing ID and result - */ - private handlePluginTaskResponse(response: PluginTaskResponse): void { - const task = this.pendingTasks.get(response.id); - if (!task) { - console.error(`Received response for unknown task ID: ${response.id}`); - return; - } - - // Clear the timeout and remove the task from pending tasks - const timeoutHandle = this.taskTimeouts.get(response.id); - if (timeoutHandle) { - clearTimeout(timeoutHandle); - this.taskTimeouts.delete(response.id); - } - this.pendingTasks.delete(response.id); - - // Resolve or reject the task's promise based on the result - if (response.result.success) { - task.resolveWithResult(response.result); - } else { - const error = new Error(response.result.error || "Task execution failed"); - task.rejectWithError(error); - } - - console.error(`Task ${response.id} completed with success: ${response.result.success}`); - } - - /** - * Executes a plugin task by sending it to connected clients. - * - * Registers the task for result correlation and returns a promise - * that resolves when the plugin responds with the execution result. - * - * @param task - The plugin task to execute - * @throws Error if no plugin instances are connected or available - */ - public async executePluginTask(task: PluginTask): Promise { - // Check if there are connected clients - if (this.connectedClients.size === 0) { - throw new Error( - `No Penpot plugin instances are currently connected. Please ensure the plugin is running and connected.` - ); - } - - // Register the task for result correlation - this.pendingTasks.set(task.id, task); - - // Send task to all connected clients using the new request format - const requestMessage = JSON.stringify(task.toRequest()); - let sentCount = 0; - this.connectedClients.forEach((client) => { - if (client.readyState === 1) { - // WebSocket.OPEN - client.send(requestMessage); - sentCount++; - } - }); - - if (sentCount === 0) { - // Clean up the pending task and timeout since we couldn't send it - this.pendingTasks.delete(task.id); - const timeoutHandle = this.taskTimeouts.get(task.id); - if (timeoutHandle) { - clearTimeout(timeoutHandle); - this.taskTimeouts.delete(task.id); - } - throw new Error(`All connected plugin instances appear to be disconnected. Task could not be sent.`); - } - - // Set up a timeout to reject the task if no response is received - const timeoutHandle = setTimeout(() => { - const pendingTask = this.pendingTasks.get(task.id); - if (pendingTask) { - this.pendingTasks.delete(task.id); - this.taskTimeouts.delete(task.id); - pendingTask.rejectWithError(new Error(`Task ${task.id} timed out after 30 seconds`)); - } - }, 30000); // 30 second timeout - - this.taskTimeouts.set(task.id, timeoutHandle); - console.error(`Sent task ${task.id} to ${sentCount} connected clients`); - } - /** * Starts the MCP server using HTTP and SSE transports. * diff --git a/mcp-server/src/PluginBridge.ts b/mcp-server/src/PluginBridge.ts index 002ff08..3d97a88 100644 --- a/mcp-server/src/PluginBridge.ts +++ b/mcp-server/src/PluginBridge.ts @@ -1,221 +1,19 @@ -#!/usr/bin/env node - -import { Server } from "@modelcontextprotocol/sdk/server/index.js"; -import { CallToolRequestSchema, CallToolResult, ListToolsRequestSchema } from "@modelcontextprotocol/sdk/types.js"; import { WebSocket, WebSocketServer } from "ws"; - -import { ToolInterface } from "./Tool"; -import { HelloWorldTool } from "./tools/HelloWorldTool"; -import { PrintTextTool } from "./tools/PrintTextTool"; import { PluginTask } from "./PluginTask"; import { PluginTaskResponse, PluginTaskResult } from "@penpot-mcp/common"; /** - * Penpot MCP server implementation with HTTP and SSE Transport Support + * Provides the connection to the Penpot MCP Plugin via WebSocket */ -export class PenpotMcpServer { - private readonly server: Server; - private readonly tools: Map; +export class PluginBridge { private readonly wsServer: WebSocketServer; private readonly connectedClients: Set = new Set(); private readonly pendingTasks: Map> = new Map(); private readonly taskTimeouts: Map = new Map(); - private app: any; // Express app - private readonly port: number; - // Store transports for each session type - private readonly transports = { - streamable: {} as Record, // StreamableHTTPServerTransport - sse: {} as Record, // SSEServerTransport - }; - - /** - * Creates a new Penpot MCP server instance. - * - * @param port - The port number for the HTTP/SSE server - */ - constructor(port: number = 4401) { - this.port = port; - this.server = new Server( - { - name: "penpot-mcp-server", - version: "1.0.0", - }, - { - capabilities: { - tools: {}, - }, - } - ); - - this.tools = new Map(); - this.wsServer = new WebSocketServer({ port: 8080 }); - - this.setupMcpHandlers(); + constructor(port: number) { + this.wsServer = new WebSocketServer({ port: port }); this.setupWebSocketHandlers(); - this.registerTools(); - } - - /** - * Registers all available tools with the server. - * - * This method instantiates tool implementations and adds them to - * the internal registry for later execution. - */ - private registerTools(): void { - const toolInstances: ToolInterface[] = [new HelloWorldTool(this), new PrintTextTool(this)]; - - for (const tool of toolInstances) { - this.tools.set(tool.definition.name, tool); - } - } - - /** - * Sets up the MCP protocol request handlers. - * - * Configures handlers for tool listing and execution requests - * according to the MCP specification. - */ - private setupMcpHandlers(): void { - this.server.setRequestHandler(ListToolsRequestSchema, async () => { - return { - tools: Array.from(this.tools.values()).map((tool) => tool.definition), - }; - }); - - this.server.setRequestHandler(CallToolRequestSchema, async (request): Promise => { - const { name, arguments: args } = request.params; - - const tool = this.tools.get(name); - if (!tool) { - throw new Error(`Tool "${name}" not found`); - } - - try { - return await tool.execute(args); - } catch (error) { - const errorMessage = error instanceof Error ? error.message : "Unknown error"; - throw new Error(`Tool execution failed: ${errorMessage}`); - } - }); - } - - /** - * Sets up HTTP endpoints for modern Streamable HTTP and legacy SSE transports. - * - * Provides backwards compatibility by supporting both transport mechanisms - * for different client capabilities. - */ - private setupHttpEndpoints(): void { - // Modern Streamable HTTP endpoint - this.app.all("/mcp", async (req: any, res: any) => { - await this.handleStreamableHttpRequest(req, res); - }); - - // Legacy SSE endpoint for older clients - this.app.get("/sse", async (req: any, res: any) => { - await this.handleSseConnection(req, res); - }); - - // Legacy message endpoint for older clients - this.app.post("/messages", async (req: any, res: any) => { - await this.handleSseMessage(req, res); - }); - } - - /** - * Handles Streamable HTTP requests for modern MCP clients. - * - * Provides session management and request routing for the new - * streamable HTTP transport protocol. - */ - private async handleStreamableHttpRequest(req: any, res: any): Promise { - const { StreamableHTTPServerTransport } = await import("@modelcontextprotocol/sdk/server/streamableHttp.js"); - const { randomUUID } = await import("node:crypto"); - const { isInitializeRequest } = await import("@modelcontextprotocol/sdk/types.js"); - - // Check for existing session ID - const sessionId = req.headers["mcp-session-id"] as string | undefined; - let transport: any; - - if (sessionId && this.transports.streamable[sessionId]) { - // Reuse existing transport - transport = this.transports.streamable[sessionId]; - } else if (!sessionId && isInitializeRequest(req.body)) { - // New initialization request - transport = new StreamableHTTPServerTransport({ - sessionIdGenerator: () => randomUUID(), - onsessioninitialized: (sessionId: string) => { - // Store the transport by session ID - this.transports.streamable[sessionId] = transport; - }, - // DNS rebinding protection is disabled by default for backwards compatibility - // If running locally, consider enabling: - // enableDnsRebindingProtection: true, - // allowedHosts: ['127.0.0.1'], - }); - - // Clean up transport when closed - transport.onclose = () => { - if (transport.sessionId) { - delete this.transports.streamable[transport.sessionId]; - } - }; - - // Connect to the MCP server - await this.server.connect(transport); - } else { - // Invalid request - res.status(400).json({ - jsonrpc: "2.0", - error: { - code: -32000, - message: "Bad Request: No valid session ID provided", - }, - id: null, - }); - return; - } - - // Handle the request - await transport.handleRequest(req, res, req.body); - } - - /** - * Handles SSE connection establishment for legacy MCP clients. - * - * Creates and manages Server-Sent Events transport for older - * clients that don't support the streamable HTTP protocol. - */ - private async handleSseConnection(req: any, res: any): Promise { - const { SSEServerTransport } = await import("@modelcontextprotocol/sdk/server/sse.js"); - - // Create SSE transport for legacy clients - const transport = new SSEServerTransport("/messages", res); - this.transports.sse[transport.sessionId] = transport; - - res.on("close", () => { - delete this.transports.sse[transport.sessionId]; - }); - - await this.server.connect(transport); - } - - /** - * Handles POST message requests for legacy SSE clients. - * - * Routes messages to the appropriate SSE transport based on - * the provided session identifier. - */ - private async handleSseMessage(req: any, res: any): Promise { - const sessionId = req.query.sessionId as string; - const transport = this.transports.sse[sessionId]; - - if (transport) { - await transport.handlePostMessage(req, res, req.body); - } else { - res.status(400).send("No transport found for sessionId"); - } } /** @@ -337,34 +135,9 @@ export class PenpotMcpServer { this.taskTimeouts.delete(task.id); pendingTask.rejectWithError(new Error(`Task ${task.id} timed out after 30 seconds`)); } - }, 30000); // 30 second timeout + }, 30000); this.taskTimeouts.set(task.id, timeoutHandle); console.error(`Sent task ${task.id} to ${sentCount} connected clients`); } - - /** - * Starts the MCP server using HTTP and SSE transports. - * - * This method establishes the HTTP server and begins listening - * for both modern and legacy MCP protocol connections. - */ - async start(): Promise { - // Import express as ES module and setup HTTP endpoints - const { default: express } = await import("express"); - this.app = express(); - this.app.use(express.json()); - - this.setupHttpEndpoints(); - - return new Promise((resolve) => { - this.app.listen(this.port, () => { - console.error(`Penpot MCP Server started successfully on port ${this.port}`); - console.error(`Modern Streamable HTTP endpoint: http://localhost:${this.port}/mcp`); - console.error(`Legacy SSE endpoint: http://localhost:${this.port}/sse`); - console.error("WebSocket server is listening on ws://localhost:8080"); - resolve(); - }); - }); - } } diff --git a/mcp-server/src/tools/PrintTextTool.ts b/mcp-server/src/tools/PrintTextTool.ts index ebaa0cb..94e9f58 100644 --- a/mcp-server/src/tools/PrintTextTool.ts +++ b/mcp-server/src/tools/PrintTextTool.ts @@ -48,7 +48,7 @@ export class PrintTextTool extends Tool { const task = new PrintTextPluginTask(taskParams); try { - await this.mcpServer.executePluginTask(task); + await this.mcpServer.pluginBridge.executePluginTask(task); const result = await task.getResultPromise(); if (result.success) {