diff --git a/backend/app/gateway/routers/threads.py b/backend/app/gateway/routers/threads.py index 59f5e6ea0..84af7c488 100644 --- a/backend/app/gateway/routers/threads.py +++ b/backend/app/gateway/routers/threads.py @@ -332,39 +332,62 @@ async def create_thread(body: ThreadCreateRequest, request: Request) -> ThreadRe @router.post("/search", response_model=list[ThreadResponse]) async def search_threads(body: ThreadSearchRequest, request: Request) -> list[ThreadResponse]: - """Search and list threads from the threads_meta table. + """Search and list threads. - NOTE: Migration from pre-persistence-layer deployments: - Threads created via LangGraph Server before this change are NOT - automatically indexed in threads_meta. They will not appear in - search results until a new run is created on them (which triggers - thread_meta upsert in services.py). For bulk migration, run: - python -m deerflow.persistence.migrate_threads_from_checkpointer - (migration script TBD in a follow-up PR) + Uses ThreadMetaRepository (SQL) when available, otherwise falls back + to the LangGraph Store for memory/lightweight deployments. """ from app.gateway.deps import get_thread_meta_repo repo = get_thread_meta_repo(request) - if repo is None: - raise HTTPException(status_code=503, detail="Thread metadata store not available") + if repo is not None: + rows = await repo.search( + metadata=body.metadata or None, + status=body.status, + limit=body.limit, + offset=body.offset, + ) + return [ + ThreadResponse( + thread_id=r["thread_id"], + status=r.get("status", "idle"), + created_at=r.get("created_at", ""), + updated_at=r.get("updated_at", ""), + metadata=r.get("metadata", {}), + values={"title": r["display_name"]} if r.get("display_name") else {}, + interrupts={}, + ) + for r in rows + ] - rows = await repo.search( - metadata=body.metadata or None, - status=body.status, + # Fallback: search the LangGraph Store (memory / no-SQL deployments) + store = get_store(request) + if store is None: + return [] + + filter_dict: dict[str, Any] = {} + if body.metadata: + filter_dict.update(body.metadata) + if body.status: + filter_dict["status"] = body.status + + items = await store.asearch( + THREADS_NS, + filter=filter_dict or None, limit=body.limit, offset=body.offset, ) return [ ThreadResponse( - thread_id=r["thread_id"], - status=r.get("status", "idle"), - created_at=r.get("created_at", ""), - updated_at=r.get("updated_at", ""), - metadata=r.get("metadata", {}), - values={"title": r["display_name"]} if r.get("display_name") else {}, + thread_id=item.key, + status=item.value.get("status", "idle"), + created_at=str(item.value.get("created_at", "")), + updated_at=str(item.value.get("updated_at", "")), + metadata=item.value.get("metadata", {}), + values=item.value.get("values", {}), interrupts={}, ) - for r in rows + for item in items ]