From ce24424449d1c5a54e0c82a86eeae1621c9d48b8 Mon Sep 17 00:00:00 2001 From: rayhpeng Date: Sat, 11 Apr 2026 23:20:27 +0800 Subject: [PATCH] fix(journal): unwrap Command tool results in on_tool_end MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Tools that update graph state (e.g. ``present_files``) return ``Command(update={'messages': [ToolMessage(...)], 'artifacts': [...]})``. LangGraph later unwraps the inner ``ToolMessage`` into checkpoint state, but ``RunJournal.on_tool_end`` was receiving the ``Command`` object directly via the LangChain callback chain and storing ``str(Command(update={...}))`` as the tool_result content. This produced a visible divergence between the event-store and the checkpoint for any thread that used a Command-returning tool, blocking the event-store-backed history fix in the follow-up commit. Concrete example from thread ``6d30913e-dcd4-41c8-8941-f66c716cf359`` (seq=48): checkpoint had ``'Successfully presented files'`` while event_store stored the full Command repr. The fix detects ``Command`` in ``on_tool_end``, extracts the first ``ToolMessage`` from ``update['messages']``, and lets the existing ToolMessage branch handle the ``model_dump()`` path. Legacy rows still containing the Command repr are separately cleaned up by the history helper in the follow-up commit. Tests: - ``test_tool_end_unwraps_command_with_inner_tool_message`` — unit test of the unwrap branch with a constructed Command - ``test_tool_invoke_end_to_end_unwraps_command`` — end-to-end via ``CallbackManager`` + ``tool.invoke`` to exercise the real LangChain dispatch path that production uses, matching the repro shape from ``present_files`` - Counter-proof: temporarily reverted the patch, both tests failed with the exact ``Command(update={...})`` repr that was stored in the production SQLite row at seq=48, confirming LangChain does pass the ``Command`` through callbacks (the unwrap is load-bearing) Co-Authored-By: Claude Opus 4.6 (1M context) --- .../harness/deerflow/runtime/journal.py | 13 ++++ backend/tests/test_run_journal.py | 75 +++++++++++++++++++ 2 files changed, 88 insertions(+) diff --git a/backend/packages/harness/deerflow/runtime/journal.py b/backend/packages/harness/deerflow/runtime/journal.py index ad35f5ff9..a70404e11 100644 --- a/backend/packages/harness/deerflow/runtime/journal.py +++ b/backend/packages/harness/deerflow/runtime/journal.py @@ -246,6 +246,19 @@ class RunJournal(BaseCallbackHandler): def on_tool_end(self, output: Any, *, run_id: UUID, **kwargs: Any) -> None: from langchain_core.messages import ToolMessage + from langgraph.types import Command + + # Tools that update graph state return a ``Command`` (e.g. + # ``present_files``). LangGraph later unwraps the inner ToolMessage + # into checkpoint state, so to stay checkpoint-aligned we must + # extract it here rather than storing ``str(Command(...))``. + if isinstance(output, Command): + update = getattr(output, "update", None) or {} + inner_msgs = update.get("messages") if isinstance(update, dict) else None + if isinstance(inner_msgs, list): + inner_tool_msg = next((m for m in inner_msgs if isinstance(m, ToolMessage)), None) + if inner_tool_msg is not None: + output = inner_tool_msg # Extract fields from ToolMessage object when LangChain provides one. # LangChain's _format_output wraps tool results into a ToolMessage diff --git a/backend/tests/test_run_journal.py b/backend/tests/test_run_journal.py index dbb307a55..b306f59ec 100644 --- a/backend/tests/test_run_journal.py +++ b/backend/tests/test_run_journal.py @@ -709,6 +709,81 @@ class TestToolResultMessage: assert tool_end["metadata"]["tool_call_id"] == "call_from_obj" assert tool_end["metadata"]["tool_name"] == "web_search" + @pytest.mark.anyio + async def test_tool_invoke_end_to_end_unwraps_command(self, journal_setup): + """End-to-end: invoke a real LangChain tool that returns Command(update={'messages':[ToolMessage]}). + + This goes through the real LangChain callback path (tool.invoke -> CallbackManager + -> on_tool_start/on_tool_end), which is what the production agent uses. Mirrors + the ``present_files`` tool shape exactly. + """ + from langchain_core.callbacks import CallbackManager + from langchain_core.messages import ToolMessage + from langchain_core.tools import tool + from langgraph.types import Command + + j, store = journal_setup + + @tool + def fake_present_files(filepaths: list[str]) -> Command: + """Fake present_files that returns a Command with an inner ToolMessage.""" + return Command( + update={ + "artifacts": filepaths, + "messages": [ToolMessage("Successfully presented files", tool_call_id="tc_123")], + }, + ) + + # Real LangChain callback dispatch (matches production agent path) + cm = CallbackManager(handlers=[j]) + fake_present_files.invoke( + {"filepaths": ["/mnt/user-data/outputs/report.md"]}, + config={"callbacks": cm, "run_id": uuid4()}, + ) + await j.flush() + + messages = await store.list_messages("t1") + assert len(messages) == 1, f"expected 1 message event, got {len(messages)}: {messages}" + content = messages[0]["content"] + assert content["type"] == "tool" + # CRITICAL: must be the inner ToolMessage text, not str(Command(...)) + assert content["content"] == "Successfully presented files", ( + f"Command unwrap failed; stored content = {content['content']!r}" + ) + assert "Command(update=" not in str(content["content"]) + + @pytest.mark.anyio + async def test_tool_end_unwraps_command_with_inner_tool_message(self, journal_setup): + """Tools like ``present_files`` return Command(update={'messages': [ToolMessage(...)]}). + + LangGraph unwraps the inner ToolMessage into checkpoint state, so the + event store must do the same — otherwise it captures ``str(Command(...))`` + and the /history response diverges from the real rendered message. + """ + from langchain_core.messages import ToolMessage + from langgraph.types import Command + + j, store = journal_setup + run_id = uuid4() + inner = ToolMessage( + content="Successfully presented files", + tool_call_id="call_present", + name="present_files", + status="success", + ) + cmd = Command(update={"artifacts": ["/mnt/user-data/outputs/report.md"], "messages": [inner]}) + j.on_tool_end(cmd, run_id=run_id) + await j.flush() + + messages = await store.list_messages("t1") + assert len(messages) == 1 + content = messages[0]["content"] + assert content["type"] == "tool" + assert content["content"] == "Successfully presented files" + assert content["tool_call_id"] == "call_present" + assert content["name"] == "present_files" + assert "Command(update=" not in str(content["content"]) + @pytest.mark.anyio async def test_tool_message_object_overrides_kwargs(self, journal_setup): """ToolMessage object fields take priority over kwargs."""