mirror of
https://github.com/bytedance/deer-flow.git
synced 2026-06-01 13:20:25 +00:00
* feat(gateway): implement LangGraph Platform API in Gateway, replace langgraph-cli
Implement all core LangGraph Platform API endpoints in the Gateway,
allowing it to fully replace the langgraph-cli dev server for local
development. This eliminates a heavyweight dependency and simplifies
the development stack.
Changes:
- Add runs lifecycle endpoints (create, stream, wait, cancel, join)
- Add threads CRUD and search endpoints
- Add assistants compatibility endpoints (search, get, graph, schemas)
- Add StreamBridge (in-memory pub/sub for SSE) and async provider
- Add RunManager with atomic create_or_reject (eliminates TOCTOU race)
- Add worker with interrupt/rollback cancel actions and runtime context injection
- Route /api/langgraph/* to Gateway in nginx config
- Skip langgraph-cli startup by default (SKIP_LANGGRAPH_SERVER=0 to restore)
- Add unit tests for RunManager, SSE format, and StreamBridge
* fix: drain bridge queue on client disconnect to prevent backpressure
When on_disconnect=continue, keep consuming events from the bridge
without yielding, so the worker is not blocked by a full queue.
Only on_disconnect=cancel breaks out immediately.
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
* fix: remove pytest import
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
* fix: Fix default stream_mode to ["values", "messages-tuple"]
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
* fix: Remove unused if_exists field from ThreadCreateRequest
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
* fix: address review comments on gateway LangGraph API
- Mount runs.py router in app.py (missing include_router)
- Normalize interrupt_before/after "*" to node list before run_agent()
- Use entry.id for SSE event ID instead of counter
- Drain bridge queue on disconnect when on_disconnect=continue
- Reuse serialization helper in wait_run() for consistent wire format
- Reject unsupported multitask_strategy with 400
- Remove SKIP_LANGGRAPH_SERVER fallback, always use Gateway
* feat: extract app.state access into deps.py
Encapsulate read/write operations for singleton objects (RunManager,
StreamBridge, checkpointer) held in app.state into a shared utility,
reducing repeated access patterns across router modules.
* feat: extract deerflow.runtime.serialization module with tests
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
* refactor: replace duplicated serialization with deerflow.runtime.serialization
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
* feat: extract app/gateway/services.py with run lifecycle logic
Create a service layer that centralizes SSE formatting, input/config
normalization, and run lifecycle management. Router modules will delegate
to these functions instead of using private cross-imported helpers.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
* refactor: wire routers to use services layer, remove cross-module private imports
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
* style: apply ruff formatting to refactored files
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
* feat(runtime): support LangGraph dev server and add compat route
- Enable official LangGraph dev server for local development workflow
- Decouple runtime components from agents package for better separation
- Provide gateway-backed fallback route when dev server is skipped
- Simplify lifecycle management using context manager in gateway
* feat(runtime): add Store providers with auto-backend selection
- Add async_provider.py and provider.py under deerflow/runtime/store/
- Support memory, sqlite, postgres backends matching checkpointer config
- Integrate into FastAPI lifespan via AsyncExitStack in deps.py
- Replace hardcoded InMemoryStore with config-driven factory
* refactor(gateway): migrate thread management from checkpointer to Store and resolve multiple endpoint failures
- Add Store-backed CRUD helpers (_store_get, _store_put, _store_upsert)
- Replace checkpoint-scanning search with two-phase strategy:
phase 1 reads Store (O(threads)), phase 2 backfills from checkpointer
for legacy/LangGraph Server threads with lazy migration
- Extend Store record schema with values field for title persistence
- Sync thread title from checkpoint to Store after run completion
- Fix /threads/{id}/runs/{run_id}/stream 405 by accepting both
GET and POST methods; POST handles interrupt/rollback actions
- Fix /threads/{id}/state 500 by separating read_config and
write_config, adding checkpoint_ns to configurable, and
shallow-copying checkpoint/metadata before mutation
- Sync title to Store on state update for immediate search reflection
- Move _upsert_thread_in_store into services.py, remove duplicate logic
- Add _sync_thread_title_after_run: await run task, read final
checkpoint title, write back to Store record
- Spawn title sync as background task from start_run when Store exists
* refactor(runtime): deduplicate store and checkpointer provider logic
Extract _ensure_sqlite_parent_dir() helper into checkpointer/provider.py
and use it in all three places that previously inlined the same mkdir logic.
Consolidate duplicate error constants in store/async_provider.py by importing
from store/provider.py instead of redefining them.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
* refactor(runtime): move SQLite helpers to runtime/store, checkpointer imports from store
_resolve_sqlite_conn_str and _ensure_sqlite_parent_dir now live in
runtime/store/provider.py. agents/checkpointer/provider and
agents/checkpointer/async_provider import from there, reversing the
previous dependency direction (store → checkpointer becomes
checkpointer → store).
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
* refactor(runtime): extract SQLite helpers into runtime/store/_sqlite_utils.py
Move resolve_sqlite_conn_str and ensure_sqlite_parent_dir out of
checkpointer/provider.py into a dedicated _sqlite_utils module.
Functions are now public (no underscore prefix), making cross-module
imports semantically correct. All four provider files import from
the single shared location.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
* fix(gateway): use adelete_thread to fully remove thread checkpoints on delete
AsyncSqliteSaver has no adelete method — the previous hasattr check
always evaluated to False, silently leaving all checkpoint rows in the
database. Switch to adelete_thread(thread_id) which deletes every
checkpoint and pending-write row for the thread across all namespaces
(including sub-graph checkpoints).
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
* fix(gateway): remove dead bridge_cm/ckpt_cm code and fix StrEnum lint
app.py had unreachable code after the async-with lifespan refactor:
bridge_cm and ckpt_cm were referenced but never defined (F821), and
the channel service startup/shutdown was outside the langgraph_runtime
block so it never ran. Move channel service lifecycle inside the
async-with block where it belongs.
Replace str+Enum inheritance in RunStatus and DisconnectMode with
StrEnum as suggested by UP042.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
* style: format with ruff
---------
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: JeffJiang <for-eleven@hotmail.com>
Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-authored-by: Willem Jiang <willem.jiang@gmail.com>
340 lines
13 KiB
Python
340 lines
13 KiB
Python
import logging
|
|
import os
|
|
from pathlib import Path
|
|
from typing import Any, Self
|
|
|
|
import yaml
|
|
from dotenv import load_dotenv
|
|
from pydantic import BaseModel, ConfigDict, Field
|
|
|
|
from deerflow.config.acp_config import load_acp_config_from_dict
|
|
from deerflow.config.checkpointer_config import CheckpointerConfig, load_checkpointer_config_from_dict
|
|
from deerflow.config.extensions_config import ExtensionsConfig
|
|
from deerflow.config.guardrails_config import load_guardrails_config_from_dict
|
|
from deerflow.config.memory_config import load_memory_config_from_dict
|
|
from deerflow.config.model_config import ModelConfig
|
|
from deerflow.config.sandbox_config import SandboxConfig
|
|
from deerflow.config.skills_config import SkillsConfig
|
|
from deerflow.config.stream_bridge_config import StreamBridgeConfig, load_stream_bridge_config_from_dict
|
|
from deerflow.config.subagents_config import load_subagents_config_from_dict
|
|
from deerflow.config.summarization_config import load_summarization_config_from_dict
|
|
from deerflow.config.title_config import load_title_config_from_dict
|
|
from deerflow.config.token_usage_config import TokenUsageConfig
|
|
from deerflow.config.tool_config import ToolConfig, ToolGroupConfig
|
|
from deerflow.config.tool_search_config import ToolSearchConfig, load_tool_search_config_from_dict
|
|
|
|
load_dotenv()
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class AppConfig(BaseModel):
|
|
"""Config for the DeerFlow application"""
|
|
|
|
log_level: str = Field(default="info", description="Logging level for deerflow modules (debug/info/warning/error)")
|
|
token_usage: TokenUsageConfig = Field(default_factory=TokenUsageConfig, description="Token usage tracking configuration")
|
|
models: list[ModelConfig] = Field(default_factory=list, description="Available models")
|
|
sandbox: SandboxConfig = Field(description="Sandbox configuration")
|
|
tools: list[ToolConfig] = Field(default_factory=list, description="Available tools")
|
|
tool_groups: list[ToolGroupConfig] = Field(default_factory=list, description="Available tool groups")
|
|
skills: SkillsConfig = Field(default_factory=SkillsConfig, description="Skills configuration")
|
|
extensions: ExtensionsConfig = Field(default_factory=ExtensionsConfig, description="Extensions configuration (MCP servers and skills state)")
|
|
tool_search: ToolSearchConfig = Field(default_factory=ToolSearchConfig, description="Tool search / deferred loading configuration")
|
|
model_config = ConfigDict(extra="allow", frozen=False)
|
|
checkpointer: CheckpointerConfig | None = Field(default=None, description="Checkpointer configuration")
|
|
stream_bridge: StreamBridgeConfig | None = Field(default=None, description="Stream bridge configuration")
|
|
|
|
@classmethod
|
|
def resolve_config_path(cls, config_path: str | None = None) -> Path:
|
|
"""Resolve the config file path.
|
|
|
|
Priority:
|
|
1. If provided `config_path` argument, use it.
|
|
2. If provided `DEER_FLOW_CONFIG_PATH` environment variable, use it.
|
|
3. Otherwise, first check the `config.yaml` in the current directory, then fallback to `config.yaml` in the parent directory.
|
|
"""
|
|
if config_path:
|
|
path = Path(config_path)
|
|
if not Path.exists(path):
|
|
raise FileNotFoundError(f"Config file specified by param `config_path` not found at {path}")
|
|
return path
|
|
elif os.getenv("DEER_FLOW_CONFIG_PATH"):
|
|
path = Path(os.getenv("DEER_FLOW_CONFIG_PATH"))
|
|
if not Path.exists(path):
|
|
raise FileNotFoundError(f"Config file specified by environment variable `DEER_FLOW_CONFIG_PATH` not found at {path}")
|
|
return path
|
|
else:
|
|
# Check if the config.yaml is in the current directory
|
|
path = Path(os.getcwd()) / "config.yaml"
|
|
if not path.exists():
|
|
# Check if the config.yaml is in the parent directory of CWD
|
|
path = Path(os.getcwd()).parent / "config.yaml"
|
|
if not path.exists():
|
|
raise FileNotFoundError("`config.yaml` file not found at the current directory nor its parent directory")
|
|
return path
|
|
|
|
@classmethod
|
|
def from_file(cls, config_path: str | None = None) -> Self:
|
|
"""Load config from YAML file.
|
|
|
|
See `resolve_config_path` for more details.
|
|
|
|
Args:
|
|
config_path: Path to the config file.
|
|
|
|
Returns:
|
|
AppConfig: The loaded config.
|
|
"""
|
|
resolved_path = cls.resolve_config_path(config_path)
|
|
with open(resolved_path, encoding="utf-8") as f:
|
|
config_data = yaml.safe_load(f) or {}
|
|
|
|
# Check config version before processing
|
|
cls._check_config_version(config_data, resolved_path)
|
|
|
|
config_data = cls.resolve_env_variables(config_data)
|
|
|
|
# Load title config if present
|
|
if "title" in config_data:
|
|
load_title_config_from_dict(config_data["title"])
|
|
|
|
# Load summarization config if present
|
|
if "summarization" in config_data:
|
|
load_summarization_config_from_dict(config_data["summarization"])
|
|
|
|
# Load memory config if present
|
|
if "memory" in config_data:
|
|
load_memory_config_from_dict(config_data["memory"])
|
|
|
|
# Load subagents config if present
|
|
if "subagents" in config_data:
|
|
load_subagents_config_from_dict(config_data["subagents"])
|
|
|
|
# Load tool_search config if present
|
|
if "tool_search" in config_data:
|
|
load_tool_search_config_from_dict(config_data["tool_search"])
|
|
|
|
# Load guardrails config if present
|
|
if "guardrails" in config_data:
|
|
load_guardrails_config_from_dict(config_data["guardrails"])
|
|
|
|
# Load checkpointer config if present
|
|
if "checkpointer" in config_data:
|
|
load_checkpointer_config_from_dict(config_data["checkpointer"])
|
|
|
|
# Load stream bridge config if present
|
|
if "stream_bridge" in config_data:
|
|
load_stream_bridge_config_from_dict(config_data["stream_bridge"])
|
|
|
|
# Always refresh ACP agent config so removed entries do not linger across reloads.
|
|
load_acp_config_from_dict(config_data.get("acp_agents", {}))
|
|
|
|
# Load extensions config separately (it's in a different file)
|
|
extensions_config = ExtensionsConfig.from_file()
|
|
config_data["extensions"] = extensions_config.model_dump()
|
|
|
|
result = cls.model_validate(config_data)
|
|
return result
|
|
|
|
@classmethod
|
|
def _check_config_version(cls, config_data: dict, config_path: Path) -> None:
|
|
"""Check if the user's config.yaml is outdated compared to config.example.yaml.
|
|
|
|
Emits a warning if the user's config_version is lower than the example's.
|
|
Missing config_version is treated as version 0 (pre-versioning).
|
|
"""
|
|
try:
|
|
user_version = int(config_data.get("config_version", 0))
|
|
except (TypeError, ValueError):
|
|
user_version = 0
|
|
|
|
# Find config.example.yaml by searching config.yaml's directory and its parents
|
|
example_path = None
|
|
search_dir = config_path.parent
|
|
for _ in range(5): # search up to 5 levels
|
|
candidate = search_dir / "config.example.yaml"
|
|
if candidate.exists():
|
|
example_path = candidate
|
|
break
|
|
parent = search_dir.parent
|
|
if parent == search_dir:
|
|
break
|
|
search_dir = parent
|
|
if example_path is None:
|
|
return
|
|
|
|
try:
|
|
with open(example_path, encoding="utf-8") as f:
|
|
example_data = yaml.safe_load(f)
|
|
raw = example_data.get("config_version", 0) if example_data else 0
|
|
try:
|
|
example_version = int(raw)
|
|
except (TypeError, ValueError):
|
|
example_version = 0
|
|
except Exception:
|
|
return
|
|
|
|
if user_version < example_version:
|
|
logger.warning(
|
|
"Your config.yaml (version %d) is outdated — the latest version is %d. Run `make config-upgrade` to merge new fields into your config.",
|
|
user_version,
|
|
example_version,
|
|
)
|
|
|
|
@classmethod
|
|
def resolve_env_variables(cls, config: Any) -> Any:
|
|
"""Recursively resolve environment variables in the config.
|
|
|
|
Environment variables are resolved using the `os.getenv` function. Example: $OPENAI_API_KEY
|
|
|
|
Args:
|
|
config: The config to resolve environment variables in.
|
|
|
|
Returns:
|
|
The config with environment variables resolved.
|
|
"""
|
|
if isinstance(config, str):
|
|
if config.startswith("$"):
|
|
env_value = os.getenv(config[1:])
|
|
if env_value is None:
|
|
raise ValueError(f"Environment variable {config[1:]} not found for config value {config}")
|
|
return env_value
|
|
return config
|
|
elif isinstance(config, dict):
|
|
return {k: cls.resolve_env_variables(v) for k, v in config.items()}
|
|
elif isinstance(config, list):
|
|
return [cls.resolve_env_variables(item) for item in config]
|
|
return config
|
|
|
|
def get_model_config(self, name: str) -> ModelConfig | None:
|
|
"""Get the model config by name.
|
|
|
|
Args:
|
|
name: The name of the model to get the config for.
|
|
|
|
Returns:
|
|
The model config if found, otherwise None.
|
|
"""
|
|
return next((model for model in self.models if model.name == name), None)
|
|
|
|
def get_tool_config(self, name: str) -> ToolConfig | None:
|
|
"""Get the tool config by name.
|
|
|
|
Args:
|
|
name: The name of the tool to get the config for.
|
|
|
|
Returns:
|
|
The tool config if found, otherwise None.
|
|
"""
|
|
return next((tool for tool in self.tools if tool.name == name), None)
|
|
|
|
def get_tool_group_config(self, name: str) -> ToolGroupConfig | None:
|
|
"""Get the tool group config by name.
|
|
|
|
Args:
|
|
name: The name of the tool group to get the config for.
|
|
|
|
Returns:
|
|
The tool group config if found, otherwise None.
|
|
"""
|
|
return next((group for group in self.tool_groups if group.name == name), None)
|
|
|
|
|
|
_app_config: AppConfig | None = None
|
|
_app_config_path: Path | None = None
|
|
_app_config_mtime: float | None = None
|
|
_app_config_is_custom = False
|
|
|
|
|
|
def _get_config_mtime(config_path: Path) -> float | None:
|
|
"""Get the modification time of a config file if it exists."""
|
|
try:
|
|
return config_path.stat().st_mtime
|
|
except OSError:
|
|
return None
|
|
|
|
|
|
def _load_and_cache_app_config(config_path: str | None = None) -> AppConfig:
|
|
"""Load config from disk and refresh cache metadata."""
|
|
global _app_config, _app_config_path, _app_config_mtime, _app_config_is_custom
|
|
|
|
resolved_path = AppConfig.resolve_config_path(config_path)
|
|
_app_config = AppConfig.from_file(str(resolved_path))
|
|
_app_config_path = resolved_path
|
|
_app_config_mtime = _get_config_mtime(resolved_path)
|
|
_app_config_is_custom = False
|
|
return _app_config
|
|
|
|
|
|
def get_app_config() -> AppConfig:
|
|
"""Get the DeerFlow config instance.
|
|
|
|
Returns a cached singleton instance and automatically reloads it when the
|
|
underlying config file path or modification time changes. Use
|
|
`reload_app_config()` to force a reload, or `reset_app_config()` to clear
|
|
the cache.
|
|
"""
|
|
global _app_config, _app_config_path, _app_config_mtime
|
|
|
|
if _app_config is not None and _app_config_is_custom:
|
|
return _app_config
|
|
|
|
resolved_path = AppConfig.resolve_config_path()
|
|
current_mtime = _get_config_mtime(resolved_path)
|
|
|
|
should_reload = _app_config is None or _app_config_path != resolved_path or _app_config_mtime != current_mtime
|
|
if should_reload:
|
|
if _app_config_path == resolved_path and _app_config_mtime is not None and current_mtime is not None and _app_config_mtime != current_mtime:
|
|
logger.info(
|
|
"Config file has been modified (mtime: %s -> %s), reloading AppConfig",
|
|
_app_config_mtime,
|
|
current_mtime,
|
|
)
|
|
_load_and_cache_app_config(str(resolved_path))
|
|
return _app_config
|
|
|
|
|
|
def reload_app_config(config_path: str | None = None) -> AppConfig:
|
|
"""Reload the config from file and update the cached instance.
|
|
|
|
This is useful when the config file has been modified and you want
|
|
to pick up the changes without restarting the application.
|
|
|
|
Args:
|
|
config_path: Optional path to config file. If not provided,
|
|
uses the default resolution strategy.
|
|
|
|
Returns:
|
|
The newly loaded AppConfig instance.
|
|
"""
|
|
return _load_and_cache_app_config(config_path)
|
|
|
|
|
|
def reset_app_config() -> None:
|
|
"""Reset the cached config instance.
|
|
|
|
This clears the singleton cache, causing the next call to
|
|
`get_app_config()` to reload from file. Useful for testing
|
|
or when switching between different configurations.
|
|
"""
|
|
global _app_config, _app_config_path, _app_config_mtime, _app_config_is_custom
|
|
_app_config = None
|
|
_app_config_path = None
|
|
_app_config_mtime = None
|
|
_app_config_is_custom = False
|
|
|
|
|
|
def set_app_config(config: AppConfig) -> None:
|
|
"""Set a custom config instance.
|
|
|
|
This allows injecting a custom or mock config for testing purposes.
|
|
|
|
Args:
|
|
config: The AppConfig instance to use.
|
|
"""
|
|
global _app_config, _app_config_path, _app_config_mtime, _app_config_is_custom
|
|
_app_config = config
|
|
_app_config_path = None
|
|
_app_config_mtime = None
|
|
_app_config_is_custom = True
|