mirror of
https://github.com/bytedance/deer-flow.git
synced 2026-04-25 11:18:22 +00:00
refactor(persistence): introduce ThreadMetaStore ABC for backend-agnostic thread metadata
Add ThreadMetaStore abstract base class with create/get/search/update/delete interface. ThreadMetaRepository (SQL) now inherits from it. New MemoryThreadMetaStore wraps LangGraph BaseStore for memory-mode deployments. deps.py now always provides a non-None thread_meta_repo, eliminating all `if thread_meta_repo is not None` guards in services.py, worker.py, and routers/threads.py. search_threads no longer needs a Store fallback branch. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
51c68db376
commit
29547c0ee4
@ -54,11 +54,12 @@ async def langgraph_runtime(app: FastAPI) -> AsyncGenerator[None, None]:
|
||||
app.state.feedback_repo = FeedbackRepository(sf)
|
||||
app.state.thread_meta_repo = ThreadMetaRepository(sf)
|
||||
else:
|
||||
from deerflow.persistence.repositories.thread_meta_memory import MemoryThreadMetaStore
|
||||
from deerflow.runtime.runs.store.memory import MemoryRunStore
|
||||
|
||||
app.state.run_store = MemoryRunStore()
|
||||
app.state.feedback_repo = None
|
||||
app.state.thread_meta_repo = None
|
||||
app.state.thread_meta_repo = MemoryThreadMetaStore(app.state.store)
|
||||
|
||||
# Run event store (has its own factory with config-driven backend selection)
|
||||
run_events_config = getattr(config, "run_events", None)
|
||||
@ -104,9 +105,7 @@ def get_store(request: Request):
|
||||
return getattr(request.app.state, "store", None)
|
||||
|
||||
|
||||
def get_thread_meta_repo(request: Request):
|
||||
"""Return the ThreadMetaRepository, or None if not available."""
|
||||
return getattr(request.app.state, "thread_meta_repo", None)
|
||||
get_thread_meta_repo = _require("thread_meta_repo", "Thread metadata store")
|
||||
|
||||
|
||||
def get_run_context(request: Request) -> RunContext:
|
||||
|
||||
@ -310,15 +310,14 @@ async def create_thread(body: ThreadCreateRequest, request: Request) -> ThreadRe
|
||||
from app.gateway.deps import get_thread_meta_repo
|
||||
|
||||
thread_meta_repo = get_thread_meta_repo(request)
|
||||
if thread_meta_repo is not None:
|
||||
try:
|
||||
await thread_meta_repo.create(
|
||||
thread_id,
|
||||
assistant_id=getattr(body, "assistant_id", None),
|
||||
metadata=body.metadata,
|
||||
)
|
||||
except Exception:
|
||||
logger.debug("Failed to upsert thread_meta on create for %s (non-fatal)", sanitize_log_param(thread_id))
|
||||
try:
|
||||
await thread_meta_repo.create(
|
||||
thread_id,
|
||||
assistant_id=getattr(body, "assistant_id", None),
|
||||
metadata=body.metadata,
|
||||
)
|
||||
except Exception:
|
||||
logger.debug("Failed to upsert thread_meta on create for %s (non-fatal)", sanitize_log_param(thread_id))
|
||||
|
||||
logger.info("Thread created: %s", sanitize_log_param(thread_id))
|
||||
return ThreadResponse(
|
||||
@ -334,60 +333,29 @@ async def create_thread(body: ThreadCreateRequest, request: Request) -> ThreadRe
|
||||
async def search_threads(body: ThreadSearchRequest, request: Request) -> list[ThreadResponse]:
|
||||
"""Search and list threads.
|
||||
|
||||
Uses ThreadMetaRepository (SQL) when available, otherwise falls back
|
||||
to the LangGraph Store for memory/lightweight deployments.
|
||||
Delegates to the configured ThreadMetaStore implementation
|
||||
(SQL-backed for sqlite/postgres, Store-backed for memory mode).
|
||||
"""
|
||||
from app.gateway.deps import get_thread_meta_repo
|
||||
|
||||
repo = get_thread_meta_repo(request)
|
||||
if repo is not None:
|
||||
rows = await repo.search(
|
||||
metadata=body.metadata or None,
|
||||
status=body.status,
|
||||
limit=body.limit,
|
||||
offset=body.offset,
|
||||
)
|
||||
return [
|
||||
ThreadResponse(
|
||||
thread_id=r["thread_id"],
|
||||
status=r.get("status", "idle"),
|
||||
created_at=r.get("created_at", ""),
|
||||
updated_at=r.get("updated_at", ""),
|
||||
metadata=r.get("metadata", {}),
|
||||
values={"title": r["display_name"]} if r.get("display_name") else {},
|
||||
interrupts={},
|
||||
)
|
||||
for r in rows
|
||||
]
|
||||
|
||||
# Fallback: search the LangGraph Store (memory / no-SQL deployments)
|
||||
store = get_store(request)
|
||||
if store is None:
|
||||
return []
|
||||
|
||||
filter_dict: dict[str, Any] = {}
|
||||
if body.metadata:
|
||||
filter_dict.update(body.metadata)
|
||||
if body.status:
|
||||
filter_dict["status"] = body.status
|
||||
|
||||
items = await store.asearch(
|
||||
THREADS_NS,
|
||||
filter=filter_dict or None,
|
||||
rows = await repo.search(
|
||||
metadata=body.metadata or None,
|
||||
status=body.status,
|
||||
limit=body.limit,
|
||||
offset=body.offset,
|
||||
)
|
||||
return [
|
||||
ThreadResponse(
|
||||
thread_id=item.key,
|
||||
status=item.value.get("status", "idle"),
|
||||
created_at=str(item.value.get("created_at", "")),
|
||||
updated_at=str(item.value.get("updated_at", "")),
|
||||
metadata=item.value.get("metadata", {}),
|
||||
values=item.value.get("values", {}),
|
||||
thread_id=r["thread_id"],
|
||||
status=r.get("status", "idle"),
|
||||
created_at=r.get("created_at", ""),
|
||||
updated_at=r.get("updated_at", ""),
|
||||
metadata=r.get("metadata", {}),
|
||||
values={"title": r["display_name"]} if r.get("display_name") else {},
|
||||
interrupts={},
|
||||
)
|
||||
for item in items
|
||||
for r in rows
|
||||
]
|
||||
|
||||
|
||||
|
||||
@ -296,20 +296,19 @@ async def start_run(
|
||||
if run_ctx.store is not None:
|
||||
await _upsert_thread_in_store(run_ctx.store, thread_id, body.metadata)
|
||||
|
||||
# Upsert thread metadata in the SQL-backed threads_meta table
|
||||
if run_ctx.thread_meta_repo is not None:
|
||||
try:
|
||||
existing = await run_ctx.thread_meta_repo.get(thread_id)
|
||||
if existing is None:
|
||||
await run_ctx.thread_meta_repo.create(
|
||||
thread_id,
|
||||
assistant_id=body.assistant_id,
|
||||
metadata=body.metadata,
|
||||
)
|
||||
else:
|
||||
await run_ctx.thread_meta_repo.update_status(thread_id, "running")
|
||||
except Exception:
|
||||
logger.warning("Failed to upsert thread_meta for %s (non-fatal)", sanitize_log_param(thread_id))
|
||||
# Upsert thread metadata so the thread appears in /threads/search
|
||||
try:
|
||||
existing = await run_ctx.thread_meta_repo.get(thread_id)
|
||||
if existing is None:
|
||||
await run_ctx.thread_meta_repo.create(
|
||||
thread_id,
|
||||
assistant_id=body.assistant_id,
|
||||
metadata=body.metadata,
|
||||
)
|
||||
else:
|
||||
await run_ctx.thread_meta_repo.update_status(thread_id, "running")
|
||||
except Exception:
|
||||
logger.warning("Failed to upsert thread_meta for %s (non-fatal)", sanitize_log_param(thread_id))
|
||||
|
||||
agent_factory = resolve_agent_factory(body.assistant_id)
|
||||
graph_input = normalize_input(body.input)
|
||||
|
||||
@ -0,0 +1,51 @@
|
||||
"""Abstract interface for thread metadata storage.
|
||||
|
||||
Implementations:
|
||||
- ThreadMetaRepository: SQL-backed (sqlite / postgres via SQLAlchemy)
|
||||
- MemoryThreadMetaStore: wraps LangGraph BaseStore (memory mode)
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import abc
|
||||
|
||||
|
||||
class ThreadMetaStore(abc.ABC):
|
||||
@abc.abstractmethod
|
||||
async def create(
|
||||
self,
|
||||
thread_id: str,
|
||||
*,
|
||||
assistant_id: str | None = None,
|
||||
owner_id: str | None = None,
|
||||
display_name: str | None = None,
|
||||
metadata: dict | None = None,
|
||||
) -> dict:
|
||||
pass
|
||||
|
||||
@abc.abstractmethod
|
||||
async def get(self, thread_id: str) -> dict | None:
|
||||
pass
|
||||
|
||||
@abc.abstractmethod
|
||||
async def search(
|
||||
self,
|
||||
*,
|
||||
metadata: dict | None = None,
|
||||
status: str | None = None,
|
||||
limit: int = 100,
|
||||
offset: int = 0,
|
||||
) -> list[dict]:
|
||||
pass
|
||||
|
||||
@abc.abstractmethod
|
||||
async def update_display_name(self, thread_id: str, display_name: str) -> None:
|
||||
pass
|
||||
|
||||
@abc.abstractmethod
|
||||
async def update_status(self, thread_id: str, status: str) -> None:
|
||||
pass
|
||||
|
||||
@abc.abstractmethod
|
||||
async def delete(self, thread_id: str) -> None:
|
||||
pass
|
||||
@ -0,0 +1,108 @@
|
||||
"""In-memory ThreadMetaStore backed by LangGraph BaseStore.
|
||||
|
||||
Used when database.backend=memory. Delegates to the LangGraph Store's
|
||||
``("threads",)`` namespace — the same namespace used by the Gateway
|
||||
router for thread records.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import time
|
||||
from typing import Any
|
||||
|
||||
from langgraph.store.base import BaseStore
|
||||
|
||||
from deerflow.persistence.repositories.thread_meta_base import ThreadMetaStore
|
||||
|
||||
THREADS_NS: tuple[str, ...] = ("threads",)
|
||||
|
||||
|
||||
class MemoryThreadMetaStore(ThreadMetaStore):
|
||||
def __init__(self, store: BaseStore) -> None:
|
||||
self._store = store
|
||||
|
||||
async def create(
|
||||
self,
|
||||
thread_id: str,
|
||||
*,
|
||||
assistant_id: str | None = None,
|
||||
owner_id: str | None = None,
|
||||
display_name: str | None = None,
|
||||
metadata: dict | None = None,
|
||||
) -> dict:
|
||||
now = time.time()
|
||||
record: dict[str, Any] = {
|
||||
"thread_id": thread_id,
|
||||
"assistant_id": assistant_id,
|
||||
"owner_id": owner_id,
|
||||
"display_name": display_name,
|
||||
"status": "idle",
|
||||
"metadata": metadata or {},
|
||||
"values": {},
|
||||
"created_at": now,
|
||||
"updated_at": now,
|
||||
}
|
||||
await self._store.aput(THREADS_NS, thread_id, record)
|
||||
return record
|
||||
|
||||
async def get(self, thread_id: str) -> dict | None:
|
||||
item = await self._store.aget(THREADS_NS, thread_id)
|
||||
return item.value if item is not None else None
|
||||
|
||||
async def search(
|
||||
self,
|
||||
*,
|
||||
metadata: dict | None = None,
|
||||
status: str | None = None,
|
||||
limit: int = 100,
|
||||
offset: int = 0,
|
||||
) -> list[dict]:
|
||||
filter_dict: dict[str, Any] = {}
|
||||
if metadata:
|
||||
filter_dict.update(metadata)
|
||||
if status:
|
||||
filter_dict["status"] = status
|
||||
|
||||
items = await self._store.asearch(
|
||||
THREADS_NS,
|
||||
filter=filter_dict or None,
|
||||
limit=limit,
|
||||
offset=offset,
|
||||
)
|
||||
return [self._item_to_dict(item) for item in items]
|
||||
|
||||
async def update_display_name(self, thread_id: str, display_name: str) -> None:
|
||||
item = await self._store.aget(THREADS_NS, thread_id)
|
||||
if item is None:
|
||||
return
|
||||
record = dict(item.value)
|
||||
record["display_name"] = display_name
|
||||
record["updated_at"] = time.time()
|
||||
await self._store.aput(THREADS_NS, thread_id, record)
|
||||
|
||||
async def update_status(self, thread_id: str, status: str) -> None:
|
||||
item = await self._store.aget(THREADS_NS, thread_id)
|
||||
if item is None:
|
||||
return
|
||||
record = dict(item.value)
|
||||
record["status"] = status
|
||||
record["updated_at"] = time.time()
|
||||
await self._store.aput(THREADS_NS, thread_id, record)
|
||||
|
||||
async def delete(self, thread_id: str) -> None:
|
||||
await self._store.adelete(THREADS_NS, thread_id)
|
||||
|
||||
@staticmethod
|
||||
def _item_to_dict(item) -> dict[str, Any]:
|
||||
"""Convert a Store SearchItem to the dict format expected by callers."""
|
||||
val = item.value
|
||||
return {
|
||||
"thread_id": item.key,
|
||||
"assistant_id": val.get("assistant_id"),
|
||||
"owner_id": val.get("owner_id"),
|
||||
"display_name": val.get("display_name"),
|
||||
"status": val.get("status", "idle"),
|
||||
"metadata": val.get("metadata", {}),
|
||||
"created_at": str(val.get("created_at", "")),
|
||||
"updated_at": str(val.get("updated_at", "")),
|
||||
}
|
||||
@ -9,9 +9,10 @@ from sqlalchemy import select, update
|
||||
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
|
||||
|
||||
from deerflow.persistence.models.thread_meta import ThreadMetaRow
|
||||
from deerflow.persistence.repositories.thread_meta_base import ThreadMetaStore
|
||||
|
||||
|
||||
class ThreadMetaRepository:
|
||||
class ThreadMetaRepository(ThreadMetaStore):
|
||||
def __init__(self, session_factory: async_sessionmaker[AsyncSession]) -> None:
|
||||
self._sf = session_factory
|
||||
|
||||
|
||||
@ -285,7 +285,7 @@ async def run_agent(
|
||||
await run_manager.update_run_completion(run_id, status=record.status.value, **completion)
|
||||
|
||||
# Sync title from checkpoint to threads_meta.display_name
|
||||
if thread_meta_repo is not None and checkpointer is not None:
|
||||
if checkpointer is not None:
|
||||
try:
|
||||
ckpt_config = {"configurable": {"thread_id": thread_id, "checkpoint_ns": ""}}
|
||||
ckpt_tuple = await checkpointer.aget_tuple(ckpt_config)
|
||||
@ -298,12 +298,11 @@ async def run_agent(
|
||||
logger.debug("Failed to sync title for thread %s (non-fatal)", thread_id)
|
||||
|
||||
# Update threads_meta status based on run outcome
|
||||
if thread_meta_repo is not None:
|
||||
try:
|
||||
final_status = "idle" if record.status == RunStatus.success else record.status.value
|
||||
await thread_meta_repo.update_status(thread_id, final_status)
|
||||
except Exception:
|
||||
logger.debug("Failed to update thread_meta status for %s (non-fatal)", thread_id)
|
||||
try:
|
||||
final_status = "idle" if record.status == RunStatus.success else record.status.value
|
||||
await thread_meta_repo.update_status(thread_id, final_status)
|
||||
except Exception:
|
||||
logger.debug("Failed to update thread_meta status for %s (non-fatal)", thread_id)
|
||||
|
||||
await bridge.publish_end(run_id)
|
||||
asyncio.create_task(bridge.cleanup(run_id, delay=60))
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user