import asyncio import logging import os from collections.abc import AsyncGenerator from contextlib import asynccontextmanager from fastapi import FastAPI from fastapi.middleware.cors import CORSMiddleware from app.gateway.auth_middleware import AuthMiddleware from app.gateway.config import get_gateway_config from app.gateway.csrf_middleware import CSRFMiddleware from app.gateway.deps import langgraph_runtime from app.gateway.routers import ( agents, artifacts, assistants_compat, auth, channels, feedback, mcp, memory, models, runs, skills, suggestions, thread_runs, threads, uploads, ) from deerflow.config import app_config as deerflow_app_config from deerflow.config.app_config import apply_logging_level AppConfig = deerflow_app_config.AppConfig get_app_config = deerflow_app_config.get_app_config # Default logging; lifespan overrides from config.yaml log_level. logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", datefmt="%Y-%m-%d %H:%M:%S", ) logger = logging.getLogger(__name__) # Upper bound (seconds) each lifespan shutdown hook is allowed to run. # Bounds worker exit time so uvicorn's reload supervisor does not keep # firing signals into a worker that is stuck waiting for shutdown cleanup. _SHUTDOWN_HOOK_TIMEOUT_SECONDS = 5.0 async def _ensure_admin_user(app: FastAPI) -> None: """Startup hook: handle first boot and migrate orphan threads otherwise. After admin creation, migrate orphan threads from the LangGraph store (metadata.user_id unset) to the admin account. This is the "no-auth → with-auth" upgrade path: users who ran DeerFlow without authentication have existing LangGraph thread data that needs an owner assigned. First boot (no admin exists): - Does NOT create any user accounts automatically. - The operator must visit ``/setup`` to create the first admin. Subsequent boots (admin already exists): - Runs the one-time "no-auth → with-auth" orphan thread migration for existing LangGraph thread metadata that has no owner_id. No SQL persistence migration is needed: the four user_id columns (threads_meta, runs, run_events, feedback) only come into existence alongside the auth module via create_all, so freshly created tables never contain NULL-owner rows. """ from sqlalchemy import select from app.gateway.deps import get_local_provider from deerflow.persistence.engine import get_session_factory from deerflow.persistence.user.model import UserRow try: provider = get_local_provider() except RuntimeError: # Auth persistence may not be initialized in some test/boot paths. # Skip admin migration work rather than failing gateway startup. logger.warning("Auth persistence not ready; skipping admin bootstrap check") return sf = get_session_factory() if sf is None: return admin_count = await provider.count_admin_users() if admin_count == 0: logger.info("=" * 60) logger.info(" First boot detected — no admin account exists.") logger.info(" Visit /setup to complete admin account creation.") logger.info("=" * 60) return # Admin already exists — run orphan thread migration for any # LangGraph thread metadata that pre-dates the auth module. async with sf() as session: stmt = select(UserRow).where(UserRow.system_role == "admin").limit(1) row = (await session.execute(stmt)).scalar_one_or_none() if row is None: return # Should not happen (admin_count > 0 above), but be safe. admin_id = str(row.id) # LangGraph store orphan migration — non-fatal. # This covers the "no-auth → with-auth" upgrade path for users # whose existing LangGraph thread metadata has no user_id set. store = getattr(app.state, "store", None) if store is not None: try: migrated = await _migrate_orphaned_threads(store, admin_id) if migrated: logger.info("Migrated %d orphan LangGraph thread(s) to admin", migrated) except Exception: logger.exception("LangGraph thread migration failed (non-fatal)") async def _iter_store_items(store, namespace, *, page_size: int = 500): """Paginated async iterator over a LangGraph store namespace. Replaces the old hardcoded ``limit=1000`` call with a cursor-style loop so that environments with more than one page of orphans do not silently lose data. Terminates when a page is empty OR when a short page arrives (indicating the last page). """ offset = 0 while True: batch = await store.asearch(namespace, limit=page_size, offset=offset) if not batch: return for item in batch: yield item if len(batch) < page_size: return offset += page_size async def _migrate_orphaned_threads(store, admin_user_id: str) -> int: """Migrate LangGraph store threads with no user_id to the given admin. Uses cursor pagination so all orphans are migrated regardless of count. Returns the number of rows migrated. """ migrated = 0 async for item in _iter_store_items(store, ("threads",)): metadata = item.value.get("metadata", {}) if not metadata.get("user_id"): metadata["user_id"] = admin_user_id item.value["metadata"] = metadata await store.aput(("threads",), item.key, item.value) migrated += 1 return migrated @asynccontextmanager async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]: """Application lifespan handler.""" # Load config and check necessary environment variables at startup try: app.state.config = get_app_config() apply_logging_level(app.state.config.log_level) logger.info("Configuration loaded successfully") except Exception as e: error_msg = f"Failed to load configuration during gateway startup: {e}" logger.exception(error_msg) raise RuntimeError(error_msg) from e config = get_gateway_config() logger.info(f"Starting API Gateway on {config.host}:{config.port}") # Initialize LangGraph runtime components (StreamBridge, RunManager, checkpointer, store) async with langgraph_runtime(app): logger.info("LangGraph runtime initialised") # Ensure admin user exists (auto-create on first boot) # Must run AFTER langgraph_runtime so app.state.store is available for thread migration await _ensure_admin_user(app) # Start IM channel service if any channels are configured try: from app.channels.service import start_channel_service channel_service = await start_channel_service(app.state.config) logger.info("Channel service started: %s", channel_service.get_status()) except Exception: logger.exception("No IM channels configured or channel service failed to start") yield # Stop channel service on shutdown (bounded to prevent worker hang) try: from app.channels.service import stop_channel_service await asyncio.wait_for( stop_channel_service(), timeout=_SHUTDOWN_HOOK_TIMEOUT_SECONDS, ) except TimeoutError: logger.warning( "Channel service shutdown exceeded %.1fs; proceeding with worker exit.", _SHUTDOWN_HOOK_TIMEOUT_SECONDS, ) except Exception: logger.exception("Failed to stop channel service") logger.info("Shutting down API Gateway") def create_app() -> FastAPI: """Create and configure the FastAPI application. Returns: Configured FastAPI application instance. """ config = get_gateway_config() docs_kwargs = {"docs_url": "/docs", "redoc_url": "/redoc", "openapi_url": "/openapi.json"} if config.enable_docs else {"docs_url": None, "redoc_url": None, "openapi_url": None} app = FastAPI( title="DeerFlow API Gateway", description=""" ## DeerFlow API Gateway API Gateway for DeerFlow - A LangGraph-based AI agent backend with sandbox execution capabilities. ### Features - **Models Management**: Query and retrieve available AI models - **MCP Configuration**: Manage Model Context Protocol (MCP) server configurations - **Memory Management**: Access and manage global memory data for personalized conversations - **Skills Management**: Query and manage skills and their enabled status - **Artifacts**: Access thread artifacts and generated files - **Health Monitoring**: System health check endpoints ### Architecture LangGraph requests are handled by nginx reverse proxy. This gateway provides custom endpoints for models, MCP configuration, skills, and artifacts. """, version="0.1.0", lifespan=lifespan, **docs_kwargs, openapi_tags=[ { "name": "models", "description": "Operations for querying available AI models and their configurations", }, { "name": "mcp", "description": "Manage Model Context Protocol (MCP) server configurations", }, { "name": "memory", "description": "Access and manage global memory data for personalized conversations", }, { "name": "skills", "description": "Manage skills and their configurations", }, { "name": "artifacts", "description": "Access and download thread artifacts and generated files", }, { "name": "uploads", "description": "Upload and manage user files for threads", }, { "name": "threads", "description": "Manage DeerFlow thread-local filesystem data", }, { "name": "agents", "description": "Create and manage custom agents with per-agent config and prompts", }, { "name": "suggestions", "description": "Generate follow-up question suggestions for conversations", }, { "name": "channels", "description": "Manage IM channel integrations (Feishu, Slack, Telegram)", }, { "name": "assistants-compat", "description": "LangGraph Platform-compatible assistants API (stub)", }, { "name": "runs", "description": "LangGraph Platform-compatible runs lifecycle (create, stream, cancel)", }, { "name": "health", "description": "Health check and system status endpoints", }, ], ) # Auth: reject unauthenticated requests to non-public paths (fail-closed safety net) app.add_middleware(AuthMiddleware) # CSRF: Double Submit Cookie pattern for state-changing requests app.add_middleware(CSRFMiddleware) # CORS: when GATEWAY_CORS_ORIGINS is set (dev without nginx), add CORS middleware. # In production, nginx handles CORS and no middleware is needed. cors_origins_env = os.environ.get("GATEWAY_CORS_ORIGINS", "") if cors_origins_env: cors_origins = [o.strip() for o in cors_origins_env.split(",") if o.strip()] # Validate: wildcard origin with credentials is a security misconfiguration for origin in cors_origins: if origin == "*": logger.error("GATEWAY_CORS_ORIGINS contains wildcard '*' with allow_credentials=True. This is a security misconfiguration — browsers will reject the response. Use explicit scheme://host:port origins instead.") cors_origins = [o for o in cors_origins if o != "*"] break if cors_origins: app.add_middleware( CORSMiddleware, allow_origins=cors_origins, allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # Include routers # Models API is mounted at /api/models app.include_router(models.router) # MCP API is mounted at /api/mcp app.include_router(mcp.router) # Memory API is mounted at /api/memory app.include_router(memory.router) # Skills API is mounted at /api/skills app.include_router(skills.router) # Artifacts API is mounted at /api/threads/{thread_id}/artifacts app.include_router(artifacts.router) # Uploads API is mounted at /api/threads/{thread_id}/uploads app.include_router(uploads.router) # Thread cleanup API is mounted at /api/threads/{thread_id} app.include_router(threads.router) # Agents API is mounted at /api/agents app.include_router(agents.router) # Suggestions API is mounted at /api/threads/{thread_id}/suggestions app.include_router(suggestions.router) # Channels API is mounted at /api/channels app.include_router(channels.router) # Assistants compatibility API (LangGraph Platform stub) app.include_router(assistants_compat.router) # Auth API is mounted at /api/v1/auth app.include_router(auth.router) # Feedback API is mounted at /api/threads/{thread_id}/runs/{run_id}/feedback app.include_router(feedback.router) # Thread Runs API (LangGraph Platform-compatible runs lifecycle) app.include_router(thread_runs.router) # Stateless Runs API (stream/wait without a pre-existing thread) app.include_router(runs.router) @app.get("/health", tags=["health"]) async def health_check() -> dict: """Health check endpoint. Returns: Service health status information. """ return {"status": "healthy", "service": "deer-flow-gateway"} return app # Create app instance for uvicorn app = create_app()