fix: ignore stale run reconnect conflicts (#3284)

* fix: ignore stale run reconnect conflicts

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* fix: ignore stale run reconnect conflicts

---------

Co-authored-by: Willem Jiang <willem.jiang@gmail.com>
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
This commit is contained in:
zgenu 2026-05-28 17:29:30 +08:00 committed by GitHub
parent 8decfd327e
commit 737abc0e45
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 179 additions and 6 deletions

View File

@ -38,6 +38,47 @@ function injectCsrfHeader(_url: URL, init: RequestInit): RequestInit {
return { ...init, headers };
}
export function isInactiveRunStreamError(error: unknown): boolean {
const status =
typeof error === "object" && error !== null
? Reflect.get(error, "status")
: undefined;
const message =
typeof error === "string"
? error
: error instanceof Error
? error.message
: typeof error === "object" && error !== null
? String(Reflect.get(error, "message") ?? "")
: "";
// Match the gateway's store-only run response in
// backend/app/gateway/routers/thread_runs.py until the API exposes a
// structured error code for inactive run streams.
return (
(status === 409 || message.includes("HTTP 409")) &&
message.includes("not active on this worker") &&
message.includes("cannot be streamed")
);
}
export function clearReconnectRun(
threadId: string | null | undefined,
runId: string,
): void {
if (typeof window === "undefined" || !threadId) return;
const key = `lg:stream:${threadId}`;
try {
const storage = window.sessionStorage;
if (storage.getItem(key) === runId) {
storage.removeItem(key);
}
} catch {
// Ignore storage access failures so reconnect cleanup never throws.
}
}
function createCompatibleClient(isMock?: boolean): LangGraphClient {
if (isStaticWebsiteOnly() && !isMock) {
return createStaticClient();
@ -59,12 +100,21 @@ function createCompatibleClient(isMock?: boolean): LangGraphClient {
)) as typeof client.runs.stream;
const originalJoinStream = client.runs.joinStream.bind(client.runs);
client.runs.joinStream = ((threadId, runId, options) =>
originalJoinStream(
threadId,
runId,
sanitizeRunStreamOptions(options),
)) as typeof client.runs.joinStream;
client.runs.joinStream = async function* (threadId, runId, options) {
try {
yield* originalJoinStream(
threadId,
runId,
sanitizeRunStreamOptions(options),
);
} catch (error) {
if (isInactiveRunStreamError(error)) {
clearReconnectRun(threadId, runId);
return;
}
throw error;
}
} as typeof client.runs.joinStream;
return client;
}

View File

@ -0,0 +1,123 @@
import { afterEach, expect, test, vi } from "vitest";
import {
clearReconnectRun,
getAPIClient,
isInactiveRunStreamError,
} from "@/core/api/api-client";
function makeSessionStorage() {
const values = new Map<string, string>();
return {
getItem: vi.fn((key: string) => values.get(key) ?? null),
removeItem: vi.fn((key: string) => {
values.delete(key);
}),
setItem: vi.fn((key: string, value: string) => {
values.set(key, value);
}),
};
}
afterEach(() => {
vi.unstubAllGlobals();
});
test("identifies inactive run stream errors", () => {
const error = Object.assign(
new Error(
'HTTP 409: {"detail":"Run run-1 is not active on this worker and cannot be streamed"}',
),
{ status: 409 },
);
expect(isInactiveRunStreamError(error)).toBe(true);
});
test("does not classify unrelated conflict errors as inactive streams", () => {
const error = Object.assign(new Error("HTTP 409: run is still active"), {
status: 409,
});
expect(isInactiveRunStreamError(error)).toBe(false);
});
test("clears matching reconnect metadata", () => {
const sessionStorage = makeSessionStorage();
sessionStorage.setItem("lg:stream:thread-1", "run-1");
vi.stubGlobal("window", { sessionStorage });
clearReconnectRun("thread-1", "run-1");
expect(sessionStorage.removeItem).toHaveBeenCalledWith("lg:stream:thread-1");
});
test("keeps newer reconnect metadata", () => {
const sessionStorage = makeSessionStorage();
sessionStorage.setItem("lg:stream:thread-1", "newer-run");
vi.stubGlobal("window", { sessionStorage });
clearReconnectRun("thread-1", "stale-run");
expect(sessionStorage.removeItem).not.toHaveBeenCalled();
});
test("ignores reconnect metadata storage access failures", () => {
vi.stubGlobal("window", {
get sessionStorage() {
throw new DOMException("Blocked", "SecurityError");
},
});
expect(() => clearReconnectRun("thread-1", "run-1")).not.toThrow();
});
test("clears stale reconnect metadata when join stream cannot be resumed", async () => {
const sessionStorage = makeSessionStorage();
sessionStorage.setItem("lg:stream:thread-1", "run-1");
vi.stubGlobal("window", {
location: { origin: "http://localhost:2026" },
sessionStorage,
});
vi.stubGlobal(
"fetch",
vi.fn(async () => {
return new Response(
JSON.stringify({
detail:
"Run run-1 is not active on this worker and cannot be streamed",
}),
{ status: 409 },
);
}),
);
await expect(
getAPIClient(true).runs.joinStream("thread-1", "run-1").next(),
).resolves.toMatchObject({ done: true });
expect(sessionStorage.removeItem).toHaveBeenCalledWith("lg:stream:thread-1");
});
test("rethrows unrelated streaming errors", async () => {
const sessionStorage = makeSessionStorage();
sessionStorage.setItem("lg:stream:thread-1", "run-1");
vi.stubGlobal("window", {
location: { origin: "http://localhost:2026" },
sessionStorage,
});
vi.stubGlobal(
"fetch",
vi.fn(async () => {
return new Response(JSON.stringify({ detail: "run is still active" }), {
status: 409,
});
}),
);
await expect(
getAPIClient(true).runs.joinStream("thread-1", "run-1").next(),
).rejects.toThrow("HTTP 409");
expect(sessionStorage.removeItem).not.toHaveBeenCalled();
});