From b55a9c8d28d082bf1cff1881097d790264cf5718 Mon Sep 17 00:00:00 2001 From: rayhpeng Date: Sat, 4 Apr 2026 21:23:32 +0800 Subject: [PATCH] feat(threads): history endpoint reads messages from event store - POST /api/threads/{thread_id}/history now combines two data sources: checkpointer for checkpoint_id, metadata, title, thread_data; event store for messages (complete history, not truncated by summarization) - Strip internal LangGraph metadata keys from response - Remove full channel_values serialization in favor of selective fields Co-Authored-By: Claude Opus 4.6 (1M context) --- backend/app/gateway/routers/threads.py | 46 ++++++++++++++++++++++++-- 1 file changed, 43 insertions(+), 3 deletions(-) diff --git a/backend/app/gateway/routers/threads.py b/backend/app/gateway/routers/threads.py index cad2e377a..9c6a3c17b 100644 --- a/backend/app/gateway/routers/threads.py +++ b/backend/app/gateway/routers/threads.py @@ -561,13 +561,36 @@ async def update_thread_state(thread_id: str, body: ThreadStateUpdateRequest, re @router.post("/{thread_id}/history", response_model=list[HistoryEntry]) async def get_thread_history(thread_id: str, body: ThreadHistoryRequest, request: Request) -> list[HistoryEntry]: - """Get checkpoint history for a thread.""" + """Get checkpoint history for a thread. + + Combines data from two sources: + - **Checkpointer**: checkpoint_id, parent_checkpoint_id, metadata, + values.title, values.thread_data + - **RunEventStore**: values.messages (complete conversation history, + not affected by summarization truncation) + """ + from app.gateway.deps import get_run_event_store + checkpointer = get_checkpointer(request) config: dict[str, Any] = {"configurable": {"thread_id": thread_id}} if body.before: config["configurable"]["checkpoint_id"] = body.before + # Fetch messages from event store (full history, not truncated by summarization) + event_store = get_run_event_store(request) + try: + all_messages = await event_store.list_messages(thread_id, limit=10_000) + except Exception: + logger.warning("Failed to load messages from event store for thread %s", thread_id, exc_info=True) + all_messages = [] + + # Group messages by run_id for per-checkpoint assembly + messages_by_run: dict[str, list[dict]] = {} + for msg in all_messages: + run_id = msg.get("run_id", "") + messages_by_run.setdefault(run_id, []).append(msg.get("content", {})) + entries: list[HistoryEntry] = [] try: async for checkpoint_tuple in checkpointer.alist(config, limit=body.limit): @@ -583,16 +606,33 @@ async def get_thread_history(thread_id: str, body: ThreadHistoryRequest, request channel_values = checkpoint.get("channel_values", {}) + # Build values: title + thread_data from checkpoint, messages from event store + values: dict[str, Any] = {} + if title := channel_values.get("title"): + values["title"] = title + if thread_data := channel_values.get("thread_data"): + values["thread_data"] = thread_data + + # Attach all messages from event store (not just this checkpoint's run) + if all_messages: + values["messages"] = [m.get("content", {}) for m in all_messages] + # Derive next tasks tasks_raw = getattr(checkpoint_tuple, "tasks", []) or [] next_tasks = [t.name for t in tasks_raw if hasattr(t, "name")] + # Strip LangGraph internal keys from metadata + user_meta = {k: v for k, v in metadata.items() if k not in ("created_at", "updated_at", "step", "source", "writes", "parents")} + # Keep step for ordering context + if "step" in metadata: + user_meta["step"] = metadata["step"] + entries.append( HistoryEntry( checkpoint_id=checkpoint_id, parent_checkpoint_id=parent_id, - metadata=metadata, - values=serialize_channel_values(channel_values), + metadata=user_meta, + values=values, created_at=str(metadata.get("created_at", "")), next=next_tasks, )