From 229c8095be1f3622c0091ebe61d22b56820ed45c Mon Sep 17 00:00:00 2001 From: rayhpeng Date: Sat, 11 Apr 2026 23:21:15 +0800 Subject: [PATCH] fix(threads): load history messages from event store, immune to summarization MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ``get_thread_history`` and ``get_thread_state`` in Gateway mode read messages from ``checkpoint.channel_values["messages"]``. After SummarizationMiddleware runs mid-run, that list is rewritten in-place: pre-summarize messages are dropped and a synthetic summary-as-human message takes position 0. The frontend then renders a chat history that starts with ``"Here is a summary of the conversation to date:..."`` instead of the user's original query, and all earlier turns are gone. The event store (``RunEventStore``) is append-only and never rewritten, so it retains the full transcript. This commit adds a helper ``_get_event_store_messages`` that loads the event store's message stream and overrides ``values["messages"]`` in both endpoints; the checkpoint fallback kicks in only when the event store is unavailable. Behavior contract of the helper: - **Full pagination.** ``list_messages`` returns the newest ``limit`` records when no cursor is given, so a fixed limit silently drops older messages on long threads. The helper probes ``count_messages()`` up front and pages forward with ``after_seq`` cursors until the stream is exhausted. - **Copy-on-read.** Each content dict is copied before ``id`` is patched so the live store object (``MemoryRunEventStore`` returns references) is never mutated. - **Stable ids.** Messages with ``id=None`` (human + tool_result, which don't receive an id until checkpoint persistence) get a deterministic ``uuid5(NAMESPACE_URL, f"{thread_id}:{seq}")`` so React keys stay stable across requests. AI messages keep their LLM-assigned ``lc_run--*`` ids. 
- **Legacy ``Command`` repr sanitization.** Rows captured before the ``journal.py`` ``on_tool_end`` fix (previous commit) stored ``str(Command(update={'messages': [ToolMessage(content='X', ...)]}))`` as the tool_result content. ``_sanitize_legacy_command_repr`` regex-extracts the inner text so old threads render cleanly. - **Inline feedback.** When loading the stream, the helper also pulls ``feedback_repo.list_by_thread_grouped`` and attaches ``run_id`` to every message plus ``feedback`` to the final ``ai_message`` of each run. This removes the frontend's need to fetch a second endpoint and positional-index-map its way back to the right run. When the feedback subsystem is unavailable, the ``feedback`` field is left absent entirely so the frontend hides the button rather than rendering it over a broken write path. - **User context.** ``DbRunEventStore`` is user-scoped by default via ``resolve_user_id(AUTO)``. The helper relies on the ``@require_permission`` decorator having populated the user contextvar on both callers; the docstring documents this dependency explicitly so nobody wires it into a CLI or migration script without passing ``user_id=None``. Real data verification against thread ``6d30913e-dcd4-41c8-8941-f66c716cf359``: checkpoint showed 12 messages (summarize-corrupted), event store had 16. The original human message ``"最新伊美局势"`` (latest Iran-US situation) was preserved as seq=1 in the event store and correctly restored to position 0 in the helper output. Helper output for AI messages was byte-identical to checkpoint for every overlapping message; only tool_result ids differed (patched to uuid5) and the legacy Command repr at seq=48 was sanitized. 
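The stable-id scheme above can be sketched in a few lines (illustrative only; ``stable_message_id`` is a hypothetical name, not a helper in this patch — the real code inlines the ``uuid5`` call):

```python
import uuid

def stable_message_id(thread_id: str, seq: int) -> str:
    # uuid5 is deterministic: the same (thread_id, seq) pair always maps to
    # the same id, so React keys survive re-fetches without the stored rows
    # ever being mutated (ids are patched onto copies at read time).
    return str(uuid.uuid5(uuid.NAMESPACE_URL, f"{thread_id}:{seq}"))

tid = "6d30913e-dcd4-41c8-8941-f66c716cf359"
assert stable_message_id(tid, 1) == stable_message_id(tid, 1)  # stable across requests
assert stable_message_id(tid, 1) != stable_message_id(tid, 2)  # unique per seq
```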
Tests: - ``test_thread_state_event_store.py`` — 18 tests covering ``_sanitize_legacy_command_repr`` (passthrough, single/double-quote extraction, unparseable fallback), helper happy path (all message types, stable uuid5, store non-mutation), multi-page pagination, summarize regression (recovers pre-summarize messages), feedback attachment (per-run, multi-run threads, repo failure graceful), and dependency failure fallback to ``None``. Docs: - ``docs/superpowers/plans/2026-04-10-event-store-history.md`` — the implementation plan this commit realizes, with Task 1 revised after the evaluation findings (pagination, copy-on-read, Command wrap already landed in journal.py, frontend feedback pagination in the follow-up commit, Standard-mode follow-up noted). - ``docs/superpowers/specs/2026-04-11-runjournal-history-evaluation.md`` — the Claude + second-opinion evaluation document that drove the plan revisions (pagination bug, dict-mutation bug, feedback hidden bug, Command bug). - ``docs/superpowers/specs/2026-04-11-summarize-marker-design.md`` — design for a follow-up PR that visually marks summarize events in history, based on a verified ``adispatch_custom_event`` experiment (``trace=False`` middleware nodes can still forward the Pregel task config via explicit signature injection). Scope: Gateway mode only (``make dev-pro``). Standard mode (``make dev``) hits LangGraph Server directly and bypasses these endpoints; the summarize symptom is still present there and is tracked as a separate follow-up in the plan. 
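For reference, the full-pagination contract reads roughly as follows against a synchronous stand-in store (the real ``RunEventStore`` is async and also probes ``count_messages()`` first; ``page_all`` and ``fake_list_messages`` are hypothetical names for this sketch):

```python
def page_all(list_messages, page_size=500):
    # Page forward with after_seq cursors so a fixed limit never silently
    # drops older messages; stop on an empty or short page.
    collected, after_seq = [], None
    while True:
        page = list_messages(limit=page_size, after_seq=after_seq)
        if not page:
            break
        collected.extend(page)
        if len(page) < page_size:
            break
        after_seq = page[-1]["seq"]
    return collected

# Stand-in store: seqs are 1-based and contiguous (an assumption for the demo).
rows = [{"seq": i} for i in range(1, 1251)]

def fake_list_messages(*, limit, after_seq):
    start = 0 if after_seq is None else after_seq  # seq n lives at index n-1
    return rows[start:start + limit]

# 1250 records span three pages of 500; nothing is dropped.
assert [r["seq"] for r in page_all(fake_list_messages)] == list(range(1, 1251))
```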
Co-Authored-By: Claude Opus 4.6 (1M context) --- backend/app/gateway/routers/threads.py | 190 ++++++- .../tests/test_thread_state_event_store.py | 439 ++++++++++++++++ .../plans/2026-04-10-event-store-history.md | 471 ++++++++++++++++++ ...026-04-11-runjournal-history-evaluation.md | 191 +++++++ .../2026-04-11-summarize-marker-design.md | 203 ++++++++ 5 files changed, 1488 insertions(+), 6 deletions(-) create mode 100644 backend/tests/test_thread_state_event_store.py create mode 100644 docs/superpowers/plans/2026-04-10-event-store-history.md create mode 100644 docs/superpowers/specs/2026-04-11-runjournal-history-evaluation.md create mode 100644 docs/superpowers/specs/2026-04-11-summarize-marker-design.md diff --git a/backend/app/gateway/routers/threads.py b/backend/app/gateway/routers/threads.py index e99713630..5eb4a30b5 100644 --- a/backend/app/gateway/routers/threads.py +++ b/backend/app/gateway/routers/threads.py @@ -13,6 +13,7 @@ matching the LangGraph Platform wire format expected by the from __future__ import annotations import logging +import re import time import uuid from typing import Any @@ -21,7 +22,7 @@ from fastapi import APIRouter, HTTPException, Request from pydantic import BaseModel, Field, field_validator from app.gateway.authz import require_permission -from app.gateway.deps import get_checkpointer +from app.gateway.deps import get_checkpointer, get_current_user, get_feedback_repo, get_run_event_store from app.gateway.utils import sanitize_log_param from deerflow.config.paths import Paths, get_paths from deerflow.runtime import serialize_channel_values @@ -402,6 +403,165 @@ async def get_thread(thread_id: str, request: Request) -> ThreadResponse: ) +# --------------------------------------------------------------------------- +# Event-store-backed message loader +# --------------------------------------------------------------------------- + +_LEGACY_CMD_INNER_CONTENT_RE = re.compile( + r"ToolMessage\(content=(?P<q>['\"])(?P<inner>.*?)(?P=q)", + 
re.DOTALL, +) + + +def _sanitize_legacy_command_repr(content_field: Any) -> Any: + """Recover the inner ToolMessage text from a legacy ``str(Command(...))`` repr. + + Runs captured before the ``on_tool_end`` fix in ``journal.py`` stored + ``str(Command(update={'messages':[ToolMessage(content='X', ...)]}))`` as the + tool_result content. New runs store ``'X'`` directly. For legacy rows, try + to extract ``'X'`` defensively; return the original string if extraction + fails (still no worse than the checkpoint fallback for summarized threads). + """ + if not isinstance(content_field, str) or not content_field.startswith("Command(update="): + return content_field + match = _LEGACY_CMD_INNER_CONTENT_RE.search(content_field) + return match.group("inner") if match else content_field + + +async def _get_event_store_messages(request: Request, thread_id: str) -> list[dict] | None: + """Load the full message stream for ``thread_id`` from the event store. + + The event store is append-only and unaffected by summarization — the + checkpoint's ``channel_values["messages"]`` is rewritten in-place when the + SummarizationMiddleware runs, which drops all pre-summarize messages. The + event store retains the full transcript, so callers in Gateway mode should + prefer it for rendering the conversation history. + + In addition to the core message content, this helper attaches two extra + fields to every returned dict: + + - ``run_id``: the ``run_id`` of the event that produced this message. + Always present. + - ``feedback``: thumbs-up/down data. Present only on the **final + ``ai_message`` of each run** (matching the per-run feedback semantics + of ``POST /api/threads/{id}/runs/{run_id}/feedback``). The frontend uses + the presence of this field to decide whether to render the feedback + button, which sidesteps the positional-index mapping bug that an + out-of-band ``/messages`` fetch exhibited. 
+ + Behaviour contract: + + - **Full pagination.** ``RunEventStore.list_messages`` returns the newest + ``limit`` records when no cursor is given, so a fixed limit silently + drops older messages on long threads. We probe + ``count_messages()`` for existence up front and page forward with + ``after_seq`` cursors until the stream is exhausted. + - **Copy-on-read.** Each content dict is copied before ``id`` is patched + so the live store object is never mutated; ``MemoryRunEventStore`` + returns live references. + - **Stable ids.** Messages with ``id=None`` (human + tool_result) receive + a deterministic ``uuid5(NAMESPACE_URL, f"{thread_id}:{seq}")`` so React + keys are stable across requests without altering stored data. AI messages + retain their LLM-assigned ``lc_run--*`` ids. + - **Legacy Command repr.** Rows captured before the ``journal.py`` + ``on_tool_end`` fix stored ``str(Command(update={...}))`` as the tool + result content. ``_sanitize_legacy_command_repr`` extracts the inner + ToolMessage text. + - **User context.** ``DbRunEventStore`` is user-scoped by default via + ``resolve_user_id(AUTO)`` in ``runtime/user_context.py``. This helper + must run inside a request where ``@require_permission`` has populated + the user contextvar. Both callers below are decorated appropriately. + Do not call this helper from CLI or migration scripts without passing + ``user_id=None`` explicitly to the underlying store methods. + + Returns ``None`` when the event store is not configured or has no message + events for this thread, so callers fall back to checkpoint messages. + """ + try: + event_store = get_run_event_store(request) + except Exception: + return None + + try: + total = await event_store.count_messages(thread_id) + except Exception: + logger.exception("count_messages failed for thread %s", sanitize_log_param(thread_id)) + return None + if not total: + return None + + # Batch by page_size to keep memory bounded for very long threads. 
+ page_size = 500 + collected: list[dict] = [] + after_seq: int | None = None + while True: + try: + page = await event_store.list_messages(thread_id, limit=page_size, after_seq=after_seq) + except Exception: + logger.exception("list_messages failed for thread %s", sanitize_log_param(thread_id)) + return None + if not page: + break + collected.extend(page) + if len(page) < page_size: + break + next_cursor = page[-1].get("seq") + if next_cursor is None or (after_seq is not None and next_cursor <= after_seq): + break + after_seq = next_cursor + + # Build the message list; track the final ``ai_message`` index per run so + # feedback can be attached at the right position (matches thread_runs.py). + messages: list[dict] = [] + last_ai_per_run: dict[str, int] = {} + for evt in collected: + raw = evt.get("content") + if not isinstance(raw, dict) or "type" not in raw: + continue + content = dict(raw) + if content.get("id") is None: + content["id"] = str(uuid.uuid5(uuid.NAMESPACE_URL, f"{thread_id}:{evt['seq']}")) + if content.get("type") == "tool": + content["content"] = _sanitize_legacy_command_repr(content.get("content")) + run_id = evt.get("run_id") + if run_id: + content["run_id"] = run_id + if evt.get("event_type") == "ai_message" and run_id: + last_ai_per_run[run_id] = len(messages) + messages.append(content) + + if not messages: + return None + + # Attach feedback to the final ai_message of each run. If the feedback + # subsystem is unavailable, leave the ``feedback`` field absent entirely + # so the frontend hides the button rather than showing it over a broken + # write path. 
+ feedback_available = False + feedback_map: dict[str, dict] = {} + try: + feedback_repo = get_feedback_repo(request) + user_id = await get_current_user(request) + feedback_map = await feedback_repo.list_by_thread_grouped(thread_id, user_id=user_id) + feedback_available = True + except Exception: + logger.exception("feedback lookup failed for thread %s", sanitize_log_param(thread_id)) + + if feedback_available: + for run_id, idx in last_ai_per_run.items(): + fb = feedback_map.get(run_id) + messages[idx]["feedback"] = ( + { + "feedback_id": fb["feedback_id"], + "rating": fb["rating"], + "comment": fb.get("comment"), + } + if fb + else None + ) + + return messages + + @router.get("/{thread_id}/state", response_model=ThreadStateResponse) @require_permission("threads", "read", owner_check=True) async def get_thread_state(thread_id: str, request: Request) -> ThreadStateResponse: @@ -440,8 +600,15 @@ async def get_thread_state(thread_id: str, request: Request) -> ThreadStateRespo next_tasks = [t.name for t in tasks_raw if hasattr(t, "name")] tasks = [{"id": getattr(t, "id", ""), "name": getattr(t, "name", "")} for t in tasks_raw] + values = serialize_channel_values(channel_values) + + # Prefer event-store messages: append-only, immune to summarization. + es_messages = await _get_event_store_messages(request, thread_id) + if es_messages is not None: + values["messages"] = es_messages + return ThreadStateResponse( - values=serialize_channel_values(channel_values), + values=values, next=next_tasks, metadata=metadata, checkpoint={"id": checkpoint_id, "ts": str(metadata.get("created_at", ""))}, @@ -559,6 +726,11 @@ async def get_thread_history(thread_id: str, body: ThreadHistoryRequest, request if body.before: config["configurable"]["checkpoint_id"] = body.before + # Load the full event-store message stream once; attach to the latest + # checkpoint entry only (matching the prior semantics). The event store + # is append-only and immune to summarization. 
+ es_messages = await _get_event_store_messages(request, thread_id) + entries: list[HistoryEntry] = [] is_latest_checkpoint = True try: @@ -582,11 +754,17 @@ async def get_thread_history(thread_id: str, body: ThreadHistoryRequest, request if thread_data := channel_values.get("thread_data"): values["thread_data"] = thread_data - # Attach messages from checkpointer only for the latest checkpoint + # Attach messages only to the latest checkpoint. Prefer the + # event-store stream (complete and unaffected by summarization); + # fall back to checkpoint channel_values when the event store is + # unavailable or empty. if is_latest_checkpoint: - messages = channel_values.get("messages") - if messages: - values["messages"] = serialize_channel_values({"messages": messages}).get("messages", []) + if es_messages is not None: + values["messages"] = es_messages + else: + messages = channel_values.get("messages") + if messages: + values["messages"] = serialize_channel_values({"messages": messages}).get("messages", []) is_latest_checkpoint = False # Derive next tasks diff --git a/backend/tests/test_thread_state_event_store.py b/backend/tests/test_thread_state_event_store.py new file mode 100644 index 000000000..0d3b19761 --- /dev/null +++ b/backend/tests/test_thread_state_event_store.py @@ -0,0 +1,439 @@ +"""Tests for event-store-backed message loading in thread state/history endpoints. + +Covers the helper functions added to ``app/gateway/routers/threads.py``: + +- ``_sanitize_legacy_command_repr`` — extracts inner ToolMessage text from + legacy ``str(Command(...))`` strings captured before the ``journal.py`` + fix for state-updating tools like ``present_files``. +- ``_get_event_store_messages`` — loads the full message stream with full + pagination, copy-on-read id patching, legacy Command sanitization, and + a clean fallback to ``None`` when the event store is unavailable. 
+""" + +from __future__ import annotations + +import uuid +from types import SimpleNamespace +from typing import Any + +import pytest + +from app.gateway.routers.threads import ( + _get_event_store_messages, + _sanitize_legacy_command_repr, +) +from deerflow.runtime.events.store.memory import MemoryRunEventStore + + +@pytest.fixture() +def event_store() -> MemoryRunEventStore: + return MemoryRunEventStore() + + +class _FakeFeedbackRepo: + """Minimal ``FeedbackRepository`` stand-in that returns a configured map.""" + + def __init__(self, by_run: dict[str, dict] | None = None) -> None: + self._by_run = by_run or {} + + async def list_by_thread_grouped(self, thread_id: str, *, user_id: str | None) -> dict[str, dict]: + return dict(self._by_run) + + +def _make_request( + event_store: MemoryRunEventStore, + feedback_repo: _FakeFeedbackRepo | None = None, +) -> Any: + """Build a minimal FastAPI-like Request object. + + ``get_run_event_store(request)`` reads ``request.app.state.run_event_store``. + ``get_feedback_repo(request)`` reads ``request.app.state.feedback_repo``. + ``get_current_user`` is monkey-patched separately in tests that need it. 
+ """ + state = SimpleNamespace( + run_event_store=event_store, + feedback_repo=feedback_repo or _FakeFeedbackRepo(), + ) + app = SimpleNamespace(state=state) + return SimpleNamespace(app=app) + + +@pytest.fixture(autouse=True) +def _stub_current_user(monkeypatch): + """Stub out ``get_current_user`` so tests don't need real auth context.""" + import app.gateway.routers.threads as threads_mod + + async def _fake(_request): + return None + + monkeypatch.setattr(threads_mod, "get_current_user", _fake) + + +async def _seed_simple_run(store: MemoryRunEventStore, thread_id: str, run_id: str) -> None: + """Seed one run: human + ai_tool_call + tool_result + final ai_message, plus a trace.""" + await store.put( + thread_id=thread_id, run_id=run_id, + event_type="human_message", category="message", + content={ + "type": "human", "id": None, + "content": [{"type": "text", "text": "hello"}], + "additional_kwargs": {}, "response_metadata": {}, "name": None, + }, + ) + await store.put( + thread_id=thread_id, run_id=run_id, + event_type="ai_tool_call", category="message", + content={ + "type": "ai", "id": "lc_run--tc1", + "content": "", + "tool_calls": [{"name": "search", "args": {"q": "x"}, "id": "call_1", "type": "tool_call"}], + "invalid_tool_calls": [], + "additional_kwargs": {}, "response_metadata": {}, "name": None, + "usage_metadata": {"input_tokens": 10, "output_tokens": 5, "total_tokens": 15}, + }, + ) + await store.put( + thread_id=thread_id, run_id=run_id, + event_type="tool_result", category="message", + content={ + "type": "tool", "id": None, + "content": "results", + "tool_call_id": "call_1", "name": "search", + "artifact": None, "status": "success", + "additional_kwargs": {}, "response_metadata": {}, + }, + ) + await store.put( + thread_id=thread_id, run_id=run_id, + event_type="ai_message", category="message", + content={ + "type": "ai", "id": "lc_run--final1", + "content": "done", + "tool_calls": [], "invalid_tool_calls": [], + "additional_kwargs": {}, 
"response_metadata": {"finish_reason": "stop"}, "name": None, + "usage_metadata": {"input_tokens": 20, "output_tokens": 10, "total_tokens": 30}, + }, + ) + # Non-message trace — must be filtered out. + await store.put( + thread_id=thread_id, run_id=run_id, + event_type="llm_request", category="trace", + content={"model": "test"}, + ) + + +class TestSanitizeLegacyCommandRepr: + def test_passthrough_non_string(self): + assert _sanitize_legacy_command_repr(None) is None + assert _sanitize_legacy_command_repr(42) == 42 + assert _sanitize_legacy_command_repr([{"type": "text", "text": "x"}]) == [{"type": "text", "text": "x"}] + + def test_passthrough_plain_string(self): + assert _sanitize_legacy_command_repr("Successfully presented files") == "Successfully presented files" + assert _sanitize_legacy_command_repr("") == "" + + def test_extracts_inner_content_single_quotes(self): + legacy = ( + "Command(update={'artifacts': ['/mnt/user-data/outputs/report.md'], " + "'messages': [ToolMessage(content='Successfully presented files', " + "tool_call_id='call_abc')]})" + ) + assert _sanitize_legacy_command_repr(legacy) == "Successfully presented files" + + def test_extracts_inner_content_double_quotes(self): + legacy = 'Command(update={"messages": [ToolMessage(content="ok", tool_call_id="x")]})' + assert _sanitize_legacy_command_repr(legacy) == "ok" + + def test_unparseable_command_returns_original(self): + legacy = "Command(update={'something_else': 1})" + assert _sanitize_legacy_command_repr(legacy) == legacy + + +class TestGetEventStoreMessages: + @pytest.mark.anyio + async def test_returns_none_when_store_empty(self, event_store): + request = _make_request(event_store) + assert await _get_event_store_messages(request, "t_missing") is None + + @pytest.mark.anyio + async def test_extracts_all_message_types_in_order(self, event_store): + await _seed_simple_run(event_store, "t1", "r1") + request = _make_request(event_store) + messages = await _get_event_store_messages(request, 
"t1") + assert messages is not None + types = [m["type"] for m in messages] + assert types == ["human", "ai", "tool", "ai"] + # Trace events must not appear + for m in messages: + assert m.get("type") in {"human", "ai", "tool"} + + @pytest.mark.anyio + async def test_null_ids_get_deterministic_uuid5(self, event_store): + await _seed_simple_run(event_store, "t1", "r1") + request = _make_request(event_store) + messages = await _get_event_store_messages(request, "t1") + assert messages is not None + + # AI messages keep their LLM ids + assert messages[1]["id"] == "lc_run--tc1" + assert messages[3]["id"] == "lc_run--final1" + + # Human (seq=1) + tool (seq=3) get deterministic uuid5 + expected_human_id = str(uuid.uuid5(uuid.NAMESPACE_URL, "t1:1")) + expected_tool_id = str(uuid.uuid5(uuid.NAMESPACE_URL, "t1:3")) + assert messages[0]["id"] == expected_human_id + assert messages[2]["id"] == expected_tool_id + + # Re-running produces the same ids (stability across requests) + messages2 = await _get_event_store_messages(request, "t1") + assert [m["id"] for m in messages2] == [m["id"] for m in messages] + + @pytest.mark.anyio + async def test_helper_does_not_mutate_store(self, event_store): + """Helper must copy content dicts; the live store must stay unchanged.""" + await _seed_simple_run(event_store, "t1", "r1") + request = _make_request(event_store) + _ = await _get_event_store_messages(request, "t1") + + # Raw store records still have id=None for human/tool + raw = await event_store.list_messages("t1", limit=500) + human = next(e for e in raw if e["content"]["type"] == "human") + tool = next(e for e in raw if e["content"]["type"] == "tool") + assert human["content"]["id"] is None + assert tool["content"]["id"] is None + + @pytest.mark.anyio + async def test_legacy_command_repr_sanitized(self, event_store): + """A tool_result whose content is a legacy ``str(Command(...))`` is cleaned.""" + legacy = ( + "Command(update={'artifacts': ['/mnt/user-data/outputs/x.md'], " + 
"'messages': [ToolMessage(content='Successfully presented files', " + "tool_call_id='call_p')]})" + ) + await event_store.put( + thread_id="t2", run_id="r1", + event_type="tool_result", category="message", + content={ + "type": "tool", "id": None, + "content": legacy, + "tool_call_id": "call_p", "name": "present_files", + "artifact": None, "status": "success", + "additional_kwargs": {}, "response_metadata": {}, + }, + ) + request = _make_request(event_store) + messages = await _get_event_store_messages(request, "t2") + assert messages is not None and len(messages) == 1 + assert messages[0]["content"] == "Successfully presented files" + + @pytest.mark.anyio + async def test_pagination_covers_more_than_one_page(self, event_store, monkeypatch): + """Verify via a spy that the helper drives ``list_messages`` with cursor pagination.""" + thread_id = "t_long" + # Seed 12 human messages + for i in range(12): + await event_store.put( + thread_id=thread_id, run_id="r1", + event_type="human_message", category="message", + content={ + "type": "human", "id": None, + "content": [{"type": "text", "text": f"msg {i}"}], + "additional_kwargs": {}, "response_metadata": {}, "name": None, + }, + ) + + # Grab the helper under test before installing the list_messages spy + import app.gateway.routers.threads as threads_mod + original = threads_mod._get_event_store_messages + + # Monkeypatch MemoryRunEventStore.list_messages to assert it's called with cursor pagination + calls: list[dict] = [] + real_list = event_store.list_messages + + async def spy_list_messages(tid, *, limit=50, before_seq=None, after_seq=None): + calls.append({"limit": limit, "after_seq": after_seq}) + return await real_list(tid, limit=limit, before_seq=before_seq, after_seq=after_seq) + + monkeypatch.setattr(event_store, "list_messages", spy_list_messages) + + request = _make_request(event_store) + messages = await original(request, thread_id) + assert messages is not None + assert len(messages) == 12 + assert [m["content"][0]["text"] for m in 
messages] == [f"msg {i}" for i in range(12)] + # At least one call was made with after_seq=None (the initial page) + assert any(c["after_seq"] is None for c in calls) + + @pytest.mark.anyio + async def test_summarize_regression_recovers_pre_summarize_messages(self, event_store): + """The exact bug: checkpoint would have only post-summarize messages; + event store must surface the original pre-summarize human query.""" + # Run 1 (pre-summarize) + await event_store.put( + thread_id="t_sum", run_id="r1", + event_type="human_message", category="message", + content={ + "type": "human", "id": None, + "content": [{"type": "text", "text": "original question"}], + "additional_kwargs": {}, "response_metadata": {}, "name": None, + }, + ) + await event_store.put( + thread_id="t_sum", run_id="r1", + event_type="ai_message", category="message", + content={ + "type": "ai", "id": "lc_run--r1", + "content": "first answer", + "tool_calls": [], "invalid_tool_calls": [], + "additional_kwargs": {}, "response_metadata": {}, "name": None, + "usage_metadata": {"input_tokens": 0, "output_tokens": 0, "total_tokens": 0}, + }, + ) + # Run 2 (post-summarize — what the checkpoint still has) + await event_store.put( + thread_id="t_sum", run_id="r2", + event_type="human_message", category="message", + content={ + "type": "human", "id": None, + "content": [{"type": "text", "text": "follow up"}], + "additional_kwargs": {}, "response_metadata": {}, "name": None, + }, + ) + await event_store.put( + thread_id="t_sum", run_id="r2", + event_type="ai_message", category="message", + content={ + "type": "ai", "id": "lc_run--r2", + "content": "second answer", + "tool_calls": [], "invalid_tool_calls": [], + "additional_kwargs": {}, "response_metadata": {}, "name": None, + "usage_metadata": {"input_tokens": 0, "output_tokens": 0, "total_tokens": 0}, + }, + ) + + request = _make_request(event_store) + messages = await _get_event_store_messages(request, "t_sum") + assert messages is not None + # 4 messages, not 
2 (which is what the summarized checkpoint would yield) + assert len(messages) == 4 + assert messages[0]["content"][0]["text"] == "original question" + assert messages[1]["id"] == "lc_run--r1" + assert messages[3]["id"] == "lc_run--r2" + + @pytest.mark.anyio + async def test_run_id_attached_to_every_message(self, event_store): + await _seed_simple_run(event_store, "t1", "r1") + request = _make_request(event_store) + messages = await _get_event_store_messages(request, "t1") + assert messages is not None + assert all(m.get("run_id") == "r1" for m in messages) + + @pytest.mark.anyio + async def test_feedback_attached_only_to_final_ai_message_per_run(self, event_store): + await _seed_simple_run(event_store, "t1", "r1") + feedback_repo = _FakeFeedbackRepo( + {"r1": {"feedback_id": "fb1", "rating": 1, "comment": "great"}} + ) + request = _make_request(event_store, feedback_repo=feedback_repo) + messages = await _get_event_store_messages(request, "t1") + assert messages is not None + + # human (0), ai_tool_call (1), tool (2), ai_message (3) + final_ai = messages[3] + assert final_ai["feedback"] == { + "feedback_id": "fb1", + "rating": 1, + "comment": "great", + } + # Non-final messages must NOT have a feedback key at all — the + # frontend keys button visibility off of this. + assert "feedback" not in messages[0] + assert "feedback" not in messages[1] + assert "feedback" not in messages[2] + + @pytest.mark.anyio + async def test_feedback_none_when_no_row_for_run(self, event_store): + await _seed_simple_run(event_store, "t1", "r1") + request = _make_request(event_store, feedback_repo=_FakeFeedbackRepo({})) + messages = await _get_event_store_messages(request, "t1") + assert messages is not None + # Final ai_message gets an explicit ``None`` — distinguishes "eligible + # but unrated" from "not eligible" (field absent). 
+ assert messages[3]["feedback"] is None + + @pytest.mark.anyio + async def test_feedback_per_run_for_multi_run_thread(self, event_store): + """A thread with two runs: each final ai_message should get its own feedback.""" + # Run 1 + await event_store.put( + thread_id="t_multi", run_id="r1", + event_type="human_message", category="message", + content={"type": "human", "id": None, "content": "q1", + "additional_kwargs": {}, "response_metadata": {}, "name": None}, + ) + await event_store.put( + thread_id="t_multi", run_id="r1", + event_type="ai_message", category="message", + content={"type": "ai", "id": "lc_run--a1", "content": "a1", + "tool_calls": [], "invalid_tool_calls": [], + "additional_kwargs": {}, "response_metadata": {}, "name": None, + "usage_metadata": None}, + ) + # Run 2 + await event_store.put( + thread_id="t_multi", run_id="r2", + event_type="human_message", category="message", + content={"type": "human", "id": None, "content": "q2", + "additional_kwargs": {}, "response_metadata": {}, "name": None}, + ) + await event_store.put( + thread_id="t_multi", run_id="r2", + event_type="ai_message", category="message", + content={"type": "ai", "id": "lc_run--a2", "content": "a2", + "tool_calls": [], "invalid_tool_calls": [], + "additional_kwargs": {}, "response_metadata": {}, "name": None, + "usage_metadata": None}, + ) + feedback_repo = _FakeFeedbackRepo({ + "r1": {"feedback_id": "fb_r1", "rating": 1, "comment": None}, + "r2": {"feedback_id": "fb_r2", "rating": -1, "comment": "meh"}, + }) + request = _make_request(event_store, feedback_repo=feedback_repo) + messages = await _get_event_store_messages(request, "t_multi") + assert messages is not None + # human[r1], ai[r1], human[r2], ai[r2] + assert messages[1]["feedback"]["feedback_id"] == "fb_r1" + assert messages[1]["feedback"]["rating"] == 1 + assert messages[3]["feedback"]["feedback_id"] == "fb_r2" + assert messages[3]["feedback"]["rating"] == -1 + # Humans don't get feedback + assert "feedback" not in 
messages[0] + assert "feedback" not in messages[2] + + @pytest.mark.anyio + async def test_feedback_repo_failure_does_not_break_helper(self, monkeypatch, event_store): + """If feedback lookup throws, messages still come back without feedback.""" + await _seed_simple_run(event_store, "t1", "r1") + + class _BoomRepo: + async def list_by_thread_grouped(self, *a, **kw): + raise RuntimeError("db down") + + request = _make_request(event_store, feedback_repo=_BoomRepo()) + messages = await _get_event_store_messages(request, "t1") + assert messages is not None + assert len(messages) == 4 + for m in messages: + assert "feedback" not in m + + @pytest.mark.anyio + async def test_returns_none_when_dep_raises(self, monkeypatch, event_store): + """When ``get_run_event_store`` is not configured, helper returns None.""" + import app.gateway.routers.threads as threads_mod + + def boom(_request): + raise RuntimeError("no store") + + monkeypatch.setattr(threads_mod, "get_run_event_store", boom) + request = _make_request(event_store) + assert await threads_mod._get_event_store_messages(request, "t1") is None diff --git a/docs/superpowers/plans/2026-04-10-event-store-history.md b/docs/superpowers/plans/2026-04-10-event-store-history.md new file mode 100644 index 000000000..0e3eb1c35 --- /dev/null +++ b/docs/superpowers/plans/2026-04-10-event-store-history.md @@ -0,0 +1,471 @@ +# Event Store History — Backend Compatibility Layer + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** Replace checkpoint state with the append-only event store as the message source in the thread state/history endpoints, so summarization never causes message loss. + +**Architecture:** The Gateway's `get_thread_state` and `get_thread_history` endpoints currently read messages from `checkpoint.channel_values["messages"]`. 
After summarization, those messages are replaced with a synthetic summary-as-human message and all pre-summarize messages are gone. We modify these endpoints to read messages from the RunEventStore instead (append-only, unaffected by summarization). The response shape for each message stays identical so the chat render path needs no changes, but the frontend's feedback hook must be aligned to use the same full-history view (see Task 3).
+
+**Tech Stack:** Python (FastAPI, SQLAlchemy), pytest, TypeScript (React Query)
+
+**Scope:** Gateway mode only (`make dev-pro`). Standard mode uses the LangGraph Server directly and does not go through these endpoints; the summarize bug is still present there and must be tracked as a separate follow-up (see §"Follow-ups" at end of plan).
+
+**Prerequisite already landed:** `backend/packages/harness/deerflow/runtime/journal.py` now unwraps `Command(update={'messages':[ToolMessage(...)]})` in `on_tool_end`, so new runs that use state-updating tools (e.g. `present_files`) write the inner `ToolMessage` content to the event store instead of `str(Command(...))`. Legacy data captured before this fix is cleaned up defensively by the new helper (see Task 1 Step 3 `_sanitize_legacy_command_repr`).
+ +| Message type | Fields compared | Difference | +|-------------|----------------|------------| +| human_message | all fields | `id` is `None` in event store, has UUID in checkpoint | +| ai_message (tool_call) | all fields, 6 overlapping | **IDENTICAL** (0 diffs) | +| ai_message (final) | all fields | **IDENTICAL** | +| tool_result (normal) | all fields | Only `id` differs (`None` vs UUID) | +| tool_result (from `Command`-returning tool) | content | **Legacy data stored `str(Command(...))` repr instead of inner ToolMessage** — fixed in journal.py for new runs; legacy rows sanitized by helper | + +**Root cause for id difference:** LangGraph's checkpoint assigns `id` to HumanMessage and ToolMessage during graph execution. Event store writes happen earlier, when those ids are still None. AI messages receive `id` from the LLM response (`lc_run--*`) and are unaffected. + +**Fix for id:** Generate deterministic UUIDs for `id=None` messages using `uuid5(NAMESPACE_URL, f"{thread_id}:{seq}")` at read time. Patch a **copy** of the content dict, never the live store object. + +**Summarize impact quantified on the reproducer thread**: event_store has 16 messages (7 AI + 9 others); checkpoint has 12 after summarize (5 AI + 7 others). AI id overlap: 5 of 7 — the 2 missing AI messages are pre-summarize. + +--- + +## File Structure + +| File | Action | Responsibility | +|------|--------|----------------| +| `backend/app/gateway/routers/threads.py` | Modify | Replace checkpoint messages with event store messages in `get_thread_state` and `get_thread_history` | +| `backend/tests/test_thread_state_event_store.py` | Create | Tests for the modified endpoints | + +--- + +### Task 1: Add `_get_event_store_messages` helper to `threads.py` + +A shared helper that loads the **full** message stream from the event store, patches `id=None` messages with deterministic UUIDs, and defensively sanitizes legacy `Command(update=...)` reprs captured before the journal.py fix. 
Patches a copy of each content dict so the live store is never mutated. + +**Design constraints (derived from evaluation §3, §4, §5):** +- **Full pagination**, not `limit=1000`. `RunEventStore.list_messages` returns "latest N records" — a fixed limit silently truncates older messages. Use `count_messages()` to size the request or loop with `after_seq` cursors. +- **Copy before mutate**. `MemoryRunEventStore` returns live dict references; the JSONL/DB stores may return detached rows but we must not rely on that. Always `content = dict(evt["content"])` before patching `id`. +- **Legacy Command sanitization.** Legacy data contains `content["content"] == "Command(update={'artifacts': [...], 'messages': [ToolMessage(content='X', ...)]})"`. Regex-extract the inner ToolMessage content string and replace; if extraction fails, leave content as-is (still strictly better than nothing because checkpoint fallback is also wrong for summarized threads). +- **User context.** `DbRunEventStore.list_messages` is user-scoped via `resolve_user_id(AUTO)` and relies on the auth contextvar set by `@require_permission`. Both endpoints are already decorated — document this dependency in the helper docstring. 
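The copy-on-read and deterministic-id constraints can be sketched standalone (illustrative only — `patch_message_id` is a hypothetical name; in the plan the real logic is inlined in the helper of Step 3):

```python
import uuid


def patch_message_id(raw: dict, thread_id: str, seq: int) -> dict:
    """Return a copy of a store-owned message dict with a stable id.

    Copy-on-read: the input is never mutated, so live references held by
    MemoryRunEventStore stay clean. AI messages keep their LLM-assigned
    id; ``id=None`` rows get a deterministic uuid5 from (thread_id, seq).
    """
    content = dict(raw)  # shallow copy suffices: only top-level "id" is written
    if content.get("id") is None:
        content["id"] = str(uuid.uuid5(uuid.NAMESPACE_URL, f"{thread_id}:{seq}"))
    return content


raw = {"type": "human", "id": None, "content": "hi"}
patched = patch_message_id(raw, "t1", 1)
assert raw["id"] is None  # store object untouched
assert patched["id"] == patch_message_id(raw, "t1", 1)["id"]  # deterministic across requests
assert patch_message_id({"type": "ai", "id": "lc_run--x"}, "t1", 2)["id"] == "lc_run--x"
```

Because the id is derived only from `(thread_id, seq)`, two requests for the same thread always produce the same ids, which is what keeps React keys stable.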
+ +**Files:** +- Modify: `backend/app/gateway/routers/threads.py` +- Test: `backend/tests/test_thread_state_event_store.py` + +- [ ] **Step 1: Write the test** + +Create `backend/tests/test_thread_state_event_store.py`: + +```python +"""Tests for event-store-backed message loading in thread state/history endpoints.""" + +from __future__ import annotations + +import uuid + +import pytest + +from deerflow.runtime.events.store.memory import MemoryRunEventStore + + +@pytest.fixture() +def event_store(): + return MemoryRunEventStore() + + +async def _seed_conversation(event_store: MemoryRunEventStore, thread_id: str = "t1"): + """Seed a realistic multi-turn conversation matching real checkpoint format.""" + # human_message: id is None (same as real data) + await event_store.put( + thread_id=thread_id, run_id="r1", + event_type="human_message", category="message", + content={ + "type": "human", "id": None, + "content": [{"type": "text", "text": "Hello"}], + "additional_kwargs": {}, "response_metadata": {}, "name": None, + }, + ) + # ai_tool_call: id is set by LLM + await event_store.put( + thread_id=thread_id, run_id="r1", + event_type="ai_tool_call", category="message", + content={ + "type": "ai", "id": "lc_run--abc123", + "content": "", + "tool_calls": [{"name": "search", "args": {"q": "cats"}, "id": "call_1", "type": "tool_call"}], + "invalid_tool_calls": [], + "additional_kwargs": {}, "response_metadata": {}, "name": None, + "usage_metadata": {"input_tokens": 100, "output_tokens": 50, "total_tokens": 150}, + }, + ) + # tool_result: id is None (same as real data) + await event_store.put( + thread_id=thread_id, run_id="r1", + event_type="tool_result", category="message", + content={ + "type": "tool", "id": None, + "content": "Found 10 results", + "tool_call_id": "call_1", "name": "search", + "artifact": None, "status": "success", + "additional_kwargs": {}, "response_metadata": {}, + }, + ) + # ai_message: id is set by LLM + await event_store.put( + thread_id=thread_id, 
run_id="r1", + event_type="ai_message", category="message", + content={ + "type": "ai", "id": "lc_run--def456", + "content": "I found 10 results about cats.", + "tool_calls": [], "invalid_tool_calls": [], + "additional_kwargs": {}, "response_metadata": {"finish_reason": "stop"}, "name": None, + "usage_metadata": {"input_tokens": 200, "output_tokens": 100, "total_tokens": 300}, + }, + ) + # Also add a trace event — should NOT appear + await event_store.put( + thread_id=thread_id, run_id="r1", + event_type="llm_request", category="trace", + content={"model": "gpt-4"}, + ) + + +class TestGetEventStoreMessages: + """Verify event store message extraction with id patching.""" + + @pytest.mark.asyncio + async def test_extracts_all_message_types(self, event_store): + await _seed_conversation(event_store) + events = await event_store.list_messages("t1", limit=500) + messages = [evt["content"] for evt in events if isinstance(evt.get("content"), dict) and "type" in evt["content"]] + assert len(messages) == 4 + assert [m["type"] for m in messages] == ["human", "ai", "tool", "ai"] + + @pytest.mark.asyncio + async def test_null_ids_get_patched(self, event_store): + """Messages with id=None should get deterministic UUIDs.""" + await _seed_conversation(event_store) + events = await event_store.list_messages("t1", limit=500) + messages = [] + for evt in events: + content = evt.get("content") + if isinstance(content, dict) and "type" in content: + if content.get("id") is None: + content["id"] = str(uuid.uuid5(uuid.NAMESPACE_URL, f"t1:{evt['seq']}")) + messages.append(content) + + # All messages now have an id + for m in messages: + assert m["id"] is not None + assert isinstance(m["id"], str) + assert len(m["id"]) > 0 + + # AI messages keep their original id + assert messages[1]["id"] == "lc_run--abc123" + assert messages[3]["id"] == "lc_run--def456" + + # Human and tool messages get deterministic ids (same input = same output) + human_id_1 = str(uuid.uuid5(uuid.NAMESPACE_URL, 
"t1:1")) + assert messages[0]["id"] == human_id_1 + + @pytest.mark.asyncio + async def test_empty_thread(self, event_store): + events = await event_store.list_messages("nonexistent", limit=500) + messages = [evt["content"] for evt in events if isinstance(evt.get("content"), dict)] + assert messages == [] + + @pytest.mark.asyncio + async def test_tool_call_fields_preserved(self, event_store): + await _seed_conversation(event_store) + events = await event_store.list_messages("t1", limit=500) + messages = [evt["content"] for evt in events if isinstance(evt.get("content"), dict) and "type" in evt["content"]] + + # AI tool_call message + ai_tc = messages[1] + assert ai_tc["tool_calls"][0]["name"] == "search" + assert ai_tc["tool_calls"][0]["id"] == "call_1" + + # Tool result + tool = messages[2] + assert tool["tool_call_id"] == "call_1" + assert tool["status"] == "success" +``` + +- [ ] **Step 2: Run tests to verify they pass** + +Run: `cd backend && PYTHONPATH=. uv run pytest tests/test_thread_state_event_store.py -v` + +- [ ] **Step 3: Add the helper function and modify `get_thread_history`** + +In `backend/app/gateway/routers/threads.py`: + +1. Add import at the top: +```python +import uuid # ADD (may already exist, check first) +from app.gateway.deps import get_run_event_store # ADD +``` + +2. Add the helper function (before the endpoint functions, after the model definitions): + +```python +_LEGACY_CMD_INNER_CONTENT_RE = re.compile( + r"ToolMessage\(content=(?P['\"])(?P.*?)(?P=q)", + re.DOTALL, +) + + +def _sanitize_legacy_command_repr(content_field: Any) -> Any: + """Recover the inner ToolMessage text from a legacy ``str(Command(...))`` repr. + + Runs that pre-date the ``on_tool_end`` fix in ``journal.py`` stored + ``str(Command(update={'messages':[ToolMessage(content='X', ...)]}))`` as the + tool_result content. New runs store ``'X'`` directly. 
For old threads, try + to extract ``'X'`` defensively; return the original string if extraction + fails (still no worse than the current checkpoint-based fallback, which is + broken for summarized threads anyway). + """ + if not isinstance(content_field, str) or not content_field.startswith("Command(update="): + return content_field + match = _LEGACY_CMD_INNER_CONTENT_RE.search(content_field) + return match.group("inner") if match else content_field + + +async def _get_event_store_messages(request: Request, thread_id: str) -> list[dict] | None: + """Load messages from the event store, returning None if unavailable. + + The event store is append-only and immune to summarization. Each + message event's ``content`` field contains a ``model_dump()``'d + LangChain Message dict that is already JSON-serialisable. + + **Full pagination, not a fixed limit.** ``RunEventStore.list_messages`` + returns the newest ``limit`` records when no cursor is given, which + silently drops older messages. We call ``count_messages()`` first and + request that many records. For stores that may return fewer (e.g. filtered + by user), we also fall back to ``after_seq``-cursor pagination. + + **Copy-on-read.** Each content dict is copied before ``id`` is patched so + the live store object is never mutated; ``MemoryRunEventStore`` returns + live references. + + **Legacy Command repr sanitization.** See ``_sanitize_legacy_command_repr``. + + **User context.** ``DbRunEventStore`` is user-scoped by default via + ``resolve_user_id(AUTO)`` (see ``runtime/user_context.py``). Callers of + this helper must be inside a request where ``@require_permission`` has + populated the user contextvar. Both ``get_thread_history`` and + ``get_thread_state`` satisfy that. Do not call this helper from CLI or + migration scripts without passing ``user_id=None`` explicitly. 
Returns ``None`` when the event store is not configured or contains no
+    messages for this thread, so callers can fall back to checkpoint messages.
+    """
+    try:
+        event_store = get_run_event_store(request)
+    except Exception:
+        return None
+
+    try:
+        total = await event_store.count_messages(thread_id)
+    except Exception:
+        logger.exception("count_messages failed for thread %s", sanitize_log_param(thread_id))
+        return None
+    if not total:
+        return None
+
+    # Size the first read from count_messages(): with no cursor the store
+    # returns the *newest* ``limit`` rows, so a fixed page size would silently
+    # drop the oldest messages on long threads. Keep following ``after_seq``
+    # cursors afterwards in case a store returns fewer rows than requested.
+    collected: list[dict] = []
+    after_seq: int | None = None
+    while True:
+        page = await event_store.list_messages(thread_id, limit=total, after_seq=after_seq)
+        if not page:
+            break
+        collected.extend(page)
+        next_seq = page[-1].get("seq")
+        if next_seq is None or next_seq == after_seq:
+            break
+        after_seq = next_seq
+
+    messages: list[dict] = []
+    for evt in collected:
+        raw = evt.get("content")
+        if not isinstance(raw, dict) or "type" not in raw:
+            continue
+        # Copy to avoid mutating the store-owned dict.
+        content = dict(raw)
+        if content.get("id") is None:
+            content["id"] = str(uuid.uuid5(uuid.NAMESPACE_URL, f"{thread_id}:{evt['seq']}"))
+        # Sanitize legacy Command reprs on tool_result messages only.
+        if content.get("type") == "tool":
+            content["content"] = _sanitize_legacy_command_repr(content.get("content"))
+        messages.append(content)
+    return messages if messages else None
+```
+
+Also add `import re` at the top of the file if it isn't already imported.
+
+3.
In `get_thread_history` (around line 585-590), replace the messages section: + +**Before:** +```python + # Attach messages from checkpointer only for the latest checkpoint + if is_latest_checkpoint: + messages = channel_values.get("messages") + if messages: + values["messages"] = serialize_channel_values({"messages": messages}).get("messages", []) + is_latest_checkpoint = False +``` + +**After:** +```python + # Attach messages: prefer event store (immune to summarization), + # fall back to checkpoint messages when event store is unavailable. + if is_latest_checkpoint: + es_messages = await _get_event_store_messages(request, thread_id) + if es_messages is not None: + values["messages"] = es_messages + else: + messages = channel_values.get("messages") + if messages: + values["messages"] = serialize_channel_values({"messages": messages}).get("messages", []) + is_latest_checkpoint = False +``` + +- [ ] **Step 4: Modify `get_thread_state` similarly** + +In `get_thread_state` (around line 443-444), replace: + +**Before:** +```python + return ThreadStateResponse( + values=serialize_channel_values(channel_values), +``` + +**After:** +```python + values = serialize_channel_values(channel_values) + + # Override messages with event store data (immune to summarization) + es_messages = await _get_event_store_messages(request, thread_id) + if es_messages is not None: + values["messages"] = es_messages + + return ThreadStateResponse( + values=values, +``` + +- [ ] **Step 5: Run all backend tests** + +Run: `cd backend && PYTHONPATH=. uv run pytest tests/ -v --timeout=30 -x` + +- [ ] **Step 6: Commit** + +```bash +git add backend/app/gateway/routers/threads.py backend/tests/test_thread_state_event_store.py +git commit -m "feat(threads): load messages from event store instead of checkpoint state + +Event store is append-only and immune to summarization. Messages with +null ids (human, tool) get deterministic UUIDs based on thread_id:seq +for stable frontend rendering." 
+``` + +--- + +### Task 2 (OPTIONAL, deferred): Reduce flush_threshold for shorter mid-stream gap + +**Status:** Not a correctness fix. Re-evaluation (see spec) found that `RunJournal` already flushes on `run_end`, `run_error`, cancel, and worker `finally` paths. The only window this tuning narrows is a hard process crash or mid-run reload. Defer and decide separately; do not couple with Task 1 merge. + +If pursued: change `flush_threshold` default from 20 → 5 in `journal.py:42`, rerun `tests/test_run_journal.py`, commit as a separate `perf(journal): …` commit. + +--- + +### Task 3: Fix `useThreadFeedback` pagination in frontend + +Once `/history` returns the full event-store-backed message stream, the frontend's `runIdByAiIndex` map must also cover the full stream or its positional AI-index mapping drifts and feedback clicks go to the wrong `run_id`. The current hook hardcodes `limit=200`. + +**Files:** +- Modify: `frontend/src/core/threads/hooks.ts` (around line 679) + +- [ ] **Step 1: Replace the fixed `?limit=200` with full pagination** + +Change: + +```ts +const res = await fetchWithAuth( + `${getBackendBaseURL()}/api/threads/${encodeURIComponent(threadId)}/messages?limit=200`, +); +``` + +to a loop that pages via `after_seq` (or an equivalent query param exposed by the `/messages` endpoint — check `backend/app/gateway/routers/thread_runs.py:285-323` for the actual parameter names before writing the TS code). Accumulate `messages` until a page returns fewer than the page size. + +- [ ] **Step 2: Defensive index guard** + +`runIdByAiIndex[aiMessageIndex]` can still be `undefined` when the frontend renders optimistic state before the messages query refreshes. The current `?? undefined` in `message-list.tsx:71` already handles this; do not remove it. 
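The Step 1 loop might look like the following sketch. The parameter names (`limit`, `after_seq`) and the `seq` field are assumptions — verify them against `backend/app/gateway/routers/thread_runs.py` as the plan instructs. The fetch itself is abstracted behind a callback so the loop is independent of `fetchWithAuth`:

```typescript
// Hypothetical row shape; align with the real /messages response schema.
type EventRow = { seq: number; event_type: string; run_id: string };

// Page through the full message stream until a short page signals exhaustion.
async function fetchAllMessages(
  fetchPage: (afterSeq: number | null, limit: number) => Promise<EventRow[]>,
  pageSize = 200,
): Promise<EventRow[]> {
  const all: EventRow[] = [];
  let afterSeq: number | null = null;
  for (;;) {
    const page = await fetchPage(afterSeq, pageSize);
    all.push(...page);
    if (page.length < pageSize) break; // short page → stream exhausted
    afterSeq = page[page.length - 1].seq; // cursor = last seq of this page
  }
  return all;
}
```

In the real hook, `fetchPage` would wrap `fetchWithAuth` and build the URL from `afterSeq`; the accumulated `all` then feeds `runIdByAiIndex` so the mapping covers the whole thread.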
+ +- [ ] **Step 3: Invalidate `["thread-feedback", threadId]` after a new run** + +In `useThreadStream` (or wherever stream-end is handled), call `queryClient.invalidateQueries({ queryKey: ["thread-feedback", threadId] })` when the stream closes so the runIdByAiIndex picks up the new run's AI message immediately. + +- [ ] **Step 4: Run `pnpm check`** + +```bash +cd frontend && pnpm check +``` + +- [ ] **Step 5: Commit** + +```bash +git add frontend/src/core/threads/hooks.ts +git commit -m "fix(feedback): paginate useThreadFeedback and invalidate after stream" +``` + +--- + +### Task 4: End-to-end test — summarize + multi-run feedback + +Add a regression test that exercises the exact bug class we are fixing: a summarized thread with at least two runs, where feedback clicks must target the correct `run_id`. + +**Files:** +- Modify: `backend/tests/test_thread_state_event_store.py` + +- [ ] **Step 1: Write the test** + +Seed a `MemoryRunEventStore` with two runs worth of messages (`r1`: human + ai + human + ai, `r2`: human + ai), then simulate a summarized checkpoint state that drops the `r1` messages. Call `_get_event_store_messages` and assert: + +- Length matches the event store, not the checkpoint +- The first message is the original `r1` human, not a summary +- AI messages preserve their `lc_run--*` ids in order +- Any `id=None` messages get a stable `uuid5(...)` id +- A legacy `str(Command(update=...))` content field in a tool_result is sanitized to the inner text + +- [ ] **Step 2: Run the new test** + +```bash +cd backend && PYTHONPATH=. uv run pytest tests/test_thread_state_event_store.py -v +``` + +- [ ] **Step 3: Commit with Tasks 1, 3 changes** + +Bundle with the Task 1 commit so tests always land alongside the implementation. + +--- + +### Task 5: Standard mode follow-up (documentation only) + +Standard mode (`make dev`) hits LangGraph Server directly for `/threads/{id}/history` and does not go through the Gateway router we just patched. 
The summarize bug is still present there.
+
+**Files:**
+- Modify: this plan (add follow-up section at the bottom, see below) OR create a separate tracking issue
+
+- [ ] **Step 1: Record the gap**
+
+Append to the bottom of this plan (or open a GitHub issue and link it):
+
+> **Follow-up — Standard mode summarize bug**
+> `get_thread_history` in `backend/app/gateway/routers/threads.py` is only hit in Gateway mode. Standard mode proxies `/api/langgraph/*` directly to the LangGraph Server (see `backend/CLAUDE.md` nginx routing and `frontend/CLAUDE.md` `NEXT_PUBLIC_LANGGRAPH_BASE_URL`). The summarize-message-loss symptom is still reproducible there. Options: (a) teach the LangGraph Server checkpointer to branch on an override, (b) move `/history` behind Gateway in Standard mode as well, (c) accept as known limitation for Standard mode. Decide before GA.
diff --git a/docs/superpowers/specs/2026-04-11-runjournal-history-evaluation.md b/docs/superpowers/specs/2026-04-11-runjournal-history-evaluation.md
new file mode 100644
index 000000000..44a466960
--- /dev/null
+++ b/docs/superpowers/specs/2026-04-11-runjournal-history-evaluation.md
@@ -0,0 +1,191 @@
+# Replacing History Messages with RunJournal — Evaluation and Comparison
+
+**Date**: 2026-04-11
+**Branch**: `rayhpeng/fix-persistence-new`
+**Related plan**: [`docs/superpowers/plans/2026-04-10-event-store-history.md`](../plans/2026-04-10-event-store-history.md) (not yet landed)
+
+---
+
+## 1.
Problem and Data Verification
+
+**Symptom**: after SummarizationMiddleware fires, the frontend history can no longer show the real user messages from before the summarization.
+
+**Reproduction data** (thread `6d30913e-dcd4-41c8-8941-f66c716cf359`):
+
+| Source | message at seq=1 | total messages | original human preserved? |
+|---|---|---:|---|
+| `run_events` (SQLite) | human `"最新伊美局势"` | 9 (1 human + 7 ai_tool_call + 9 tool_result + 1 ai_message) | ✅ |
+| `/history` response (`docs/resp.json`) | type=human, content=`"Here is a summary of the conversation to date:…"` | varies | ❌ (replaced by the summary) |
+
+**Root cause**: `get_thread_history` at `backend/app/gateway/routers/threads.py:587-589` reads from `checkpoint.channel_values["messages"]`, and LangGraph's SummarizationMiddleware rewrites that list in place.
+
+---
+
+## 2. Candidate Approaches
+
+| Approach | Description | Recommended this round? |
+|---|---|---|
+| **A. event_store overrides messages** (existing plan) | `/history` and `/state` switch to reading `RunEventStore.list_messages()` and override `channel_values["messages"]`; every other field keeps its checkpoint source | ✅ primary approach |
+| B. Fix SummarizationMiddleware | Make summarize stop replacing messages in place (attach the summary as an extra system message) | ❌ defeats summarize's token-budget purpose |
+| C. Dual-read merge (checkpoint + event_store diff) | Merge the two segments on either side of the summarize cut point | ❌ complex merge logic with no extra benefit |
+| D. Switch to the existing `/api/threads/{id}/messages` endpoint | Frontend consumes the event-store message endpoint that already exists (`thread_runs.py:285-323`) | ⚠️ cleaner but requires frontend changes |
+
+---
+
+## 3. Claude Self-Review vs Codex Independent Review
+
+Both parties analyzed the same plan independently. The overlap is largely consistent, but **Codex found a critical bug that I had missed**.
+
+### 3.1 Shared Conclusions
+
+| Dimension | Conclusion |
+|---|---|
+| Correctness direction | event_store is append-only and unaffected by summarize; the direction is right |
+| ID backfill | `uuid5(NAMESPACE_URL, f"{thread_id}:{seq}")` is stable and deterministic; safe |
+| Frontend schema | zero changes |
+| Non-message fields (artifacts/todos/title/thread_data) | summarize only touches messages; no need to override other fields |
+| Multi-checkpoint semantics | the frontend's `useStream` only fetches `limit: 1` (`frontend/src/core/threads/hooks.ts:203-210`) and does no time travel; latest-only is acceptable but should be spelled out in comments/docs |
+| Scope | Gateway mode only; Standard mode talks to the LangGraph Server directly, so the bug persists on the default deployment path |
+
+### 3.2 Claude's Independent Observations
+
+1. Verified data alignment: the real-data alignment table at lines 15-28 of the plan matches this `run_events` export (id distribution across the 9 messages: AI ids come from the LLM as `lc_run--*`; human/tool ids are None).
+2. Worried that the `run_end` / `run_error` / `cancel` paths might not all flush — Codex actually audited the code and settled this conclusively (see below).
+3.
Approach A is roughly a 60-line change in a single file; complexity is low.
+
+### 3.3 Codex's Key Additions (missed by Claude)
+
+> **Bug #1 — The plan's `limit=1000` is not exhaustive**
+> The semantics of `RunEventStore.list_messages()` are "return the latest `limit` rows" (`base.py:51-65`, `db.py:151-181`). For long conversations with more than 1000 messages, the plan as written **drops the earliest messages**, reintroducing the "message loss" bug (just with a different segment lost).
+
+> **Bug #2 — The helper mutates the store's dicts in place**
+> The plan's helper writes `id` into `content` in place; `MemoryRunEventStore` returns **live references**, so this pollutes objects inside the store. Deep-copy, or build a new object with a dict comprehension.
+
+> **Flush paths audited**:
+> `RunJournal` flushes at the threshold (`journal.py:360-373`), on `run_end` (`91-96`), on `run_error` (`97-106`), and in the worker's `finally` (`worker.py:280-286`); `CancelledError` also goes through `finally`. **Normal end/error/cancel all flush; only a hard kill / process crash loses the buffer.**
+> So `flush_threshold 20 → 5` matters **only for the hard-crash window** and mid-run reload visibility; it is **not a correctness fix**, just optional tuning. The cost is more put_batch / SQLite churn; and since `_flush_sync()` (`383-398`) already prevents concurrent flushes, "flush every 5" is best-effort, not a strict guarantee.
+
+### 3.4 Secondary Points Codex Flagged Without Blocking
+
+- Approach D (consuming the existing `/api/threads/{id}/messages` endpoint) is cleaner but needs frontend changes.
+- Once `/history` is changed per approach A, it is no longer a strict "per-checkpoint-snapshot" API (for the messages field); this should go into comments and the API docs.
+- The Standard-mode summarize bug deserves its own follow-up issue.
+
+---
+
+## 4. Final Merged Verdict
+
+**Codex**: APPROVE-WITH-CHANGES
+**Claude**: agrees with Codex's verdict
+
+### Must change before merge (Top 3)
+
+1. **Fix the pagination bug**: do not use a fixed `limit=1000`. Use one of:
+   - `count = await event_store.count_messages(thread_id)`, then `list_messages(thread_id, limit=count)`
+   - or loop with cursor pagination (`after_seq`) until exhausted
+2. **Do not mutate store dicts in place**: the helper's id backfill on `content` needs a copy (a `dict(content)` shallow copy suffices since only the top-level `id` is written)
+3. **Explicit Standard-mode follow-up**: append "Standard-mode follow-up: TODO #xxx" at the end of the plan, or state clearly in the merge PR description that this is a Gateway-only stopgap
+
+### Optional (non-blocking)
+
+4. Downgrade `flush_threshold 20 → 5` to "optional tuning" rather than part of the fix; or land it as its own commit noting it only helps the hard-crash window
+5. Add a comment to `get_thread_history` noting that the messages field has left checkpoint-snapshot semantics
+6. Test coverage: simulate a post-summarize checkpoint plus the real event_store and verify end-to-end that `/history` returns the original human message
+
+---
+
+## 5. Recommended Execution Order
+
+1. Revise `docs/superpowers/plans/2026-04-10-event-store-history.md` per §4 of this document (mainly the Task 1 helper implementation + pagination)
+2.
Execute the revised plan (via `superpowers:executing-plans`)
+3. Open the Standard-mode follow-up issue immediately after merging
+
+## 6. Feedback Impact Analysis (2026-04-11 addendum)
+
+### 6.1 Data Model
+
+The `feedback` table (`persistence/feedback/model.py`):
+
+| Field | Notes |
+|---|---|
+| `feedback_id` PK | - |
+| `run_id` NOT NULL | the run the feedback targets |
+| `thread_id` NOT NULL | - |
+| `user_id` | - |
+| `message_id` nullable | the comment says explicitly: `optional RunEventStore event identifier` — already designed around the event_store |
+| UNIQUE(thread_id, run_id, user_id) | at most one row per run per user |
+
+**Conclusion**: feedback is **not keyed by message uuid** but by `run_id`, so the checkpoint message loss caused by summarize **does not affect feedback storage**. The schema is natively event_store-compatible; **no data migration needed**.
+
+### 6.2 The Frontend's runId Mapping: a Hidden Bug
+
+Frontend feedback currently rides on two parallel data chains:
+
+| Purpose | Data source | Location |
+|---|---|---|
+| Render message bodies | `POST /history` (checkpoint) | `useStream` → `thread.messages` |
+| Obtain the `runId` mapping | `GET /api/threads/{id}/messages?limit=200` (**event_store**) | `useThreadFeedback` (`hooks.ts:669-709`) |
+
+The two are aligned via **the ordinal index of AI messages**:
+
+```ts
+// hooks.ts:691-698
+for (const msg of messages) {
+  if (msg.event_type === "ai_message") {
+    runIdByAiIndex.push(msg.run_id); // pushed in AI order only
+  }
+}
+// message-list.tsx:70-71
+runId = feedbackData.runIdByAiIndex[aiMessageIndex]
+```
+
+**Bug**: on a summarized thread, the two chains **disagree** on the AI message count and order:
+
+| Data source | AI message sequence for this thread | Count |
+|---|---|---:|
+| `/history` (checkpoint, post-summarize) | seq=19,31,37,45,53 | 5 |
+| `/messages` (event_store, complete) | seq=5,13,19,31,37,45,53 | 7 |
+
+Result: the frontend's rendered "AI message #0" is seq=19, but `runIdByAiIndex[0]` points at the run of seq=5 (harmless in this example because it is the same run; **on a thread spanning multiple runs, a thumbs-up lands on the wrong run**).
+
+**This bug predates and is independent of this plan**; users just may not have noticed it.
+
+### 6.3 Approach A's Impact on Feedback
+
+**Negative**: none. Feedback storage is unaffected.
+
+**Positive (incidental win)**: once `/history` switches to the event_store, **the AI message sequences of the two chains align automatically**, fixing the hidden bug of §6.2 as a side effect.
+
+**Preconditions** (as important as any of the Top 3 changes):
+
+- The new helper must use **the same message-fetch logic** as the `/messages` endpoint (same store, same filter). Otherwise the two chains can still drift under edge conditions.
+- Concretely: **both sides must paginate fully**. Today the frontend hardcodes `/messages?limit=200`, truncating any thread with >200 messages; the plan's `limit=1000` has the same kind of ceiling. Mismatched ceilings →
the two orderings no longer align → the feedback mapping misfires
+- **Must fix**: `useThreadFeedback`'s `limit=200` must become full pagination, or the `/messages` backend must default to returning everything
+
+### 6.4 Impact on Frontend Work Ordering
+
+The original plan claimed "zero frontend changes"; with feedback taken into account, that should be amended:
+
+| Change | Required | Optional |
+|---|---|---|
+| Backend `/history` reads the event_store | ✅ | - |
+| Backend helper paginates instead of `limit=1000` | ✅ | - |
+| Frontend `useThreadFeedback` paginates or raises its limit | ✅ | - |
+| `runIdByAiIndex` defense: out-of-range index falls back to `undefined` (already present) | - | ✅ already the case |
+| Frontend renders directly from `/messages` (approach D) | - | ✅ cleaner long-term |
+
+### 6.5 Feedback Additions to the Top 3
+
+Beyond the original Top 3, add:
+
+4. **Frontend `useThreadFeedback` must paginate or fetch everything** (`frontend/src/core/threads/hooks.ts:679`); otherwise it stays misaligned with `/history`'s new full-stream behavior
+5. **End-to-end test**: a thread spanning >1 run + summarize triggered + a thumbs-up on a historical AI message; confirm the feedback hits the correct run_id
+6. **TanStack Query cache coordination**: `thread-feedback` and the history query need their `staleTime` / invalidation refreshed together when a new run ends; otherwise `runIdByAiIndex` is stale after new messages are written and a thumbs-up hits the previous run
+
+---
+
+## 8. Open Questions
+
+- Actual performance of `RunEventStore.count_messages()` and `list_messages(after_seq=...)` (should be fine on SQLite at the thousands-of-messages scale, but not load-tested)
+- Whether `MemoryRunEventStore` and `DbRunEventStore` have consistent pagination semantics (Codex only audited `db.py`; `memory.py` still needs confirming)
+- Whether to promote `/api/threads/{id}/messages` to the frontend's primary endpoint and keep `/history` as a pure checkpoint API — architecturally cleaner but more costly
diff --git a/docs/superpowers/specs/2026-04-11-summarize-marker-design.md b/docs/superpowers/specs/2026-04-11-summarize-marker-design.md
new file mode 100644
index 000000000..79cd748d4
--- /dev/null
+++ b/docs/superpowers/specs/2026-04-11-summarize-marker-design.md
@@ -0,0 +1,203 @@
+# Summarize Marker in History — Design & Verification
+
+**Date**: 2026-04-11
+**Branch**: `rayhpeng/fix-persistence-new`
+**Status**: Design approved, implementation deferred to a follow-up PR
+**Depends on**: [`2026-04-11-runjournal-history-evaluation.md`](./2026-04-11-runjournal-history-evaluation.md) (the event-store-backed history fix this builds on)
+
+---
+
+## 1.
Goal + +Display a "summarization happened here" marker in the conversation history UI when `SummarizationMiddleware` ran mid-run, so users understand why earlier messages look condensed or missing. The event-store-backed `/history` fix already recovered the original messages; this spec adds a **visible marker** at the seq position where summarization occurred, optionally showing the generated summary text. + +## 2. Investigation findings + +### 2.1 Today's state: zero middleware records + +Full scan of `backend/.deer-flow/data/deerflow.db` `run_events`: + +| category | rows | +|---|---:| +| trace | 76 | +| message | 34 | +| lifecycle | 8 | +| **middleware** | **0** | + +No row has `event_type` containing `summariz` or `middleware`. The middleware category is dead in production. + +### 2.2 Why: two dead code paths in `journal.py` + +| Location | Status | +|---|---| +| `journal.py:343-362` — `on_custom_event("summarization", ...)` writes one trace event + one `category="middleware"` event. | Dead. Only fires when something calls `adispatch_custom_event("summarization", {...})`. The upstream LangChain `SummarizationMiddleware` (`.venv/.../langchain/agents/middleware/summarization.py:272`) **never emits custom events** — its `before_model`/`abefore_model` just mutate messages in place and return `{'messages': new_messages}`. Callback never triggered. | +| `journal.py:449` — `record_middleware(tag, *, name, hook, action, changes)` helper | Dead. Grep shows zero callers in the harness. Added speculatively, never wired up. 
| + +### 2.3 Concrete evidence of summarize running unlogged + +Thread `3d5dea4a-0983-4727-a4e8-41a64428933a`: + +- `run_events` seq=1 → original human `"写一份关于deer-flow的详细技术报告"` ✓ (event store is fine) +- `run_events` seq=43 → `llm_request` trace whose `messages[0]` literal contains `"Here is a summary of the conversation to date:"` — proof that SummarizationMiddleware did inject a summary mid-run +- Zero rows with `category='middleware'` for this thread → nothing captured for UI to render + +## 3. Approaches considered + +### A. Subclass `SummarizationMiddleware` and dispatch a custom event + +Wrap the upstream class, override `abefore_model`, call `await adispatch_custom_event("summarization", {...})` after super(). Journal's existing `on_custom_event` path captures it. + +### B. Frontend-only diff heuristic + +Compare `event_store.count_messages()` vs rendered count, infer summarization happened from the gap. **Rejected**: can't pinpoint position in the stream, can't show summary text. Only yields a vague badge. + +### C. Hybrid A + frontend inline card rendered at the middleware event's seq position + +Same backend as A, plus frontend renders an inline `[N messages condensed]` card at the correct chronological position. **Recommended terminal state**. + +## 4. Subagent's wrong claim and its rebuttal + +An independent agent flagged approach A as structurally broken because: + +> `RunnableCallable(trace=False)` skips `set_config_context`, therefore `var_child_runnable_config` is never set, therefore `adispatch_custom_event` raises `RuntimeError("Unable to dispatch an adhoc event without a parent run id")`. + +**This is wrong.** The user's counter-intuition was correct: `trace=False` does not prevent `adispatch_custom_event` from working, as long as the middleware signature explicitly accepts `config: RunnableConfig`. The mechanism: + +1. `RunnableCallable.__init__` (`langgraph/_internal/_runnable.py:293-319`) inspects the function signature. 
   If it accepts `config: RunnableConfig`, that parameter is recorded in `self.func_accepts`.
2. Both the `trace=True` and `trace=False` branches of `ainvoke` run the same kwarg-injection loop (`_runnable.py:349-356`): `if kw == "config": kw_value = config`. The `config` passed to `ainvoke` (from Pregel's `task.proc.ainvoke(task.input, config)` at `pregel/_retry.py:138`) is the task config with callbacks already bound.
3. Inside the middleware, passing that `config` explicitly to `adispatch_custom_event(..., config=config)` means the function doesn't rely on `var_child_runnable_config.get()` at all. The LangChain docstring at `langchain_core/callbacks/manager.py:2574-2579` even says "If using python 3.10 and async, you MUST specify the config parameter" — which is exactly this path.

`trace=False` only changes whether **this runnable layer creates a new child callback scope**. It does not affect whether the outer-layer config (with callbacks including `RunJournal`) is passed down to the function.

## 5. Verification

Ran `/tmp/verify_summarize_event.py` (standalone minimal reproduction):

- Minimal `AgentMiddleware` subclass with `abefore_model(self, state, runtime, config: RunnableConfig)`
- Calls `await adispatch_custom_event("summarization", {...}, config=config)` inside
- `create_agent(model=FakeChatModel, middleware=[probe])`
- `agent.ainvoke({...}, config={"callbacks": [RecordingHandler()]})`

**Result**:

```
INFO verify: ProbeMiddleware.abefore_model called
INFO verify: config keys: ['callbacks', 'configurable', 'metadata']
INFO verify: config.callbacks type: AsyncCallbackManager
INFO verify: config.metadata: {'langgraph_step': 1, 'langgraph_node': 'probe.before_model', ...}
INFO verify: on_custom_event fired: name=summarization
  run_id=019d7d19-1727-7830-aa33-648ecbee4b95
  data={'summary': 'fake summary', 'replaced_count': 3}
SUCCESS: approach A is viable (config injection + adispatch work)
```

All five predictions held:

1.
✅ `config: RunnableConfig` signature triggers auto-injection despite `trace=False`
2. ✅ `config.callbacks` is an `AsyncCallbackManager` with `parent_run_id` set
3. ✅ `adispatch_custom_event(..., config=config)` runs without error
4. ✅ `RecordingHandler.on_custom_event` receives the event
5. ✅ The received `run_id` is a valid UUID tied to the running graph

**Bonus finding**: `config.metadata` contains `langgraph_step` and `langgraph_node`. These can be included in the middleware event's metadata to help the frontend position the marker on the timeline.

## 6. Recommended implementation (approach C)

### 6.1 Backend

**New wrapper middleware** in `backend/packages/harness/deerflow/agents/lead_agent/agent.py`:

```python
from langchain.agents.middleware.summarization import SummarizationMiddleware
from langchain_core.callbacks import adispatch_custom_event
from langchain_core.runnables import RunnableConfig


class _TrackingSummarizationMiddleware(SummarizationMiddleware):
    """Wraps upstream SummarizationMiddleware to emit a ``summarization``
    custom event on every actual summarization, so RunJournal can persist
    a middleware:summarize row to the event store.

    The upstream class does not emit events of its own. Declaring
    ``config: RunnableConfig`` in the override lets LangGraph's
    ``RunnableCallable`` inject the Pregel task config (with callbacks
    and parent_run_id) regardless of ``trace=False`` on the node.
+ """ + + async def abefore_model(self, state, runtime, config: RunnableConfig): + before_count = len(state.get("messages") or []) + result = await super().abefore_model(state, runtime) + if result is None: + return None + + new_messages = result.get("messages") or [] + replaced_count = max(0, before_count - len(new_messages)) + summary_text = _extract_summary_text(new_messages) + + await adispatch_custom_event( + "summarization", + { + "summary": summary_text, + "replaced_count": replaced_count, + }, + config=config, + ) + return result + + +def _extract_summary_text(messages: list) -> str: + """Pull the summary string out of the HumanMessage the upstream class + injects as ``Here is a summary of the conversation to date:...``.""" + for msg in messages: + if getattr(msg, "type", None) == "human": + content = getattr(msg, "content", "") + text = content if isinstance(content, str) else "" + if text.startswith("Here is a summary of the conversation to date"): + return text + return "" +``` + +Swap the existing `SummarizationMiddleware()` instantiation in `_build_middlewares` for `_TrackingSummarizationMiddleware(...)` with the same args. + +**Journal change**: **zero**. `on_custom_event("summarization", ...)` in `journal.py:343-362` already writes both a trace and a `category="middleware"` row. 

**History helper change**: extend `_get_event_store_messages` in `backend/app/gateway/routers/threads.py` to surface `category="middleware"` rows as pseudo-messages, e.g.:

```python
# In the per-event loop, after the existing message branch
# (``evt`` and ``raw`` come from the enclosing loop):
if evt.get("category") == "middleware" and evt.get("event_type") == "middleware:summarize":
    meta = evt.get("metadata") or {}
    messages.append({
        "id": f"summary-marker-{evt['seq']}",
        "type": "summary_marker",
        "replaced_count": meta.get("replaced_count", 0),
        "summary": (raw or {}).get("content", "") if isinstance(raw, dict) else "",
        "run_id": evt.get("run_id"),
    })
```

The marker uses a sentinel `type` (`summary_marker`) that doesn't collide with any LangChain message type, so downstream consumers that loop over messages can skip it or render it explicitly.

### 6.2 Frontend

- `core/messages/utils.ts`: extend the message grouping to recognize `type === "summary_marker"` and yield it as its own group (`"assistant:summary-marker"`)
- `components/workspace/messages/message-list.tsx`: add a branch in the grouped render switch that renders a distinctive inline card showing `N messages condensed` and a collapsible panel with the summary text
- No changes to feedback logic: the marker has no `feedback` field, so the button naturally doesn't render on it

## 7. Risks

1. **Synchronous path**. The upstream class has both `before_model` and `abefore_model`. Our wrapper only overrides the async variant. If any deer-flow code path ever uses the sync flow, those summarizations won't be captured. Mitigation: also override `before_model` and use `dispatch_custom_event` (the sync variant) with the same pattern.
2. **`_extract_summary_text` fragility**. It depends on the upstream class's prefix `"Here is a summary of the conversation to date"` in the injected `HumanMessage`. Any upstream template change breaks detection.
   Mitigation: pick the first new `HumanMessage` that wasn't in `state["messages"]` before `super()` — resilient to template wording changes at the cost of a small diff helper.
3. **`replaced_count` accuracy under concurrent updates**. If another middleware in the chain also modifies `state["messages"]` before `super()` returns, the naive `before_count - len(new_messages)` arithmetic is wrong. Mitigation: inspect the `RemoveMessage(id=REMOVE_ALL_MESSAGES)` that upstream emits and count from the original input list directly.
4. **History helper contract change**. Introducing a non-LangChain-typed entry (`type="summary_marker"`) in the `/history` response could break frontend code that blindly casts entries to `Message`. Mitigation: the frontend change above adds an explicit branch; type-check the frontend end-to-end before merging.

## 8. Out of scope / deferred

- Other middleware types (Title, Guardrail, HITL) do not emit custom events either. If we want markers for those too, repeat the wrapper pattern for each. Not in this design.
- Retroactive markers for old threads (captured before this patch) are impossible without re-running the graph. Legacy threads will show the event-store-recovered messages without a marker.
- Standard mode (`make dev`) — the agent runs inside LangGraph Server, not the Gateway-embedded runtime. `RunJournal` may not be wired there, so the custom event fires but is captured by no one. Tracked as a separate follow-up.

## 9. Next actions

1. Land the current summarize-message-loss fixes (journal `Command` unwrap + event-store-backed `/history` + inline feedback) — implementation verified, being committed now as three commits on `rayhpeng/fix-persistence-new`
2. Summarize-marker implementation (this spec) → separate follow-up PR based on the above verified design
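For reference, the identity-diff mitigation proposed under risks 2 and 3 can be sketched without LangChain. Everything below is illustrative: `_Msg` stands in for a LangChain message object, and `diff_summarization` is a hypothetical helper, not existing code.

```python
# Sketch of the diff-based mitigation from risks 2 and 3: identify the
# injected summary by message identity rather than by text prefix, and
# derive replaced_count from which ids disappeared. ``_Msg`` is a
# hypothetical stand-in for a LangChain message; ids mimic checkpoint ids.
from dataclasses import dataclass


@dataclass
class _Msg:
    id: str
    type: str
    content: str


def diff_summarization(before: list, after: list) -> tuple:
    """Return (summary_text, replaced_count) by comparing message ids."""
    before_ids = {m.id for m in before}
    # The summary is the first *new* human message — no prefix matching,
    # so an upstream template change can't break detection.
    summary = next(
        (m.content for m in after if m.type == "human" and m.id not in before_ids),
        "",
    )
    after_ids = {m.id for m in after}
    replaced_count = sum(1 for m in before if m.id not in after_ids)
    return summary, replaced_count


before = [
    _Msg("m1", "human", "original question"),
    _Msg("m2", "ai", "long answer"),
    _Msg("m3", "human", "follow-up"),
]
after = [
    _Msg("s1", "human", "Condensed recap of earlier turns"),
    _Msg("m3", "human", "follow-up"),
]
summary, replaced = diff_summarization(before, after)
print(summary, replaced)  # Condensed recap of earlier turns 2
```

The id-set comparison also makes `replaced_count` robust to other middleware mutating the list concurrently, since it counts disappearances rather than trusting a length delta.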