From 0ecc2f954cf12d906a138665068e749887706331 Mon Sep 17 00:00:00 2001 From: rayhpeng Date: Mon, 6 Apr 2026 21:24:05 +0800 Subject: [PATCH] refactor(history): read messages from checkpointer instead of RunEventStore The /history endpoint now reads messages directly from the checkpointer's channel_values (the authoritative source) instead of querying RunEventStore.list_messages(). The RunEventStore API is preserved for other consumers. Co-Authored-By: Claude Opus 4.6 (1M context) --- backend/app/gateway/routers/threads.py | 30 ++++++++++---------------- 1 file changed, 11 insertions(+), 19 deletions(-) diff --git a/backend/app/gateway/routers/threads.py b/backend/app/gateway/routers/threads.py index 3017761e4..437105111 100644 --- a/backend/app/gateway/routers/threads.py +++ b/backend/app/gateway/routers/threads.py @@ -581,28 +581,18 @@ async def update_thread_state(thread_id: str, body: ThreadStateUpdateRequest, re async def get_thread_history(thread_id: str, body: ThreadHistoryRequest, request: Request) -> list[HistoryEntry]: """Get checkpoint history for a thread. - Combines data from two sources: - - **Checkpointer**: checkpoint_id, parent_checkpoint_id, metadata, - values.title, values.thread_data - - **RunEventStore**: values.messages (complete conversation history, - not affected by summarization truncation) + Messages are read from the checkpointer's channel values (the + authoritative source) and serialized via + :func:`~deerflow.runtime.serialization.serialize_channel_values`. + Only the latest (first) checkpoint carries the ``messages`` key to + avoid duplicating them across every entry. """ - from app.gateway.deps import get_run_event_store - checkpointer = get_checkpointer(request) config: dict[str, Any] = {"configurable": {"thread_id": thread_id}} if body.before: config["configurable"]["checkpoint_id"] = body.before - # Fetch messages from event store (full history, not truncated by summarization) - event_store = get_run_event_store(request) - try: - all_messages = await event_store.list_messages(thread_id, limit=10_000) - except Exception: - logger.warning("Failed to load messages from event store for thread %s", sanitize_log_param(thread_id), exc_info=True) - all_messages = [] - entries: list[HistoryEntry] = [] is_latest_checkpoint = True try: @@ -619,16 +609,18 @@ async def get_thread_history(thread_id: str, body: ThreadHistoryRequest, request channel_values = checkpoint.get("channel_values", {}) - # Build values: title + thread_data from checkpoint, messages from event store + # Build values from checkpoint channel_values values: dict[str, Any] = {} if title := channel_values.get("title"): values["title"] = title if thread_data := channel_values.get("thread_data"): values["thread_data"] = thread_data - # Attach all messages only to the latest (first) checkpoint entry - if is_latest_checkpoint and all_messages: - values["messages"] = [m.get("content", {}) for m in all_messages] + # Attach messages from checkpointer only for the latest checkpoint + if is_latest_checkpoint: + messages = channel_values.get("messages") + if messages: + values["messages"] = serialize_channel_values({"messages": messages}).get("messages", []) is_latest_checkpoint = False # Derive next tasks