mirror of
https://github.com/bytedance/deer-flow.git
synced 2026-05-11 11:13:51 +00:00
fix(journal): unwrap Command tool results in on_tool_end
Tools that update graph state (e.g. ``present_files``) return
``Command(update={'messages': [ToolMessage(...)], 'artifacts': [...]})``.
LangGraph later unwraps the inner ``ToolMessage`` into checkpoint state,
but ``RunJournal.on_tool_end`` was receiving the ``Command`` object
directly via the LangChain callback chain and storing
``str(Command(update={...}))`` as the tool_result content.
This produced a visible divergence between the event-store and the
checkpoint for any thread that used a Command-returning tool, blocking
the event-store-backed history fix in the follow-up commit. Concrete
example from thread ``6d30913e-dcd4-41c8-8941-f66c716cf359`` (seq=48):
checkpoint had ``'Successfully presented files'`` while event_store
stored the full Command repr.
The fix detects ``Command`` in ``on_tool_end``, extracts the first
``ToolMessage`` from ``update['messages']``, and lets the existing
ToolMessage branch handle the ``model_dump()`` path. Legacy rows still
containing the Command repr are separately cleaned up by the history
helper in the follow-up commit.
Tests:
- ``test_tool_end_unwraps_command_with_inner_tool_message`` — unit test
of the unwrap branch with a constructed Command
- ``test_tool_invoke_end_to_end_unwraps_command`` — end-to-end via
``CallbackManager`` + ``tool.invoke`` to exercise the real LangChain
dispatch path that production uses, matching the repro shape from
``present_files``
- Counter-proof: temporarily reverted the patch, both tests failed with
the exact ``Command(update={...})`` repr that was stored in the
production SQLite row at seq=48, confirming LangChain does pass the
``Command`` through callbacks (the unwrap is load-bearing)
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
4810898cfa
commit
ce24424449
@ -246,6 +246,19 @@ class RunJournal(BaseCallbackHandler):
|
||||
|
||||
def on_tool_end(self, output: Any, *, run_id: UUID, **kwargs: Any) -> None:
|
||||
from langchain_core.messages import ToolMessage
|
||||
from langgraph.types import Command
|
||||
|
||||
# Tools that update graph state return a ``Command`` (e.g.
|
||||
# ``present_files``). LangGraph later unwraps the inner ToolMessage
|
||||
# into checkpoint state, so to stay checkpoint-aligned we must
|
||||
# extract it here rather than storing ``str(Command(...))``.
|
||||
if isinstance(output, Command):
|
||||
update = getattr(output, "update", None) or {}
|
||||
inner_msgs = update.get("messages") if isinstance(update, dict) else None
|
||||
if isinstance(inner_msgs, list):
|
||||
inner_tool_msg = next((m for m in inner_msgs if isinstance(m, ToolMessage)), None)
|
||||
if inner_tool_msg is not None:
|
||||
output = inner_tool_msg
|
||||
|
||||
# Extract fields from ToolMessage object when LangChain provides one.
|
||||
# LangChain's _format_output wraps tool results into a ToolMessage
|
||||
|
||||
@ -709,6 +709,81 @@ class TestToolResultMessage:
|
||||
assert tool_end["metadata"]["tool_call_id"] == "call_from_obj"
|
||||
assert tool_end["metadata"]["tool_name"] == "web_search"
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_tool_invoke_end_to_end_unwraps_command(self, journal_setup):
|
||||
"""End-to-end: invoke a real LangChain tool that returns Command(update={'messages':[ToolMessage]}).
|
||||
|
||||
This goes through the real LangChain callback path (tool.invoke -> CallbackManager
|
||||
-> on_tool_start/on_tool_end), which is what the production agent uses. Mirrors
|
||||
the ``present_files`` tool shape exactly.
|
||||
"""
|
||||
from langchain_core.callbacks import CallbackManager
|
||||
from langchain_core.messages import ToolMessage
|
||||
from langchain_core.tools import tool
|
||||
from langgraph.types import Command
|
||||
|
||||
j, store = journal_setup
|
||||
|
||||
@tool
|
||||
def fake_present_files(filepaths: list[str]) -> Command:
|
||||
"""Fake present_files that returns a Command with an inner ToolMessage."""
|
||||
return Command(
|
||||
update={
|
||||
"artifacts": filepaths,
|
||||
"messages": [ToolMessage("Successfully presented files", tool_call_id="tc_123")],
|
||||
},
|
||||
)
|
||||
|
||||
# Real LangChain callback dispatch (matches production agent path)
|
||||
cm = CallbackManager(handlers=[j])
|
||||
fake_present_files.invoke(
|
||||
{"filepaths": ["/mnt/user-data/outputs/report.md"]},
|
||||
config={"callbacks": cm, "run_id": uuid4()},
|
||||
)
|
||||
await j.flush()
|
||||
|
||||
messages = await store.list_messages("t1")
|
||||
assert len(messages) == 1, f"expected 1 message event, got {len(messages)}: {messages}"
|
||||
content = messages[0]["content"]
|
||||
assert content["type"] == "tool"
|
||||
# CRITICAL: must be the inner ToolMessage text, not str(Command(...))
|
||||
assert content["content"] == "Successfully presented files", (
|
||||
f"Command unwrap failed; stored content = {content['content']!r}"
|
||||
)
|
||||
assert "Command(update=" not in str(content["content"])
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_tool_end_unwraps_command_with_inner_tool_message(self, journal_setup):
|
||||
"""Tools like ``present_files`` return Command(update={'messages': [ToolMessage(...)]}).
|
||||
|
||||
LangGraph unwraps the inner ToolMessage into checkpoint state, so the
|
||||
event store must do the same — otherwise it captures ``str(Command(...))``
|
||||
and the /history response diverges from the real rendered message.
|
||||
"""
|
||||
from langchain_core.messages import ToolMessage
|
||||
from langgraph.types import Command
|
||||
|
||||
j, store = journal_setup
|
||||
run_id = uuid4()
|
||||
inner = ToolMessage(
|
||||
content="Successfully presented files",
|
||||
tool_call_id="call_present",
|
||||
name="present_files",
|
||||
status="success",
|
||||
)
|
||||
cmd = Command(update={"artifacts": ["/mnt/user-data/outputs/report.md"], "messages": [inner]})
|
||||
j.on_tool_end(cmd, run_id=run_id)
|
||||
await j.flush()
|
||||
|
||||
messages = await store.list_messages("t1")
|
||||
assert len(messages) == 1
|
||||
content = messages[0]["content"]
|
||||
assert content["type"] == "tool"
|
||||
assert content["content"] == "Successfully presented files"
|
||||
assert content["tool_call_id"] == "call_present"
|
||||
assert content["name"] == "present_files"
|
||||
assert "Command(update=" not in str(content["content"])
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_tool_message_object_overrides_kwargs(self, journal_setup):
|
||||
"""ToolMessage object fields take priority over kwargs."""
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user