From e3e00af51dc60660bc714e05c8de2853a092ee95 Mon Sep 17 00:00:00 2001 From: rayhpeng Date: Wed, 22 Apr 2026 11:29:12 +0800 Subject: [PATCH] refactor(harness): update modules to use new runtime imports Update import paths across harness modules: - agents/lead_agent/prompt.py - agents/middlewares/ (memory, thread_data, uploads) - client.py - enhanced with new capabilities - community/aio_sandbox/ - config/app_config.py - remove deprecated configs - sandbox/tools.py - tools/builtins/ (invoke_acp_agent, present_file) - uploads/manager.py Co-Authored-By: Claude Opus 4.5 --- .../deerflow/agents/lead_agent/prompt.py | 2 +- .../agents/middlewares/memory_middleware.py | 2 +- .../middlewares/thread_data_middleware.py | 2 +- .../agents/middlewares/uploads_middleware.py | 2 +- backend/packages/harness/deerflow/client.py | 126 ++++++++++++++++-- .../aio_sandbox/aio_sandbox_provider.py | 2 +- .../harness/deerflow/config/app_config.py | 8 -- .../harness/deerflow/sandbox/tools.py | 2 +- .../tools/builtins/invoke_acp_agent_tool.py | 2 +- .../tools/builtins/present_file_tool.py | 2 +- .../harness/deerflow/uploads/manager.py | 2 +- 11 files changed, 121 insertions(+), 31 deletions(-) diff --git a/backend/packages/harness/deerflow/agents/lead_agent/prompt.py b/backend/packages/harness/deerflow/agents/lead_agent/prompt.py index 8e00e1ea4..480cb393e 100644 --- a/backend/packages/harness/deerflow/agents/lead_agent/prompt.py +++ b/backend/packages/harness/deerflow/agents/lead_agent/prompt.py @@ -519,7 +519,7 @@ def _get_memory_context(agent_name: str | None = None) -> str: try: from deerflow.agents.memory import format_memory_for_injection, get_memory_data from deerflow.config.memory_config import get_memory_config - from deerflow.runtime.user_context import get_effective_user_id + from deerflow.runtime.actor_context import get_effective_user_id config = get_memory_config() if not config.enabled or not config.injection_enabled: diff --git a/backend/packages/harness/deerflow/agents/middlewares/memory_middleware.py b/backend/packages/harness/deerflow/agents/middlewares/memory_middleware.py index 7f239a89e..83163d5d3 100644 --- a/backend/packages/harness/deerflow/agents/middlewares/memory_middleware.py +++ b/backend/packages/harness/deerflow/agents/middlewares/memory_middleware.py @@ -11,7 +11,7 @@ from langgraph.runtime import Runtime from deerflow.agents.memory.queue import get_memory_queue from deerflow.config.memory_config import get_memory_config -from deerflow.runtime.user_context import get_effective_user_id +from deerflow.runtime.actor_context import get_effective_user_id logger = logging.getLogger(__name__) diff --git a/backend/packages/harness/deerflow/agents/middlewares/thread_data_middleware.py b/backend/packages/harness/deerflow/agents/middlewares/thread_data_middleware.py index 8d93de4ff..143a96daf 100644 --- a/backend/packages/harness/deerflow/agents/middlewares/thread_data_middleware.py +++ b/backend/packages/harness/deerflow/agents/middlewares/thread_data_middleware.py @@ -10,7 +10,7 @@ from langgraph.runtime import Runtime from deerflow.agents.thread_state import ThreadDataState from deerflow.config.paths import Paths, get_paths -from deerflow.runtime.user_context import get_effective_user_id +from deerflow.runtime.actor_context import get_effective_user_id logger = logging.getLogger(__name__) diff --git a/backend/packages/harness/deerflow/agents/middlewares/uploads_middleware.py b/backend/packages/harness/deerflow/agents/middlewares/uploads_middleware.py index 4f584c5c3..c1ee8fae2 100644 --- a/backend/packages/harness/deerflow/agents/middlewares/uploads_middleware.py +++ b/backend/packages/harness/deerflow/agents/middlewares/uploads_middleware.py @@ -10,7 +10,7 @@ from langchain_core.messages import HumanMessage from langgraph.runtime import Runtime from deerflow.config.paths import Paths, get_paths -from deerflow.runtime.user_context import get_effective_user_id +from deerflow.runtime.actor_context import get_effective_user_id from deerflow.utils.file_conversion import extract_outline logger = logging.getLogger(__name__) diff --git a/backend/packages/harness/deerflow/client.py b/backend/packages/harness/deerflow/client.py index 7623c8f3e..1347246a1 100644 --- a/backend/packages/harness/deerflow/client.py +++ b/backend/packages/harness/deerflow/client.py @@ -26,6 +26,7 @@ from collections.abc import Generator, Sequence from dataclasses import dataclass, field from pathlib import Path from typing import Any, Literal +from typing_extensions import TypedDict from langchain.agents import create_agent from langchain.agents.middleware import AgentMiddleware @@ -40,7 +41,7 @@ from deerflow.config.app_config import get_app_config, reload_app_config from deerflow.config.extensions_config import ExtensionsConfig, SkillStateConfig, get_extensions_config, reload_extensions_config from deerflow.config.paths import get_paths from deerflow.models import create_chat_model -from deerflow.runtime.user_context import get_effective_user_id +from deerflow.runtime.actor_context import get_effective_user_id from deerflow.skills.installer import install_skill_from_archive from deerflow.uploads.manager import ( claim_unique_filename, @@ -59,6 +60,14 @@ logger = logging.getLogger(__name__) StreamEventType = Literal["values", "messages-tuple", "custom", "end"] +class AgentContext(TypedDict, total=False): + """Typed runtime context passed into LangGraph agent execution.""" + + thread_id: str + agent_name: str + sandbox_id: str + + @dataclass class StreamEvent: """A single event from the streaming agent response. @@ -143,12 +152,17 @@ class DeerFlowClient: """ if config_path is not None: reload_app_config(config_path) + from store.config.app_config import reload_app_config as reload_storage_app_config + + reload_storage_app_config(config_path) self._app_config = get_app_config() if agent_name is not None and not AGENT_NAME_PATTERN.match(agent_name): raise ValueError(f"Invalid agent name '{agent_name}'. Must match pattern: {AGENT_NAME_PATTERN.pattern}") self._checkpointer = checkpointer + self._default_checkpointer = None + self._default_checkpointer_resource = None self._model_name = model_name self._thinking_enabled = thinking_enabled self._subagent_enabled = subagent_enabled @@ -207,6 +221,97 @@ class DeerFlowClient: recursion_limit=overrides.get("recursion_limit", 100), ) + @staticmethod + def _resolve_sqlite_conn_str(connection_string: str) -> str: + """Normalize sqlite connection strings to a filesystem path string.""" + if connection_string == ":memory:": + return connection_string + return str(Path(connection_string).expanduser()) + + @staticmethod + def _ensure_sqlite_parent_dir(connection_string: str) -> None: + """Create parent directory for a sqlite file if needed.""" + if connection_string == ":memory:": + return + Path(connection_string).expanduser().parent.mkdir(parents=True, exist_ok=True) + + def _get_configured_checkpointer(self): + """Build or reuse the client fallback checkpointer from storage config.""" + if self._default_checkpointer is not None: + return self._default_checkpointer + + config = self._resolve_checkpointer_config() + self._default_checkpointer = self._build_configured_checkpointer(config) + return self._default_checkpointer + + @staticmethod + def _resolve_checkpointer_config() -> dict[str, str]: + """Resolve checkpointer backend from the unified storage config entry.""" + from store.config.app_config import get_app_config as get_storage_app_config + + storage = get_storage_app_config().storage + driver = storage.driver + + if driver == "sqlite": + return {"backend": "sqlite", "connection_string": storage.sqlite_storage_path} + if driver == "postgres": + return { + "backend": "postgres", + "connection_string": ( + f"postgresql://{storage.username}:{storage.password}@{storage.host}:{storage.port}/{storage.db_name}" + ), + } + if driver == "mysql": + raise ValueError("DeerFlowClient does not support a MySQL checkpointer") + + raise ValueError(f"Unsupported storage driver for checkpointer: {driver}") + + def _build_configured_checkpointer(self, config: dict[str, str]): + """Build a sync checkpointer using the embedded client's config.""" + backend = config["backend"] + connection_string = config.get("connection_string", "") + + if backend == "memory": + from langgraph.checkpoint.memory import InMemorySaver + + return InMemorySaver() + + if backend == "sqlite": + try: + from langgraph.checkpoint.sqlite import SqliteSaver + except ImportError as exc: + raise ImportError("SQLite checkpointer requires langgraph-checkpoint-sqlite") from exc + if not connection_string: + raise ValueError("connection_string is required for sqlite checkpointer") + conn_str = self._resolve_sqlite_conn_str(connection_string) + self._ensure_sqlite_parent_dir(conn_str) + resource = SqliteSaver.from_conn_string(conn_str) + saver = resource.__enter__() + saver.setup() + self._default_checkpointer_resource = resource + return saver + + if backend == "postgres": + try: + from langgraph.checkpoint.postgres import PostgresSaver + except ImportError as exc: + raise ImportError("Postgres checkpointer requires langgraph-checkpoint-postgres") from exc + if not connection_string: + raise ValueError("connection_string is required for postgres checkpointer") + resource = PostgresSaver.from_conn_string(connection_string) + saver = resource.__enter__() + saver.setup() + self._default_checkpointer_resource = resource + return saver + + raise ValueError(f"Unsupported checkpointer type: {backend}") + + def _get_active_checkpointer(self): + """Return the explicitly injected or config-derived checkpointer.""" + if self._checkpointer is not None: + return self._checkpointer + return self._get_configured_checkpointer() + def _ensure_agent(self, config: RunnableConfig): """Create (or recreate) the agent when config-dependent params change.""" cfg = config.get("configurable", {}) @@ -238,12 +343,9 @@ class DeerFlowClient: available_skills=self._available_skills, ), "state_schema": ThreadState, + "context_schema": AgentContext, } - checkpointer = self._checkpointer - if checkpointer is None: - from deerflow.runtime.checkpointer import get_checkpointer - - checkpointer = get_checkpointer() + checkpointer = self._get_active_checkpointer() if checkpointer is not None: kwargs["checkpointer"] = checkpointer @@ -373,11 +475,9 @@ class DeerFlowClient: Dict with "thread_list" key containing list of thread info dicts, sorted by thread creation time descending. """ - checkpointer = self._checkpointer + checkpointer = self._get_active_checkpointer() if checkpointer is None: - from deerflow.runtime.checkpointer.provider import get_checkpointer - - checkpointer = get_checkpointer() + return {"thread_list": []} thread_info_map = {} @@ -428,11 +528,9 @@ class DeerFlowClient: Returns: Dict containing the thread's full checkpoint history. """ - checkpointer = self._checkpointer + checkpointer = self._get_active_checkpointer() if checkpointer is None: - from deerflow.runtime.checkpointer.provider import get_checkpointer - - checkpointer = get_checkpointer() + return {"thread_id": thread_id, "checkpoints": []} config = {"configurable": {"thread_id": thread_id}} checkpoints = [] diff --git a/backend/packages/harness/deerflow/community/aio_sandbox/aio_sandbox_provider.py b/backend/packages/harness/deerflow/community/aio_sandbox/aio_sandbox_provider.py index 27a20c701..166d32b70 100644 --- a/backend/packages/harness/deerflow/community/aio_sandbox/aio_sandbox_provider.py +++ b/backend/packages/harness/deerflow/community/aio_sandbox/aio_sandbox_provider.py @@ -27,7 +27,7 @@ except ImportError: # pragma: no cover - Windows fallback from deerflow.config import get_app_config from deerflow.config.paths import VIRTUAL_PATH_PREFIX, get_paths -from deerflow.runtime.user_context import get_effective_user_id +from deerflow.runtime.actor_context import get_effective_user_id from deerflow.sandbox.sandbox import Sandbox from deerflow.sandbox.sandbox_provider import SandboxProvider diff --git a/backend/packages/harness/deerflow/config/app_config.py b/backend/packages/harness/deerflow/config/app_config.py index bec3d0b8d..18d5b9474 100644 --- a/backend/packages/harness/deerflow/config/app_config.py +++ b/backend/packages/harness/deerflow/config/app_config.py @@ -9,8 +9,6 @@ from dotenv import load_dotenv from pydantic import BaseModel, ConfigDict, Field from deerflow.config.acp_config import load_acp_config_from_dict -from deerflow.config.checkpointer_config import CheckpointerConfig, load_checkpointer_config_from_dict -from deerflow.config.database_config import DatabaseConfig from deerflow.config.extensions_config import ExtensionsConfig from deerflow.config.guardrails_config import GuardrailsConfig, load_guardrails_config_from_dict from deerflow.config.memory_config import MemoryConfig, load_memory_config_from_dict @@ -58,9 +56,7 @@ class AppConfig(BaseModel): subagents: SubagentsAppConfig = Field(default_factory=SubagentsAppConfig, description="Subagent runtime configuration") guardrails: GuardrailsConfig = Field(default_factory=GuardrailsConfig, description="Guardrail middleware configuration") model_config = ConfigDict(extra="allow", frozen=False) - database: DatabaseConfig = Field(default_factory=DatabaseConfig, description="Unified database backend configuration") run_events: RunEventsConfig = Field(default_factory=RunEventsConfig, description="Run event storage configuration") - checkpointer: CheckpointerConfig | None = Field(default=None, description="Checkpointer configuration") stream_bridge: StreamBridgeConfig | None = Field(default=None, description="Stream bridge configuration") @classmethod @@ -133,10 +129,6 @@ class AppConfig(BaseModel): if "guardrails" in config_data: load_guardrails_config_from_dict(config_data["guardrails"]) - # Load checkpointer config if present - if "checkpointer" in config_data: - load_checkpointer_config_from_dict(config_data["checkpointer"]) - # Load stream bridge config if present if "stream_bridge" in config_data: load_stream_bridge_config_from_dict(config_data["stream_bridge"]) diff --git a/backend/packages/harness/deerflow/sandbox/tools.py b/backend/packages/harness/deerflow/sandbox/tools.py index 601a7efb8..9f3c1e2d3 100644 --- a/backend/packages/harness/deerflow/sandbox/tools.py +++ b/backend/packages/harness/deerflow/sandbox/tools.py @@ -200,7 +200,7 @@ def _get_acp_workspace_host_path(thread_id: str | None = None) -> str | None: if thread_id is not None: try: from deerflow.config.paths import get_paths - from deerflow.runtime.user_context import get_effective_user_id + from deerflow.runtime.actor_context import get_effective_user_id host_path = get_paths().acp_workspace_dir(thread_id, user_id=get_effective_user_id()) if host_path.exists(): diff --git a/backend/packages/harness/deerflow/tools/builtins/invoke_acp_agent_tool.py b/backend/packages/harness/deerflow/tools/builtins/invoke_acp_agent_tool.py index 618649020..fc772b109 100644 --- a/backend/packages/harness/deerflow/tools/builtins/invoke_acp_agent_tool.py +++ b/backend/packages/harness/deerflow/tools/builtins/invoke_acp_agent_tool.py @@ -33,7 +33,7 @@ def _get_work_dir(thread_id: str | None) -> str: An absolute physical filesystem path to use as the working directory. """ from deerflow.config.paths import get_paths - from deerflow.runtime.user_context import get_effective_user_id + from deerflow.runtime.actor_context import get_effective_user_id paths = get_paths() if thread_id: diff --git a/backend/packages/harness/deerflow/tools/builtins/present_file_tool.py b/backend/packages/harness/deerflow/tools/builtins/present_file_tool.py index 39cc61c4f..9950b65bf 100644 --- a/backend/packages/harness/deerflow/tools/builtins/present_file_tool.py +++ b/backend/packages/harness/deerflow/tools/builtins/present_file_tool.py @@ -8,7 +8,7 @@ from langgraph.typing import ContextT from deerflow.agents.thread_state import ThreadState from deerflow.config.paths import VIRTUAL_PATH_PREFIX, get_paths -from deerflow.runtime.user_context import get_effective_user_id +from deerflow.runtime.actor_context import get_effective_user_id OUTPUTS_VIRTUAL_PREFIX = f"{VIRTUAL_PATH_PREFIX}/outputs" diff --git a/backend/packages/harness/deerflow/uploads/manager.py b/backend/packages/harness/deerflow/uploads/manager.py index c36151b38..617bf64e2 100644 --- a/backend/packages/harness/deerflow/uploads/manager.py +++ b/backend/packages/harness/deerflow/uploads/manager.py @@ -10,7 +10,7 @@ from pathlib import Path from urllib.parse import quote from deerflow.config.paths import VIRTUAL_PATH_PREFIX, get_paths -from deerflow.runtime.user_context import get_effective_user_id +from deerflow.runtime.actor_context import get_effective_user_id class PathTraversalError(ValueError):