refactor(history): read messages from checkpointer instead of RunEventStore

The /history endpoint now reads messages directly from the
checkpointer's channel_values (the authoritative source) instead of
querying RunEventStore.list_messages(). The RunEventStore API is
preserved for other consumers.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
rayhpeng 2026-04-06 21:24:05 +08:00
parent 29547c0ee4
commit 0ecc2f954c

View File

@ -581,28 +581,18 @@ async def update_thread_state(thread_id: str, body: ThreadStateUpdateRequest, re
async def get_thread_history(thread_id: str, body: ThreadHistoryRequest, request: Request) -> list[HistoryEntry]:
"""Get checkpoint history for a thread.
Combines data from two sources:
- **Checkpointer**: checkpoint_id, parent_checkpoint_id, metadata,
values.title, values.thread_data
- **RunEventStore**: values.messages (complete conversation history,
not affected by summarization truncation)
Messages are read from the checkpointer's channel values (the
authoritative source) and serialized via
:func:`~deerflow.runtime.serialization.serialize_channel_values`.
Only the latest (first) checkpoint carries the ``messages`` key to
avoid duplicating them across every entry.
"""
from app.gateway.deps import get_run_event_store
checkpointer = get_checkpointer(request)
config: dict[str, Any] = {"configurable": {"thread_id": thread_id}}
if body.before:
config["configurable"]["checkpoint_id"] = body.before
# Fetch messages from event store (full history, not truncated by summarization)
event_store = get_run_event_store(request)
try:
all_messages = await event_store.list_messages(thread_id, limit=10_000)
except Exception:
logger.warning("Failed to load messages from event store for thread %s", sanitize_log_param(thread_id), exc_info=True)
all_messages = []
entries: list[HistoryEntry] = []
is_latest_checkpoint = True
try:
@ -619,16 +609,18 @@ async def get_thread_history(thread_id: str, body: ThreadHistoryRequest, request
channel_values = checkpoint.get("channel_values", {})
# Build values: title + thread_data from checkpoint, messages from event store
# Build values from checkpoint channel_values
values: dict[str, Any] = {}
if title := channel_values.get("title"):
values["title"] = title
if thread_data := channel_values.get("thread_data"):
values["thread_data"] = thread_data
# Attach all messages only to the latest (first) checkpoint entry
if is_latest_checkpoint and all_messages:
values["messages"] = [m.get("content", {}) for m in all_messages]
# Attach messages from checkpointer only for the latest checkpoint
if is_latest_checkpoint:
messages = channel_values.get("messages")
if messages:
values["messages"] = serialize_channel_values({"messages": messages}).get("messages", [])
is_latest_checkpoint = False
# Derive next tasks