From 3b6dd0a4e32ec5133382573514f9c728f4c99945 Mon Sep 17 00:00:00 2001 From: AochenShen99 Date: Mon, 8 Jun 2026 23:17:22 +0800 Subject: [PATCH] feat(subagents): extend deferred MCP tool loading to subagents (#3432) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat(subagents): extend deferred MCP tool loading to subagents (#3341) Subagents now reuse the lead agent's deferred-tool path: when tool_search.enabled, MCP tool schemas are withheld from the model and surfaced by name in , fetched on demand via the generated tool_search helper. DeferredToolFilterMiddleware deterministically rewrites request.tools to hide the deferred schemas (the prompt section is discovery only, not enforcement). Consolidates the assembly into deerflow.tools.builtins.tool_search, now the single home for both assemble_deferred_tools (centralized fail-closed guard, replacing the lead-only private _assemble_deferred) and the relocated get_deferred_tools_prompt_section. Shared by every build path: lead agent, embedded client, and subagent executor. tool_search is appended after the subagent's name-level tool policy and is treated as infrastructure: its catalog is built from the already policy-filtered list, so it can never surface a tool the policy denied. Follow-up to #3370. Fixes #3341. * test(subagents): assert the real middleware builder emits a working deferred filter (#3341) The existing recipe test hand-constructs DeferredToolFilterMiddleware, so it cannot catch a regression in how build_subagent_runtime_middlewares (the call executor._create_agent actually makes) wires the deferred setup into the filter. Add a test that sources the filter from the real builder given a real setup and runs it through a graph: a wrong catalog hash would silently stop promotion, a dropped filter would stop hiding — both now caught. Running the full real middleware stack is intentionally avoided (the other runtime middlewares need sandbox/thread infra to execute, which would make the test flaky); their attachment + ordering before Safety stays locked in test_tool_error_handling_middleware.py. * test(subagents): keep executor tests config-free in CI * chore: trigger ci * Potential fix for pull request finding Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> --------- Co-authored-by: Willem Jiang Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> --- backend/CLAUDE.md | 1 + .../deerflow/agents/lead_agent/agent.py | 31 +-- .../deerflow/agents/lead_agent/prompt.py | 14 +- .../tool_error_handling_middleware.py | 16 +- backend/packages/harness/deerflow/client.py | 5 +- .../harness/deerflow/subagents/executor.py | 51 +++- .../deerflow/tools/builtins/tool_search.py | 40 +++ .../tests/test_deferred_tool_crosscontext.py | 20 +- ...subagent_deferred_promotion_integration.py | 174 +++++++++++++ backend/tests/test_subagent_executor.py | 238 ++++++++++++++++-- .../test_tool_error_handling_middleware.py | 42 ++++ backend/tests/test_tool_search.py | 2 +- 12 files changed, 551 insertions(+), 83 deletions(-) create mode 100644 backend/tests/test_subagent_deferred_promotion_integration.py diff --git a/backend/CLAUDE.md b/backend/CLAUDE.md index 8490d8644..903d86e28 100644 --- a/backend/CLAUDE.md +++ b/backend/CLAUDE.md @@ -305,6 +305,7 @@ Proxied through nginx: `/api/langgraph/*` → Gateway LangGraph-compatible runti **Concurrency**: `MAX_CONCURRENT_SUBAGENTS = 3` enforced by `SubagentLimitMiddleware` (truncates excess tool calls in `after_model`), 15-minute timeout **Flow**: `task()` tool → `SubagentExecutor` → background thread → poll 5s → SSE events → result **Events**: `task_started`, `task_running`, `task_completed`/`task_failed`/`task_timed_out` +**Deferred MCP tools** (if `tool_search.enabled`): `SubagentExecutor._build_initial_state` assembles deferral after policy filtering via the shared `assemble_deferred_tools` (fail-closed), appends the `tool_search` tool, injects the `` section into the subagent's `SystemMessage`, and threads the setup to `_create_agent`, which attaches `DeferredToolFilterMiddleware` through `build_subagent_runtime_middlewares(deferred_setup=...)`. Subagents thus withhold full MCP schemas until promotion, same as the lead agent; each task run gets a fresh `ThreadState` so promotion is isolated per run ### Tool System (`packages/harness/deerflow/tools/`) diff --git a/backend/packages/harness/deerflow/agents/lead_agent/agent.py b/backend/packages/harness/deerflow/agents/lead_agent/agent.py index 7aa6c37fd..39110424c 100644 --- a/backend/packages/harness/deerflow/agents/lead_agent/agent.py +++ b/backend/packages/harness/deerflow/agents/lead_agent/agent.py @@ -21,7 +21,6 @@ middleware, and the async path inside ``TitleMiddleware``. Any new in-graph from __future__ import annotations import logging -from typing import TYPE_CHECKING from langchain.agents import create_agent from langchain.agents.middleware import AgentMiddleware @@ -48,11 +47,6 @@ from deerflow.skills.tool_policy import filter_tools_by_skill_allowed_tools from deerflow.skills.types import Skill from deerflow.tracing import build_tracing_callbacks -if TYPE_CHECKING: - from langchain.tools import BaseTool - - from deerflow.tools.builtins.tool_search import DeferredToolSetup - logger = logging.getLogger(__name__) @@ -364,26 +358,6 @@ def _build_middlewares( return middlewares -def _assemble_deferred(filtered_tools: list[BaseTool], *, enabled: bool) -> tuple[list[BaseTool], DeferredToolSetup]: - """Build the final tool list + deferred setup from a policy-filtered list. - - Call AFTER tool-policy filtering so the deferred catalog never exposes a - tool the agent is not allowed to use. Fail-closed: if tool_search is enabled - and MCP tools survived filtering but no deferred set was recovered, raise - rather than silently binding their full schemas to the model. - """ - from deerflow.tools.builtins.tool_search import build_deferred_tool_setup - from deerflow.tools.mcp_metadata import is_mcp_tool - - deferred_setup = build_deferred_tool_setup(filtered_tools, enabled=enabled) - if enabled and not deferred_setup.deferred_names and any(is_mcp_tool(t) for t in filtered_tools): - raise RuntimeError("tool_search enabled and MCP tools survived policy filtering, but no deferred set was recovered — refusing to bind MCP schemas (fail-closed).") - final_tools = list(filtered_tools) - if deferred_setup.tool_search_tool: - final_tools.append(deferred_setup.tool_search_tool) - return final_tools, deferred_setup - - def _available_skill_names(agent_config, is_bootstrap: bool) -> set[str] | None: if is_bootstrap: return {"bootstrap"} @@ -417,6 +391,7 @@ def _make_lead_agent(config: RunnableConfig, *, app_config: AppConfig): # Lazy import to avoid circular dependency from deerflow.tools import get_available_tools from deerflow.tools.builtins import setup_agent, update_agent + from deerflow.tools.builtins.tool_search import assemble_deferred_tools cfg = _get_runtime_config(config) resolved_app_config = app_config @@ -493,7 +468,7 @@ def _make_lead_agent(config: RunnableConfig, *, app_config: AppConfig): # Special bootstrap agent with minimal prompt for initial custom agent creation flow raw_tools = get_available_tools(model_name=model_name, subagent_enabled=subagent_enabled, app_config=resolved_app_config) + [setup_agent] filtered = filter_tools_by_skill_allowed_tools(raw_tools, skills_for_tool_policy) - final_tools, setup = _assemble_deferred(filtered, enabled=resolved_app_config.tool_search.enabled) + final_tools, setup = assemble_deferred_tools(filtered, enabled=resolved_app_config.tool_search.enabled) return create_agent( model=create_chat_model(name=model_name, thinking_enabled=thinking_enabled, app_config=resolved_app_config, attach_tracing=False), tools=final_tools, @@ -514,7 +489,7 @@ def _make_lead_agent(config: RunnableConfig, *, app_config: AppConfig): # Default lead agent (unchanged behavior) raw_tools = get_available_tools(model_name=model_name, groups=agent_config.tool_groups if agent_config else None, subagent_enabled=subagent_enabled, app_config=resolved_app_config) filtered = filter_tools_by_skill_allowed_tools(raw_tools + extra_tools, skills_for_tool_policy) - final_tools, setup = _assemble_deferred(filtered, enabled=resolved_app_config.tool_search.enabled) + final_tools, setup = assemble_deferred_tools(filtered, enabled=resolved_app_config.tool_search.enabled) return create_agent( model=create_chat_model(name=model_name, thinking_enabled=thinking_enabled, reasoning_effort=reasoning_effort, app_config=resolved_app_config, attach_tracing=False), tools=final_tools, diff --git a/backend/packages/harness/deerflow/agents/lead_agent/prompt.py b/backend/packages/harness/deerflow/agents/lead_agent/prompt.py index 2e4d32cec..f7d9fa8c6 100644 --- a/backend/packages/harness/deerflow/agents/lead_agent/prompt.py +++ b/backend/packages/harness/deerflow/agents/lead_agent/prompt.py @@ -10,6 +10,7 @@ from deerflow.config.agents_config import load_agent_soul from deerflow.skills.storage import get_or_new_skill_storage from deerflow.skills.types import Skill, SkillCategory from deerflow.subagents import get_available_subagent_names +from deerflow.tools.builtins.tool_search import get_deferred_tools_prompt_section if TYPE_CHECKING: from deerflow.config.app_config import AppConfig @@ -693,19 +694,6 @@ Rules: """ -def get_deferred_tools_prompt_section(*, deferred_names: frozenset[str] = frozenset()) -> str: - """Generate from an explicit deferred-name set. - - Lists only names so the agent knows what exists and can use tool_search to - load them. Returns empty string when there are no deferred tools. The set is - computed at agent build time (after tool-policy filtering) and passed in. - """ - if not deferred_names: - return "" - names = "\n".join(sorted(deferred_names)) - return f"\n{names}\n" - - def _build_acp_section(*, app_config: AppConfig | None = None) -> str: """Build the ACP agent prompt section, only if ACP agents are configured.""" if app_config is None: diff --git a/backend/packages/harness/deerflow/agents/middlewares/tool_error_handling_middleware.py b/backend/packages/harness/deerflow/agents/middlewares/tool_error_handling_middleware.py index 8012f04ef..1d19e94de 100644 --- a/backend/packages/harness/deerflow/agents/middlewares/tool_error_handling_middleware.py +++ b/backend/packages/harness/deerflow/agents/middlewares/tool_error_handling_middleware.py @@ -2,7 +2,7 @@ import logging from collections.abc import Awaitable, Callable -from typing import override +from typing import TYPE_CHECKING, override from langchain.agents import AgentState from langchain.agents.middleware import AgentMiddleware @@ -17,6 +17,9 @@ from deerflow.subagents.status_contract import ( make_subagent_additional_kwargs, ) +if TYPE_CHECKING: + from deerflow.tools.builtins.tool_search import DeferredToolSetup + logger = logging.getLogger(__name__) _MISSING_TOOL_CALL_ID = "missing_tool_call_id" @@ -199,6 +202,7 @@ def build_subagent_runtime_middlewares( app_config: AppConfig | None = None, model_name: str | None = None, lazy_init: bool = True, + deferred_setup: "DeferredToolSetup | None" = None, ) -> list[AgentMiddleware]: """Middlewares shared by subagent runtime before subagent-only middlewares.""" if app_config is None: @@ -222,6 +226,16 @@ def build_subagent_runtime_middlewares( middlewares.append(ViewImageMiddleware()) + # Hide deferred (MCP) tool schemas from the subagent's model binding until + # tool_search promotes them. This is the same wiring the lead agent gets. The deferred + # set + catalog hash come from the build-time setup (assembled after + # tool-policy filtering); promotion is read from graph state. Empty/None + # setup (deferral disabled or no MCP tool survived) is a pure no-op. + if deferred_setup is not None and deferred_setup.deferred_names: + from deerflow.agents.middlewares.deferred_tool_filter_middleware import DeferredToolFilterMiddleware + + middlewares.append(DeferredToolFilterMiddleware(deferred_setup.deferred_names, deferred_setup.catalog_hash)) + # Same provider safety-termination guard the lead agent uses — subagents # are equally exposed to truncated tool_calls returned with # finish_reason=content_filter (and friends), and the bad call would then diff --git a/backend/packages/harness/deerflow/client.py b/backend/packages/harness/deerflow/client.py index 32f85856d..b9c09a5c6 100644 --- a/backend/packages/harness/deerflow/client.py +++ b/backend/packages/harness/deerflow/client.py @@ -33,7 +33,7 @@ from langchain.agents.middleware import AgentMiddleware from langchain_core.messages import AIMessage, HumanMessage, SystemMessage, ToolMessage from langchain_core.runnables import RunnableConfig -from deerflow.agents.lead_agent.agent import _assemble_deferred, _build_middlewares +from deerflow.agents.lead_agent.agent import _build_middlewares from deerflow.agents.lead_agent.prompt import apply_prompt_template from deerflow.agents.thread_state import ThreadState from deerflow.config.agents_config import AGENT_NAME_PATTERN @@ -43,6 +43,7 @@ from deerflow.config.paths import get_paths from deerflow.models import create_chat_model from deerflow.runtime.user_context import get_effective_user_id from deerflow.skills.storage import get_or_new_skill_storage +from deerflow.tools.builtins.tool_search import assemble_deferred_tools from deerflow.tracing import build_tracing_callbacks, inject_langfuse_metadata from deerflow.uploads.manager import ( claim_unique_filename, @@ -238,7 +239,7 @@ class DeerFlowClient: max_concurrent_subagents = cfg.get("max_concurrent_subagents", 3) tools = self._get_tools(model_name=model_name, subagent_enabled=subagent_enabled) - final_tools, deferred_setup = _assemble_deferred(tools, enabled=self._app_config.tool_search.enabled) + final_tools, deferred_setup = assemble_deferred_tools(tools, enabled=self._app_config.tool_search.enabled) kwargs: dict[str, Any] = { # attach_tracing=False because ``stream()`` injects tracing # callbacks at the graph invocation root so a single embedded run diff --git a/backend/packages/harness/deerflow/subagents/executor.py b/backend/packages/harness/deerflow/subagents/executor.py index 8fcbd5e1d..ffc2ae6cb 100644 --- a/backend/packages/harness/deerflow/subagents/executor.py +++ b/backend/packages/harness/deerflow/subagents/executor.py @@ -12,7 +12,7 @@ from contextvars import Context, copy_context from dataclasses import dataclass, field from datetime import datetime from enum import Enum -from typing import Any +from typing import TYPE_CHECKING, Any from langchain.agents import create_agent from langchain.tools import BaseTool @@ -28,6 +28,13 @@ from deerflow.skills.types import Skill from deerflow.subagents.config import SubagentConfig, resolve_subagent_model_name from deerflow.subagents.token_collector import SubagentTokenCollector +if TYPE_CHECKING: + # Imported lazily at runtime inside _build_initial_state: importing + # tool_search eagerly would run tools/builtins/__init__ -> task_tool -> + # `from deerflow.subagents import SubagentExecutor`, which re-enters this + # still-initializing package. Type-only here keeps the annotation precise. + from deerflow.tools.builtins.tool_search import DeferredToolSetup + logger = logging.getLogger(__name__) @@ -319,8 +326,13 @@ class SubagentExecutor: logger.info(f"[trace={self.trace_id}] SubagentExecutor initialized: {config.name} with {len(self.tools)} tools") - def _create_agent(self, tools: list[BaseTool] | None = None): - """Create the agent instance.""" + def _create_agent(self, tools: list[BaseTool] | None = None, *, deferred_setup: "DeferredToolSetup | None" = None): + """Create the agent instance. + + ``deferred_setup`` (assembled in ``_build_initial_state``) carries the + deferred MCP tool names + catalog hash so the subagent gets the same + DeferredToolFilterMiddleware the lead agent has. ``None`` is a no-op. + """ app_config = self.app_config or get_app_config() if self.model_name is None: self.model_name = resolve_subagent_model_name(self.config, self.parent_model, app_config=app_config) @@ -329,7 +341,7 @@ class SubagentExecutor: from deerflow.agents.middlewares.tool_error_handling_middleware import build_subagent_runtime_middlewares # Reuse shared middleware composition with lead agent. - middlewares = build_subagent_runtime_middlewares(app_config=app_config, model_name=self.model_name, lazy_init=True) + middlewares = build_subagent_runtime_middlewares(app_config=app_config, model_name=self.model_name, lazy_init=True, deferred_setup=deferred_setup) # system_prompt is included in initial state messages (see _build_initial_state) # to avoid multiple SystemMessages which some LLM APIs don't support. @@ -403,19 +415,35 @@ class SubagentExecutor: return messages - async def _build_initial_state(self, task: str) -> tuple[dict[str, Any], list[BaseTool]]: + async def _build_initial_state(self, task: str) -> tuple[dict[str, Any], list[BaseTool], "DeferredToolSetup"]: """Build the initial state for agent execution. Args: task: The task description. Returns: - Initial state dictionary and tools filtered by loaded skill metadata. + ``(state, final_tools, deferred_setup)``. ``final_tools`` is the + policy-filtered tool list with the ``tool_search`` tool appended when + deferral applies; ``deferred_setup`` is consumed by ``_create_agent`` + so the agent build and the injected ```` + section share one catalog/hash. """ + # Lazy import: see the TYPE_CHECKING note at the top of this module - + # importing tool_search runs tools/builtins/__init__, which would + # re-enter this package during its own initialization. + from deerflow.tools.builtins.tool_search import assemble_deferred_tools, get_deferred_tools_prompt_section # Load skills as conversation items (Codex pattern) skills = await self._load_skills() filtered_tools = self._apply_skill_allowed_tools(skills) + # Assemble deferred tool_search AFTER policy filtering (fail-closed), + # mirroring the lead path so subagents stop binding full MCP schemas. + # The generated tool_search helper is intentionally not subject to the + # subagent's name-level allow/deny (config.tools / disallowed_tools): + # its catalog is built from the already-filtered list, so it can never + # surface a tool the policy denied. This matches the lead agent. + enabled = (self.app_config or get_app_config()).tool_search.enabled + final_tools, deferred_setup = assemble_deferred_tools(filtered_tools, enabled=enabled) skill_messages = await self._load_skill_messages(skills) # Combine system_prompt and skills into a single SystemMessage. @@ -426,6 +454,11 @@ class SubagentExecutor: system_parts.append(self.config.system_prompt) for skill_msg in skill_messages: system_parts.append(skill_msg.content) + # Name the deferred MCP tools in the prompt; their schemas stay withheld + # until tool_search promotes them. Empty set -> "" -> appends nothing. + deferred_section = get_deferred_tools_prompt_section(deferred_names=deferred_setup.deferred_names) + if deferred_section: + system_parts.append(deferred_section) messages: list[Any] = [] if system_parts: @@ -444,7 +477,7 @@ class SubagentExecutor: if self.thread_data is not None: state["thread_data"] = self.thread_data - return state, filtered_tools + return state, final_tools, deferred_setup async def _aexecute(self, task: str, result_holder: SubagentResult | None = None) -> SubagentResult: """Execute a task asynchronously. @@ -475,8 +508,8 @@ class SubagentExecutor: collector: SubagentTokenCollector | None = None try: - state, filtered_tools = await self._build_initial_state(task) - agent = self._create_agent(filtered_tools) + state, final_tools, deferred_setup = await self._build_initial_state(task) + agent = self._create_agent(final_tools, deferred_setup=deferred_setup) # Token collector for subagent LLM calls collector_caller = f"subagent:{self.config.name}" diff --git a/backend/packages/harness/deerflow/tools/builtins/tool_search.py b/backend/packages/harness/deerflow/tools/builtins/tool_search.py index f42a32852..c24311510 100644 --- a/backend/packages/harness/deerflow/tools/builtins/tool_search.py +++ b/backend/packages/harness/deerflow/tools/builtins/tool_search.py @@ -179,3 +179,43 @@ def build_deferred_tool_setup(filtered_tools: list[BaseTool], *, enabled: bool) return DeferredToolSetup(None, frozenset(), None) catalog = DeferredToolCatalog(tuple(deferred)) return DeferredToolSetup(build_tool_search_tool(catalog), catalog.names, catalog.hash) + + +def assemble_deferred_tools(filtered_tools: list[BaseTool], *, enabled: bool) -> tuple[list[BaseTool], DeferredToolSetup]: + """Build the final tool list + deferred setup from a POLICY-FILTERED list. + + Call AFTER tool-policy filtering so the deferred catalog never exposes a tool + the agent is not allowed to use. Fail-closed: if tool_search is enabled and + MCP tools survived filtering but no deferred set was recovered, raise rather + than silently binding their full schemas to the model. + + Shared by every agent-build path (lead, embedded client, subagent) so they + all get the same fail-closed guarantee from one place. + """ + deferred_setup = build_deferred_tool_setup(filtered_tools, enabled=enabled) + if enabled and not deferred_setup.deferred_names and any(is_mcp_tool(t) for t in filtered_tools): + raise RuntimeError("tool_search enabled and MCP tools survived policy filtering, but no deferred set was recovered - refusing to bind MCP schemas (fail-closed).") + final_tools = list(filtered_tools) + if deferred_setup.tool_search_tool: + final_tools.append(deferred_setup.tool_search_tool) + return final_tools, deferred_setup + + +# Prompt rendering + + +def get_deferred_tools_prompt_section(*, deferred_names: frozenset[str] = frozenset()) -> str: + """Generate from an explicit deferred-name set. + + Lists only names so the agent knows what exists and can use tool_search to + load them. Returns empty string when there are no deferred tools. The set is + computed at agent build time (after tool-policy filtering) and passed in. + + Lives here, next to the assembly that produces ``deferred_names``, so every + agent-build path (lead, embedded client, subagent) renders the section the + same way without coupling back to ``lead_agent.prompt``. + """ + if not deferred_names: + return "" + names = "\n".join(sorted(deferred_names)) + return f"\n{names}\n" diff --git a/backend/tests/test_deferred_tool_crosscontext.py b/backend/tests/test_deferred_tool_crosscontext.py index 6a0479bf2..0933f1365 100644 --- a/backend/tests/test_deferred_tool_crosscontext.py +++ b/backend/tests/test_deferred_tool_crosscontext.py @@ -22,7 +22,7 @@ from langchain_core.tools import tool as as_tool from deerflow.agents.middlewares.deferred_tool_filter_middleware import DeferredToolFilterMiddleware from deerflow.skills.tool_policy import filter_tools_by_skill_allowed_tools from deerflow.skills.types import Skill -from deerflow.tools.builtins.tool_search import DeferredToolSetup, build_deferred_tool_setup +from deerflow.tools.builtins.tool_search import DeferredToolSetup, assemble_deferred_tools, build_deferred_tool_setup from deerflow.tools.mcp_metadata import tag_mcp_tool @@ -93,17 +93,15 @@ def test_policy_excluded_mcp_tool_not_in_catalog(): def test_fail_closed_when_mcp_survives_without_setup(monkeypatch): """Finding 2: simulate a wiring regression and assert it fails loudly. - ``_assemble_deferred`` lazy-imports ``build_deferred_tool_setup`` from the - source module, so patch it there (not on the agent module). + ``assemble_deferred_tools`` references ``build_deferred_tool_setup`` as a + module global, so patch it in ``tool_search`` (its home module). """ - from deerflow.agents.lead_agent import agent as agentmod - monkeypatch.setattr( "deerflow.tools.builtins.tool_search.build_deferred_tool_setup", lambda tools, *, enabled: DeferredToolSetup(None, frozenset(), None), ) with pytest.raises(RuntimeError, match="fail-closed"): - agentmod._assemble_deferred([tag_mcp_tool(mcp_secret)], enabled=True) + assemble_deferred_tools([tag_mcp_tool(mcp_secret)], enabled=True) def test_subagent_reentry_does_not_touch_lead_state(): @@ -146,12 +144,10 @@ def _make_skill(allowed_tools): def test_policy_denied_mcp_yields_no_tool_search_end_to_end(): """An allowlist that denies the MCP tool gates it end-to-end: after the real - policy filter no MCP tool survives, so ``_assemble_deferred`` adds no + policy filter no MCP tool survives, so ``assemble_deferred_tools`` adds no tool_search (and does not fail-closed, because no MCP tool leaked through).""" - from deerflow.agents.lead_agent import agent as agentmod - filtered = filter_tools_by_skill_allowed_tools([active_tool, tag_mcp_tool(mcp_secret)], [_make_skill(["active_tool"])]) - final_tools, setup = agentmod._assemble_deferred(filtered, enabled=True) + final_tools, setup = assemble_deferred_tools(filtered, enabled=True) assert [t.name for t in final_tools] == ["active_tool"] assert "tool_search" not in {t.name for t in final_tools} @@ -167,11 +163,9 @@ def test_tool_search_appended_after_policy_but_never_exposes_denied_tool(): is derived from the already policy-filtered list — so it can never expose a tool the allowlist denied. Locks that contract so the ordering cannot regress. """ - from deerflow.agents.lead_agent import agent as agentmod - allowed = ["active_tool", "mcp_secret"] # permits the MCP tool, does NOT list tool_search filtered = filter_tools_by_skill_allowed_tools([active_tool, tag_mcp_tool(mcp_secret)], [_make_skill(allowed)]) - final_tools, setup = agentmod._assemble_deferred(filtered, enabled=True) + final_tools, setup = assemble_deferred_tools(filtered, enabled=True) names = {t.name for t in final_tools} assert "tool_search" in names # appended despite not being in the allowlist diff --git a/backend/tests/test_subagent_deferred_promotion_integration.py b/backend/tests/test_subagent_deferred_promotion_integration.py new file mode 100644 index 000000000..d4a56f91d --- /dev/null +++ b/backend/tests/test_subagent_deferred_promotion_integration.py @@ -0,0 +1,174 @@ +"""End-to-end: the subagent deferral recipe hides then promotes an MCP tool (#3341). + +#3272 wired deferred MCP loading into the lead agent only. #3341 extends it to +subagents. This locks the *subagent build recipe* - the shared helpers the +executor now calls (``assemble_deferred_tools`` + ``get_deferred_tools_prompt_section``) +plus the ``DeferredToolFilterMiddleware`` that ``build_subagent_runtime_middlewares`` +attaches - composing into the same hide/promote loop the lead has, under the +subagent's build shape (``system_prompt=None`` + a single ``SystemMessage``). + +The hide/promote mechanics themselves are also covered for the lead path by +tests/test_deferred_promotion_integration.py; this asserts the subagent recipe +produces an equivalent loop without binding MCP schemas before promotion. + +A second test (``test_subagent_builder_emits_working_deferred_filter``) closes the +remaining seam: it sources the filter from the *real* ``build_subagent_runtime_middlewares`` +(the exact call ``executor._create_agent`` makes) rather than hand-constructing it, so a +regression in how the builder wires the setup into the filter - wrong catalog hash, +dropped filter, wrong deferred set - is caught at runtime. (Running the full real stack +is intentionally avoided: the other runtime middlewares need sandbox/thread infra to +execute, which would make the test flaky; their attachment + ordering is locked in +tests/test_tool_error_handling_middleware.py instead.) +""" + +import asyncio + +from langchain.agents import create_agent +from langchain_core.language_models.fake_chat_models import GenericFakeChatModel +from langchain_core.messages import AIMessage, HumanMessage, SystemMessage +from langchain_core.tools import tool as as_tool + +from deerflow.agents.middlewares.deferred_tool_filter_middleware import DeferredToolFilterMiddleware +from deerflow.agents.thread_state import ThreadState +from deerflow.tools.builtins.tool_search import assemble_deferred_tools, get_deferred_tools_prompt_section +from deerflow.tools.mcp_metadata import tag_mcp_tool + + +@as_tool +def active_tool(x: str) -> str: + "An always-active tool." + return x + + +@as_tool +def mcp_calc(expression: str) -> str: + "Evaluate arithmetic." + return expression + + +@as_tool +def mcp_other(x: str) -> str: + "Another deferred MCP tool." + return x + + +def test_subagent_deferral_recipe_hides_then_promotes(): + bound: list[list[str]] = [] + + class RecordingModel(GenericFakeChatModel): + def bind_tools(self, tools, **kwargs): + bound.append([getattr(t, "name", None) for t in tools]) + return self + + # The subagent build path (executor._build_initial_state): policy-filtered + # tools -> assemble_deferred_tools appends tool_search, fail-closed. + filtered = [active_tool, tag_mcp_tool(mcp_calc), tag_mcp_tool(mcp_other)] + final_tools, setup = assemble_deferred_tools(filtered, enabled=True) + assert "tool_search" in [t.name for t in final_tools] + assert setup.deferred_names == frozenset({"mcp_calc", "mcp_other"}) + + # The subagent injects the section into its single SystemMessage. + section = get_deferred_tools_prompt_section(deferred_names=setup.deferred_names) + assert "" in section + assert "mcp_calc" in section and "mcp_other" in section + + turn1 = AIMessage(content="", tool_calls=[{"name": "tool_search", "args": {"query": "select:mcp_calc"}, "id": "c1", "type": "tool_call"}]) + turn2 = AIMessage(content="done") + model = RecordingModel(messages=iter([turn1, turn2])) + + # The middleware DeferredToolFilterMiddleware is exactly what + # build_subagent_runtime_middlewares attaches for this setup (locked by + # tests/test_tool_error_handling_middleware.py); the subagent build passes + # system_prompt=None with state_schema=ThreadState. + graph = create_agent( + model=model, + tools=final_tools, + middleware=[DeferredToolFilterMiddleware(setup.deferred_names, setup.catalog_hash)], + system_prompt=None, + state_schema=ThreadState, + ) + + result = asyncio.run(graph.ainvoke({"messages": [SystemMessage(content=section), HumanMessage(content="use the deferred calculator")]})) + + assert len(bound) >= 2, f"expected >=2 model binds, got {bound}" + # Turn 1: both deferred MCP tools hidden from the subagent's model binding. + assert "mcp_calc" not in bound[0] and "mcp_other" not in bound[0] + # Turn 2: the searched tool is promoted; the un-searched one stays hidden. + assert "mcp_calc" in bound[1] + assert "mcp_other" not in bound[1] + # Promotion recorded in graph state, scoped by catalog hash. + assert result["promoted"] == {"catalog_hash": setup.catalog_hash, "names": ["mcp_calc"]} + + +def test_subagent_builder_emits_working_deferred_filter(): + """The real build path the executor calls - ``build_subagent_runtime_middlewares`` - + must emit a ``DeferredToolFilterMiddleware`` that actually hides/promotes through a + graph. The recipe test above hand-builds the filter; this sources it from the real + builder given a real setup, so a regression in the builder's wiring is caught: a + wrong catalog hash silently stops promotion (turn 2 would keep mcp_calc hidden), a + dropped filter stops hiding (turn 1 would bind mcp_calc).""" + from deerflow.agents.middlewares.tool_error_handling_middleware import build_subagent_runtime_middlewares + from deerflow.config.app_config import AppConfig, CircuitBreakerConfig + from deerflow.config.guardrails_config import GuardrailsConfig + from deerflow.config.model_config import ModelConfig + from deerflow.config.sandbox_config import SandboxConfig + + bound: list[list[str]] = [] + + class RecordingModel(GenericFakeChatModel): + def bind_tools(self, tools, **kwargs): + bound.append([getattr(t, "name", None) for t in tools]) + return self + + filtered = [active_tool, tag_mcp_tool(mcp_calc), tag_mcp_tool(mcp_other)] + final_tools, setup = assemble_deferred_tools(filtered, enabled=True) + section = get_deferred_tools_prompt_section(deferred_names=setup.deferred_names) + + app_config = AppConfig( + models=[ + ModelConfig( + name="test-model", + display_name="test-model", + description=None, + use="langchain_openai:ChatOpenAI", + model="test-model", + supports_vision=False, + ) + ], + sandbox=SandboxConfig(use="test"), + guardrails=GuardrailsConfig(enabled=False), + circuit_breaker=CircuitBreakerConfig(failure_threshold=7, recovery_timeout_sec=11), + ) + + # The exact call executor._create_agent makes. Pull the filter the builder + # produced (not a hand-rolled one) so its wiring - deferred set + catalog hash - + # is what's under test. + middlewares = build_subagent_runtime_middlewares(app_config=app_config, model_name="test-model", deferred_setup=setup) + deferred_filters = [m for m in middlewares if isinstance(m, DeferredToolFilterMiddleware)] + assert len(deferred_filters) == 1, f"builder must emit exactly one deferred filter, got {[type(m).__name__ for m in middlewares]}" + + turn1 = AIMessage(content="", tool_calls=[{"name": "tool_search", "args": {"query": "select:mcp_calc"}, "id": "c1", "type": "tool_call"}]) + turn2 = AIMessage(content="done") + model = RecordingModel(messages=iter([turn1, turn2])) + + # Run only the builder-produced filter (the component under test). The other + # runtime middlewares need sandbox/thread infra to *execute*, so running the + # full stack here would be flaky; their attachment + ordering before Safety is + # locked in tests/test_tool_error_handling_middleware.py. + graph = create_agent( + model=model, + tools=final_tools, + middleware=deferred_filters, + system_prompt=None, + state_schema=ThreadState, + ) + result = asyncio.run(graph.ainvoke({"messages": [SystemMessage(content=section), HumanMessage(content="use the deferred calculator")]})) + + assert len(bound) >= 2, f"expected >=2 model binds, got {bound}" + # Turn 1: both deferred MCP tools hidden - the builder-produced filter is active. + assert "mcp_calc" not in bound[0] and "mcp_other" not in bound[0] + # Turn 2: the searched tool is promoted - proves the builder wired the catalog + # hash correctly (a wrong hash would leave mcp_calc hidden here). + assert "mcp_calc" in bound[1] + assert "mcp_other" not in bound[1] + assert result["promoted"] == {"catalog_hash": setup.catalog_hash, "names": ["mcp_calc"]} diff --git a/backend/tests/test_subagent_executor.py b/backend/tests/test_subagent_executor.py index 8987958a8..77918bc98 100644 --- a/backend/tests/test_subagent_executor.py +++ b/backend/tests/test_subagent_executor.py @@ -14,6 +14,7 @@ the real implementation in isolation. """ import asyncio +import importlib import sys import threading from datetime import datetime @@ -39,6 +40,21 @@ _MOCKED_MODULE_NAMES = [ ] +def _default_app_config(): + return SimpleNamespace(tool_search=SimpleNamespace(enabled=False)) + + +def _patch_default_get_app_config(executor_module): + executor_module.get_app_config = _default_app_config + return executor_module + + +def _clear_stale_executor_package_attr() -> None: + subagents_pkg = sys.modules.get("deerflow.subagents") + if subagents_pkg is not None and hasattr(subagents_pkg, "executor"): + delattr(subagents_pkg, "executor") + + @pytest.fixture(autouse=True) def _setup_executor_classes(): """Set up mocked modules and import real executor classes. @@ -53,6 +69,7 @@ def _setup_executor_classes(): # Remove mocked executor if exists (from conftest.py) if "deerflow.subagents.executor" in sys.modules: del sys.modules["deerflow.subagents.executor"] + _clear_stale_executor_package_attr() # Set up mocks for name in _MOCKED_MODULE_NAMES: @@ -71,6 +88,14 @@ def _setup_executor_classes(): SubagentStatus, ) + executor_module = sys.modules["deerflow.subagents.executor"] + + # Most tests in this module patch _create_agent and exercise executor + # control flow only. Keep those tests hermetic: CI checkouts do not include + # the gitignored config.yaml, and deferral-specific tests override this + # default explicitly. + _patch_default_get_app_config(executor_module) + # Store classes in a dict to yield classes = { "AIMessage": AIMessage, @@ -287,6 +312,7 @@ class TestAgentConstruction: "app_config": app_config, "model_name": "parent-model", "lazy_init": True, + "deferred_setup": None, } assert captured["agent"]["model"] is model assert captured["agent"]["middleware"] is middlewares @@ -359,7 +385,7 @@ class TestAgentConstruction: thread_id="test-thread", ) - state, _filtered_tools = await executor._build_initial_state("Do the task") + state, _final_tools, _deferred_setup = await executor._build_initial_state("Do the task") messages = state["messages"] # Should have exactly 2 messages: one combined SystemMessage + one HumanMessage @@ -397,7 +423,7 @@ class TestAgentConstruction: thread_id="test-thread", ) - state, _filtered_tools = await executor._build_initial_state("Do the task") + state, _final_tools, _deferred_setup = await executor._build_initial_state("Do the task") messages = state["messages"] from langchain_core.messages import HumanMessage, SystemMessage @@ -439,7 +465,7 @@ class TestAgentConstruction: SubagentExecutor = classes["SubagentExecutor"] executor = SubagentExecutor(config=config, tools=[], thread_id="test-thread") - state, _filtered_tools = await executor._build_initial_state("Do the task") + state, _final_tools, _deferred_setup = await executor._build_initial_state("Do the task") messages = state["messages"] from langchain_core.messages import HumanMessage, SystemMessage @@ -449,6 +475,192 @@ class TestAgentConstruction: assert "Skill content" in messages[0].content assert isinstance(messages[1], HumanMessage) + @pytest.mark.anyio + async def test_build_initial_state_defers_mcp_tools_when_tool_search_enabled( + self, + classes, + base_config, + monkeypatch: pytest.MonkeyPatch, + ): + """tool_search enabled + a surviving MCP tool: _build_initial_state appends + the tool_search tool, withholds the MCP schema, and injects the + section into the SystemMessage.""" + from langchain_core.tools import tool as as_tool + + from deerflow.subagents import executor as executor_module + from deerflow.tools.mcp_metadata import tag_mcp_tool + + SubagentExecutor = classes["SubagentExecutor"] + + monkeypatch.setattr( + sys.modules["deerflow.skills.storage"], + "get_or_new_skill_storage", + lambda *, app_config=None: SimpleNamespace(load_skills=lambda *, enabled_only: []), + ) + monkeypatch.setattr(executor_module, "get_app_config", lambda: SimpleNamespace(tool_search=SimpleNamespace(enabled=True))) + + @as_tool + def mcp_calc(expression: str) -> str: + "Evaluate arithmetic." + return expression + + executor = SubagentExecutor(config=base_config, tools=[tag_mcp_tool(mcp_calc)], thread_id="test-thread") + + state, final_tools, deferred_setup = await executor._build_initial_state("Do the task") + + assert "tool_search" in [t.name for t in final_tools] + assert deferred_setup.deferred_names == frozenset({"mcp_calc"}) + + system_message = state["messages"][0] + assert "" in system_message.content + assert "mcp_calc" in system_message.content + # The base system_prompt is still present alongside the injected section. + assert base_config.system_prompt in system_message.content + + @pytest.mark.anyio + async def test_build_initial_state_no_deferral_when_tool_search_disabled( + self, + classes, + base_config, + monkeypatch: pytest.MonkeyPatch, + ): + """tool_search disabled: no tool_search tool, no section - pure no-op even + with an MCP-tagged tool present.""" + from langchain_core.tools import tool as as_tool + + from deerflow.subagents import executor as executor_module + from deerflow.tools.mcp_metadata import tag_mcp_tool + + SubagentExecutor = classes["SubagentExecutor"] + + monkeypatch.setattr( + sys.modules["deerflow.skills.storage"], + "get_or_new_skill_storage", + lambda *, app_config=None: SimpleNamespace(load_skills=lambda *, enabled_only: []), + ) + monkeypatch.setattr(executor_module, "get_app_config", lambda: SimpleNamespace(tool_search=SimpleNamespace(enabled=False))) + + @as_tool + def mcp_calc(expression: str) -> str: + "Evaluate arithmetic." + return expression + + executor = SubagentExecutor(config=base_config, tools=[tag_mcp_tool(mcp_calc)], thread_id="test-thread") + + state, final_tools, deferred_setup = await executor._build_initial_state("Do the task") + + assert "tool_search" not in [t.name for t in final_tools] + assert deferred_setup.deferred_names == frozenset() + assert "" not in state["messages"][0].content + + @pytest.mark.anyio + async def test_build_initial_state_deferral_respects_tool_policy_and_tool_search_is_infra( + self, + classes, + monkeypatch: pytest.MonkeyPatch, + ): + """Adversarial-review follow-up (#3341): tool_search is appended AFTER the + subagent tool-policy filter, mirroring the lead's intentional decision + (test_tool_search_appended_after_policy_but_never_exposes_denied_tool). + Lock the safe-by-construction property: + + - an MCP tool denied by ``disallowed_tools`` never enters the deferred + catalog, so tool_search can never promote/expose it; + - tool_search itself is infrastructure: naming it in ``disallowed_tools`` + does not remove it, because its catalog derives from the already- + filtered list and carries no access the policy didn't already grant. + """ + from langchain_core.tools import tool as as_tool + + from deerflow.subagents import executor as executor_module + from deerflow.tools.mcp_metadata import tag_mcp_tool + + SubagentConfig = classes["SubagentConfig"] + SubagentExecutor = classes["SubagentExecutor"] + + monkeypatch.setattr( + sys.modules["deerflow.skills.storage"], + "get_or_new_skill_storage", + lambda *, app_config=None: SimpleNamespace(load_skills=lambda *, enabled_only: []), + ) + monkeypatch.setattr(executor_module, "get_app_config", lambda: SimpleNamespace(tool_search=SimpleNamespace(enabled=True))) + + @as_tool + def active_tool(x: str) -> str: + "active" + return x + + @as_tool + def mcp_allowed(x: str) -> str: + "allowed mcp tool" + return x + + @as_tool + def mcp_denied(x: str) -> str: + "denied mcp tool" + return x + + config = SubagentConfig( + name="test-agent", + description="Test agent", + system_prompt="You are a test agent.", + max_turns=10, + timeout_seconds=60, + disallowed_tools=["mcp_denied", "tool_search"], + ) + executor = SubagentExecutor( + config=config, + tools=[active_tool, tag_mcp_tool(mcp_allowed), tag_mcp_tool(mcp_denied)], + thread_id="test-thread", + ) + + _state, final_tools, deferred_setup = await executor._build_initial_state("Do the task") + + names = {t.name for t in final_tools} + # The policy-denied MCP tool is gone and never reaches the catalog. + assert "mcp_denied" not in names + assert "mcp_denied" not in deferred_setup.deferred_names + assert deferred_setup.deferred_names == frozenset({"mcp_allowed"}) + # tool_search is infra: present despite being named in disallowed_tools. + assert "tool_search" in names + + def test_create_agent_threads_deferred_setup_to_middlewares( + self, + classes, + base_config, + monkeypatch: pytest.MonkeyPatch, + ): + """A deferred setup passed to _create_agent flows into the subagent + middleware factory (so DeferredToolFilterMiddleware can attach).""" + from deerflow.subagents import executor as executor_module + from deerflow.tools.builtins.tool_search import DeferredToolSetup + + SubagentExecutor = classes["SubagentExecutor"] + app_config = SimpleNamespace(models=[SimpleNamespace(name="default-model")]) + captured: dict[str, object] = {} + + def fake_build_subagent_runtime_middlewares(**kwargs): + captured["middlewares"] = kwargs + return [object()] + + monkeypatch.setattr(executor_module, "create_chat_model", lambda **kwargs: object()) + monkeypatch.setattr(executor_module, "create_agent", lambda **kwargs: object()) + monkeypatch.setitem( + sys.modules, + "deerflow.agents.middlewares.tool_error_handling_middleware", + _module( + "deerflow.agents.middlewares.tool_error_handling_middleware", + build_subagent_runtime_middlewares=fake_build_subagent_runtime_middlewares, + ), + ) + + deferred_setup = DeferredToolSetup(object(), frozenset({"mcp_calc"}), "hash123") + executor = SubagentExecutor(config=base_config, tools=[], app_config=app_config, parent_model="parent-model") + + executor._create_agent(tools=[], deferred_setup=deferred_setup) + + assert captured["middlewares"]["deferred_setup"] is deferred_setup + # ----------------------------------------------------------------------------- # Async Execution Path Tests @@ -692,7 +904,7 @@ class TestAsyncExecutionPath: if system_messages: assert initial_messages[0] is system_messages[0], "SystemMessage must be the first message in the conversation" # The consolidated SystemMessage must carry both the system_prompt - # and all skill content — nothing should be split across two messages. + # and all skill content; nothing should be split across two messages. assert base_config.system_prompt in system_messages[0].content assert "Skill instruction text" in system_messages[0].content @@ -1128,11 +1340,9 @@ class TestThreadSafety: @pytest.fixture def executor_module(self, _setup_executor_classes): """Import the executor module with real classes.""" - import importlib + executor = importlib.import_module("deerflow.subagents.executor") - from deerflow.subagents import executor - - return importlib.reload(executor) + return _patch_default_get_app_config(importlib.reload(executor)) def test_multiple_executors_in_parallel(self, classes, base_config, msg): """Test multiple executors running in parallel via thread pool.""" @@ -1254,11 +1464,9 @@ class TestCleanupBackgroundTask: def executor_module(self, _setup_executor_classes): """Import the executor module with real classes.""" # Re-import to get the real module with cleanup_background_task - import importlib + executor = importlib.import_module("deerflow.subagents.executor") - from deerflow.subagents import executor - - return importlib.reload(executor) + return _patch_default_get_app_config(importlib.reload(executor)) def test_cleanup_removes_terminal_completed_task(self, executor_module, classes): """Test that cleanup removes a COMPLETED task.""" @@ -1399,11 +1607,9 @@ class TestCooperativeCancellation: @pytest.fixture def executor_module(self, _setup_executor_classes): """Import the executor module with real classes.""" - import importlib + executor = importlib.import_module("deerflow.subagents.executor") - from deerflow.subagents import executor - - return importlib.reload(executor) + return _patch_default_get_app_config(importlib.reload(executor)) @pytest.mark.anyio async def test_aexecute_cancelled_before_streaming(self, classes, base_config, mock_agent, msg): diff --git a/backend/tests/test_tool_error_handling_middleware.py b/backend/tests/test_tool_error_handling_middleware.py index c9b835527..b86ce5968 100644 --- a/backend/tests/test_tool_error_handling_middleware.py +++ b/backend/tests/test_tool_error_handling_middleware.py @@ -253,3 +253,45 @@ def test_subagent_runtime_middlewares_skip_view_image_for_text_model(monkeypatch middlewares = build_subagent_runtime_middlewares(app_config=app_config, model_name="test-model") assert not any(isinstance(middleware, ViewImageMiddleware) for middleware in middlewares) + + +def test_subagent_runtime_middlewares_attach_deferred_filter_when_setup_has_names(monkeypatch): + """A subagent built with deferred MCP tools gets DeferredToolFilterMiddleware, positioned before SafetyFinishReasonMiddleware (mirrors the lead ordering).""" + from langchain_core.tools import tool as as_tool + + from deerflow.agents.middlewares.deferred_tool_filter_middleware import DeferredToolFilterMiddleware + from deerflow.agents.middlewares.safety_finish_reason_middleware import SafetyFinishReasonMiddleware + from deerflow.tools.builtins.tool_search import build_deferred_tool_setup + from deerflow.tools.mcp_metadata import tag_mcp_tool + + app_config = _make_app_config() + _stub_runtime_middleware_imports(monkeypatch) + + @as_tool + def mcp_thing(x: str) -> str: + "deferred mcp tool" + return x + + setup = build_deferred_tool_setup([tag_mcp_tool(mcp_thing)], enabled=True) + assert setup.deferred_names # sanity: populated setup + + middlewares = build_subagent_runtime_middlewares(app_config=app_config, deferred_setup=setup) + + filters = [m for m in middlewares if isinstance(m, DeferredToolFilterMiddleware)] + assert len(filters) == 1 + filter_idx = next(i for i, m in enumerate(middlewares) if isinstance(m, DeferredToolFilterMiddleware)) + safety_idx = next(i for i, m in enumerate(middlewares) if isinstance(m, SafetyFinishReasonMiddleware)) + assert filter_idx < safety_idx + + +def test_subagent_runtime_middlewares_skip_deferred_filter_without_names(monkeypatch): + """No deferred setup (disabled / no MCP tool) -> no DeferredToolFilterMiddleware.""" + from deerflow.agents.middlewares.deferred_tool_filter_middleware import DeferredToolFilterMiddleware + from deerflow.tools.builtins.tool_search import DeferredToolSetup + + app_config = _make_app_config() + _stub_runtime_middleware_imports(monkeypatch) + + for setup in (None, DeferredToolSetup(None, frozenset(), None)): + middlewares = build_subagent_runtime_middlewares(app_config=app_config, deferred_setup=setup) + assert not any(isinstance(m, DeferredToolFilterMiddleware) for m in middlewares) diff --git a/backend/tests/test_tool_search.py b/backend/tests/test_tool_search.py index a0e8b2e20..3722cf3f9 100644 --- a/backend/tests/test_tool_search.py +++ b/backend/tests/test_tool_search.py @@ -8,8 +8,8 @@ filter middleware are covered by: - tests/test_thread_state_promoted.py """ -from deerflow.agents.lead_agent.prompt import get_deferred_tools_prompt_section from deerflow.config.tool_search_config import ToolSearchConfig, load_tool_search_config_from_dict +from deerflow.tools.builtins.tool_search import get_deferred_tools_prompt_section class TestToolSearchConfig: