From 29547c0ee445c40013a57bff43fb15a71f144d24 Mon Sep 17 00:00:00 2001 From: rayhpeng Date: Mon, 6 Apr 2026 17:45:41 +0800 Subject: [PATCH] refactor(persistence): introduce ThreadMetaStore ABC for backend-agnostic thread metadata Add ThreadMetaStore abstract base class with create/get/search/update/delete interface. ThreadMetaRepository (SQL) now inherits from it. New MemoryThreadMetaStore wraps LangGraph BaseStore for memory-mode deployments. deps.py now always provides a non-None thread_meta_repo, eliminating all `if thread_meta_repo is not None` guards in services.py, worker.py, and routers/threads.py. search_threads no longer needs a Store fallback branch. Co-Authored-By: Claude Opus 4.6 (1M context) --- backend/app/gateway/deps.py | 7 +- backend/app/gateway/routers/threads.py | 72 ++++-------- backend/app/gateway/services.py | 27 +++-- .../repositories/thread_meta_base.py | 51 +++++++++ .../repositories/thread_meta_memory.py | 108 ++++++++++++++++++ .../repositories/thread_meta_repo.py | 3 +- .../harness/deerflow/runtime/runs/worker.py | 13 +-- 7 files changed, 203 insertions(+), 78 deletions(-) create mode 100644 backend/packages/harness/deerflow/persistence/repositories/thread_meta_base.py create mode 100644 backend/packages/harness/deerflow/persistence/repositories/thread_meta_memory.py diff --git a/backend/app/gateway/deps.py b/backend/app/gateway/deps.py index d8af19d4e..c6eb18a71 100644 --- a/backend/app/gateway/deps.py +++ b/backend/app/gateway/deps.py @@ -54,11 +54,12 @@ async def langgraph_runtime(app: FastAPI) -> AsyncGenerator[None, None]: app.state.feedback_repo = FeedbackRepository(sf) app.state.thread_meta_repo = ThreadMetaRepository(sf) else: + from deerflow.persistence.repositories.thread_meta_memory import MemoryThreadMetaStore from deerflow.runtime.runs.store.memory import MemoryRunStore app.state.run_store = MemoryRunStore() app.state.feedback_repo = None - app.state.thread_meta_repo = None + app.state.thread_meta_repo = MemoryThreadMetaStore(app.state.store) # Run event store (has its own factory with config-driven backend selection) run_events_config = getattr(config, "run_events", None) @@ -104,9 +105,7 @@ def get_store(request: Request): return getattr(request.app.state, "store", None) -def get_thread_meta_repo(request: Request): - """Return the ThreadMetaRepository, or None if not available.""" - return getattr(request.app.state, "thread_meta_repo", None) +get_thread_meta_repo = _require("thread_meta_repo", "Thread metadata store") def get_run_context(request: Request) -> RunContext: diff --git a/backend/app/gateway/routers/threads.py b/backend/app/gateway/routers/threads.py index 84af7c488..3017761e4 100644 --- a/backend/app/gateway/routers/threads.py +++ b/backend/app/gateway/routers/threads.py @@ -310,15 +310,14 @@ async def create_thread(body: ThreadCreateRequest, request: Request) -> ThreadRe from app.gateway.deps import get_thread_meta_repo thread_meta_repo = get_thread_meta_repo(request) - if thread_meta_repo is not None: - try: - await thread_meta_repo.create( - thread_id, - assistant_id=getattr(body, "assistant_id", None), - metadata=body.metadata, - ) - except Exception: - logger.debug("Failed to upsert thread_meta on create for %s (non-fatal)", sanitize_log_param(thread_id)) + try: + await thread_meta_repo.create( + thread_id, + assistant_id=getattr(body, "assistant_id", None), + metadata=body.metadata, + ) + except Exception: + logger.debug("Failed to upsert thread_meta on create for %s (non-fatal)", sanitize_log_param(thread_id)) logger.info("Thread created: %s", sanitize_log_param(thread_id)) return ThreadResponse( @@ -334,60 +333,29 @@ async def create_thread(body: ThreadCreateRequest, request: Request) -> ThreadRe async def search_threads(body: ThreadSearchRequest, request: Request) -> list[ThreadResponse]: """Search and list threads. - Uses ThreadMetaRepository (SQL) when available, otherwise falls back - to the LangGraph Store for memory/lightweight deployments. + Delegates to the configured ThreadMetaStore implementation + (SQL-backed for sqlite/postgres, Store-backed for memory mode). """ from app.gateway.deps import get_thread_meta_repo repo = get_thread_meta_repo(request) - if repo is not None: - rows = await repo.search( - metadata=body.metadata or None, - status=body.status, - limit=body.limit, - offset=body.offset, - ) - return [ - ThreadResponse( - thread_id=r["thread_id"], - status=r.get("status", "idle"), - created_at=r.get("created_at", ""), - updated_at=r.get("updated_at", ""), - metadata=r.get("metadata", {}), - values={"title": r["display_name"]} if r.get("display_name") else {}, - interrupts={}, - ) - for r in rows - ] - - # Fallback: search the LangGraph Store (memory / no-SQL deployments) - store = get_store(request) - if store is None: - return [] - - filter_dict: dict[str, Any] = {} - if body.metadata: - filter_dict.update(body.metadata) - if body.status: - filter_dict["status"] = body.status - - items = await store.asearch( - THREADS_NS, - filter=filter_dict or None, + rows = await repo.search( + metadata=body.metadata or None, + status=body.status, limit=body.limit, offset=body.offset, ) return [ ThreadResponse( - thread_id=item.key, - status=item.value.get("status", "idle"), - created_at=str(item.value.get("created_at", "")), - updated_at=str(item.value.get("updated_at", "")), - metadata=item.value.get("metadata", {}), - values=item.value.get("values", {}), + thread_id=r["thread_id"], + status=r.get("status", "idle"), + created_at=r.get("created_at", ""), + updated_at=r.get("updated_at", ""), + metadata=r.get("metadata", {}), + values={"title": r["display_name"]} if r.get("display_name") else {}, interrupts={}, ) - for item in items + for r in rows ] diff --git a/backend/app/gateway/services.py b/backend/app/gateway/services.py index bab664c1b..79d2c50b0 100644 --- a/backend/app/gateway/services.py +++ b/backend/app/gateway/services.py @@ -296,20 +296,19 @@ async def start_run( if run_ctx.store is not None: await _upsert_thread_in_store(run_ctx.store, thread_id, body.metadata) - # Upsert thread metadata in the SQL-backed threads_meta table - if run_ctx.thread_meta_repo is not None: - try: - existing = await run_ctx.thread_meta_repo.get(thread_id) - if existing is None: - await run_ctx.thread_meta_repo.create( - thread_id, - assistant_id=body.assistant_id, - metadata=body.metadata, - ) - else: - await run_ctx.thread_meta_repo.update_status(thread_id, "running") - except Exception: - logger.warning("Failed to upsert thread_meta for %s (non-fatal)", sanitize_log_param(thread_id)) + # Upsert thread metadata so the thread appears in /threads/search + try: + existing = await run_ctx.thread_meta_repo.get(thread_id) + if existing is None: + await run_ctx.thread_meta_repo.create( + thread_id, + assistant_id=body.assistant_id, + metadata=body.metadata, + ) + else: + await run_ctx.thread_meta_repo.update_status(thread_id, "running") + except Exception: + logger.warning("Failed to upsert thread_meta for %s (non-fatal)", sanitize_log_param(thread_id)) agent_factory = resolve_agent_factory(body.assistant_id) graph_input = normalize_input(body.input) diff --git a/backend/packages/harness/deerflow/persistence/repositories/thread_meta_base.py b/backend/packages/harness/deerflow/persistence/repositories/thread_meta_base.py new file mode 100644 index 000000000..9d858ea80 --- /dev/null +++ b/backend/packages/harness/deerflow/persistence/repositories/thread_meta_base.py @@ -0,0 +1,51 @@ +"""Abstract interface for thread metadata storage. + +Implementations: +- ThreadMetaRepository: SQL-backed (sqlite / postgres via SQLAlchemy) +- MemoryThreadMetaStore: wraps LangGraph BaseStore (memory mode) +""" + +from __future__ import annotations + +import abc + + +class ThreadMetaStore(abc.ABC): + @abc.abstractmethod + async def create( + self, + thread_id: str, + *, + assistant_id: str | None = None, + owner_id: str | None = None, + display_name: str | None = None, + metadata: dict | None = None, + ) -> dict: + pass + + @abc.abstractmethod + async def get(self, thread_id: str) -> dict | None: + pass + + @abc.abstractmethod + async def search( + self, + *, + metadata: dict | None = None, + status: str | None = None, + limit: int = 100, + offset: int = 0, + ) -> list[dict]: + pass + + @abc.abstractmethod + async def update_display_name(self, thread_id: str, display_name: str) -> None: + pass + + @abc.abstractmethod + async def update_status(self, thread_id: str, status: str) -> None: + pass + + @abc.abstractmethod + async def delete(self, thread_id: str) -> None: + pass diff --git a/backend/packages/harness/deerflow/persistence/repositories/thread_meta_memory.py b/backend/packages/harness/deerflow/persistence/repositories/thread_meta_memory.py new file mode 100644 index 000000000..b0e87165d --- /dev/null +++ b/backend/packages/harness/deerflow/persistence/repositories/thread_meta_memory.py @@ -0,0 +1,108 @@ +"""In-memory ThreadMetaStore backed by LangGraph BaseStore. + +Used when database.backend=memory. Delegates to the LangGraph Store's +``("threads",)`` namespace — the same namespace used by the Gateway +router for thread records. +""" + +from __future__ import annotations + +import time +from typing import Any + +from langgraph.store.base import BaseStore + +from deerflow.persistence.repositories.thread_meta_base import ThreadMetaStore + +THREADS_NS: tuple[str, ...] = ("threads",) + + +class MemoryThreadMetaStore(ThreadMetaStore): + def __init__(self, store: BaseStore) -> None: + self._store = store + + async def create( + self, + thread_id: str, + *, + assistant_id: str | None = None, + owner_id: str | None = None, + display_name: str | None = None, + metadata: dict | None = None, + ) -> dict: + now = time.time() + record: dict[str, Any] = { + "thread_id": thread_id, + "assistant_id": assistant_id, + "owner_id": owner_id, + "display_name": display_name, + "status": "idle", + "metadata": metadata or {}, + "values": {}, + "created_at": now, + "updated_at": now, + } + await self._store.aput(THREADS_NS, thread_id, record) + return record + + async def get(self, thread_id: str) -> dict | None: + item = await self._store.aget(THREADS_NS, thread_id) + return item.value if item is not None else None + + async def search( + self, + *, + metadata: dict | None = None, + status: str | None = None, + limit: int = 100, + offset: int = 0, + ) -> list[dict]: + filter_dict: dict[str, Any] = {} + if metadata: + filter_dict.update(metadata) + if status: + filter_dict["status"] = status + + items = await self._store.asearch( + THREADS_NS, + filter=filter_dict or None, + limit=limit, + offset=offset, + ) + return [self._item_to_dict(item) for item in items] + + async def update_display_name(self, thread_id: str, display_name: str) -> None: + item = await self._store.aget(THREADS_NS, thread_id) + if item is None: + return + record = dict(item.value) + record["display_name"] = display_name + record["updated_at"] = time.time() + await self._store.aput(THREADS_NS, thread_id, record) + + async def update_status(self, thread_id: str, status: str) -> None: + item = await self._store.aget(THREADS_NS, thread_id) + if item is None: + return + record = dict(item.value) + record["status"] = status + record["updated_at"] = time.time() + await self._store.aput(THREADS_NS, thread_id, record) + + async def delete(self, thread_id: str) -> None: + await self._store.adelete(THREADS_NS, thread_id) + + @staticmethod + def _item_to_dict(item) -> dict[str, Any]: + """Convert a Store SearchItem to the dict format expected by callers.""" + val = item.value + return { + "thread_id": item.key, + "assistant_id": val.get("assistant_id"), + "owner_id": val.get("owner_id"), + "display_name": val.get("display_name"), + "status": val.get("status", "idle"), + "metadata": val.get("metadata", {}), + "created_at": str(val.get("created_at", "")), + "updated_at": str(val.get("updated_at", "")), + } diff --git a/backend/packages/harness/deerflow/persistence/repositories/thread_meta_repo.py b/backend/packages/harness/deerflow/persistence/repositories/thread_meta_repo.py index b10fc7061..17d8e08f1 100644 --- a/backend/packages/harness/deerflow/persistence/repositories/thread_meta_repo.py +++ b/backend/packages/harness/deerflow/persistence/repositories/thread_meta_repo.py @@ -9,9 +9,10 @@ from sqlalchemy import select, update from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker from deerflow.persistence.models.thread_meta import ThreadMetaRow +from deerflow.persistence.repositories.thread_meta_base import ThreadMetaStore -class ThreadMetaRepository: +class ThreadMetaRepository(ThreadMetaStore): def __init__(self, session_factory: async_sessionmaker[AsyncSession]) -> None: self._sf = session_factory diff --git a/backend/packages/harness/deerflow/runtime/runs/worker.py b/backend/packages/harness/deerflow/runtime/runs/worker.py index 00de0a2d1..a871a8bdd 100644 --- a/backend/packages/harness/deerflow/runtime/runs/worker.py +++ b/backend/packages/harness/deerflow/runtime/runs/worker.py @@ -285,7 +285,7 @@ async def run_agent( await run_manager.update_run_completion(run_id, status=record.status.value, **completion) # Sync title from checkpoint to threads_meta.display_name - if thread_meta_repo is not None and checkpointer is not None: + if checkpointer is not None: try: ckpt_config = {"configurable": {"thread_id": thread_id, "checkpoint_ns": ""}} ckpt_tuple = await checkpointer.aget_tuple(ckpt_config) @@ -298,12 +298,11 @@ async def run_agent( logger.debug("Failed to sync title for thread %s (non-fatal)", thread_id) # Update threads_meta status based on run outcome - if thread_meta_repo is not None: - try: - final_status = "idle" if record.status == RunStatus.success else record.status.value - await thread_meta_repo.update_status(thread_id, final_status) - except Exception: - logger.debug("Failed to update thread_meta status for %s (non-fatal)", thread_id) + try: + final_status = "idle" if record.status == RunStatus.success else record.status.value + await thread_meta_repo.update_status(thread_id, final_status) + except Exception: + logger.debug("Failed to update thread_meta status for %s (non-fatal)", thread_id) await bridge.publish_end(run_id) asyncio.create_task(bridge.cleanup(run_id, delay=60))