From 4d3d722eccad4947da303eb49d96a05a3c561ab8 Mon Sep 17 00:00:00 2001 From: Dominik Jain Date: Mon, 2 Mar 2026 11:27:13 +0100 Subject: [PATCH] :sparkles: MCP: Improve Streamable HTTP session handling & logging (#8493) * :sparkles: Reintroduce proper session management for /mcp endpoint Reuse transport and server instance based on session ID in header * :sparkles: Periodically clean up stale streamable HTTP sessions Add class StreamableSession to improve type clarity * :sparkles: Avoid recreation of objects when instantiating McpServer instances Precompute the initial instructions and all tool-related data * :sparkles: Improve logging of tool executions --- mcp/packages/server/src/PenpotMcpServer.ts | 154 ++++++++++++++++----- mcp/packages/server/src/Tool.ts | 13 +- 2 files changed, 129 insertions(+), 38 deletions(-) diff --git a/mcp/packages/server/src/PenpotMcpServer.ts b/mcp/packages/server/src/PenpotMcpServer.ts index 6dab371ff4..2c4a2cc792 100644 --- a/mcp/packages/server/src/PenpotMcpServer.ts +++ b/mcp/packages/server/src/PenpotMcpServer.ts @@ -21,26 +21,56 @@ export interface SessionContext { userToken?: string; } +/** + * Represents an active Streamable HTTP session, grouping the transport, MCP server, and session metadata. + */ +class StreamableSession { + constructor( + public readonly transport: StreamableHTTPServerTransport, + public readonly userToken: string | undefined, + public lastActiveTime: number + ) {} +} + +/** + * Holds information about a registered tool, including its instance, name, and configuration. + */ +class ToolInfo { + constructor( + public readonly instance: Tool, + public readonly name: string, + public readonly config: { description: string; inputSchema: any } + ) {} +} + export class PenpotMcpServer { + /** + * Timeout, in minutes, for idle Streamable HTTP sessions before they are automatically closed and removed. + */ + private static readonly SESSION_TIMEOUT_MINUTES = 60; + private readonly logger = createLogger("PenpotMcpServer"); - private readonly tools: Map>; + private readonly tools: ToolInfo[]; public readonly configLoader: ConfigurationLoader; private app: any; public readonly pluginBridge: PluginBridge; private readonly replServer: ReplServer; private apiDocs: ApiDocs; + private initialInstructions: string; /** * Manages session-specific context, particularly user tokens for each request. */ private readonly sessionContext = new AsyncLocalStorage(); + private readonly streamableTransports: Record = {}; private readonly sseTransports: Record = {}; public readonly host: string; public readonly port: number; public readonly webSocketPort: number; public readonly replPort: number; + private sessionTimeoutInterval: ReturnType | undefined; constructor(private isMultiUser: boolean = false) { // read port configuration from environment variables @@ -52,11 +82,15 @@ export class PenpotMcpServer { this.configLoader = new ConfigurationLoader(process.cwd()); this.apiDocs = new ApiDocs(); - this.tools = new Map>(); + // prepare initial instructions + let instructions = this.configLoader.getInitialInstructions(); + instructions = instructions.replace("$api_types", this.apiDocs.getTypeNames().join(", ")); + this.initialInstructions = instructions; + + this.tools = this.initTools(); + this.pluginBridge = new PluginBridge(this, this.webSocketPort); this.replServer = new ReplServer(this.pluginBridge, this.replPort); - - this.initTools(); } /** @@ -91,9 +125,7 @@ export class PenpotMcpServer { } public getInitialInstructions(): string { - let instructions = this.configLoader.getInitialInstructions(); - instructions = instructions.replace("$api_types", this.apiDocs.getTypeNames().join(", ")); - return instructions; + return this.initialInstructions; } /** @@ -105,7 +137,7 @@ export class PenpotMcpServer { return this.sessionContext.getStore(); } - private initTools(): void { + private initTools(): ToolInfo[] { const toolInstances: Tool[] = [ new ExecuteCodeTool(this), new HighLevelOverviewTool(this), @@ -116,10 +148,13 @@ export class PenpotMcpServer { toolInstances.push(new ImportImageTool(this)); } - for (const tool of toolInstances) { - this.logger.info(`Registering tool: ${tool.getToolName()}`); - this.tools.set(tool.getToolName(), tool); - } + return toolInstances.map((instance) => { + this.logger.info(`Registering tool: ${instance.getToolName()}`); + return new ToolInfo(instance, instance.getToolName(), { + description: instance.getToolDescription(), + inputSchema: instance.getInputSchema(), + }); + }); } /** @@ -127,43 +162,90 @@ export class PenpotMcpServer { */ private createMcpServer(): McpServer { const server = new McpServer( - { name: "penpot-mcp-server", version: "1.0.0" }, + { name: "penpot", version: "1.0.0" }, { instructions: this.getInitialInstructions() } ); - for (const tool of this.tools.values()) { - server.registerTool( - tool.getToolName(), - { - description: tool.getToolDescription(), - inputSchema: tool.getInputSchema(), - }, - async (args) => tool.execute(args) - ); + for (const tool of this.tools) { + server.registerTool(tool.name, tool.config, async (args: any) => tool.instance.execute(args)); } return server; } + /** + * Starts a periodic timer that closes and removes Streamable HTTP sessions that have been + * idle for longer than {@link SESSION_TIMEOUT_MINUTES}. + */ + private startSessionTimeoutChecker(): void { + const timeoutMs = PenpotMcpServer.SESSION_TIMEOUT_MINUTES * 60 * 1000; + const checkIntervalMs = timeoutMs / 2; + this.sessionTimeoutInterval = setInterval(() => { + this.logger.info("Checking for stale sessions..."); + const now = Date.now(); + let removed = 0; + for (const session of Object.values(this.streamableTransports)) { + if (now - session.lastActiveTime > timeoutMs) { + session.transport.close(); + removed++; + } + } + this.logger.info( + `Removed ${removed} stale session(s); total sessions remaining: ${Object.keys(this.streamableTransports).length}` + ); + }, checkIntervalMs); + } + private setupHttpEndpoints(): void { /** - * Modern Streamable HTTP connection endpoint + * Modern Streamable HTTP connection endpoint. + * + * New sessions are created on initialize requests (no mcp-session-id header). + * Subsequent requests for an existing session are routed to the stored transport, + * with the session context populated from the stored userToken. */ this.app.all("/mcp", async (req: any, res: any) => { - const userToken = req.query.userToken as string | undefined; - this.logger.info(`Received /mcp request with userToken: ${userToken}`); + const sessionId = req.headers["mcp-session-id"] as string | undefined; + let userToken: string | undefined = undefined; + let transport: StreamableHTTPServerTransport; - await this.sessionContext.run({ userToken }, async () => { - const transport = new StreamableHTTPServerTransport({ - sessionIdGenerator: undefined, - }); + // obtain transport and user token for the session, either from an existing session or by creating a new one + if (sessionId && this.streamableTransports[sessionId]) { + // existing session: reuse stored transport and token + const session = this.streamableTransports[sessionId]; + transport = session.transport; + userToken = session.userToken; + session.lastActiveTime = Date.now(); + this.logger.info( + `Received request for existing session with id=${sessionId}; userToken=${session.userToken}` + ); + } else { + // new session: create a fresh McpServer and transport + userToken = req.query.userToken as string | undefined; + this.logger.info(`Received new session request; userToken=${userToken}`); + const { randomUUID } = await import("node:crypto"); const server = this.createMcpServer(); - await server.connect(transport); - await transport.handleRequest(req, res, req.body); - res.on("close", () => { - transport.close(); - server.close(); + transport = new StreamableHTTPServerTransport({ + sessionIdGenerator: () => randomUUID(), + onsessioninitialized: (id) => { + this.streamableTransports[id] = new StreamableSession(transport, userToken, Date.now()); + this.logger.info( + `Session initialized with id=${id} for userToken=${userToken}; total sessions: ${Object.keys(this.streamableTransports).length}` + ); + }, }); + transport.onclose = () => { + if (transport.sessionId) { + this.logger.info(`Closing session with id=${transport.sessionId} for userToken=${userToken}`); + delete this.streamableTransports[transport.sessionId]; + } + }; + await server.connect(transport); + } + + // handle the request + await this.sessionContext.run({ userToken }, async () => { + await transport.handleRequest(req, res, req.body); }); }); @@ -218,8 +300,9 @@ export class PenpotMcpServer { this.logger.info(`Legacy SSE endpoint: http://${this.host}:${this.port}/sse`); this.logger.info(`WebSocket server URL: ws://${this.host}:${this.webSocketPort}`); - // start the REPL server + // start the REPL server and session timeout checker await this.replServer.start(); + this.startSessionTimeoutChecker(); resolve(); }); @@ -233,6 +316,7 @@ export class PenpotMcpServer { */ public async stop(): Promise { this.logger.info("Stopping Penpot MCP Server..."); + clearInterval(this.sessionTimeoutInterval); await this.replServer.stop(); this.logger.info("Penpot MCP Server stopped"); } diff --git a/mcp/packages/server/src/Tool.ts b/mcp/packages/server/src/Tool.ts index 65cfe539bd..df4e1f2266 100644 --- a/mcp/packages/server/src/Tool.ts +++ b/mcp/packages/server/src/Tool.ts @@ -22,6 +22,9 @@ export class EmptyToolArgs { export abstract class Tool { private readonly logger = createLogger("Tool"); + /** monotonically increasing counter for unique tool execution IDs */ + private static executionCounter = 0; + protected constructor( protected mcpServer: PenpotMcpServer, private inputSchema: z.ZodRawShape @@ -34,17 +37,21 @@ export abstract class Tool { * delegating to the type-safe implementation. */ async execute(args: unknown): Promise { + const executionId = ++Tool.executionCounter; try { let argsInstance: TArgs = args as TArgs; - this.logger.info("Executing tool: %s; arguments: %s", this.getToolName(), this.formatArgs(argsInstance)); + this.logger.info("Tool execution #%d starting: %s", executionId, this.getToolName()); + if (this.logger.isLevelEnabled("debug")) { + this.logger.debug("Tool execution #%d arguments: %s", executionId, this.formatArgs(argsInstance)); + } // execute the actual tool logic let result = await this.executeCore(argsInstance); - this.logger.info("Tool execution completed: %s", this.getToolName()); + this.logger.info("Tool execution #%d complete: %s", executionId, this.getToolName()); return result; } catch (error) { - this.logger.error(error); + this.logger.error("Tool execution #%d failed: %s; error: %s", executionId, this.getToolName(), error); return new TextResponse(`Tool execution failed: ${String(error)}`); } }