PluginBridge: Support multi-user mode via user tokens #20

This commit is contained in:
Dominik Jain 2025-12-15 13:18:22 +01:00
parent df7245cb9d
commit 66af0d6b72
2 changed files with 109 additions and 41 deletions

View File

@ -60,7 +60,7 @@ export class PenpotMcpServer {
); );
this.tools = new Map<string, Tool<any>>(); this.tools = new Map<string, Tool<any>>();
this.pluginBridge = new PluginBridge(webSocketPort); this.pluginBridge = new PluginBridge(this, webSocketPort);
this.replServer = new ReplServer(this.pluginBridge, replPort); this.replServer = new ReplServer(this.pluginBridge, replPort);
this.registerTools(); this.registerTools();

View File

@ -1,21 +1,32 @@
import { WebSocket, WebSocketServer } from "ws"; import { WebSocket, WebSocketServer } from "ws";
import * as http from "http";
import { PluginTask } from "./PluginTask"; import { PluginTask } from "./PluginTask";
import { PluginTaskResponse, PluginTaskResult } from "@penpot-mcp/common"; import { PluginTaskResponse, PluginTaskResult } from "@penpot-mcp/common";
import { createLogger } from "./logger"; import { createLogger } from "./logger";
import type { PenpotMcpServer } from "./PenpotMcpServer";
interface ClientConnection {
socket: WebSocket;
userToken: string | null;
}
/** /**
* Provides the connection to the Penpot MCP Plugin via WebSocket * Manages WebSocket connections to Penpot plugin instances and handles plugin tasks
* over these connections.
*/ */
export class PluginBridge { export class PluginBridge {
private readonly logger = createLogger("PluginBridge"); private readonly logger = createLogger("PluginBridge");
private readonly wsServer: WebSocketServer; private readonly wsServer: WebSocketServer;
private readonly connectedClients: Set<WebSocket> = new Set(); private readonly connectedClients: Map<WebSocket, ClientConnection> = new Map();
private readonly clientsByToken: Map<string, ClientConnection> = new Map();
private readonly pendingTasks: Map<string, PluginTask<any, any>> = new Map(); private readonly pendingTasks: Map<string, PluginTask<any, any>> = new Map();
private readonly taskTimeouts: Map<string, NodeJS.Timeout> = new Map(); private readonly taskTimeouts: Map<string, NodeJS.Timeout> = new Map();
constructor( constructor(
private mcpServer: PenpotMcpServer,
private port: number, private port: number,
private taskTimeoutSecs: number = 30 private taskTimeoutSecs: number = 30,
private isMultiUser: boolean = false
) { ) {
this.wsServer = new WebSocketServer({ port: port }); this.wsServer = new WebSocketServer({ port: port });
this.setupWebSocketHandlers(); this.setupWebSocketHandlers();
@ -25,12 +36,39 @@ export class PluginBridge {
* Sets up WebSocket connection handlers for plugin communication. * Sets up WebSocket connection handlers for plugin communication.
* *
* Manages client connections and provides bidirectional communication * Manages client connections and provides bidirectional communication
* channel between the MCP server and Penpot plugin instances. * channel between the MCP mcpServer and Penpot plugin instances.
*/ */
private setupWebSocketHandlers(): void { private setupWebSocketHandlers(): void {
this.wsServer.on("connection", (ws: WebSocket) => { this.wsServer.on("connection", (ws: WebSocket, request: http.IncomingMessage) => {
this.logger.info("New WebSocket connection established"); // extract userToken from query parameters
this.connectedClients.add(ws); const url = new URL(request.url!, `ws://${request.headers.host}`);
const userToken = url.searchParams.get("userToken");
// require userToken if running in multi-user mode
if (this.isMultiUser && !userToken) {
this.logger.warn("Connection attempt without userToken in multi-user mode - rejecting");
ws.close(1008, "Missing userToken parameter");
return;
}
if (userToken) {
this.logger.info("New WebSocket connection established (authenticated)");
} else {
this.logger.info("New WebSocket connection established (unauthenticated)");
}
// register the client connection with both indexes
const connection: ClientConnection = { socket: ws, userToken };
this.connectedClients.set(ws, connection);
if (userToken) {
// ensure only one connection per userToken
if (this.clientsByToken.has(userToken)) {
this.logger.warn("Duplicate connection for given user token; rejecting new connection");
ws.close(1008, "Duplicate connection for given user token; close previous connection first.");
}
this.clientsByToken.set(userToken, connection);
}
ws.on("message", (data: Buffer) => { ws.on("message", (data: Buffer) => {
this.logger.info("Received WebSocket message: %s", data.toString()); this.logger.info("Received WebSocket message: %s", data.toString());
@ -44,16 +82,24 @@ export class PluginBridge {
ws.on("close", () => { ws.on("close", () => {
this.logger.info("WebSocket connection closed"); this.logger.info("WebSocket connection closed");
const connection = this.connectedClients.get(ws);
this.connectedClients.delete(ws); this.connectedClients.delete(ws);
if (connection?.userToken) {
this.clientsByToken.delete(connection.userToken);
}
}); });
ws.on("error", (error) => { ws.on("error", (error) => {
this.logger.error(error, "WebSocket connection error"); this.logger.error(error, "WebSocket connection error");
const connection = this.connectedClients.get(ws);
this.connectedClients.delete(ws); this.connectedClients.delete(ws);
if (connection?.userToken) {
this.clientsByToken.delete(connection.userToken);
}
}); });
}); });
this.logger.info("WebSocket server started on port %d", this.port); this.logger.info("WebSocket mcpServer started on port %d", this.port);
} }
/** /**
@ -90,6 +136,50 @@ export class PluginBridge {
this.logger.info(`Task ${response.id} completed: success=${response.success}`); this.logger.info(`Task ${response.id} completed: success=${response.success}`);
} }
/**
* Determines the client connection to use for executing a task.
*
* In single-user mode, returns the single connected client.
* In multi-user mode, returns the client matching the session's userToken.
*
* @returns The client connection to use
* @throws Error if no suitable connection is found or if configuration is invalid
*/
private getClientConnection(): ClientConnection {
if (this.isMultiUser) {
const sessionContext = this.mcpServer.getSessionContext();
if (!sessionContext?.userToken) {
throw new Error("No userToken found in session context. Multi-user mode requires authentication.");
}
const connection = this.clientsByToken.get(sessionContext.userToken);
if (!connection) {
throw new Error(
`No plugin instance connected for user token. Please ensure the plugin is running and connected with the correct token.`
);
}
return connection;
} else {
// single-user mode: return the single connected client
if (this.connectedClients.size === 0) {
throw new Error(
`No Penpot plugin instances are currently connected. Please ensure the plugin is running and connected.`
);
}
if (this.connectedClients.size > 1) {
throw new Error(
`Multiple (${this.connectedClients.size}) Penpot MCP Plugin instances are connected. ` +
`Ask the user to ensure that only one instance is connected at a time.`
);
}
// return the first (and only) connection
const connection = this.connectedClients.values().next().value;
return <ClientConnection>connection;
}
}
/** /**
* Executes a plugin task by sending it to connected clients. * Executes a plugin task by sending it to connected clients.
* *
@ -102,44 +192,22 @@ export class PluginBridge {
public async executePluginTask<TResult extends PluginTaskResult<any>>( public async executePluginTask<TResult extends PluginTaskResult<any>>(
task: PluginTask<any, TResult> task: PluginTask<any, TResult>
): Promise<TResult> { ): Promise<TResult> {
// Check for a single connected client // get the appropriate client connection based on mode
if (this.connectedClients.size === 0) { const connection = this.getClientConnection();
throw new Error(
`No Penpot plugin instances are currently connected. Please ensure the plugin is running and connected.`
);
}
if (this.connectedClients.size > 1) {
throw new Error(
`Multiple (${this.connectedClients.size}) Penpot MCP Plugin instances are connected. ` +
`Ask the user to ensure that only one instance is connected at a time.`
);
}
// Register the task for result correlation // register the task for result correlation
this.pendingTasks.set(task.id, task); this.pendingTasks.set(task.id, task);
// Send task to all connected clients // send task to the selected client
const requestMessage = JSON.stringify(task.toRequest()); const requestMessage = JSON.stringify(task.toRequest());
let sentCount = 0; if (connection.socket.readyState !== 1) {
this.connectedClients.forEach((client) => { // WebSocket is not open
if (client.readyState === 1) {
// WebSocket.OPEN
client.send(requestMessage);
sentCount++;
}
});
if (sentCount === 0) {
// Clean up the pending task and timeout since we couldn't send it
this.pendingTasks.delete(task.id); this.pendingTasks.delete(task.id);
const timeoutHandle = this.taskTimeouts.get(task.id); throw new Error(`Plugin instance is disconnected. Task could not be sent.`);
if (timeoutHandle) {
clearTimeout(timeoutHandle);
this.taskTimeouts.delete(task.id);
}
throw new Error(`All connected plugin instances appear to be disconnected. Task could not be sent.`);
} }
connection.socket.send(requestMessage);
// Set up a timeout to reject the task if no response is received // Set up a timeout to reject the task if no response is received
const timeoutHandle = setTimeout(() => { const timeoutHandle = setTimeout(() => {
const pendingTask = this.pendingTasks.get(task.id); const pendingTask = this.pendingTasks.get(task.id);
@ -153,7 +221,7 @@ export class PluginBridge {
}, this.taskTimeoutSecs * 1000); }, this.taskTimeoutSecs * 1000);
this.taskTimeouts.set(task.id, timeoutHandle); this.taskTimeouts.set(task.id, timeoutHandle);
this.logger.info(`Sent task ${task.id} to ${sentCount} connected client`); this.logger.info(`Sent task ${task.id} to connected client`);
return await task.getResultPromise(); return await task.getResultPromise();
} }