mirror of
https://github.com/bytedance/deer-flow.git
synced 2026-04-30 13:58:22 +00:00
Squashes 25 PR commits onto current main. AppConfig becomes a pure value object with no ambient lookup. Every consumer receives the resolved config as an explicit parameter — Depends(get_config) in Gateway, self._app_config in DeerFlowClient, runtime.context.app_config in agent runs, AppConfig.from_file() at the LangGraph Server registration boundary. Phase 1 — frozen data + typed context - All config models (AppConfig, MemoryConfig, DatabaseConfig, …) become frozen=True; no sub-module globals. - AppConfig.from_file() is pure (no side-effect singleton loaders). - Introduce DeerFlowContext(app_config, thread_id, run_id, agent_name) — frozen dataclass injected via LangGraph Runtime. - Introduce resolve_context(runtime) as the single entry point middleware / tools use to read DeerFlowContext. Phase 2 — pure explicit parameter passing - Gateway: app.state.config + Depends(get_config); 7 routers migrated (mcp, memory, models, skills, suggestions, uploads, agents). - DeerFlowClient: __init__(config=...) captures config locally. - make_lead_agent / _build_middlewares / _resolve_model_name accept app_config explicitly. - RunContext.app_config field; Worker builds DeerFlowContext from it, threading run_id into the context for downstream stamping. - Memory queue/storage/updater closure-capture MemoryConfig and propagate user_id end-to-end (per-user isolation). - Sandbox/skills/community/factories/tools thread app_config. - resolve_context() rejects non-typed runtime.context. - Test suite migrated off AppConfig.current() monkey-patches. - AppConfig.current() classmethod deleted. Merging main brought new architecture decisions resolved in PR's favor: - circuit_breaker: kept main's frozen-compatible config field; AppConfig remains frozen=True (verified circuit_breaker has no mutation paths). - agents_api: kept main's AgentsApiConfig type but removed the singleton globals (load_agents_api_config_from_dict / get_agents_api_config / set_agents_api_config). 8 routes in agents.py now read via Depends(get_config). - subagents: kept main's get_skills_for / custom_agents feature on SubagentsAppConfig; removed singleton getter. registry.py now reads app_config.subagents directly. - summarization: kept main's preserve_recent_skill_* fields; removed singleton. - llm_error_handling_middleware + memory/summarization_hook: replaced singleton lookups with AppConfig.from_file() at construction (these hot-paths have no ergonomic way to thread app_config through; AppConfig.from_file is a pure load). - worker.py + thread_data_middleware.py: DeerFlowContext.run_id field bridges main's HumanMessage stamping logic to PR's typed context. Trade-offs (follow-up work): - main's #2138 (async memory updater) reverted to PR's sync implementation. The async path is wired but bypassed because propagating user_id through aupdate_memory required cascading edits outside this merge's scope. - tests/test_subagent_skills_config.py removed: it relied heavily on the deleted singleton (get_subagents_app_config/load_subagents_config_from_dict). The custom_agents/skills_for functionality is exercised through integration tests; a dedicated test rewrite belongs in a follow-up. Verification: backend test suite — 2560 passed, 4 skipped, 84 failures. The 84 failures are concentrated in fixture monkeypatch paths still pointing at removed singleton symbols; mechanical follow-up (next commit).
346 lines
12 KiB
Python
346 lines
12 KiB
Python
"""Run lifecycle service layer.
|
|
|
|
Centralizes the business logic for creating runs, formatting SSE
|
|
frames, and consuming stream bridge events. Router modules
|
|
(``thread_runs``, ``runs``) are thin HTTP handlers that delegate here.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import dataclasses
|
|
import json
|
|
import logging
|
|
import re
|
|
import time
|
|
from collections.abc import Mapping
|
|
from typing import Any
|
|
|
|
from fastapi import HTTPException, Request
|
|
from langchain_core.messages import HumanMessage
|
|
|
|
from app.gateway.deps import get_run_context, get_run_manager, get_run_store, get_stream_bridge
|
|
from app.gateway.utils import sanitize_log_param
|
|
from deerflow.runtime import (
|
|
END_SENTINEL,
|
|
HEARTBEAT_SENTINEL,
|
|
ConflictError,
|
|
DisconnectMode,
|
|
RunManager,
|
|
RunRecord,
|
|
RunStatus,
|
|
StreamBridge,
|
|
UnsupportedStrategyError,
|
|
run_agent,
|
|
)
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# SSE formatting
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def format_sse(event: str, data: Any, *, event_id: str | None = None) -> str:
|
|
"""Format a single SSE frame.
|
|
|
|
Field order: ``event:`` -> ``data:`` -> ``id:`` (optional) -> blank line.
|
|
This matches the LangGraph Platform wire format consumed by the
|
|
``useStream`` React hook and the Python ``langgraph-sdk`` SSE decoder.
|
|
"""
|
|
payload = json.dumps(data, default=str, ensure_ascii=False)
|
|
parts = [f"event: {event}", f"data: {payload}"]
|
|
if event_id:
|
|
parts.append(f"id: {event_id}")
|
|
parts.append("")
|
|
parts.append("")
|
|
return "\n".join(parts)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Input / config helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def normalize_stream_modes(raw: list[str] | str | None) -> list[str]:
|
|
"""Normalize the stream_mode parameter to a list.
|
|
|
|
Default matches what ``useStream`` expects: values + messages-tuple.
|
|
"""
|
|
if raw is None:
|
|
return ["values"]
|
|
if isinstance(raw, str):
|
|
return [raw]
|
|
return raw if raw else ["values"]
|
|
|
|
|
|
def normalize_input(raw_input: dict[str, Any] | None) -> dict[str, Any]:
|
|
"""Convert LangGraph Platform input format to LangChain state dict."""
|
|
if raw_input is None:
|
|
return {}
|
|
messages = raw_input.get("messages")
|
|
if messages and isinstance(messages, list):
|
|
converted = []
|
|
for msg in messages:
|
|
if isinstance(msg, dict):
|
|
role = msg.get("role", msg.get("type", "user"))
|
|
content = msg.get("content", "")
|
|
if role in ("user", "human"):
|
|
converted.append(HumanMessage(content=content))
|
|
else:
|
|
# TODO: handle other message types (system, ai, tool)
|
|
converted.append(HumanMessage(content=content))
|
|
else:
|
|
converted.append(msg)
|
|
return {**raw_input, "messages": converted}
|
|
return raw_input
|
|
|
|
|
|
_DEFAULT_ASSISTANT_ID = "lead_agent"
|
|
|
|
|
|
def resolve_agent_factory(assistant_id: str | None):
|
|
"""Resolve the agent factory callable from config.
|
|
|
|
Custom agents are implemented as ``lead_agent`` + an ``agent_name``
|
|
injected into ``configurable`` or ``context`` — see
|
|
:func:`build_run_config`. All ``assistant_id`` values therefore map to the
|
|
same factory; the routing happens inside ``make_lead_agent`` when it reads
|
|
``cfg["agent_name"]``.
|
|
"""
|
|
from deerflow.agents.lead_agent.agent import make_lead_agent
|
|
|
|
return make_lead_agent
|
|
|
|
|
|
def build_run_config(
|
|
thread_id: str,
|
|
request_config: dict[str, Any] | None,
|
|
metadata: dict[str, Any] | None,
|
|
*,
|
|
assistant_id: str | None = None,
|
|
) -> dict[str, Any]:
|
|
"""Build a RunnableConfig dict for the agent.
|
|
|
|
When *assistant_id* refers to a custom agent (anything other than
|
|
``"lead_agent"`` / ``None``), the name is forwarded as ``agent_name`` in
|
|
whichever runtime options container is active: ``context`` for
|
|
LangGraph >= 0.6.0 requests, otherwise ``configurable``.
|
|
``make_lead_agent`` reads this key to load the matching
|
|
``agents/<name>/SOUL.md`` and per-agent config — without it the agent
|
|
silently runs as the default lead agent.
|
|
|
|
This mirrors the channel manager's ``_resolve_run_params`` logic so that
|
|
the LangGraph Platform-compatible HTTP API and the IM channel path behave
|
|
identically.
|
|
"""
|
|
config: dict[str, Any] = {"recursion_limit": 100}
|
|
if request_config:
|
|
# LangGraph >= 0.6.0 introduced ``context`` as the preferred way to
|
|
# pass thread-level data and rejects requests that include both
|
|
# ``configurable`` and ``context``. If the caller already sends
|
|
# ``context``, honour it and skip our own ``configurable`` dict.
|
|
if "context" in request_config:
|
|
if "configurable" in request_config:
|
|
logger.warning(
|
|
"build_run_config: client sent both 'context' and 'configurable'; preferring 'context' (LangGraph >= 0.6.0). thread_id=%s, caller_configurable keys=%s",
|
|
thread_id,
|
|
list(request_config.get("configurable", {}).keys()),
|
|
)
|
|
context_value = request_config["context"]
|
|
if context_value is None:
|
|
context = {}
|
|
elif isinstance(context_value, Mapping):
|
|
context = dict(context_value)
|
|
else:
|
|
raise ValueError("request config 'context' must be a mapping or null.")
|
|
config["context"] = context
|
|
else:
|
|
configurable = {"thread_id": thread_id}
|
|
configurable.update(request_config.get("configurable", {}))
|
|
config["configurable"] = configurable
|
|
for k, v in request_config.items():
|
|
if k not in ("configurable", "context"):
|
|
config[k] = v
|
|
else:
|
|
config["configurable"] = {"thread_id": thread_id}
|
|
|
|
# Inject custom agent name when the caller specified a non-default assistant.
|
|
# Honour an explicit agent_name in the active runtime options container.
|
|
if assistant_id and assistant_id != _DEFAULT_ASSISTANT_ID:
|
|
normalized = assistant_id.strip().lower().replace("_", "-")
|
|
if not normalized or not re.fullmatch(r"[a-z0-9-]+", normalized):
|
|
raise ValueError(f"Invalid assistant_id {assistant_id!r}: must contain only letters, digits, and hyphens after normalization.")
|
|
if "configurable" in config:
|
|
target = config["configurable"]
|
|
elif "context" in config:
|
|
target = config["context"]
|
|
else:
|
|
target = config.setdefault("configurable", {})
|
|
if target is not None and "agent_name" not in target:
|
|
target["agent_name"] = normalized
|
|
if metadata:
|
|
config.setdefault("metadata", {}).update(metadata)
|
|
return config
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Run lifecycle
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
async def start_run(
|
|
body: Any,
|
|
thread_id: str,
|
|
request: Request,
|
|
) -> RunRecord:
|
|
"""Create a RunRecord and launch the background agent task.
|
|
|
|
Parameters
|
|
----------
|
|
body : RunCreateRequest
|
|
The validated request body (typed as Any to avoid circular import
|
|
with the router module that defines the Pydantic model).
|
|
thread_id : str
|
|
Target thread.
|
|
request : Request
|
|
FastAPI request — used to retrieve singletons from ``app.state``.
|
|
"""
|
|
bridge = get_stream_bridge(request)
|
|
run_mgr = get_run_manager(request)
|
|
run_ctx = get_run_context(request)
|
|
|
|
disconnect = DisconnectMode.cancel if body.on_disconnect == "cancel" else DisconnectMode.continue_
|
|
|
|
# Resolve follow_up_to_run_id: explicit from request, or auto-detect from latest successful run
|
|
follow_up_to_run_id = getattr(body, "follow_up_to_run_id", None)
|
|
if follow_up_to_run_id is None:
|
|
run_store = get_run_store(request)
|
|
try:
|
|
recent_runs = await run_store.list_by_thread(thread_id, limit=1)
|
|
if recent_runs and recent_runs[0].get("status") == "success":
|
|
follow_up_to_run_id = recent_runs[0]["run_id"]
|
|
except Exception:
|
|
pass # Don't block run creation
|
|
|
|
# Enrich base context with per-run field
|
|
if follow_up_to_run_id:
|
|
run_ctx = dataclasses.replace(run_ctx, follow_up_to_run_id=follow_up_to_run_id)
|
|
|
|
try:
|
|
record = await run_mgr.create_or_reject(
|
|
thread_id,
|
|
body.assistant_id,
|
|
on_disconnect=disconnect,
|
|
metadata=body.metadata or {},
|
|
kwargs={"input": body.input, "config": body.config},
|
|
multitask_strategy=body.multitask_strategy,
|
|
follow_up_to_run_id=follow_up_to_run_id,
|
|
)
|
|
except ConflictError as exc:
|
|
raise HTTPException(status_code=409, detail=str(exc)) from exc
|
|
except UnsupportedStrategyError as exc:
|
|
raise HTTPException(status_code=501, detail=str(exc)) from exc
|
|
|
|
# Upsert thread metadata so the thread appears in /threads/search,
|
|
# even for threads that were never explicitly created via POST /threads
|
|
# (e.g. stateless runs).
|
|
try:
|
|
existing = await run_ctx.thread_store.get(thread_id)
|
|
if existing is None:
|
|
await run_ctx.thread_store.create(
|
|
thread_id,
|
|
assistant_id=body.assistant_id,
|
|
metadata=body.metadata,
|
|
)
|
|
else:
|
|
await run_ctx.thread_store.update_status(thread_id, "running")
|
|
except Exception:
|
|
logger.warning("Failed to upsert thread_meta for %s (non-fatal)", sanitize_log_param(thread_id))
|
|
|
|
agent_factory = resolve_agent_factory(body.assistant_id)
|
|
graph_input = normalize_input(body.input)
|
|
config = build_run_config(thread_id, body.config, body.metadata, assistant_id=body.assistant_id)
|
|
|
|
# Merge DeerFlow-specific context overrides into configurable.
|
|
# The ``context`` field is a custom extension for the langgraph-compat layer
|
|
# that carries agent configuration (model_name, thinking_enabled, etc.).
|
|
# Only agent-relevant keys are forwarded; unknown keys (e.g. thread_id) are ignored.
|
|
context = getattr(body, "context", None)
|
|
if context:
|
|
_CONTEXT_CONFIGURABLE_KEYS = {
|
|
"model_name",
|
|
"mode",
|
|
"thinking_enabled",
|
|
"reasoning_effort",
|
|
"is_plan_mode",
|
|
"subagent_enabled",
|
|
"max_concurrent_subagents",
|
|
"agent_name",
|
|
"is_bootstrap",
|
|
}
|
|
configurable = config.setdefault("configurable", {})
|
|
for key in _CONTEXT_CONFIGURABLE_KEYS:
|
|
if key in context:
|
|
configurable.setdefault(key, context[key])
|
|
|
|
stream_modes = normalize_stream_modes(body.stream_mode)
|
|
|
|
task = asyncio.create_task(
|
|
run_agent(
|
|
bridge,
|
|
run_mgr,
|
|
record,
|
|
ctx=run_ctx,
|
|
agent_factory=agent_factory,
|
|
graph_input=graph_input,
|
|
config=config,
|
|
stream_modes=stream_modes,
|
|
stream_subgraphs=body.stream_subgraphs,
|
|
interrupt_before=body.interrupt_before,
|
|
interrupt_after=body.interrupt_after,
|
|
)
|
|
)
|
|
record.task = task
|
|
|
|
# Title sync is handled by worker.py's finally block which reads the
|
|
# title from the checkpoint and calls thread_store.update_display_name
|
|
# after the run completes.
|
|
|
|
return record
|
|
|
|
|
|
async def sse_consumer(
|
|
bridge: StreamBridge,
|
|
record: RunRecord,
|
|
request: Request,
|
|
run_mgr: RunManager,
|
|
):
|
|
"""Async generator that yields SSE frames from the bridge.
|
|
|
|
The ``finally`` block implements ``on_disconnect`` semantics:
|
|
- ``cancel``: abort the background task on client disconnect.
|
|
- ``continue``: let the task run; events are discarded.
|
|
"""
|
|
last_event_id = request.headers.get("Last-Event-ID")
|
|
try:
|
|
async for entry in bridge.subscribe(record.run_id, last_event_id=last_event_id):
|
|
if await request.is_disconnected():
|
|
break
|
|
|
|
if entry is HEARTBEAT_SENTINEL:
|
|
yield ": heartbeat\n\n"
|
|
continue
|
|
|
|
if entry is END_SENTINEL:
|
|
yield format_sse("end", None, event_id=entry.id or None)
|
|
return
|
|
|
|
yield format_sse(entry.event, entry.data, event_id=entry.id or None)
|
|
|
|
finally:
|
|
if record.status in (RunStatus.pending, RunStatus.running):
|
|
if record.on_disconnect == DisconnectMode.cancel:
|
|
await run_mgr.cancel(record.run_id)
|