MCP: Improve Streamable HTTP session handling & logging (#8493)

*  Reintroduce proper session management for /mcp endpoint

Reuse transport and server instance based on session ID in header

*  Periodically clean up stale streamable HTTP sessions

Add class StreamableSession to improve type clarity

*  Avoid recreation of objects when instantiating McpServer instances

Precompute the initial instructions and all tool-related data

*  Improve logging of tool executions
This commit is contained in:
Dominik Jain 2026-03-02 11:27:13 +01:00
parent bd8283a435
commit 4d3d722ecc
2 changed files with 129 additions and 38 deletions

View File

@ -21,26 +21,56 @@ export interface SessionContext {
userToken?: string;
}
/**
* Represents an active Streamable HTTP session, grouping the transport, MCP server, and session metadata.
*/
class StreamableSession {
constructor(
public readonly transport: StreamableHTTPServerTransport,
public readonly userToken: string | undefined,
public lastActiveTime: number
) {}
}
/**
* Holds information about a registered tool, including its instance, name, and configuration.
*/
class ToolInfo {
constructor(
public readonly instance: Tool<any>,
public readonly name: string,
public readonly config: { description: string; inputSchema: any }
) {}
}
export class PenpotMcpServer {
/**
* Timeout, in minutes, for idle Streamable HTTP sessions before they are automatically closed and removed.
*/
private static readonly SESSION_TIMEOUT_MINUTES = 60;
private readonly logger = createLogger("PenpotMcpServer");
private readonly tools: Map<string, Tool<any>>;
private readonly tools: ToolInfo[];
public readonly configLoader: ConfigurationLoader;
private app: any;
public readonly pluginBridge: PluginBridge;
private readonly replServer: ReplServer;
private apiDocs: ApiDocs;
private initialInstructions: string;
/**
* Manages session-specific context, particularly user tokens for each request.
*/
private readonly sessionContext = new AsyncLocalStorage<SessionContext>();
private readonly streamableTransports: Record<string, StreamableSession> = {};
private readonly sseTransports: Record<string, { transport: SSEServerTransport; userToken?: string }> = {};
public readonly host: string;
public readonly port: number;
public readonly webSocketPort: number;
public readonly replPort: number;
private sessionTimeoutInterval: ReturnType<typeof setInterval> | undefined;
constructor(private isMultiUser: boolean = false) {
// read port configuration from environment variables
@ -52,11 +82,15 @@ export class PenpotMcpServer {
this.configLoader = new ConfigurationLoader(process.cwd());
this.apiDocs = new ApiDocs();
this.tools = new Map<string, Tool<any>>();
// prepare initial instructions
let instructions = this.configLoader.getInitialInstructions();
instructions = instructions.replace("$api_types", this.apiDocs.getTypeNames().join(", "));
this.initialInstructions = instructions;
this.tools = this.initTools();
this.pluginBridge = new PluginBridge(this, this.webSocketPort);
this.replServer = new ReplServer(this.pluginBridge, this.replPort);
this.initTools();
}
/**
@ -91,9 +125,7 @@ export class PenpotMcpServer {
}
public getInitialInstructions(): string {
let instructions = this.configLoader.getInitialInstructions();
instructions = instructions.replace("$api_types", this.apiDocs.getTypeNames().join(", "));
return instructions;
return this.initialInstructions;
}
/**
@ -105,7 +137,7 @@ export class PenpotMcpServer {
return this.sessionContext.getStore();
}
private initTools(): void {
private initTools(): ToolInfo[] {
const toolInstances: Tool<any>[] = [
new ExecuteCodeTool(this),
new HighLevelOverviewTool(this),
@ -116,10 +148,13 @@ export class PenpotMcpServer {
toolInstances.push(new ImportImageTool(this));
}
for (const tool of toolInstances) {
this.logger.info(`Registering tool: ${tool.getToolName()}`);
this.tools.set(tool.getToolName(), tool);
}
return toolInstances.map((instance) => {
this.logger.info(`Registering tool: ${instance.getToolName()}`);
return new ToolInfo(instance, instance.getToolName(), {
description: instance.getToolDescription(),
inputSchema: instance.getInputSchema(),
});
});
}
/**
@ -127,43 +162,90 @@ export class PenpotMcpServer {
*/
private createMcpServer(): McpServer {
const server = new McpServer(
{ name: "penpot-mcp-server", version: "1.0.0" },
{ name: "penpot", version: "1.0.0" },
{ instructions: this.getInitialInstructions() }
);
for (const tool of this.tools.values()) {
server.registerTool(
tool.getToolName(),
{
description: tool.getToolDescription(),
inputSchema: tool.getInputSchema(),
},
async (args) => tool.execute(args)
);
for (const tool of this.tools) {
server.registerTool(tool.name, tool.config, async (args: any) => tool.instance.execute(args));
}
return server;
}
/**
* Starts a periodic timer that closes and removes Streamable HTTP sessions that have been
* idle for longer than {@link SESSION_TIMEOUT_MINUTES}.
*/
private startSessionTimeoutChecker(): void {
const timeoutMs = PenpotMcpServer.SESSION_TIMEOUT_MINUTES * 60 * 1000;
const checkIntervalMs = timeoutMs / 2;
this.sessionTimeoutInterval = setInterval(() => {
this.logger.info("Checking for stale sessions...");
const now = Date.now();
let removed = 0;
for (const session of Object.values(this.streamableTransports)) {
if (now - session.lastActiveTime > timeoutMs) {
session.transport.close();
removed++;
}
}
this.logger.info(
`Removed ${removed} stale session(s); total sessions remaining: ${Object.keys(this.streamableTransports).length}`
);
}, checkIntervalMs);
}
private setupHttpEndpoints(): void {
/**
* Modern Streamable HTTP connection endpoint
* Modern Streamable HTTP connection endpoint.
*
* New sessions are created on initialize requests (no mcp-session-id header).
* Subsequent requests for an existing session are routed to the stored transport,
* with the session context populated from the stored userToken.
*/
this.app.all("/mcp", async (req: any, res: any) => {
const userToken = req.query.userToken as string | undefined;
this.logger.info(`Received /mcp request with userToken: ${userToken}`);
const sessionId = req.headers["mcp-session-id"] as string | undefined;
let userToken: string | undefined = undefined;
let transport: StreamableHTTPServerTransport;
await this.sessionContext.run({ userToken }, async () => {
const transport = new StreamableHTTPServerTransport({
sessionIdGenerator: undefined,
});
// obtain transport and user token for the session, either from an existing session or by creating a new one
if (sessionId && this.streamableTransports[sessionId]) {
// existing session: reuse stored transport and token
const session = this.streamableTransports[sessionId];
transport = session.transport;
userToken = session.userToken;
session.lastActiveTime = Date.now();
this.logger.info(
`Received request for existing session with id=${sessionId}; userToken=${session.userToken}`
);
} else {
// new session: create a fresh McpServer and transport
userToken = req.query.userToken as string | undefined;
this.logger.info(`Received new session request; userToken=${userToken}`);
const { randomUUID } = await import("node:crypto");
const server = this.createMcpServer();
await server.connect(transport);
await transport.handleRequest(req, res, req.body);
res.on("close", () => {
transport.close();
server.close();
transport = new StreamableHTTPServerTransport({
sessionIdGenerator: () => randomUUID(),
onsessioninitialized: (id) => {
this.streamableTransports[id] = new StreamableSession(transport, userToken, Date.now());
this.logger.info(
`Session initialized with id=${id} for userToken=${userToken}; total sessions: ${Object.keys(this.streamableTransports).length}`
);
},
});
transport.onclose = () => {
if (transport.sessionId) {
this.logger.info(`Closing session with id=${transport.sessionId} for userToken=${userToken}`);
delete this.streamableTransports[transport.sessionId];
}
};
await server.connect(transport);
}
// handle the request
await this.sessionContext.run({ userToken }, async () => {
await transport.handleRequest(req, res, req.body);
});
});
@ -218,8 +300,9 @@ export class PenpotMcpServer {
this.logger.info(`Legacy SSE endpoint: http://${this.host}:${this.port}/sse`);
this.logger.info(`WebSocket server URL: ws://${this.host}:${this.webSocketPort}`);
// start the REPL server
// start the REPL server and session timeout checker
await this.replServer.start();
this.startSessionTimeoutChecker();
resolve();
});
@ -233,6 +316,7 @@ export class PenpotMcpServer {
*/
public async stop(): Promise<void> {
this.logger.info("Stopping Penpot MCP Server...");
clearInterval(this.sessionTimeoutInterval);
await this.replServer.stop();
this.logger.info("Penpot MCP Server stopped");
}

View File

@ -22,6 +22,9 @@ export class EmptyToolArgs {
export abstract class Tool<TArgs extends object> {
private readonly logger = createLogger("Tool");
/** monotonically increasing counter for unique tool execution IDs */
private static executionCounter = 0;
protected constructor(
protected mcpServer: PenpotMcpServer,
private inputSchema: z.ZodRawShape
@ -34,17 +37,21 @@ export abstract class Tool<TArgs extends object> {
* delegating to the type-safe implementation.
*/
async execute(args: unknown): Promise<ToolResponse> {
const executionId = ++Tool.executionCounter;
try {
let argsInstance: TArgs = args as TArgs;
this.logger.info("Executing tool: %s; arguments: %s", this.getToolName(), this.formatArgs(argsInstance));
this.logger.info("Tool execution #%d starting: %s", executionId, this.getToolName());
if (this.logger.isLevelEnabled("debug")) {
this.logger.debug("Tool execution #%d arguments: %s", executionId, this.formatArgs(argsInstance));
}
// execute the actual tool logic
let result = await this.executeCore(argsInstance);
this.logger.info("Tool execution completed: %s", this.getToolName());
this.logger.info("Tool execution #%d complete: %s", executionId, this.getToolName());
return result;
} catch (error) {
this.logger.error(error);
this.logger.error("Tool execution #%d failed: %s; error: %s", executionId, this.getToolName(), error);
return new TextResponse(`Tool execution failed: ${String(error)}`);
}
}