mirror of
https://github.com/bytedance/deer-flow.git
synced 2026-04-26 11:48:10 +00:00
* feat(gateway): implement LangGraph Platform API in Gateway, replace langgraph-cli
Implement all core LangGraph Platform API endpoints in the Gateway,
allowing it to fully replace the langgraph-cli dev server for local
development. This eliminates a heavyweight dependency and simplifies
the development stack.
Changes:
- Add runs lifecycle endpoints (create, stream, wait, cancel, join)
- Add threads CRUD and search endpoints
- Add assistants compatibility endpoints (search, get, graph, schemas)
- Add StreamBridge (in-memory pub/sub for SSE) and async provider
- Add RunManager with atomic create_or_reject (eliminates TOCTOU race)
- Add worker with interrupt/rollback cancel actions and runtime context injection
- Route /api/langgraph/* to Gateway in nginx config
- Skip langgraph-cli startup by default (SKIP_LANGGRAPH_SERVER=0 to restore)
- Add unit tests for RunManager, SSE format, and StreamBridge
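The atomic create_or_reject mentioned above can be illustrated with a minimal in-memory run registry. This is a hypothetical sketch, not RunManager's real API: the point is that the "is there already an active run?" check and the insertion of the new run happen under one lock, so two concurrent creates cannot both pass the check (the TOCTOU race).

```python
import asyncio

class MiniRunManager:
    """Toy stand-in for an atomic check-and-insert run registry."""

    def __init__(self) -> None:
        self._active: dict[str, str] = {}  # thread_id -> run_id
        self._lock = asyncio.Lock()

    async def create_or_reject(self, thread_id: str, run_id: str) -> str:
        # Check and insert atomically: no await between the membership test
        # and the insert while holding the lock, so a concurrent create
        # cannot slip in between (no time-of-check/time-of-use gap).
        async with self._lock:
            if thread_id in self._active:
                raise RuntimeError(f"thread {thread_id} already has an active run")
            self._active[thread_id] = run_id
            return run_id

async def demo() -> tuple[str, bool]:
    mgr = MiniRunManager()
    first = await mgr.create_or_reject("t1", "run-a")
    try:
        await mgr.create_or_reject("t1", "run-b")
        rejected = False
    except RuntimeError:
        rejected = True
    return first, rejected

print(asyncio.run(demo()))  # → ('run-a', True)
```

The second create on the same thread is rejected, which the real endpoint would surface as an HTTP 409.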
* fix: drain bridge queue on client disconnect to prevent backpressure
When on_disconnect=continue, keep consuming events from the bridge
without yielding, so the worker is not blocked by a full queue.
Only on_disconnect=cancel breaks out immediately.
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
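The drain behavior can be sketched with a plain bounded asyncio queue (hypothetical names; the real StreamBridge API differs): a producer on a bounded queue blocks when the queue fills, so after a client disconnect the consumer must keep pulling items and dropping them instead of stopping.

```python
import asyncio

async def producer(q: asyncio.Queue) -> None:
    # Bounded queue: put() blocks when the queue is full, which is what
    # would stall the worker if nobody kept consuming.
    for i in range(10):
        await q.put(i)
    await q.put(None)  # end sentinel

async def consumer(q: asyncio.Queue, disconnected_after: int) -> list[int]:
    delivered: list[int] = []
    while True:
        item = await q.get()
        if item is None:
            break
        if len(delivered) < disconnected_after:
            delivered.append(item)  # "yield" to the still-connected client
        # After the simulated disconnect: keep consuming without yielding,
        # so the producer never blocks on a full queue.
    return delivered

async def main() -> list[int]:
    q: asyncio.Queue = asyncio.Queue(maxsize=2)
    prod = asyncio.create_task(producer(q))
    delivered = await consumer(q, disconnected_after=3)
    await prod  # producer completes despite the "disconnect"
    return delivered

print(asyncio.run(main()))  # → [0, 1, 2]
```

Only three items reach the client, yet the producer drains fully instead of deadlocking on the full queue.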
* fix: remove pytest import
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
* fix: Fix default stream_mode to ["values", "messages-tuple"]
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
* fix: Remove unused if_exists field from ThreadCreateRequest
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
* fix: address review comments on gateway LangGraph API
- Mount runs.py router in app.py (missing include_router)
- Normalize interrupt_before/after "*" to node list before run_agent()
- Use entry.id for SSE event ID instead of counter
- Drain bridge queue on disconnect when on_disconnect=continue
- Reuse serialization helper in wait_run() for consistent wire format
- Reject unsupported multitask_strategy with 400
- Remove SKIP_LANGGRAPH_SERVER fallback, always use Gateway
* feat: extract app.state access into deps.py
Encapsulate read/write operations for singleton objects (RunManager,
StreamBridge, checkpointer) held in app.state into a shared utility,
reducing repeated access patterns across router modules.
* feat: extract deerflow.runtime.serialization module with tests
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
* refactor: replace duplicated serialization with deerflow.runtime.serialization
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
* feat: extract app/gateway/services.py with run lifecycle logic
Create a service layer that centralizes SSE formatting, input/config
normalization, and run lifecycle management. Router modules will delegate
to these functions instead of using private cross-imported helpers.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
* refactor: wire routers to use services layer, remove cross-module private imports
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
* style: apply ruff formatting to refactored files
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
* feat(runtime): support LangGraph dev server and add compat route
- Enable official LangGraph dev server for local development workflow
- Decouple runtime components from agents package for better separation
- Provide gateway-backed fallback route when dev server is skipped
- Simplify lifecycle management using context manager in gateway
* feat(runtime): add Store providers with auto-backend selection
- Add async_provider.py and provider.py under deerflow/runtime/store/
- Support memory, sqlite, postgres backends matching checkpointer config
- Integrate into FastAPI lifespan via AsyncExitStack in deps.py
- Replace hardcoded InMemoryStore with config-driven factory
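The AsyncExitStack lifespan integration mentioned above can be sketched independently of FastAPI. FakeStore here is a stand-in under the assumption that the real providers are async context managers; the pattern is that the stack enters any number of them and unwinds all of them on shutdown.

```python
import asyncio
import contextlib

class FakeStore:
    """Stand-in async resource; assumes real providers are async context managers."""

    def __init__(self) -> None:
        self.open = False

    async def __aenter__(self) -> "FakeStore":
        self.open = True
        return self

    async def __aexit__(self, *exc) -> None:
        self.open = False

async def lifespan_demo() -> bool:
    async with contextlib.AsyncExitStack() as stack:
        # Enter any number of providers; the stack unwinds them in reverse
        # order when the application shuts down.
        store = await stack.enter_async_context(FakeStore())
        assert store.open  # resource is live while the app serves requests
    return store.open  # False: closed once the stack has exited

print(asyncio.run(lifespan_demo()))  # → False
```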
* refactor(gateway): migrate thread management from checkpointer to Store and resolve multiple endpoint failures
- Add Store-backed CRUD helpers (_store_get, _store_put, _store_upsert)
- Replace checkpoint-scanning search with two-phase strategy:
phase 1 reads Store (O(threads)), phase 2 backfills from checkpointer
for legacy/LangGraph Server threads with lazy migration
- Extend Store record schema with values field for title persistence
- Sync thread title from checkpoint to Store after run completion
- Fix /threads/{id}/runs/{run_id}/stream 405 by accepting both
GET and POST methods; POST handles interrupt/rollback actions
- Fix /threads/{id}/state 500 by separating read_config and
write_config, adding checkpoint_ns to configurable, and
shallow-copying checkpoint/metadata before mutation
- Sync title to Store on state update for immediate search reflection
- Move _upsert_thread_in_store into services.py, remove duplicate logic
- Add _sync_thread_title_after_run: await run task, read final
checkpoint title, write back to Store record
- Spawn title sync as background task from start_run when Store exists
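The two-phase search strategy can be sketched with plain dicts (illustrative shapes only; the real Store and checkpointer APIs differ): phase 1 serves results from the cheap Store index, phase 2 backfills threads known only to the checkpointer and migrates them into the Store so the next search skips the expensive phase.

```python
def search_threads(store: dict, checkpointer_threads: dict) -> list[dict]:
    # Phase 1: read the Store index, O(number of threads).
    results = {tid: rec for tid, rec in store.items()}
    # Phase 2: backfill legacy threads known only to the checkpointer,
    # lazily migrating each one into the Store as it is discovered.
    for tid, ckpt in checkpointer_threads.items():
        if tid not in results:
            rec = {"thread_id": tid, "values": {"title": ckpt.get("title")}}
            store[tid] = rec  # lazy migration
            results[tid] = rec
    return list(results.values())

store = {"t1": {"thread_id": "t1", "values": {"title": "hello"}}}
legacy = {"t2": {"title": "old thread"}}
threads = search_threads(store, legacy)
# After one search, the legacy thread has been migrated into the Store,
# so subsequent searches are served entirely from phase 1.
```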
* refactor(runtime): deduplicate store and checkpointer provider logic
Extract _ensure_sqlite_parent_dir() helper into checkpointer/provider.py
and use it in all three places that previously inlined the same mkdir logic.
Consolidate duplicate error constants in store/async_provider.py by importing
from store/provider.py instead of redefining them.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
* refactor(runtime): move SQLite helpers to runtime/store, checkpointer imports from store
_resolve_sqlite_conn_str and _ensure_sqlite_parent_dir now live in
runtime/store/provider.py. agents/checkpointer/provider and
agents/checkpointer/async_provider import from there, reversing the
previous dependency direction (store → checkpointer becomes
checkpointer → store).
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
* refactor(runtime): extract SQLite helpers into runtime/store/_sqlite_utils.py
Move resolve_sqlite_conn_str and ensure_sqlite_parent_dir out of
checkpointer/provider.py into a dedicated _sqlite_utils module.
Functions are now public (no underscore prefix), making cross-module
imports semantically correct. All four provider files import from
the single shared location.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
* fix(gateway): use adelete_thread to fully remove thread checkpoints on delete
AsyncSqliteSaver has no adelete method — the previous hasattr check
always evaluated to False, silently leaving all checkpoint rows in the
database. Switch to adelete_thread(thread_id) which deletes every
checkpoint and pending-write row for the thread across all namespaces
(including sub-graph checkpoints).
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
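The failure mode described above is easy to reproduce in isolation. FakeSaver is a stand-in mirroring only the relevant method names of the real saver:

```python
class FakeSaver:
    """Stand-in exposing the same method surface as the real async saver."""

    async def adelete_thread(self, thread_id: str) -> None:
        pass  # deletes every checkpoint row for the thread in the real class

saver = FakeSaver()
# The old guard probed a method that never existed, so the delete branch
# was silently skipped and checkpoint rows were left behind:
assert not hasattr(saver, "adelete")
# The fix calls the method that actually exists:
assert hasattr(saver, "adelete_thread")
```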
* fix(gateway): remove dead bridge_cm/ckpt_cm code and fix StrEnum lint
app.py had unreachable code after the async-with lifespan refactor:
bridge_cm and ckpt_cm were referenced but never defined (F821), and
the channel service startup/shutdown was outside the langgraph_runtime
block so it never ran. Move channel service lifecycle inside the
async-with block where it belongs.
Replace str+Enum inheritance in RunStatus and DisconnectMode with
StrEnum as suggested by UP042.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
* style: format with ruff
---------
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: JeffJiang <for-eleven@hotmail.com>
Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-authored-by: Willem Jiang <willem.jiang@gmail.com>
297 lines
10 KiB
Python
"""Run lifecycle service layer.
|
|
|
|
Centralizes the business logic for creating runs, formatting SSE
|
|
frames, and consuming stream bridge events. Router modules
|
|
(``thread_runs``, ``runs``) are thin HTTP handlers that delegate here.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import json
|
|
import logging
|
|
import time
|
|
from typing import Any
|
|
|
|
from fastapi import HTTPException, Request
|
|
from langchain_core.messages import HumanMessage
|
|
|
|
from app.gateway.deps import get_checkpointer, get_run_manager, get_store, get_stream_bridge
|
|
from deerflow.runtime import (
|
|
END_SENTINEL,
|
|
HEARTBEAT_SENTINEL,
|
|
ConflictError,
|
|
DisconnectMode,
|
|
RunManager,
|
|
RunRecord,
|
|
RunStatus,
|
|
StreamBridge,
|
|
UnsupportedStrategyError,
|
|
run_agent,
|
|
)
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# SSE formatting
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def format_sse(event: str, data: Any, *, event_id: str | None = None) -> str:
    """Format a single SSE frame.

    Field order: ``event:`` -> ``data:`` -> ``id:`` (optional) -> blank line.
    This matches the LangGraph Platform wire format consumed by the
    ``useStream`` React hook and the Python ``langgraph-sdk`` SSE decoder.
    """
    payload = json.dumps(data, default=str, ensure_ascii=False)
    parts = [f"event: {event}", f"data: {payload}"]
    if event_id:
        parts.append(f"id: {event_id}")
    parts.append("")
    parts.append("")
    return "\n".join(parts)


# ---------------------------------------------------------------------------
# Input / config helpers
# ---------------------------------------------------------------------------

def normalize_stream_modes(raw: list[str] | str | None) -> list[str]:
    """Normalize the stream_mode parameter to a list.

    Default matches what ``useStream`` expects: values + messages-tuple.
    """
    if raw is None:
        return ["values", "messages-tuple"]
    if isinstance(raw, str):
        return [raw]
    return raw if raw else ["values", "messages-tuple"]

def normalize_input(raw_input: dict[str, Any] | None) -> dict[str, Any]:
    """Convert LangGraph Platform input format to LangChain state dict."""
    if raw_input is None:
        return {}
    messages = raw_input.get("messages")
    if messages and isinstance(messages, list):
        converted = []
        for msg in messages:
            if isinstance(msg, dict):
                role = msg.get("role", msg.get("type", "user"))
                content = msg.get("content", "")
                if role in ("user", "human"):
                    converted.append(HumanMessage(content=content))
                else:
                    # TODO: handle other message types (system, ai, tool)
                    converted.append(HumanMessage(content=content))
            else:
                converted.append(msg)
        return {**raw_input, "messages": converted}
    return raw_input

def resolve_agent_factory(assistant_id: str | None):
    """Resolve the agent factory callable from config."""
    from deerflow.agents.lead_agent.agent import make_lead_agent

    if assistant_id and assistant_id != "lead_agent":
        logger.info("assistant_id=%s requested; falling back to lead_agent", assistant_id)
    return make_lead_agent

def build_run_config(
    thread_id: str,
    request_config: dict[str, Any] | None,
    metadata: dict[str, Any] | None,
) -> dict[str, Any]:
    """Build a RunnableConfig dict for the agent."""
    configurable = {"thread_id": thread_id}
    if request_config:
        configurable.update(request_config.get("configurable", {}))
    config: dict[str, Any] = {"configurable": configurable, "recursion_limit": 100}
    if request_config:
        for k, v in request_config.items():
            if k != "configurable":
                config[k] = v
    if metadata:
        config.setdefault("metadata", {}).update(metadata)
    return config


# ---------------------------------------------------------------------------
# Run lifecycle
# ---------------------------------------------------------------------------

async def _upsert_thread_in_store(store, thread_id: str, metadata: dict | None) -> None:
    """Create or refresh the thread record in the Store.

    Called from :func:`start_run` so that threads created via the stateless
    ``/runs/stream`` endpoint (which never calls ``POST /threads``) still
    appear in ``/threads/search`` results.
    """
    # Deferred import to avoid circular import with the threads router module.
    from app.gateway.routers.threads import _store_upsert

    try:
        await _store_upsert(store, thread_id, metadata=metadata)
    except Exception:
        logger.warning("Failed to upsert thread %s in store (non-fatal)", thread_id)

async def _sync_thread_title_after_run(
    run_task: asyncio.Task,
    thread_id: str,
    checkpointer: Any,
    store: Any,
) -> None:
    """Wait for *run_task* to finish, then persist the generated title to the Store.

    TitleMiddleware writes the generated title to the LangGraph agent state
    (checkpointer) but the Gateway's Store record is not updated automatically.
    This coroutine closes that gap by reading the final checkpoint after the
    run completes and syncing ``values.title`` into the Store record so that
    subsequent ``/threads/search`` responses include the correct title.

    Runs as a fire-and-forget :func:`asyncio.create_task`; failures are
    logged at DEBUG level and never propagate.
    """
    # Wait for the background run task to complete (any outcome).
    # asyncio.wait does not propagate task exceptions — it just returns
    # when the task is done, cancelled, or failed.
    await asyncio.wait({run_task})

    # Deferred import to avoid circular import with the threads router module.
    from app.gateway.routers.threads import _store_get, _store_put

    try:
        ckpt_config = {"configurable": {"thread_id": thread_id, "checkpoint_ns": ""}}
        ckpt_tuple = await checkpointer.aget_tuple(ckpt_config)
        if ckpt_tuple is None:
            return

        channel_values = ckpt_tuple.checkpoint.get("channel_values", {})
        title = channel_values.get("title")
        if not title:
            return

        existing = await _store_get(store, thread_id)
        if existing is None:
            return

        updated = dict(existing)
        updated.setdefault("values", {})["title"] = title
        updated["updated_at"] = time.time()
        await _store_put(store, updated)
        logger.debug("Synced title %r for thread %s", title, thread_id)
    except Exception:
        logger.debug("Failed to sync title for thread %s (non-fatal)", thread_id, exc_info=True)

async def start_run(
    body: Any,
    thread_id: str,
    request: Request,
) -> RunRecord:
    """Create a RunRecord and launch the background agent task.

    Parameters
    ----------
    body : RunCreateRequest
        The validated request body (typed as Any to avoid circular import
        with the router module that defines the Pydantic model).
    thread_id : str
        Target thread.
    request : Request
        FastAPI request — used to retrieve singletons from ``app.state``.
    """
    bridge = get_stream_bridge(request)
    run_mgr = get_run_manager(request)
    checkpointer = get_checkpointer(request)
    store = get_store(request)

    disconnect = DisconnectMode.cancel if body.on_disconnect == "cancel" else DisconnectMode.continue_

    try:
        record = await run_mgr.create_or_reject(
            thread_id,
            body.assistant_id,
            on_disconnect=disconnect,
            metadata=body.metadata or {},
            kwargs={"input": body.input, "config": body.config},
            multitask_strategy=body.multitask_strategy,
        )
    except ConflictError as exc:
        raise HTTPException(status_code=409, detail=str(exc)) from exc
    except UnsupportedStrategyError as exc:
        raise HTTPException(status_code=501, detail=str(exc)) from exc

    # Ensure the thread is visible in /threads/search, even for threads that
    # were never explicitly created via POST /threads (e.g. stateless runs).
    if store is not None:
        await _upsert_thread_in_store(store, thread_id, body.metadata)

    agent_factory = resolve_agent_factory(body.assistant_id)
    graph_input = normalize_input(body.input)
    config = build_run_config(thread_id, body.config, body.metadata)
    stream_modes = normalize_stream_modes(body.stream_mode)

    task = asyncio.create_task(
        run_agent(
            bridge,
            run_mgr,
            record,
            checkpointer=checkpointer,
            store=store,
            agent_factory=agent_factory,
            graph_input=graph_input,
            config=config,
            stream_modes=stream_modes,
            stream_subgraphs=body.stream_subgraphs,
            interrupt_before=body.interrupt_before,
            interrupt_after=body.interrupt_after,
        )
    )
    record.task = task

    # After the run completes, sync the title generated by TitleMiddleware from
    # the checkpointer into the Store record so that /threads/search returns the
    # correct title instead of an empty values dict.
    if store is not None:
        asyncio.create_task(_sync_thread_title_after_run(task, thread_id, checkpointer, store))

    return record

async def sse_consumer(
    bridge: StreamBridge,
    record: RunRecord,
    request: Request,
    run_mgr: RunManager,
):
    """Async generator that yields SSE frames from the bridge.

    Client disconnect honors ``on_disconnect`` semantics:
    - ``cancel``: abort the background task on client disconnect.
    - ``continue``: let the task run; subsequent events are drained from
      the bridge without being yielded, so the worker is never blocked
      by a full queue.
    """
    try:
        async for entry in bridge.subscribe(record.run_id):
            if await request.is_disconnected():
                if record.on_disconnect == DisconnectMode.cancel:
                    break
                # on_disconnect=continue: keep consuming without yielding
                # so the worker is not blocked by a full bridge queue.
                continue

            if entry is HEARTBEAT_SENTINEL:
                yield ": heartbeat\n\n"
                continue

            if entry is END_SENTINEL:
                yield format_sse("end", None, event_id=entry.id or None)
                return

            yield format_sse(entry.event, entry.data, event_id=entry.id or None)

    finally:
        if record.status in (RunStatus.pending, RunStatus.running):
            if record.on_disconnect == DisconnectMode.cancel:
                await run_mgr.cancel(record.run_id)