rayhpeng 34e835bc33
feat(gateway): implement LangGraph Platform API in Gateway, replace langgraph-cli (#1403)
* feat(gateway): implement LangGraph Platform API in Gateway, replace langgraph-cli

Implement all core LangGraph Platform API endpoints in the Gateway,
allowing it to fully replace the langgraph-cli dev server for local
development. This eliminates a heavyweight dependency and simplifies
the development stack.

Changes:
- Add runs lifecycle endpoints (create, stream, wait, cancel, join)
- Add threads CRUD and search endpoints
- Add assistants compatibility endpoints (search, get, graph, schemas)
- Add StreamBridge (in-memory pub/sub for SSE) and async provider
- Add RunManager with atomic create_or_reject (eliminates TOCTOU race)
- Add worker with interrupt/rollback cancel actions and runtime context injection
- Route /api/langgraph/* to Gateway in nginx config
- Skip langgraph-cli startup by default (SKIP_LANGGRAPH_SERVER=0 to restore)
- Add unit tests for RunManager, SSE format, and StreamBridge

* fix: drain bridge queue on client disconnect to prevent backpressure

When on_disconnect=continue, keep consuming events from the bridge
without yielding, so the worker is not blocked by a full queue.
Only on_disconnect=cancel breaks out immediately.

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* fix: remove pytest import

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* fix: Fix default stream_mode to ["values", "messages-tuple"]

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* fix: Remove unused if_exists field from ThreadCreateRequest

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* fix: address review comments on gateway LangGraph API

- Mount runs.py router in app.py (missing include_router)
- Normalize interrupt_before/after "*" to node list before run_agent()
- Use entry.id for SSE event ID instead of counter
- Drain bridge queue on disconnect when on_disconnect=continue
- Reuse serialization helper in wait_run() for consistent wire format
- Reject unsupported multitask_strategy with 400
- Remove SKIP_LANGGRAPH_SERVER fallback, always use Gateway

* feat: extract app.state access into deps.py

Encapsulate read/write operations for singleton objects (RunManager,
StreamBridge, checkpointer) held in app.state into a shared utility,
reducing repeated access patterns across router modules.

* feat: extract deerflow.runtime.serialization module with tests

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* refactor: replace duplicated serialization with deerflow.runtime.serialization

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* feat: extract app/gateway/services.py with run lifecycle logic

Create a service layer that centralizes SSE formatting, input/config
normalization, and run lifecycle management. Router modules will delegate
to these functions instead of using private cross-imported helpers.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* refactor: wire routers to use services layer, remove cross-module private imports

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* style: apply ruff formatting to refactored files

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* feat(runtime): support LangGraph dev server and add compat route

- Enable official LangGraph dev server for local development workflow
- Decouple runtime components from agents package for better separation
- Provide gateway-backed fallback route when dev server is skipped
- Simplify lifecycle management using context manager in gateway

* feat(runtime): add Store providers with auto-backend selection

- Add async_provider.py and provider.py under deerflow/runtime/store/
- Support memory, sqlite, postgres backends matching checkpointer config
- Integrate into FastAPI lifespan via AsyncExitStack in deps.py
- Replace hardcoded InMemoryStore with config-driven factory

* refactor(gateway): migrate thread management from checkpointer to Store and resolve multiple endpoint failures

- Add Store-backed CRUD helpers (_store_get, _store_put, _store_upsert)
- Replace checkpoint-scanning search with two-phase strategy:
  phase 1 reads Store (O(threads)), phase 2 backfills from checkpointer
  for legacy/LangGraph Server threads with lazy migration
- Extend Store record schema with values field for title persistence
- Sync thread title from checkpoint to Store after run completion
- Fix /threads/{id}/runs/{run_id}/stream 405 by accepting both
  GET and POST methods; POST handles interrupt/rollback actions
- Fix /threads/{id}/state 500 by separating read_config and
  write_config, adding checkpoint_ns to configurable, and
  shallow-copying checkpoint/metadata before mutation
- Sync title to Store on state update for immediate search reflection
- Move _upsert_thread_in_store into services.py, remove duplicate logic
- Add _sync_thread_title_after_run: await run task, read final
  checkpoint title, write back to Store record
- Spawn title sync as background task from start_run when Store exists

* refactor(runtime): deduplicate store and checkpointer provider logic

Extract _ensure_sqlite_parent_dir() helper into checkpointer/provider.py
and use it in all three places that previously inlined the same mkdir logic.
Consolidate duplicate error constants in store/async_provider.py by importing
from store/provider.py instead of redefining them.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* refactor(runtime): move SQLite helpers to runtime/store, checkpointer imports from store

_resolve_sqlite_conn_str and _ensure_sqlite_parent_dir now live in
runtime/store/provider.py. agents/checkpointer/provider and
agents/checkpointer/async_provider import from there, reversing the
previous dependency direction (store → checkpointer becomes
checkpointer → store).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* refactor(runtime): extract SQLite helpers into runtime/store/_sqlite_utils.py

Move resolve_sqlite_conn_str and ensure_sqlite_parent_dir out of
checkpointer/provider.py into a dedicated _sqlite_utils module.
Functions are now public (no underscore prefix), making cross-module
imports semantically correct. All four provider files import from
the single shared location.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* fix(gateway): use adelete_thread to fully remove thread checkpoints on delete

AsyncSqliteSaver has no adelete method — the previous hasattr check
always evaluated to False, silently leaving all checkpoint rows in the
database. Switch to adelete_thread(thread_id) which deletes every
checkpoint and pending-write row for the thread across all namespaces
(including sub-graph checkpoints).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* fix(gateway): remove dead bridge_cm/ckpt_cm code and fix StrEnum lint

app.py had unreachable code after the async-with lifespan refactor:
bridge_cm and ckpt_cm were referenced but never defined (F821), and
the channel service startup/shutdown was outside the langgraph_runtime
block so it never ran. Move channel service lifecycle inside the
async-with block where it belongs.

Replace str+Enum inheritance in RunStatus and DisconnectMode with
StrEnum as suggested by UP042.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* style: format with ruff

---------

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: JeffJiang <for-eleven@hotmail.com>
Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-authored-by: Willem Jiang <willem.jiang@gmail.com>
2026-03-30 16:02:23 +08:00

266 lines
11 KiB
Python

"""Runs endpoints — create, stream, wait, cancel.
Implements the LangGraph Platform runs API on top of
:class:`deerflow.agents.runs.RunManager` and
:class:`deerflow.agents.stream_bridge.StreamBridge`.
SSE format is aligned with the LangGraph Platform protocol so that
the ``useStream`` React hook from ``@langchain/langgraph-sdk/react``
works without modification.
"""
from __future__ import annotations
import asyncio
import logging
from typing import Any, Literal
from fastapi import APIRouter, HTTPException, Query, Request
from fastapi.responses import Response, StreamingResponse
from pydantic import BaseModel, Field
from app.gateway.deps import get_checkpointer, get_run_manager, get_stream_bridge
from app.gateway.services import sse_consumer, start_run
from deerflow.runtime import RunRecord, serialize_channel_values
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api/threads", tags=["runs"])
# ---------------------------------------------------------------------------
# Request / response models
# ---------------------------------------------------------------------------
class RunCreateRequest(BaseModel):
assistant_id: str | None = Field(default=None, description="Agent / assistant to use")
input: dict[str, Any] | None = Field(default=None, description="Graph input (e.g. {messages: [...]})")
command: dict[str, Any] | None = Field(default=None, description="LangGraph Command")
metadata: dict[str, Any] | None = Field(default=None, description="Run metadata")
config: dict[str, Any] | None = Field(default=None, description="RunnableConfig overrides")
webhook: str | None = Field(default=None, description="Completion callback URL")
checkpoint_id: str | None = Field(default=None, description="Resume from checkpoint")
checkpoint: dict[str, Any] | None = Field(default=None, description="Full checkpoint object")
interrupt_before: list[str] | Literal["*"] | None = Field(default=None, description="Nodes to interrupt before")
interrupt_after: list[str] | Literal["*"] | None = Field(default=None, description="Nodes to interrupt after")
stream_mode: list[str] | str | None = Field(default=None, description="Stream mode(s)")
stream_subgraphs: bool = Field(default=False, description="Include subgraph events")
stream_resumable: bool | None = Field(default=None, description="SSE resumable mode")
on_disconnect: Literal["cancel", "continue"] = Field(default="cancel", description="Behaviour on SSE disconnect")
on_completion: Literal["delete", "keep"] = Field(default="keep", description="Delete temp thread on completion")
multitask_strategy: Literal["reject", "rollback", "interrupt", "enqueue"] = Field(default="reject", description="Concurrency strategy")
after_seconds: float | None = Field(default=None, description="Delayed execution")
if_not_exists: Literal["reject", "create"] = Field(default="create", description="Thread creation policy")
feedback_keys: list[str] | None = Field(default=None, description="LangSmith feedback keys")
class RunResponse(BaseModel):
run_id: str
thread_id: str
assistant_id: str | None = None
status: str
metadata: dict[str, Any] = Field(default_factory=dict)
kwargs: dict[str, Any] = Field(default_factory=dict)
multitask_strategy: str = "reject"
created_at: str = ""
updated_at: str = ""
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _record_to_response(record: RunRecord) -> RunResponse:
return RunResponse(
run_id=record.run_id,
thread_id=record.thread_id,
assistant_id=record.assistant_id,
status=record.status.value,
metadata=record.metadata,
kwargs=record.kwargs,
multitask_strategy=record.multitask_strategy,
created_at=record.created_at,
updated_at=record.updated_at,
)
# ---------------------------------------------------------------------------
# Endpoints
# ---------------------------------------------------------------------------
@router.post("/{thread_id}/runs", response_model=RunResponse)
async def create_run(thread_id: str, body: RunCreateRequest, request: Request) -> RunResponse:
"""Create a background run (returns immediately)."""
record = await start_run(body, thread_id, request)
return _record_to_response(record)
@router.post("/{thread_id}/runs/stream")
async def stream_run(thread_id: str, body: RunCreateRequest, request: Request) -> StreamingResponse:
"""Create a run and stream events via SSE.
The response includes a ``Content-Location`` header with the run's
resource URL, matching the LangGraph Platform protocol. The
``useStream`` React hook uses this to extract run metadata.
"""
bridge = get_stream_bridge(request)
run_mgr = get_run_manager(request)
record = await start_run(body, thread_id, request)
return StreamingResponse(
sse_consumer(bridge, record, request, run_mgr),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no",
# LangGraph Platform includes run metadata in this header.
# The SDK's _get_run_metadata_from_response() parses it.
"Content-Location": (f"/api/threads/{thread_id}/runs/{record.run_id}/stream?thread_id={thread_id}&run_id={record.run_id}"),
},
)
@router.post("/{thread_id}/runs/wait", response_model=dict)
async def wait_run(thread_id: str, body: RunCreateRequest, request: Request) -> dict:
"""Create a run and block until it completes, returning the final state."""
record = await start_run(body, thread_id, request)
if record.task is not None:
try:
await record.task
except asyncio.CancelledError:
pass
checkpointer = get_checkpointer(request)
config = {"configurable": {"thread_id": thread_id}}
try:
checkpoint_tuple = await checkpointer.aget_tuple(config)
if checkpoint_tuple is not None:
checkpoint = getattr(checkpoint_tuple, "checkpoint", {}) or {}
channel_values = checkpoint.get("channel_values", {})
return serialize_channel_values(channel_values)
except Exception:
logger.exception("Failed to fetch final state for run %s", record.run_id)
return {"status": record.status.value, "error": record.error}
@router.get("/{thread_id}/runs", response_model=list[RunResponse])
async def list_runs(thread_id: str, request: Request) -> list[RunResponse]:
"""List all runs for a thread."""
run_mgr = get_run_manager(request)
records = await run_mgr.list_by_thread(thread_id)
return [_record_to_response(r) for r in records]
@router.get("/{thread_id}/runs/{run_id}", response_model=RunResponse)
async def get_run(thread_id: str, run_id: str, request: Request) -> RunResponse:
"""Get details of a specific run."""
run_mgr = get_run_manager(request)
record = run_mgr.get(run_id)
if record is None or record.thread_id != thread_id:
raise HTTPException(status_code=404, detail=f"Run {run_id} not found")
return _record_to_response(record)
@router.post("/{thread_id}/runs/{run_id}/cancel")
async def cancel_run(
thread_id: str,
run_id: str,
request: Request,
wait: bool = Query(default=False, description="Block until run completes after cancel"),
action: Literal["interrupt", "rollback"] = Query(default="interrupt", description="Cancel action"),
) -> Response:
"""Cancel a running or pending run.
- action=interrupt: Stop execution, keep current checkpoint (can be resumed)
- action=rollback: Stop execution, revert to pre-run checkpoint state
- wait=true: Block until the run fully stops, return 204
- wait=false: Return immediately with 202
"""
run_mgr = get_run_manager(request)
record = run_mgr.get(run_id)
if record is None or record.thread_id != thread_id:
raise HTTPException(status_code=404, detail=f"Run {run_id} not found")
cancelled = await run_mgr.cancel(run_id, action=action)
if not cancelled:
raise HTTPException(
status_code=409,
detail=f"Run {run_id} is not cancellable (status: {record.status.value})",
)
if wait and record.task is not None:
try:
await record.task
except asyncio.CancelledError:
pass
return Response(status_code=204)
return Response(status_code=202)
@router.get("/{thread_id}/runs/{run_id}/join")
async def join_run(thread_id: str, run_id: str, request: Request) -> StreamingResponse:
"""Join an existing run's SSE stream."""
bridge = get_stream_bridge(request)
run_mgr = get_run_manager(request)
record = run_mgr.get(run_id)
if record is None or record.thread_id != thread_id:
raise HTTPException(status_code=404, detail=f"Run {run_id} not found")
return StreamingResponse(
sse_consumer(bridge, record, request, run_mgr),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no",
},
)
@router.api_route("/{thread_id}/runs/{run_id}/stream", methods=["GET", "POST"], response_model=None)
async def stream_existing_run(
thread_id: str,
run_id: str,
request: Request,
action: Literal["interrupt", "rollback"] | None = Query(default=None, description="Cancel action"),
wait: int = Query(default=0, description="Block until cancelled (1) or return immediately (0)"),
):
"""Join an existing run's SSE stream (GET), or cancel-then-stream (POST).
The LangGraph SDK's ``joinStream`` and ``useStream`` stop button both use
``POST`` to this endpoint. When ``action=interrupt`` or ``action=rollback``
is present the run is cancelled first; the response then streams any
remaining buffered events so the client observes a clean shutdown.
"""
run_mgr = get_run_manager(request)
record = run_mgr.get(run_id)
if record is None or record.thread_id != thread_id:
raise HTTPException(status_code=404, detail=f"Run {run_id} not found")
# Cancel if an action was requested (stop-button / interrupt flow)
if action is not None:
cancelled = await run_mgr.cancel(run_id, action=action)
if cancelled and wait and record.task is not None:
try:
await record.task
except (asyncio.CancelledError, Exception):
pass
return Response(status_code=204)
bridge = get_stream_bridge(request)
return StreamingResponse(
sse_consumer(bridge, record, request, run_mgr),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no",
},
)