mirror of
https://github.com/penpot/penpot.git
synced 2026-06-09 08:52:05 +00:00
🎉 Enable multi-instance horizontal scaling for MCP server (#10013)
* 📎 Ignore .iml files (IntelliJ module files) * 🎉 Enable multi-instance horizontal scaling for MCP server Allow the MCP server to run as multiple instances behind a plain round-robin load balancer, removing the previous requirement that a user's plugin WebSocket and MCP client connection terminate on the same instance. Behaviour is unchanged when run as a single instance or without Redis. Cross-instance MCP sessions: when a request arrives with an mcp-session-id that was initialised on another instance, the session is adopted locally instead of rejected. The user token is read from the query parameter (present on every request, as the configured endpoint URL is never rewritten), so no shared session store is needed; the transport is pre-initialised so the SDK's validateSession() accepts it. Cross-instance task routing: when a Redis URI is configured in multi-user mode, plugin task requests are routed via Redis pub/sub keyed by user token. The instance holding a plugin's WebSocket subscribes to that token's request channel; any instance handling a tool call publishes the request and awaits the response on a per-request channel. RedisBridge is a pure transport for the existing serialised PluginTaskRequest/Response objects. PluginTask is split into an abstract base plus a local (promise-backed) PluginTask and a RemotePluginTask whose resolve/reject publish the outcome back over Redis, so the existing local dispatch and response-correlation paths are reused unchanged on the executing instance. Refs #10000
This commit is contained in:
parent
c183380e0d
commit
03c02d5adf
1
.gitignore
vendored
1
.gitignore
vendored
@ -93,6 +93,7 @@
|
||||
/.pnpm-store
|
||||
/.vscode
|
||||
/.idea
|
||||
*.iml
|
||||
/.claude
|
||||
/.playwright-mcp
|
||||
/.devenv/mcp/
|
||||
|
||||
@ -273,6 +273,7 @@ The Penpot MCP server can be configured using environment variables.
|
||||
| `PENPOT_MCP_REMOTE_MODE` | Enable remote mode (disables file system access). Set to `true` to enable. | `false` |
|
||||
| `PENPOT_MCP_DEVENV` | Enable Penpot development environment tools. Set to `true` to enable. | `false` |
|
||||
| `PENPOT_MCP_EXPORT_SHAPE_MAX_PARALLEL_REQUESTS` | Maximum number of parallel export shape requests (multi-user mode only). | `0` (no limit) |
|
||||
| `PENPOT_MCP_REDIS_URI` | Redis connection URI (e.g. `redis://host:6379`) enabling multi-instance horizontal scaling via Redis pub/sub task routing (multi-user mode only). When unset, the server runs in single-instance mode, requiring the plugin and MCP client to connect to the same instance. | (unset) |
|
||||
|
||||
### Logging Configuration
|
||||
|
||||
|
||||
@ -5,7 +5,7 @@
|
||||
"type": "module",
|
||||
"main": "dist/index.js",
|
||||
"scripts": {
|
||||
"build:server": "esbuild src/index.ts --bundle --platform=node --target=node18 --format=esm --outfile=dist/index.js --external:@modelcontextprotocol/* --external:ws --external:express --external:class-transformer --external:class-validator --external:reflect-metadata --external:pino --external:pino-pretty --external:pino-loki --external:js-yaml --external:sharp --external:nrepl-client",
|
||||
"build:server": "esbuild src/index.ts --bundle --platform=node --target=node18 --format=esm --outfile=dist/index.js --external:@modelcontextprotocol/* --external:ws --external:express --external:class-transformer --external:class-validator --external:reflect-metadata --external:pino --external:pino-pretty --external:pino-loki --external:js-yaml --external:sharp --external:nrepl-client --external:ioredis",
|
||||
"build": "pnpm run build:server && node scripts/copy-resources.js",
|
||||
"build:types": "tsc --emitDeclarationOnly --outDir dist",
|
||||
"start": "node dist/index.js",
|
||||
@ -29,6 +29,7 @@
|
||||
"class-transformer": "^0.5.1",
|
||||
"class-validator": "^0.14.3",
|
||||
"express": "^5.1.0",
|
||||
"ioredis": "^5.6.0",
|
||||
"js-yaml": "^4.1.1",
|
||||
"nrepl-client": "^0.3.0",
|
||||
"penpot-mcp": "file:..",
|
||||
|
||||
@ -4,6 +4,7 @@ import { SSEServerTransport } from "@modelcontextprotocol/sdk/server/sse.js";
|
||||
import { StreamableHTTPServerTransport } from "@modelcontextprotocol/sdk/server/streamableHttp.js";
|
||||
import { ExecuteCodeTool } from "./tools/ExecuteCodeTool";
|
||||
import { PluginBridge } from "./PluginBridge";
|
||||
import { RedisBridge } from "./RedisBridge";
|
||||
import { ConfigurationLoader } from "./ConfigurationLoader";
|
||||
import { createLogger } from "./logger";
|
||||
import { Tool } from "./Tool";
|
||||
@ -104,6 +105,12 @@ export class PenpotMcpServer {
|
||||
public readonly replPort: number;
|
||||
private sessionTimeoutInterval: ReturnType<typeof setInterval> | undefined;
|
||||
|
||||
/**
|
||||
* Optional Redis bridge for multi-instance task routing; present only when running
|
||||
* in multi-user mode with a configured Redis URI.
|
||||
*/
|
||||
private readonly redisBridge?: RedisBridge;
|
||||
|
||||
constructor(private isMultiUser: boolean = false) {
|
||||
// read port configuration from environment variables
|
||||
this.host = process.env.PENPOT_MCP_SERVER_HOST ?? "localhost";
|
||||
@ -122,7 +129,15 @@ export class PenpotMcpServer {
|
||||
|
||||
this.tools = this.initTools();
|
||||
|
||||
this.pluginBridge = new PluginBridge(this, this.webSocketPort);
|
||||
// Enable multi-instance task routing when running in multi-user mode with a
|
||||
// configured Redis URI. Without it, the server operates in single-instance mode,
|
||||
// requiring the plugin and the MCP client to connect to the same instance.
|
||||
const redisUri = process.env.PENPOT_MCP_REDIS_URI;
|
||||
if (this.isMultiUser && redisUri) {
|
||||
this.redisBridge = new RedisBridge(redisUri);
|
||||
}
|
||||
|
||||
this.pluginBridge = new PluginBridge(this, this.webSocketPort, this.redisBridge);
|
||||
this.replServer = new ReplServer(this.pluginBridge, this.replPort, this.host);
|
||||
}
|
||||
|
||||
@ -284,15 +299,21 @@ export class PenpotMcpServer {
|
||||
`Received request for existing session with id=${sessionId}; userTokenFp=${PenpotMcpServer.tokenFingerprint(session.userToken)}`
|
||||
);
|
||||
} else {
|
||||
// new session: create a fresh McpServer and transport
|
||||
// No locally-known session for this request. Either a brand-new session
|
||||
// (no session ID) or a session that was initialized on another instance
|
||||
// and routed here by the load balancer (session ID present but unknown
|
||||
// locally), which we adopt rather than reject.
|
||||
const isAdoptedSession = sessionId !== undefined;
|
||||
userToken = req.query.userToken as string | undefined;
|
||||
this.logger.info(
|
||||
`Received new session request; userTokenFp=${PenpotMcpServer.tokenFingerprint(userToken)}`
|
||||
`${isAdoptedSession ? `Adopting session initialized on another instance with id=${sessionId}` : "Received new session request"}; userTokenFp=${PenpotMcpServer.tokenFingerprint(userToken)}`
|
||||
);
|
||||
|
||||
const { randomUUID } = await import("node:crypto");
|
||||
const server = this.createMcpServer();
|
||||
transport = new StreamableHTTPServerTransport({
|
||||
sessionIdGenerator: () => randomUUID(),
|
||||
// For an adopted session, reuse the existing ID; otherwise generate a new one.
|
||||
sessionIdGenerator: () => (isAdoptedSession ? sessionId! : randomUUID()),
|
||||
onsessioninitialized: (id) => {
|
||||
this.streamableTransports[id] = new StreamableSession(transport, userToken, Date.now());
|
||||
this.logger.info(
|
||||
@ -300,6 +321,22 @@ export class PenpotMcpServer {
|
||||
);
|
||||
},
|
||||
});
|
||||
|
||||
if (isAdoptedSession) {
|
||||
// Pre-initialize the transport so that the SDK's validateSession() accepts
|
||||
// subsequent (non-initialize) requests for this session ID. The SDK stores
|
||||
// these on the inner WebStandardStreamableHTTPServerTransport as plain
|
||||
// (non-#private) properties; validateSession() checks exactly _initialized
|
||||
// and sessionId. Verified against @modelcontextprotocol/sdk 1.25.3.
|
||||
//
|
||||
// Since no initialize request will arrive for an adopted session, the
|
||||
// onsessioninitialized callback will not fire; register the session here.
|
||||
const inner = (transport as any)._webStandardTransport;
|
||||
inner._initialized = true;
|
||||
inner.sessionId = sessionId;
|
||||
this.streamableTransports[sessionId!] = new StreamableSession(transport, userToken, Date.now());
|
||||
}
|
||||
|
||||
transport.onclose = () => {
|
||||
if (transport.sessionId) {
|
||||
this.logger.info(
|
||||
@ -364,6 +401,9 @@ export class PenpotMcpServer {
|
||||
return new Promise((resolve) => {
|
||||
this.app.listen(this.port, this.host, async () => {
|
||||
this.logger.info(`Multi-user mode: ${this.isMultiUserMode()}`);
|
||||
this.logger.info(
|
||||
`Multi-instance mode with Redis-backed transport: ${this.redisBridge ? "true" : "false"}`
|
||||
);
|
||||
this.logger.info(`Remote mode: ${this.isRemoteMode()}`);
|
||||
this.logger.info(`DevEnv mode: ${this.isDevEnv()}`);
|
||||
this.logger.info(`Modern Streamable HTTP endpoint: http://${this.host}:${this.port}/mcp`);
|
||||
@ -387,6 +427,7 @@ export class PenpotMcpServer {
|
||||
public async stop(): Promise<void> {
|
||||
this.logger.info("Stopping Penpot MCP Server...");
|
||||
clearInterval(this.sessionTimeoutInterval);
|
||||
await this.redisBridge?.close();
|
||||
await this.replServer.stop();
|
||||
this.logger.info("Penpot MCP Server stopped");
|
||||
}
|
||||
|
||||
@ -1,9 +1,11 @@
|
||||
import { WebSocket, WebSocketServer } from "ws";
|
||||
import * as http from "http";
|
||||
import { PluginTask } from "./PluginTask";
|
||||
import { PluginTaskResponse, PluginTaskResult } from "@penpot/mcp-common";
|
||||
import { AbstractPluginTask, PluginTask } from "./PluginTask";
|
||||
import { RemotePluginTask } from "./RemotePluginTask";
|
||||
import { PluginTaskRequest, PluginTaskResponse, PluginTaskResult } from "@penpot/mcp-common";
|
||||
import { createLogger } from "./logger";
|
||||
import type { PenpotMcpServer } from "./PenpotMcpServer";
|
||||
import type { RedisBridge } from "./RedisBridge";
|
||||
|
||||
const KEEP_ALIVE_TIME = 30000; // 30 seconds
|
||||
|
||||
@ -22,12 +24,24 @@ export class PluginBridge {
|
||||
private readonly wsServer: WebSocketServer;
|
||||
private readonly connectedClients: Map<WebSocket, ClientConnection> = new Map();
|
||||
private readonly clientsByToken: Map<string, ClientConnection> = new Map();
|
||||
private readonly pendingTasks: Map<string, PluginTask<any, any>> = new Map();
|
||||
private readonly pendingTasks: Map<string, AbstractPluginTask<any, any>> = new Map();
|
||||
private readonly taskTimeouts: Map<string, NodeJS.Timeout> = new Map();
|
||||
|
||||
/**
|
||||
* Creates the plugin bridge and starts its WebSocket server.
|
||||
*
|
||||
* @param mcpServer - The owning MCP server
|
||||
* @param port - The port on which to listen for plugin WebSocket connections
|
||||
* @param redisBridge - Optional Redis bridge enabling multi-instance task routing.
|
||||
* When provided, tasks handled by this instance are routed to the instance
|
||||
* holding the relevant plugin's WebSocket connection (which may be this same
|
||||
* instance) via Redis, rather than dispatched directly over a local socket.
|
||||
* @param taskTimeoutSecs - Timeout, in seconds, for plugin task execution
|
||||
*/
|
||||
constructor(
|
||||
public readonly mcpServer: PenpotMcpServer,
|
||||
private port: number,
|
||||
private readonly redisBridge?: RedisBridge,
|
||||
private taskTimeoutSecs: number = 30
|
||||
) {
|
||||
this.wsServer = new WebSocketServer({ port: port });
|
||||
@ -77,6 +91,17 @@ export class PluginBridge {
|
||||
}
|
||||
|
||||
this.clientsByToken.set(userToken, connection);
|
||||
|
||||
// In multi-instance mode, subscribe to this token's Redis request channel so
|
||||
// that task requests issued by other instances are dispatched to this plugin.
|
||||
if (this.redisBridge) {
|
||||
const tokenForSubscription = userToken;
|
||||
this.redisBridge
|
||||
.subscribeToTasks(userToken, (request) =>
|
||||
this.dispatchForwardedTask(tokenForSubscription, request)
|
||||
)
|
||||
.catch((error) => this.logger.error(error, "Failed to subscribe to Redis task channel"));
|
||||
}
|
||||
}
|
||||
|
||||
ws.on("message", (data: Buffer) => {
|
||||
@ -121,6 +146,12 @@ export class PluginBridge {
|
||||
this.connectedClients.delete(ws);
|
||||
if (connection.userToken) {
|
||||
this.clientsByToken.delete(connection.userToken);
|
||||
|
||||
if (this.redisBridge) {
|
||||
this.redisBridge
|
||||
.unsubscribeFromTasks(connection.userToken)
|
||||
.catch((error) => this.logger.error(error, "Failed to unsubscribe from Redis task channel"));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -203,10 +234,9 @@ export class PluginBridge {
|
||||
}
|
||||
|
||||
/**
|
||||
* Executes a plugin task by sending it to connected clients.
|
||||
*
|
||||
* Registers the task for result correlation and returns a promise
|
||||
* that resolves when the plugin responds with the execution result.
|
||||
* Executes a plugin task by sending it to the connected Penpot plugin instance,
|
||||
* either directly via WebSocket or indirectly via Redis (depending on the configuration),
|
||||
* and awaiting the result.
|
||||
*
|
||||
* @param task - The plugin task to execute
|
||||
* @throws Error if no plugin instances are connected or available
|
||||
@ -214,28 +244,67 @@ export class PluginBridge {
|
||||
public async executePluginTask<TResult extends PluginTaskResult<any>>(
|
||||
task: PluginTask<any, TResult>
|
||||
): Promise<TResult> {
|
||||
// get the appropriate client connection based on mode
|
||||
const connection = this.getClientConnection();
|
||||
this.sendPluginTask(task, this.redisBridge !== undefined);
|
||||
return await task.getResultPromise();
|
||||
}
|
||||
|
||||
// register the task for result correlation
|
||||
this.pendingTasks.set(task.id, task);
|
||||
/**
|
||||
* Registers a task for response correlation, sends its request over the appropriate
|
||||
* transport, and arms a timeout that rejects the task if no response is received.
|
||||
*
|
||||
* The response (whether arriving over the local WebSocket or over Redis) is later
|
||||
* matched by ID in {@link handlePluginTaskResponse}, which settles the task via its
|
||||
* `resolveWithResult`/`rejectWithError` methods. The same correlation and timeout
|
||||
* handling therefore applies regardless of the transport.
|
||||
*
|
||||
* @param task - The task to dispatch
|
||||
* @param useRedis - Whether to route the request via Redis (multi-instance) rather
|
||||
* than directly over the local WebSocket connection
|
||||
* @param connection - The connection to use for a local (non-remote) dispatch; when
|
||||
* omitted, the session's connection is resolved via {@link getClientConnection}.
|
||||
* Ignored when `useRedis` is true.
|
||||
* @throws Error if a local dispatch is required but no suitable connection is available
|
||||
*/
|
||||
private sendPluginTask(task: AbstractPluginTask<any, any>, useRedis: boolean, connection?: ClientConnection): void {
|
||||
let onTimeout: (() => void) | undefined;
|
||||
|
||||
// send task to the selected client
|
||||
const requestMessage = JSON.stringify(task.toRequest());
|
||||
if (connection.socket.readyState !== 1) {
|
||||
// WebSocket is not open
|
||||
this.pendingTasks.delete(task.id);
|
||||
throw new Error(`Plugin instance is disconnected. Task could not be sent.`);
|
||||
if (useRedis) {
|
||||
const sessionContext = this.mcpServer.getSessionContext();
|
||||
if (!sessionContext?.userToken) {
|
||||
throw new Error("No userToken found in session context. Multi-user mode requires authentication.");
|
||||
}
|
||||
const userToken = sessionContext.userToken;
|
||||
const redisBridge = this.redisBridge!;
|
||||
this.logger.debug("Dispatching task %s via Redis", task.id);
|
||||
|
||||
// register the task for result correlation, then publish the request via Redis
|
||||
this.pendingTasks.set(task.id, task);
|
||||
void redisBridge.sendTaskRequest(userToken, task.toRequest(), (response) =>
|
||||
this.handlePluginTaskResponse(response)
|
||||
);
|
||||
|
||||
// on timeout, release the response-channel subscription, since no response
|
||||
// will arrive to trigger its self-unsubscribe.
|
||||
onTimeout = () => void redisBridge.unsubscribeFromResponse(task.id);
|
||||
} else {
|
||||
const target = connection ?? this.getClientConnection();
|
||||
if (target.socket.readyState !== 1) {
|
||||
// WebSocket is not open
|
||||
throw new Error(`Plugin instance is disconnected. Task could not be sent.`);
|
||||
}
|
||||
|
||||
// register the task for result correlation, then send over the socket
|
||||
this.pendingTasks.set(task.id, task);
|
||||
target.socket.send(JSON.stringify(task.toRequest()));
|
||||
}
|
||||
|
||||
connection.socket.send(requestMessage);
|
||||
|
||||
// Set up a timeout to reject the task if no response is received
|
||||
const timeoutHandle = setTimeout(() => {
|
||||
const pendingTask = this.pendingTasks.get(task.id);
|
||||
if (pendingTask) {
|
||||
this.pendingTasks.delete(task.id);
|
||||
this.taskTimeouts.delete(task.id);
|
||||
onTimeout?.();
|
||||
pendingTask.rejectWithError(
|
||||
new Error(`Task ${task.id} timed out after ${this.taskTimeoutSecs} seconds`)
|
||||
);
|
||||
@ -243,8 +312,43 @@ export class PluginBridge {
|
||||
}, this.taskTimeoutSecs * 1000);
|
||||
|
||||
this.taskTimeouts.set(task.id, timeoutHandle);
|
||||
this.logger.info(`Sent task ${task.id} to connected client`);
|
||||
this.logger.info(`Sent task ${task.id}`);
|
||||
}
|
||||
|
||||
return await task.getResultPromise();
|
||||
/**
|
||||
* Dispatches a task request received over Redis to the locally-connected plugin.
|
||||
*
|
||||
* Invoked on the instance subscribed to a user token's request channel when another
|
||||
* instance (or this one) issues a task request. A {@link RemotePluginTask} is created
|
||||
* so that, once the plugin responds, the outcome is published back to the issuing
|
||||
* instance's Redis response channel via the standard response-handling path.
|
||||
*
|
||||
* On failure to dispatch (e.g. the plugin is not connected here), an error response
|
||||
* is published immediately so the requester need not wait for its timeout.
|
||||
*
|
||||
* @param userToken - The user token on whose request channel the request arrived;
|
||||
* identifies the locally-connected plugin to dispatch to
|
||||
* @param request - The serialized task request, passed through from Redis
|
||||
*/
|
||||
private dispatchForwardedTask(userToken: string, request: PluginTaskRequest): void {
|
||||
if (!this.redisBridge) {
|
||||
return;
|
||||
}
|
||||
|
||||
// The response is published on the channel keyed by the original request ID.
|
||||
const task = new RemotePluginTask(request.task, request.params, this.redisBridge, request.id);
|
||||
this.logger.debug("Dispatching remote task %s as %s to Penpot via WebSocket", request.id, task.id);
|
||||
|
||||
const connection = this.clientsByToken.get(userToken);
|
||||
if (!connection) {
|
||||
task.rejectWithError(new Error("Plugin not connected on the receiving instance"));
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
this.sendPluginTask(task, false, connection);
|
||||
} catch (error) {
|
||||
task.rejectWithError(error instanceof Error ? error : new Error(String(error)));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -1,24 +1,28 @@
|
||||
/**
|
||||
* Base class for plugin tasks that are sent over WebSocket.
|
||||
* Base classes for plugin tasks that are dispatched to a Penpot plugin instance
|
||||
* over a WebSocket connection.
|
||||
*
|
||||
* Each task defines a specific operation for the plugin to execute
|
||||
* along with strongly-typed parameters.
|
||||
*
|
||||
* @template TParams - The strongly-typed parameters for this task
|
||||
* A task defines a specific operation for the plugin to execute along with
|
||||
* strongly-typed parameters and provides request/response correlation.
|
||||
*/
|
||||
import { PluginTaskRequest, PluginTaskResult } from "@penpot/mcp-common";
|
||||
import { randomUUID } from "crypto";
|
||||
|
||||
/**
|
||||
* Base class for plugin tasks that are sent over WebSocket.
|
||||
* Abstract base for plugin tasks, defining the parts that the plugin dispatch and
|
||||
* response-correlation machinery (`PluginBridge.sendPluginTask` /
|
||||
* `PluginBridge.handlePluginTaskResponse`) depend upon.
|
||||
*
|
||||
* Each task defines a specific operation for the plugin to execute
|
||||
* along with strongly-typed parameters and request/response correlation.
|
||||
* The dispatch path only needs to serialize a task to a request and, upon receiving
|
||||
* the plugin's response, settle the task via `resolveWithResult`/`rejectWithError`.
|
||||
* What "settling" actually means is left to subclasses: a local task resolves an
|
||||
* in-process promise (see {@link PluginTask}), whereas a remote task forwards the
|
||||
* outcome elsewhere (see {@link RemotePluginTask}).
|
||||
*
|
||||
* @template TParams - The strongly-typed parameters for this task
|
||||
* @template TResult - The expected result type from task execution
|
||||
*/
|
||||
export abstract class PluginTask<TParams = any, TResult extends PluginTaskResult<any> = PluginTaskResult<any>> {
|
||||
export abstract class AbstractPluginTask<TParams = any, TResult extends PluginTaskResult<any> = PluginTaskResult<any>> {
|
||||
/**
|
||||
* Unique identifier for request/response correlation.
|
||||
*/
|
||||
@ -34,6 +38,66 @@ export abstract class PluginTask<TParams = any, TResult extends PluginTaskResult
|
||||
*/
|
||||
public readonly params: TParams;
|
||||
|
||||
/**
|
||||
* Creates a new plugin task instance.
|
||||
*
|
||||
* @param task - The name of the task to execute
|
||||
* @param params - The parameters for task execution
|
||||
*/
|
||||
protected constructor(task: string, params: TParams) {
|
||||
this.id = randomUUID();
|
||||
this.task = task;
|
||||
this.params = params;
|
||||
}
|
||||
|
||||
/**
|
||||
* Serializes the task to a request message for transmission to the plugin.
|
||||
*
|
||||
* @returns The request message containing ID, task name, and parameters
|
||||
*/
|
||||
toRequest(): PluginTaskRequest {
|
||||
return {
|
||||
id: this.id,
|
||||
task: this.task,
|
||||
params: this.params,
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Settles the task successfully with the given result.
|
||||
*
|
||||
* Called by the response-correlation machinery when the plugin reports success
|
||||
* for the task with the matching ID.
|
||||
*
|
||||
* @param result - The task execution result
|
||||
*/
|
||||
abstract resolveWithResult(result: TResult): void;
|
||||
|
||||
/**
|
||||
* Settles the task unsuccessfully with the given error.
|
||||
*
|
||||
* Called by the response-correlation machinery when task execution fails
|
||||
* or times out.
|
||||
*
|
||||
* @param error - The error that occurred during task execution
|
||||
*/
|
||||
abstract rejectWithError(error: Error): void;
|
||||
}
|
||||
|
||||
/**
|
||||
* A locally-awaited plugin task.
|
||||
*
|
||||
* The task's outcome is exposed as an in-process promise (see {@link getResultPromise}),
|
||||
* which the caller awaits to obtain the result. This is the task type used by tools that
|
||||
* execute operations on the plugin and consume the result directly.
|
||||
*
|
||||
* @template TParams - The strongly-typed parameters for this task
|
||||
* @template TResult - The expected result type from task execution
|
||||
*/
|
||||
export class PluginTask<
|
||||
TParams = any,
|
||||
TResult extends PluginTaskResult<any> = PluginTaskResult<any>,
|
||||
> extends AbstractPluginTask<TParams, TResult> {
|
||||
/**
|
||||
* Promise that resolves when the task execution completes.
|
||||
*/
|
||||
@ -50,15 +114,13 @@ export abstract class PluginTask<TParams = any, TResult extends PluginTaskResult
|
||||
private rejectResult?: (error: Error) => void;
|
||||
|
||||
/**
|
||||
* Creates a new plugin task instance.
|
||||
* Creates a new locally-awaited plugin task.
|
||||
*
|
||||
* @param task - The name of the task to execute
|
||||
* @param params - The parameters for task execution
|
||||
*/
|
||||
constructor(task: string, params: TParams) {
|
||||
this.id = randomUUID();
|
||||
this.task = task;
|
||||
this.params = params;
|
||||
super(task, params);
|
||||
this.result = new Promise<TResult>((resolve, reject) => {
|
||||
this.resolveResult = resolve;
|
||||
this.rejectResult = reject;
|
||||
@ -77,14 +139,6 @@ export abstract class PluginTask<TParams = any, TResult extends PluginTaskResult
|
||||
return this.result;
|
||||
}
|
||||
|
||||
/**
|
||||
* Resolves the task with the given result.
|
||||
*
|
||||
* This method should be called when a task response is received
|
||||
* from the plugin with matching ID.
|
||||
*
|
||||
* @param result - The task execution result
|
||||
*/
|
||||
resolveWithResult(result: TResult): void {
|
||||
if (!this.resolveResult) {
|
||||
throw new Error("Result promise not initialized");
|
||||
@ -92,31 +146,10 @@ export abstract class PluginTask<TParams = any, TResult extends PluginTaskResult
|
||||
this.resolveResult(result);
|
||||
}
|
||||
|
||||
/**
|
||||
* Rejects the task with the given error.
|
||||
*
|
||||
* This method should be called when task execution fails
|
||||
* or times out.
|
||||
*
|
||||
* @param error - The error that occurred during task execution
|
||||
*/
|
||||
rejectWithError(error: Error): void {
|
||||
if (!this.rejectResult) {
|
||||
throw new Error("Result promise not initialized");
|
||||
}
|
||||
this.rejectResult(error);
|
||||
}
|
||||
|
||||
/**
|
||||
* Serializes the task to a request message for WebSocket transmission.
|
||||
*
|
||||
* @returns The request message containing ID, task name, and parameters
|
||||
*/
|
||||
toRequest(): PluginTaskRequest {
|
||||
return {
|
||||
id: this.id,
|
||||
task: this.task,
|
||||
params: this.params,
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
182
mcp/packages/server/src/RedisBridge.ts
Normal file
182
mcp/packages/server/src/RedisBridge.ts
Normal file
@ -0,0 +1,182 @@
|
||||
import Redis from "ioredis";
|
||||
import { PluginTaskRequest, PluginTaskResponse } from "@penpot/mcp-common";
|
||||
import { createLogger } from "./logger";
|
||||
|
||||
/**
|
||||
* Channel name prefixes for the task request/response pub/sub protocol.
|
||||
*
|
||||
* Request channels are keyed by user token (one per connected plugin); response
|
||||
* channels are keyed by the task ID, so that only the instance that issued a given
|
||||
* request receives its response.
|
||||
*/
|
||||
const TASK_REQUEST_CHANNEL_PREFIX = "penpot.mcp.task.req.";
|
||||
const TASK_RESPONSE_CHANNEL_PREFIX = "penpot.mcp.task.res.";
|
||||
|
||||
/**
|
||||
* Handler invoked for a task request arriving on a subscribed request channel.
|
||||
*/
|
||||
export type TaskRequestHandler = (request: PluginTaskRequest) => void;
|
||||
|
||||
/**
|
||||
* Handler invoked for a task response arriving on a subscribed response channel.
|
||||
*/
|
||||
export type TaskResponseHandler = (response: PluginTaskResponse<any>) => void;
|
||||
|
||||
/**
|
||||
* Provides a Redis-backed transport for routing plugin task requests and responses
|
||||
* between MCP server instances.
|
||||
*
|
||||
* The bridge is a pure, stateless transport: it moves already-serialized
|
||||
* `PluginTaskRequest` and `PluginTaskResponse` objects between instances and does not
|
||||
* interpret their contents, correlate requests with responses, or impose timeouts.
|
||||
* Correlation and timeout handling remain the responsibility of the caller (see
|
||||
* `PluginBridge`, which routes Redis-delivered responses through the same
|
||||
* pending-task machinery used for direct WebSocket dispatch).
|
||||
*
|
||||
* It enables a tool call handled on one instance to be executed against a plugin
|
||||
* whose WebSocket connection lives on another instance: the request is published on a
|
||||
* channel keyed by user token (to which the instance holding the plugin connection is
|
||||
* subscribed), and the response is published on a channel keyed by task ID (to which
|
||||
* the issuing instance subscribes).
|
||||
*
|
||||
* Two Redis connections are used, as ioredis requires a dedicated connection while
|
||||
* subscribed: one for commands and publishing, and one for subscriptions.
|
||||
*/
|
||||
export class RedisBridge {
|
||||
private readonly logger = createLogger("RedisBridge");
|
||||
private readonly publisher: Redis;
|
||||
private readonly subscriber: Redis;
|
||||
|
||||
/**
|
||||
* Message handlers keyed by channel name.
|
||||
*
|
||||
* ioredis exposes a single, global message event for all subscribed channels, so
|
||||
* incoming messages are dispatched to the correct handler by channel name. Both
|
||||
* request-channel and response-channel handlers are stored here.
|
||||
*/
|
||||
private readonly handlers = new Map<string, (rawMessage: string) => void>();
|
||||
|
||||
/**
|
||||
* Creates a Redis bridge connected to the given Redis instance.
|
||||
*
|
||||
* @param redisUri - The Redis connection URI (e.g. `redis://host:6379`)
|
||||
*/
|
||||
constructor(redisUri: string) {
|
||||
this.publisher = new Redis(redisUri);
|
||||
this.subscriber = new Redis(redisUri);
|
||||
|
||||
this.subscriber.on("message", (channel: string, rawMessage: string) => {
|
||||
const handler = this.handlers.get(channel);
|
||||
if (handler) {
|
||||
handler(rawMessage);
|
||||
} else {
|
||||
this.logger.warn(`Received message on channel with no registered handler: ${channel}`);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Subscribes to the response channel for the given task ID and publishes the task
|
||||
* request to the given user token's request channel.
|
||||
*
|
||||
* The response subscription is established *before* the request is published, to
|
||||
* avoid a race in which the response would be published before the subscription is
|
||||
* in place. The response handler is invoked at most once and the subscription is
|
||||
* removed automatically upon delivery (response channels are single-use).
|
||||
*
|
||||
* @param userToken - The user token identifying the target plugin's request channel
|
||||
* @param request - The serialized plugin task request, passed through verbatim
|
||||
* @param onResponse - Handler invoked with the response when it arrives
|
||||
*/
|
||||
async sendTaskRequest(
|
||||
userToken: string,
|
||||
request: PluginTaskRequest,
|
||||
onResponse: TaskResponseHandler
|
||||
): Promise<void> {
|
||||
const responseChannel = `${TASK_RESPONSE_CHANNEL_PREFIX}${request.id}`;
|
||||
const requestChannel = `${TASK_REQUEST_CHANNEL_PREFIX}${userToken}`;
|
||||
|
||||
this.handlers.set(responseChannel, (rawMessage) => {
|
||||
// a response channel is single-use: remove the handler and unsubscribe on delivery
|
||||
this.handlers.delete(responseChannel);
|
||||
void this.subscriber.unsubscribe(responseChannel);
|
||||
try {
|
||||
onResponse(JSON.parse(rawMessage) as PluginTaskResponse<any>);
|
||||
} catch (error) {
|
||||
this.logger.error(error, "Failed to parse task response message");
|
||||
}
|
||||
});
|
||||
|
||||
await this.subscriber.subscribe(responseChannel);
|
||||
// publish only once the response subscription is confirmed
|
||||
await this.publisher.publish(requestChannel, JSON.stringify(request));
|
||||
}
|
||||
|
||||
/**
|
||||
* Unsubscribes from the response channel for the given task ID.
|
||||
*
|
||||
* Used to release a response subscription when no response will be processed (e.g.
|
||||
* the awaiting task has timed out), since in that case the self-unsubscribe on
|
||||
* delivery never occurs.
|
||||
*
|
||||
* @param taskId - The task ID whose response channel to unsubscribe from
|
||||
*/
|
||||
async unsubscribeFromResponse(taskId: string): Promise<void> {
|
||||
const responseChannel = `${TASK_RESPONSE_CHANNEL_PREFIX}${taskId}`;
|
||||
this.handlers.delete(responseChannel);
|
||||
await this.subscriber.unsubscribe(responseChannel);
|
||||
}
|
||||
|
||||
/**
|
||||
* Publishes a task response on the response channel for the given task ID.
|
||||
*
|
||||
* Used by the instance executing a forwarded task to return its outcome to the
|
||||
* issuing instance.
|
||||
*
|
||||
* @param taskId - The ID of the originally requested task
|
||||
* @param response - The serialized plugin task response, passed through verbatim
|
||||
*/
|
||||
publishTaskResponse(taskId: string, response: PluginTaskResponse<any>): void {
|
||||
const responseChannel = `${TASK_RESPONSE_CHANNEL_PREFIX}${taskId}`;
|
||||
void this.publisher.publish(responseChannel, JSON.stringify(response));
|
||||
}
|
||||
|
||||
/**
|
||||
* Subscribes to task requests for the given user token.
|
||||
*
|
||||
* The handler is invoked for each request arriving on the token's request channel.
|
||||
*
|
||||
* @param userToken - The user token whose request channel to subscribe to
|
||||
* @param handler - The handler to invoke for incoming requests
|
||||
*/
|
||||
async subscribeToTasks(userToken: string, handler: TaskRequestHandler): Promise<void> {
|
||||
const requestChannel = `${TASK_REQUEST_CHANNEL_PREFIX}${userToken}`;
|
||||
this.handlers.set(requestChannel, (rawMessage) => {
|
||||
try {
|
||||
handler(JSON.parse(rawMessage) as PluginTaskRequest);
|
||||
} catch (error) {
|
||||
this.logger.error(error, "Failed to parse task request message");
|
||||
}
|
||||
});
|
||||
await this.subscriber.subscribe(requestChannel);
|
||||
}
|
||||
|
||||
/**
|
||||
* Unsubscribes from task requests for the given user token.
|
||||
*
|
||||
* @param userToken - The user token whose request channel to unsubscribe from
|
||||
*/
|
||||
async unsubscribeFromTasks(userToken: string): Promise<void> {
|
||||
const requestChannel = `${TASK_REQUEST_CHANNEL_PREFIX}${userToken}`;
|
||||
this.handlers.delete(requestChannel);
|
||||
await this.subscriber.unsubscribe(requestChannel);
|
||||
}
|
||||
|
||||
/**
|
||||
* Closes both Redis connections. Call on server shutdown.
|
||||
*/
|
||||
async close(): Promise<void> {
|
||||
await this.subscriber.quit();
|
||||
await this.publisher.quit();
|
||||
}
|
||||
}
|
||||
56
mcp/packages/server/src/RemotePluginTask.ts
Normal file
56
mcp/packages/server/src/RemotePluginTask.ts
Normal file
@ -0,0 +1,56 @@
|
||||
import { AbstractPluginTask } from "./PluginTask";
|
||||
import { PluginTaskResult } from "@penpot/mcp-common";
|
||||
import type { RedisBridge } from "./RedisBridge";
|
||||
|
||||
/**
|
||||
* A plugin task whose outcome is forwarded back to a remote requester via Redis,
|
||||
* rather than awaited in-process.
|
||||
*
|
||||
* This task type is used on the server instance that holds the plugin's WebSocket
|
||||
* connection when a task request arrives over Redis (published by another instance
|
||||
* that received the corresponding tool call). It is dispatched to the plugin through
|
||||
* the ordinary local dispatch path; when the plugin responds, the response-correlation
|
||||
* machinery settles this task, and the overridden `resolveWithResult`/`rejectWithError`
|
||||
* publish the outcome back onto the requester's Redis response channel.
|
||||
*
|
||||
* Note that this task has its own ID (used to correlate the local WebSocket dispatch),
|
||||
* distinct from the original requester's task ID, which keys the Redis response channel.
|
||||
*
|
||||
* It deliberately carries no result promise: settling the task *is* the side effect
|
||||
* of publishing to Redis, and nothing awaits it locally.
|
||||
*/
|
||||
export class RemotePluginTask extends AbstractPluginTask<any, PluginTaskResult<any>> {
|
||||
/**
|
||||
* Creates a task that forwards its outcome to a Redis response channel.
|
||||
*
|
||||
* @param task - The name of the task to execute (from the incoming request)
|
||||
* @param params - The parameters for task execution (from the incoming request)
|
||||
* @param redisBridge - The Redis bridge used to publish the outcome
|
||||
* @param originalTaskId - The ID of the original request, which keys the response
|
||||
* channel the requesting instance is awaiting
|
||||
*/
|
||||
constructor(
|
||||
task: string,
|
||||
params: any,
|
||||
private readonly redisBridge: RedisBridge,
|
||||
private readonly originalTaskId: string
|
||||
) {
|
||||
super(task, params);
|
||||
}
|
||||
|
||||
resolveWithResult(result: PluginTaskResult<any>): void {
|
||||
this.redisBridge.publishTaskResponse(this.originalTaskId, {
|
||||
id: this.originalTaskId,
|
||||
success: true,
|
||||
data: result.data,
|
||||
});
|
||||
}
|
||||
|
||||
rejectWithError(error: Error): void {
|
||||
this.redisBridge.publishTaskResponse(this.originalTaskId, {
|
||||
id: this.originalTaskId,
|
||||
success: false,
|
||||
error: error.message,
|
||||
});
|
||||
}
|
||||
}
|
||||
55
mcp/pnpm-lock.yaml
generated
55
mcp/pnpm-lock.yaml
generated
@ -57,6 +57,9 @@ importers:
|
||||
express:
|
||||
specifier: ^5.1.0
|
||||
version: 5.2.1
|
||||
ioredis:
|
||||
specifier: ^5.6.0
|
||||
version: 5.11.0
|
||||
js-yaml:
|
||||
specifier: ^4.1.1
|
||||
version: 4.1.1
|
||||
@ -760,6 +763,9 @@ packages:
|
||||
cpu: [x64]
|
||||
os: [win32]
|
||||
|
||||
'@ioredis/commands@1.10.0':
|
||||
resolution: {integrity: sha512-UmeW7z4LfctwoQ5wkhVzgq8tXkreED2xZGpX+Bg+zA+WJFZCT6c062AfCK/Dfk81xZnnwdhJCUMkitihRaoC2Q==}
|
||||
|
||||
'@jridgewell/resolve-uri@3.1.2':
|
||||
resolution: {integrity: sha512-bRISgCIjP20/tbWSPWMEi54QVPRZExkuD9lJL+UIxUKtwVJA8wW1Trb1jMs1RFXo1CBTNZ/5hpC9QvmKWdopKw==}
|
||||
engines: {node: '>=6.0.0'}
|
||||
@ -1080,6 +1086,10 @@ packages:
|
||||
resolution: {integrity: sha512-BSeNnyus75C4//NQ9gQt1/csTXyo/8Sb+afLAkzAptFuMsod9HFokGNudZpi/oQV73hnVK+sR+5PVRMd+Dr7YQ==}
|
||||
engines: {node: '>=12'}
|
||||
|
||||
cluster-key-slot@1.1.1:
|
||||
resolution: {integrity: sha512-rwHwUfXL40Chm1r08yrhU3qpUvdVlgkKNeyeGPOxnW8/SyVDvgRaed/Uz54AqWNaTCAThlj6QAs3TZcKI0xDEw==}
|
||||
engines: {node: '>=0.10.0'}
|
||||
|
||||
color-convert@2.0.1:
|
||||
resolution: {integrity: sha512-RRECPsj7iu/xb5oKYcsFHSppFNnsj/52OVTRKb4zP5onXwVF3zVmmToNcOfGC+CRDpfK/U584fMg38ZHCaElKQ==}
|
||||
engines: {node: '>=7.0.0'}
|
||||
@ -1143,6 +1153,10 @@ packages:
|
||||
supports-color:
|
||||
optional: true
|
||||
|
||||
denque@2.1.0:
|
||||
resolution: {integrity: sha512-HVQE3AAb/pxF8fQAoiqpvg9i3evqug3hoiwakOyZAwJm+6vZehbkYXZ0l4JxS+I3QxM97v5aaRNhj8v5oBhekw==}
|
||||
engines: {node: '>=0.10'}
|
||||
|
||||
depd@2.0.0:
|
||||
resolution: {integrity: sha512-g7nH6P6dyDioJogAAGprGpCtVImJhpPk/roCzdb3fIh61/s/nPsfR6onyMwkCAR/OlC3yBC0lESvUoQEAssIrw==}
|
||||
engines: {node: '>= 0.8'}
|
||||
@ -1319,6 +1333,10 @@ packages:
|
||||
inherits@2.0.4:
|
||||
resolution: {integrity: sha512-k/vGaX4/Yla3WzyMCvTQOXYeIHvqOKtnqBduzTHpzpQZzAskKMhZ2K+EnBiSM9zGSoIFeMpXKxa4dYeZIQqewQ==}
|
||||
|
||||
ioredis@5.11.0:
|
||||
resolution: {integrity: sha512-EZBErytyVovD8f6pDfG3Kb37N6Y3lmDA9NNj+4+IP13CzzHGeX+OyeRM2Um13khRzoBSzzL+5lVnCX8V2RLeMg==}
|
||||
engines: {node: '>=12.22.0'}
|
||||
|
||||
ipaddr.js@1.9.1:
|
||||
resolution: {integrity: sha512-0KI/607xoxSToH7GjN1FfSbLoU0+btTicjsQSWQlh/hZykN8KpmMf7uYwPW3R+akZ6R/w18ZlXSHBYXiYUPO3g==}
|
||||
engines: {node: '>= 0.10'}
|
||||
@ -1501,6 +1519,14 @@ packages:
|
||||
resolution: {integrity: sha512-57frrGM/OCTLqLOAh0mhVA9VBMHd+9U7Zb2THMGdBUoZVOtGbJzjxsYGDJ3A9AYYCP4hn6y1TVbaOfzWtm5GFg==}
|
||||
engines: {node: '>= 12.13.0'}
|
||||
|
||||
redis-errors@1.2.0:
|
||||
resolution: {integrity: sha512-1qny3OExCf0UvUV/5wpYKf2YwPcOqXzkwKKSmKHiE6ZMQs5heeE/c8eXK+PNllPvmjgAbfnsbpkGZWy8cBpn9w==}
|
||||
engines: {node: '>=4'}
|
||||
|
||||
redis-parser@3.0.0:
|
||||
resolution: {integrity: sha512-DJnGAeenTdpMEH6uAJRK/uiyEIH9WVsUmoLwzudwGJUwZPp80PDBWPHXSAGNPwNvIXAbe7MSUB1zQFugFml66A==}
|
||||
engines: {node: '>=4'}
|
||||
|
||||
reflect-metadata@0.1.14:
|
||||
resolution: {integrity: sha512-ZhYeb6nRaXCfhnndflDK8qI6ZQ/YcWZCISRAWICW9XYqMUwjZM9Z0DveWX/ABN01oxSHwVxKQmxeYZSsm0jh5A==}
|
||||
|
||||
@ -1593,6 +1619,9 @@ packages:
|
||||
resolution: {integrity: sha512-UcjcJOWknrNkF6PLX83qcHM6KHgVKNkV62Y8a5uYDVv9ydGQVwAHMKqHdJje1VTWpljG0WYpCDhrCdAOYH4TWg==}
|
||||
engines: {node: '>= 10.x'}
|
||||
|
||||
standard-as-callback@2.1.0:
|
||||
resolution: {integrity: sha512-qoRRSyROncaz1z0mvYqIE4lCd9p2R90i6GxW3uZv5ucSu8tU7B5HXUP1gG8pVZsYNVaXjk8ClXHPttLyxAL48A==}
|
||||
|
||||
statuses@2.0.2:
|
||||
resolution: {integrity: sha512-DvEy55V3DB7uknRo+4iOGT5fP1slR8wQohVdknigZPMpMstaKJQWhwiYBACJE3Ul2pTnATihhBYnRhZQHGBiRw==}
|
||||
engines: {node: '>= 0.8'}
|
||||
@ -2124,6 +2153,8 @@ snapshots:
|
||||
'@img/sharp-win32-x64@0.34.5':
|
||||
optional: true
|
||||
|
||||
'@ioredis/commands@1.10.0': {}
|
||||
|
||||
'@jridgewell/resolve-uri@3.1.2': {}
|
||||
|
||||
'@jridgewell/sourcemap-codec@1.5.5': {}
|
||||
@ -2397,6 +2428,8 @@ snapshots:
|
||||
strip-ansi: 6.0.1
|
||||
wrap-ansi: 7.0.0
|
||||
|
||||
cluster-key-slot@1.1.1: {}
|
||||
|
||||
color-convert@2.0.1:
|
||||
dependencies:
|
||||
color-name: 1.1.4
|
||||
@ -2447,6 +2480,8 @@ snapshots:
|
||||
dependencies:
|
||||
ms: 2.1.3
|
||||
|
||||
denque@2.1.0: {}
|
||||
|
||||
depd@2.0.0: {}
|
||||
|
||||
detect-libc@2.1.2: {}
|
||||
@ -2695,6 +2730,18 @@ snapshots:
|
||||
|
||||
inherits@2.0.4: {}
|
||||
|
||||
ioredis@5.11.0:
|
||||
dependencies:
|
||||
'@ioredis/commands': 1.10.0
|
||||
cluster-key-slot: 1.1.1
|
||||
debug: 4.4.3
|
||||
denque: 2.1.0
|
||||
redis-errors: 1.2.0
|
||||
redis-parser: 3.0.0
|
||||
standard-as-callback: 2.1.0
|
||||
transitivePeerDependencies:
|
||||
- supports-color
|
||||
|
||||
ipaddr.js@1.9.1: {}
|
||||
|
||||
is-fullwidth-code-point@3.0.0: {}
|
||||
@ -2856,6 +2903,12 @@ snapshots:
|
||||
|
||||
real-require@0.2.0: {}
|
||||
|
||||
redis-errors@1.2.0: {}
|
||||
|
||||
redis-parser@3.0.0:
|
||||
dependencies:
|
||||
redis-errors: 1.2.0
|
||||
|
||||
reflect-metadata@0.1.14: {}
|
||||
|
||||
require-directory@2.1.1: {}
|
||||
@ -3017,6 +3070,8 @@ snapshots:
|
||||
|
||||
split2@4.2.0: {}
|
||||
|
||||
standard-as-callback@2.1.0: {}
|
||||
|
||||
statuses@2.0.2: {}
|
||||
|
||||
string-width@4.2.3:
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user