From df7245cb9dd4bdab231fe6495f1424115b10ef3b Mon Sep 17 00:00:00 2001 From: Dominik Jain Date: Thu, 11 Dec 2025 12:26:42 +0100 Subject: [PATCH] Support passing a user-specific token to MCP server requests which can subsequently be accessed by downstream calls #20 --- mcp-server/src/PenpotMcpServer.ts | 109 ++++++++++++++++++++---------- mcp-server/src/Tool.ts | 15 +++- 2 files changed, 89 insertions(+), 35 deletions(-) diff --git a/mcp-server/src/PenpotMcpServer.ts b/mcp-server/src/PenpotMcpServer.ts index b726960..b45251b 100644 --- a/mcp-server/src/PenpotMcpServer.ts +++ b/mcp-server/src/PenpotMcpServer.ts @@ -1,4 +1,5 @@ import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js"; +import { AsyncLocalStorage } from "async_hooks"; import { SSEServerTransport } from "@modelcontextprotocol/sdk/server/sse.js"; import { StreamableHTTPServerTransport } from "@modelcontextprotocol/sdk/server/streamableHttp.js"; import { ExecuteCodeTool } from "./tools/ExecuteCodeTool"; @@ -13,6 +14,13 @@ import { ImportImageTool } from "./tools/ImportImageTool"; import { ReplServer } from "./ReplServer"; import { ApiDocs } from "./ApiDocs"; +/** + * Session context for request-scoped data. + */ +export interface SessionContext { + userToken?: string; +} + export class PenpotMcpServer { private readonly logger = createLogger("PenpotMcpServer"); private readonly server: McpServer; @@ -23,9 +31,14 @@ export class PenpotMcpServer { private readonly replServer: ReplServer; private apiDocs: ApiDocs; + /** + * Manages session-specific context, particularly user tokens for each request. + */ + private readonly sessionContext = new AsyncLocalStorage(); + private readonly transports = { streamable: {} as Record, - sse: {} as Record, + sse: {} as Record, }; constructor( @@ -59,6 +72,15 @@ export class PenpotMcpServer { return instructions; } + /** + * Retrieves the current session context. + * + * @returns The session context for the current request, or undefined if not in a request context + */ + public getSessionContext(): SessionContext | undefined { + return this.sessionContext.getStore(); + } + private registerTools(): void { const toolInstances: Tool[] = [ new ExecuteCodeTool(this), @@ -88,51 +110,70 @@ export class PenpotMcpServer { } private setupHttpEndpoints(): void { + /** + * Modern Streamable HTTP connection endpoint + */ this.app.all("/mcp", async (req: any, res: any) => { - const { randomUUID } = await import("node:crypto"); + const userToken = req.query.userToken as string | undefined; - const sessionId = req.headers["mcp-session-id"] as string | undefined; - let transport: StreamableHTTPServerTransport; + await this.sessionContext.run({ userToken }, async () => { + const { randomUUID } = await import("node:crypto"); - if (sessionId && this.transports.streamable[sessionId]) { - transport = this.transports.streamable[sessionId]; - } else { - transport = new StreamableHTTPServerTransport({ - sessionIdGenerator: () => randomUUID(), - onsessioninitialized: (id: string) => { - this.transports.streamable[id] = transport; - }, + const sessionId = req.headers["mcp-session-id"] as string | undefined; + let transport: StreamableHTTPServerTransport; + + if (sessionId && this.transports.streamable[sessionId]) { + transport = this.transports.streamable[sessionId]; + } else { + transport = new StreamableHTTPServerTransport({ + sessionIdGenerator: () => randomUUID(), + onsessioninitialized: (id: string) => { + this.transports.streamable[id] = transport; + }, + }); + + transport.onclose = () => { + if (transport.sessionId) { + delete this.transports.streamable[transport.sessionId]; + } + }; + + await this.server.connect(transport); + } + + await transport.handleRequest(req, res, req.body); + }); + }); + + /** + * Legacy SSE connection endpoint + */ + this.app.get("/sse", async (req: any, res: any) => { + const userToken = req.query.userToken as string | undefined; + + await this.sessionContext.run({ userToken }, async () => { + const transport = new SSEServerTransport("/messages", res); + this.transports.sse[transport.sessionId] = { transport, userToken }; + + res.on("close", () => { + delete this.transports.sse[transport.sessionId]; }); - transport.onclose = () => { - if (transport.sessionId) { - delete this.transports.streamable[transport.sessionId]; - } - }; - await this.server.connect(transport); - } - - await transport.handleRequest(req, res, req.body); - }); - - this.app.get("/sse", async (_req: any, res: any) => { - const transport = new SSEServerTransport("/messages", res); - this.transports.sse[transport.sessionId] = transport; - - res.on("close", () => { - delete this.transports.sse[transport.sessionId]; }); - - await this.server.connect(transport); }); + /** + * SSE message POST endpoint (using previously established session) + */ this.app.post("/messages", async (req: any, res: any) => { const sessionId = req.query.sessionId as string; - const transport = this.transports.sse[sessionId]; + const session = this.transports.sse[sessionId]; - if (transport) { - await transport.handlePostMessage(req, res, req.body); + if (session) { + await this.sessionContext.run({ userToken: session.userToken }, async () => { + await session.transport.handlePostMessage(req, res, req.body); + }); } else { res.status(400).send("No transport found for sessionId"); } diff --git a/mcp-server/src/Tool.ts b/mcp-server/src/Tool.ts index 4a8fc2e..90c5a41 100644 --- a/mcp-server/src/Tool.ts +++ b/mcp-server/src/Tool.ts @@ -1,7 +1,7 @@ import { z } from "zod"; import "reflect-metadata"; import { TextResponse, ToolResponse } from "./ToolResponse"; -import type { PenpotMcpServer } from "./PenpotMcpServer"; +import type { PenpotMcpServer, SessionContext } from "./PenpotMcpServer"; import { createLogger } from "./logger"; /** @@ -38,6 +38,10 @@ export abstract class Tool { let argsInstance: TArgs = args as TArgs; this.logger.info("Executing tool: %s; arguments: %s", this.getToolName(), this.formatArgs(argsInstance)); + // TODO: Remove; testing only + const sessionContext = this.mcpServer.getSessionContext(); + this.logger.info("Session context: %s", sessionContext ? JSON.stringify(sessionContext) : "none"); + // execute the actual tool logic let result = await this.executeCore(argsInstance); @@ -89,6 +93,15 @@ export abstract class Tool { return formatted.length > 0 ? "\n" + formatted.join("\n") : "{}"; } + /** + * Retrieves the current session context. + * + * @returns The session context for the current request, or undefined if not in a request context + */ + protected getSessionContext(): SessionContext | undefined { + return this.mcpServer.getSessionContext(); + } + public getInputSchema() { return this.inputSchema; }