mirror of
https://github.com/bytedance/deer-flow.git
synced 2026-04-25 11:18:22 +00:00
* fix(gateway): bound lifespan shutdown hooks to prevent worker hang Gateway worker can hang indefinitely in `uvicorn --reload` mode with the listening socket still bound — all /api/* requests return 504, and SIGKILL is the only recovery. Root cause (py-spy dump from a reproduction showed 16+ stacked frames of signal_handler -> Event.set -> threading.Lock.__enter__ on the main thread): CPython's `threading.Event` uses `Condition(Lock())` where the inner Lock is non-reentrant. uvicorn's BaseReload signal handler calls `should_exit.set()` directly from signal context; if a second signal (SIGTERM/SIGHUP from the reload supervisor, or watchfiles-triggered reload) arrives while the first handler holds the Lock, the reentrant call deadlocks on itself. The reload supervisor keeps sending those signals only when the worker fails to exit promptly. DeerFlow's lifespan currently awaits `stop_channel_service()` with no timeout; if a channel's `stop()` stalls (e.g. Feishu/Slack WebSocket waiting for an ack), the worker can't exit, the supervisor keeps signaling, and the deadlock becomes reachable. This is a defense-in-depth fix — it does not repair the upstream uvicorn/CPython issue, but it ensures DeerFlow's lifespan exits within a bounded window so the supervisor has no reason to keep firing signals. No behavior change on the happy path. Wraps the shutdown hook in `asyncio.wait_for(timeout=5.0)` and logs a warning on timeout before proceeding to worker exit. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * Update backend/app/gateway/app.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * style: apply make format (ruff) to test assertions Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> --------- Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com> Co-authored-by: Willem Jiang <willem.jiang@gmail.com> Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
236 lines
7.5 KiB
Python
236 lines
7.5 KiB
Python
import asyncio
|
|
import logging
|
|
from collections.abc import AsyncGenerator
|
|
from contextlib import asynccontextmanager
|
|
|
|
from fastapi import FastAPI
|
|
|
|
from app.gateway.config import get_gateway_config
|
|
from app.gateway.deps import langgraph_runtime
|
|
from app.gateway.routers import (
|
|
agents,
|
|
artifacts,
|
|
assistants_compat,
|
|
channels,
|
|
mcp,
|
|
memory,
|
|
models,
|
|
runs,
|
|
skills,
|
|
suggestions,
|
|
thread_runs,
|
|
threads,
|
|
uploads,
|
|
)
|
|
from deerflow.config.app_config import get_app_config
|
|
|
|
# Configure logging for the whole gateway process. basicConfig only takes
# effect if the root logger has no handlers yet, so this is safe when the
# module is imported under a test runner that configured logging first.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)

# Module-level logger, named after this module per logging convention.
logger = logging.getLogger(__name__)

# Upper bound (seconds) each lifespan shutdown hook is allowed to run.
# Bounds worker exit time so uvicorn's reload supervisor does not keep
# firing signals into a worker that is stuck waiting for shutdown cleanup.
_SHUTDOWN_HOOK_TIMEOUT_SECONDS = 5.0
@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
    """Application lifespan handler.

    Startup: validates configuration (fail-fast), initialises the LangGraph
    runtime, and starts the IM channel service on a best-effort basis.
    Shutdown: stops the channel service within a bounded window
    (``_SHUTDOWN_HOOK_TIMEOUT_SECONDS``) so the worker always exits promptly
    and uvicorn's reload supervisor never has to keep signaling it.

    Raises:
        RuntimeError: If the application configuration cannot be loaded.
    """
    # Load config and check necessary environment variables at startup
    try:
        get_app_config()
        logger.info("Configuration loaded successfully")
    except Exception as e:
        error_msg = f"Failed to load configuration during gateway startup: {e}"
        logger.exception(error_msg)
        raise RuntimeError(error_msg) from e

    config = get_gateway_config()
    logger.info(f"Starting API Gateway on {config.host}:{config.port}")

    # Initialize LangGraph runtime components (StreamBridge, RunManager, checkpointer, store)
    async with langgraph_runtime(app):
        logger.info("LangGraph runtime initialised")

        # Start IM channel service if any channels are configured.
        # Failure here is deliberately non-fatal: the gateway can serve
        # HTTP traffic without any IM integrations.
        try:
            from app.channels.service import start_channel_service

            channel_service = await start_channel_service()
            logger.info("Channel service started: %s", channel_service.get_status())
        except Exception:
            logger.exception("No IM channels configured or channel service failed to start")

        yield

        # Stop channel service on shutdown (bounded to prevent worker hang)
        try:
            from app.channels.service import stop_channel_service

            await asyncio.wait_for(
                stop_channel_service(),
                timeout=_SHUTDOWN_HOOK_TIMEOUT_SECONDS,
            )
        except (TimeoutError, asyncio.TimeoutError):
            # On Python < 3.11 asyncio.wait_for raises asyncio.TimeoutError,
            # which is NOT the builtin TimeoutError; from 3.11 they are the
            # same class (the tuple is then harmlessly redundant). Catching
            # both keeps the shutdown bound effective on every version.
            logger.warning(
                "Channel service shutdown exceeded %.1fs; proceeding with worker exit.",
                _SHUTDOWN_HOOK_TIMEOUT_SECONDS,
            )
        except Exception:
            logger.exception("Failed to stop channel service")

    logger.info("Shutting down API Gateway")
def create_app() -> FastAPI:
    """Create and configure the FastAPI application.

    Builds the FastAPI instance with OpenAPI metadata, attaches the module's
    ``lifespan`` handler, mounts every gateway router, and registers a
    lightweight ``/health`` endpoint.

    Returns:
        Configured FastAPI application instance.
    """

    app = FastAPI(
        title="DeerFlow API Gateway",
        description="""
## DeerFlow API Gateway

API Gateway for DeerFlow - A LangGraph-based AI agent backend with sandbox execution capabilities.

### Features

- **Models Management**: Query and retrieve available AI models
- **MCP Configuration**: Manage Model Context Protocol (MCP) server configurations
- **Memory Management**: Access and manage global memory data for personalized conversations
- **Skills Management**: Query and manage skills and their enabled status
- **Artifacts**: Access thread artifacts and generated files
- **Health Monitoring**: System health check endpoints

### Architecture

LangGraph requests are handled by nginx reverse proxy.
This gateway provides custom endpoints for models, MCP configuration, skills, and artifacts.
""",
        version="0.1.0",
        lifespan=lifespan,
        docs_url="/docs",
        redoc_url="/redoc",
        openapi_url="/openapi.json",
        # Tag metadata groups endpoints in the generated OpenAPI docs;
        # each router below declares one of these tag names.
        openapi_tags=[
            {
                "name": "models",
                "description": "Operations for querying available AI models and their configurations",
            },
            {
                "name": "mcp",
                "description": "Manage Model Context Protocol (MCP) server configurations",
            },
            {
                "name": "memory",
                "description": "Access and manage global memory data for personalized conversations",
            },
            {
                "name": "skills",
                "description": "Manage skills and their configurations",
            },
            {
                "name": "artifacts",
                "description": "Access and download thread artifacts and generated files",
            },
            {
                "name": "uploads",
                "description": "Upload and manage user files for threads",
            },
            {
                "name": "threads",
                "description": "Manage DeerFlow thread-local filesystem data",
            },
            {
                "name": "agents",
                "description": "Create and manage custom agents with per-agent config and prompts",
            },
            {
                "name": "suggestions",
                "description": "Generate follow-up question suggestions for conversations",
            },
            {
                "name": "channels",
                "description": "Manage IM channel integrations (Feishu, Slack, Telegram)",
            },
            {
                "name": "assistants-compat",
                "description": "LangGraph Platform-compatible assistants API (stub)",
            },
            {
                "name": "runs",
                "description": "LangGraph Platform-compatible runs lifecycle (create, stream, cancel)",
            },
            {
                "name": "health",
                "description": "Health check and system status endpoints",
            },
        ],
    )

    # CORS is handled by nginx - no need for FastAPI middleware

    # Include routers
    # Models API is mounted at /api/models
    app.include_router(models.router)

    # MCP API is mounted at /api/mcp
    app.include_router(mcp.router)

    # Memory API is mounted at /api/memory
    app.include_router(memory.router)

    # Skills API is mounted at /api/skills
    app.include_router(skills.router)

    # Artifacts API is mounted at /api/threads/{thread_id}/artifacts
    app.include_router(artifacts.router)

    # Uploads API is mounted at /api/threads/{thread_id}/uploads
    app.include_router(uploads.router)

    # Thread cleanup API is mounted at /api/threads/{thread_id}
    app.include_router(threads.router)

    # Agents API is mounted at /api/agents
    app.include_router(agents.router)

    # Suggestions API is mounted at /api/threads/{thread_id}/suggestions
    app.include_router(suggestions.router)

    # Channels API is mounted at /api/channels
    app.include_router(channels.router)

    # Assistants compatibility API (LangGraph Platform stub)
    app.include_router(assistants_compat.router)

    # Thread Runs API (LangGraph Platform-compatible runs lifecycle)
    app.include_router(thread_runs.router)

    # Stateless Runs API (stream/wait without a pre-existing thread)
    app.include_router(runs.router)

    @app.get("/health", tags=["health"])
    async def health_check() -> dict:
        """Health check endpoint.

        Returns:
            Service health status information.
        """
        return {"status": "healthy", "service": "deer-flow-gateway"}

    return app
|
|
|
|
# Create app instance for uvicorn
|
|
app = create_app()
|