From 737abc0e45b9baf9d3a31843b7360a4b219a09e5 Mon Sep 17 00:00:00 2001 From: zgenu <2023020285@bistu.edu.cn> Date: Thu, 28 May 2026 17:29:30 +0800 Subject: [PATCH] fix: ignore stale run reconnect conflicts (#3284) * fix: ignore stale run reconnect conflicts * Potential fix for pull request finding Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> * fix: ignore stale run reconnect conflicts --------- Co-authored-by: Willem Jiang Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> --- frontend/src/core/api/api-client.ts | 62 ++++++++- .../tests/unit/core/api/api-client.test.ts | 123 ++++++++++++++++++ 2 files changed, 179 insertions(+), 6 deletions(-) create mode 100644 frontend/tests/unit/core/api/api-client.test.ts diff --git a/frontend/src/core/api/api-client.ts b/frontend/src/core/api/api-client.ts index 841c2cdfb..ff03fac6f 100644 --- a/frontend/src/core/api/api-client.ts +++ b/frontend/src/core/api/api-client.ts @@ -38,6 +38,47 @@ function injectCsrfHeader(_url: URL, init: RequestInit): RequestInit { return { ...init, headers }; } +export function isInactiveRunStreamError(error: unknown): boolean { + const status = + typeof error === "object" && error !== null + ? Reflect.get(error, "status") + : undefined; + const message = + typeof error === "string" + ? error + : error instanceof Error + ? error.message + : typeof error === "object" && error !== null + ? String(Reflect.get(error, "message") ?? "") + : ""; + + // Match the gateway's store-only run response in + // backend/app/gateway/routers/thread_runs.py until the API exposes a + // structured error code for inactive run streams. + return ( + (status === 409 || message.includes("HTTP 409")) && + message.includes("not active on this worker") && + message.includes("cannot be streamed") + ); +} + +export function clearReconnectRun( + threadId: string | null | undefined, + runId: string, +): void { + if (typeof window === "undefined" || !threadId) return; + + const key = `lg:stream:${threadId}`; + try { + const storage = window.sessionStorage; + if (storage.getItem(key) === runId) { + storage.removeItem(key); + } + } catch { + // Ignore storage access failures so reconnect cleanup never throws. + } +} + function createCompatibleClient(isMock?: boolean): LangGraphClient { if (isStaticWebsiteOnly() && !isMock) { return createStaticClient(); @@ -59,12 +100,21 @@ function createCompatibleClient(isMock?: boolean): LangGraphClient { )) as typeof client.runs.stream; const originalJoinStream = client.runs.joinStream.bind(client.runs); - client.runs.joinStream = ((threadId, runId, options) => - originalJoinStream( - threadId, - runId, - sanitizeRunStreamOptions(options), - )) as typeof client.runs.joinStream; + client.runs.joinStream = async function* (threadId, runId, options) { + try { + yield* originalJoinStream( + threadId, + runId, + sanitizeRunStreamOptions(options), + ); + } catch (error) { + if (isInactiveRunStreamError(error)) { + clearReconnectRun(threadId, runId); + return; + } + throw error; + } + } as typeof client.runs.joinStream; return client; } diff --git a/frontend/tests/unit/core/api/api-client.test.ts b/frontend/tests/unit/core/api/api-client.test.ts new file mode 100644 index 000000000..7e8e595f1 --- /dev/null +++ b/frontend/tests/unit/core/api/api-client.test.ts @@ -0,0 +1,123 @@ +import { afterEach, expect, test, vi } from "vitest"; + +import { + clearReconnectRun, + getAPIClient, + isInactiveRunStreamError, +} from "@/core/api/api-client"; + +function makeSessionStorage() { + const values = new Map(); + return { + getItem: vi.fn((key: string) => values.get(key) ?? null), + removeItem: vi.fn((key: string) => { + values.delete(key); + }), + setItem: vi.fn((key: string, value: string) => { + values.set(key, value); + }), + }; +} + +afterEach(() => { + vi.unstubAllGlobals(); +}); + +test("identifies inactive run stream errors", () => { + const error = Object.assign( + new Error( + 'HTTP 409: {"detail":"Run run-1 is not active on this worker and cannot be streamed"}', + ), + { status: 409 }, + ); + + expect(isInactiveRunStreamError(error)).toBe(true); +}); + +test("does not classify unrelated conflict errors as inactive streams", () => { + const error = Object.assign(new Error("HTTP 409: run is still active"), { + status: 409, + }); + + expect(isInactiveRunStreamError(error)).toBe(false); +}); + +test("clears matching reconnect metadata", () => { + const sessionStorage = makeSessionStorage(); + sessionStorage.setItem("lg:stream:thread-1", "run-1"); + vi.stubGlobal("window", { sessionStorage }); + + clearReconnectRun("thread-1", "run-1"); + + expect(sessionStorage.removeItem).toHaveBeenCalledWith("lg:stream:thread-1"); +}); + +test("keeps newer reconnect metadata", () => { + const sessionStorage = makeSessionStorage(); + sessionStorage.setItem("lg:stream:thread-1", "newer-run"); + vi.stubGlobal("window", { sessionStorage }); + + clearReconnectRun("thread-1", "stale-run"); + + expect(sessionStorage.removeItem).not.toHaveBeenCalled(); +}); + +test("ignores reconnect metadata storage access failures", () => { + vi.stubGlobal("window", { + get sessionStorage() { + throw new DOMException("Blocked", "SecurityError"); + }, + }); + + expect(() => clearReconnectRun("thread-1", "run-1")).not.toThrow(); +}); + +test("clears stale reconnect metadata when join stream cannot be resumed", async () => { + const sessionStorage = makeSessionStorage(); + sessionStorage.setItem("lg:stream:thread-1", "run-1"); + vi.stubGlobal("window", { + location: { origin: "http://localhost:2026" }, + sessionStorage, + }); + vi.stubGlobal( + "fetch", + vi.fn(async () => { + return new Response( + JSON.stringify({ + detail: + "Run run-1 is not active on this worker and cannot be streamed", + }), + { status: 409 }, + ); + }), + ); + + await expect( + getAPIClient(true).runs.joinStream("thread-1", "run-1").next(), + ).resolves.toMatchObject({ done: true }); + + expect(sessionStorage.removeItem).toHaveBeenCalledWith("lg:stream:thread-1"); +}); + +test("rethrows unrelated streaming errors", async () => { + const sessionStorage = makeSessionStorage(); + sessionStorage.setItem("lg:stream:thread-1", "run-1"); + vi.stubGlobal("window", { + location: { origin: "http://localhost:2026" }, + sessionStorage, + }); + vi.stubGlobal( + "fetch", + vi.fn(async () => { + return new Response(JSON.stringify({ detail: "run is still active" }), { + status: 409, + }); + }), + ); + + await expect( + getAPIClient(true).runs.joinStream("thread-1", "run-1").next(), + ).rejects.toThrow("HTTP 409"); + + expect(sessionStorage.removeItem).not.toHaveBeenCalled(); +});