diff --git a/backend/packages/harness/deerflow/runtime/journal.py b/backend/packages/harness/deerflow/runtime/journal.py index fa931793f..465bbbff0 100644 --- a/backend/packages/harness/deerflow/runtime/journal.py +++ b/backend/packages/harness/deerflow/runtime/journal.py @@ -106,6 +106,7 @@ class RunJournal(BaseCallbackHandler): ) def on_llm_end(self, response: Any, *, run_id: UUID, **kwargs: Any) -> None: + from deerflow.runtime.converters import langchain_to_openai_message from deerflow.runtime.serialization import serialize_lc_object try: @@ -138,19 +139,29 @@ class RunJournal(BaseCallbackHandler): }, ) - # Message event: ai_message (only lead_agent final replies — no pending tool_calls) + # Message events: only lead_agent gets message-category events tool_calls = getattr(message, "tool_calls", None) or [] - if caller == "lead_agent" and isinstance(content, str) and content and not tool_calls: + if caller == "lead_agent": resp_meta = getattr(message, "response_metadata", None) or {} model_name = resp_meta.get("model_name") if isinstance(resp_meta, dict) else None - self._put( - event_type="ai_message", - category="message", - content=content, - metadata={"model_name": model_name}, - ) - self._last_ai_msg = content[:2000] - self._msg_count += 1 + if tool_calls: + # ai_tool_call: agent decided to use tools + self._put( + event_type="ai_tool_call", + category="message", + content=langchain_to_openai_message(message), + metadata={"model_name": model_name, "finish_reason": "tool_calls"}, + ) + elif isinstance(content, str) and content: + # ai_message: final text reply + self._put( + event_type="ai_message", + category="message", + content={"role": "assistant", "content": content}, + metadata={"model_name": model_name, "finish_reason": "stop"}, + ) + self._last_ai_msg = content[:2000] + self._msg_count += 1 # Token accumulation if self._track_tokens: diff --git a/backend/tests/test_run_journal.py b/backend/tests/test_run_journal.py index 05c5a6112..f6d319279 100644 
--- a/backend/tests/test_run_journal.py +++ b/backend/tests/test_run_journal.py @@ -23,6 +23,7 @@ def journal_setup(): def _make_llm_response(content="Hello", usage=None, tool_calls=None): """Create a mock LLM response with a message.""" msg = MagicMock() + msg.type = "ai" msg.content = content msg.id = f"msg-{id(msg)}" msg.tool_calls = tool_calls or [] @@ -70,21 +71,22 @@ class TestLlmCallbacks: messages = await store.list_messages("t1") assert len(messages) == 1 assert messages[0]["event_type"] == "ai_message" - assert messages[0]["content"] == "Answer" + assert messages[0]["content"] == {"role": "assistant", "content": "Answer"} @pytest.mark.anyio - async def test_on_llm_end_with_tool_calls_no_ai_message(self, journal_setup): - """LLM response with pending tool_calls should NOT produce ai_message.""" + async def test_on_llm_end_with_tool_calls_produces_ai_tool_call(self, journal_setup): + """LLM response with pending tool_calls should produce ai_tool_call event.""" j, store = journal_setup run_id = uuid4() j.on_llm_end( - _make_llm_response("Let me search", tool_calls=[{"name": "search"}]), + _make_llm_response("Let me search", tool_calls=[{"id": "call_1", "name": "search", "args": {}}]), run_id=run_id, tags=["lead_agent"], ) await j.flush() messages = await store.list_messages("t1") - assert len(messages) == 0 + assert len(messages) == 1 + assert messages[0]["event_type"] == "ai_tool_call" @pytest.mark.anyio async def test_on_llm_end_subagent_no_ai_message(self, journal_setup): @@ -408,6 +410,7 @@ class TestDbBackedLifecycle: assert len(messages) == 2 assert messages[0]["event_type"] == "human_message" assert messages[1]["event_type"] == "ai_message" + assert messages[1]["content"] == {"role": "assistant", "content": "DB response"} # Verify events from DB events = await event_store.list_events("t1", run_id) @@ -572,3 +575,51 @@ class TestOpenAIHumanMessage: assert len(messages) == 1 assert messages[0]["content"] == {"role": "user", "content": "What is AI?"} 
# NOTE(review): closing assertion of the preceding test (TestOpenAIHumanMessage hunk);
# that test's body starts before this line, so its indentation is not recoverable here.
assert messages[0]["content"]["role"] == "user"


class TestOpenAIMessageFormat:
    """Checks the shape of message-category events recorded by ``on_llm_end``."""

    @pytest.mark.anyio
    async def test_ai_message_openai_format(self, journal_setup):
        """A lead_agent text reply is stored as an OpenAI assistant message dict."""
        journal, event_store = journal_setup
        response = _make_llm_response("Answer")
        journal.on_llm_end(response, run_id=uuid4(), tags=["lead_agent"])
        await journal.flush()

        stored = await event_store.list_messages("t1")
        assert len(stored) == 1
        assert stored[0]["content"] == {"role": "assistant", "content": "Answer"}

    @pytest.mark.anyio
    async def test_ai_tool_call_event(self, journal_setup):
        """A lead_agent response with tool_calls yields a single ai_tool_call message event."""
        journal, event_store = journal_setup
        response = _make_llm_response(
            "Let me search",
            tool_calls=[{"id": "call_1", "name": "search", "args": {"query": "test"}}],
        )
        journal.on_llm_end(response, run_id=uuid4(), tags=["lead_agent"])
        await journal.flush()

        stored = await event_store.list_messages("t1")
        assert len(stored) == 1
        event = stored[0]
        assert event["event_type"] == "ai_tool_call"

        # The stored content is expected to be an assistant message dict
        # carrying the text plus exactly one tool call entry.
        payload = event["content"]
        assert payload["role"] == "assistant"
        assert payload["content"] == "Let me search"
        assert len(payload["tool_calls"]) == 1

        call = payload["tool_calls"][0]
        assert call["id"] == "call_1"
        assert call["type"] == "function"
        assert call["function"]["name"] == "search"

    @pytest.mark.anyio
    async def test_ai_tool_call_only_from_lead_agent(self, journal_setup):
        """A subagent-tagged response with tool_calls produces no message events."""
        journal, event_store = journal_setup
        response = _make_llm_response(
            "searching",
            tool_calls=[{"id": "call_1", "name": "search", "args": {}}],
        )
        journal.on_llm_end(response, run_id=uuid4(), tags=["subagent:research"])
        await journal.flush()

        stored = await event_store.list_messages("t1")
        assert len(stored) == 0