diff --git a/backend/app/gateway/app.py b/backend/app/gateway/app.py index d358923ca..39d17498f 100644 --- a/backend/app/gateway/app.py +++ b/backend/app/gateway/app.py @@ -5,15 +5,19 @@ from contextlib import asynccontextmanager from fastapi import FastAPI from app.gateway.config import get_gateway_config +from app.gateway.deps import langgraph_runtime from app.gateway.routers import ( agents, artifacts, + assistants_compat, channels, mcp, memory, models, + runs, skills, suggestions, + thread_runs, threads, uploads, ) @@ -44,29 +48,29 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]: config = get_gateway_config() logger.info(f"Starting API Gateway on {config.host}:{config.port}") - # NOTE: MCP tools initialization is NOT done here because: - # 1. Gateway doesn't use MCP tools - they are used by Agents in the LangGraph Server - # 2. Gateway and LangGraph Server are separate processes with independent caches - # MCP tools are lazily initialized in LangGraph Server when first needed + # Initialize LangGraph runtime components (StreamBridge, RunManager, checkpointer, store) + async with langgraph_runtime(app): + logger.info("LangGraph runtime initialised") - # Start IM channel service if any channels are configured - try: - from app.channels.service import start_channel_service + # Start IM channel service if any channels are configured + try: + from app.channels.service import start_channel_service - channel_service = await start_channel_service() - logger.info("Channel service started: %s", channel_service.get_status()) - except Exception: - logger.exception("No IM channels configured or channel service failed to start") + channel_service = await start_channel_service() + logger.info("Channel service started: %s", channel_service.get_status()) + except Exception: + logger.exception("No IM channels configured or channel service failed to start") - yield + yield - # Stop channel service on shutdown - try: - from app.channels.service import 
stop_channel_service + # Stop channel service on shutdown + try: + from app.channels.service import stop_channel_service + + await stop_channel_service() + except Exception: + logger.exception("Failed to stop channel service") - await stop_channel_service() - except Exception: - logger.exception("Failed to stop channel service") logger.info("Shutting down API Gateway") @@ -144,6 +148,14 @@ This gateway provides custom endpoints for models, MCP configuration, skills, an "name": "channels", "description": "Manage IM channel integrations (Feishu, Slack, Telegram)", }, + { + "name": "assistants-compat", + "description": "LangGraph Platform-compatible assistants API (stub)", + }, + { + "name": "runs", + "description": "LangGraph Platform-compatible runs lifecycle (create, stream, cancel)", + }, { "name": "health", "description": "Health check and system status endpoints", @@ -184,6 +196,15 @@ This gateway provides custom endpoints for models, MCP configuration, skills, an # Channels API is mounted at /api/channels app.include_router(channels.router) + # Assistants compatibility API (LangGraph Platform stub) + app.include_router(assistants_compat.router) + + # Thread Runs API (LangGraph Platform-compatible runs lifecycle) + app.include_router(thread_runs.router) + + # Stateless Runs API (stream/wait without a pre-existing thread) + app.include_router(runs.router) + @app.get("/health", tags=["health"]) async def health_check() -> dict: """Health check endpoint. diff --git a/backend/app/gateway/deps.py b/backend/app/gateway/deps.py new file mode 100644 index 000000000..115868331 --- /dev/null +++ b/backend/app/gateway/deps.py @@ -0,0 +1,70 @@ +"""Centralized accessors for singleton objects stored on ``app.state``. + +**Getters** (used by routers): raise 503 when a required dependency is +missing, except ``get_store`` which returns ``None``. + +Initialization is handled directly in ``app.py`` via :class:`AsyncExitStack`. 
+""" + +from __future__ import annotations + +from collections.abc import AsyncGenerator +from contextlib import AsyncExitStack, asynccontextmanager + +from fastapi import FastAPI, HTTPException, Request + +from deerflow.runtime import RunManager, StreamBridge + + +@asynccontextmanager +async def langgraph_runtime(app: FastAPI) -> AsyncGenerator[None, None]: + """Bootstrap and tear down all LangGraph runtime singletons. + + Usage in ``app.py``:: + + async with langgraph_runtime(app): + yield + """ + from deerflow.agents.checkpointer.async_provider import make_checkpointer + from deerflow.runtime import make_store, make_stream_bridge + + async with AsyncExitStack() as stack: + app.state.stream_bridge = await stack.enter_async_context(make_stream_bridge()) + app.state.checkpointer = await stack.enter_async_context(make_checkpointer()) + app.state.store = await stack.enter_async_context(make_store()) + app.state.run_manager = RunManager() + yield + + +# --------------------------------------------------------------------------- +# Getters – called by routers per-request +# --------------------------------------------------------------------------- + + +def get_stream_bridge(request: Request) -> StreamBridge: + """Return the global :class:`StreamBridge`, or 503.""" + bridge = getattr(request.app.state, "stream_bridge", None) + if bridge is None: + raise HTTPException(status_code=503, detail="Stream bridge not available") + return bridge + + +def get_run_manager(request: Request) -> RunManager: + """Return the global :class:`RunManager`, or 503.""" + mgr = getattr(request.app.state, "run_manager", None) + if mgr is None: + raise HTTPException(status_code=503, detail="Run manager not available") + return mgr + + +def get_checkpointer(request: Request): + """Return the global checkpointer, or 503.""" + cp = getattr(request.app.state, "checkpointer", None) + if cp is None: + raise HTTPException(status_code=503, detail="Checkpointer not available") + return cp + + +def 
get_store(request: Request): + """Return the global store (may be ``None`` if not configured).""" + return getattr(request.app.state, "store", None) diff --git a/backend/app/gateway/routers/__init__.py b/backend/app/gateway/routers/__init__.py index 984288ae7..c5f67a396 100644 --- a/backend/app/gateway/routers/__init__.py +++ b/backend/app/gateway/routers/__init__.py @@ -1,3 +1,3 @@ -from . import artifacts, mcp, models, skills, suggestions, threads, uploads +from . import artifacts, assistants_compat, mcp, models, skills, suggestions, thread_runs, threads, uploads -__all__ = ["artifacts", "mcp", "models", "skills", "suggestions", "threads", "uploads"] +__all__ = ["artifacts", "assistants_compat", "mcp", "models", "skills", "suggestions", "threads", "thread_runs", "uploads"] diff --git a/backend/app/gateway/routers/assistants_compat.py b/backend/app/gateway/routers/assistants_compat.py new file mode 100644 index 000000000..83708747c --- /dev/null +++ b/backend/app/gateway/routers/assistants_compat.py @@ -0,0 +1,149 @@ +"""Assistants compatibility endpoints. + +Provides LangGraph Platform-compatible assistants API backed by the +``langgraph.json`` graph registry and ``config.yaml`` agent definitions. + +This is a minimal stub that satisfies the ``useStream`` React hook's +initialization requirements (``assistants.search()`` and ``assistants.get()``). 
+""" + +from __future__ import annotations + +import logging +from datetime import UTC, datetime +from typing import Any + +from fastapi import APIRouter, HTTPException +from pydantic import BaseModel, Field + +logger = logging.getLogger(__name__) +router = APIRouter(prefix="/api/assistants", tags=["assistants-compat"]) + + +class AssistantResponse(BaseModel): + assistant_id: str + graph_id: str + name: str + config: dict[str, Any] = Field(default_factory=dict) + metadata: dict[str, Any] = Field(default_factory=dict) + description: str | None = None + created_at: str = "" + updated_at: str = "" + version: int = 1 + + +class AssistantSearchRequest(BaseModel): + graph_id: str | None = None + name: str | None = None + metadata: dict[str, Any] | None = None + limit: int = 10 + offset: int = 0 + + +def _get_default_assistant() -> AssistantResponse: + """Return the default lead_agent assistant.""" + now = datetime.now(UTC).isoformat() + return AssistantResponse( + assistant_id="lead_agent", + graph_id="lead_agent", + name="lead_agent", + config={}, + metadata={"created_by": "system"}, + description="DeerFlow lead agent", + created_at=now, + updated_at=now, + version=1, + ) + + +def _list_assistants() -> list[AssistantResponse]: + """List all available assistants from config.""" + assistants = [_get_default_assistant()] + + # Also include custom agents from config.yaml agents directory + try: + from deerflow.config.agents_config import list_custom_agents + + for agent_cfg in list_custom_agents(): + now = datetime.now(UTC).isoformat() + assistants.append( + AssistantResponse( + assistant_id=agent_cfg.name, + graph_id="lead_agent", # All agents use the same graph + name=agent_cfg.name, + config={}, + metadata={"created_by": "user"}, + description=agent_cfg.description or "", + created_at=now, + updated_at=now, + version=1, + ) + ) + except Exception: + logger.debug("Could not load custom agents for assistants list") + + return assistants + + +@router.post("/search", 
response_model=list[AssistantResponse]) +async def search_assistants(body: AssistantSearchRequest | None = None) -> list[AssistantResponse]: + """Search assistants. + + Returns all registered assistants (lead_agent + custom agents from config). + """ + assistants = _list_assistants() + + if body and body.graph_id: + assistants = [a for a in assistants if a.graph_id == body.graph_id] + if body and body.name: + assistants = [a for a in assistants if body.name.lower() in a.name.lower()] + + offset = body.offset if body else 0 + limit = body.limit if body else 10 + return assistants[offset : offset + limit] + + +@router.get("/{assistant_id}", response_model=AssistantResponse) +async def get_assistant_compat(assistant_id: str) -> AssistantResponse: + """Get an assistant by ID.""" + for a in _list_assistants(): + if a.assistant_id == assistant_id: + return a + raise HTTPException(status_code=404, detail=f"Assistant {assistant_id} not found") + + +@router.get("/{assistant_id}/graph") +async def get_assistant_graph(assistant_id: str) -> dict: + """Get the graph structure for an assistant. + + Returns a minimal graph description. Full graph introspection is + not supported in the Gateway — this stub satisfies SDK validation. + """ + found = any(a.assistant_id == assistant_id for a in _list_assistants()) + if not found: + raise HTTPException(status_code=404, detail=f"Assistant {assistant_id} not found") + + return { + "graph_id": "lead_agent", + "nodes": [], + "edges": [], + } + + +@router.get("/{assistant_id}/schemas") +async def get_assistant_schemas(assistant_id: str) -> dict: + """Get JSON schemas for an assistant's input/output/state. + + Returns empty schemas — full introspection not supported in Gateway. 
+ """ + found = any(a.assistant_id == assistant_id for a in _list_assistants()) + if not found: + raise HTTPException(status_code=404, detail=f"Assistant {assistant_id} not found") + + return { + "graph_id": "lead_agent", + "input_schema": {}, + "output_schema": {}, + "state_schema": {}, + "config_schema": {}, + } diff --git a/backend/app/gateway/routers/runs.py b/backend/app/gateway/routers/runs.py new file mode 100644 index 000000000..46628f3a7 --- /dev/null +++ b/backend/app/gateway/routers/runs.py @@ -0,0 +1,86 @@ +"""Stateless runs endpoints -- stream and wait without a pre-existing thread. + +These endpoints auto-create a temporary thread when no ``thread_id`` is +supplied in the request body. When a ``thread_id`` **is** provided, it +is reused so that conversation history is preserved across calls. +""" + +from __future__ import annotations + +import asyncio +import logging +import uuid + +from fastapi import APIRouter, Request +from fastapi.responses import StreamingResponse + +from app.gateway.deps import get_checkpointer, get_run_manager, get_stream_bridge +from app.gateway.routers.thread_runs import RunCreateRequest +from app.gateway.services import sse_consumer, start_run +from deerflow.runtime import serialize_channel_values + +logger = logging.getLogger(__name__) +router = APIRouter(prefix="/api/runs", tags=["runs"]) + + +def _resolve_thread_id(body: RunCreateRequest) -> str: + """Return the thread_id from the request body, or generate a new one.""" + thread_id = (body.config or {}).get("configurable", {}).get("thread_id") + if thread_id: + return str(thread_id) + return str(uuid.uuid4()) + + +@router.post("/stream") +async def stateless_stream(body: RunCreateRequest, request: Request) -> StreamingResponse: + """Create a run and stream events via SSE. + + If ``config.configurable.thread_id`` is provided, the run is created + on the given thread so that conversation history is preserved. + Otherwise a new temporary thread is created. 
+ """ + thread_id = _resolve_thread_id(body) + bridge = get_stream_bridge(request) + run_mgr = get_run_manager(request) + record = await start_run(body, thread_id, request) + + return StreamingResponse( + sse_consumer(bridge, record, request, run_mgr), + media_type="text/event-stream", + headers={ + "Cache-Control": "no-cache", + "Connection": "keep-alive", + "X-Accel-Buffering": "no", + }, + ) + + +@router.post("/wait", response_model=dict) +async def stateless_wait(body: RunCreateRequest, request: Request) -> dict: + """Create a run and block until completion. + + If ``config.configurable.thread_id`` is provided, the run is created + on the given thread so that conversation history is preserved. + Otherwise a new temporary thread is created. + """ + thread_id = _resolve_thread_id(body) + record = await start_run(body, thread_id, request) + + if record.task is not None: + try: + await record.task + except asyncio.CancelledError: + pass + + checkpointer = get_checkpointer(request) + config = {"configurable": {"thread_id": thread_id}} + try: + checkpoint_tuple = await checkpointer.aget_tuple(config) + if checkpoint_tuple is not None: + checkpoint = getattr(checkpoint_tuple, "checkpoint", {}) or {} + channel_values = checkpoint.get("channel_values", {}) + return serialize_channel_values(channel_values) + except Exception: + logger.exception("Failed to fetch final state for run %s", record.run_id) + + return {"status": record.status.value, "error": record.error} diff --git a/backend/app/gateway/routers/thread_runs.py b/backend/app/gateway/routers/thread_runs.py new file mode 100644 index 000000000..217605685 --- /dev/null +++ b/backend/app/gateway/routers/thread_runs.py @@ -0,0 +1,265 @@ +"""Runs endpoints — create, stream, wait, cancel. + +Implements the LangGraph Platform runs API on top of +:class:`deerflow.agents.runs.RunManager` and +:class:`deerflow.agents.stream_bridge.StreamBridge`. 
+ +SSE format is aligned with the LangGraph Platform protocol so that +the ``useStream`` React hook from ``@langchain/langgraph-sdk/react`` +works without modification. +""" + +from __future__ import annotations + +import asyncio +import logging +from typing import Any, Literal + +from fastapi import APIRouter, HTTPException, Query, Request +from fastapi.responses import Response, StreamingResponse +from pydantic import BaseModel, Field + +from app.gateway.deps import get_checkpointer, get_run_manager, get_stream_bridge +from app.gateway.services import sse_consumer, start_run +from deerflow.runtime import RunRecord, serialize_channel_values + +logger = logging.getLogger(__name__) +router = APIRouter(prefix="/api/threads", tags=["runs"]) + + +# --------------------------------------------------------------------------- +# Request / response models +# --------------------------------------------------------------------------- + + +class RunCreateRequest(BaseModel): + assistant_id: str | None = Field(default=None, description="Agent / assistant to use") + input: dict[str, Any] | None = Field(default=None, description="Graph input (e.g. 
{messages: [...]})") + command: dict[str, Any] | None = Field(default=None, description="LangGraph Command") + metadata: dict[str, Any] | None = Field(default=None, description="Run metadata") + config: dict[str, Any] | None = Field(default=None, description="RunnableConfig overrides") + webhook: str | None = Field(default=None, description="Completion callback URL") + checkpoint_id: str | None = Field(default=None, description="Resume from checkpoint") + checkpoint: dict[str, Any] | None = Field(default=None, description="Full checkpoint object") + interrupt_before: list[str] | Literal["*"] | None = Field(default=None, description="Nodes to interrupt before") + interrupt_after: list[str] | Literal["*"] | None = Field(default=None, description="Nodes to interrupt after") + stream_mode: list[str] | str | None = Field(default=None, description="Stream mode(s)") + stream_subgraphs: bool = Field(default=False, description="Include subgraph events") + stream_resumable: bool | None = Field(default=None, description="SSE resumable mode") + on_disconnect: Literal["cancel", "continue"] = Field(default="cancel", description="Behaviour on SSE disconnect") + on_completion: Literal["delete", "keep"] = Field(default="keep", description="Delete temp thread on completion") + multitask_strategy: Literal["reject", "rollback", "interrupt", "enqueue"] = Field(default="reject", description="Concurrency strategy") + after_seconds: float | None = Field(default=None, description="Delayed execution") + if_not_exists: Literal["reject", "create"] = Field(default="create", description="Thread creation policy") + feedback_keys: list[str] | None = Field(default=None, description="LangSmith feedback keys") + + +class RunResponse(BaseModel): + run_id: str + thread_id: str + assistant_id: str | None = None + status: str + metadata: dict[str, Any] = Field(default_factory=dict) + kwargs: dict[str, Any] = Field(default_factory=dict) + multitask_strategy: str = "reject" + created_at: str = "" + 
updated_at: str = "" + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _record_to_response(record: RunRecord) -> RunResponse: + return RunResponse( + run_id=record.run_id, + thread_id=record.thread_id, + assistant_id=record.assistant_id, + status=record.status.value, + metadata=record.metadata, + kwargs=record.kwargs, + multitask_strategy=record.multitask_strategy, + created_at=record.created_at, + updated_at=record.updated_at, + ) + + +# --------------------------------------------------------------------------- +# Endpoints +# --------------------------------------------------------------------------- + + +@router.post("/{thread_id}/runs", response_model=RunResponse) +async def create_run(thread_id: str, body: RunCreateRequest, request: Request) -> RunResponse: + """Create a background run (returns immediately).""" + record = await start_run(body, thread_id, request) + return _record_to_response(record) + + +@router.post("/{thread_id}/runs/stream") +async def stream_run(thread_id: str, body: RunCreateRequest, request: Request) -> StreamingResponse: + """Create a run and stream events via SSE. + + The response includes a ``Content-Location`` header with the run's + resource URL, matching the LangGraph Platform protocol. The + ``useStream`` React hook uses this to extract run metadata. + """ + bridge = get_stream_bridge(request) + run_mgr = get_run_manager(request) + record = await start_run(body, thread_id, request) + + return StreamingResponse( + sse_consumer(bridge, record, request, run_mgr), + media_type="text/event-stream", + headers={ + "Cache-Control": "no-cache", + "Connection": "keep-alive", + "X-Accel-Buffering": "no", + # LangGraph Platform includes run metadata in this header. + # The SDK's _get_run_metadata_from_response() parses it. 
+ "Content-Location": (f"/api/threads/{thread_id}/runs/{record.run_id}/stream?thread_id={thread_id}&run_id={record.run_id}"), + }, + ) + + +@router.post("/{thread_id}/runs/wait", response_model=dict) +async def wait_run(thread_id: str, body: RunCreateRequest, request: Request) -> dict: + """Create a run and block until it completes, returning the final state.""" + record = await start_run(body, thread_id, request) + + if record.task is not None: + try: + await record.task + except asyncio.CancelledError: + pass + + checkpointer = get_checkpointer(request) + config = {"configurable": {"thread_id": thread_id}} + try: + checkpoint_tuple = await checkpointer.aget_tuple(config) + if checkpoint_tuple is not None: + checkpoint = getattr(checkpoint_tuple, "checkpoint", {}) or {} + channel_values = checkpoint.get("channel_values", {}) + return serialize_channel_values(channel_values) + except Exception: + logger.exception("Failed to fetch final state for run %s", record.run_id) + + return {"status": record.status.value, "error": record.error} + + +@router.get("/{thread_id}/runs", response_model=list[RunResponse]) +async def list_runs(thread_id: str, request: Request) -> list[RunResponse]: + """List all runs for a thread.""" + run_mgr = get_run_manager(request) + records = await run_mgr.list_by_thread(thread_id) + return [_record_to_response(r) for r in records] + + +@router.get("/{thread_id}/runs/{run_id}", response_model=RunResponse) +async def get_run(thread_id: str, run_id: str, request: Request) -> RunResponse: + """Get details of a specific run.""" + run_mgr = get_run_manager(request) + record = run_mgr.get(run_id) + if record is None or record.thread_id != thread_id: + raise HTTPException(status_code=404, detail=f"Run {run_id} not found") + return _record_to_response(record) + + +@router.post("/{thread_id}/runs/{run_id}/cancel") +async def cancel_run( + thread_id: str, + run_id: str, + request: Request, + wait: bool = Query(default=False, description="Block until run 
completes after cancel"), + action: Literal["interrupt", "rollback"] = Query(default="interrupt", description="Cancel action"), +) -> Response: + """Cancel a running or pending run. + + - action=interrupt: Stop execution, keep current checkpoint (can be resumed) + - action=rollback: Stop execution, revert to pre-run checkpoint state + - wait=true: Block until the run fully stops, return 204 + - wait=false: Return immediately with 202 + """ + run_mgr = get_run_manager(request) + record = run_mgr.get(run_id) + if record is None or record.thread_id != thread_id: + raise HTTPException(status_code=404, detail=f"Run {run_id} not found") + + cancelled = await run_mgr.cancel(run_id, action=action) + if not cancelled: + raise HTTPException( + status_code=409, + detail=f"Run {run_id} is not cancellable (status: {record.status.value})", + ) + + if wait and record.task is not None: + try: + await record.task + except asyncio.CancelledError: + pass + return Response(status_code=204) + + return Response(status_code=202) + + +@router.get("/{thread_id}/runs/{run_id}/join") +async def join_run(thread_id: str, run_id: str, request: Request) -> StreamingResponse: + """Join an existing run's SSE stream.""" + bridge = get_stream_bridge(request) + run_mgr = get_run_manager(request) + record = run_mgr.get(run_id) + if record is None or record.thread_id != thread_id: + raise HTTPException(status_code=404, detail=f"Run {run_id} not found") + + return StreamingResponse( + sse_consumer(bridge, record, request, run_mgr), + media_type="text/event-stream", + headers={ + "Cache-Control": "no-cache", + "Connection": "keep-alive", + "X-Accel-Buffering": "no", + }, + ) + + +@router.api_route("/{thread_id}/runs/{run_id}/stream", methods=["GET", "POST"], response_model=None) +async def stream_existing_run( + thread_id: str, + run_id: str, + request: Request, + action: Literal["interrupt", "rollback"] | None = Query(default=None, description="Cancel action"), + wait: int = Query(default=0, 
description="Block until cancelled (1) or return immediately (0)"), +): + """Join an existing run's SSE stream (GET), or cancel-then-stream (POST). + + The LangGraph SDK's ``joinStream`` and ``useStream`` stop button both use + ``POST`` to this endpoint. When ``action=interrupt`` or ``action=rollback`` + is present the run is cancelled first; the response then streams any + remaining buffered events so the client observes a clean shutdown. + """ + run_mgr = get_run_manager(request) + record = run_mgr.get(run_id) + if record is None or record.thread_id != thread_id: + raise HTTPException(status_code=404, detail=f"Run {run_id} not found") + + # Cancel if an action was requested (stop-button / interrupt flow) + if action is not None: + cancelled = await run_mgr.cancel(run_id, action=action) + if cancelled and wait and record.task is not None: + try: + await record.task + except (asyncio.CancelledError, Exception): + pass + return Response(status_code=204) + + bridge = get_stream_bridge(request) + return StreamingResponse( + sse_consumer(bridge, record, request, run_mgr), + media_type="text/event-stream", + headers={ + "Cache-Control": "no-cache", + "Connection": "keep-alive", + "X-Accel-Buffering": "no", + }, + ) diff --git a/backend/app/gateway/routers/threads.py b/backend/app/gateway/routers/threads.py index 7b259dd56..562edfdb7 100644 --- a/backend/app/gateway/routers/threads.py +++ b/backend/app/gateway/routers/threads.py @@ -1,14 +1,45 @@ +"""Thread CRUD, state, and history endpoints. + +Combines the existing thread-local filesystem cleanup with LangGraph +Platform-compatible thread management backed by the checkpointer. + +Channel values returned in state responses are serialized through +:func:`deerflow.runtime.serialization.serialize_channel_values` to +ensure LangChain message objects are converted to JSON-safe dicts +matching the LangGraph Platform wire format expected by the +``useStream`` React hook. 
+""" + +from __future__ import annotations + import logging +import time +import uuid +from typing import Any -from fastapi import APIRouter, HTTPException -from pydantic import BaseModel +from fastapi import APIRouter, HTTPException, Request +from pydantic import BaseModel, Field +from app.gateway.deps import get_checkpointer, get_store from deerflow.config.paths import Paths, get_paths +from deerflow.runtime import serialize_channel_values + +# --------------------------------------------------------------------------- +# Store namespace +# --------------------------------------------------------------------------- + +THREADS_NS: tuple[str, ...] = ("threads",) +"""Namespace used by the Store for thread metadata records.""" logger = logging.getLogger(__name__) router = APIRouter(prefix="/api/threads", tags=["threads"]) +# --------------------------------------------------------------------------- +# Response / request models +# --------------------------------------------------------------------------- + + class ThreadDeleteResponse(BaseModel): """Response model for thread cleanup.""" @@ -16,6 +47,85 @@ class ThreadDeleteResponse(BaseModel): message: str +class ThreadResponse(BaseModel): + """Response model for a single thread.""" + + thread_id: str = Field(description="Unique thread identifier") + status: str = Field(default="idle", description="Thread status: idle, busy, interrupted, error") + created_at: str = Field(default="", description="ISO timestamp") + updated_at: str = Field(default="", description="ISO timestamp") + metadata: dict[str, Any] = Field(default_factory=dict, description="Thread metadata") + values: dict[str, Any] = Field(default_factory=dict, description="Current state channel values") + interrupts: dict[str, Any] = Field(default_factory=dict, description="Pending interrupts") + + +class ThreadCreateRequest(BaseModel): + """Request body for creating a thread.""" + + thread_id: str | None = Field(default=None, description="Optional thread ID 
(auto-generated if omitted)") + metadata: dict[str, Any] = Field(default_factory=dict, description="Initial metadata") + + +class ThreadSearchRequest(BaseModel): + """Request body for searching threads.""" + + metadata: dict[str, Any] = Field(default_factory=dict, description="Metadata filter (exact match)") + limit: int = Field(default=100, ge=1, le=1000, description="Maximum results") + offset: int = Field(default=0, ge=0, description="Pagination offset") + status: str | None = Field(default=None, description="Filter by thread status") + + +class ThreadStateResponse(BaseModel): + """Response model for thread state.""" + + values: dict[str, Any] = Field(default_factory=dict, description="Current channel values") + next: list[str] = Field(default_factory=list, description="Next tasks to execute") + metadata: dict[str, Any] = Field(default_factory=dict, description="Checkpoint metadata") + checkpoint: dict[str, Any] = Field(default_factory=dict, description="Checkpoint info") + checkpoint_id: str | None = Field(default=None, description="Current checkpoint ID") + parent_checkpoint_id: str | None = Field(default=None, description="Parent checkpoint ID") + created_at: str | None = Field(default=None, description="Checkpoint timestamp") + tasks: list[dict[str, Any]] = Field(default_factory=list, description="Interrupted task details") + + +class ThreadPatchRequest(BaseModel): + """Request body for patching thread metadata.""" + + metadata: dict[str, Any] = Field(default_factory=dict, description="Metadata to merge") + + +class ThreadStateUpdateRequest(BaseModel): + """Request body for updating thread state (human-in-the-loop resume).""" + + values: dict[str, Any] | None = Field(default=None, description="Channel values to merge") + checkpoint_id: str | None = Field(default=None, description="Checkpoint to branch from") + checkpoint: dict[str, Any] | None = Field(default=None, description="Full checkpoint object") + as_node: str | None = Field(default=None, 
description="Node identity for the update") + + +class HistoryEntry(BaseModel): + """Single checkpoint history entry.""" + + checkpoint_id: str + parent_checkpoint_id: str | None = None + metadata: dict[str, Any] = Field(default_factory=dict) + values: dict[str, Any] = Field(default_factory=dict) + created_at: str | None = None + next: list[str] = Field(default_factory=list) + + +class ThreadHistoryRequest(BaseModel): + """Request body for checkpoint history.""" + + limit: int = Field(default=10, ge=1, le=100, description="Maximum entries") + before: str | None = Field(default=None, description="Cursor for pagination") + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + def _delete_thread_data(thread_id: str, paths: Paths | None = None) -> ThreadDeleteResponse: """Delete local persisted filesystem data for a thread.""" path_manager = paths or get_paths() @@ -23,6 +133,10 @@ def _delete_thread_data(thread_id: str, paths: Paths | None = None) -> ThreadDel path_manager.delete_thread_dir(thread_id) except ValueError as exc: raise HTTPException(status_code=422, detail=str(exc)) from exc + except FileNotFoundError: + # Not critical — thread data may not exist on disk + logger.debug("No local thread data to delete for %s", thread_id) + return ThreadDeleteResponse(success=True, message=f"No local data for {thread_id}") except Exception as exc: logger.exception("Failed to delete thread data for %s", thread_id) raise HTTPException(status_code=500, detail="Failed to delete local thread data.") from exc @@ -31,11 +145,535 @@ def _delete_thread_data(thread_id: str, paths: Paths | None = None) -> ThreadDel return ThreadDeleteResponse(success=True, message=f"Deleted local thread data for {thread_id}") +async def _store_get(store, thread_id: str) -> dict | None: + """Fetch a thread record from the Store; returns ``None`` if absent.""" + item = await 
store.aget(THREADS_NS, thread_id) + return item.value if item is not None else None + + +async def _store_put(store, record: dict) -> None: + """Write a thread record to the Store.""" + await store.aput(THREADS_NS, record["thread_id"], record) + + +async def _store_upsert(store, thread_id: str, *, metadata: dict | None = None, values: dict | None = None) -> None: + """Create or refresh a thread record in the Store. + + On creation the record is written with ``status="idle"``. On update only + ``updated_at`` (and optionally ``metadata`` / ``values``) are changed so + that existing fields are preserved. + + ``values`` carries the agent-state snapshot exposed to the frontend + (currently just ``{"title": "..."}``). + """ + now = time.time() + existing = await _store_get(store, thread_id) + if existing is None: + await _store_put( + store, + { + "thread_id": thread_id, + "status": "idle", + "created_at": now, + "updated_at": now, + "metadata": metadata or {}, + "values": values or {}, + }, + ) + else: + val = dict(existing) + val["updated_at"] = now + if metadata: + val.setdefault("metadata", {}).update(metadata) + if values: + val.setdefault("values", {}).update(values) + await _store_put(store, val) + + +def _derive_thread_status(checkpoint_tuple) -> str: + """Derive thread status from checkpoint metadata.""" + if checkpoint_tuple is None: + return "idle" + pending_writes = getattr(checkpoint_tuple, "pending_writes", None) or [] + + # Check for error in pending writes + for pw in pending_writes: + if len(pw) >= 2 and pw[1] == "__error__": + return "error" + + # Check for pending next tasks (indicates interrupt) + tasks = getattr(checkpoint_tuple, "tasks", None) + if tasks: + return "interrupted" + + return "idle" + + +# --------------------------------------------------------------------------- +# Endpoints +# --------------------------------------------------------------------------- + + @router.delete("/{thread_id}", response_model=ThreadDeleteResponse) -async def 
@router.delete("/{thread_id}", response_model=ThreadDeleteResponse)
async def delete_thread_data(thread_id: str, request: Request) -> ThreadDeleteResponse:
    """Delete local persisted filesystem data for a thread.

    Cleans DeerFlow-managed thread directories, removes checkpoint data,
    and removes the thread record from the Store.

    The filesystem deletion result is what gets returned; the Store and
    checkpointer cleanups below are best-effort and never fail the request.
    """
    # Clean local filesystem
    response = _delete_thread_data(thread_id)

    # Remove from Store (best-effort)
    store = get_store(request)
    if store is not None:
        try:
            await store.adelete(THREADS_NS, thread_id)
        except Exception:
            logger.debug("Could not delete store record for thread %s (not critical)", thread_id)

    # Remove checkpoints (best-effort)
    # NOTE(review): reads app.state directly instead of get_checkpointer()
    # so a missing checkpointer is tolerated here rather than raising 503 —
    # confirm this asymmetry with the other endpoints is intended.
    checkpointer = getattr(request.app.state, "checkpointer", None)
    if checkpointer is not None:
        try:
            # adelete_thread is not part of every checkpointer implementation,
            # hence the hasattr guard.
            if hasattr(checkpointer, "adelete_thread"):
                await checkpointer.adelete_thread(thread_id)
        except Exception:
            logger.debug("Could not delete checkpoints for thread %s (not critical)", thread_id)

    return response
+ """ + store = get_store(request) + checkpointer = get_checkpointer(request) + thread_id = body.thread_id or str(uuid.uuid4()) + now = time.time() + + # Idempotency: return existing record from Store when already present + if store is not None: + existing_record = await _store_get(store, thread_id) + if existing_record is not None: + return ThreadResponse( + thread_id=thread_id, + status=existing_record.get("status", "idle"), + created_at=str(existing_record.get("created_at", "")), + updated_at=str(existing_record.get("updated_at", "")), + metadata=existing_record.get("metadata", {}), + ) + + # Write thread record to Store + if store is not None: + try: + await _store_put( + store, + { + "thread_id": thread_id, + "status": "idle", + "created_at": now, + "updated_at": now, + "metadata": body.metadata, + }, + ) + except Exception: + logger.exception("Failed to write thread %s to store", thread_id) + raise HTTPException(status_code=500, detail="Failed to create thread") + + # Write an empty checkpoint so state endpoints work immediately + config = {"configurable": {"thread_id": thread_id, "checkpoint_ns": ""}} + try: + from langgraph.checkpoint.base import empty_checkpoint + + ckpt_metadata = { + "step": -1, + "source": "input", + "writes": None, + "parents": {}, + **body.metadata, + "created_at": now, + } + await checkpointer.aput(config, empty_checkpoint(), ckpt_metadata, {}) + except Exception: + logger.exception("Failed to create checkpoint for thread %s", thread_id) + raise HTTPException(status_code=500, detail="Failed to create thread") + + logger.info("Thread created: %s", thread_id) + return ThreadResponse( + thread_id=thread_id, + status="idle", + created_at=str(now), + updated_at=str(now), + metadata=body.metadata, + ) + + +@router.post("/search", response_model=list[ThreadResponse]) +async def search_threads(body: ThreadSearchRequest, request: Request) -> list[ThreadResponse]: + """Search and list threads. 
@router.post("/search", response_model=list[ThreadResponse])
async def search_threads(body: ThreadSearchRequest, request: Request) -> list[ThreadResponse]:
    """Search and list threads.

    Two-phase approach:

    **Phase 1 — Store (fast path, O(threads))**: returns threads that were
    created or run through this Gateway. Store records are tiny metadata
    dicts so fetching all of them at once is cheap.

    **Phase 2 — Checkpointer supplement (lazy migration)**: threads that
    were created directly by LangGraph Server (and therefore absent from the
    Store) are discovered here by iterating the shared checkpointer. Any
    newly found thread is immediately written to the Store so that the next
    search skips Phase 2 for that thread — the Store converges to a full
    index over time without a one-shot migration job.
    """
    store = get_store(request)
    checkpointer = get_checkpointer(request)

    # -----------------------------------------------------------------------
    # Phase 1: Store
    # -----------------------------------------------------------------------
    merged: dict[str, ThreadResponse] = {}

    if store is not None:
        try:
            # Flat fetch of up to 10k records; beyond that, pagination of the
            # store search itself would be needed.
            items = await store.asearch(THREADS_NS, limit=10_000)
        except Exception:
            logger.warning("Store search failed — falling back to checkpointer only", exc_info=True)
            items = []

        for item in items:
            val = item.value
            merged[val["thread_id"]] = ThreadResponse(
                thread_id=val["thread_id"],
                status=val.get("status", "idle"),
                created_at=str(val.get("created_at", "")),
                updated_at=str(val.get("updated_at", "")),
                metadata=val.get("metadata", {}),
                values=val.get("values", {}),
            )

    # -----------------------------------------------------------------------
    # Phase 2: Checkpointer supplement
    # Discovers threads not yet in the Store (e.g. created by LangGraph
    # Server) and lazily migrates them so future searches skip this phase.
    # -----------------------------------------------------------------------
    try:
        async for checkpoint_tuple in checkpointer.alist(None):
            cfg = getattr(checkpoint_tuple, "config", {})
            thread_id = cfg.get("configurable", {}).get("thread_id")
            if not thread_id or thread_id in merged:
                continue

            # Skip sub-graph checkpoints (checkpoint_ns is non-empty for those)
            if cfg.get("configurable", {}).get("checkpoint_ns", ""):
                continue

            ckpt_meta = getattr(checkpoint_tuple, "metadata", {}) or {}
            # Strip LangGraph internal keys from the user-visible metadata dict
            user_meta = {k: v for k, v in ckpt_meta.items() if k not in ("created_at", "updated_at", "step", "source", "writes", "parents")}

            # Extract state values (title) from the checkpoint's channel_values
            checkpoint_data = getattr(checkpoint_tuple, "checkpoint", {}) or {}
            channel_values = checkpoint_data.get("channel_values", {})
            ckpt_values = {}
            if title := channel_values.get("title"):
                ckpt_values["title"] = title

            thread_resp = ThreadResponse(
                thread_id=thread_id,
                status=_derive_thread_status(checkpoint_tuple),
                created_at=str(ckpt_meta.get("created_at", "")),
                updated_at=str(ckpt_meta.get("updated_at", ckpt_meta.get("created_at", ""))),
                metadata=user_meta,
                values=ckpt_values,
            )
            merged[thread_id] = thread_resp

            # Lazy migration — write to Store so the next search finds it there
            if store is not None:
                try:
                    await _store_upsert(store, thread_id, metadata=user_meta, values=ckpt_values or None)
                except Exception:
                    logger.debug("Failed to migrate thread %s to store (non-fatal)", thread_id)
    except Exception:
        logger.exception("Checkpointer scan failed during thread search")
        # Don't raise — return whatever was collected from Store + partial scan

    # -----------------------------------------------------------------------
    # Phase 3: Filter → sort → paginate
    # -----------------------------------------------------------------------
    results = list(merged.values())

    if body.metadata:
        results = [r for r in results if all(r.metadata.get(k) == v for k, v in body.metadata.items())]

    if body.status:
        results = [r for r in results if r.status == body.status]

    # NOTE(review): updated_at is a stringified epoch float, so this sort is
    # lexicographic, not numeric — fine for same-width timestamps, but mixed
    # or empty values may misorder. Confirm the value format is uniform.
    results.sort(key=lambda r: r.updated_at, reverse=True)
    return results[body.offset : body.offset + body.limit]


@router.patch("/{thread_id}", response_model=ThreadResponse)
async def patch_thread(thread_id: str, body: ThreadPatchRequest, request: Request) -> ThreadResponse:
    """Merge metadata into a thread record.

    Raises:
        HTTPException: 503 when no Store is configured, 404 when the thread
            record does not exist, 500 when the Store write fails.
    """
    store = get_store(request)
    if store is None:
        raise HTTPException(status_code=503, detail="Store not available")

    record = await _store_get(store, thread_id)
    if record is None:
        raise HTTPException(status_code=404, detail=f"Thread {thread_id} not found")

    now = time.time()
    updated = dict(record)
    updated.setdefault("metadata", {}).update(body.metadata)
    updated["updated_at"] = now

    try:
        await _store_put(store, updated)
    except Exception:
        logger.exception("Failed to patch thread %s", thread_id)
        raise HTTPException(status_code=500, detail="Failed to update thread")

    return ThreadResponse(
        thread_id=thread_id,
        status=updated.get("status", "idle"),
        created_at=str(updated.get("created_at", "")),
        updated_at=str(now),
        metadata=updated.get("metadata", {}),
    )
+ """ + store = get_store(request) + checkpointer = get_checkpointer(request) + + record: dict | None = None + if store is not None: + record = await _store_get(store, thread_id) + + # Derive accurate status from the checkpointer + config = {"configurable": {"thread_id": thread_id, "checkpoint_ns": ""}} + try: + checkpoint_tuple = await checkpointer.aget_tuple(config) + except Exception: + logger.exception("Failed to get checkpoint for thread %s", thread_id) + raise HTTPException(status_code=500, detail="Failed to get thread") + + if record is None and checkpoint_tuple is None: + raise HTTPException(status_code=404, detail=f"Thread {thread_id} not found") + + # If the thread exists in the checkpointer but not the store (e.g. legacy + # data), synthesize a minimal store record from the checkpoint metadata. + if record is None and checkpoint_tuple is not None: + ckpt_meta = getattr(checkpoint_tuple, "metadata", {}) or {} + record = { + "thread_id": thread_id, + "status": "idle", + "created_at": ckpt_meta.get("created_at", ""), + "updated_at": ckpt_meta.get("updated_at", ckpt_meta.get("created_at", "")), + "metadata": {k: v for k, v in ckpt_meta.items() if k not in ("created_at", "updated_at", "step", "source", "writes", "parents")}, + } + + status = _derive_thread_status(checkpoint_tuple) if checkpoint_tuple is not None else record.get("status", "idle") # type: ignore[union-attr] + checkpoint = getattr(checkpoint_tuple, "checkpoint", {}) or {} if checkpoint_tuple is not None else {} + channel_values = checkpoint.get("channel_values", {}) + + return ThreadResponse( + thread_id=thread_id, + status=status, + created_at=str(record.get("created_at", "")), # type: ignore[union-attr] + updated_at=str(record.get("updated_at", "")), # type: ignore[union-attr] + metadata=record.get("metadata", {}), # type: ignore[union-attr] + values=serialize_channel_values(channel_values), + ) + + +@router.get("/{thread_id}/state", response_model=ThreadStateResponse) +async def 
@router.get("/{thread_id}/state", response_model=ThreadStateResponse)
async def get_thread_state(thread_id: str, request: Request) -> ThreadStateResponse:
    """Get the latest state snapshot for a thread.

    Channel values are serialized to ensure LangChain message objects
    are converted to JSON-safe dicts.

    Raises:
        HTTPException: 404 when no checkpoint exists for the thread,
            500 when the checkpoint read fails.
    """
    checkpointer = get_checkpointer(request)

    # checkpoint_ns="" selects the root graph namespace; omitting
    # checkpoint_id fetches the latest checkpoint.
    config = {"configurable": {"thread_id": thread_id, "checkpoint_ns": ""}}
    try:
        checkpoint_tuple = await checkpointer.aget_tuple(config)
    except Exception:
        logger.exception("Failed to get state for thread %s", thread_id)
        raise HTTPException(status_code=500, detail="Failed to get thread state")

    if checkpoint_tuple is None:
        raise HTTPException(status_code=404, detail=f"Thread {thread_id} not found")

    checkpoint = getattr(checkpoint_tuple, "checkpoint", {}) or {}
    metadata = getattr(checkpoint_tuple, "metadata", {}) or {}
    checkpoint_id = None
    ckpt_config = getattr(checkpoint_tuple, "config", {})
    if ckpt_config:
        checkpoint_id = ckpt_config.get("configurable", {}).get("checkpoint_id")

    channel_values = checkpoint.get("channel_values", {})

    parent_config = getattr(checkpoint_tuple, "parent_config", None)
    parent_checkpoint_id = None
    if parent_config:
        parent_checkpoint_id = parent_config.get("configurable", {}).get("checkpoint_id")

    # "next" lists task names only; "tasks" carries id+name pairs.
    tasks_raw = getattr(checkpoint_tuple, "tasks", []) or []
    next_tasks = [t.name for t in tasks_raw if hasattr(t, "name")]
    tasks = [{"id": getattr(t, "id", ""), "name": getattr(t, "name", "")} for t in tasks_raw]

    return ThreadStateResponse(
        values=serialize_channel_values(channel_values),
        next=next_tasks,
        metadata=metadata,
        checkpoint={"id": checkpoint_id, "ts": str(metadata.get("created_at", ""))},
        checkpoint_id=checkpoint_id,
        parent_checkpoint_id=parent_checkpoint_id,
        created_at=str(metadata.get("created_at", "")),
        tasks=tasks,
    )
@router.post("/{thread_id}/state", response_model=ThreadStateResponse)
async def update_thread_state(thread_id: str, body: ThreadStateUpdateRequest, request: Request) -> ThreadStateResponse:
    """Update thread state (e.g. for human-in-the-loop resume or title rename).

    Writes a new checkpoint that merges *body.values* into the latest
    channel values, then syncs any updated ``title`` field back to the Store
    so that ``/threads/search`` reflects the change immediately.

    Raises:
        HTTPException: 404 when the thread has no checkpoint, 500 when the
            checkpoint read or write fails.
    """
    checkpointer = get_checkpointer(request)
    store = get_store(request)

    # checkpoint_ns must be present in the config for aput — default to ""
    # (the root graph namespace). checkpoint_id is optional; omitting it
    # fetches the latest checkpoint for the thread.
    read_config: dict[str, Any] = {
        "configurable": {
            "thread_id": thread_id,
            "checkpoint_ns": "",
        }
    }
    if body.checkpoint_id:
        read_config["configurable"]["checkpoint_id"] = body.checkpoint_id

    try:
        checkpoint_tuple = await checkpointer.aget_tuple(read_config)
    except Exception:
        logger.exception("Failed to get state for thread %s", thread_id)
        raise HTTPException(status_code=500, detail="Failed to get thread state")

    if checkpoint_tuple is None:
        raise HTTPException(status_code=404, detail=f"Thread {thread_id} not found")

    # Work on mutable copies so we don't accidentally mutate cached objects.
    # NOTE(review): these are shallow copies — the copied checkpoint still
    # carries the original's internal fields (e.g. its "id"); confirm aput
    # assigns a fresh id rather than overwriting the source checkpoint row.
    checkpoint: dict[str, Any] = dict(getattr(checkpoint_tuple, "checkpoint", {}) or {})
    metadata: dict[str, Any] = dict(getattr(checkpoint_tuple, "metadata", {}) or {})
    channel_values: dict[str, Any] = dict(checkpoint.get("channel_values", {}))

    if body.values:
        channel_values.update(body.values)

    checkpoint["channel_values"] = channel_values
    metadata["updated_at"] = time.time()

    if body.as_node:
        metadata["source"] = "update"
        metadata["step"] = metadata.get("step", 0) + 1
        metadata["writes"] = {body.as_node: body.values}

    # aput requires checkpoint_ns in the config — use the same config used for the
    # read (which always includes checkpoint_ns=""). Do NOT include checkpoint_id
    # so that aput generates a fresh checkpoint ID for the new snapshot.
    write_config: dict[str, Any] = {
        "configurable": {
            "thread_id": thread_id,
            "checkpoint_ns": "",
        }
    }
    try:
        new_config = await checkpointer.aput(write_config, checkpoint, metadata, {})
    except Exception:
        logger.exception("Failed to update state for thread %s", thread_id)
        raise HTTPException(status_code=500, detail="Failed to update thread state")

    new_checkpoint_id: str | None = None
    if isinstance(new_config, dict):
        new_checkpoint_id = new_config.get("configurable", {}).get("checkpoint_id")

    # Sync title changes to the Store so /threads/search reflects them immediately.
    if store is not None and body.values and "title" in body.values:
        try:
            await _store_upsert(store, thread_id, values={"title": body.values["title"]})
        except Exception:
            logger.debug("Failed to sync title to store for thread %s (non-fatal)", thread_id)

    # "next" is always empty here: an update does not resume execution.
    return ThreadStateResponse(
        values=serialize_channel_values(channel_values),
        next=[],
        metadata=metadata,
        checkpoint_id=new_checkpoint_id,
        created_at=str(metadata.get("created_at", "")),
    )


@router.post("/{thread_id}/history", response_model=list[HistoryEntry])
async def get_thread_history(thread_id: str, body: ThreadHistoryRequest, request: Request) -> list[HistoryEntry]:
    """Get checkpoint history for a thread.

    Raises:
        HTTPException: 500 when the checkpointer listing fails.
    """
    checkpointer = get_checkpointer(request)

    # NOTE(review): body.before is put into the config as checkpoint_id;
    # BaseCheckpointSaver.alist also takes a separate `before` parameter —
    # confirm a checkpoint_id in config actually acts as a "before" cursor
    # for the configured checkpointer backend.
    config: dict[str, Any] = {"configurable": {"thread_id": thread_id}}
    if body.before:
        config["configurable"]["checkpoint_id"] = body.before

    entries: list[HistoryEntry] = []
    try:
        async for checkpoint_tuple in checkpointer.alist(config, limit=body.limit):
            ckpt_config = getattr(checkpoint_tuple, "config", {})
            parent_config = getattr(checkpoint_tuple, "parent_config", None)
            metadata = getattr(checkpoint_tuple, "metadata", {}) or {}
            checkpoint = getattr(checkpoint_tuple, "checkpoint", {}) or {}

            checkpoint_id = ckpt_config.get("configurable", {}).get("checkpoint_id", "")
            parent_id = None
            if parent_config:
                parent_id = parent_config.get("configurable", {}).get("checkpoint_id")

            channel_values = checkpoint.get("channel_values", {})

            # Derive next tasks
            tasks_raw = getattr(checkpoint_tuple, "tasks", []) or []
            next_tasks = [t.name for t in tasks_raw if hasattr(t, "name")]

            entries.append(
                HistoryEntry(
                    checkpoint_id=checkpoint_id,
                    parent_checkpoint_id=parent_id,
                    metadata=metadata,
                    values=serialize_channel_values(channel_values),
                    created_at=str(metadata.get("created_at", "")),
                    next=next_tasks,
                )
            )
    except Exception:
        logger.exception("Failed to get history for thread %s", thread_id)
        raise HTTPException(status_code=500, detail="Failed to get thread history")

    return entries
+ + Field order: ``event:`` -> ``data:`` -> ``id:`` (optional) -> blank line. + This matches the LangGraph Platform wire format consumed by the + ``useStream`` React hook and the Python ``langgraph-sdk`` SSE decoder. + """ + payload = json.dumps(data, default=str, ensure_ascii=False) + parts = [f"event: {event}", f"data: {payload}"] + if event_id: + parts.append(f"id: {event_id}") + parts.append("") + parts.append("") + return "\n".join(parts) + + +# --------------------------------------------------------------------------- +# Input / config helpers +# --------------------------------------------------------------------------- + + +def normalize_stream_modes(raw: list[str] | str | None) -> list[str]: + """Normalize the stream_mode parameter to a list. + + Default matches what ``useStream`` expects: values + messages-tuple. + """ + if raw is None: + return ["values"] + if isinstance(raw, str): + return [raw] + return raw if raw else ["values"] + + +def normalize_input(raw_input: dict[str, Any] | None) -> dict[str, Any]: + """Convert LangGraph Platform input format to LangChain state dict.""" + if raw_input is None: + return {} + messages = raw_input.get("messages") + if messages and isinstance(messages, list): + converted = [] + for msg in messages: + if isinstance(msg, dict): + role = msg.get("role", msg.get("type", "user")) + content = msg.get("content", "") + if role in ("user", "human"): + converted.append(HumanMessage(content=content)) + else: + # TODO: handle other message types (system, ai, tool) + converted.append(HumanMessage(content=content)) + else: + converted.append(msg) + return {**raw_input, "messages": converted} + return raw_input + + +def resolve_agent_factory(assistant_id: str | None): + """Resolve the agent factory callable from config.""" + from deerflow.agents.lead_agent.agent import make_lead_agent + + if assistant_id and assistant_id != "lead_agent": + logger.info("assistant_id=%s requested; falling back to lead_agent", assistant_id) + 
return make_lead_agent + + +def build_run_config(thread_id: str, request_config: dict[str, Any] | None, metadata: dict[str, Any] | None) -> dict[str, Any]: + """Build a RunnableConfig dict for the agent.""" + configurable = {"thread_id": thread_id} + if request_config: + configurable.update(request_config.get("configurable", {})) + config: dict[str, Any] = {"configurable": configurable, "recursion_limit": 100} + if request_config: + for k, v in request_config.items(): + if k != "configurable": + config[k] = v + if metadata: + config.setdefault("metadata", {}).update(metadata) + return config + + +# --------------------------------------------------------------------------- +# Run lifecycle +# --------------------------------------------------------------------------- + + +async def _upsert_thread_in_store(store, thread_id: str, metadata: dict | None) -> None: + """Create or refresh the thread record in the Store. + + Called from :func:`start_run` so that threads created via the stateless + ``/runs/stream`` endpoint (which never calls ``POST /threads``) still + appear in ``/threads/search`` results. + """ + # Deferred import to avoid circular import with the threads router module. + from app.gateway.routers.threads import _store_upsert + + try: + await _store_upsert(store, thread_id, metadata=metadata) + except Exception: + logger.warning("Failed to upsert thread %s in store (non-fatal)", thread_id) + + +async def _sync_thread_title_after_run( + run_task: asyncio.Task, + thread_id: str, + checkpointer: Any, + store: Any, +) -> None: + """Wait for *run_task* to finish, then persist the generated title to the Store. + + TitleMiddleware writes the generated title to the LangGraph agent state + (checkpointer) but the Gateway's Store record is not updated automatically. 
async def _sync_thread_title_after_run(
    run_task: asyncio.Task,
    thread_id: str,
    checkpointer: Any,
    store: Any,
) -> None:
    """Wait for *run_task* to finish, then persist the generated title to the Store.

    TitleMiddleware writes the generated title to the LangGraph agent state
    (checkpointer) but the Gateway's Store record is not updated automatically.
    This coroutine closes that gap by reading the final checkpoint after the
    run completes and syncing ``values.title`` into the Store record so that
    subsequent ``/threads/search`` responses include the correct title.

    Runs as a fire-and-forget :func:`asyncio.create_task`; failures are
    logged at DEBUG level and never propagate.
    """
    # Wait for the background run task to complete (any outcome).
    # asyncio.wait does not propagate task exceptions — it just returns
    # when the task is done, cancelled, or failed.
    await asyncio.wait({run_task})

    # Deferred import to avoid circular import with the threads router module.
    from app.gateway.routers.threads import _store_get, _store_put

    try:
        ckpt_config = {"configurable": {"thread_id": thread_id, "checkpoint_ns": ""}}
        ckpt_tuple = await checkpointer.aget_tuple(ckpt_config)
        if ckpt_tuple is None:
            return

        channel_values = ckpt_tuple.checkpoint.get("channel_values", {})
        title = channel_values.get("title")
        if not title:
            return

        # Only update records that already exist — no record means the thread
        # was never registered, and there is nothing to patch.
        existing = await _store_get(store, thread_id)
        if existing is None:
            return

        updated = dict(existing)
        updated.setdefault("values", {})["title"] = title
        updated["updated_at"] = time.time()
        await _store_put(store, updated)
        logger.debug("Synced title %r for thread %s", title, thread_id)
    except Exception:
        logger.debug("Failed to sync title for thread %s (non-fatal)", thread_id, exc_info=True)


async def start_run(
    body: Any,
    thread_id: str,
    request: Request,
) -> RunRecord:
    """Create a RunRecord and launch the background agent task.

    Parameters
    ----------
    body : RunCreateRequest
        The validated request body (typed as Any to avoid circular import
        with the router module that defines the Pydantic model).
    thread_id : str
        Target thread.
    request : Request
        FastAPI request — used to retrieve singletons from ``app.state``.

    Raises
    ------
    HTTPException
        409 when a conflicting run already exists for the thread, 501 when
        the requested multitask strategy is unsupported.
    """
    bridge = get_stream_bridge(request)
    run_mgr = get_run_manager(request)
    checkpointer = get_checkpointer(request)
    store = get_store(request)

    disconnect = DisconnectMode.cancel if body.on_disconnect == "cancel" else DisconnectMode.continue_

    try:
        record = await run_mgr.create_or_reject(
            thread_id,
            body.assistant_id,
            on_disconnect=disconnect,
            metadata=body.metadata or {},
            kwargs={"input": body.input, "config": body.config},
            multitask_strategy=body.multitask_strategy,
        )
    except ConflictError as exc:
        raise HTTPException(status_code=409, detail=str(exc)) from exc
    except UnsupportedStrategyError as exc:
        raise HTTPException(status_code=501, detail=str(exc)) from exc

    # Ensure the thread is visible in /threads/search, even for threads that
    # were never explicitly created via POST /threads (e.g. stateless runs).
    # NOTE(review): get_store() was already called above — this second call is
    # redundant and can be dropped.
    store = get_store(request)
    if store is not None:
        await _upsert_thread_in_store(store, thread_id, body.metadata)

    agent_factory = resolve_agent_factory(body.assistant_id)
    graph_input = normalize_input(body.input)
    config = build_run_config(thread_id, body.config, body.metadata)
    stream_modes = normalize_stream_modes(body.stream_mode)

    task = asyncio.create_task(
        run_agent(
            bridge,
            run_mgr,
            record,
            checkpointer=checkpointer,
            store=store,
            agent_factory=agent_factory,
            graph_input=graph_input,
            config=config,
            stream_modes=stream_modes,
            stream_subgraphs=body.stream_subgraphs,
            interrupt_before=body.interrupt_before,
            interrupt_after=body.interrupt_after,
        )
    )
    # NOTE(review): record.task is assigned after create_task — run_agent may
    # already have started; confirm nothing reads record.task before this point.
    record.task = task

    # After the run completes, sync the title generated by TitleMiddleware from
    # the checkpointer into the Store record so that /threads/search returns the
    # correct title instead of an empty values dict.
    # NOTE(review): this task handle is not retained; asyncio keeps only weak
    # references to tasks, so a reference should be held if premature GC is
    # ever observed.
    if store is not None:
        asyncio.create_task(_sync_thread_title_after_run(task, thread_id, checkpointer, store))

    return record


async def sse_consumer(
    bridge: StreamBridge,
    record: RunRecord,
    request: Request,
    run_mgr: RunManager,
):
    """Async generator that yields SSE frames from the bridge.

    The ``finally`` block implements ``on_disconnect`` semantics:
    - ``cancel``: abort the background task on client disconnect.
    - ``continue``: let the task run; events are discarded.
    """
    try:
        async for entry in bridge.subscribe(record.run_id):
            # Stop yielding as soon as the client goes away; cleanup happens
            # in the finally block below.
            if await request.is_disconnected():
                break

            if entry is HEARTBEAT_SENTINEL:
                # SSE comment line — keeps proxies/clients from timing out.
                yield ": heartbeat\n\n"
                continue

            if entry is END_SENTINEL:
                # NOTE(review): this reads entry.id off the sentinel object —
                # assumes END_SENTINEL exposes an ``id`` attribute; confirm.
                yield format_sse("end", None, event_id=entry.id or None)
                return

            yield format_sse(entry.event, entry.data, event_id=entry.id or None)

    finally:
        if record.status in (RunStatus.pending, RunStatus.running):
            if record.on_disconnect == DisconnectMode.cancel:
                await run_mgr.cancel(record.run_id)
parent directories for real filesystem paths - if conn_str != ":memory:" and not conn_str.startswith("file:"): - pathlib.Path(conn_str).parent.mkdir(parents=True, exist_ok=True) + conn_str = resolve_sqlite_conn_str(config.connection_string or "store.db") + ensure_sqlite_parent_dir(conn_str) async with AsyncSqliteSaver.from_conn_string(conn_str) as saver: await saver.setup() yield saver diff --git a/backend/packages/harness/deerflow/agents/checkpointer/provider.py b/backend/packages/harness/deerflow/agents/checkpointer/provider.py index f66908add..6f09aac94 100644 --- a/backend/packages/harness/deerflow/agents/checkpointer/provider.py +++ b/backend/packages/harness/deerflow/agents/checkpointer/provider.py @@ -27,7 +27,7 @@ from langgraph.types import Checkpointer from deerflow.config.app_config import get_app_config from deerflow.config.checkpointer_config import CheckpointerConfig -from deerflow.config.paths import resolve_path +from deerflow.runtime.store._sqlite_utils import resolve_sqlite_conn_str logger = logging.getLogger(__name__) @@ -44,18 +44,6 @@ POSTGRES_CONN_REQUIRED = "checkpointer.connection_string is required for the pos # --------------------------------------------------------------------------- -def _resolve_sqlite_conn_str(raw: str) -> str: - """Return a SQLite connection string ready for use with ``SqliteSaver``. - - SQLite special strings (``":memory:"`` and ``file:`` URIs) are returned - unchanged. Plain filesystem paths — relative or absolute — are resolved - to an absolute string via :func:`resolve_path`. - """ - if raw == ":memory:" or raw.startswith("file:"): - return raw - return str(resolve_path(raw)) - - @contextlib.contextmanager def _sync_checkpointer_cm(config: CheckpointerConfig) -> Iterator[Checkpointer]: """Context manager that creates and tears down a sync checkpointer. 
@@ -78,7 +66,7 @@ def _sync_checkpointer_cm(config: CheckpointerConfig) -> Iterator[Checkpointer]: except ImportError as exc: raise ImportError(SQLITE_INSTALL) from exc - conn_str = _resolve_sqlite_conn_str(config.connection_string or "store.db") + conn_str = resolve_sqlite_conn_str(config.connection_string or "store.db") with SqliteSaver.from_conn_string(conn_str) as saver: saver.setup() logger.info("Checkpointer: using SqliteSaver (%s)", conn_str) diff --git a/backend/packages/harness/deerflow/config/app_config.py b/backend/packages/harness/deerflow/config/app_config.py index 4caabb0bf..d034ffc4c 100644 --- a/backend/packages/harness/deerflow/config/app_config.py +++ b/backend/packages/harness/deerflow/config/app_config.py @@ -15,6 +15,7 @@ from deerflow.config.memory_config import load_memory_config_from_dict from deerflow.config.model_config import ModelConfig from deerflow.config.sandbox_config import SandboxConfig from deerflow.config.skills_config import SkillsConfig +from deerflow.config.stream_bridge_config import StreamBridgeConfig, load_stream_bridge_config_from_dict from deerflow.config.subagents_config import load_subagents_config_from_dict from deerflow.config.summarization_config import load_summarization_config_from_dict from deerflow.config.title_config import load_title_config_from_dict @@ -41,6 +42,7 @@ class AppConfig(BaseModel): tool_search: ToolSearchConfig = Field(default_factory=ToolSearchConfig, description="Tool search / deferred loading configuration") model_config = ConfigDict(extra="allow", frozen=False) checkpointer: CheckpointerConfig | None = Field(default=None, description="Checkpointer configuration") + stream_bridge: StreamBridgeConfig | None = Field(default=None, description="Stream bridge configuration") @classmethod def resolve_config_path(cls, config_path: str | None = None) -> Path: @@ -120,6 +122,10 @@ class AppConfig(BaseModel): if "checkpointer" in config_data: 
load_checkpointer_config_from_dict(config_data["checkpointer"]) + # Load stream bridge config if present + if "stream_bridge" in config_data: + load_stream_bridge_config_from_dict(config_data["stream_bridge"]) + # Always refresh ACP agent config so removed entries do not linger across reloads. load_acp_config_from_dict(config_data.get("acp_agents", {})) diff --git a/backend/packages/harness/deerflow/config/stream_bridge_config.py b/backend/packages/harness/deerflow/config/stream_bridge_config.py new file mode 100644 index 000000000..895c4639c --- /dev/null +++ b/backend/packages/harness/deerflow/config/stream_bridge_config.py @@ -0,0 +1,46 @@ +"""Configuration for stream bridge.""" + +from typing import Literal + +from pydantic import BaseModel, Field + +StreamBridgeType = Literal["memory", "redis"] + + +class StreamBridgeConfig(BaseModel): + """Configuration for the stream bridge that connects agent workers to SSE endpoints.""" + + type: StreamBridgeType = Field( + default="memory", + description="Stream bridge backend type. 'memory' uses in-process asyncio.Queue (single-process only). 'redis' uses Redis Streams (planned for Phase 2, not yet implemented).", + ) + redis_url: str | None = Field( + default=None, + description="Redis URL for the redis stream bridge type. Example: 'redis://localhost:6379/0'.", + ) + queue_maxsize: int = Field( + default=256, + description="Maximum number of events buffered per run in the memory bridge.", + ) + + +# Global configuration instance — None means no stream bridge is configured +# (falls back to memory with defaults). 
+_stream_bridge_config: StreamBridgeConfig | None = None + + +def get_stream_bridge_config() -> StreamBridgeConfig | None: + """Get the current stream bridge configuration, or None if not configured.""" + return _stream_bridge_config + + +def set_stream_bridge_config(config: StreamBridgeConfig | None) -> None: + """Set the stream bridge configuration.""" + global _stream_bridge_config + _stream_bridge_config = config + + +def load_stream_bridge_config_from_dict(config_dict: dict) -> None: + """Load stream bridge configuration from a dictionary.""" + global _stream_bridge_config + _stream_bridge_config = StreamBridgeConfig(**config_dict) diff --git a/backend/packages/harness/deerflow/runtime/__init__.py b/backend/packages/harness/deerflow/runtime/__init__.py new file mode 100644 index 000000000..d7eccf101 --- /dev/null +++ b/backend/packages/harness/deerflow/runtime/__init__.py @@ -0,0 +1,39 @@ +"""LangGraph-compatible runtime — runs, streaming, and lifecycle management. + +Re-exports the public API of :mod:`~deerflow.runtime.runs` and +:mod:`~deerflow.runtime.stream_bridge` so that consumers can import +directly from ``deerflow.runtime``. 
+""" + +from .runs import ConflictError, DisconnectMode, RunManager, RunRecord, RunStatus, UnsupportedStrategyError, run_agent +from .serialization import serialize, serialize_channel_values, serialize_lc_object, serialize_messages_tuple +from .store import get_store, make_store, reset_store, store_context +from .stream_bridge import END_SENTINEL, HEARTBEAT_SENTINEL, MemoryStreamBridge, StreamBridge, StreamEvent, make_stream_bridge + +__all__ = [ + # runs + "ConflictError", + "DisconnectMode", + "RunManager", + "RunRecord", + "RunStatus", + "UnsupportedStrategyError", + "run_agent", + # serialization + "serialize", + "serialize_channel_values", + "serialize_lc_object", + "serialize_messages_tuple", + # store + "get_store", + "make_store", + "reset_store", + "store_context", + # stream_bridge + "END_SENTINEL", + "HEARTBEAT_SENTINEL", + "MemoryStreamBridge", + "StreamBridge", + "StreamEvent", + "make_stream_bridge", +] diff --git a/backend/packages/harness/deerflow/runtime/runs/__init__.py b/backend/packages/harness/deerflow/runtime/runs/__init__.py new file mode 100644 index 000000000..afed90f48 --- /dev/null +++ b/backend/packages/harness/deerflow/runtime/runs/__init__.py @@ -0,0 +1,15 @@ +"""Run lifecycle management for LangGraph Platform API compatibility.""" + +from .manager import ConflictError, RunManager, RunRecord, UnsupportedStrategyError +from .schemas import DisconnectMode, RunStatus +from .worker import run_agent + +__all__ = [ + "ConflictError", + "DisconnectMode", + "RunManager", + "RunRecord", + "RunStatus", + "UnsupportedStrategyError", + "run_agent", +] diff --git a/backend/packages/harness/deerflow/runtime/runs/manager.py b/backend/packages/harness/deerflow/runtime/runs/manager.py new file mode 100644 index 000000000..42b0372a2 --- /dev/null +++ b/backend/packages/harness/deerflow/runtime/runs/manager.py @@ -0,0 +1,212 @@ +"""In-memory run registry.""" + +from __future__ import annotations + +import asyncio +import logging +import uuid +from 
dataclasses import dataclass, field +from datetime import UTC, datetime + +from .schemas import DisconnectMode, RunStatus + +logger = logging.getLogger(__name__) + + +def _now_iso() -> str: + return datetime.now(UTC).isoformat() + + +@dataclass +class RunRecord: + """Mutable record for a single run.""" + + run_id: str + thread_id: str + assistant_id: str | None + status: RunStatus + on_disconnect: DisconnectMode + multitask_strategy: str = "reject" + metadata: dict = field(default_factory=dict) + kwargs: dict = field(default_factory=dict) + created_at: str = "" + updated_at: str = "" + task: asyncio.Task | None = field(default=None, repr=False) + abort_event: asyncio.Event = field(default_factory=asyncio.Event, repr=False) + abort_action: str = "interrupt" + error: str | None = None + + +class RunManager: + """In-memory run registry. All mutations are protected by an asyncio lock.""" + + def __init__(self) -> None: + self._runs: dict[str, RunRecord] = {} + self._lock = asyncio.Lock() + + async def create( + self, + thread_id: str, + assistant_id: str | None = None, + *, + on_disconnect: DisconnectMode = DisconnectMode.cancel, + metadata: dict | None = None, + kwargs: dict | None = None, + multitask_strategy: str = "reject", + ) -> RunRecord: + """Create a new pending run and register it.""" + run_id = str(uuid.uuid4()) + now = _now_iso() + record = RunRecord( + run_id=run_id, + thread_id=thread_id, + assistant_id=assistant_id, + status=RunStatus.pending, + on_disconnect=on_disconnect, + multitask_strategy=multitask_strategy, + metadata=metadata or {}, + kwargs=kwargs or {}, + created_at=now, + updated_at=now, + ) + async with self._lock: + self._runs[run_id] = record + logger.info("Run created: run_id=%s thread_id=%s", run_id, thread_id) + return record + + def get(self, run_id: str) -> RunRecord | None: + """Return a run record by ID, or ``None``.""" + return self._runs.get(run_id) + + async def list_by_thread(self, thread_id: str) -> list[RunRecord]: + """Return 
all runs for a given thread, newest first.""" + async with self._lock: + return sorted( + (r for r in self._runs.values() if r.thread_id == thread_id), + key=lambda r: r.created_at, + reverse=True, + ) + + async def set_status(self, run_id: str, status: RunStatus, *, error: str | None = None) -> None: + """Transition a run to a new status.""" + async with self._lock: + record = self._runs.get(run_id) + if record is None: + logger.warning("set_status called for unknown run %s", run_id) + return + record.status = status + record.updated_at = _now_iso() + if error is not None: + record.error = error + logger.info("Run %s -> %s", run_id, status.value) + + async def cancel(self, run_id: str, *, action: str = "interrupt") -> bool: + """Request cancellation of a run. + + Args: + run_id: The run ID to cancel. + action: "interrupt" keeps checkpoint, "rollback" reverts to pre-run state. + + Sets the abort event with the action reason and cancels the asyncio task. + Returns ``True`` if the run was in-flight and cancellation was initiated. + """ + async with self._lock: + record = self._runs.get(run_id) + if record is None: + return False + if record.status not in (RunStatus.pending, RunStatus.running): + return False + record.abort_action = action + record.abort_event.set() + if record.task is not None and not record.task.done(): + record.task.cancel() + record.status = RunStatus.interrupted + record.updated_at = _now_iso() + logger.info("Run %s cancelled (action=%s)", run_id, action) + return True + + async def create_or_reject( + self, + thread_id: str, + assistant_id: str | None = None, + *, + on_disconnect: DisconnectMode = DisconnectMode.cancel, + metadata: dict | None = None, + kwargs: dict | None = None, + multitask_strategy: str = "reject", + ) -> RunRecord: + """Atomically check for inflight runs and create a new one. + + For ``reject`` strategy, raises ``ConflictError`` if thread + already has a pending/running run. 
For ``interrupt``/``rollback``, + cancels inflight runs before creating. + + This method holds the lock across both the check and the insert, + eliminating the TOCTOU race in separate ``has_inflight`` + ``create``. + """ + run_id = str(uuid.uuid4()) + now = _now_iso() + + _supported_strategies = ("reject", "interrupt", "rollback") + + async with self._lock: + if multitask_strategy not in _supported_strategies: + raise UnsupportedStrategyError(f"Multitask strategy '{multitask_strategy}' is not yet supported. Supported strategies: {', '.join(_supported_strategies)}") + + inflight = [r for r in self._runs.values() if r.thread_id == thread_id and r.status in (RunStatus.pending, RunStatus.running)] + + if multitask_strategy == "reject" and inflight: + raise ConflictError(f"Thread {thread_id} already has an active run") + + if multitask_strategy in ("interrupt", "rollback") and inflight: + for r in inflight: + r.abort_action = multitask_strategy + r.abort_event.set() + if r.task is not None and not r.task.done(): + r.task.cancel() + r.status = RunStatus.interrupted + r.updated_at = now + logger.info( + "Cancelled %d inflight run(s) on thread %s (strategy=%s)", + len(inflight), + thread_id, + multitask_strategy, + ) + + record = RunRecord( + run_id=run_id, + thread_id=thread_id, + assistant_id=assistant_id, + status=RunStatus.pending, + on_disconnect=on_disconnect, + multitask_strategy=multitask_strategy, + metadata=metadata or {}, + kwargs=kwargs or {}, + created_at=now, + updated_at=now, + ) + self._runs[run_id] = record + + logger.info("Run created: run_id=%s thread_id=%s", run_id, thread_id) + return record + + async def has_inflight(self, thread_id: str) -> bool: + """Return ``True`` if *thread_id* has a pending or running run.""" + async with self._lock: + return any(r.thread_id == thread_id and r.status in (RunStatus.pending, RunStatus.running) for r in self._runs.values()) + + async def cleanup(self, run_id: str, *, delay: float = 300) -> None: + """Remove a run 
record after an optional delay.""" + if delay > 0: + await asyncio.sleep(delay) + async with self._lock: + self._runs.pop(run_id, None) + logger.debug("Run record %s cleaned up", run_id) + + +class ConflictError(Exception): + """Raised when multitask_strategy=reject and thread has inflight runs.""" + + +class UnsupportedStrategyError(Exception): + """Raised when a multitask_strategy value is not yet implemented.""" diff --git a/backend/packages/harness/deerflow/runtime/runs/schemas.py b/backend/packages/harness/deerflow/runtime/runs/schemas.py new file mode 100644 index 000000000..622d8b70b --- /dev/null +++ b/backend/packages/harness/deerflow/runtime/runs/schemas.py @@ -0,0 +1,21 @@ +"""Run status and disconnect mode enums.""" + +from enum import StrEnum + + +class RunStatus(StrEnum): + """Lifecycle status of a single run.""" + + pending = "pending" + running = "running" + success = "success" + error = "error" + timeout = "timeout" + interrupted = "interrupted" + + +class DisconnectMode(StrEnum): + """Behaviour when the SSE consumer disconnects.""" + + cancel = "cancel" + continue_ = "continue" diff --git a/backend/packages/harness/deerflow/runtime/runs/worker.py b/backend/packages/harness/deerflow/runtime/runs/worker.py new file mode 100644 index 000000000..deaec055a --- /dev/null +++ b/backend/packages/harness/deerflow/runtime/runs/worker.py @@ -0,0 +1,253 @@ +"""Background agent execution. + +Runs an agent graph inside an ``asyncio.Task``, publishing events to +a :class:`StreamBridge` as they are produced. + +Uses ``graph.astream(stream_mode=[...])`` which gives correct full-state +snapshots for ``values`` mode, proper ``{node: writes}`` for ``updates``, +and ``(chunk, metadata)`` tuples for ``messages`` mode. + +Note: ``events`` mode is not supported through the gateway — it requires +``graph.astream_events()`` which cannot simultaneously produce ``values`` +snapshots. 
The JS open-source LangGraph API server works around this via +internal checkpoint callbacks that are not exposed in the Python public API. +""" + +from __future__ import annotations + +import asyncio +import logging +from typing import Any, Literal + +from deerflow.runtime.serialization import serialize +from deerflow.runtime.stream_bridge import StreamBridge + +from .manager import RunManager, RunRecord +from .schemas import RunStatus + +logger = logging.getLogger(__name__) + +# Valid stream_mode values for LangGraph's graph.astream() +_VALID_LG_MODES = {"values", "updates", "checkpoints", "tasks", "debug", "messages", "custom"} + + +async def run_agent( + bridge: StreamBridge, + run_manager: RunManager, + record: RunRecord, + *, + checkpointer: Any, + store: Any | None = None, + agent_factory: Any, + graph_input: dict, + config: dict, + stream_modes: list[str] | None = None, + stream_subgraphs: bool = False, + interrupt_before: list[str] | Literal["*"] | None = None, + interrupt_after: list[str] | Literal["*"] | None = None, +) -> None: + """Execute an agent in the background, publishing events to *bridge*.""" + + run_id = record.run_id + thread_id = record.thread_id + requested_modes: set[str] = set(stream_modes or ["values"]) + + # Track whether "events" was requested but skipped + if "events" in requested_modes: + logger.info( + "Run %s: 'events' stream_mode not supported in gateway (requires astream_events + checkpoint callbacks). Skipping.", + run_id, + ) + + try: + # 1. Mark running + await run_manager.set_status(run_id, RunStatus.running) + + # Record pre-run checkpoint_id to support rollback (Phase 2). 
+ pre_run_checkpoint_id = None + try: + config_for_check = {"configurable": {"thread_id": thread_id, "checkpoint_ns": ""}} + ckpt_tuple = await checkpointer.aget_tuple(config_for_check) + if ckpt_tuple is not None: + pre_run_checkpoint_id = getattr(ckpt_tuple, "config", {}).get("configurable", {}).get("checkpoint_id") + except Exception: + logger.debug("Could not get pre-run checkpoint_id for run %s", run_id) + + # 2. Publish metadata — useStream needs both run_id AND thread_id + await bridge.publish( + run_id, + "metadata", + { + "run_id": run_id, + "thread_id": thread_id, + }, + ) + + # 3. Build the agent + from langchain_core.runnables import RunnableConfig + from langgraph.runtime import Runtime + + # Inject runtime context so middlewares can access thread_id + # (langgraph-cli does this automatically; we must do it manually) + runtime = Runtime(context={"thread_id": thread_id}, store=store) + config.setdefault("configurable", {})["__pregel_runtime"] = runtime + + runnable_config = RunnableConfig(**config) + agent = agent_factory(config=runnable_config) + + # 4. Attach checkpointer and store + if checkpointer is not None: + agent.checkpointer = checkpointer + if store is not None: + agent.store = store + + # 5. Set interrupt nodes + if interrupt_before: + agent.interrupt_before_nodes = interrupt_before + if interrupt_after: + agent.interrupt_after_nodes = interrupt_after + + # 6. 
Build LangGraph stream_mode list + # "events" is NOT a valid astream mode — skip it + # "messages-tuple" maps to LangGraph's "messages" mode + lg_modes: list[str] = [] + for m in requested_modes: + if m == "messages-tuple": + lg_modes.append("messages") + elif m == "events": + # Skipped — see log above + continue + elif m in _VALID_LG_MODES: + lg_modes.append(m) + if not lg_modes: + lg_modes = ["values"] + + # Deduplicate while preserving order + seen: set[str] = set() + deduped: list[str] = [] + for m in lg_modes: + if m not in seen: + seen.add(m) + deduped.append(m) + lg_modes = deduped + + logger.info("Run %s: streaming with modes %s (requested: %s)", run_id, lg_modes, requested_modes) + + # 7. Stream using graph.astream + if len(lg_modes) == 1 and not stream_subgraphs: + # Single mode, no subgraphs: astream yields raw chunks + single_mode = lg_modes[0] + async for chunk in agent.astream(graph_input, config=runnable_config, stream_mode=single_mode): + if record.abort_event.is_set(): + logger.info("Run %s abort requested — stopping", run_id) + break + sse_event = _lg_mode_to_sse_event(single_mode) + await bridge.publish(run_id, sse_event, serialize(chunk, mode=single_mode)) + else: + # Multiple modes or subgraphs: astream yields tuples + async for item in agent.astream( + graph_input, + config=runnable_config, + stream_mode=lg_modes, + subgraphs=stream_subgraphs, + ): + if record.abort_event.is_set(): + logger.info("Run %s abort requested — stopping", run_id) + break + + mode, chunk = _unpack_stream_item(item, lg_modes, stream_subgraphs) + if mode is None: + continue + + sse_event = _lg_mode_to_sse_event(mode) + await bridge.publish(run_id, sse_event, serialize(chunk, mode=mode)) + + # 8. Final status + if record.abort_event.is_set(): + action = record.abort_action + if action == "rollback": + await run_manager.set_status(run_id, RunStatus.error, error="Rolled back by user") + # TODO(Phase 2): Implement full checkpoint rollback. 
+ # Use pre_run_checkpoint_id to revert the thread's checkpoint + # to the state before this run started. Requires a + # checkpointer.adelete() or equivalent API. + try: + if checkpointer is not None and pre_run_checkpoint_id is not None: + # Phase 2: roll back to pre_run_checkpoint_id + pass + logger.info("Run %s rolled back", run_id) + except Exception: + logger.warning("Failed to rollback checkpoint for run %s", run_id) + else: + await run_manager.set_status(run_id, RunStatus.interrupted) + else: + await run_manager.set_status(run_id, RunStatus.success) + + except asyncio.CancelledError: + action = record.abort_action + if action == "rollback": + await run_manager.set_status(run_id, RunStatus.error, error="Rolled back by user") + logger.info("Run %s was cancelled (rollback)", run_id) + else: + await run_manager.set_status(run_id, RunStatus.interrupted) + logger.info("Run %s was cancelled", run_id) + + except Exception as exc: + error_msg = f"{exc}" + logger.exception("Run %s failed: %s", run_id, error_msg) + await run_manager.set_status(run_id, RunStatus.error, error=error_msg) + await bridge.publish( + run_id, + "error", + { + "message": error_msg, + "name": type(exc).__name__, + }, + ) + + finally: + await bridge.publish_end(run_id) + asyncio.create_task(bridge.cleanup(run_id, delay=60)) + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _lg_mode_to_sse_event(mode: str) -> str: + """Map LangGraph internal stream_mode name to SSE event name. + + LangGraph's ``astream(stream_mode="messages")`` produces message + tuples. The SSE protocol calls this ``messages-tuple`` when the + client explicitly requests it, but the default SSE event name used + by LangGraph Platform is simply ``"messages"``. 
+ """ + # All LG modes map 1:1 to SSE event names — "messages" stays "messages" + return mode + + +def _unpack_stream_item( + item: Any, + lg_modes: list[str], + stream_subgraphs: bool, +) -> tuple[str | None, Any]: + """Unpack a multi-mode or subgraph stream item into (mode, chunk). + + Returns ``(None, None)`` if the item cannot be parsed. + """ + if stream_subgraphs: + if isinstance(item, tuple) and len(item) == 3: + _ns, mode, chunk = item + return str(mode), chunk + if isinstance(item, tuple) and len(item) == 2: + mode, chunk = item + return str(mode), chunk + return None, None + + if isinstance(item, tuple) and len(item) == 2: + mode, chunk = item + return str(mode), chunk + + # Fallback: single-element output from first mode + return lg_modes[0] if lg_modes else None, item diff --git a/backend/packages/harness/deerflow/runtime/serialization.py b/backend/packages/harness/deerflow/runtime/serialization.py new file mode 100644 index 000000000..48853dfb3 --- /dev/null +++ b/backend/packages/harness/deerflow/runtime/serialization.py @@ -0,0 +1,78 @@ +"""Canonical serialization for LangChain / LangGraph objects. + +Provides a single source of truth for converting LangChain message +objects, Pydantic models, and LangGraph state dicts into plain +JSON-serialisable Python structures. + +Consumers: ``deerflow.runtime.runs.worker`` (SSE publishing) and +``app.gateway.routers.threads`` (REST responses). 
+""" + +from __future__ import annotations + +from typing import Any + + +def serialize_lc_object(obj: Any) -> Any: + """Recursively serialize a LangChain object to a JSON-serialisable dict.""" + if obj is None: + return None + if isinstance(obj, (str, int, float, bool)): + return obj + if isinstance(obj, dict): + return {k: serialize_lc_object(v) for k, v in obj.items()} + if isinstance(obj, (list, tuple)): + return [serialize_lc_object(item) for item in obj] + # Pydantic v2 + if hasattr(obj, "model_dump"): + try: + return obj.model_dump() + except Exception: + pass + # Pydantic v1 / older objects + if hasattr(obj, "dict"): + try: + return obj.dict() + except Exception: + pass + # Last resort + try: + return str(obj) + except Exception: + return repr(obj) + + +def serialize_channel_values(channel_values: dict[str, Any]) -> dict[str, Any]: + """Serialize channel values, stripping internal LangGraph keys. + + Internal keys like ``__pregel_*`` and ``__interrupt__`` are removed + to match what the LangGraph Platform API returns. + """ + result: dict[str, Any] = {} + for key, value in channel_values.items(): + if key.startswith("__pregel_") or key == "__interrupt__": + continue + result[key] = serialize_lc_object(value) + return result + + +def serialize_messages_tuple(obj: Any) -> Any: + """Serialize a messages-mode tuple ``(chunk, metadata)``.""" + if isinstance(obj, tuple) and len(obj) == 2: + chunk, metadata = obj + return [serialize_lc_object(chunk), metadata if isinstance(metadata, dict) else {}] + return serialize_lc_object(obj) + + +def serialize(obj: Any, *, mode: str = "") -> Any: + """Serialize LangChain objects with mode-specific handling. 
+ + * ``messages`` — obj is ``(message_chunk, metadata_dict)`` + * ``values`` — obj is the full state dict; ``__pregel_*`` keys stripped + * everything else — recursive ``model_dump()`` / ``dict()`` fallback + """ + if mode == "messages": + return serialize_messages_tuple(obj) + if mode == "values": + return serialize_channel_values(obj) if isinstance(obj, dict) else serialize_lc_object(obj) + return serialize_lc_object(obj) diff --git a/backend/packages/harness/deerflow/runtime/store/__init__.py b/backend/packages/harness/deerflow/runtime/store/__init__.py new file mode 100644 index 000000000..2f5e77aaa --- /dev/null +++ b/backend/packages/harness/deerflow/runtime/store/__init__.py @@ -0,0 +1,31 @@ +"""Store provider for the DeerFlow runtime. + +Re-exports the public API of both the async provider (for long-running +servers) and the sync provider (for CLI tools and the embedded client). + +Async usage (FastAPI lifespan):: + + from deerflow.runtime.store import make_store + + async with make_store() as store: + app.state.store = store + +Sync usage (CLI / DeerFlowClient):: + + from deerflow.runtime.store import get_store, store_context + + store = get_store() # singleton + with store_context() as store: ... 
# one-shot +""" + +from .async_provider import make_store +from .provider import get_store, reset_store, store_context + +__all__ = [ + # async + "make_store", + # sync + "get_store", + "reset_store", + "store_context", +] diff --git a/backend/packages/harness/deerflow/runtime/store/_sqlite_utils.py b/backend/packages/harness/deerflow/runtime/store/_sqlite_utils.py new file mode 100644 index 000000000..bb970e572 --- /dev/null +++ b/backend/packages/harness/deerflow/runtime/store/_sqlite_utils.py @@ -0,0 +1,28 @@ +"""Shared SQLite connection utilities for store and checkpointer providers.""" + +from __future__ import annotations + +import pathlib + +from deerflow.config.paths import resolve_path + + +def resolve_sqlite_conn_str(raw: str) -> str: + """Return a SQLite connection string ready for use with store/checkpointer backends. + + SQLite special strings (``":memory:"`` and ``file:`` URIs) are returned + unchanged. Plain filesystem paths — relative or absolute — are resolved + to an absolute string via :func:`resolve_path`. + """ + if raw == ":memory:" or raw.startswith("file:"): + return raw + return str(resolve_path(raw)) + + +def ensure_sqlite_parent_dir(conn_str: str) -> None: + """Create parent directory for a SQLite filesystem path. + + No-op for in-memory databases (``":memory:"``) and ``file:`` URIs. + """ + if conn_str != ":memory:" and not conn_str.startswith("file:"): + pathlib.Path(conn_str).parent.mkdir(parents=True, exist_ok=True) diff --git a/backend/packages/harness/deerflow/runtime/store/async_provider.py b/backend/packages/harness/deerflow/runtime/store/async_provider.py new file mode 100644 index 000000000..bc7a60559 --- /dev/null +++ b/backend/packages/harness/deerflow/runtime/store/async_provider.py @@ -0,0 +1,113 @@ +"""Async Store factory — backend mirrors the configured checkpointer. 
+ +The store and checkpointer share the same ``checkpointer`` section in +*config.yaml* so they always use the same persistence backend: + +- ``type: memory`` → :class:`langgraph.store.memory.InMemoryStore` +- ``type: sqlite`` → :class:`langgraph.store.sqlite.aio.AsyncSqliteStore` +- ``type: postgres`` → :class:`langgraph.store.postgres.aio.AsyncPostgresStore` + +Usage (e.g. FastAPI lifespan):: + + from deerflow.runtime.store import make_store + + async with make_store() as store: + app.state.store = store +""" + +from __future__ import annotations + +import contextlib +import logging +from collections.abc import AsyncIterator + +from langgraph.store.base import BaseStore + +from deerflow.config.app_config import get_app_config +from deerflow.runtime.store.provider import POSTGRES_CONN_REQUIRED, POSTGRES_STORE_INSTALL, SQLITE_STORE_INSTALL, ensure_sqlite_parent_dir, resolve_sqlite_conn_str + +logger = logging.getLogger(__name__) + +# --------------------------------------------------------------------------- +# Internal backend factory +# --------------------------------------------------------------------------- + + +@contextlib.asynccontextmanager +async def _async_store(config) -> AsyncIterator[BaseStore]: + """Async context manager that constructs and tears down a Store. + + The ``config`` argument is a :class:`deerflow.config.checkpointer_config.CheckpointerConfig` + instance — the same object used by the checkpointer factory. 
+ """ + if config.type == "memory": + from langgraph.store.memory import InMemoryStore + + logger.info("Store: using InMemoryStore (in-process, not persistent)") + yield InMemoryStore() + return + + if config.type == "sqlite": + try: + from langgraph.store.sqlite.aio import AsyncSqliteStore + except ImportError as exc: + raise ImportError(SQLITE_STORE_INSTALL) from exc + + conn_str = resolve_sqlite_conn_str(config.connection_string or "store.db") + ensure_sqlite_parent_dir(conn_str) + + async with AsyncSqliteStore.from_conn_string(conn_str) as store: + await store.setup() + logger.info("Store: using AsyncSqliteStore (%s)", conn_str) + yield store + return + + if config.type == "postgres": + try: + from langgraph.store.postgres.aio import AsyncPostgresStore # type: ignore[import] + except ImportError as exc: + raise ImportError(POSTGRES_STORE_INSTALL) from exc + + if not config.connection_string: + raise ValueError(POSTGRES_CONN_REQUIRED) + + async with AsyncPostgresStore.from_conn_string(config.connection_string) as store: + await store.setup() + logger.info("Store: using AsyncPostgresStore") + yield store + return + + raise ValueError(f"Unknown store backend type: {config.type!r}") + + +# --------------------------------------------------------------------------- +# Public async context manager +# --------------------------------------------------------------------------- + + +@contextlib.asynccontextmanager +async def make_store() -> AsyncIterator[BaseStore]: + """Async context manager that yields a Store whose backend matches the + configured checkpointer. 
+ + Reads from the same ``checkpointer`` section of *config.yaml* used by + :func:`deerflow.agents.checkpointer.async_provider.make_checkpointer` so + that both singletons always use the same persistence technology:: + + async with make_store() as store: + app.state.store = store + + Yields an :class:`~langgraph.store.memory.InMemoryStore` when no + ``checkpointer`` section is configured (emits a WARNING in that case). + """ + config = get_app_config() + + if config.checkpointer is None: + from langgraph.store.memory import InMemoryStore + + logger.warning("No 'checkpointer' section in config.yaml — using InMemoryStore for the store. Thread list will be lost on server restart. Configure a sqlite or postgres backend for persistence.") + yield InMemoryStore() + return + + async with _async_store(config.checkpointer) as store: + yield store diff --git a/backend/packages/harness/deerflow/runtime/store/provider.py b/backend/packages/harness/deerflow/runtime/store/provider.py new file mode 100644 index 000000000..a9394fb9f --- /dev/null +++ b/backend/packages/harness/deerflow/runtime/store/provider.py @@ -0,0 +1,188 @@ +"""Sync Store factory. + +Provides a **sync singleton** and a **sync context manager** for CLI tools +and the embedded :class:`~deerflow.client.DeerFlowClient`. + +The backend mirrors the configured checkpointer so that both always use the +same persistence technology. Supported backends: memory, sqlite, postgres. 
+ +Usage:: + + from deerflow.runtime.store.provider import get_store, store_context + + # Singleton — reused across calls, closed on process exit + store = get_store() + + # One-shot — fresh connection, closed on block exit + with store_context() as store: + store.put(("ns",), "key", {"value": 1}) +""" + +from __future__ import annotations + +import contextlib +import logging +from collections.abc import Iterator + +from langgraph.store.base import BaseStore + +from deerflow.config.app_config import get_app_config +from deerflow.runtime.store._sqlite_utils import ensure_sqlite_parent_dir, resolve_sqlite_conn_str + +logger = logging.getLogger(__name__) + +# --------------------------------------------------------------------------- +# Error message constants +# --------------------------------------------------------------------------- + +SQLITE_STORE_INSTALL = "langgraph-checkpoint-sqlite is required for the SQLite store. Install it with: uv add langgraph-checkpoint-sqlite" +POSTGRES_STORE_INSTALL = "langgraph-checkpoint-postgres is required for the PostgreSQL store. Install it with: uv add langgraph-checkpoint-postgres psycopg[binary] psycopg-pool" +POSTGRES_CONN_REQUIRED = "checkpointer.connection_string is required for the postgres backend" + +# --------------------------------------------------------------------------- +# Sync factory +# --------------------------------------------------------------------------- + + +@contextlib.contextmanager +def _sync_store_cm(config) -> Iterator[BaseStore]: + """Context manager that creates and tears down a sync Store. + + The ``config`` argument is a + :class:`~deerflow.config.checkpointer_config.CheckpointerConfig` instance — + the same object used by the checkpointer factory. 
+ """ + if config.type == "memory": + from langgraph.store.memory import InMemoryStore + + logger.info("Store: using InMemoryStore (in-process, not persistent)") + yield InMemoryStore() + return + + if config.type == "sqlite": + try: + from langgraph.store.sqlite import SqliteStore + except ImportError as exc: + raise ImportError(SQLITE_STORE_INSTALL) from exc + + conn_str = resolve_sqlite_conn_str(config.connection_string or "store.db") + ensure_sqlite_parent_dir(conn_str) + + with SqliteStore.from_conn_string(conn_str) as store: + store.setup() + logger.info("Store: using SqliteStore (%s)", conn_str) + yield store + return + + if config.type == "postgres": + try: + from langgraph.store.postgres import PostgresStore # type: ignore[import] + except ImportError as exc: + raise ImportError(POSTGRES_STORE_INSTALL) from exc + + if not config.connection_string: + raise ValueError(POSTGRES_CONN_REQUIRED) + + with PostgresStore.from_conn_string(config.connection_string) as store: + store.setup() + logger.info("Store: using PostgresStore") + yield store + return + + raise ValueError(f"Unknown store backend type: {config.type!r}") + + +# --------------------------------------------------------------------------- +# Sync singleton +# --------------------------------------------------------------------------- + +_store: BaseStore | None = None +_store_ctx = None # open context manager keeping the connection alive + + +def get_store() -> BaseStore: + """Return the global sync Store singleton, creating it on first call. + + Returns an :class:`~langgraph.store.memory.InMemoryStore` when no + checkpointer is configured in *config.yaml* (emits a WARNING in that case). + + Raises: + ImportError: If the required package for the configured backend is not installed. + ValueError: If ``connection_string`` is missing for a backend that requires it. 
+ """ + global _store, _store_ctx + + if _store is not None: + return _store + + # Lazily load app config, mirroring the checkpointer singleton pattern so + # that tests that set the global checkpointer config explicitly remain isolated. + from deerflow.config.app_config import _app_config + from deerflow.config.checkpointer_config import get_checkpointer_config + + config = get_checkpointer_config() + + if config is None and _app_config is None: + try: + get_app_config() + except FileNotFoundError: + pass + config = get_checkpointer_config() + + if config is None: + from langgraph.store.memory import InMemoryStore + + logger.warning("No 'checkpointer' section in config.yaml — using InMemoryStore for the store. Thread list will be lost on server restart. Configure a sqlite or postgres backend for persistence.") + _store = InMemoryStore() + return _store + + _store_ctx = _sync_store_cm(config) + _store = _store_ctx.__enter__() + return _store + + +def reset_store() -> None: + """Reset the sync singleton, forcing recreation on the next call. + + Closes any open backend connections and clears the cached instance. + Useful in tests or after a configuration change. + """ + global _store, _store_ctx + if _store_ctx is not None: + try: + _store_ctx.__exit__(None, None, None) + except Exception: + logger.warning("Error during store cleanup", exc_info=True) + _store_ctx = None + _store = None + + +# --------------------------------------------------------------------------- +# Sync context manager +# --------------------------------------------------------------------------- + + +@contextlib.contextmanager +def store_context() -> Iterator[BaseStore]: + """Sync context manager that yields a Store and cleans up on exit. + + Unlike :func:`get_store`, this does **not** cache the instance — each + ``with`` block creates and destroys its own connection. 
Use it in CLI + scripts or tests where you want deterministic cleanup:: + + with store_context() as store: + store.put(("threads",), thread_id, {...}) + + Yields an :class:`~langgraph.store.memory.InMemoryStore` when no + checkpointer is configured in *config.yaml*. + """ + config = get_app_config() + if config.checkpointer is None: + from langgraph.store.memory import InMemoryStore + + logger.warning("No 'checkpointer' section in config.yaml — using InMemoryStore for the store. Thread list will be lost on server restart. Configure a sqlite or postgres backend for persistence.") + yield InMemoryStore() + return + + with _sync_store_cm(config.checkpointer) as store: + yield store diff --git a/backend/packages/harness/deerflow/runtime/stream_bridge/__init__.py b/backend/packages/harness/deerflow/runtime/stream_bridge/__init__.py new file mode 100644 index 000000000..435520c48 --- /dev/null +++ b/backend/packages/harness/deerflow/runtime/stream_bridge/__init__.py @@ -0,0 +1,21 @@ +"""Stream bridge — decouples agent workers from SSE endpoints. + +A ``StreamBridge`` sits between the background task that runs an agent +(producer) and the HTTP endpoint that pushes Server-Sent Events to +the client (consumer). This package provides an abstract protocol +(:class:`StreamBridge`) plus a default in-memory implementation backed +by :mod:`asyncio.Queue`. 
+"""

+from .async_provider import make_stream_bridge
+from .base import END_SENTINEL, HEARTBEAT_SENTINEL, StreamBridge, StreamEvent
+from .memory import MemoryStreamBridge
+
+__all__ = [
+    "END_SENTINEL",
+    "HEARTBEAT_SENTINEL",
+    "MemoryStreamBridge",
+    "StreamBridge",
+    "StreamEvent",
+    "make_stream_bridge",
+]
diff --git a/backend/packages/harness/deerflow/runtime/stream_bridge/async_provider.py b/backend/packages/harness/deerflow/runtime/stream_bridge/async_provider.py
new file mode 100644
index 000000000..891f79fa0
--- /dev/null
+++ b/backend/packages/harness/deerflow/runtime/stream_bridge/async_provider.py
@@ -0,0 +1,52 @@
+"""Async stream bridge factory.
+
+Provides an **async context manager** aligned with
+:func:`deerflow.agents.checkpointer.async_provider.make_checkpointer`.
+
+Usage (e.g. FastAPI lifespan)::
+
+    from deerflow.runtime.stream_bridge import make_stream_bridge
+
+    async with make_stream_bridge() as bridge:
+        app.state.stream_bridge = bridge
+"""
+
+from __future__ import annotations
+
+import contextlib
+import logging
+from collections.abc import AsyncIterator
+
+from deerflow.config.stream_bridge_config import get_stream_bridge_config
+
+from .base import StreamBridge
+
+logger = logging.getLogger(__name__)
+
+
+@contextlib.asynccontextmanager
+async def make_stream_bridge(config=None) -> AsyncIterator[StreamBridge]:
+    """Async context manager that yields a :class:`StreamBridge`.
+
+    Falls back to :class:`MemoryStreamBridge` when no configuration is
+    provided and nothing is set globally.
+ """ + if config is None: + config = get_stream_bridge_config() + + if config is None or config.type == "memory": + from deerflow.runtime.stream_bridge.memory import MemoryStreamBridge + + maxsize = config.queue_maxsize if config is not None else 256 + bridge = MemoryStreamBridge(queue_maxsize=maxsize) + logger.info("Stream bridge initialised: memory (queue_maxsize=%d)", maxsize) + try: + yield bridge + finally: + await bridge.close() + return + + if config.type == "redis": + raise NotImplementedError("Redis stream bridge planned for Phase 2") + + raise ValueError(f"Unknown stream bridge type: {config.type!r}") diff --git a/backend/packages/harness/deerflow/runtime/stream_bridge/base.py b/backend/packages/harness/deerflow/runtime/stream_bridge/base.py new file mode 100644 index 000000000..c34353a08 --- /dev/null +++ b/backend/packages/harness/deerflow/runtime/stream_bridge/base.py @@ -0,0 +1,72 @@ +"""Abstract stream bridge protocol. + +StreamBridge decouples agent workers (producers) from SSE endpoints +(consumers), aligning with LangGraph Platform's Queue + StreamManager +architecture. +""" + +from __future__ import annotations + +import abc +from collections.abc import AsyncIterator +from dataclasses import dataclass +from typing import Any + + +@dataclass(frozen=True) +class StreamEvent: + """Single stream event. + + Attributes: + id: Monotonically increasing event ID (used as SSE ``id:`` field, + supports ``Last-Event-ID`` reconnection). + event: SSE event name, e.g. ``"metadata"``, ``"updates"``, + ``"events"``, ``"error"``, ``"end"``. + data: JSON-serialisable payload. 
+ """ + + id: str + event: str + data: Any + + +HEARTBEAT_SENTINEL = StreamEvent(id="", event="__heartbeat__", data=None) +END_SENTINEL = StreamEvent(id="", event="__end__", data=None) + + +class StreamBridge(abc.ABC): + """Abstract base for stream bridges.""" + + @abc.abstractmethod + async def publish(self, run_id: str, event: str, data: Any) -> None: + """Enqueue a single event for *run_id* (producer side).""" + + @abc.abstractmethod + async def publish_end(self, run_id: str) -> None: + """Signal that no more events will be produced for *run_id*.""" + + @abc.abstractmethod + def subscribe( + self, + run_id: str, + *, + last_event_id: str | None = None, + heartbeat_interval: float = 15.0, + ) -> AsyncIterator[StreamEvent]: + """Async iterator that yields events for *run_id* (consumer side). + + Yields :data:`HEARTBEAT_SENTINEL` when no event arrives within + *heartbeat_interval* seconds. Yields :data:`END_SENTINEL` once + the producer calls :meth:`publish_end`. + """ + + @abc.abstractmethod + async def cleanup(self, run_id: str, *, delay: float = 0) -> None: + """Release resources associated with *run_id*. + + If *delay* > 0 the implementation should wait before releasing, + giving late subscribers a chance to drain remaining events. + """ + + async def close(self) -> None: + """Release backend resources. 
Default is a no-op.""" diff --git a/backend/packages/harness/deerflow/runtime/stream_bridge/memory.py b/backend/packages/harness/deerflow/runtime/stream_bridge/memory.py new file mode 100644 index 000000000..5056e6ca3 --- /dev/null +++ b/backend/packages/harness/deerflow/runtime/stream_bridge/memory.py @@ -0,0 +1,90 @@ +"""In-memory stream bridge backed by :class:`asyncio.Queue`.""" + +from __future__ import annotations + +import asyncio +import logging +import time +from collections.abc import AsyncIterator +from typing import Any + +from .base import END_SENTINEL, HEARTBEAT_SENTINEL, StreamBridge, StreamEvent + +logger = logging.getLogger(__name__) + +_PUBLISH_TIMEOUT = 30.0 # seconds to wait when queue is full + + +class MemoryStreamBridge(StreamBridge): + """Per-run ``asyncio.Queue`` implementation. + + Each *run_id* gets its own queue on first :meth:`publish` call. + """ + + def __init__(self, *, queue_maxsize: int = 256) -> None: + self._maxsize = queue_maxsize + self._queues: dict[str, asyncio.Queue[StreamEvent]] = {} + self._counters: dict[str, int] = {} + + # -- helpers --------------------------------------------------------------- + + def _get_or_create_queue(self, run_id: str) -> asyncio.Queue[StreamEvent]: + if run_id not in self._queues: + self._queues[run_id] = asyncio.Queue(maxsize=self._maxsize) + self._counters[run_id] = 0 + return self._queues[run_id] + + def _next_id(self, run_id: str) -> str: + self._counters[run_id] = self._counters.get(run_id, 0) + 1 + ts = int(time.time() * 1000) + seq = self._counters[run_id] - 1 + return f"{ts}-{seq}" + + # -- StreamBridge API ------------------------------------------------------ + + async def publish(self, run_id: str, event: str, data: Any) -> None: + queue = self._get_or_create_queue(run_id) + entry = StreamEvent(id=self._next_id(run_id), event=event, data=data) + try: + await asyncio.wait_for(queue.put(entry), timeout=_PUBLISH_TIMEOUT) + except TimeoutError: + logger.warning("Stream bridge queue full 
for run %s — dropping event %s", run_id, event) + + async def publish_end(self, run_id: str) -> None: + queue = self._get_or_create_queue(run_id) + try: + await asyncio.wait_for(queue.put(END_SENTINEL), timeout=_PUBLISH_TIMEOUT) + except TimeoutError: + logger.warning("Stream bridge queue full for run %s — dropping END sentinel", run_id) + + async def subscribe( + self, + run_id: str, + *, + last_event_id: str | None = None, + heartbeat_interval: float = 15.0, + ) -> AsyncIterator[StreamEvent]: + if last_event_id is not None: + logger.debug("last_event_id=%s accepted but ignored (memory bridge has no replay)", last_event_id) + + queue = self._get_or_create_queue(run_id) + while True: + try: + entry = await asyncio.wait_for(queue.get(), timeout=heartbeat_interval) + except TimeoutError: + yield HEARTBEAT_SENTINEL + continue + if entry is END_SENTINEL: + yield END_SENTINEL + return + yield entry + + async def cleanup(self, run_id: str, *, delay: float = 0) -> None: + if delay > 0: + await asyncio.sleep(delay) + self._queues.pop(run_id, None) + self._counters.pop(run_id, None) + + async def close(self) -> None: + self._queues.clear() + self._counters.clear() diff --git a/backend/tests/test_gateway_services.py b/backend/tests/test_gateway_services.py new file mode 100644 index 000000000..730616027 --- /dev/null +++ b/backend/tests/test_gateway_services.py @@ -0,0 +1,102 @@ +"""Tests for app.gateway.services — run lifecycle service layer.""" + +from __future__ import annotations + +import json + + +def test_format_sse_basic(): + from app.gateway.services import format_sse + + frame = format_sse("metadata", {"run_id": "abc"}) + assert frame.startswith("event: metadata\n") + assert "data: " in frame + parsed = json.loads(frame.split("data: ")[1].split("\n")[0]) + assert parsed["run_id"] == "abc" + + +def test_format_sse_with_event_id(): + from app.gateway.services import format_sse + + frame = format_sse("metadata", {"run_id": "abc"}, event_id="123-0") + assert "id: 
123-0" in frame + + +def test_format_sse_end_event_null(): + from app.gateway.services import format_sse + + frame = format_sse("end", None) + assert "data: null" in frame + + +def test_format_sse_no_event_id(): + from app.gateway.services import format_sse + + frame = format_sse("values", {"x": 1}) + assert "id:" not in frame + + +def test_normalize_stream_modes_none(): + from app.gateway.services import normalize_stream_modes + + assert normalize_stream_modes(None) == ["values"] + + +def test_normalize_stream_modes_string(): + from app.gateway.services import normalize_stream_modes + + assert normalize_stream_modes("messages-tuple") == ["messages-tuple"] + + +def test_normalize_stream_modes_list(): + from app.gateway.services import normalize_stream_modes + + assert normalize_stream_modes(["values", "messages-tuple"]) == ["values", "messages-tuple"] + + +def test_normalize_stream_modes_empty_list(): + from app.gateway.services import normalize_stream_modes + + assert normalize_stream_modes([]) == ["values"] + + +def test_normalize_input_none(): + from app.gateway.services import normalize_input + + assert normalize_input(None) == {} + + +def test_normalize_input_with_messages(): + from app.gateway.services import normalize_input + + result = normalize_input({"messages": [{"role": "user", "content": "hi"}]}) + assert len(result["messages"]) == 1 + assert result["messages"][0].content == "hi" + + +def test_normalize_input_passthrough(): + from app.gateway.services import normalize_input + + result = normalize_input({"custom_key": "value"}) + assert result == {"custom_key": "value"} + + +def test_build_run_config_basic(): + from app.gateway.services import build_run_config + + config = build_run_config("thread-1", None, None) + assert config["configurable"]["thread_id"] == "thread-1" + assert config["recursion_limit"] == 100 + + +def test_build_run_config_with_overrides(): + from app.gateway.services import build_run_config + + config = build_run_config( + 
"thread-1", + {"configurable": {"model_name": "gpt-4"}, "tags": ["test"]}, + {"user": "alice"}, + ) + assert config["configurable"]["model_name"] == "gpt-4" + assert config["tags"] == ["test"] + assert config["metadata"]["user"] == "alice" diff --git a/backend/tests/test_run_manager.py b/backend/tests/test_run_manager.py new file mode 100644 index 000000000..1e6526d6e --- /dev/null +++ b/backend/tests/test_run_manager.py @@ -0,0 +1,131 @@ +"""Tests for RunManager.""" + +import re + +import pytest + +from deerflow.runtime import RunManager, RunStatus + +ISO_RE = re.compile(r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}") + + +@pytest.fixture +def manager() -> RunManager: + return RunManager() + + +@pytest.mark.anyio +async def test_create_and_get(manager: RunManager): + """Created run should be retrievable with new fields.""" + record = await manager.create( + "thread-1", + "lead_agent", + metadata={"key": "val"}, + kwargs={"input": {}}, + multitask_strategy="reject", + ) + assert record.status == RunStatus.pending + assert record.thread_id == "thread-1" + assert record.assistant_id == "lead_agent" + assert record.metadata == {"key": "val"} + assert record.kwargs == {"input": {}} + assert record.multitask_strategy == "reject" + assert ISO_RE.match(record.created_at) + assert ISO_RE.match(record.updated_at) + + fetched = manager.get(record.run_id) + assert fetched is record + + +@pytest.mark.anyio +async def test_status_transitions(manager: RunManager): + """Status should transition pending -> running -> success.""" + record = await manager.create("thread-1") + assert record.status == RunStatus.pending + + await manager.set_status(record.run_id, RunStatus.running) + assert record.status == RunStatus.running + assert ISO_RE.match(record.updated_at) + + await manager.set_status(record.run_id, RunStatus.success) + assert record.status == RunStatus.success + + +@pytest.mark.anyio +async def test_cancel(manager: RunManager): + """Cancel should set abort_event and transition to 
interrupted.""" + record = await manager.create("thread-1") + await manager.set_status(record.run_id, RunStatus.running) + + cancelled = await manager.cancel(record.run_id) + assert cancelled is True + assert record.abort_event.is_set() + assert record.status == RunStatus.interrupted + + +@pytest.mark.anyio +async def test_cancel_not_inflight(manager: RunManager): + """Cancelling a completed run should return False.""" + record = await manager.create("thread-1") + await manager.set_status(record.run_id, RunStatus.success) + + cancelled = await manager.cancel(record.run_id) + assert cancelled is False + + +@pytest.mark.anyio +async def test_list_by_thread(manager: RunManager): + """Same thread should return multiple runs, newest first.""" + r1 = await manager.create("thread-1") + r2 = await manager.create("thread-1") + await manager.create("thread-2") + + runs = await manager.list_by_thread("thread-1") + assert len(runs) == 2 + assert runs[0].run_id == r2.run_id + assert runs[1].run_id == r1.run_id + + +@pytest.mark.anyio +async def test_has_inflight(manager: RunManager): + """has_inflight should be True when a run is pending or running.""" + record = await manager.create("thread-1") + assert await manager.has_inflight("thread-1") is True + + await manager.set_status(record.run_id, RunStatus.success) + assert await manager.has_inflight("thread-1") is False + + +@pytest.mark.anyio +async def test_cleanup(manager: RunManager): + """After cleanup, the run should be gone.""" + record = await manager.create("thread-1") + run_id = record.run_id + + await manager.cleanup(run_id, delay=0) + assert manager.get(run_id) is None + + +@pytest.mark.anyio +async def test_set_status_with_error(manager: RunManager): + """Error message should be stored on the record.""" + record = await manager.create("thread-1") + await manager.set_status(record.run_id, RunStatus.error, error="Something went wrong") + assert record.status == RunStatus.error + assert record.error == "Something went 
wrong" + + +@pytest.mark.anyio +async def test_get_nonexistent(manager: RunManager): + """Getting a nonexistent run should return None.""" + assert manager.get("does-not-exist") is None + + +@pytest.mark.anyio +async def test_create_defaults(manager: RunManager): + """Create with no optional args should use defaults.""" + record = await manager.create("thread-1") + assert record.metadata == {} + assert record.kwargs == {} + assert record.multitask_strategy == "reject" + assert record.assistant_id is None diff --git a/backend/tests/test_serialization.py b/backend/tests/test_serialization.py new file mode 100644 index 000000000..b707d7143 --- /dev/null +++ b/backend/tests/test_serialization.py @@ -0,0 +1,159 @@ +"""Tests for deerflow.runtime.serialization.""" + +from __future__ import annotations + + +class _FakePydanticV2: + """Object with model_dump (Pydantic v2).""" + + def model_dump(self): + return {"key": "v2"} + + +class _FakePydanticV1: + """Object with dict (Pydantic v1).""" + + def dict(self): + return {"key": "v1"} + + +class _Unprintable: + """Object whose str() raises.""" + + def __str__(self): + raise RuntimeError("no str") + + def __repr__(self): + return "" + + +def test_serialize_none(): + from deerflow.runtime.serialization import serialize_lc_object + + assert serialize_lc_object(None) is None + + +def test_serialize_primitives(): + from deerflow.runtime.serialization import serialize_lc_object + + assert serialize_lc_object("hello") == "hello" + assert serialize_lc_object(42) == 42 + assert serialize_lc_object(3.14) == 3.14 + assert serialize_lc_object(True) is True + + +def test_serialize_dict(): + from deerflow.runtime.serialization import serialize_lc_object + + obj = {"a": _FakePydanticV2(), "b": [1, "two"]} + result = serialize_lc_object(obj) + assert result == {"a": {"key": "v2"}, "b": [1, "two"]} + + +def test_serialize_list(): + from deerflow.runtime.serialization import serialize_lc_object + + result = 
serialize_lc_object([_FakePydanticV1(), 1]) + assert result == [{"key": "v1"}, 1] + + +def test_serialize_tuple(): + from deerflow.runtime.serialization import serialize_lc_object + + result = serialize_lc_object((_FakePydanticV2(),)) + assert result == [{"key": "v2"}] + + +def test_serialize_pydantic_v2(): + from deerflow.runtime.serialization import serialize_lc_object + + assert serialize_lc_object(_FakePydanticV2()) == {"key": "v2"} + + +def test_serialize_pydantic_v1(): + from deerflow.runtime.serialization import serialize_lc_object + + assert serialize_lc_object(_FakePydanticV1()) == {"key": "v1"} + + +def test_serialize_fallback_str(): + from deerflow.runtime.serialization import serialize_lc_object + + result = serialize_lc_object(object()) + assert isinstance(result, str) + + +def test_serialize_fallback_repr(): + from deerflow.runtime.serialization import serialize_lc_object + + assert serialize_lc_object(_Unprintable()) == "" + + +def test_serialize_channel_values_strips_pregel_keys(): + from deerflow.runtime.serialization import serialize_channel_values + + raw = { + "messages": ["hello"], + "__pregel_tasks": "internal", + "__pregel_resuming": True, + "__interrupt__": "stop", + "title": "Test", + } + result = serialize_channel_values(raw) + assert "messages" in result + assert "title" in result + assert "__pregel_tasks" not in result + assert "__pregel_resuming" not in result + assert "__interrupt__" not in result + + +def test_serialize_channel_values_serializes_objects(): + from deerflow.runtime.serialization import serialize_channel_values + + result = serialize_channel_values({"obj": _FakePydanticV2()}) + assert result == {"obj": {"key": "v2"}} + + +def test_serialize_messages_tuple(): + from deerflow.runtime.serialization import serialize_messages_tuple + + chunk = _FakePydanticV2() + metadata = {"langgraph_node": "agent"} + result = serialize_messages_tuple((chunk, metadata)) + assert result == [{"key": "v2"}, {"langgraph_node": "agent"}] + + 
+def test_serialize_messages_tuple_non_dict_metadata(): + from deerflow.runtime.serialization import serialize_messages_tuple + + result = serialize_messages_tuple((_FakePydanticV2(), "not-a-dict")) + assert result == [{"key": "v2"}, {}] + + +def test_serialize_messages_tuple_fallback(): + from deerflow.runtime.serialization import serialize_messages_tuple + + result = serialize_messages_tuple("not-a-tuple") + assert result == "not-a-tuple" + + +def test_serialize_dispatcher_messages_mode(): + from deerflow.runtime.serialization import serialize + + chunk = _FakePydanticV2() + result = serialize((chunk, {"node": "x"}), mode="messages") + assert result == [{"key": "v2"}, {"node": "x"}] + + +def test_serialize_dispatcher_values_mode(): + from deerflow.runtime.serialization import serialize + + result = serialize({"msg": "hi", "__pregel_tasks": "x"}, mode="values") + assert result == {"msg": "hi"} + + +def test_serialize_dispatcher_default_mode(): + from deerflow.runtime.serialization import serialize + + result = serialize(_FakePydanticV1()) + assert result == {"key": "v1"} diff --git a/backend/tests/test_sse_format.py b/backend/tests/test_sse_format.py new file mode 100644 index 000000000..5647a22a1 --- /dev/null +++ b/backend/tests/test_sse_format.py @@ -0,0 +1,30 @@ +"""Tests for SSE frame formatting utilities.""" + +import json + + +def _format_sse(event: str, data, *, event_id: str | None = None) -> str: + from app.gateway.services import format_sse + + return format_sse(event, data, event_id=event_id) + + +def test_sse_end_event_data_null(): + """End event should have data: null.""" + frame = _format_sse("end", None) + assert "data: null" in frame + + +def test_sse_metadata_event(): + """Metadata event should include run_id and attempt.""" + frame = _format_sse("metadata", {"run_id": "abc", "attempt": 1}, event_id="123-0") + assert "event: metadata" in frame + assert "id: 123-0" in frame + + +def test_sse_error_format(): + """Error event should use message/name 
format.""" + frame = _format_sse("error", {"message": "boom", "name": "ValueError"}) + parsed = json.loads(frame.split("data: ")[1].split("\n")[0]) + assert parsed["message"] == "boom" + assert parsed["name"] == "ValueError" diff --git a/backend/tests/test_stream_bridge.py b/backend/tests/test_stream_bridge.py new file mode 100644 index 000000000..34d2e2811 --- /dev/null +++ b/backend/tests/test_stream_bridge.py @@ -0,0 +1,152 @@ +"""Tests for the in-memory StreamBridge implementation.""" + +import asyncio +import re + +import pytest + +from deerflow.runtime import END_SENTINEL, HEARTBEAT_SENTINEL, MemoryStreamBridge, make_stream_bridge + +# --------------------------------------------------------------------------- +# Unit tests for MemoryStreamBridge +# --------------------------------------------------------------------------- + + +@pytest.fixture +def bridge() -> MemoryStreamBridge: + return MemoryStreamBridge(queue_maxsize=256) + + +@pytest.mark.anyio +async def test_publish_subscribe(bridge: MemoryStreamBridge): + """Three events followed by end should be received in order.""" + run_id = "run-1" + + await bridge.publish(run_id, "metadata", {"run_id": run_id}) + await bridge.publish(run_id, "values", {"messages": []}) + await bridge.publish(run_id, "updates", {"step": 1}) + await bridge.publish_end(run_id) + + received = [] + async for entry in bridge.subscribe(run_id, heartbeat_interval=1.0): + received.append(entry) + if entry is END_SENTINEL: + break + + assert len(received) == 4 + assert received[0].event == "metadata" + assert received[1].event == "values" + assert received[2].event == "updates" + assert received[3] is END_SENTINEL + + +@pytest.mark.anyio +async def test_heartbeat(bridge: MemoryStreamBridge): + """When no events arrive within the heartbeat interval, yield a heartbeat.""" + run_id = "run-heartbeat" + bridge._get_or_create_queue(run_id) # ensure queue exists + + received = [] + + async def consumer(): + async for entry in 
bridge.subscribe(run_id, heartbeat_interval=0.1): + received.append(entry) + if entry is HEARTBEAT_SENTINEL: + break + + await asyncio.wait_for(consumer(), timeout=2.0) + assert len(received) == 1 + assert received[0] is HEARTBEAT_SENTINEL + + +@pytest.mark.anyio +async def test_cleanup(bridge: MemoryStreamBridge): + """After cleanup, the run's queue is removed.""" + run_id = "run-cleanup" + await bridge.publish(run_id, "test", {}) + assert run_id in bridge._queues + + await bridge.cleanup(run_id) + assert run_id not in bridge._queues + assert run_id not in bridge._counters + + +@pytest.mark.anyio +async def test_backpressure(): + """With maxsize=1, publish should not block forever.""" + bridge = MemoryStreamBridge(queue_maxsize=1) + run_id = "run-bp" + + await bridge.publish(run_id, "first", {}) + + # Second publish should either succeed after queue drains or warn+drop + # It should not hang indefinitely + async def publish_second(): + await bridge.publish(run_id, "second", {}) + + # Give it a generous timeout — the publish timeout is 30s but we don't + # want to wait that long in tests. Instead, drain the queue first. 
+ async def drain(): + await asyncio.sleep(0.05) + bridge._queues[run_id].get_nowait() + + await asyncio.gather(publish_second(), drain()) + assert bridge._queues[run_id].qsize() == 1 + + +@pytest.mark.anyio +async def test_multiple_runs(bridge: MemoryStreamBridge): + """Two different run_ids should not interfere with each other.""" + await bridge.publish("run-a", "event-a", {"a": 1}) + await bridge.publish("run-b", "event-b", {"b": 2}) + await bridge.publish_end("run-a") + await bridge.publish_end("run-b") + + events_a = [] + async for entry in bridge.subscribe("run-a", heartbeat_interval=1.0): + events_a.append(entry) + if entry is END_SENTINEL: + break + + events_b = [] + async for entry in bridge.subscribe("run-b", heartbeat_interval=1.0): + events_b.append(entry) + if entry is END_SENTINEL: + break + + assert len(events_a) == 2 + assert events_a[0].event == "event-a" + assert events_a[0].data == {"a": 1} + + assert len(events_b) == 2 + assert events_b[0].event == "event-b" + assert events_b[0].data == {"b": 2} + + +@pytest.mark.anyio +async def test_event_id_format(bridge: MemoryStreamBridge): + """Event IDs should use timestamp-sequence format.""" + run_id = "run-id-format" + await bridge.publish(run_id, "test", {"key": "value"}) + await bridge.publish_end(run_id) + + received = [] + async for entry in bridge.subscribe(run_id, heartbeat_interval=1.0): + received.append(entry) + if entry is END_SENTINEL: + break + + event = received[0] + assert re.match(r"^\d+-\d+$", event.id), f"Expected timestamp-seq format, got {event.id}" + + +# --------------------------------------------------------------------------- +# Factory tests +# --------------------------------------------------------------------------- + + +@pytest.mark.anyio +async def test_make_stream_bridge_defaults(): + """make_stream_bridge() with no config yields a MemoryStreamBridge.""" + async with make_stream_bridge() as bridge: + assert isinstance(bridge, MemoryStreamBridge) diff --git 
a/docker/nginx/nginx.conf b/docker/nginx/nginx.conf index 744e82315..fdb5e0a50 100644 --- a/docker/nginx/nginx.conf +++ b/docker/nginx/nginx.conf @@ -85,6 +85,34 @@ http { chunked_transfer_encoding on; } + # Experimental: Gateway-backed LangGraph-compatible API + # Frontend can opt-in via NEXT_PUBLIC_LANGGRAPH_BASE_URL=/api/langgraph-compat + location /api/langgraph-compat/ { + rewrite ^/api/langgraph-compat/(.*) /api/$1 break; + proxy_pass http://gateway; + proxy_http_version 1.1; + + # Headers + proxy_set_header Host $host; + proxy_set_header X-Real-IP $remote_addr; + proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; + proxy_set_header X-Forwarded-Proto $scheme; + proxy_set_header Connection ''; + + # SSE/Streaming support + proxy_buffering off; + proxy_cache off; + proxy_set_header X-Accel-Buffering no; + + # Timeouts for long-running requests + proxy_connect_timeout 600s; + proxy_send_timeout 600s; + proxy_read_timeout 600s; + + # Chunked transfer encoding + chunked_transfer_encoding on; + } + # Custom API: Models endpoint location /api/models { proxy_pass http://gateway; diff --git a/docker/nginx/nginx.local.conf b/docker/nginx/nginx.local.conf index 289c3889e..f8dc5b609 100644 --- a/docker/nginx/nginx.local.conf +++ b/docker/nginx/nginx.local.conf @@ -48,8 +48,8 @@ http { return 204; } - # LangGraph API routes - # Rewrites /api/langgraph/* to /* before proxying + # LangGraph API routes (served by langgraph dev) + # Rewrites /api/langgraph/* to /* before proxying to LangGraph server location /api/langgraph/ { rewrite ^/api/langgraph/(.*) /$1 break; proxy_pass http://langgraph; @@ -76,6 +76,34 @@ http { chunked_transfer_encoding on; } + # Experimental: Gateway-backed LangGraph-compatible API + # Frontend can opt-in via NEXT_PUBLIC_LANGGRAPH_BASE_URL=/api/langgraph-compat + location /api/langgraph-compat/ { + rewrite ^/api/langgraph-compat/(.*) /api/$1 break; + proxy_pass http://gateway; + proxy_http_version 1.1; + + # Headers + proxy_set_header Host 
$host; + proxy_set_header X-Real-IP $remote_addr; + proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; + proxy_set_header X-Forwarded-Proto $scheme; + proxy_set_header Connection ''; + + # SSE/Streaming support + proxy_buffering off; + proxy_cache off; + proxy_set_header X-Accel-Buffering no; + + # Timeouts for long-running requests + proxy_connect_timeout 600s; + proxy_send_timeout 600s; + proxy_read_timeout 600s; + + # Chunked transfer encoding + chunked_transfer_encoding on; + } + # Custom API: Models endpoint location /api/models { proxy_pass http://gateway; diff --git a/frontend/.env.example b/frontend/.env.example index 75d14f611..96c1431c8 100644 --- a/frontend/.env.example +++ b/frontend/.env.example @@ -15,3 +15,9 @@ # NEXT_PUBLIC_BACKEND_BASE_URL="http://localhost:8001" # NEXT_PUBLIC_LANGGRAPH_BASE_URL="http://localhost:2024" +# LangGraph API base URL +# Default: /api/langgraph (uses langgraph dev server via nginx) +# Set to /api/langgraph-compat to use the experimental Gateway-backed runtime +# Optional: also set SKIP_LANGGRAPH_SERVER=1 when running scripts/serve.sh to skip the standalone LangGraph server and save resources +# NEXT_PUBLIC_LANGGRAPH_BASE_URL=/api/langgraph-compat + diff --git a/scripts/serve.sh b/scripts/serve.sh index 7f1d82988..d5d3b42cf 100755 --- a/scripts/serve.sh +++ b/scripts/serve.sh @@ -95,7 +95,9 @@ cleanup() { trap - INT TERM echo "" echo "Shutting down services..." - pkill -f "langgraph dev" 2>/dev/null || true + if [ "${SKIP_LANGGRAPH_SERVER:-0}" != "1" ]; then + pkill -f "langgraph dev" 2>/dev/null || true + fi pkill -f "uvicorn app.gateway.app:app" 2>/dev/null || true pkill -f "next dev" 2>/dev/null || true pkill -f "next start" 2>/dev/null || true @@ -128,21 +130,26 @@ else GATEWAY_EXTRA_FLAGS="" fi -echo "Starting LangGraph server..."
-# Read log_level from config.yaml, fallback to env var, then to "info" -CONFIG_LOG_LEVEL=$(grep -m1 '^log_level:' config.yaml 2>/dev/null | awk '{print $2}' | tr -d ' ') -LANGGRAPH_LOG_LEVEL="${LANGGRAPH_LOG_LEVEL:-${CONFIG_LOG_LEVEL:-info}}" -(cd backend && NO_COLOR=1 uv run langgraph dev --no-browser --allow-blocking --server-log-level $LANGGRAPH_LOG_LEVEL $LANGGRAPH_EXTRA_FLAGS > ../logs/langgraph.log 2>&1) & -./scripts/wait-for-port.sh 2024 60 "LangGraph" || { - echo " See logs/langgraph.log for details" - tail -20 logs/langgraph.log - if grep -qE "config_version|outdated|Environment variable .* not found|KeyError|ValidationError|config\.yaml" logs/langgraph.log 2>/dev/null; then - echo "" - echo " Hint: This may be a configuration issue. Try running 'make config-upgrade' to update your config.yaml." - fi - cleanup -} -echo "✓ LangGraph server started on localhost:2024" +if [ "${SKIP_LANGGRAPH_SERVER:-0}" != "1" ]; then + echo "Starting LangGraph server..." + # Read log_level from config.yaml, fallback to env var, then to "info" + CONFIG_LOG_LEVEL=$(grep -m1 '^log_level:' config.yaml 2>/dev/null | awk '{print $2}' | tr -d ' ') + LANGGRAPH_LOG_LEVEL="${LANGGRAPH_LOG_LEVEL:-${CONFIG_LOG_LEVEL:-info}}" + (cd backend && NO_COLOR=1 uv run langgraph dev --no-browser --allow-blocking --server-log-level $LANGGRAPH_LOG_LEVEL $LANGGRAPH_EXTRA_FLAGS > ../logs/langgraph.log 2>&1) & + ./scripts/wait-for-port.sh 2024 60 "LangGraph" || { + echo " See logs/langgraph.log for details" + tail -20 logs/langgraph.log + if grep -qE "config_version|outdated|Environment variable .* not found|KeyError|ValidationError|config\.yaml" logs/langgraph.log 2>/dev/null; then + echo "" + echo " Hint: This may be a configuration issue. Try running 'make config-upgrade' to update your config.yaml." 
+ fi + cleanup + } + echo "✓ LangGraph server started on localhost:2024" +else + echo "⏩ Skipping LangGraph server (SKIP_LANGGRAPH_SERVER=1)" + echo " Use /api/langgraph-compat/* via Gateway instead" +fi echo "Starting Gateway API..." (cd backend && PYTHONPATH=. uv run uvicorn app.gateway.app:app --host 0.0.0.0 --port 8001 $GATEWAY_EXTRA_FLAGS > ../logs/gateway.log 2>&1) & @@ -190,7 +197,16 @@ echo "==========================================" echo "" echo " 🌐 Application: http://localhost:2026" echo " 📡 API Gateway: http://localhost:2026/api/*" -echo " 🤖 LangGraph: http://localhost:2026/api/langgraph/*" +if [ "${SKIP_LANGGRAPH_SERVER:-0}" = "1" ]; then + echo " 🤖 LangGraph: skipped (SKIP_LANGGRAPH_SERVER=1)" +else + echo " 🤖 LangGraph: http://localhost:2026/api/langgraph/* (served by langgraph dev)" +fi +echo " 🧪 LangGraph Compat (experimental): http://localhost:2026/api/langgraph-compat/* (served by Gateway)" +if [ "${SKIP_LANGGRAPH_SERVER:-0}" = "1" ]; then + echo "" + echo " 💡 Set NEXT_PUBLIC_LANGGRAPH_BASE_URL=/api/langgraph-compat in frontend/.env.local" +fi echo "" echo " 📋 Logs:" echo " - LangGraph: logs/langgraph.log"