fix(subagent): inherit parent agent's tool_groups in task_tool (#2305)

* fix(subagent): inherit parent agent's tool_groups in task_tool

When a custom agent defines tool_groups (e.g. [file:read, file:write, bash]),
the restriction is correctly applied to the lead agent. However, when the lead
agent delegates work to a subagent via the task tool, get_available_tools() is
called without the groups parameter, causing the subagent to receive ALL tools
(including web_search, web_fetch, image_search, etc.) regardless of the parent
agent's configuration.

This fix propagates tool_groups through run metadata so that task_tool passes
the same group filter when building the subagent's tool set.

Changes:
- agent.py: include tool_groups in run metadata
- task_tool.py: read tool_groups from metadata and pass to get_available_tools()

* fix: initialize metadata before conditional block and update tests for tool_groups propagation

- Initialize metadata = {} before the 'if runtime is not None' block to
  avoid Ruff F821 (possibly-undefined variable) and simplify the
  parent_tool_groups expression.
- Update existing test assertion to expect groups=None in
  get_available_tools call signature.
- Add 3 new test cases:
  - test_task_tool_propagates_tool_groups_to_subagent
  - test_task_tool_no_tool_groups_passes_none
  - test_task_tool_runtime_none_passes_groups_none
This commit is contained in:
Shawn Jasper 2026-04-18 22:17:37 +08:00 committed by GitHub
parent 24fe5fbd8c
commit 55474011c9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 134 additions and 3 deletions

View File

@ -332,6 +332,7 @@ def make_lead_agent(config: RunnableConfig):
"reasoning_effort": reasoning_effort,
"is_plan_mode": is_plan_mode,
"subagent_enabled": subagent_enabled,
"tool_groups": agent_config.tool_groups if agent_config else None,
}
)

View File

@ -88,6 +88,7 @@ async def task_tool(
thread_id = None
parent_model = None
trace_id = None
metadata: dict = {}
if runtime is not None:
sandbox_state = runtime.state.get("sandbox")
@ -107,8 +108,11 @@ async def task_tool(
# Lazy import to avoid circular dependency
from deerflow.tools import get_available_tools
# Inherit parent agent's tool_groups so subagents respect the same restrictions
parent_tool_groups = metadata.get("tool_groups")
# Subagents should not have subagent tools enabled (prevent recursive nesting)
tools = get_available_tools(model_name=parent_model, subagent_enabled=False)
tools = get_available_tools(model_name=parent_model, groups=parent_tool_groups, subagent_enabled=False)
# Create executor
executor = SubagentExecutor(

View File

@ -167,14 +167,140 @@ def test_task_tool_emits_running_and_completed_events(monkeypatch):
assert captured["executor_kwargs"]["config"].max_turns == 7
assert "Skills Appendix" in captured["executor_kwargs"]["config"].system_prompt
get_available_tools.assert_called_once_with(model_name="ark-model", subagent_enabled=False)
get_available_tools.assert_called_once_with(model_name="ark-model", groups=None, subagent_enabled=False)
event_types = [e["type"] for e in events]
assert event_types == ["task_started", "task_running", "task_running", "task_completed"]
assert events[-1]["result"] == "all done"
def test_task_tool_returns_failed_message(monkeypatch):
def test_task_tool_propagates_tool_groups_to_subagent(monkeypatch):
"""Verify tool_groups from parent metadata are passed to get_available_tools(groups=...)."""
config = _make_subagent_config()
parent_tool_groups = ["file:read", "file:write", "bash"]
runtime = SimpleNamespace(
state={
"sandbox": {"sandbox_id": "local"},
"thread_data": {"workspace_path": "/tmp/workspace"},
},
context={"thread_id": "thread-1"},
config={"metadata": {"model_name": "ark-model", "trace_id": "trace-1", "tool_groups": parent_tool_groups}},
)
events = []
get_available_tools = MagicMock(return_value=["tool-a"])
class DummyExecutor:
def __init__(self, **kwargs):
pass
def execute_async(self, prompt, task_id=None):
return task_id or "generated-task-id"
monkeypatch.setattr(task_tool_module, "SubagentStatus", FakeSubagentStatus)
monkeypatch.setattr(task_tool_module, "SubagentExecutor", DummyExecutor)
monkeypatch.setattr(task_tool_module, "get_subagent_config", lambda _: config)
monkeypatch.setattr(task_tool_module, "get_skills_prompt_section", lambda: "")
monkeypatch.setattr(
task_tool_module,
"get_background_task_result",
lambda _: _make_result(FakeSubagentStatus.COMPLETED, result="done"),
)
monkeypatch.setattr(task_tool_module, "get_stream_writer", lambda: events.append)
monkeypatch.setattr(task_tool_module.asyncio, "sleep", _no_sleep)
monkeypatch.setattr("deerflow.tools.get_available_tools", get_available_tools)
output = _run_task_tool(
runtime=runtime,
description="执行任务",
prompt="file work only",
subagent_type="general-purpose",
tool_call_id="tc-groups",
)
assert output == "Task Succeeded. Result: done"
# The key assertion: groups should be propagated from parent metadata
get_available_tools.assert_called_once_with(model_name="ark-model", groups=parent_tool_groups, subagent_enabled=False)
def test_task_tool_no_tool_groups_passes_none(monkeypatch):
"""Verify that when metadata has no tool_groups, groups=None is passed (backward compat)."""
config = _make_subagent_config()
# Default _make_runtime() has no tool_groups in metadata
runtime = _make_runtime()
events = []
get_available_tools = MagicMock(return_value=[])
class DummyExecutor:
def __init__(self, **kwargs):
pass
def execute_async(self, prompt, task_id=None):
return task_id or "generated-task-id"
monkeypatch.setattr(task_tool_module, "SubagentStatus", FakeSubagentStatus)
monkeypatch.setattr(task_tool_module, "SubagentExecutor", DummyExecutor)
monkeypatch.setattr(task_tool_module, "get_subagent_config", lambda _: config)
monkeypatch.setattr(task_tool_module, "get_skills_prompt_section", lambda: "")
monkeypatch.setattr(
task_tool_module,
"get_background_task_result",
lambda _: _make_result(FakeSubagentStatus.COMPLETED, result="ok"),
)
monkeypatch.setattr(task_tool_module, "get_stream_writer", lambda: events.append)
monkeypatch.setattr(task_tool_module.asyncio, "sleep", _no_sleep)
monkeypatch.setattr("deerflow.tools.get_available_tools", get_available_tools)
output = _run_task_tool(
runtime=runtime,
description="执行任务",
prompt="normal work",
subagent_type="general-purpose",
tool_call_id="tc-no-groups",
)
assert output == "Task Succeeded. Result: ok"
# No tool_groups in metadata → groups=None (default behavior preserved)
get_available_tools.assert_called_once_with(model_name="ark-model", groups=None, subagent_enabled=False)
def test_task_tool_runtime_none_passes_groups_none(monkeypatch):
"""Verify that when runtime is None, groups=None is passed (e.g., unknown subagent path exits early, but tools still load correctly)."""
config = _make_subagent_config()
events = []
get_available_tools = MagicMock(return_value=[])
class DummyExecutor:
def __init__(self, **kwargs):
pass
def execute_async(self, prompt, task_id=None):
return task_id or "generated-task-id"
monkeypatch.setattr(task_tool_module, "SubagentStatus", FakeSubagentStatus)
monkeypatch.setattr(task_tool_module, "SubagentExecutor", DummyExecutor)
monkeypatch.setattr(task_tool_module, "get_subagent_config", lambda _: config)
monkeypatch.setattr(task_tool_module, "get_skills_prompt_section", lambda: "")
monkeypatch.setattr(
task_tool_module,
"get_background_task_result",
lambda _: _make_result(FakeSubagentStatus.COMPLETED, result="ok"),
)
monkeypatch.setattr(task_tool_module, "get_stream_writer", lambda: events.append)
monkeypatch.setattr(task_tool_module.asyncio, "sleep", _no_sleep)
monkeypatch.setattr("deerflow.tools.get_available_tools", get_available_tools)
output = _run_task_tool(
runtime=None,
description="执行任务",
prompt="no runtime",
subagent_type="general-purpose",
tool_call_id="tc-no-runtime",
)
assert output == "Task Succeeded. Result: ok"
# runtime is None → metadata is empty dict → groups=None
get_available_tools.assert_called_once_with(model_name=None, groups=None, subagent_enabled=False)
config = _make_subagent_config()
events = []