From 37451500eb4381dd38ee8a721e10f8160d63de15 Mon Sep 17 00:00:00 2001 From: Lucy Shen <49802413+player0718@users.noreply.github.com> Date: Thu, 28 May 2026 08:20:52 +0800 Subject: [PATCH] fix(gateway): split stream_existing_run into per-method routes for unique OpenAPI operationIds (#3228) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * fix(gateway): split stream_existing_run into per-method routes for unique OpenAPI operationIds `@router.api_route("/.../stream", methods=["GET", "POST"])` registers a single FastAPI route that holds both methods. FastAPI's auto-generated `operationId` is computed once per route from a single method picked out of `route.methods`, so when OpenAPI generation iterates over every method on that route both end up sharing the same `operationId`. That triggers `UserWarning: Duplicate Operation ID stream_existing_run_..._stream_(get|post) for function stream_existing_run` during `app.openapi()` and produces an invalid OpenAPI spec for SDK / codegen consumers. Register GET and POST as two separate routes on the same handler so each method gets a distinct auto-generated `operationId` ("..._stream_get" and "..._stream_post"). Behavior is otherwise unchanged: same handler, same `require_permission` decoration, same response. Add `tests/test_openapi_operation_ids.py` to lock in the invariant: no duplicate-operationId warnings during spec generation, globally unique operationIds across the spec, and distinct GET / POST operationIds on the stream endpoint specifically. Reverted the source change locally and confirmed all three tests fail before the fix. * test(runtime): widen CancelledError catch in _ScriptedAgent to fix cancel-race flake `_ScriptedAgent.astream()` previously only caught `asyncio.CancelledError` inside the inner `if self.block_after_first_chunk:` while-loop. 
Cancellation arriving at any earlier suspension point in the same body (`self.model.ainvoke`, `_write_checkpoint`, the `yield`) would propagate without setting `controller.cancelled`, so callers waiting on `controller.cancelled.wait(5)` after `POST /cancel` returned 204 could race and time out. `test_cancel_interrupt_stops_running_background_run` waits only for the `started` event (set on the first line of `astream`) before issuing cancel, so its race window spans all three pre-loop suspension points. On a clean `main` checkout, stress-running the test 20× reproduces the failure 6/20 (~30%). `test_cancel_rollback_restores_pre_run_checkpoint`, which waits for the later `checkpoint_written` event, passes 20/20 — confirming the race lives entirely in the gap between `started.set()` and the cancellation-aware block. Widen the try/except to cover the entire `astream` body so any `CancelledError` sets the controller event; the non-cancel path is unchanged (no exception means no event set). After this change the previously flaky test passes 50/50, the rollback test still passes 30/30, and the full backend suite remains at 3649 passed / 19 skipped. Test-only change — `backend/tests/test_runtime_lifecycle_e2e.py` is the only file touched; the production cancel pipeline is unaffected. 
--- backend/app/gateway/routers/thread_runs.py | 7 +- backend/tests/test_openapi_operation_ids.py | 79 +++++++++++++++++++++ backend/tests/test_runtime_lifecycle_e2e.py | 33 +++++---- 3 files changed, 104 insertions(+), 15 deletions(-) create mode 100644 backend/tests/test_openapi_operation_ids.py diff --git a/backend/app/gateway/routers/thread_runs.py b/backend/app/gateway/routers/thread_runs.py index 8a7cade4d..9fc4dfa68 100644 --- a/backend/app/gateway/routers/thread_runs.py +++ b/backend/app/gateway/routers/thread_runs.py @@ -278,7 +278,12 @@ async def join_run(thread_id: str, run_id: str, request: Request) -> StreamingRe ) -@router.api_route("/{thread_id}/runs/{run_id}/stream", methods=["GET", "POST"], response_model=None) +# Register GET and POST as separate routes so each method gets a unique OpenAPI +# operationId. ``api_route(methods=["GET", "POST"])`` shares one route registration +# across both methods, which makes FastAPI emit the same ``operationId`` twice and +# warn about a duplicate operation id during OpenAPI generation. +@router.get("/{thread_id}/runs/{run_id}/stream", response_model=None) +@router.post("/{thread_id}/runs/{run_id}/stream", response_model=None) @require_permission("runs", "read", owner_check=True) async def stream_existing_run( thread_id: str, diff --git a/backend/tests/test_openapi_operation_ids.py b/backend/tests/test_openapi_operation_ids.py new file mode 100644 index 000000000..d6bb46162 --- /dev/null +++ b/backend/tests/test_openapi_operation_ids.py @@ -0,0 +1,79 @@ +"""Regression tests for the generated OpenAPI spec. + +The Gateway exposes its FastAPI ``app.openapi()`` schema at ``/openapi.json`` +and downstream tooling (SDK codegen, schema validators, client generators) +relies on ``operationId`` values being globally unique. 
FastAPI emits a +``UserWarning`` during spec generation when two routes share the same +``operationId`` — concretely this happens when ``@router.api_route`` registers +one route for multiple HTTP methods, because the auto-generated unique id is +computed from a single method picked out of ``route.methods`` while OpenAPI +generation iterates over every method on that route. + +These tests pin that invariant so the warning cannot silently come back. +""" + +from __future__ import annotations + +import warnings + +import pytest + + +@pytest.fixture(scope="module") +def openapi_spec() -> dict: + """Build the OpenAPI spec for the Gateway app once per module.""" + from app.gateway.app import app + + # ``app.openapi()`` caches the result on the FastAPI instance, so reset to + # force a fresh generation pass that triggers any duplicate-id warnings. + app.openapi_schema = None + return app.openapi() + + +def test_openapi_spec_has_no_duplicate_operation_warnings() -> None: + """Generating the OpenAPI schema must not emit any ``Duplicate Operation ID`` UserWarning.""" + from app.gateway.app import app + + app.openapi_schema = None + with warnings.catch_warnings(record=True) as caught: + warnings.simplefilter("always") + app.openapi() + + dup_messages = [str(item.message) for item in caught if "Duplicate Operation ID" in str(item.message)] + assert dup_messages == [], f"OpenAPI generation emitted duplicate operation id warnings: {dup_messages}" + + +def test_openapi_operation_ids_are_unique(openapi_spec: dict) -> None: + """Every (path, method) operation in the spec must carry a unique ``operationId``.""" + op_id_to_locations: dict[str, list[tuple[str, str]]] = {} + + for path, path_item in openapi_spec.get("paths", {}).items(): + for method, operation in path_item.items(): + if not isinstance(operation, dict): + continue + op_id = operation.get("operationId") + if op_id is None: + continue + op_id_to_locations.setdefault(op_id, []).append((path, method)) + + duplicates = 
{op_id: locations for op_id, locations in op_id_to_locations.items() if len(locations) > 1} + assert not duplicates, f"Duplicate operationIds in OpenAPI spec: {duplicates}" + + +def test_stream_existing_run_exposes_distinct_get_and_post(openapi_spec: dict) -> None: + """The ``/runs/{run_id}/stream`` endpoint must expose GET and POST as distinct operations. + + LangGraph SDK ``joinStream`` uses GET while ``useStream``'s stop button uses POST, so + both methods must remain registered with their own ``operationId``. + """ + path = "/api/threads/{thread_id}/runs/{run_id}/stream" + path_item = openapi_spec["paths"].get(path) + assert path_item is not None, f"Expected {path} to be present in the OpenAPI spec" + + assert "get" in path_item, f"Expected GET handler on {path}" + assert "post" in path_item, f"Expected POST handler on {path}" + + get_op_id = path_item["get"].get("operationId") + post_op_id = path_item["post"].get("operationId") + assert get_op_id and post_op_id, "Both GET and POST must have operationIds" + assert get_op_id != post_op_id, f"GET and POST share operationId {get_op_id!r}, which breaks OpenAPI codegen" diff --git a/backend/tests/test_runtime_lifecycle_e2e.py b/backend/tests/test_runtime_lifecycle_e2e.py index 1eda351ec..be3fb506e 100644 --- a/backend/tests/test_runtime_lifecycle_e2e.py +++ b/backend/tests/test_runtime_lifecycle_e2e.py @@ -96,25 +96,30 @@ class _ScriptedAgent: del subgraphs self.controller.started.set() - thread_id = _thread_id_from_config(config) - human_text = _last_human_text(graph_input) - human = HumanMessage(content=human_text) - ai = await self.model.ainvoke([human], config=config) - state = {"messages": [human.model_dump(), ai.model_dump()], "title": self.title} + try: + thread_id = _thread_id_from_config(config) + human_text = _last_human_text(graph_input) + human = HumanMessage(content=human_text) + ai = await self.model.ainvoke([human], config=config) + state = {"messages": [human.model_dump(), ai.model_dump()], "title": 
self.title} - if self.checkpointer is not None: - await _write_checkpoint(self.checkpointer, thread_id=thread_id, state=state) - self.controller.checkpoint_written.set() + if self.checkpointer is not None: + await _write_checkpoint(self.checkpointer, thread_id=thread_id, state=state) + self.controller.checkpoint_written.set() - yield _stream_item_for_mode(stream_mode, state) + yield _stream_item_for_mode(stream_mode, state) - if self.block_after_first_chunk: - try: + if self.block_after_first_chunk: while not self.controller.release.is_set(): await asyncio.sleep(0.05) - except asyncio.CancelledError: - self.controller.cancelled.set() - raise + except asyncio.CancelledError: + # Catch cancellation arriving anywhere in the body — including the + # `await ainvoke()` / `_write_checkpoint()` / `yield` points between + # ``started.set()`` and the original inner ``try`` — so tests that + # wait for ``cancelled`` after issuing ``POST /cancel`` no longer + # race with cancellation arriving early. + self.controller.cancelled.set() + raise def _make_agent_factory(controller: _RunController, **agent_kwargs):