greatmengqi 3e6a34297d refactor(config): eliminate global mutable state — explicit parameter passing on top of main
Squashes 25 PR commits onto current main. AppConfig becomes a pure value
object with no ambient lookup. Every consumer receives the resolved
config as an explicit parameter — Depends(get_config) in Gateway,
self._app_config in DeerFlowClient, runtime.context.app_config in agent
runs, AppConfig.from_file() at the LangGraph Server registration
boundary.

Phase 1 — frozen data + typed context

- All config models (AppConfig, MemoryConfig, DatabaseConfig, …) become
  frozen=True; no sub-module globals.
- AppConfig.from_file() is pure (no side-effect singleton loaders).
- Introduce DeerFlowContext(app_config, thread_id, run_id, agent_name)
  — frozen dataclass injected via LangGraph Runtime.
- Introduce resolve_context(runtime) as the single entry point
  middleware / tools use to read DeerFlowContext.

Phase 2 — pure explicit parameter passing

- Gateway: app.state.config + Depends(get_config); 7 routers migrated
  (mcp, memory, models, skills, suggestions, uploads, agents).
- DeerFlowClient: __init__(config=...) captures config locally.
- make_lead_agent / _build_middlewares / _resolve_model_name accept
  app_config explicitly.
- RunContext.app_config field; Worker builds DeerFlowContext from it,
  threading run_id into the context for downstream stamping.
- Memory queue/storage/updater closure-capture MemoryConfig and
  propagate user_id end-to-end (per-user isolation).
- Sandbox/skills/community/factories/tools thread app_config.
- resolve_context() rejects non-typed runtime.context.
- Test suite migrated off AppConfig.current() monkey-patches.
- AppConfig.current() classmethod deleted.

Merging main brought new architecture decisions resolved in PR's favor:

- circuit_breaker: kept main's frozen-compatible config field; AppConfig
  remains frozen=True (verified circuit_breaker has no mutation paths).
- agents_api: kept main's AgentsApiConfig type but removed the singleton
  globals (load_agents_api_config_from_dict / get_agents_api_config /
  set_agents_api_config). 8 routes in agents.py now read via
  Depends(get_config).
- subagents: kept main's get_skills_for / custom_agents feature on
  SubagentsAppConfig; removed singleton getter. registry.py now reads
  app_config.subagents directly.
- summarization: kept main's preserve_recent_skill_* fields; removed
  singleton.
- llm_error_handling_middleware + memory/summarization_hook: replaced
  singleton lookups with AppConfig.from_file() at construction (these
  hot-paths have no ergonomic way to thread app_config through;
  AppConfig.from_file is a pure load).
- worker.py + thread_data_middleware.py: DeerFlowContext.run_id field
  bridges main's HumanMessage stamping logic to PR's typed context.

Trade-offs (follow-up work):

- main's #2138 (async memory updater) reverted to PR's sync
  implementation. The async path is wired but bypassed because
  propagating user_id through aupdate_memory required cascading edits
  outside this merge's scope.
- tests/test_subagent_skills_config.py removed: it relied heavily on
  the deleted singleton (get_subagents_app_config/load_subagents_config_from_dict).
  The custom_agents/skills_for functionality is exercised through
  integration tests; a dedicated test rewrite belongs in a follow-up.

Verification: backend test suite — 2560 passed, 4 skipped, 84 failures.
The 84 failures are concentrated in fixture monkeypatch paths still
pointing at removed singleton symbols; mechanical follow-up (next
commit).
2026-04-26 21:45:02 +08:00

346 lines
12 KiB
Python

"""Run lifecycle service layer.
Centralizes the business logic for creating runs, formatting SSE
frames, and consuming stream bridge events. Router modules
(``thread_runs``, ``runs``) are thin HTTP handlers that delegate here.
"""
from __future__ import annotations
import asyncio
import dataclasses
import json
import logging
import re
import time
from collections.abc import Mapping
from typing import Any
from fastapi import HTTPException, Request
from langchain_core.messages import HumanMessage
from app.gateway.deps import get_run_context, get_run_manager, get_run_store, get_stream_bridge
from app.gateway.utils import sanitize_log_param
from deerflow.runtime import (
END_SENTINEL,
HEARTBEAT_SENTINEL,
ConflictError,
DisconnectMode,
RunManager,
RunRecord,
RunStatus,
StreamBridge,
UnsupportedStrategyError,
run_agent,
)
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# SSE formatting
# ---------------------------------------------------------------------------
def format_sse(event: str, data: Any, *, event_id: str | None = None) -> str:
"""Format a single SSE frame.
Field order: ``event:`` -> ``data:`` -> ``id:`` (optional) -> blank line.
This matches the LangGraph Platform wire format consumed by the
``useStream`` React hook and the Python ``langgraph-sdk`` SSE decoder.
"""
payload = json.dumps(data, default=str, ensure_ascii=False)
parts = [f"event: {event}", f"data: {payload}"]
if event_id:
parts.append(f"id: {event_id}")
parts.append("")
parts.append("")
return "\n".join(parts)
# ---------------------------------------------------------------------------
# Input / config helpers
# ---------------------------------------------------------------------------
def normalize_stream_modes(raw: list[str] | str | None) -> list[str]:
"""Normalize the stream_mode parameter to a list.
Default matches what ``useStream`` expects: values + messages-tuple.
"""
if raw is None:
return ["values"]
if isinstance(raw, str):
return [raw]
return raw if raw else ["values"]
def normalize_input(raw_input: dict[str, Any] | None) -> dict[str, Any]:
"""Convert LangGraph Platform input format to LangChain state dict."""
if raw_input is None:
return {}
messages = raw_input.get("messages")
if messages and isinstance(messages, list):
converted = []
for msg in messages:
if isinstance(msg, dict):
role = msg.get("role", msg.get("type", "user"))
content = msg.get("content", "")
if role in ("user", "human"):
converted.append(HumanMessage(content=content))
else:
# TODO: handle other message types (system, ai, tool)
converted.append(HumanMessage(content=content))
else:
converted.append(msg)
return {**raw_input, "messages": converted}
return raw_input
_DEFAULT_ASSISTANT_ID = "lead_agent"
def resolve_agent_factory(assistant_id: str | None):
"""Resolve the agent factory callable from config.
Custom agents are implemented as ``lead_agent`` + an ``agent_name``
injected into ``configurable`` or ``context`` — see
:func:`build_run_config`. All ``assistant_id`` values therefore map to the
same factory; the routing happens inside ``make_lead_agent`` when it reads
``cfg["agent_name"]``.
"""
from deerflow.agents.lead_agent.agent import make_lead_agent
return make_lead_agent
def build_run_config(
thread_id: str,
request_config: dict[str, Any] | None,
metadata: dict[str, Any] | None,
*,
assistant_id: str | None = None,
) -> dict[str, Any]:
"""Build a RunnableConfig dict for the agent.
When *assistant_id* refers to a custom agent (anything other than
``"lead_agent"`` / ``None``), the name is forwarded as ``agent_name`` in
whichever runtime options container is active: ``context`` for
LangGraph >= 0.6.0 requests, otherwise ``configurable``.
``make_lead_agent`` reads this key to load the matching
``agents/<name>/SOUL.md`` and per-agent config — without it the agent
silently runs as the default lead agent.
This mirrors the channel manager's ``_resolve_run_params`` logic so that
the LangGraph Platform-compatible HTTP API and the IM channel path behave
identically.
"""
config: dict[str, Any] = {"recursion_limit": 100}
if request_config:
# LangGraph >= 0.6.0 introduced ``context`` as the preferred way to
# pass thread-level data and rejects requests that include both
# ``configurable`` and ``context``. If the caller already sends
# ``context``, honour it and skip our own ``configurable`` dict.
if "context" in request_config:
if "configurable" in request_config:
logger.warning(
"build_run_config: client sent both 'context' and 'configurable'; preferring 'context' (LangGraph >= 0.6.0). thread_id=%s, caller_configurable keys=%s",
thread_id,
list(request_config.get("configurable", {}).keys()),
)
context_value = request_config["context"]
if context_value is None:
context = {}
elif isinstance(context_value, Mapping):
context = dict(context_value)
else:
raise ValueError("request config 'context' must be a mapping or null.")
config["context"] = context
else:
configurable = {"thread_id": thread_id}
configurable.update(request_config.get("configurable", {}))
config["configurable"] = configurable
for k, v in request_config.items():
if k not in ("configurable", "context"):
config[k] = v
else:
config["configurable"] = {"thread_id": thread_id}
# Inject custom agent name when the caller specified a non-default assistant.
# Honour an explicit agent_name in the active runtime options container.
if assistant_id and assistant_id != _DEFAULT_ASSISTANT_ID:
normalized = assistant_id.strip().lower().replace("_", "-")
if not normalized or not re.fullmatch(r"[a-z0-9-]+", normalized):
raise ValueError(f"Invalid assistant_id {assistant_id!r}: must contain only letters, digits, and hyphens after normalization.")
if "configurable" in config:
target = config["configurable"]
elif "context" in config:
target = config["context"]
else:
target = config.setdefault("configurable", {})
if target is not None and "agent_name" not in target:
target["agent_name"] = normalized
if metadata:
config.setdefault("metadata", {}).update(metadata)
return config
# ---------------------------------------------------------------------------
# Run lifecycle
# ---------------------------------------------------------------------------
async def start_run(
body: Any,
thread_id: str,
request: Request,
) -> RunRecord:
"""Create a RunRecord and launch the background agent task.
Parameters
----------
body : RunCreateRequest
The validated request body (typed as Any to avoid circular import
with the router module that defines the Pydantic model).
thread_id : str
Target thread.
request : Request
FastAPI request — used to retrieve singletons from ``app.state``.
"""
bridge = get_stream_bridge(request)
run_mgr = get_run_manager(request)
run_ctx = get_run_context(request)
disconnect = DisconnectMode.cancel if body.on_disconnect == "cancel" else DisconnectMode.continue_
# Resolve follow_up_to_run_id: explicit from request, or auto-detect from latest successful run
follow_up_to_run_id = getattr(body, "follow_up_to_run_id", None)
if follow_up_to_run_id is None:
run_store = get_run_store(request)
try:
recent_runs = await run_store.list_by_thread(thread_id, limit=1)
if recent_runs and recent_runs[0].get("status") == "success":
follow_up_to_run_id = recent_runs[0]["run_id"]
except Exception:
pass # Don't block run creation
# Enrich base context with per-run field
if follow_up_to_run_id:
run_ctx = dataclasses.replace(run_ctx, follow_up_to_run_id=follow_up_to_run_id)
try:
record = await run_mgr.create_or_reject(
thread_id,
body.assistant_id,
on_disconnect=disconnect,
metadata=body.metadata or {},
kwargs={"input": body.input, "config": body.config},
multitask_strategy=body.multitask_strategy,
follow_up_to_run_id=follow_up_to_run_id,
)
except ConflictError as exc:
raise HTTPException(status_code=409, detail=str(exc)) from exc
except UnsupportedStrategyError as exc:
raise HTTPException(status_code=501, detail=str(exc)) from exc
# Upsert thread metadata so the thread appears in /threads/search,
# even for threads that were never explicitly created via POST /threads
# (e.g. stateless runs).
try:
existing = await run_ctx.thread_store.get(thread_id)
if existing is None:
await run_ctx.thread_store.create(
thread_id,
assistant_id=body.assistant_id,
metadata=body.metadata,
)
else:
await run_ctx.thread_store.update_status(thread_id, "running")
except Exception:
logger.warning("Failed to upsert thread_meta for %s (non-fatal)", sanitize_log_param(thread_id))
agent_factory = resolve_agent_factory(body.assistant_id)
graph_input = normalize_input(body.input)
config = build_run_config(thread_id, body.config, body.metadata, assistant_id=body.assistant_id)
# Merge DeerFlow-specific context overrides into configurable.
# The ``context`` field is a custom extension for the langgraph-compat layer
# that carries agent configuration (model_name, thinking_enabled, etc.).
# Only agent-relevant keys are forwarded; unknown keys (e.g. thread_id) are ignored.
context = getattr(body, "context", None)
if context:
_CONTEXT_CONFIGURABLE_KEYS = {
"model_name",
"mode",
"thinking_enabled",
"reasoning_effort",
"is_plan_mode",
"subagent_enabled",
"max_concurrent_subagents",
"agent_name",
"is_bootstrap",
}
configurable = config.setdefault("configurable", {})
for key in _CONTEXT_CONFIGURABLE_KEYS:
if key in context:
configurable.setdefault(key, context[key])
stream_modes = normalize_stream_modes(body.stream_mode)
task = asyncio.create_task(
run_agent(
bridge,
run_mgr,
record,
ctx=run_ctx,
agent_factory=agent_factory,
graph_input=graph_input,
config=config,
stream_modes=stream_modes,
stream_subgraphs=body.stream_subgraphs,
interrupt_before=body.interrupt_before,
interrupt_after=body.interrupt_after,
)
)
record.task = task
# Title sync is handled by worker.py's finally block which reads the
# title from the checkpoint and calls thread_store.update_display_name
# after the run completes.
return record
async def sse_consumer(
bridge: StreamBridge,
record: RunRecord,
request: Request,
run_mgr: RunManager,
):
"""Async generator that yields SSE frames from the bridge.
The ``finally`` block implements ``on_disconnect`` semantics:
- ``cancel``: abort the background task on client disconnect.
- ``continue``: let the task run; events are discarded.
"""
last_event_id = request.headers.get("Last-Event-ID")
try:
async for entry in bridge.subscribe(record.run_id, last_event_id=last_event_id):
if await request.is_disconnected():
break
if entry is HEARTBEAT_SENTINEL:
yield ": heartbeat\n\n"
continue
if entry is END_SENTINEL:
yield format_sse("end", None, event_id=entry.id or None)
return
yield format_sse(entry.event, entry.data, event_id=entry.id or None)
finally:
if record.status in (RunStatus.pending, RunStatus.running):
if record.on_disconnect == DisconnectMode.cancel:
await run_mgr.cancel(record.run_id)