diff --git a/backend/packages/harness/deerflow/subagents/config.py b/backend/packages/harness/deerflow/subagents/config.py index b0b094e28..9081e2df9 100644 --- a/backend/packages/harness/deerflow/subagents/config.py +++ b/backend/packages/harness/deerflow/subagents/config.py @@ -26,7 +26,7 @@ class SubagentConfig: name: str description: str - system_prompt: str + system_prompt: str | None = None tools: list[str] | None = None disallowed_tools: list[str] | None = field(default_factory=lambda: ["task"]) skills: list[str] | None = None diff --git a/backend/packages/harness/deerflow/subagents/executor.py b/backend/packages/harness/deerflow/subagents/executor.py index a2fec6432..d6d2e4fc5 100644 --- a/backend/packages/harness/deerflow/subagents/executor.py +++ b/backend/packages/harness/deerflow/subagents/executor.py @@ -286,11 +286,13 @@ class SubagentExecutor: # Reuse shared middleware composition with lead agent. middlewares = build_subagent_runtime_middlewares(app_config=app_config, model_name=self.model_name, lazy_init=True) + # system_prompt is included in initial state messages (see _build_initial_state) + # to avoid multiple SystemMessages which some LLM APIs don't support. return create_agent( model=model, tools=tools if tools is not None else self.tools, middleware=middlewares, - system_prompt=self.config.system_prompt, + system_prompt=None, state_schema=ThreadState, ) @@ -365,14 +367,25 @@ class SubagentExecutor: Returns: Initial state dictionary and tools filtered by loaded skill metadata. """ + # Load skills as conversation items (Codex pattern) skills = await self._load_skills() filtered_tools = self._apply_skill_allowed_tools(skills) skill_messages = await self._load_skill_messages(skills) + # Combine system_prompt and skills into a single SystemMessage. + # Some LLM APIs reject multiple SystemMessages with + # "System message must be at the beginning." + system_parts: list[str] = [] + if self.config.system_prompt: + system_parts.append(self.config.system_prompt) + for skill_msg in skill_messages: + system_parts.append(skill_msg.content) + messages: list[Any] = [] - # Skill content injected as developer/system messages before the task - messages.extend(skill_messages) + if system_parts: + messages.append(SystemMessage(content="\n\n".join(system_parts))) + # Then the actual task messages.append(HumanMessage(content=task)) diff --git a/backend/tests/test_subagent_executor.py b/backend/tests/test_subagent_executor.py index b8da323f4..87c82ff96 100644 --- a/backend/tests/test_subagent_executor.py +++ b/backend/tests/test_subagent_executor.py @@ -291,7 +291,7 @@ class TestAgentConstruction: assert captured["agent"]["model"] is model assert captured["agent"]["middleware"] is middlewares assert captured["agent"]["tools"] == [] - assert captured["agent"]["system_prompt"] == base_config.system_prompt + assert captured["agent"]["system_prompt"] is None # system_prompt is merged into initial state messages @pytest.mark.anyio async def test_load_skill_messages_uses_explicit_app_config_for_skill_storage( @@ -331,6 +331,124 @@ class TestAgentConstruction: assert len(messages) == 1 assert "Use demo skill" in messages[0].content + @pytest.mark.anyio + async def test_build_initial_state_consolidates_system_prompt_and_skills( + self, + classes, + base_config, + monkeypatch: pytest.MonkeyPatch, + tmp_path, + ): + """_build_initial_state merges system_prompt and skills into one SystemMessage.""" + SubagentExecutor = classes["SubagentExecutor"] + + skill_dir = tmp_path / "my-skill" + skill_dir.mkdir() + skill_file = skill_dir / "SKILL.md" + skill_file.write_text("Skill instructions here", encoding="utf-8") + + monkeypatch.setattr( + sys.modules["deerflow.skills.storage"], + "get_or_new_skill_storage", + lambda *, app_config=None: SimpleNamespace(load_skills=lambda *, enabled_only: [SimpleNamespace(name="my-skill", skill_file=skill_file, allowed_tools=None)]), + ) + + executor = SubagentExecutor( + config=base_config, + tools=[], + thread_id="test-thread", + ) + + state, _filtered_tools = await executor._build_initial_state("Do the task") + + messages = state["messages"] + # Should have exactly 2 messages: one combined SystemMessage + one HumanMessage + assert len(messages) == 2 + + from langchain_core.messages import HumanMessage, SystemMessage + + assert isinstance(messages[0], SystemMessage) + assert isinstance(messages[1], HumanMessage) + # SystemMessage should contain both the system_prompt and skill content + assert base_config.system_prompt in messages[0].content + assert "Skill instructions here" in messages[0].content + # HumanMessage should be the task + assert messages[1].content == "Do the task" + + @pytest.mark.anyio + async def test_build_initial_state_no_skills_only_system_prompt( + self, + classes, + base_config, + monkeypatch: pytest.MonkeyPatch, + ): + """_build_initial_state works when there are no skills.""" + SubagentExecutor = classes["SubagentExecutor"] + + monkeypatch.setattr( + sys.modules["deerflow.skills.storage"], + "get_or_new_skill_storage", + lambda *, app_config=None: SimpleNamespace(load_skills=lambda *, enabled_only: []), + ) + + executor = SubagentExecutor( + config=base_config, + tools=[], + thread_id="test-thread", + ) + + state, _filtered_tools = await executor._build_initial_state("Do the task") + + messages = state["messages"] + from langchain_core.messages import HumanMessage, SystemMessage + + assert len(messages) == 2 + assert isinstance(messages[0], SystemMessage) + assert base_config.system_prompt in messages[0].content + assert isinstance(messages[1], HumanMessage) + + @pytest.mark.anyio + async def test_build_initial_state_no_system_prompt_with_skills( + self, + classes, + monkeypatch: pytest.MonkeyPatch, + tmp_path, + ): + """_build_initial_state works when there is no system_prompt but there are skills.""" + SubagentConfig = classes["SubagentConfig"] + + config = SubagentConfig( + name="test-agent", + description="Test agent", + system_prompt=None, + max_turns=10, + timeout_seconds=60, + ) + + skill_dir = tmp_path / "my-skill" + skill_dir.mkdir() + skill_file = skill_dir / "SKILL.md" + skill_file.write_text("Skill content", encoding="utf-8") + + monkeypatch.setattr( + sys.modules["deerflow.skills.storage"], + "get_or_new_skill_storage", + lambda *, app_config=None: SimpleNamespace(load_skills=lambda *, enabled_only: [SimpleNamespace(name="my-skill", skill_file=skill_file, allowed_tools=None)]), + ) + + SubagentExecutor = classes["SubagentExecutor"] + executor = SubagentExecutor(config=config, tools=[], thread_id="test-thread") + + state, _filtered_tools = await executor._build_initial_state("Do the task") + + messages = state["messages"] + from langchain_core.messages import HumanMessage, SystemMessage + + assert len(messages) == 2 + assert isinstance(messages[0], SystemMessage) + assert "Skill content" in messages[0].content + assert isinstance(messages[1], HumanMessage) + # ----------------------------------------------------------------------------- # Async Execution Path Tests @@ -514,6 +632,70 @@ class TestAsyncExecutionPath: assert result.status == SubagentStatus.COMPLETED assert "Task" in result.result + @pytest.mark.anyio + async def test_aexecute_passes_at_most_one_system_message_to_agent( + self, + classes, + base_config, + monkeypatch: pytest.MonkeyPatch, + tmp_path, + ): + """Regression: messages sent to agent.astream must contain at most one + SystemMessage and it must be the first message. + + This catches any regression where system_prompt would be re-injected + via create_agent() (e.g. system_prompt not passed as None) and appear + as a second SystemMessage, which providers like vLLM and Xinference + reject with "System message must be at the beginning." + """ + from langchain_core.messages import AIMessage, SystemMessage + + SubagentExecutor = classes["SubagentExecutor"] + SubagentStatus = classes["SubagentStatus"] + + # Set up a skill so both system_prompt AND skill content are present, + # maximising the chance of catching a double-SystemMessage regression. + skill_dir = tmp_path / "regression-skill" + skill_dir.mkdir() + (skill_dir / "SKILL.md").write_text("Skill instruction text", encoding="utf-8") + + monkeypatch.setattr( + sys.modules["deerflow.skills.storage"], + "get_or_new_skill_storage", + lambda *, app_config=None: SimpleNamespace(load_skills=lambda *, enabled_only: [SimpleNamespace(name="regression-skill", skill_file=skill_dir / "SKILL.md", allowed_tools=None)]), + ) + + captured_states: list[dict] = [] + + async def capturing_astream(state, **kwargs): + captured_states.append(state) + yield {"messages": [AIMessage(content="Done", id="msg-1")]} + + mock_agent = MagicMock() + mock_agent.astream = capturing_astream + + executor = SubagentExecutor( + config=base_config, + tools=[], + thread_id="test-thread", + ) + + with patch.object(executor, "_create_agent", return_value=mock_agent): + result = await executor._aexecute("Do something") + + assert result.status == SubagentStatus.COMPLETED + assert len(captured_states) == 1, "astream should be called exactly once" + initial_messages = captured_states[0]["messages"] + + system_messages = [m for m in initial_messages if isinstance(m, SystemMessage)] + assert len(system_messages) <= 1, f"Expected at most 1 SystemMessage but got {len(system_messages)}: {system_messages}" + if system_messages: + assert initial_messages[0] is system_messages[0], "SystemMessage must be the first message in the conversation" + # The consolidated SystemMessage must carry both the system_prompt + # and all skill content — nothing should be split across two messages. + assert base_config.system_prompt in system_messages[0].content + assert "Skill instruction text" in system_messages[0].content + class TestSkillAllowedTools: @pytest.mark.anyio