mirror of
https://github.com/bytedance/deer-flow.git
synced 2026-04-27 12:18:14 +00:00
feat(threads): history endpoint reads messages from event store
- POST /api/threads/{thread_id}/history now combines two data sources:
  - the checkpointer, for checkpoint_id, parent_checkpoint_id, metadata, title, and thread_data;
  - the event store, for messages (complete history, not truncated by summarization)
- Strip internal LangGraph metadata keys (created_at, updated_at, source, writes, parents) from the response; step is kept for ordering context
- Remove full channel_values serialization in favor of selective fields
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
35001c7c73
commit
b55a9c8d28
@ -561,13 +561,36 @@ async def update_thread_state(thread_id: str, body: ThreadStateUpdateRequest, re
|
||||
|
||||
@router.post("/{thread_id}/history", response_model=list[HistoryEntry])
|
||||
async def get_thread_history(thread_id: str, body: ThreadHistoryRequest, request: Request) -> list[HistoryEntry]:
|
||||
"""Get checkpoint history for a thread."""
|
||||
"""Get checkpoint history for a thread.
|
||||
|
||||
Combines data from two sources:
|
||||
- **Checkpointer**: checkpoint_id, parent_checkpoint_id, metadata,
|
||||
values.title, values.thread_data
|
||||
- **RunEventStore**: values.messages (complete conversation history,
|
||||
not affected by summarization truncation)
|
||||
"""
|
||||
from app.gateway.deps import get_run_event_store
|
||||
|
||||
checkpointer = get_checkpointer(request)
|
||||
|
||||
config: dict[str, Any] = {"configurable": {"thread_id": thread_id}}
|
||||
if body.before:
|
||||
config["configurable"]["checkpoint_id"] = body.before
|
||||
|
||||
# Fetch messages from event store (full history, not truncated by summarization)
|
||||
event_store = get_run_event_store(request)
|
||||
try:
|
||||
all_messages = await event_store.list_messages(thread_id, limit=10_000)
|
||||
except Exception:
|
||||
logger.warning("Failed to load messages from event store for thread %s", thread_id, exc_info=True)
|
||||
all_messages = []
|
||||
|
||||
# Group messages by run_id for per-checkpoint assembly
|
||||
messages_by_run: dict[str, list[dict]] = {}
|
||||
for msg in all_messages:
|
||||
run_id = msg.get("run_id", "")
|
||||
messages_by_run.setdefault(run_id, []).append(msg.get("content", {}))
|
||||
|
||||
entries: list[HistoryEntry] = []
|
||||
try:
|
||||
async for checkpoint_tuple in checkpointer.alist(config, limit=body.limit):
|
||||
@ -583,16 +606,33 @@ async def get_thread_history(thread_id: str, body: ThreadHistoryRequest, request
|
||||
|
||||
channel_values = checkpoint.get("channel_values", {})
|
||||
|
||||
# Build values: title + thread_data from checkpoint, messages from event store
|
||||
values: dict[str, Any] = {}
|
||||
if title := channel_values.get("title"):
|
||||
values["title"] = title
|
||||
if thread_data := channel_values.get("thread_data"):
|
||||
values["thread_data"] = thread_data
|
||||
|
||||
# Attach all messages from event store (not just this checkpoint's run)
|
||||
if all_messages:
|
||||
values["messages"] = [m.get("content", {}) for m in all_messages]
|
||||
|
||||
# Derive next tasks
|
||||
tasks_raw = getattr(checkpoint_tuple, "tasks", []) or []
|
||||
next_tasks = [t.name for t in tasks_raw if hasattr(t, "name")]
|
||||
|
||||
# Strip LangGraph internal keys from metadata
|
||||
user_meta = {k: v for k, v in metadata.items() if k not in ("created_at", "updated_at", "step", "source", "writes", "parents")}
|
||||
# Keep step for ordering context
|
||||
if "step" in metadata:
|
||||
user_meta["step"] = metadata["step"]
|
||||
|
||||
entries.append(
|
||||
HistoryEntry(
|
||||
checkpoint_id=checkpoint_id,
|
||||
parent_checkpoint_id=parent_id,
|
||||
metadata=metadata,
|
||||
values=serialize_channel_values(channel_values),
|
||||
metadata=user_meta,
|
||||
values=values,
|
||||
created_at=str(metadata.get("created_at", "")),
|
||||
next=next_tasks,
|
||||
)
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user