diff --git a/.gitignore b/.gitignore index f800ad7f53..5bc58cd0b2 100644 --- a/.gitignore +++ b/.gitignore @@ -93,6 +93,7 @@ /.pnpm-store /.vscode /.idea +*.iml /.claude /.playwright-mcp /.devenv/mcp/ diff --git a/mcp/README.md b/mcp/README.md index c1141653f6..3efc6255e7 100644 --- a/mcp/README.md +++ b/mcp/README.md @@ -273,6 +273,7 @@ The Penpot MCP server can be configured using environment variables. | `PENPOT_MCP_REMOTE_MODE` | Enable remote mode (disables file system access). Set to `true` to enable. | `false` | | `PENPOT_MCP_DEVENV` | Enable Penpot development environment tools. Set to `true` to enable. | `false` | | `PENPOT_MCP_EXPORT_SHAPE_MAX_PARALLEL_REQUESTS` | Maximum number of parallel export shape requests (multi-user mode only). | `0` (no limit) | +| `PENPOT_MCP_REDIS_URI` | Redis connection URI (e.g. `redis://host:6379`) enabling multi-instance horizontal scaling via Redis pub/sub task routing (multi-user mode only). When unset, the server runs in single-instance mode, requiring the plugin and MCP client to connect to the same instance. | (unset) | ### Logging Configuration diff --git a/mcp/packages/server/package.json b/mcp/packages/server/package.json index 58f8641887..c3a3a04e47 100644 --- a/mcp/packages/server/package.json +++ b/mcp/packages/server/package.json @@ -5,7 +5,7 @@ "type": "module", "main": "dist/index.js", "scripts": { - "build:server": "esbuild src/index.ts --bundle --platform=node --target=node18 --format=esm --outfile=dist/index.js --external:@modelcontextprotocol/* --external:ws --external:express --external:class-transformer --external:class-validator --external:reflect-metadata --external:pino --external:pino-pretty --external:pino-loki --external:js-yaml --external:sharp --external:nrepl-client", + "build:server": "esbuild src/index.ts --bundle --platform=node --target=node18 --format=esm --outfile=dist/index.js --external:@modelcontextprotocol/* --external:ws --external:express --external:class-transformer --external:class-validator --external:reflect-metadata --external:pino --external:pino-pretty --external:pino-loki --external:js-yaml --external:sharp --external:nrepl-client --external:ioredis", "build": "pnpm run build:server && node scripts/copy-resources.js", "build:types": "tsc --emitDeclarationOnly --outDir dist", "start": "node dist/index.js", @@ -29,6 +29,7 @@ "class-transformer": "^0.5.1", "class-validator": "^0.14.3", "express": "^5.1.0", + "ioredis": "^5.6.0", "js-yaml": "^4.1.1", "nrepl-client": "^0.3.0", "penpot-mcp": "file:..", diff --git a/mcp/packages/server/src/PenpotMcpServer.ts b/mcp/packages/server/src/PenpotMcpServer.ts index 44b058b358..2695acdd65 100644 --- a/mcp/packages/server/src/PenpotMcpServer.ts +++ b/mcp/packages/server/src/PenpotMcpServer.ts @@ -4,6 +4,7 @@ import { SSEServerTransport } from "@modelcontextprotocol/sdk/server/sse.js"; import { StreamableHTTPServerTransport } from "@modelcontextprotocol/sdk/server/streamableHttp.js"; import { ExecuteCodeTool } from "./tools/ExecuteCodeTool"; import { PluginBridge } from "./PluginBridge"; +import { RedisBridge } from "./RedisBridge"; import { ConfigurationLoader } from "./ConfigurationLoader"; import { createLogger } from "./logger"; import { Tool } from "./Tool"; @@ -104,6 +105,12 @@ export class PenpotMcpServer { public readonly replPort: number; private sessionTimeoutInterval: ReturnType | undefined; + /** + * Optional Redis bridge for multi-instance task routing; present only when running + * in multi-user mode with a configured Redis URI. + */ + private readonly redisBridge?: RedisBridge; + constructor(private isMultiUser: boolean = false) { // read port configuration from environment variables this.host = process.env.PENPOT_MCP_SERVER_HOST ?? "localhost"; @@ -122,7 +129,15 @@ export class PenpotMcpServer { this.tools = this.initTools(); - this.pluginBridge = new PluginBridge(this, this.webSocketPort); + // Enable multi-instance task routing when running in multi-user mode with a + // configured Redis URI. Without it, the server operates in single-instance mode, + // requiring the plugin and the MCP client to connect to the same instance. + const redisUri = process.env.PENPOT_MCP_REDIS_URI; + if (this.isMultiUser && redisUri) { + this.redisBridge = new RedisBridge(redisUri); + } + + this.pluginBridge = new PluginBridge(this, this.webSocketPort, this.redisBridge); this.replServer = new ReplServer(this.pluginBridge, this.replPort, this.host); } @@ -284,15 +299,21 @@ export class PenpotMcpServer { `Received request for existing session with id=${sessionId}; userTokenFp=${PenpotMcpServer.tokenFingerprint(session.userToken)}` ); } else { - // new session: create a fresh McpServer and transport + // No locally-known session for this request. Either a brand-new session + // (no session ID) or a session that was initialized on another instance + // and routed here by the load balancer (session ID present but unknown + // locally), which we adopt rather than reject. + const isAdoptedSession = sessionId !== undefined; userToken = req.query.userToken as string | undefined; this.logger.info( - `Received new session request; userTokenFp=${PenpotMcpServer.tokenFingerprint(userToken)}` + `${isAdoptedSession ? `Adopting session initialized on another instance with id=${sessionId}` : "Received new session request"}; userTokenFp=${PenpotMcpServer.tokenFingerprint(userToken)}` ); + const { randomUUID } = await import("node:crypto"); const server = this.createMcpServer(); transport = new StreamableHTTPServerTransport({ - sessionIdGenerator: () => randomUUID(), + // For an adopted session, reuse the existing ID; otherwise generate a new one. + sessionIdGenerator: () => (isAdoptedSession ? sessionId! : randomUUID()), onsessioninitialized: (id) => { this.streamableTransports[id] = new StreamableSession(transport, userToken, Date.now()); this.logger.info( @@ -300,6 +321,22 @@ export class PenpotMcpServer { ); }, }); + + if (isAdoptedSession) { + // Pre-initialize the transport so that the SDK's validateSession() accepts + // subsequent (non-initialize) requests for this session ID. The SDK stores + // these on the inner WebStandardStreamableHTTPServerTransport as plain + // (non-#private) properties; validateSession() checks exactly _initialized + // and sessionId. Verified against @modelcontextprotocol/sdk 1.25.3. + // + // Since no initialize request will arrive for an adopted session, the + // onsessioninitialized callback will not fire; register the session here. + const inner = (transport as any)._webStandardTransport; + inner._initialized = true; + inner.sessionId = sessionId; + this.streamableTransports[sessionId!] = new StreamableSession(transport, userToken, Date.now()); + } + transport.onclose = () => { if (transport.sessionId) { this.logger.info( @@ -364,6 +401,9 @@ export class PenpotMcpServer { return new Promise((resolve) => { this.app.listen(this.port, this.host, async () => { this.logger.info(`Multi-user mode: ${this.isMultiUserMode()}`); + this.logger.info( + `Multi-instance mode with Redis-backed transport: ${this.redisBridge ? "true" : "false"}` + ); this.logger.info(`Remote mode: ${this.isRemoteMode()}`); this.logger.info(`DevEnv mode: ${this.isDevEnv()}`); this.logger.info(`Modern Streamable HTTP endpoint: http://${this.host}:${this.port}/mcp`); @@ -387,6 +427,7 @@ export class PenpotMcpServer { public async stop(): Promise { this.logger.info("Stopping Penpot MCP Server..."); clearInterval(this.sessionTimeoutInterval); + await this.redisBridge?.close(); await this.replServer.stop(); this.logger.info("Penpot MCP Server stopped"); } diff --git a/mcp/packages/server/src/PluginBridge.ts b/mcp/packages/server/src/PluginBridge.ts index 35d39aa728..1c24547b8f 100644 --- a/mcp/packages/server/src/PluginBridge.ts +++ b/mcp/packages/server/src/PluginBridge.ts @@ -1,9 +1,11 @@ import { WebSocket, WebSocketServer } from "ws"; import * as http from "http"; -import { PluginTask } from "./PluginTask"; -import { PluginTaskResponse, PluginTaskResult } from "@penpot/mcp-common"; +import { AbstractPluginTask, PluginTask } from "./PluginTask"; +import { RemotePluginTask } from "./RemotePluginTask"; +import { PluginTaskRequest, PluginTaskResponse, PluginTaskResult } from "@penpot/mcp-common"; import { createLogger } from "./logger"; import type { PenpotMcpServer } from "./PenpotMcpServer"; +import type { RedisBridge } from "./RedisBridge"; const KEEP_ALIVE_TIME = 30000; // 30 seconds @@ -22,12 +24,24 @@ export class PluginBridge { private readonly wsServer: WebSocketServer; private readonly connectedClients: Map = new Map(); private readonly clientsByToken: Map = new Map(); - private readonly pendingTasks: Map> = new Map(); + private readonly pendingTasks: Map> = new Map(); private readonly taskTimeouts: Map = new Map(); + /** + * Creates the plugin bridge and starts its WebSocket server. + * + * @param mcpServer - The owning MCP server + * @param port - The port on which to listen for plugin WebSocket connections + * @param redisBridge - Optional Redis bridge enabling multi-instance task routing. + * When provided, tasks handled by this instance are routed to the instance + * holding the relevant plugin's WebSocket connection (which may be this same + * instance) via Redis, rather than dispatched directly over a local socket. + * @param taskTimeoutSecs - Timeout, in seconds, for plugin task execution + */ constructor( public readonly mcpServer: PenpotMcpServer, private port: number, + private readonly redisBridge?: RedisBridge, private taskTimeoutSecs: number = 30 ) { this.wsServer = new WebSocketServer({ port: port }); @@ -77,6 +91,17 @@ export class PluginBridge { } this.clientsByToken.set(userToken, connection); + + // In multi-instance mode, subscribe to this token's Redis request channel so + // that task requests issued by other instances are dispatched to this plugin. + if (this.redisBridge) { + const tokenForSubscription = userToken; + this.redisBridge + .subscribeToTasks(userToken, (request) => + this.dispatchForwardedTask(tokenForSubscription, request) + ) + .catch((error) => this.logger.error(error, "Failed to subscribe to Redis task channel")); + } } ws.on("message", (data: Buffer) => { @@ -121,6 +146,12 @@ export class PluginBridge { this.connectedClients.delete(ws); if (connection.userToken) { this.clientsByToken.delete(connection.userToken); + + if (this.redisBridge) { + this.redisBridge + .unsubscribeFromTasks(connection.userToken) + .catch((error) => this.logger.error(error, "Failed to unsubscribe from Redis task channel")); + } } } @@ -203,10 +234,9 @@ export class PluginBridge { } /** - * Executes a plugin task by sending it to connected clients. - * - * Registers the task for result correlation and returns a promise - * that resolves when the plugin responds with the execution result. + * Executes a plugin task by sending it to the connected Penpot plugin instance, + * either directly via WebSocket or indirectly via Redis (depending on the configuration), + * and awaiting the result. * * @param task - The plugin task to execute * @throws Error if no plugin instances are connected or available @@ -214,28 +244,67 @@ export class PluginBridge { public async executePluginTask>( task: PluginTask ): Promise { - // get the appropriate client connection based on mode - const connection = this.getClientConnection(); + this.sendPluginTask(task, this.redisBridge !== undefined); + return await task.getResultPromise(); + } - // register the task for result correlation - this.pendingTasks.set(task.id, task); + /** + * Registers a task for response correlation, sends its request over the appropriate + * transport, and arms a timeout that rejects the task if no response is received. + * + * The response (whether arriving over the local WebSocket or over Redis) is later + * matched by ID in {@link handlePluginTaskResponse}, which settles the task via its + * `resolveWithResult`/`rejectWithError` methods. The same correlation and timeout + * handling therefore applies regardless of the transport. + * + * @param task - The task to dispatch + * @param useRedis - Whether to route the request via Redis (multi-instance) rather + * than directly over the local WebSocket connection + * @param connection - The connection to use for a local (non-remote) dispatch; when + * omitted, the session's connection is resolved via {@link getClientConnection}. + * Ignored when `useRedis` is true. + * @throws Error if a local dispatch is required but no suitable connection is available + */ + private sendPluginTask(task: AbstractPluginTask, useRedis: boolean, connection?: ClientConnection): void { + let onTimeout: (() => void) | undefined; - // send task to the selected client - const requestMessage = JSON.stringify(task.toRequest()); - if (connection.socket.readyState !== 1) { - // WebSocket is not open - this.pendingTasks.delete(task.id); - throw new Error(`Plugin instance is disconnected. Task could not be sent.`); + if (useRedis) { + const sessionContext = this.mcpServer.getSessionContext(); + if (!sessionContext?.userToken) { + throw new Error("No userToken found in session context. Multi-user mode requires authentication."); + } + const userToken = sessionContext.userToken; + const redisBridge = this.redisBridge!; + this.logger.debug("Dispatching task %s via Redis", task.id); + + // register the task for result correlation, then publish the request via Redis + this.pendingTasks.set(task.id, task); + void redisBridge.sendTaskRequest(userToken, task.toRequest(), (response) => + this.handlePluginTaskResponse(response) + ); + + // on timeout, release the response-channel subscription, since no response + // will arrive to trigger its self-unsubscribe. + onTimeout = () => void redisBridge.unsubscribeFromResponse(task.id); + } else { + const target = connection ?? this.getClientConnection(); + if (target.socket.readyState !== 1) { + // WebSocket is not open + throw new Error(`Plugin instance is disconnected. Task could not be sent.`); + } + + // register the task for result correlation, then send over the socket + this.pendingTasks.set(task.id, task); + target.socket.send(JSON.stringify(task.toRequest())); } - connection.socket.send(requestMessage); - // Set up a timeout to reject the task if no response is received const timeoutHandle = setTimeout(() => { const pendingTask = this.pendingTasks.get(task.id); if (pendingTask) { this.pendingTasks.delete(task.id); this.taskTimeouts.delete(task.id); + onTimeout?.(); pendingTask.rejectWithError( new Error(`Task ${task.id} timed out after ${this.taskTimeoutSecs} seconds`) ); @@ -243,8 +312,43 @@ export class PluginBridge { }, this.taskTimeoutSecs * 1000); this.taskTimeouts.set(task.id, timeoutHandle); - this.logger.info(`Sent task ${task.id} to connected client`); + this.logger.info(`Sent task ${task.id}`); + } - return await task.getResultPromise(); + /** + * Dispatches a task request received over Redis to the locally-connected plugin. + * + * Invoked on the instance subscribed to a user token's request channel when another + * instance (or this one) issues a task request. A {@link RemotePluginTask} is created + * so that, once the plugin responds, the outcome is published back to the issuing + * instance's Redis response channel via the standard response-handling path. + * + * On failure to dispatch (e.g. the plugin is not connected here), an error response + * is published immediately so the requester need not wait for its timeout. + * + * @param userToken - The user token on whose request channel the request arrived; + * identifies the locally-connected plugin to dispatch to + * @param request - The serialized task request, passed through from Redis + */ + private dispatchForwardedTask(userToken: string, request: PluginTaskRequest): void { + if (!this.redisBridge) { + return; + } + + // The response is published on the channel keyed by the original request ID. + const task = new RemotePluginTask(request.task, request.params, this.redisBridge, request.id); + this.logger.debug("Dispatching remote task %s as %s to Penpot via WebSocket", request.id, task.id); + + const connection = this.clientsByToken.get(userToken); + if (!connection) { + task.rejectWithError(new Error("Plugin not connected on the receiving instance")); + return; + } + + try { + this.sendPluginTask(task, false, connection); + } catch (error) { + task.rejectWithError(error instanceof Error ? error : new Error(String(error))); + } } } diff --git a/mcp/packages/server/src/PluginTask.ts b/mcp/packages/server/src/PluginTask.ts index 8600cac22b..0e0d327564 100644 --- a/mcp/packages/server/src/PluginTask.ts +++ b/mcp/packages/server/src/PluginTask.ts @@ -1,24 +1,28 @@ /** - * Base class for plugin tasks that are sent over WebSocket. + * Base classes for plugin tasks that are dispatched to a Penpot plugin instance + * over a WebSocket connection. * - * Each task defines a specific operation for the plugin to execute - * along with strongly-typed parameters. - * - * @template TParams - The strongly-typed parameters for this task + * A task defines a specific operation for the plugin to execute along with + * strongly-typed parameters and provides request/response correlation. */ import { PluginTaskRequest, PluginTaskResult } from "@penpot/mcp-common"; import { randomUUID } from "crypto"; /** - * Base class for plugin tasks that are sent over WebSocket. + * Abstract base for plugin tasks, defining the parts that the plugin dispatch and + * response-correlation machinery (`PluginBridge.sendPluginTask` / + * `PluginBridge.handlePluginTaskResponse`) depend upon. * - * Each task defines a specific operation for the plugin to execute - * along with strongly-typed parameters and request/response correlation. + * The dispatch path only needs to serialize a task to a request and, upon receiving + * the plugin's response, settle the task via `resolveWithResult`/`rejectWithError`. + * What "settling" actually means is left to subclasses: a local task resolves an + * in-process promise (see {@link PluginTask}), whereas a remote task forwards the + * outcome elsewhere (see {@link RemotePluginTask}). * * @template TParams - The strongly-typed parameters for this task * @template TResult - The expected result type from task execution */ -export abstract class PluginTask = PluginTaskResult> { +export abstract class AbstractPluginTask = PluginTaskResult> { /** * Unique identifier for request/response correlation. */ @@ -34,6 +38,66 @@ export abstract class PluginTask = PluginTaskResult, +> extends AbstractPluginTask { /** * Promise that resolves when the task execution completes. */ @@ -50,15 +114,13 @@ export abstract class PluginTask void; /** - * Creates a new plugin task instance. + * Creates a new locally-awaited plugin task. * * @param task - The name of the task to execute * @param params - The parameters for task execution */ constructor(task: string, params: TParams) { - this.id = randomUUID(); - this.task = task; - this.params = params; + super(task, params); this.result = new Promise((resolve, reject) => { this.resolveResult = resolve; this.rejectResult = reject; @@ -77,14 +139,6 @@ export abstract class PluginTask void; + +/** + * Handler invoked for a task response arriving on a subscribed response channel. + */ +export type TaskResponseHandler = (response: PluginTaskResponse) => void; + +/** + * Provides a Redis-backed transport for routing plugin task requests and responses + * between MCP server instances. + * + * The bridge is a pure, stateless transport: it moves already-serialized + * `PluginTaskRequest` and `PluginTaskResponse` objects between instances and does not + * interpret their contents, correlate requests with responses, or impose timeouts. + * Correlation and timeout handling remain the responsibility of the caller (see + * `PluginBridge`, which routes Redis-delivered responses through the same + * pending-task machinery used for direct WebSocket dispatch). + * + * It enables a tool call handled on one instance to be executed against a plugin + * whose WebSocket connection lives on another instance: the request is published on a + * channel keyed by user token (to which the instance holding the plugin connection is + * subscribed), and the response is published on a channel keyed by task ID (to which + * the issuing instance subscribes). + * + * Two Redis connections are used, as ioredis requires a dedicated connection while + * subscribed: one for commands and publishing, and one for subscriptions. + */ +export class RedisBridge { + private readonly logger = createLogger("RedisBridge"); + private readonly publisher: Redis; + private readonly subscriber: Redis; + + /** + * Message handlers keyed by channel name. + * + * ioredis exposes a single, global message event for all subscribed channels, so + * incoming messages are dispatched to the correct handler by channel name. Both + * request-channel and response-channel handlers are stored here. + */ + private readonly handlers = new Map void>(); + + /** + * Creates a Redis bridge connected to the given Redis instance. + * + * @param redisUri - The Redis connection URI (e.g. `redis://host:6379`) + */ + constructor(redisUri: string) { + this.publisher = new Redis(redisUri); + this.subscriber = new Redis(redisUri); + + this.subscriber.on("message", (channel: string, rawMessage: string) => { + const handler = this.handlers.get(channel); + if (handler) { + handler(rawMessage); + } else { + this.logger.warn(`Received message on channel with no registered handler: ${channel}`); + } + }); + } + + /** + * Subscribes to the response channel for the given task ID and publishes the task + * request to the given user token's request channel. + * + * The response subscription is established *before* the request is published, to + * avoid a race in which the response would be published before the subscription is + * in place. The response handler is invoked at most once and the subscription is + * removed automatically upon delivery (response channels are single-use). + * + * @param userToken - The user token identifying the target plugin's request channel + * @param request - The serialized plugin task request, passed through verbatim + * @param onResponse - Handler invoked with the response when it arrives + */ + async sendTaskRequest( + userToken: string, + request: PluginTaskRequest, + onResponse: TaskResponseHandler + ): Promise { + const responseChannel = `${TASK_RESPONSE_CHANNEL_PREFIX}${request.id}`; + const requestChannel = `${TASK_REQUEST_CHANNEL_PREFIX}${userToken}`; + + this.handlers.set(responseChannel, (rawMessage) => { + // a response channel is single-use: remove the handler and unsubscribe on delivery + this.handlers.delete(responseChannel); + void this.subscriber.unsubscribe(responseChannel); + try { + onResponse(JSON.parse(rawMessage) as PluginTaskResponse); + } catch (error) { + this.logger.error(error, "Failed to parse task response message"); + } + }); + + await this.subscriber.subscribe(responseChannel); + // publish only once the response subscription is confirmed + await this.publisher.publish(requestChannel, JSON.stringify(request)); + } + + /** + * Unsubscribes from the response channel for the given task ID. + * + * Used to release a response subscription when no response will be processed (e.g. + * the awaiting task has timed out), since in that case the self-unsubscribe on + * delivery never occurs. + * + * @param taskId - The task ID whose response channel to unsubscribe from + */ + async unsubscribeFromResponse(taskId: string): Promise { + const responseChannel = `${TASK_RESPONSE_CHANNEL_PREFIX}${taskId}`; + this.handlers.delete(responseChannel); + await this.subscriber.unsubscribe(responseChannel); + } + + /** + * Publishes a task response on the response channel for the given task ID. + * + * Used by the instance executing a forwarded task to return its outcome to the + * issuing instance. + * + * @param taskId - The ID of the originally requested task + * @param response - The serialized plugin task response, passed through verbatim + */ + publishTaskResponse(taskId: string, response: PluginTaskResponse): void { + const responseChannel = `${TASK_RESPONSE_CHANNEL_PREFIX}${taskId}`; + void this.publisher.publish(responseChannel, JSON.stringify(response)); + } + + /** + * Subscribes to task requests for the given user token. + * + * The handler is invoked for each request arriving on the token's request channel. + * + * @param userToken - The user token whose request channel to subscribe to + * @param handler - The handler to invoke for incoming requests + */ + async subscribeToTasks(userToken: string, handler: TaskRequestHandler): Promise { + const requestChannel = `${TASK_REQUEST_CHANNEL_PREFIX}${userToken}`; + this.handlers.set(requestChannel, (rawMessage) => { + try { + handler(JSON.parse(rawMessage) as PluginTaskRequest); + } catch (error) { + this.logger.error(error, "Failed to parse task request message"); + } + }); + await this.subscriber.subscribe(requestChannel); + } + + /** + * Unsubscribes from task requests for the given user token. + * + * @param userToken - The user token whose request channel to unsubscribe from + */ + async unsubscribeFromTasks(userToken: string): Promise { + const requestChannel = `${TASK_REQUEST_CHANNEL_PREFIX}${userToken}`; + this.handlers.delete(requestChannel); + await this.subscriber.unsubscribe(requestChannel); + } + + /** + * Closes both Redis connections. Call on server shutdown. + */ + async close(): Promise { + await this.subscriber.quit(); + await this.publisher.quit(); + } +} diff --git a/mcp/packages/server/src/RemotePluginTask.ts b/mcp/packages/server/src/RemotePluginTask.ts new file mode 100644 index 0000000000..368623a5a0 --- /dev/null +++ b/mcp/packages/server/src/RemotePluginTask.ts @@ -0,0 +1,56 @@ +import { AbstractPluginTask } from "./PluginTask"; +import { PluginTaskResult } from "@penpot/mcp-common"; +import type { RedisBridge } from "./RedisBridge"; + +/** + * A plugin task whose outcome is forwarded back to a remote requester via Redis, + * rather than awaited in-process. + * + * This task type is used on the server instance that holds the plugin's WebSocket + * connection when a task request arrives over Redis (published by another instance + * that received the corresponding tool call). It is dispatched to the plugin through + * the ordinary local dispatch path; when the plugin responds, the response-correlation + * machinery settles this task, and the overridden `resolveWithResult`/`rejectWithError` + * publish the outcome back onto the requester's Redis response channel. + * + * Note that this task has its own ID (used to correlate the local WebSocket dispatch), + * distinct from the original requester's task ID, which keys the Redis response channel. + * + * It deliberately carries no result promise: settling the task *is* the side effect + * of publishing to Redis, and nothing awaits it locally. + */ +export class RemotePluginTask extends AbstractPluginTask> { + /** + * Creates a task that forwards its outcome to a Redis response channel. + * + * @param task - The name of the task to execute (from the incoming request) + * @param params - The parameters for task execution (from the incoming request) + * @param redisBridge - The Redis bridge used to publish the outcome + * @param originalTaskId - The ID of the original request, which keys the response + * channel the requesting instance is awaiting + */ + constructor( + task: string, + params: any, + private readonly redisBridge: RedisBridge, + private readonly originalTaskId: string + ) { + super(task, params); + } + + resolveWithResult(result: PluginTaskResult): void { + this.redisBridge.publishTaskResponse(this.originalTaskId, { + id: this.originalTaskId, + success: true, + data: result.data, + }); + } + + rejectWithError(error: Error): void { + this.redisBridge.publishTaskResponse(this.originalTaskId, { + id: this.originalTaskId, + success: false, + error: error.message, + }); + } +} diff --git a/mcp/pnpm-lock.yaml b/mcp/pnpm-lock.yaml index 99fa1143d6..16a7487e30 100644 --- a/mcp/pnpm-lock.yaml +++ b/mcp/pnpm-lock.yaml @@ -57,6 +57,9 @@ importers: express: specifier: ^5.1.0 version: 5.2.1 + ioredis: + specifier: ^5.6.0 + version: 5.11.0 js-yaml: specifier: ^4.1.1 version: 4.1.1 @@ -760,6 +763,9 @@ packages: cpu: [x64] os: [win32] + '@ioredis/commands@1.10.0': + resolution: {integrity: sha512-UmeW7z4LfctwoQ5wkhVzgq8tXkreED2xZGpX+Bg+zA+WJFZCT6c062AfCK/Dfk81xZnnwdhJCUMkitihRaoC2Q==} + '@jridgewell/resolve-uri@3.1.2': resolution: {integrity: sha512-bRISgCIjP20/tbWSPWMEi54QVPRZExkuD9lJL+UIxUKtwVJA8wW1Trb1jMs1RFXo1CBTNZ/5hpC9QvmKWdopKw==} engines: {node: '>=6.0.0'} @@ -1080,6 +1086,10 @@ packages: resolution: {integrity: sha512-BSeNnyus75C4//NQ9gQt1/csTXyo/8Sb+afLAkzAptFuMsod9HFokGNudZpi/oQV73hnVK+sR+5PVRMd+Dr7YQ==} engines: {node: '>=12'} + cluster-key-slot@1.1.1: + resolution: {integrity: sha512-rwHwUfXL40Chm1r08yrhU3qpUvdVlgkKNeyeGPOxnW8/SyVDvgRaed/Uz54AqWNaTCAThlj6QAs3TZcKI0xDEw==} + engines: {node: '>=0.10.0'} + color-convert@2.0.1: resolution: {integrity: sha512-RRECPsj7iu/xb5oKYcsFHSppFNnsj/52OVTRKb4zP5onXwVF3zVmmToNcOfGC+CRDpfK/U584fMg38ZHCaElKQ==} engines: {node: '>=7.0.0'} @@ -1143,6 +1153,10 @@ packages: supports-color: optional: true + denque@2.1.0: + resolution: {integrity: sha512-HVQE3AAb/pxF8fQAoiqpvg9i3evqug3hoiwakOyZAwJm+6vZehbkYXZ0l4JxS+I3QxM97v5aaRNhj8v5oBhekw==} + engines: {node: '>=0.10'} + depd@2.0.0: resolution: {integrity: sha512-g7nH6P6dyDioJogAAGprGpCtVImJhpPk/roCzdb3fIh61/s/nPsfR6onyMwkCAR/OlC3yBC0lESvUoQEAssIrw==} engines: {node: '>= 0.8'} @@ -1319,6 +1333,10 @@ packages: inherits@2.0.4: resolution: {integrity: sha512-k/vGaX4/Yla3WzyMCvTQOXYeIHvqOKtnqBduzTHpzpQZzAskKMhZ2K+EnBiSM9zGSoIFeMpXKxa4dYeZIQqewQ==} + ioredis@5.11.0: + resolution: {integrity: sha512-EZBErytyVovD8f6pDfG3Kb37N6Y3lmDA9NNj+4+IP13CzzHGeX+OyeRM2Um13khRzoBSzzL+5lVnCX8V2RLeMg==} + engines: {node: '>=12.22.0'} + ipaddr.js@1.9.1: resolution: {integrity: sha512-0KI/607xoxSToH7GjN1FfSbLoU0+btTicjsQSWQlh/hZykN8KpmMf7uYwPW3R+akZ6R/w18ZlXSHBYXiYUPO3g==} engines: {node: '>= 0.10'} @@ -1501,6 +1519,14 @@ packages: resolution: {integrity: sha512-57frrGM/OCTLqLOAh0mhVA9VBMHd+9U7Zb2THMGdBUoZVOtGbJzjxsYGDJ3A9AYYCP4hn6y1TVbaOfzWtm5GFg==} engines: {node: '>= 12.13.0'} + redis-errors@1.2.0: + resolution: {integrity: sha512-1qny3OExCf0UvUV/5wpYKf2YwPcOqXzkwKKSmKHiE6ZMQs5heeE/c8eXK+PNllPvmjgAbfnsbpkGZWy8cBpn9w==} + engines: {node: '>=4'} + + redis-parser@3.0.0: + resolution: {integrity: sha512-DJnGAeenTdpMEH6uAJRK/uiyEIH9WVsUmoLwzudwGJUwZPp80PDBWPHXSAGNPwNvIXAbe7MSUB1zQFugFml66A==} + engines: {node: '>=4'} + reflect-metadata@0.1.14: resolution: {integrity: sha512-ZhYeb6nRaXCfhnndflDK8qI6ZQ/YcWZCISRAWICW9XYqMUwjZM9Z0DveWX/ABN01oxSHwVxKQmxeYZSsm0jh5A==} @@ -1593,6 +1619,9 @@ packages: resolution: {integrity: sha512-UcjcJOWknrNkF6PLX83qcHM6KHgVKNkV62Y8a5uYDVv9ydGQVwAHMKqHdJje1VTWpljG0WYpCDhrCdAOYH4TWg==} engines: {node: '>= 10.x'} + standard-as-callback@2.1.0: + resolution: {integrity: sha512-qoRRSyROncaz1z0mvYqIE4lCd9p2R90i6GxW3uZv5ucSu8tU7B5HXUP1gG8pVZsYNVaXjk8ClXHPttLyxAL48A==} + statuses@2.0.2: resolution: {integrity: sha512-DvEy55V3DB7uknRo+4iOGT5fP1slR8wQohVdknigZPMpMstaKJQWhwiYBACJE3Ul2pTnATihhBYnRhZQHGBiRw==} engines: {node: '>= 0.8'} @@ -2124,6 +2153,8 @@ snapshots: '@img/sharp-win32-x64@0.34.5': optional: true + '@ioredis/commands@1.10.0': {} + '@jridgewell/resolve-uri@3.1.2': {} '@jridgewell/sourcemap-codec@1.5.5': {} @@ -2397,6 +2428,8 @@ snapshots: strip-ansi: 6.0.1 wrap-ansi: 7.0.0 + cluster-key-slot@1.1.1: {} + color-convert@2.0.1: dependencies: color-name: 1.1.4 @@ -2447,6 +2480,8 @@ snapshots: dependencies: ms: 2.1.3 + denque@2.1.0: {} + depd@2.0.0: {} detect-libc@2.1.2: {} @@ -2695,6 +2730,18 @@ snapshots: inherits@2.0.4: {} + ioredis@5.11.0: + dependencies: + '@ioredis/commands': 1.10.0 + cluster-key-slot: 1.1.1 + debug: 4.4.3 + denque: 2.1.0 + redis-errors: 1.2.0 + redis-parser: 3.0.0 + standard-as-callback: 2.1.0 + transitivePeerDependencies: + - supports-color + ipaddr.js@1.9.1: {} is-fullwidth-code-point@3.0.0: {} @@ -2856,6 +2903,12 @@ snapshots: real-require@0.2.0: {} + redis-errors@1.2.0: {} + + redis-parser@3.0.0: + dependencies: + redis-errors: 1.2.0 + reflect-metadata@0.1.14: {} require-directory@2.1.1: {} @@ -3017,6 +3070,8 @@ snapshots: split2@4.2.0: {} + standard-as-callback@2.1.0: {} + statuses@2.0.2: {} string-width@4.2.3: