From cbbe39d28cd8bb3fe1980ab00734e214bf214022 Mon Sep 17 00:00:00 2001 From: rayhpeng Date: Sun, 12 Apr 2026 16:09:34 +0800 Subject: [PATCH] feat(api): add GET /api/runs/{run_id}/messages with cursor pagination New endpoint resolves thread_id from the run record and delegates to RunEventStore.list_messages_by_run for cursor-based pagination. Ownership is enforced implicitly via RunStore.get() user filtering. Co-Authored-By: Claude Sonnet 4.6 --- backend/app/gateway/routers/runs.py | 50 ++++++- backend/tests/test_runs_api_endpoints.py | 173 +++++++++++++++++++++++ 2 files changed, 221 insertions(+), 2 deletions(-) create mode 100644 backend/tests/test_runs_api_endpoints.py diff --git a/backend/app/gateway/routers/runs.py b/backend/app/gateway/routers/runs.py index 7d17488fc..66dac1627 100644 --- a/backend/app/gateway/routers/runs.py +++ b/backend/app/gateway/routers/runs.py @@ -11,10 +11,11 @@ import asyncio import logging import uuid -from fastapi import APIRouter, Request +from fastapi import APIRouter, HTTPException, Query, Request from fastapi.responses import StreamingResponse -from app.gateway.deps import get_checkpointer, get_run_manager, get_stream_bridge +from app.gateway.authz import require_permission +from app.gateway.deps import get_checkpointer, get_feedback_repo, get_run_event_store, get_run_manager, get_run_store, get_stream_bridge from app.gateway.routers.thread_runs import RunCreateRequest from app.gateway.services import sse_consumer, start_run from deerflow.runtime import serialize_channel_values @@ -85,3 +86,48 @@ async def stateless_wait(body: RunCreateRequest, request: Request) -> dict: logger.exception("Failed to fetch final state for run %s", record.run_id) return {"status": record.status.value, "error": record.error} + + +# --------------------------------------------------------------------------- +# Run-scoped read endpoints +# --------------------------------------------------------------------------- + + +async def _resolve_run(run_id: str, request: Request) -> dict: + """Fetch run by run_id with user ownership check. Raises 404 if not found.""" + run_store = get_run_store(request) + record = await run_store.get(run_id) # user_id=AUTO filters by contextvar + if record is None: + raise HTTPException(status_code=404, detail=f"Run {run_id} not found") + return record + + +@router.get("/{run_id}/messages") +@require_permission("runs", "read") +async def run_messages( + run_id: str, + request: Request, + limit: int = Query(default=50, le=200, ge=1), + before_seq: int | None = Query(default=None), + after_seq: int | None = Query(default=None), +) -> dict: + """Return paginated messages for a run (cursor-based). + + Pagination: + - after_seq: messages with seq > after_seq (forward) + - before_seq: messages with seq < before_seq (backward) + - neither: latest messages + + Response: { data: [...], has_more: bool } + """ + run = await _resolve_run(run_id, request) + event_store = get_run_event_store(request) + rows = await event_store.list_messages_by_run( + run["thread_id"], run_id, + limit=limit + 1, + before_seq=before_seq, + after_seq=after_seq, + ) + has_more = len(rows) > limit + data = rows[:limit] if has_more else rows + return {"data": data, "has_more": has_more} diff --git a/backend/tests/test_runs_api_endpoints.py b/backend/tests/test_runs_api_endpoints.py new file mode 100644 index 000000000..dd695b1db --- /dev/null +++ b/backend/tests/test_runs_api_endpoints.py @@ -0,0 +1,173 @@ +"""Tests for GET /api/runs/{run_id}/messages endpoint.""" +from __future__ import annotations + +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest +from _router_auth_helpers import make_authed_test_app +from fastapi.testclient import TestClient + +from app.gateway.routers import runs + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _make_app(run_store=None, event_store=None): + """Build a test FastAPI app with stub auth and mocked state.""" + app = make_authed_test_app() + app.include_router(runs.router) + + if run_store is not None: + app.state.run_store = run_store + if event_store is not None: + app.state.run_event_store = event_store + + return app + + +def _make_run_store(run_record: dict | None): + """Return an AsyncMock run store whose get() returns run_record.""" + store = MagicMock() + store.get = AsyncMock(return_value=run_record) + return store + + +def _make_event_store(rows: list[dict]): + """Return an AsyncMock event store whose list_messages_by_run() returns rows.""" + store = MagicMock() + store.list_messages_by_run = AsyncMock(return_value=rows) + return store + + +def _make_message(seq: int) -> dict: + return {"seq": seq, "event_type": "on_chat_model_stream", "category": "message", "content": f"msg-{seq}"} + + +# --------------------------------------------------------------------------- +# Tests +# --------------------------------------------------------------------------- + + +def test_run_messages_returns_envelope(): + """GET /api/runs/{run_id}/messages returns {data: [...], has_more: bool}.""" + rows = [_make_message(i) for i in range(1, 4)] + run_record = {"run_id": "run-1", "thread_id": "thread-1"} + app = _make_app( + run_store=_make_run_store(run_record), + event_store=_make_event_store(rows), + ) + with TestClient(app) as client: + response = client.get("/api/runs/run-1/messages") + assert response.status_code == 200 + body = response.json() + assert "data" in body + assert "has_more" in body + assert body["has_more"] is False + assert len(body["data"]) == 3 + + +def test_run_messages_404_when_run_not_found(): + """Returns 404 when the run store returns None.""" + app = _make_app( + run_store=_make_run_store(None), + event_store=_make_event_store([]), + ) + with TestClient(app) as client: + response = client.get("/api/runs/missing-run/messages") + assert response.status_code == 404 + assert "missing-run" in response.json()["detail"] + + +def test_run_messages_has_more_true_when_extra_row_returned(): + """has_more=True when event store returns limit+1 rows.""" + # Default limit is 50; provide 51 rows + rows = [_make_message(i) for i in range(1, 52)] # 51 rows + run_record = {"run_id": "run-2", "thread_id": "thread-2"} + app = _make_app( + run_store=_make_run_store(run_record), + event_store=_make_event_store(rows), + ) + with TestClient(app) as client: + response = client.get("/api/runs/run-2/messages") + assert response.status_code == 200 + body = response.json() + assert body["has_more"] is True + assert len(body["data"]) == 50 # trimmed to limit + + +def test_run_messages_passes_after_seq_to_event_store(): + """after_seq query param is forwarded to event_store.list_messages_by_run.""" + rows = [_make_message(10)] + run_record = {"run_id": "run-3", "thread_id": "thread-3"} + event_store = _make_event_store(rows) + app = _make_app( + run_store=_make_run_store(run_record), + event_store=event_store, + ) + with TestClient(app) as client: + response = client.get("/api/runs/run-3/messages?after_seq=5") + assert response.status_code == 200 + event_store.list_messages_by_run.assert_awaited_once_with( + "thread-3", "run-3", + limit=51, # default limit(50) + 1 + before_seq=None, + after_seq=5, + ) + + +def test_run_messages_respects_custom_limit(): + """Custom limit is respected and capped at 200.""" + rows = [_make_message(i) for i in range(1, 6)] + run_record = {"run_id": "run-4", "thread_id": "thread-4"} + event_store = _make_event_store(rows) + app = _make_app( + run_store=_make_run_store(run_record), + event_store=event_store, + ) + with TestClient(app) as client: + response = client.get("/api/runs/run-4/messages?limit=10") + assert response.status_code == 200 + event_store.list_messages_by_run.assert_awaited_once_with( + "thread-4", "run-4", + limit=11, # 10 + 1 + before_seq=None, + after_seq=None, + ) + + +def test_run_messages_passes_before_seq_to_event_store(): + """before_seq query param is forwarded to event_store.list_messages_by_run.""" + rows = [_make_message(3)] + run_record = {"run_id": "run-5", "thread_id": "thread-5"} + event_store = _make_event_store(rows) + app = _make_app( + run_store=_make_run_store(run_record), + event_store=event_store, + ) + with TestClient(app) as client: + response = client.get("/api/runs/run-5/messages?before_seq=10") + assert response.status_code == 200 + event_store.list_messages_by_run.assert_awaited_once_with( + "thread-5", "run-5", + limit=51, + before_seq=10, + after_seq=None, + ) + + +def test_run_messages_empty_data(): + """Returns empty data list when no messages exist.""" + run_record = {"run_id": "run-6", "thread_id": "thread-6"} + app = _make_app( + run_store=_make_run_store(run_record), + event_store=_make_event_store([]), + ) + with TestClient(app) as client: + response = client.get("/api/runs/run-6/messages") + assert response.status_code == 200 + body = response.json() + assert body["data"] == [] + assert body["has_more"] is False