diff --git a/backend/app/gateway/__init__.py b/backend/app/gateway/__init__.py index cab046712..82346f95d 100644 --- a/backend/app/gateway/__init__.py +++ b/backend/app/gateway/__init__.py @@ -1,4 +1,23 @@ -from .app import app, create_app -from .config import GatewayConfig, get_gateway_config +from __future__ import annotations -__all__ = ["app", "create_app", "GatewayConfig", "get_gateway_config"] +__all__ = ["GatewayConfig", "app", "get_gateway_config", "register_app"] + + +def __getattr__(name: str): + if name == "app": + from .app import app + + return app + if name == "GatewayConfig": + from .config import GatewayConfig + + return GatewayConfig + if name == "get_gateway_config": + from .config import get_gateway_config + + return get_gateway_config + if name == "register_app": + from .registrar import register_app + + return register_app + raise AttributeError(name) diff --git a/backend/app/gateway/app.py b/backend/app/gateway/app.py index 935c936f4..4ee38e608 100644 --- a/backend/app/gateway/app.py +++ b/backend/app/gateway/app.py @@ -1,363 +1,8 @@ -import logging -import os -from collections.abc import AsyncGenerator -from contextlib import asynccontextmanager +from app.gateway.registrar import register_app -from fastapi import FastAPI -from fastapi.middleware.cors import CORSMiddleware -from app.gateway.auth_middleware import AuthMiddleware -from app.gateway.config import get_gateway_config -from app.gateway.csrf_middleware import CSRFMiddleware -from app.gateway.deps import langgraph_runtime -from app.gateway.routers import ( - agents, - artifacts, - assistants_compat, - auth, - channels, - feedback, - mcp, - memory, - models, - runs, - skills, - suggestions, - thread_runs, - threads, - uploads, -) -from deerflow.config.app_config import get_app_config +def create_app(): + return register_app() -# Configure logging -logging.basicConfig( - level=logging.INFO, - format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", - datefmt="%Y-%m-%d %H:%M:%S", -) -logger = logging.getLogger(__name__) - - -async def _ensure_admin_user(app: FastAPI) -> None: - """Startup hook: handle first boot and migrate orphan threads otherwise. - - After admin creation, migrate orphan threads from the LangGraph - store (metadata.user_id unset) to the admin account. This is the - "no-auth → with-auth" upgrade path: users who ran DeerFlow without - authentication have existing LangGraph thread data that needs an - owner assigned. - First boot (no admin exists): - - Does NOT create any user accounts automatically. - - The operator must visit ``/setup`` to create the first admin. - - Subsequent boots (admin already exists): - - Runs the one-time "no-auth → with-auth" orphan thread migration for - existing LangGraph thread metadata that has no owner_id. - - No SQL persistence migration is needed: the four user_id columns - (threads_meta, runs, run_events, feedback) only come into existence - alongside the auth module via create_all, so freshly created tables - never contain NULL-owner rows. - """ - from sqlalchemy import select - - from app.gateway.deps import get_local_provider - from deerflow.persistence.engine import get_session_factory - from deerflow.persistence.user.model import UserRow - - provider = get_local_provider() - admin_count = await provider.count_admin_users() - - if admin_count == 0: - logger.info("=" * 60) - logger.info(" First boot detected — no admin account exists.") - logger.info(" Visit /setup to complete admin account creation.") - logger.info("=" * 60) - return - - # Admin already exists — run orphan thread migration for any - # LangGraph thread metadata that pre-dates the auth module. - sf = get_session_factory() - if sf is None: - return - - async with sf() as session: - stmt = select(UserRow).where(UserRow.system_role == "admin").limit(1) - row = (await session.execute(stmt)).scalar_one_or_none() - - if row is None: - return # Should not happen (admin_count > 0 above), but be safe. - - admin_id = str(row.id) - - # LangGraph store orphan migration — non-fatal. - # This covers the "no-auth → with-auth" upgrade path for users - # whose existing LangGraph thread metadata has no user_id set. - store = getattr(app.state, "store", None) - if store is not None: - try: - migrated = await _migrate_orphaned_threads(store, admin_id) - if migrated: - logger.info("Migrated %d orphan LangGraph thread(s) to admin", migrated) - except Exception: - logger.exception("LangGraph thread migration failed (non-fatal)") - - -async def _iter_store_items(store, namespace, *, page_size: int = 500): - """Paginated async iterator over a LangGraph store namespace. - - Replaces the old hardcoded ``limit=1000`` call with a cursor-style - loop so that environments with more than one page of orphans do - not silently lose data. Terminates when a page is empty OR when a - short page arrives (indicating the last page). - """ - offset = 0 - while True: - batch = await store.asearch(namespace, limit=page_size, offset=offset) - if not batch: - return - for item in batch: - yield item - if len(batch) < page_size: - return - offset += page_size - - -async def _migrate_orphaned_threads(store, admin_user_id: str) -> int: - """Migrate LangGraph store threads with no user_id to the given admin. - - Uses cursor pagination so all orphans are migrated regardless of - count. Returns the number of rows migrated. - """ - migrated = 0 - async for item in _iter_store_items(store, ("threads",)): - metadata = item.value.get("metadata", {}) - if not metadata.get("user_id"): - metadata["user_id"] = admin_user_id - item.value["metadata"] = metadata - await store.aput(("threads",), item.key, item.value) - migrated += 1 - return migrated - - -@asynccontextmanager -async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]: - """Application lifespan handler.""" - - # Load config and check necessary environment variables at startup - try: - get_app_config() - logger.info("Configuration loaded successfully") - except Exception as e: - error_msg = f"Failed to load configuration during gateway startup: {e}" - logger.exception(error_msg) - raise RuntimeError(error_msg) from e - config = get_gateway_config() - logger.info(f"Starting API Gateway on {config.host}:{config.port}") - - # Initialize LangGraph runtime components (StreamBridge, RunManager, checkpointer, store) - async with langgraph_runtime(app): - logger.info("LangGraph runtime initialised") - - # Ensure admin user exists (auto-create on first boot) - # Must run AFTER langgraph_runtime so app.state.store is available for thread migration - await _ensure_admin_user(app) - - # Start IM channel service if any channels are configured - try: - from app.channels.service import start_channel_service - - channel_service = await start_channel_service() - logger.info("Channel service started: %s", channel_service.get_status()) - except Exception: - logger.exception("No IM channels configured or channel service failed to start") - - yield - - # Stop channel service on shutdown - try: - from app.channels.service import stop_channel_service - - await stop_channel_service() - except Exception: - logger.exception("Failed to stop channel service") - - logger.info("Shutting down API Gateway") - - -def create_app() -> FastAPI: - """Create and configure the FastAPI application. - - Returns: - Configured FastAPI application instance. - """ - - app = FastAPI( - title="DeerFlow API Gateway", - description=""" -## DeerFlow API Gateway - -API Gateway for DeerFlow - A LangGraph-based AI agent backend with sandbox execution capabilities. - -### Features - -- **Models Management**: Query and retrieve available AI models -- **MCP Configuration**: Manage Model Context Protocol (MCP) server configurations -- **Memory Management**: Access and manage global memory data for personalized conversations -- **Skills Management**: Query and manage skills and their enabled status -- **Artifacts**: Access thread artifacts and generated files -- **Health Monitoring**: System health check endpoints - -### Architecture - -LangGraph requests are handled by nginx reverse proxy. -This gateway provides custom endpoints for models, MCP configuration, skills, and artifacts. - """, - version="0.1.0", - lifespan=lifespan, - docs_url="/docs", - redoc_url="/redoc", - openapi_url="/openapi.json", - openapi_tags=[ - { - "name": "models", - "description": "Operations for querying available AI models and their configurations", - }, - { - "name": "mcp", - "description": "Manage Model Context Protocol (MCP) server configurations", - }, - { - "name": "memory", - "description": "Access and manage global memory data for personalized conversations", - }, - { - "name": "skills", - "description": "Manage skills and their configurations", - }, - { - "name": "artifacts", - "description": "Access and download thread artifacts and generated files", - }, - { - "name": "uploads", - "description": "Upload and manage user files for threads", - }, - { - "name": "threads", - "description": "Manage DeerFlow thread-local filesystem data", - }, - { - "name": "agents", - "description": "Create and manage custom agents with per-agent config and prompts", - }, - { - "name": "suggestions", - "description": "Generate follow-up question suggestions for conversations", - }, - { - "name": "channels", - "description": "Manage IM channel integrations (Feishu, Slack, Telegram)", - }, - { - "name": "assistants-compat", - "description": "LangGraph Platform-compatible assistants API (stub)", - }, - { - "name": "runs", - "description": "LangGraph Platform-compatible runs lifecycle (create, stream, cancel)", - }, - { - "name": "health", - "description": "Health check and system status endpoints", - }, - ], - ) - - # Auth: reject unauthenticated requests to non-public paths (fail-closed safety net) - app.add_middleware(AuthMiddleware) - - # CSRF: Double Submit Cookie pattern for state-changing requests - app.add_middleware(CSRFMiddleware) - - # CORS: when GATEWAY_CORS_ORIGINS is set (dev without nginx), add CORS middleware. - # In production, nginx handles CORS and no middleware is needed. - cors_origins_env = os.environ.get("GATEWAY_CORS_ORIGINS", "") - if cors_origins_env: - cors_origins = [o.strip() for o in cors_origins_env.split(",") if o.strip()] - # Validate: wildcard origin with credentials is a security misconfiguration - for origin in cors_origins: - if origin == "*": - logger.error("GATEWAY_CORS_ORIGINS contains wildcard '*' with allow_credentials=True. This is a security misconfiguration — browsers will reject the response. Use explicit scheme://host:port origins instead.") - cors_origins = [o for o in cors_origins if o != "*"] - break - if cors_origins: - app.add_middleware( - CORSMiddleware, - allow_origins=cors_origins, - allow_credentials=True, - allow_methods=["*"], - allow_headers=["*"], - ) - - # Include routers - # Models API is mounted at /api/models - app.include_router(models.router) - - # MCP API is mounted at /api/mcp - app.include_router(mcp.router) - - # Memory API is mounted at /api/memory - app.include_router(memory.router) - - # Skills API is mounted at /api/skills - app.include_router(skills.router) - - # Artifacts API is mounted at /api/threads/{thread_id}/artifacts - app.include_router(artifacts.router) - - # Uploads API is mounted at /api/threads/{thread_id}/uploads - app.include_router(uploads.router) - - # Thread cleanup API is mounted at /api/threads/{thread_id} - app.include_router(threads.router) - - # Agents API is mounted at /api/agents - app.include_router(agents.router) - - # Suggestions API is mounted at /api/threads/{thread_id}/suggestions - app.include_router(suggestions.router) - - # Channels API is mounted at /api/channels - app.include_router(channels.router) - - # Assistants compatibility API (LangGraph Platform stub) - app.include_router(assistants_compat.router) - - # Auth API is mounted at /api/v1/auth - app.include_router(auth.router) - - # Feedback API is mounted at /api/threads/{thread_id}/runs/{run_id}/feedback - app.include_router(feedback.router) - - # Thread Runs API (LangGraph Platform-compatible runs lifecycle) - app.include_router(thread_runs.router) - - # Stateless Runs API (stream/wait without a pre-existing thread) - app.include_router(runs.router) - - @app.get("/health", tags=["health"]) - async def health_check() -> dict: - """Health check endpoint. - - Returns: - Service health status information. - """ - return {"status": "healthy", "service": "deer-flow-gateway"} - - return app - - -# Create app instance for uvicorn -app = create_app() +app = register_app() diff --git a/backend/app/gateway/common/__init__.py b/backend/app/gateway/common/__init__.py new file mode 100644 index 000000000..36e9f51f2 --- /dev/null +++ b/backend/app/gateway/common/__init__.py @@ -0,0 +1,3 @@ +from .lifespan import lifespan_manager + +__all__ = ["lifespan_manager"] diff --git a/backend/app/gateway/common/lifespan.py b/backend/app/gateway/common/lifespan.py new file mode 100644 index 000000000..6a69919bd --- /dev/null +++ b/backend/app/gateway/common/lifespan.py @@ -0,0 +1,52 @@ +from collections.abc import Callable +from contextlib import AbstractAsyncContextManager, AsyncExitStack, asynccontextmanager +from typing import Any + +from fastapi import FastAPI + +LifespanFunc = Callable[[FastAPI], AbstractAsyncContextManager[dict[str, Any] | None]] + + +class LifespanManager: + """FastAPI lifespan manager""" + + def __init__(self) -> None: + self._lifespans: list[LifespanFunc] = [] + + def register(self, func: LifespanFunc) -> LifespanFunc: + """ + Register a lifespan hook. + + :param func: lifespan hook + :return: + """ + if func not in self._lifespans: + self._lifespans.append(func) + return func + + def build(self) -> LifespanFunc: + """ + Build the combined lifespan hook. + + :return: + """ + + @asynccontextmanager + async def combined_lifespan(app: FastAPI): # noqa: ANN202 + state: dict[str, Any] = {} + async with AsyncExitStack() as exit_stack: + for lifespan_fn in self._lifespans: + result = await exit_stack.enter_async_context(lifespan_fn(app)) + if isinstance(result, dict): + state.update(result) + + for key, value in state.items(): + setattr(app.state, key, value) + + yield state or None + + return combined_lifespan + + +# Singleton lifespan_manager instance +lifespan_manager = LifespanManager() diff --git a/backend/app/gateway/dependencies/__init__.py b/backend/app/gateway/dependencies/__init__.py new file mode 100644 index 000000000..624c123de --- /dev/null +++ b/backend/app/gateway/dependencies/__init__.py @@ -0,0 +1,59 @@ +from app.gateway.dependencies.checkpointer import ( + CurrentCheckpointer, + get_checkpointer, +) +from app.plugins.auth.security.dependencies import ( + CurrentAuthService, + CurrentUserRepository, + get_auth_service, + get_current_user_from_request, + get_current_user_id, + get_optional_user_from_request, + get_user_repository, +) +from app.gateway.dependencies.db import ( + CurrentSession, + CurrentSessionTransaction, + get_db_session, + get_db_session_transaction, +) +from app.gateway.dependencies.repositories import ( + CurrentFeedbackRepository, + CurrentRunRepository, + CurrentThreadMetaRepository, + CurrentThreadMetaStorage, + get_feedback_repository, + get_run_repository, + get_thread_meta_repository, + get_thread_meta_storage, +) +from app.gateway.dependencies.stream_bridge import ( + CurrentStreamBridge, + get_stream_bridge, +) + +__all__ = [ + "CurrentCheckpointer", + "CurrentAuthService", + "CurrentFeedbackRepository", + "CurrentRunRepository", + "CurrentSession", + "CurrentSessionTransaction", + "CurrentStreamBridge", + "CurrentThreadMetaRepository", + "CurrentThreadMetaStorage", + "CurrentUserRepository", + "get_auth_service", + "get_checkpointer", + "get_current_user_from_request", + "get_current_user_id", + "get_db_session", + "get_db_session_transaction", + "get_feedback_repository", + "get_optional_user_from_request", + "get_run_repository", + "get_stream_bridge", + "get_thread_meta_repository", + "get_thread_meta_storage", + "get_user_repository", +] diff --git a/backend/app/gateway/dependencies/checkpointer.py b/backend/app/gateway/dependencies/checkpointer.py new file mode 100644 index 000000000..cd701589c --- /dev/null +++ b/backend/app/gateway/dependencies/checkpointer.py @@ -0,0 +1,20 @@ +from __future__ import annotations + +from typing import Annotated + +from fastapi import Depends, HTTPException, Request +from langgraph.types import Checkpointer + + +def get_checkpointer(request: Request) -> Checkpointer: + """Get checkpointer from app.state.persistence.""" + persistence = getattr(request.app.state, "persistence", None) + if persistence is None: + raise HTTPException(status_code=503, detail="Persistence not available") + checkpointer = getattr(persistence, "checkpointer", None) + if checkpointer is None: + raise HTTPException(status_code=503, detail="Checkpointer not available") + return checkpointer + + +CurrentCheckpointer = Annotated[Checkpointer, Depends(get_checkpointer)] diff --git a/backend/app/gateway/dependencies/db.py b/backend/app/gateway/dependencies/db.py new file mode 100644 index 000000000..88f0a0c7c --- /dev/null +++ b/backend/app/gateway/dependencies/db.py @@ -0,0 +1,37 @@ +from __future__ import annotations + +from collections.abc import AsyncIterator +from typing import Annotated + +from fastapi import Depends, HTTPException, Request +from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker + + +def _get_session_factory(request: Request) -> async_sessionmaker[AsyncSession]: + factory = getattr(request.app.state.persistence, "session_factory", None) + if factory is None: + raise HTTPException(status_code=503, detail="Database session factory not available") + return factory + + +async def get_db_session(request: Request) -> AsyncIterator[AsyncSession]: + """Open a session without auto-commit. Use for read-only endpoints.""" + session_factory = _get_session_factory(request) + async with session_factory() as session: + yield session + + +async def get_db_session_transaction(request: Request) -> AsyncIterator[AsyncSession]: + """Open a session and commit on success, rollback on error.""" + session_factory = _get_session_factory(request) + async with session_factory() as session: + try: + yield session + await session.commit() + except Exception: + await session.rollback() + raise + + +CurrentSession = Annotated[AsyncSession, Depends(get_db_session)] +CurrentSessionTransaction = Annotated[AsyncSession, Depends(get_db_session_transaction)] diff --git a/backend/app/gateway/dependencies/repositories.py b/backend/app/gateway/dependencies/repositories.py new file mode 100644 index 000000000..9bd5520c2 --- /dev/null +++ b/backend/app/gateway/dependencies/repositories.py @@ -0,0 +1,41 @@ +from __future__ import annotations + +from typing import Annotated + +from fastapi import Depends, HTTPException, Request + +from app.infra.storage import ThreadMetaStorage +from store.repositories.contracts import ( + FeedbackRepositoryProtocol, + RunRepositoryProtocol, + ThreadMetaRepositoryProtocol, +) + + +def _require_state(request: Request, attr: str, label: str): + value = getattr(request.app.state, attr, None) + if value is None: + raise HTTPException(status_code=503, detail=f"{label} not available") + return value + + +def get_run_repository(request: Request) -> RunRepositoryProtocol: + return _require_state(request, "run_store", "Run store") + + +def get_thread_meta_repository(request: Request) -> ThreadMetaRepositoryProtocol: + return _require_state(request, "thread_meta_repo", "Thread metadata store") + + +def get_thread_meta_storage(request: Request) -> ThreadMetaStorage: + return _require_state(request, "thread_meta_storage", "Thread metadata storage") + + +def get_feedback_repository(request: Request) -> FeedbackRepositoryProtocol: + return _require_state(request, "feedback_repo", "Feedback") + + +CurrentRunRepository = Annotated[RunRepositoryProtocol, Depends(get_run_repository)] +CurrentThreadMetaRepository = Annotated[ThreadMetaRepositoryProtocol, Depends(get_thread_meta_repository)] +CurrentThreadMetaStorage = Annotated[ThreadMetaStorage, Depends(get_thread_meta_storage)] +CurrentFeedbackRepository = Annotated[FeedbackRepositoryProtocol, Depends(get_feedback_repository)] diff --git a/backend/app/gateway/dependencies/stream_bridge.py b/backend/app/gateway/dependencies/stream_bridge.py new file mode 100644 index 000000000..77194ce3f --- /dev/null +++ b/backend/app/gateway/dependencies/stream_bridge.py @@ -0,0 +1,18 @@ +from __future__ import annotations + +from typing import Annotated + +from fastapi import Depends, HTTPException, Request + +from deerflow.runtime import StreamBridge + + +def get_stream_bridge(request: Request) -> StreamBridge: + """Get stream bridge from app.state.""" + bridge = getattr(request.app.state, "stream_bridge", None) + if bridge is None: + raise HTTPException(status_code=503, detail="Stream bridge not available") + return bridge + + +CurrentStreamBridge = Annotated[StreamBridge, Depends(get_stream_bridge)] diff --git a/backend/app/gateway/path_utils.py b/backend/app/gateway/path_utils.py index ded348c78..c269f2fea 100644 --- a/backend/app/gateway/path_utils.py +++ b/backend/app/gateway/path_utils.py @@ -5,16 +5,17 @@ from pathlib import Path from fastapi import HTTPException from deerflow.config.paths import get_paths -from deerflow.runtime.user_context import get_effective_user_id +from deerflow.runtime.actor_context import get_effective_user_id -def resolve_thread_virtual_path(thread_id: str, virtual_path: str) -> Path: +def resolve_thread_virtual_path(thread_id: str, virtual_path: str, *, user_id: str | None = None) -> Path: """Resolve a virtual path to the actual filesystem path under thread user-data. Args: thread_id: The thread ID. virtual_path: The virtual path as seen inside the sandbox (e.g., /mnt/user-data/outputs/file.txt). + user_id: Explicit user id override. Falls back to the current actor context. Returns: The resolved filesystem path. @@ -23,7 +24,8 @@ def resolve_thread_virtual_path(thread_id: str, virtual_path: str) -> Path: HTTPException: If the path is invalid or outside allowed directories. """ try: - return get_paths().resolve_virtual_path(thread_id, virtual_path, user_id=get_effective_user_id()) + resolved_user_id = get_effective_user_id() if user_id is None else user_id + return get_paths().resolve_virtual_path(thread_id, virtual_path, user_id=resolved_user_id) except ValueError as e: status = 403 if "traversal" in str(e) else 400 raise HTTPException(status_code=status, detail=str(e)) diff --git a/backend/app/gateway/registrar.py b/backend/app/gateway/registrar.py new file mode 100644 index 000000000..53951ae3e --- /dev/null +++ b/backend/app/gateway/registrar.py @@ -0,0 +1,132 @@ +from collections.abc import AsyncGenerator +from contextlib import asynccontextmanager +from pathlib import Path +from typing import Any + +from fastapi import FastAPI +from fastapi.responses import HTMLResponse +from fastapi.staticfiles import StaticFiles +from scalar_fastapi import AgentScalarConfig, get_scalar_api_reference +from starlette.middleware.cors import CORSMiddleware +from store.persistence import create_persistence + +from app.gateway.common import lifespan_manager +from app.gateway.router import router as gateway_router +from app.infra.run_events import build_run_event_store +from app.infra.storage import FeedbackStoreAdapter, RunStoreAdapter, ThreadMetaStorage, ThreadMetaStoreAdapter +from app.plugins.auth.authorization.hooks import build_authz_hooks +from app.plugins.auth.injection import install_route_guards, load_route_policy_registry, validate_route_policy_registry +from app.plugins.auth.security import AuthMiddleware, CSRFMiddleware + +STATIC_DIR = Path(__file__).resolve().parents[1] / "static" +STATIC_MOUNT = "/api/static" +SCALAR_JS_URL = f"{STATIC_MOUNT}/scalar.js" + + +@lifespan_manager.register +@asynccontextmanager +async def init_persistence(app: FastAPI) -> AsyncGenerator[dict[str, Any], None]: + """Initialize persistence layer (DB, checkpointer, store).""" + app_persistence = await create_persistence() + + await app_persistence.setup() + run_store = RunStoreAdapter(app_persistence.session_factory) + thread_meta_store = ThreadMetaStoreAdapter(app_persistence.session_factory) + feedback_store = FeedbackStoreAdapter(app_persistence.session_factory) + + try: + yield { + "persistence": app_persistence, + "checkpointer": app_persistence.checkpointer, + "store": None, + "session_factory": app_persistence.session_factory, + "run_store": run_store, + "run_read_repo": run_store, + "run_write_repo": run_store, + "run_delete_repo": run_store, + "feedback_repo": feedback_store, + "thread_meta_repo": thread_meta_store, + "thread_meta_storage": ThreadMetaStorage(thread_meta_store), + "run_event_store": build_run_event_store(app_persistence.session_factory), + } + finally: + await app_persistence.aclose() + + +@lifespan_manager.register +@asynccontextmanager +async def init_runtime(app: FastAPI) -> AsyncGenerator[dict[str, Any], None]: + """Initialize StreamBridge for LangGraph-compatible runtime endpoints.""" + from app.infra.stream_bridge import build_stream_bridge + + async with build_stream_bridge() as stream_bridge: + yield { + "stream_bridge": stream_bridge, + } + + +def register_app() -> FastAPI: + app = FastAPI( + title="DeerFlow API Gateway", + version="0.1.0", + docs_url=None, + redoc_url=None, + lifespan=lifespan_manager.build(), + openapi_tags=[ + { + "name": "threads", + "description": "Endpoints for managing threads, which are conversations between a human and an assistant. A thread can have multiple runs as the conversation progresses." + } + ] + ) + + app.state.authz_hooks = build_authz_hooks() + + _register_static(app) + _register_routes(app) + _register_scalar(app) + _register_auth_route_policies(app) + _register_middlewares(app) + + return app + + +def _register_static(app: FastAPI) -> None: + app.mount(STATIC_MOUNT, StaticFiles(directory=STATIC_DIR), name="static") + + +def _register_routes(app: FastAPI) -> None: + app.include_router(gateway_router) + + +def _register_auth_route_policies(app: FastAPI) -> None: + registry = load_route_policy_registry() + validate_route_policy_registry(app, registry) + app.state.auth_route_policy_registry = registry + install_route_guards(app) + + +def _register_middlewares(app: FastAPI) -> None: + app.add_middleware( + CORSMiddleware, + allow_origins=["*"], + allow_credentials=True, + allow_methods=["*"], + allow_headers=["*"], + expose_headers=["*"], + ) + app.add_middleware(CSRFMiddleware) + app.add_middleware(AuthMiddleware) + + +def _register_scalar(app: FastAPI) -> None: + @app.get("/docs", include_in_schema=False) + def scalar_docs() -> HTMLResponse: + return get_scalar_api_reference( + openapi_url=app.openapi_url, + title=app.title, + scalar_js_url=SCALAR_JS_URL, + agent=AgentScalarConfig(disabled=True), + hide_client_button=True, + overrides={"mcp": {"disabled": True}}, + ) diff --git a/backend/app/gateway/router.py b/backend/app/gateway/router.py new file mode 100644 index 000000000..303a6120a --- /dev/null +++ b/backend/app/gateway/router.py @@ -0,0 +1,22 @@ +from fastapi import APIRouter + +from app.plugins.auth.api.router import router as auth_router + +from .routers import artifacts, channels, mcp, models, skills, uploads +from .routers.agents import router as agents_router +from .routers.langgraph import feedback_router, runs_router, suggestion_router, threads_router + +router = APIRouter() + +router.include_router(auth_router) +router.include_router(threads_router, prefix="/api/threads") +router.include_router(runs_router, prefix="/api/threads") +router.include_router(feedback_router, prefix="/api/threads") +router.include_router(suggestion_router) +router.include_router(agents_router) +router.include_router(channels.router) +router.include_router(artifacts.router) +router.include_router(mcp.router) +router.include_router(models.router) +router.include_router(skills.router) +router.include_router(uploads.router) diff --git a/backend/app/gateway/services/__init__.py b/backend/app/gateway/services/__init__.py new file mode 100644 index 000000000..1ff380134 --- /dev/null +++ b/backend/app/gateway/services/__init__.py @@ -0,0 +1,5 @@ +"""Gateway service layer.""" + +"""Compatibility package for app service submodules.""" + +__all__: list[str] = [] diff --git a/backend/app/gateway/services/runs/__init__.py b/backend/app/gateway/services/runs/__init__.py new file mode 100644 index 000000000..daf620f45 --- /dev/null +++ b/backend/app/gateway/services/runs/__init__.py @@ -0,0 +1,29 @@ +"""Runs app layer services.""" + +from app.infra.storage import StorageRunObserver +from .input import ( + AdaptedRunRequest, + RunSpecBuilder, + UnsupportedRunFeatureError, + adapt_create_run_request, + adapt_create_stream_request, + adapt_create_wait_request, + adapt_join_stream_request, + adapt_join_wait_request, +) +from .store import AppRunCreateStore, AppRunDeleteStore, AppRunQueryStore + +__all__ = [ + "AdaptedRunRequest", + "AppRunCreateStore", + "AppRunDeleteStore", + "AppRunQueryStore", + "RunSpecBuilder", + "StorageRunObserver", + "UnsupportedRunFeatureError", + "adapt_create_run_request", + "adapt_create_stream_request", + "adapt_create_wait_request", + "adapt_join_stream_request", + "adapt_join_wait_request", +] diff --git a/backend/app/gateway/services/runs/facade_factory.py b/backend/app/gateway/services/runs/facade_factory.py new file mode 100644 index 000000000..96cc60d64 --- /dev/null +++ b/backend/app/gateway/services/runs/facade_factory.py @@ -0,0 +1,150 @@ +"""Facade factory - assembles RunsFacade with dependencies.""" + +from __future__ import annotations + +from collections.abc import Callable +from typing import TYPE_CHECKING + +from fastapi import HTTPException, Request + +from app.gateway.dependencies import get_checkpointer, get_stream_bridge +from deerflow.runtime.runs.facade import RunsFacade +from deerflow.runtime.runs.facade import RunsRuntime +from deerflow.runtime.runs.internal.execution.supervisor import RunSupervisor +from deerflow.runtime.runs.internal.planner import ExecutionPlanner +from deerflow.runtime.runs.internal.registry import RunRegistry +from deerflow.runtime.runs.internal.streams import RunStreamService +from deerflow.runtime.runs.internal.wait import RunWaitService + +from app.infra.storage import StorageRunObserver, ThreadMetaStorage +from app.infra.storage.runs import RunDeleteRepository, RunReadRepository, RunWriteRepository +from .store import AppRunCreateStore, AppRunDeleteStore, AppRunQueryStore + +if TYPE_CHECKING: + from deerflow.runtime.stream_bridge import StreamBridge + + +type AgentFactory = Callable[..., object] + + +# Module-level singleton registry (shared across requests) +_registry: RunRegistry | None = None +_supervisor: RunSupervisor | None = None + + +def _get_state(request: Request, attr: str, label: str): + value = getattr(request.app.state, attr, None) + if value is None: + raise HTTPException(status_code=503, detail=f"{label} not available") + return value + + +def get_registry() -> RunRegistry: + """Get or create singleton registry.""" + global _registry + if _registry is None: + _registry = RunRegistry() + return _registry + + +def get_supervisor() -> RunSupervisor: + """Get or create singleton run supervisor.""" + global _supervisor + if _supervisor is None: + _supervisor = RunSupervisor() + return _supervisor + + +def resolve_agent_factory(assistant_id: str | None) -> AgentFactory: + """Resolve the agent factory callable from config.""" + from deerflow.agents.lead_agent.agent import make_lead_agent + + return make_lead_agent + + +def build_runs_facade( + *, + stream_bridge: "StreamBridge", + checkpointer: object, + store: object | None = None, + run_read_repo: RunReadRepository | None = None, + run_write_repo: RunWriteRepository | None = None, + run_delete_repo: RunDeleteRepository | None = None, + thread_meta_storage: ThreadMetaStorage | None = None, + run_event_store: object | None = None, +) -> RunsFacade: + """ + Build RunsFacade with all dependencies. + + Args: + stream_bridge: StreamBridge instance + checkpointer: LangGraph checkpointer + store: Optional LangGraph runtime store + run_read_repo: Optional run repository for durable reads + run_write_repo: Optional run repository for durable writes + run_delete_repo: Optional run repository for durable deletes + thread_meta_storage: Optional thread metadata storage adapter + + Returns: + Configured RunsFacade instance + """ + registry = get_registry() + planner = ExecutionPlanner() + supervisor = get_supervisor() + + stream_service = RunStreamService(stream_bridge) + wait_service = RunWaitService(stream_service) + query_store = AppRunQueryStore(run_read_repo) if run_read_repo else None + create_store = ( + AppRunCreateStore(run_write_repo, thread_meta_storage=thread_meta_storage) + if run_write_repo + else None + ) + delete_store = AppRunDeleteStore(run_delete_repo) if run_delete_repo else None + + # Build storage observer if repositories provided + storage_observer = None + if run_write_repo or thread_meta_storage: + storage_observer = StorageRunObserver( + run_write_repo=run_write_repo, + thread_meta_storage=thread_meta_storage, + ) + + return RunsFacade( + registry=registry, + planner=planner, + supervisor=supervisor, + stream_service=stream_service, + wait_service=wait_service, + runtime=RunsRuntime( + bridge=stream_bridge, + checkpointer=checkpointer, + store=store, + event_store=run_event_store, + agent_factory_resolver=resolve_agent_factory, + ), + observer=storage_observer, + query_store=query_store, + create_store=create_store, + delete_store=delete_store, + ) + + +def build_runs_facade_from_request(request: "Request") -> RunsFacade: + """ + Build RunsFacade from FastAPI request context. + + Extracts dependencies from request.app.state. + """ + app_state = request.app.state + + return build_runs_facade( + stream_bridge=get_stream_bridge(request), + checkpointer=get_checkpointer(request), + store=getattr(request.app.state, "store", None), + run_read_repo=getattr(app_state, "run_read_repo", None), + run_write_repo=getattr(app_state, "run_write_repo", None), + run_delete_repo=getattr(app_state, "run_delete_repo", None), + thread_meta_storage=getattr(app_state, "thread_meta_storage", None), + run_event_store=getattr(app_state, "run_event_store", None), + ) diff --git a/backend/app/gateway/services/runs/input/__init__.py b/backend/app/gateway/services/runs/input/__init__.py new file mode 100644 index 000000000..ee16291da --- /dev/null +++ b/backend/app/gateway/services/runs/input/__init__.py @@ -0,0 +1,22 @@ +"""Input adapters for app-owned runs entrypoints.""" + +from .request_adapter import ( + AdaptedRunRequest, + adapt_create_run_request, + adapt_create_stream_request, + adapt_create_wait_request, + adapt_join_stream_request, + adapt_join_wait_request, +) +from .spec_builder import RunSpecBuilder, UnsupportedRunFeatureError + +__all__ = [ + "AdaptedRunRequest", + "RunSpecBuilder", + "UnsupportedRunFeatureError", + "adapt_create_run_request", + "adapt_create_stream_request", + "adapt_create_wait_request", + "adapt_join_stream_request", + "adapt_join_wait_request", +] diff --git a/backend/app/gateway/services/runs/input/request_adapter.py b/backend/app/gateway/services/runs/input/request_adapter.py new file mode 100644 index 000000000..0fd57f06e --- /dev/null +++ b/backend/app/gateway/services/runs/input/request_adapter.py @@ -0,0 +1,127 @@ +"""App-owned request adapter for runs entrypoints.""" + +from __future__ import annotations + +from dataclasses import dataclass + +from deerflow.runtime.stream_bridge import JSONValue +from deerflow.runtime.runs.types import RunIntent + +type RequestBody = dict[str, JSONValue] +type RequestQuery = dict[str, str] + + +@dataclass(frozen=True) +class AdaptedRunRequest: + """ + 统一的内部请求 DTO. + + 路由层只负责提取 path/query/body,适配器负责转成稳定内部结构。 + """ + + intent: RunIntent + thread_id: str | None + run_id: str | None + body: RequestBody + headers: dict[str, str] + query: RequestQuery + + @property + def last_event_id(self) -> str | None: + """Extract Last-Event-ID from headers.""" + return self.headers.get("last-event-id") or self.headers.get("Last-Event-ID") + + @property + def is_stateless(self) -> bool: + """Check if this is a stateless request.""" + return self.thread_id is None + + +def adapt_create_run_request( + *, + thread_id: str | None, + body: RequestBody, + headers: dict[str, str] | None = None, + query: RequestQuery | None = None, +) -> AdaptedRunRequest: + """Adapt POST /threads/{thread_id}/runs or POST /runs.""" + return AdaptedRunRequest( + intent="create_background", + thread_id=thread_id, + run_id=None, + body=body, + headers=headers or {}, + query=query or {}, + ) + + +def adapt_create_stream_request( + *, + thread_id: str | None, + body: RequestBody, + headers: dict[str, str] | None = None, + query: RequestQuery | None = None, +) -> AdaptedRunRequest: + """Adapt POST /threads/{thread_id}/runs/stream or POST /runs/stream.""" + return AdaptedRunRequest( + intent="create_and_stream", + thread_id=thread_id, + run_id=None, + body=body, + headers=headers or {}, + query=query or {}, + ) + + +def adapt_create_wait_request( + *, + thread_id: str | None, + body: RequestBody, + headers: dict[str, str] | None = None, + query: RequestQuery | None = None, +) -> AdaptedRunRequest: + """Adapt POST /threads/{thread_id}/runs/wait or POST /runs/wait.""" + return AdaptedRunRequest( + intent="create_and_wait", + thread_id=thread_id, + run_id=None, + body=body, + headers=headers or {}, + query=query or {}, + ) + + +def adapt_join_stream_request( + *, + thread_id: str, + run_id: str, + headers: dict[str, str] | None = None, + query: RequestQuery | None = None, +) -> AdaptedRunRequest: + """Adapt GET /threads/{thread_id}/runs/{run_id}/stream.""" + return AdaptedRunRequest( + intent="join_stream", + thread_id=thread_id, + run_id=run_id, + body={}, + headers=headers or {}, + query=query or {}, + ) + + +def adapt_join_wait_request( + *, + thread_id: str, + run_id: str, + headers: dict[str, str] | None = None, + query: RequestQuery | None = None, +) -> AdaptedRunRequest: + """Adapt GET /threads/{thread_id}/runs/{run_id}/join.""" + return AdaptedRunRequest( + intent="join_wait", + thread_id=thread_id, + run_id=run_id, + body={}, + headers=headers or {}, + query=query or {}, + ) diff --git a/backend/app/gateway/services/runs/input/spec_builder.py b/backend/app/gateway/services/runs/input/spec_builder.py new file mode 100644 index 000000000..3edec225d --- /dev/null +++ b/backend/app/gateway/services/runs/input/spec_builder.py @@ -0,0 +1,254 @@ +"""App-owned RunSpec builder.""" + +from __future__ import annotations + +import re +import uuid + +from langchain_core.messages import HumanMessage + +from deerflow.runtime.runs.types import CheckpointRequest, RunScope, RunSpec +from deerflow.runtime.stream_bridge import JSONValue + +from .request_adapter import AdaptedRunRequest + +type JSONMapping = dict[str, JSONValue] +type GraphInput = dict[str, object] +type RunnableConfigDict = dict[str, object] + + +class UnsupportedRunFeatureError(ValueError): + """Raised when a phase1-unsupported feature is requested.""" + + pass + + +class RunSpecBuilder: + """ + Build RunSpec from AdaptedRunRequest. + + Phase 1 rules: + 1. messages-tuple normalized to messages + 2. enqueue not supported + 3. rollback not supported + 4. after_seconds not supported + 5. stream_resumable accepted + 6. stateless auto-generates temporary thread + """ + + # Phase 1 unsupported features + UNSUPPORTED_MULTITASK_STRATEGIES = {"enqueue"} + UNSUPPORTED_ACTIONS = {"rollback"} + + # Default stream modes + DEFAULT_STREAM_MODES = ["values", "messages"] + CONTEXT_CONFIGURABLE_KEYS = frozenset({ + "model_name", + "mode", + "thinking_enabled", + "reasoning_effort", + "is_plan_mode", + "subagent_enabled", + "max_concurrent_subagents", + }) + DEFAULT_ASSISTANT_ID = "lead_agent" + + @staticmethod + def _as_json_mapping(value: JSONValue | None) -> JSONMapping | None: + return value if isinstance(value, dict) else None + + @staticmethod + def _as_string_list(value: JSONValue | None) -> list[str] | None: + if not isinstance(value, list): + return None + return [item for item in value if isinstance(item, str)] + + def build(self, request: AdaptedRunRequest) -> RunSpec: + """Build RunSpec from adapted request.""" + body = request.body + + # Validate phase1 constraints + self._validate_constraints(body) + + # Build scope + scope = self._build_scope(request) + + # Normalize stream modes + stream_modes = self._normalize_stream_modes(body.get("stream_mode")) + + # Build checkpoint request + checkpoint_request = self._build_checkpoint_request(body) + + config = self._build_runnable_config( + thread_id=scope.thread_id, + request_config=self._as_json_mapping(body.get("config")), + metadata=self._as_json_mapping(body.get("metadata")), + assistant_id=body.get("assistant_id"), + context=self._as_json_mapping(body.get("context")), + ) + + return RunSpec( + intent=request.intent, + scope=scope, + assistant_id=body.get("assistant_id") if isinstance(body.get("assistant_id"), str) else None, + input=self._normalize_input(self._as_json_mapping(body.get("input"))), + command=self._as_json_mapping(body.get("command")), + runnable_config=config, + context=self._as_json_mapping(body.get("context")), + metadata=self._as_json_mapping(body.get("metadata")) or {}, + stream_modes=stream_modes, + stream_subgraphs=bool(body.get("stream_subgraphs", False)), + stream_resumable=bool(body.get("stream_resumable", False)), + on_disconnect=body.get("on_disconnect", "cancel") if body.get("on_disconnect") in {"cancel", "continue"} else "cancel", + on_completion=body.get("on_completion", "keep") if body.get("on_completion") in {"delete", "keep"} else "keep", + multitask_strategy=body.get("multitask_strategy", "reject") if body.get("multitask_strategy") in {"reject", "interrupt"} else "reject", + interrupt_before="*" if body.get("interrupt_before") == "*" else self._as_string_list(body.get("interrupt_before")), + interrupt_after="*" if body.get("interrupt_after") == "*" else self._as_string_list(body.get("interrupt_after")), + checkpoint_request=checkpoint_request, + follow_up_to_run_id=body.get("follow_up_to_run_id") if isinstance(body.get("follow_up_to_run_id"), str) else None, + webhook=body.get("webhook") if isinstance(body.get("webhook"), str) else None, + feedback_keys=self._as_string_list(body.get("feedback_keys")), + ) + + def _validate_constraints(self, body: JSONMapping) -> None: + """Validate phase1 constraints, raise UnsupportedRunFeatureError if violated.""" + # Check multitask_strategy + strategy = body.get("multitask_strategy", "reject") + if strategy in self.UNSUPPORTED_MULTITASK_STRATEGIES: + raise UnsupportedRunFeatureError( + f"multitask_strategy '{strategy}' is not supported in phase1. " + f"Supported: reject, interrupt" + ) + + # Check for rollback action + command = self._as_json_mapping(body.get("command")) or {} + if command.get("action") in self.UNSUPPORTED_ACTIONS: + raise UnsupportedRunFeatureError( + f"action '{command.get('action')}' is not supported in phase1" + ) + + # Check for after_seconds + if body.get("after_seconds") is not None: + raise UnsupportedRunFeatureError("after_seconds is not supported in phase1") + + def _build_scope(self, request: AdaptedRunRequest) -> RunScope: + """Build RunScope from request.""" + if request.is_stateless: + # Stateless: generate temporary thread + return RunScope( + kind="stateless", + thread_id=str(uuid.uuid4()), + temporary=True, + ) + else: + assert request.thread_id is not None + return RunScope( + kind="stateful", + thread_id=request.thread_id, + temporary=False, + ) + + def _normalize_stream_modes(self, stream_mode: JSONValue | None) -> list[str]: + """Normalize stream_mode to list, convert messages-tuple to messages.""" + if stream_mode is None: + return self.DEFAULT_STREAM_MODES.copy() + + if isinstance(stream_mode, str): + modes = [stream_mode] + elif isinstance(stream_mode, list): + modes = [mode for mode in stream_mode if isinstance(mode, str)] + else: + return self.DEFAULT_STREAM_MODES.copy() + + return ["messages" if m == "messages-tuple" else m for m in modes] + + def _build_checkpoint_request(self, body: JSONMapping) -> CheckpointRequest | None: + """Build CheckpointRequest if checkpoint data is provided.""" + checkpoint_id = body.get("checkpoint_id") + checkpoint = self._as_json_mapping(body.get("checkpoint")) + + if not isinstance(checkpoint_id, str) and checkpoint is None: + return None + + return CheckpointRequest( + checkpoint_id=checkpoint_id if isinstance(checkpoint_id, str) else None, + checkpoint=checkpoint, + ) + + def _normalize_input(self, raw_input: JSONMapping | None) -> GraphInput | None: + """Convert HTTP-friendly message dicts into LangChain message objects.""" + if raw_input is None: + return None + + messages = raw_input.get("messages") + if not messages or not isinstance(messages, list): + return raw_input + + converted: list[object] = [] + for msg in messages: + if isinstance(msg, dict): + role = msg.get("role", msg.get("type", "user")) + content = msg.get("content", "") + if role in ("user", "human"): + converted.append(HumanMessage(content=content)) + else: + converted.append(HumanMessage(content=content)) + else: + converted.append(msg) + return {**raw_input, "messages": converted} + + def _build_runnable_config( + self, + *, + thread_id: str, + request_config: JSONMapping | None, + metadata: JSONMapping | None, + assistant_id: str | None, + context: JSONMapping | None, + ) -> RunnableConfigDict: + """Build RunnableConfig from request payload and app-side rules.""" + config: RunnableConfigDict = {"recursion_limit": 100} + + if request_config: + if "context" in request_config: + config["context"] = request_config["context"] + else: + configurable = {"thread_id": thread_id} + raw_configurable = request_config.get("configurable") + if isinstance(raw_configurable, dict): + configurable.update(raw_configurable) + config["configurable"] = configurable + + for key, value in request_config.items(): + if key not in ("configurable", "context"): + config[key] = value + else: + config["configurable"] = {"thread_id": thread_id} + + configurable = config.get("configurable") + if ( + assistant_id + and assistant_id != self.DEFAULT_ASSISTANT_ID + and isinstance(configurable, dict) + and "agent_name" not in configurable + ): + normalized = assistant_id.strip().lower().replace("_", "-") + if not normalized or not re.fullmatch(r"[a-z0-9-]+", normalized): + raise ValueError( + f"Invalid assistant_id {assistant_id!r}: must contain only letters, digits, and hyphens after normalization." + ) + configurable["agent_name"] = normalized + + if metadata: + existing_metadata = config.get("metadata") + if isinstance(existing_metadata, dict): + existing_metadata.update(metadata) + else: + config["metadata"] = dict(metadata) + + if context and isinstance(configurable, dict): + for key in self.CONTEXT_CONFIGURABLE_KEYS: + if key in context: + configurable.setdefault(key, context[key]) + + return config diff --git a/backend/app/gateway/services/runs/storage_observer.py b/backend/app/gateway/services/runs/storage_observer.py new file mode 100644 index 000000000..2abfe8c2e --- /dev/null +++ b/backend/app/gateway/services/runs/storage_observer.py @@ -0,0 +1,5 @@ +"""Compatibility wrapper for the app-owned storage observer.""" + +from app.infra.storage.runs import StorageRunObserver + +__all__ = ["StorageRunObserver"] diff --git a/backend/app/gateway/services/runs/store/__init__.py b/backend/app/gateway/services/runs/store/__init__.py new file mode 100644 index 000000000..577031308 --- /dev/null +++ b/backend/app/gateway/services/runs/store/__init__.py @@ -0,0 +1,11 @@ +"""App-owned runs store adapters.""" + +from .create_store import AppRunCreateStore +from .delete_store import AppRunDeleteStore +from .query_store import AppRunQueryStore + +__all__ = [ + "AppRunCreateStore", + "AppRunDeleteStore", + "AppRunQueryStore", +] diff --git a/backend/app/gateway/services/runs/store/create_store.py b/backend/app/gateway/services/runs/store/create_store.py new file mode 100644 index 000000000..9daeed19d --- /dev/null +++ b/backend/app/gateway/services/runs/store/create_store.py @@ -0,0 +1,38 @@ +"""App-owned durable run creation adapter.""" + +from __future__ import annotations + +from deerflow.runtime.runs.store import RunCreateStore +from deerflow.runtime.runs.types import RunRecord + +from app.infra.storage import ThreadMetaStorage +from app.infra.storage.runs import RunWriteRepository + + +class AppRunCreateStore(RunCreateStore): + """Write the initial durable row for a newly created run.""" + + def __init__(self, repo: RunWriteRepository, thread_meta_storage: ThreadMetaStorage | None = None) -> None: + self._repo = repo + self._thread_meta_storage = thread_meta_storage + + async def create_run(self, record: RunRecord) -> None: + await self._repo.create( + run_id=record.run_id, + thread_id=record.thread_id, + assistant_id=record.assistant_id, + status=str(record.status), + metadata=record.metadata, + follow_up_to_run_id=record.follow_up_to_run_id, + created_at=record.created_at, + ) + if self._thread_meta_storage is not None and record.assistant_id: + thread = await self._thread_meta_storage.ensure_thread( + thread_id=record.thread_id, + assistant_id=record.assistant_id, + ) + if thread.assistant_id != record.assistant_id: + await self._thread_meta_storage.sync_thread_assistant_id( + thread_id=record.thread_id, + assistant_id=record.assistant_id, + ) diff --git a/backend/app/gateway/services/runs/store/delete_store.py b/backend/app/gateway/services/runs/store/delete_store.py new file mode 100644 index 000000000..32853b313 --- /dev/null +++ b/backend/app/gateway/services/runs/store/delete_store.py @@ -0,0 +1,17 @@ +"""App-owned durable run deletion adapter.""" + +from __future__ import annotations + +from deerflow.runtime.runs.store import RunDeleteStore + +from app.infra.storage.runs import RunDeleteRepository + + +class AppRunDeleteStore(RunDeleteStore): + """Delete durable run rows via the app storage adapter.""" + + def __init__(self, repo: RunDeleteRepository) -> None: + self._repo = repo + + async def delete_run(self, run_id: str) -> bool: + return await self._repo.delete(run_id) diff --git a/backend/app/gateway/services/runs/store/query_store.py b/backend/app/gateway/services/runs/store/query_store.py new file mode 100644 index 000000000..517c33144 --- /dev/null +++ b/backend/app/gateway/services/runs/store/query_store.py @@ -0,0 +1,47 @@ +"""App-owned durable run query adapter.""" + +from __future__ import annotations + +from deerflow.runtime.runs.store import RunQueryStore +from deerflow.runtime.runs.types import RunRecord, RunStatus + +from app.infra.storage.runs import RunReadRepository, RunRow + + +class AppRunQueryStore(RunQueryStore): + """Map app-side durable run rows into harness RunRecord DTOs.""" + + def __init__(self, repo: RunReadRepository) -> None: + self._repo = repo + + async def get_run(self, run_id: str) -> RunRecord | None: + row = await self._repo.get(run_id) + if row is None: + return None + return self._to_run_record(row) + + async def list_runs( + self, + thread_id: str, + *, + limit: int = 100, + ) -> list[RunRecord]: + rows = await self._repo.list_by_thread(thread_id, limit=limit) + return [self._to_run_record(row) for row in rows] + + def _to_run_record(self, row: RunRow) -> RunRecord: + return RunRecord( + run_id=row["run_id"], + thread_id=row["thread_id"], + assistant_id=row.get("assistant_id"), + status=RunStatus(row.get("status", "pending")), + temporary=False, + multitask_strategy=row.get("multitask_strategy", "reject"), + metadata=row.get("metadata", {}), + follow_up_to_run_id=row.get("follow_up_to_run_id"), + created_at=row.get("created_at", ""), + updated_at=row.get("updated_at", ""), + started_at=row.get("started_at"), + ended_at=row.get("ended_at"), + error=row.get("error"), + ) diff --git a/backend/app/gateway/utils.py b/backend/app/gateway/utils.py deleted file mode 100644 index 8368d84fc..000000000 --- a/backend/app/gateway/utils.py +++ /dev/null @@ -1,6 +0,0 @@ -"""Shared utility helpers for the Gateway layer.""" - - -def sanitize_log_param(value: str) -> str: - """Strip control characters to prevent log injection.""" - return value.replace("\n", "").replace("\r", "").replace("\x00", "")