feat(events): ai_message uses OpenAI format, add ai_tool_call message event

- ai_message content now uses {"role": "assistant", "content": "..."} format
- New ai_tool_call message event emitted when lead_agent LLM responds with tool_calls
- ai_tool_call uses langchain_to_openai_message converter for consistent format
- Both events include finish_reason in metadata ("stop" or "tool_calls")

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
rayhpeng 2026-04-04 09:13:12 +08:00
parent db59dfa6fb
commit 8b1d569589
2 changed files with 77 additions and 15 deletions

View File

@ -106,6 +106,7 @@ class RunJournal(BaseCallbackHandler):
)
def on_llm_end(self, response: Any, *, run_id: UUID, **kwargs: Any) -> None:
from deerflow.runtime.converters import langchain_to_openai_message
from deerflow.runtime.serialization import serialize_lc_object
try:
@ -138,19 +139,29 @@ class RunJournal(BaseCallbackHandler):
},
)
# Message event: ai_message (only lead_agent final replies — no pending tool_calls)
# Message events: only lead_agent gets message-category events
tool_calls = getattr(message, "tool_calls", None) or []
if caller == "lead_agent" and isinstance(content, str) and content and not tool_calls:
if caller == "lead_agent":
resp_meta = getattr(message, "response_metadata", None) or {}
model_name = resp_meta.get("model_name") if isinstance(resp_meta, dict) else None
self._put(
event_type="ai_message",
category="message",
content=content,
metadata={"model_name": model_name},
)
self._last_ai_msg = content[:2000]
self._msg_count += 1
if tool_calls:
# ai_tool_call: agent decided to use tools
self._put(
event_type="ai_tool_call",
category="message",
content=langchain_to_openai_message(message),
metadata={"model_name": model_name, "finish_reason": "tool_calls"},
)
elif isinstance(content, str) and content:
# ai_message: final text reply
self._put(
event_type="ai_message",
category="message",
content={"role": "assistant", "content": content},
metadata={"model_name": model_name, "finish_reason": "stop"},
)
self._last_ai_msg = content[:2000]
self._msg_count += 1
# Token accumulation
if self._track_tokens:

View File

@ -23,6 +23,7 @@ def journal_setup():
def _make_llm_response(content="Hello", usage=None, tool_calls=None):
"""Create a mock LLM response with a message."""
msg = MagicMock()
msg.type = "ai"
msg.content = content
msg.id = f"msg-{id(msg)}"
msg.tool_calls = tool_calls or []
@ -70,21 +71,22 @@ class TestLlmCallbacks:
messages = await store.list_messages("t1")
assert len(messages) == 1
assert messages[0]["event_type"] == "ai_message"
assert messages[0]["content"] == "Answer"
assert messages[0]["content"] == {"role": "assistant", "content": "Answer"}
@pytest.mark.anyio
async def test_on_llm_end_with_tool_calls_no_ai_message(self, journal_setup):
"""LLM response with pending tool_calls should NOT produce ai_message."""
async def test_on_llm_end_with_tool_calls_produces_ai_tool_call(self, journal_setup):
"""LLM response with pending tool_calls should produce ai_tool_call event."""
j, store = journal_setup
run_id = uuid4()
j.on_llm_end(
_make_llm_response("Let me search", tool_calls=[{"name": "search"}]),
_make_llm_response("Let me search", tool_calls=[{"id": "call_1", "name": "search", "args": {}}]),
run_id=run_id,
tags=["lead_agent"],
)
await j.flush()
messages = await store.list_messages("t1")
assert len(messages) == 0
assert len(messages) == 1
assert messages[0]["event_type"] == "ai_tool_call"
@pytest.mark.anyio
async def test_on_llm_end_subagent_no_ai_message(self, journal_setup):
@ -408,6 +410,7 @@ class TestDbBackedLifecycle:
assert len(messages) == 2
assert messages[0]["event_type"] == "human_message"
assert messages[1]["event_type"] == "ai_message"
assert messages[1]["content"] == {"role": "assistant", "content": "DB response"}
# Verify events from DB
events = await event_store.list_events("t1", run_id)
@ -572,3 +575,51 @@ class TestOpenAIHumanMessage:
assert len(messages) == 1
assert messages[0]["content"] == {"role": "user", "content": "What is AI?"}
assert messages[0]["content"]["role"] == "user"
class TestOpenAIMessageFormat:
    """Message-category events emitted by on_llm_end use the OpenAI message shape."""

    @pytest.mark.anyio
    async def test_ai_message_openai_format(self, journal_setup):
        """A plain text reply is stored as an OpenAI assistant-message dict."""
        journal, store = journal_setup
        journal.on_llm_end(_make_llm_response("Answer"), run_id=uuid4(), tags=["lead_agent"])
        await journal.flush()

        stored = await store.list_messages("t1")
        assert len(stored) == 1
        assert stored[0]["content"] == {"role": "assistant", "content": "Answer"}

    @pytest.mark.anyio
    async def test_ai_tool_call_event(self, journal_setup):
        """A response carrying tool_calls yields an ai_tool_call message event."""
        journal, store = journal_setup
        pending_calls = [{"id": "call_1", "name": "search", "args": {"query": "test"}}]
        journal.on_llm_end(
            _make_llm_response("Let me search", tool_calls=pending_calls),
            run_id=uuid4(),
            tags=["lead_agent"],
        )
        await journal.flush()

        stored = await store.list_messages("t1")
        assert len(stored) == 1
        event = stored[0]
        assert event["event_type"] == "ai_tool_call"

        payload = event["content"]
        assert payload["role"] == "assistant"
        assert payload["content"] == "Let me search"
        assert len(payload["tool_calls"]) == 1

        # Converter must emit the OpenAI tool_call envelope (id / type / function).
        call = payload["tool_calls"][0]
        assert call["id"] == "call_1"
        assert call["type"] == "function"
        assert call["function"]["name"] == "search"

    @pytest.mark.anyio
    async def test_ai_tool_call_only_from_lead_agent(self, journal_setup):
        """Subagent tool_calls stay out of the message stream entirely."""
        journal, store = journal_setup
        pending_calls = [{"id": "call_1", "name": "search", "args": {}}]
        journal.on_llm_end(
            _make_llm_response("searching", tool_calls=pending_calls),
            run_id=uuid4(),
            tags=["subagent:research"],
        )
        await journal.flush()

        stored = await store.list_messages("t1")
        assert len(stored) == 0