mirror of
https://github.com/bytedance/deer-flow.git
synced 2026-05-30 20:38:09 +00:00
fix(gateway): split stream_existing_run into per-method routes for unique OpenAPI operationIds (#3228)
* fix(gateway): split stream_existing_run into per-method routes for unique OpenAPI operationIds
`@router.api_route("/.../stream", methods=["GET", "POST"])` registers a
single FastAPI route that holds both methods. FastAPI's auto-generated
`operationId` is computed once per route from a single method picked out
of `route.methods`, so when OpenAPI generation iterates over every method
on that route, both methods end up sharing the same `operationId`. That triggers
`UserWarning: Duplicate Operation ID stream_existing_run_..._stream_(get|post) for function stream_existing_run`
during `app.openapi()` and produces an invalid OpenAPI spec for SDK /
codegen consumers.
Register GET and POST as two separate routes on the same handler so each
method gets a distinct auto-generated `operationId` ("..._stream_get" and
"..._stream_post"). Behavior is otherwise unchanged: same handler, same
`require_permission` decoration, same response.
Add `tests/test_openapi_operation_ids.py` to lock in the invariant:
no duplicate-operationId warnings during spec generation, globally unique
operationIds across the spec, and distinct GET / POST operationIds on the
stream endpoint specifically. Reverted the source change locally and
confirmed all three tests fail before the fix.
* test(runtime): widen CancelledError catch in _ScriptedAgent to fix cancel-race flake
`_ScriptedAgent.astream()` previously only caught `asyncio.CancelledError`
inside the inner `if self.block_after_first_chunk:` while-loop. Cancellation
arriving during any earlier `await` in the same body
(`self.model.ainvoke`, `_write_checkpoint`, the `yield`) would propagate
without setting `controller.cancelled`, so callers waiting on
`controller.cancelled.wait(5)` after `POST /cancel` returned 204 could race
and time out.
`test_cancel_interrupt_stops_running_background_run` waits only for the
`started` event (set on the first line of `astream`) before issuing cancel,
so its race window spans all three pre-loop `await`s. On a clean `main`
checkout, stress-running the test 20× reproduces the failure 6/20
(~30%). `test_cancel_rollback_restores_pre_run_checkpoint`, which waits
for the later `checkpoint_written` event, passes 20/20 — confirming the
race lives entirely in the gap between `started.set()` and the
cancellation-aware block.
Widen the try/except to cover the entire `astream` body so any
`CancelledError` sets the controller event; the non-cancel path is
unchanged (no exception means no event set). After this change the
previously flaky test passes 50/50, the rollback test still passes 30/30,
and the full backend suite remains at 3649 passed / 19 skipped.
Test-only change — `backend/tests/test_runtime_lifecycle_e2e.py` is the
only file touched; the production cancel pipeline is unaffected.
This commit is contained in:
parent
3cb75887c1
commit
37451500eb
@ -278,7 +278,12 @@ async def join_run(thread_id: str, run_id: str, request: Request) -> StreamingRe
|
||||
)
|
||||
|
||||
|
||||
@router.api_route("/{thread_id}/runs/{run_id}/stream", methods=["GET", "POST"], response_model=None)
|
||||
# Register GET and POST as separate routes so each method gets a unique OpenAPI
|
||||
# operationId. ``api_route(methods=["GET", "POST"])`` shares one route registration
|
||||
# across both methods, which makes FastAPI emit the same ``operationId`` twice and
|
||||
# warn about a duplicate operation id during OpenAPI generation.
|
||||
@router.get("/{thread_id}/runs/{run_id}/stream", response_model=None)
|
||||
@router.post("/{thread_id}/runs/{run_id}/stream", response_model=None)
|
||||
@require_permission("runs", "read", owner_check=True)
|
||||
async def stream_existing_run(
|
||||
thread_id: str,
|
||||
|
||||
79
backend/tests/test_openapi_operation_ids.py
Normal file
79
backend/tests/test_openapi_operation_ids.py
Normal file
@ -0,0 +1,79 @@
|
||||
"""Regression tests for the generated OpenAPI spec.
|
||||
|
||||
The Gateway exposes its FastAPI ``app.openapi()`` schema at ``/openapi.json``
|
||||
and downstream tooling (SDK codegen, schema validators, client generators)
|
||||
relies on ``operationId`` values being globally unique. FastAPI emits a
|
||||
``UserWarning`` during spec generation when two routes share the same
|
||||
``operationId`` — concretely this happens when ``@router.api_route`` registers
|
||||
one route for multiple HTTP methods, because the auto-generated unique id is
|
||||
computed from a single method picked out of ``route.methods`` while OpenAPI
|
||||
generation iterates over every method on that route.
|
||||
|
||||
These tests pin that invariant so the warning cannot silently come back.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import warnings
|
||||
|
||||
import pytest
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
def openapi_spec() -> dict:
    """Return a freshly generated OpenAPI schema for the Gateway app.

    The fixture is module-scoped, so the schema is generated a single time and
    shared by every test in this module that requests it.
    """
    from app.gateway.app import app as gateway_app

    # ``openapi()`` caches its result on the FastAPI instance; clearing the
    # cached schema first forces a real generation pass (and therefore any
    # duplicate-id warnings) instead of returning the stored value.
    gateway_app.openapi_schema = None
    fresh_schema = gateway_app.openapi()
    return fresh_schema
|
||||
|
||||
|
||||
def test_openapi_spec_has_no_duplicate_operation_warnings() -> None:
    """Regenerate the OpenAPI schema and require zero ``Duplicate Operation ID`` warnings."""
    from app.gateway.app import app as gateway_app

    # Drop the cached schema so the generation code path actually runs here.
    gateway_app.openapi_schema = None

    with warnings.catch_warnings(record=True) as recorded:
        # "always" disables the once-per-location suppression so every
        # warning raised during generation is recorded.
        warnings.simplefilter("always")
        gateway_app.openapi()

    messages = [str(entry.message) for entry in recorded]
    dup_messages = [text for text in messages if "Duplicate Operation ID" in text]
    assert dup_messages == [], f"OpenAPI generation emitted duplicate operation id warnings: {dup_messages}"
|
||||
|
||||
|
||||
def test_openapi_operation_ids_are_unique(openapi_spec: dict) -> None:
    """Require that no ``operationId`` is used by more than one (path, method) pair.

    Args:
        openapi_spec: The generated OpenAPI schema, as a plain dict.
    """
    # Maps each operationId to every (path, method) location that declares it.
    locations_by_op_id: dict[str, list[tuple[str, str]]] = {}

    for path, path_item in openapi_spec.get("paths", {}).items():
        for method, operation in path_item.items():
            # Entries that are not dicts cannot carry an operationId, and
            # operations without one are simply not tracked.
            op_id = operation.get("operationId") if isinstance(operation, dict) else None
            if op_id is not None:
                locations_by_op_id.setdefault(op_id, []).append((path, method))

    duplicates = {
        op_id: where
        for op_id, where in locations_by_op_id.items()
        if len(where) >= 2
    }
    assert not duplicates, f"Duplicate operationIds in OpenAPI spec: {duplicates}"
|
||||
|
||||
|
||||
def test_stream_existing_run_exposes_distinct_get_and_post(openapi_spec: dict) -> None:
    """Check that the run ``stream`` path offers GET and POST with different ``operationId`` values.

    LangGraph SDK ``joinStream`` uses GET while ``useStream``'s stop button uses POST, so
    both methods have to stay registered, each under its own ``operationId``.

    Args:
        openapi_spec: The generated OpenAPI schema, as a plain dict.
    """
    stream_path = "/api/threads/{thread_id}/runs/{run_id}/stream"
    operations = openapi_spec["paths"].get(stream_path)
    assert operations is not None, f"Expected {stream_path} to be present in the OpenAPI spec"

    assert "get" in operations, f"Expected GET handler on {stream_path}"
    assert "post" in operations, f"Expected POST handler on {stream_path}"

    # Looked up in GET-then-POST order.
    get_op_id, post_op_id = (operations[verb].get("operationId") for verb in ("get", "post"))
    assert get_op_id and post_op_id, "Both GET and POST must have operationIds"
    assert get_op_id != post_op_id, f"GET and POST share operationId {get_op_id!r}, which breaks OpenAPI codegen"
|
||||
@ -96,25 +96,30 @@ class _ScriptedAgent:
|
||||
del subgraphs
|
||||
self.controller.started.set()
|
||||
|
||||
thread_id = _thread_id_from_config(config)
|
||||
human_text = _last_human_text(graph_input)
|
||||
human = HumanMessage(content=human_text)
|
||||
ai = await self.model.ainvoke([human], config=config)
|
||||
state = {"messages": [human.model_dump(), ai.model_dump()], "title": self.title}
|
||||
try:
|
||||
thread_id = _thread_id_from_config(config)
|
||||
human_text = _last_human_text(graph_input)
|
||||
human = HumanMessage(content=human_text)
|
||||
ai = await self.model.ainvoke([human], config=config)
|
||||
state = {"messages": [human.model_dump(), ai.model_dump()], "title": self.title}
|
||||
|
||||
if self.checkpointer is not None:
|
||||
await _write_checkpoint(self.checkpointer, thread_id=thread_id, state=state)
|
||||
self.controller.checkpoint_written.set()
|
||||
if self.checkpointer is not None:
|
||||
await _write_checkpoint(self.checkpointer, thread_id=thread_id, state=state)
|
||||
self.controller.checkpoint_written.set()
|
||||
|
||||
yield _stream_item_for_mode(stream_mode, state)
|
||||
yield _stream_item_for_mode(stream_mode, state)
|
||||
|
||||
if self.block_after_first_chunk:
|
||||
try:
|
||||
if self.block_after_first_chunk:
|
||||
while not self.controller.release.is_set():
|
||||
await asyncio.sleep(0.05)
|
||||
except asyncio.CancelledError:
|
||||
self.controller.cancelled.set()
|
||||
raise
|
||||
except asyncio.CancelledError:
|
||||
# Catch cancellation arriving anywhere in the body — including the
|
||||
# ``await ainvoke()`` / ``_write_checkpoint()`` / ``yield`` points between
|
||||
# ``started.set()`` and the original inner ``try`` — so tests that
|
||||
# wait for ``cancelled`` after issuing ``POST /cancel`` no longer
|
||||
# race with cancellation arriving early.
|
||||
self.controller.cancelled.set()
|
||||
raise
|
||||
|
||||
|
||||
def _make_agent_factory(controller: _RunController, **agent_kwargs):
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user