From 23eacf9533b418598a528a0f56b09a0bdb05066b Mon Sep 17 00:00:00 2001 From: rayhpeng Date: Thu, 2 Apr 2026 14:23:13 +0800 Subject: [PATCH] feat(persistence): add RunEventStore ABC + MemoryRunEventStore Phase 2-A prerequisite for event storage: adds the unified run event stream interface (RunEventStore) with an in-memory implementation, RunEventsConfig, gateway integration, and comprehensive tests (27 cases). Co-Authored-By: Claude Opus 4.6 (1M context) --- backend/app/gateway/deps.py | 15 + .../harness/deerflow/config/app_config.py | 2 + .../deerflow/config/run_events_config.py | 33 +++ .../deerflow/runtime/events/__init__.py | 4 + .../deerflow/runtime/events/store/__init__.py | 4 + .../deerflow/runtime/events/store/base.py | 99 +++++++ .../deerflow/runtime/events/store/memory.py | 120 ++++++++ backend/tests/test_run_event_store.py | 272 ++++++++++++++++++ config.example.yaml | 14 + 9 files changed, 563 insertions(+) create mode 100644 backend/packages/harness/deerflow/config/run_events_config.py create mode 100644 backend/packages/harness/deerflow/runtime/events/__init__.py create mode 100644 backend/packages/harness/deerflow/runtime/events/store/__init__.py create mode 100644 backend/packages/harness/deerflow/runtime/events/store/base.py create mode 100644 backend/packages/harness/deerflow/runtime/events/store/memory.py create mode 100644 backend/tests/test_run_event_store.py diff --git a/backend/app/gateway/deps.py b/backend/app/gateway/deps.py index 69ce86d9e..8cd907af1 100644 --- a/backend/app/gateway/deps.py +++ b/backend/app/gateway/deps.py @@ -14,6 +14,7 @@ from contextlib import AsyncExitStack, asynccontextmanager from fastapi import FastAPI, HTTPException, Request from deerflow.runtime import RunManager, StreamBridge +from deerflow.runtime.events.store.base import RunEventStore from deerflow.runtime.runs.store.base import RunStore @@ -46,6 +47,12 @@ async def langgraph_runtime(app: FastAPI) -> AsyncGenerator[None, None]: # RunRepository when models are implemented) app.state.run_store = MemoryRunStore() + # Initialize run event store (MemoryRunEventStore for now) + # TODO(Phase 2-B): switch to db/jsonl backend based on config.run_events.backend + from deerflow.runtime.events.store.memory import MemoryRunEventStore + + app.state.run_event_store = MemoryRunEventStore() + try: yield finally: @@ -86,6 +93,14 @@ def get_store(request: Request): return getattr(request.app.state, "store", None) +def get_run_event_store(request: Request) -> RunEventStore: + """Return the RunEventStore, or 503 if not available.""" + store = getattr(request.app.state, "run_event_store", None) + if store is None: + raise HTTPException(status_code=503, detail="Run event store not available") + return store + + def get_run_store(request: Request) -> RunStore: """Return the RunStore, or 503 if not available.""" store = getattr(request.app.state, "run_store", None) diff --git a/backend/packages/harness/deerflow/config/app_config.py b/backend/packages/harness/deerflow/config/app_config.py index 85fbc51b7..4b01c85f5 100644 --- a/backend/packages/harness/deerflow/config/app_config.py +++ b/backend/packages/harness/deerflow/config/app_config.py @@ -14,6 +14,7 @@ from deerflow.config.extensions_config import ExtensionsConfig from deerflow.config.guardrails_config import load_guardrails_config_from_dict from deerflow.config.memory_config import load_memory_config_from_dict from deerflow.config.model_config import ModelConfig +from deerflow.config.run_events_config import RunEventsConfig from deerflow.config.sandbox_config import SandboxConfig from deerflow.config.skills_config import SkillsConfig from deerflow.config.stream_bridge_config import StreamBridgeConfig, load_stream_bridge_config_from_dict @@ -43,6 +44,7 @@ class AppConfig(BaseModel): tool_search: ToolSearchConfig = Field(default_factory=ToolSearchConfig, description="Tool search / deferred loading configuration") model_config = ConfigDict(extra="allow", frozen=False) database: DatabaseConfig = Field(default_factory=DatabaseConfig, description="Unified database backend configuration") + run_events: RunEventsConfig = Field(default_factory=RunEventsConfig, description="Run event storage configuration") checkpointer: CheckpointerConfig | None = Field(default=None, description="Checkpointer configuration") stream_bridge: StreamBridgeConfig | None = Field(default=None, description="Stream bridge configuration") diff --git a/backend/packages/harness/deerflow/config/run_events_config.py b/backend/packages/harness/deerflow/config/run_events_config.py new file mode 100644 index 000000000..cddd9061f --- /dev/null +++ b/backend/packages/harness/deerflow/config/run_events_config.py @@ -0,0 +1,33 @@ +"""Run event storage configuration. + +Controls where run events (messages + execution traces) are persisted. + +Backends: +- memory: In-memory storage, data lost on restart. Suitable for + development and testing. +- db: SQL database via SQLAlchemy ORM. Provides full query capability. + Suitable for production deployments. +- jsonl: Append-only JSONL files. Lightweight alternative for + single-node deployments that need persistence without a database. +""" + +from __future__ import annotations + +from typing import Literal + +from pydantic import BaseModel, Field + + +class RunEventsConfig(BaseModel): + backend: Literal["memory", "db", "jsonl"] = Field( + default="memory", + description="Storage backend for run events. 'memory' for development (no persistence), 'db' for production (SQL queries), 'jsonl' for lightweight single-node persistence.", + ) + max_trace_content: int = Field( + default=10240, + description="Maximum trace content size in bytes before truncation (db backend only).", + ) + track_token_usage: bool = Field( + default=True, + description="Whether RunJournal should accumulate token counts to RunRow.", + ) diff --git a/backend/packages/harness/deerflow/runtime/events/__init__.py b/backend/packages/harness/deerflow/runtime/events/__init__.py new file mode 100644 index 000000000..0da8fabe5 --- /dev/null +++ b/backend/packages/harness/deerflow/runtime/events/__init__.py @@ -0,0 +1,4 @@ +from deerflow.runtime.events.store.base import RunEventStore +from deerflow.runtime.events.store.memory import MemoryRunEventStore + +__all__ = ["MemoryRunEventStore", "RunEventStore"] diff --git a/backend/packages/harness/deerflow/runtime/events/store/__init__.py b/backend/packages/harness/deerflow/runtime/events/store/__init__.py new file mode 100644 index 000000000..0da8fabe5 --- /dev/null +++ b/backend/packages/harness/deerflow/runtime/events/store/__init__.py @@ -0,0 +1,4 @@ +from deerflow.runtime.events.store.base import RunEventStore +from deerflow.runtime.events.store.memory import MemoryRunEventStore + +__all__ = ["MemoryRunEventStore", "RunEventStore"] diff --git a/backend/packages/harness/deerflow/runtime/events/store/base.py b/backend/packages/harness/deerflow/runtime/events/store/base.py new file mode 100644 index 000000000..ef6910f1f --- /dev/null +++ b/backend/packages/harness/deerflow/runtime/events/store/base.py @@ -0,0 +1,99 @@ +"""Abstract interface for run event storage. + +RunEventStore is the unified storage interface for run event streams. +Messages (frontend display) and execution traces (debugging/audit) go +through the same interface, distinguished by the ``category`` field. + +Implementations: +- MemoryRunEventStore: in-memory dict (development, tests) +- Future: DB-backed store (SQLAlchemy ORM), JSONL file store +""" + +from __future__ import annotations + +import abc + + +class RunEventStore(abc.ABC): + """Run event stream storage interface. + + All implementations must guarantee: + 1. put() events are retrievable in subsequent queries + 2. seq is strictly increasing within the same thread + 3. list_messages() only returns category="message" events + 4. list_events() returns all events for the specified run + 5. Returned dicts match the RunEvent field structure + """ + + @abc.abstractmethod + async def put( + self, + *, + thread_id: str, + run_id: str, + event_type: str, + category: str, + content: str = "", + metadata: dict | None = None, + created_at: str | None = None, + ) -> dict: + """Write an event, auto-assign seq, return the complete record.""" + + @abc.abstractmethod + async def put_batch(self, events: list[dict]) -> list[dict]: + """Batch-write events. Used by RunJournal flush buffer. + + Each dict's keys match put()'s keyword arguments. + Returns complete records with seq assigned. + """ + + @abc.abstractmethod + async def list_messages( + self, + thread_id: str, + *, + limit: int = 50, + before_seq: int | None = None, + after_seq: int | None = None, + ) -> list[dict]: + """Return displayable messages (category=message) for a thread, ordered by seq ascending. + + Supports bidirectional cursor pagination: + - before_seq: return the last ``limit`` records with seq < before_seq (ascending) + - after_seq: return the first ``limit`` records with seq > after_seq (ascending) + - neither: return the latest ``limit`` records (ascending) + """ + + @abc.abstractmethod + async def list_events( + self, + thread_id: str, + run_id: str, + *, + event_types: list[str] | None = None, + limit: int = 500, + ) -> list[dict]: + """Return the full event stream for a run, ordered by seq ascending. + + Optionally filter by event_types. + """ + + @abc.abstractmethod + async def list_messages_by_run( + self, + thread_id: str, + run_id: str, + ) -> list[dict]: + """Return displayable messages (category=message) for a specific run, ordered by seq ascending.""" + + @abc.abstractmethod + async def count_messages(self, thread_id: str) -> int: + """Count displayable messages (category=message) in a thread.""" + + @abc.abstractmethod + async def delete_by_thread(self, thread_id: str) -> int: + """Delete all events for a thread. Return the number of deleted events.""" + + @abc.abstractmethod + async def delete_by_run(self, thread_id: str, run_id: str) -> int: + """Delete all events for a specific run. Return the number of deleted events.""" diff --git a/backend/packages/harness/deerflow/runtime/events/store/memory.py b/backend/packages/harness/deerflow/runtime/events/store/memory.py new file mode 100644 index 000000000..93a3df8fa --- /dev/null +++ b/backend/packages/harness/deerflow/runtime/events/store/memory.py @@ -0,0 +1,120 @@ +"""In-memory RunEventStore. Used when run_events.backend=memory (default) and in tests. + +Thread-safe for single-process async usage (no threading locks needed +since all mutations happen within the same event loop). +""" + +from __future__ import annotations + +from datetime import UTC, datetime + +from deerflow.runtime.events.store.base import RunEventStore + + +class MemoryRunEventStore(RunEventStore): + def __init__(self) -> None: + self._events: dict[str, list[dict]] = {} # thread_id -> sorted event list + self._seq_counters: dict[str, int] = {} # thread_id -> last assigned seq + + def _next_seq(self, thread_id: str) -> int: + current = self._seq_counters.get(thread_id, 0) + next_val = current + 1 + self._seq_counters[thread_id] = next_val + return next_val + + def _put_one( + self, + *, + thread_id: str, + run_id: str, + event_type: str, + category: str, + content: str = "", + metadata: dict | None = None, + created_at: str | None = None, + ) -> dict: + seq = self._next_seq(thread_id) + record = { + "thread_id": thread_id, + "run_id": run_id, + "event_type": event_type, + "category": category, + "content": content, + "metadata": metadata or {}, + "seq": seq, + "created_at": created_at or datetime.now(UTC).isoformat(), + } + self._events.setdefault(thread_id, []).append(record) + return record + + async def put( + self, + *, + thread_id, + run_id, + event_type, + category, + content="", + metadata=None, + created_at=None, + ): + return self._put_one( + thread_id=thread_id, + run_id=run_id, + event_type=event_type, + category=category, + content=content, + metadata=metadata, + created_at=created_at, + ) + + async def put_batch(self, events): + results = [] + for ev in events: + record = self._put_one(**ev) + results.append(record) + return results + + async def list_messages(self, thread_id, *, limit=50, before_seq=None, after_seq=None): + all_events = self._events.get(thread_id, []) + messages = [e for e in all_events if e["category"] == "message"] + + if before_seq is not None: + messages = [e for e in messages if e["seq"] < before_seq] + # Take the last `limit` records + return messages[-limit:] + elif after_seq is not None: + messages = [e for e in messages if e["seq"] > after_seq] + return messages[:limit] + else: + # Return the latest `limit` records, ascending + return messages[-limit:] + + async def list_events(self, thread_id, run_id, *, event_types=None, limit=500): + all_events = self._events.get(thread_id, []) + filtered = [e for e in all_events if e["run_id"] == run_id] + if event_types is not None: + filtered = [e for e in filtered if e["event_type"] in event_types] + return filtered[:limit] + + async def list_messages_by_run(self, thread_id, run_id): + all_events = self._events.get(thread_id, []) + return [e for e in all_events if e["run_id"] == run_id and e["category"] == "message"] + + async def count_messages(self, thread_id): + all_events = self._events.get(thread_id, []) + return sum(1 for e in all_events if e["category"] == "message") + + async def delete_by_thread(self, thread_id): + events = self._events.pop(thread_id, []) + self._seq_counters.pop(thread_id, None) + return len(events) + + async def delete_by_run(self, thread_id, run_id): + all_events = self._events.get(thread_id, []) + if not all_events: + return 0 + remaining = [e for e in all_events if e["run_id"] != run_id] + removed = len(all_events) - len(remaining) + self._events[thread_id] = remaining + return removed diff --git a/backend/tests/test_run_event_store.py b/backend/tests/test_run_event_store.py new file mode 100644 index 000000000..a493a486b --- /dev/null +++ b/backend/tests/test_run_event_store.py @@ -0,0 +1,272 @@ +"""Tests for RunEventStore ABC + MemoryRunEventStore. + +Covers: +- Basic write and query (put, seq assignment, cross-thread independence) +- list_messages (category filtering, pagination, cross-run ordering) +- list_events (run filtering, event_types filtering) +- list_messages_by_run +- count_messages +- put_batch +- delete_by_thread, delete_by_run +- Edge cases (empty thread/run) +""" + +import pytest + +from deerflow.runtime.events.store.memory import MemoryRunEventStore + + +@pytest.fixture +def store(): + return MemoryRunEventStore() + + +# -- Basic write and query -- + + +class TestPutAndSeq: + @pytest.mark.anyio + async def test_put_returns_dict_with_seq(self, store): + record = await store.put(thread_id="t1", run_id="r1", event_type="human_message", category="message", content="hello") + assert "seq" in record + assert record["seq"] == 1 + assert record["thread_id"] == "t1" + assert record["run_id"] == "r1" + assert record["event_type"] == "human_message" + assert record["category"] == "message" + assert record["content"] == "hello" + assert record["metadata"] == {} + assert "created_at" in record + + @pytest.mark.anyio + async def test_seq_strictly_increasing_same_thread(self, store): + r1 = await store.put(thread_id="t1", run_id="r1", event_type="human_message", category="message") + r2 = await store.put(thread_id="t1", run_id="r1", event_type="ai_message", category="message") + r3 = await store.put(thread_id="t1", run_id="r1", event_type="llm_end", category="trace") + assert r1["seq"] == 1 + assert r2["seq"] == 2 + assert r3["seq"] == 3 + + @pytest.mark.anyio + async def test_seq_independent_across_threads(self, store): + r1 = await store.put(thread_id="t1", run_id="r1", event_type="human_message", category="message") + r2 = await store.put(thread_id="t2", run_id="r2", event_type="human_message", category="message") + assert r1["seq"] == 1 + assert r2["seq"] == 1 + + @pytest.mark.anyio + async def test_put_respects_provided_created_at(self, store): + ts = "2024-06-01T12:00:00+00:00" + record = await store.put(thread_id="t1", run_id="r1", event_type="human_message", category="message", created_at=ts) + assert record["created_at"] == ts + + @pytest.mark.anyio + async def test_put_metadata_preserved(self, store): + meta = {"model": "gpt-4", "tokens": 100} + record = await store.put(thread_id="t1", run_id="r1", event_type="llm_end", category="trace", metadata=meta) + assert record["metadata"] == meta + + +# -- list_messages -- + + +class TestListMessages: + @pytest.mark.anyio + async def test_only_returns_message_category(self, store): + await store.put(thread_id="t1", run_id="r1", event_type="human_message", category="message") + await store.put(thread_id="t1", run_id="r1", event_type="llm_end", category="trace") + await store.put(thread_id="t1", run_id="r1", event_type="run_start", category="lifecycle") + messages = await store.list_messages("t1") + assert len(messages) == 1 + assert messages[0]["category"] == "message" + + @pytest.mark.anyio + async def test_ascending_seq_order(self, store): + await store.put(thread_id="t1", run_id="r1", event_type="human_message", category="message", content="first") + await store.put(thread_id="t1", run_id="r1", event_type="ai_message", category="message", content="second") + await store.put(thread_id="t1", run_id="r1", event_type="human_message", category="message", content="third") + messages = await store.list_messages("t1") + seqs = [m["seq"] for m in messages] + assert seqs == sorted(seqs) + + @pytest.mark.anyio + async def test_before_seq_pagination(self, store): + # Put 10 messages with seq 1..10 + for i in range(10): + await store.put(thread_id="t1", run_id="r1", event_type="human_message", category="message", content=str(i)) + messages = await store.list_messages("t1", before_seq=6, limit=3) + assert len(messages) == 3 + assert [m["seq"] for m in messages] == [3, 4, 5] + + @pytest.mark.anyio + async def test_after_seq_pagination(self, store): + for i in range(10): + await store.put(thread_id="t1", run_id="r1", event_type="human_message", category="message", content=str(i)) + messages = await store.list_messages("t1", after_seq=7, limit=3) + assert len(messages) == 3 + assert [m["seq"] for m in messages] == [8, 9, 10] + + @pytest.mark.anyio + async def test_limit_restricts_count(self, store): + for _ in range(20): + await store.put(thread_id="t1", run_id="r1", event_type="human_message", category="message") + messages = await store.list_messages("t1", limit=5) + assert len(messages) == 5 + + @pytest.mark.anyio + async def test_cross_run_unified_ordering(self, store): + await store.put(thread_id="t1", run_id="r1", event_type="human_message", category="message") + await store.put(thread_id="t1", run_id="r1", event_type="ai_message", category="message") + await store.put(thread_id="t1", run_id="r2", event_type="human_message", category="message") + await store.put(thread_id="t1", run_id="r2", event_type="ai_message", category="message") + messages = await store.list_messages("t1") + assert [m["seq"] for m in messages] == [1, 2, 3, 4] + assert messages[0]["run_id"] == "r1" + assert messages[2]["run_id"] == "r2" + + @pytest.mark.anyio + async def test_default_returns_latest(self, store): + for _ in range(10): + await store.put(thread_id="t1", run_id="r1", event_type="human_message", category="message") + messages = await store.list_messages("t1", limit=3) + assert [m["seq"] for m in messages] == [8, 9, 10] + + +# -- list_events -- + + +class TestListEvents: + @pytest.mark.anyio + async def test_returns_all_categories_for_run(self, store): + await store.put(thread_id="t1", run_id="r1", event_type="human_message", category="message") + await store.put(thread_id="t1", run_id="r1", event_type="llm_end", category="trace") + await store.put(thread_id="t1", run_id="r1", event_type="run_start", category="lifecycle") + events = await store.list_events("t1", "r1") + assert len(events) == 3 + + @pytest.mark.anyio + async def test_event_types_filter(self, store): + await store.put(thread_id="t1", run_id="r1", event_type="llm_start", category="trace") + await store.put(thread_id="t1", run_id="r1", event_type="llm_end", category="trace") + await store.put(thread_id="t1", run_id="r1", event_type="tool_start", category="trace") + events = await store.list_events("t1", "r1", event_types=["llm_end"]) + assert len(events) == 1 + assert events[0]["event_type"] == "llm_end" + + @pytest.mark.anyio + async def test_only_returns_specified_run(self, store): + await store.put(thread_id="t1", run_id="r1", event_type="llm_end", category="trace") + await store.put(thread_id="t1", run_id="r2", event_type="llm_end", category="trace") + events = await store.list_events("t1", "r1") + assert len(events) == 1 + assert events[0]["run_id"] == "r1" + + +# -- list_messages_by_run -- + + +class TestListMessagesByRun: + @pytest.mark.anyio + async def test_only_messages_for_specified_run(self, store): + await store.put(thread_id="t1", run_id="r1", event_type="human_message", category="message") + await store.put(thread_id="t1", run_id="r1", event_type="llm_end", category="trace") + await store.put(thread_id="t1", run_id="r2", event_type="human_message", category="message") + messages = await store.list_messages_by_run("t1", "r1") + assert len(messages) == 1 + assert messages[0]["run_id"] == "r1" + assert messages[0]["category"] == "message" + + +# -- count_messages -- + + +class TestCountMessages: + @pytest.mark.anyio + async def test_counts_only_message_category(self, store): + await store.put(thread_id="t1", run_id="r1", event_type="human_message", category="message") + await store.put(thread_id="t1", run_id="r1", event_type="ai_message", category="message") + await store.put(thread_id="t1", run_id="r1", event_type="llm_end", category="trace") + assert await store.count_messages("t1") == 2 + + +# -- put_batch -- + + +class TestPutBatch: + @pytest.mark.anyio + async def test_batch_assigns_seq(self, store): + events = [ + {"thread_id": "t1", "run_id": "r1", "event_type": "human_message", "category": "message", "content": "a"}, + {"thread_id": "t1", "run_id": "r1", "event_type": "ai_message", "category": "message", "content": "b"}, + {"thread_id": "t1", "run_id": "r1", "event_type": "llm_end", "category": "trace"}, + ] + results = await store.put_batch(events) + assert len(results) == 3 + assert all("seq" in r for r in results) + + @pytest.mark.anyio + async def test_batch_seq_strictly_increasing(self, store): + events = [ + {"thread_id": "t1", "run_id": "r1", "event_type": "human_message", "category": "message"}, + {"thread_id": "t1", "run_id": "r1", "event_type": "ai_message", "category": "message"}, + ] + results = await store.put_batch(events) + assert results[0]["seq"] == 1 + assert results[1]["seq"] == 2 + + +# -- delete -- + + +class TestDelete: + @pytest.mark.anyio + async def test_delete_by_thread(self, store): + await store.put(thread_id="t1", run_id="r1", event_type="human_message", category="message") + await store.put(thread_id="t1", run_id="r1", event_type="ai_message", category="message") + await store.put(thread_id="t1", run_id="r2", event_type="llm_end", category="trace") + count = await store.delete_by_thread("t1") + assert count == 3 + assert await store.list_messages("t1") == [] + assert await store.count_messages("t1") == 0 + + @pytest.mark.anyio + async def test_delete_by_run(self, store): + await store.put(thread_id="t1", run_id="r1", event_type="human_message", category="message") + await store.put(thread_id="t1", run_id="r2", event_type="human_message", category="message") + await store.put(thread_id="t1", run_id="r2", event_type="llm_end", category="trace") + count = await store.delete_by_run("t1", "r2") + assert count == 2 + # r1 events should still be there + messages = await store.list_messages("t1") + assert len(messages) == 1 + assert messages[0]["run_id"] == "r1" + + @pytest.mark.anyio + async def test_delete_nonexistent_thread_returns_zero(self, store): + assert await store.delete_by_thread("nope") == 0 + + @pytest.mark.anyio + async def test_delete_nonexistent_run_returns_zero(self, store): + await store.put(thread_id="t1", run_id="r1", event_type="human_message", category="message") + assert await store.delete_by_run("t1", "nope") == 0 + + @pytest.mark.anyio + async def test_delete_nonexistent_thread_for_run_returns_zero(self, store): + assert await store.delete_by_run("nope", "r1") == 0 + + +# -- Edge cases -- + + +class TestEdgeCases: + @pytest.mark.anyio + async def test_empty_thread_list_messages(self, store): + assert await store.list_messages("empty") == [] + + @pytest.mark.anyio + async def test_empty_run_list_events(self, store): + assert await store.list_events("empty", "r1") == [] + + @pytest.mark.anyio + async def test_empty_thread_count_messages(self, store): + assert await store.count_messages("empty") == 0 diff --git a/config.example.yaml b/config.example.yaml index 85c625299..ca1ecbe87 100644 --- a/config.example.yaml +++ b/config.example.yaml @@ -613,6 +613,20 @@ checkpointer: # backend: postgres # postgres_url: $DATABASE_URL +# ============================================================================ +# Run Events Configuration +# ============================================================================ +# Storage backend for run events (messages + execution traces). +# +# backend: memory -- No persistence, data lost on restart (default) +# backend: db -- SQL database via ORM, full query capability (production) +# backend: jsonl -- Append-only JSONL files (lightweight single-node persistence) +# +# run_events: +# backend: memory +# max_trace_content: 10240 # Truncation threshold for trace content (db backend, bytes) +# track_token_usage: true # Accumulate token counts to RunRow + # ============================================================================ # IM Channels Configuration # ============================================================================