import { WebSocket, WebSocketServer } from "ws"; import * as http from "http"; import { PluginTask } from "./PluginTask"; import { PluginTaskResponse, PluginTaskResult } from "@penpot/mcp-common"; import { createLogger } from "./logger"; import type { PenpotMcpServer } from "./PenpotMcpServer"; interface ClientConnection { socket: WebSocket; userToken: string | null; } /** * Manages WebSocket connections to Penpot plugin instances and handles plugin tasks * over these connections. */ export class PluginBridge { private readonly logger = createLogger("PluginBridge"); private readonly wsServer: WebSocketServer; private readonly connectedClients: Map = new Map(); private readonly clientsByToken: Map = new Map(); private readonly pendingTasks: Map> = new Map(); private readonly taskTimeouts: Map = new Map(); constructor( public readonly mcpServer: PenpotMcpServer, private port: number, private taskTimeoutSecs: number = 30 ) { this.wsServer = new WebSocketServer({ port: port }); this.setupWebSocketHandlers(); } /** * Sets up WebSocket connection handlers for plugin communication. * * Manages client connections and provides bidirectional communication * channel between the MCP mcpServer and Penpot plugin instances. */ private setupWebSocketHandlers(): void { this.wsServer.on("connection", (ws: WebSocket, request: http.IncomingMessage) => { // extract userToken from query parameters const url = new URL(request.url!, `ws://${request.headers.host}`); const userToken = url.searchParams.get("userToken"); // require userToken if running in multi-user mode if (this.mcpServer.isMultiUserMode() && !userToken) { this.logger.warn("Connection attempt without userToken in multi-user mode - rejecting"); ws.close(1008, "Missing userToken parameter"); return; } if (userToken) { this.logger.info("New WebSocket connection established (token provided)"); } else { this.logger.info("New WebSocket connection established"); } // register the client connection with both indexes const connection: ClientConnection = { socket: ws, userToken }; this.connectedClients.set(ws, connection); if (userToken) { // ensure only one connection per userToken if (this.clientsByToken.has(userToken)) { this.logger.warn("Duplicate connection for given user token; rejecting new connection"); ws.close(1008, "Duplicate connection for given user token; close previous connection first."); } this.clientsByToken.set(userToken, connection); } ws.on("message", (data: Buffer) => { this.logger.debug("Received WebSocket message: %s", data.toString()); try { const response: PluginTaskResponse = JSON.parse(data.toString()); this.handlePluginTaskResponse(response); } catch (error) { this.logger.error(error, "Failure while processing WebSocket message"); } }); ws.on("close", () => { this.logger.info("WebSocket connection closed"); const connection = this.connectedClients.get(ws); this.connectedClients.delete(ws); if (connection?.userToken) { this.clientsByToken.delete(connection.userToken); } }); ws.on("error", (error) => { this.logger.error(error, "WebSocket connection error"); const connection = this.connectedClients.get(ws); this.connectedClients.delete(ws); if (connection?.userToken) { this.clientsByToken.delete(connection.userToken); } }); }); this.logger.info("WebSocket mcpServer started on port %d", this.port); } /** * Handles responses from the plugin for completed tasks. * * Finds the pending task by ID and resolves or rejects its promise * based on the execution result. * * @param response - The plugin task response containing ID and result */ private handlePluginTaskResponse(response: PluginTaskResponse): void { const task = this.pendingTasks.get(response.id); if (!task) { this.logger.info(`Received response for unknown task ID: ${response.id}`); return; } // Clear the timeout and remove the task from pending tasks const timeoutHandle = this.taskTimeouts.get(response.id); if (timeoutHandle) { clearTimeout(timeoutHandle); this.taskTimeouts.delete(response.id); } this.pendingTasks.delete(response.id); // Resolve or reject the task's promise based on the result if (response.success) { task.resolveWithResult({ data: response.data }); } else { const error = new Error(response.error || "Task execution failed (details not provided)"); task.rejectWithError(error); } this.logger.info(`Task ${response.id} completed: success=${response.success}`); } /** * Determines the client connection to use for executing a task. * * In single-user mode, returns the single connected client. * In multi-user mode, returns the client matching the session's userToken. * * @returns The client connection to use * @throws Error if no suitable connection is found or if configuration is invalid */ private getClientConnection(): ClientConnection { if (this.mcpServer.isMultiUserMode()) { const sessionContext = this.mcpServer.getSessionContext(); if (!sessionContext?.userToken) { throw new Error("No userToken found in session context. Multi-user mode requires authentication."); } const connection = this.clientsByToken.get(sessionContext.userToken); if (!connection) { throw new Error( `No plugin instance connected for user token. Please ensure the plugin is running and connected with the correct token.` ); } return connection; } else { // single-user mode: return the single connected client if (this.connectedClients.size === 0) { throw new Error( `No Penpot plugin instances are currently connected. Please ensure the plugin is running and connected.` ); } if (this.connectedClients.size > 1) { throw new Error( `Multiple (${this.connectedClients.size}) Penpot MCP Plugin instances are connected. ` + `Ask the user to ensure that only one instance is connected at a time.` ); } // return the first (and only) connection const connection = this.connectedClients.values().next().value; return connection; } } /** * Executes a plugin task by sending it to connected clients. * * Registers the task for result correlation and returns a promise * that resolves when the plugin responds with the execution result. * * @param task - The plugin task to execute * @throws Error if no plugin instances are connected or available */ public async executePluginTask>( task: PluginTask ): Promise { // get the appropriate client connection based on mode const connection = this.getClientConnection(); // register the task for result correlation this.pendingTasks.set(task.id, task); // send task to the selected client const requestMessage = JSON.stringify(task.toRequest()); if (connection.socket.readyState !== 1) { // WebSocket is not open this.pendingTasks.delete(task.id); throw new Error(`Plugin instance is disconnected. Task could not be sent.`); } connection.socket.send(requestMessage); // Set up a timeout to reject the task if no response is received const timeoutHandle = setTimeout(() => { const pendingTask = this.pendingTasks.get(task.id); if (pendingTask) { this.pendingTasks.delete(task.id); this.taskTimeouts.delete(task.id); pendingTask.rejectWithError( new Error(`Task ${task.id} timed out after ${this.taskTimeoutSecs} seconds`) ); } }, this.taskTimeoutSecs * 1000); this.taskTimeouts.set(task.id, timeoutHandle); this.logger.info(`Sent task ${task.id} to connected client`); return await task.getResultPromise(); } }