diff --git a/backend/Makefile b/backend/Makefile index dd06742a0..a2f745cf3 100644 --- a/backend/Makefile +++ b/backend/Makefile @@ -8,7 +8,7 @@ gateway: PYTHONPATH=. uv run uvicorn app.gateway.app:app --host 0.0.0.0 --port 8001 test: - PYTHONPATH=. uv run pytest tests/ -v + PYTHONPATH=. uv run pytest tests/unittest -v lint: uvx ruff check . diff --git a/backend/app/gateway/services.py b/backend/app/gateway/services.py deleted file mode 100644 index 9a1cdb12f..000000000 --- a/backend/app/gateway/services.py +++ /dev/null @@ -1,309 +0,0 @@ -"""Run lifecycle service layer. - -Centralizes the business logic for creating runs, formatting SSE -frames, and consuming stream bridge events. Router modules -(``thread_runs``, ``runs``) are thin HTTP handlers that delegate here. -""" - -from __future__ import annotations - -import asyncio -import dataclasses -import json -import logging -import re -from typing import Any - -from fastapi import HTTPException, Request -from langchain_core.messages import HumanMessage - -from app.gateway.deps import get_run_context, get_run_manager, get_run_store, get_stream_bridge -from app.gateway.utils import sanitize_log_param -from deerflow.runtime import ( - END_SENTINEL, - HEARTBEAT_SENTINEL, - ConflictError, - DisconnectMode, - RunManager, - RunRecord, - RunStatus, - StreamBridge, - UnsupportedStrategyError, - run_agent, -) - -logger = logging.getLogger(__name__) - - -# --------------------------------------------------------------------------- -# SSE formatting -# --------------------------------------------------------------------------- - - -def format_sse(event: str, data: Any, *, event_id: str | None = None) -> str: - """Format a single SSE frame. - - Field order: ``event:`` -> ``data:`` -> ``id:`` (optional) -> blank line. - This matches the LangGraph Platform wire format consumed by the - ``useStream`` React hook and the Python ``langgraph-sdk`` SSE decoder. - """ - payload = json.dumps(data, default=str, ensure_ascii=False) - parts = [f"event: {event}", f"data: {payload}"] - if event_id: - parts.append(f"id: {event_id}") - parts.append("") - parts.append("") - return "\n".join(parts) - - -# --------------------------------------------------------------------------- -# Input / config helpers -# --------------------------------------------------------------------------- - - -def normalize_stream_modes(raw: list[str] | str | None) -> list[str]: - """Normalize the stream_mode parameter to a list. - - Default matches what ``useStream`` expects: values + messages-tuple. - """ - if raw is None: - return ["values"] - if isinstance(raw, str): - return [raw] - return raw if raw else ["values"] - - -def normalize_input(raw_input: dict[str, Any] | None) -> dict[str, Any]: - """Convert LangGraph Platform input format to LangChain state dict.""" - if raw_input is None: - return {} - messages = raw_input.get("messages") - if messages and isinstance(messages, list): - converted = [] - for msg in messages: - if isinstance(msg, dict): - role = msg.get("role", msg.get("type", "user")) - content = msg.get("content", "") - if role in ("user", "human"): - converted.append(HumanMessage(content=content)) - else: - # TODO: handle other message types (system, ai, tool) - converted.append(HumanMessage(content=content)) - else: - converted.append(msg) - return {**raw_input, "messages": converted} - return raw_input - - -_DEFAULT_ASSISTANT_ID = "lead_agent" - - -def resolve_agent_factory(assistant_id: str | None): - """Resolve the agent factory callable from config. - - Custom agents are implemented as ``lead_agent`` + an ``agent_name`` - injected into ``configurable`` — see :func:`build_run_config`. All - ``assistant_id`` values therefore map to the same factory; the routing - happens inside ``make_lead_agent`` when it reads ``cfg["agent_name"]``. - """ - from deerflow.agents.lead_agent.agent import make_lead_agent - - return make_lead_agent - - -def build_run_config( - thread_id: str, - request_config: dict[str, Any] | None, - metadata: dict[str, Any] | None, - *, - assistant_id: str | None = None, -) -> dict[str, Any]: - """Build a RunnableConfig dict for the agent. - - When *assistant_id* refers to a custom agent (anything other than - ``"lead_agent"`` / ``None``), the name is forwarded as - ``configurable["agent_name"]``. ``make_lead_agent`` reads this key to - load the matching ``agents//SOUL.md`` and per-agent config — - without it the agent silently runs as the default lead agent. - - This mirrors the channel manager's ``_resolve_run_params`` logic so that - the LangGraph Platform-compatible HTTP API and the IM channel path behave - identically. - """ - config: dict[str, Any] = {"recursion_limit": 100} - if request_config: - # LangGraph >= 0.6.0 introduced ``context`` as the preferred way to - # pass thread-level data and rejects requests that include both - # ``configurable`` and ``context``. If the caller already sends - # ``context``, honour it and skip our own ``configurable`` dict. - if "context" in request_config: - if "configurable" in request_config: - logger.warning( - "build_run_config: client sent both 'context' and 'configurable'; preferring 'context' (LangGraph >= 0.6.0). thread_id=%s, caller_configurable keys=%s", - thread_id, - list(request_config.get("configurable", {}).keys()), - ) - config["context"] = request_config["context"] - else: - configurable = {"thread_id": thread_id} - configurable.update(request_config.get("configurable", {})) - config["configurable"] = configurable - for k, v in request_config.items(): - if k not in ("configurable", "context"): - config[k] = v - else: - config["configurable"] = {"thread_id": thread_id} - - # Inject custom agent name when the caller specified a non-default assistant. - # Honour an explicit configurable["agent_name"] in the request if already set. - if assistant_id and assistant_id != _DEFAULT_ASSISTANT_ID and "configurable" in config: - if "agent_name" not in config["configurable"]: - normalized = assistant_id.strip().lower().replace("_", "-") - if not normalized or not re.fullmatch(r"[a-z0-9-]+", normalized): - raise ValueError(f"Invalid assistant_id {assistant_id!r}: must contain only letters, digits, and hyphens after normalization.") - config["configurable"]["agent_name"] = normalized - if metadata: - config.setdefault("metadata", {}).update(metadata) - return config - - -# --------------------------------------------------------------------------- -# Run lifecycle -# --------------------------------------------------------------------------- - - -async def start_run( - body: Any, - thread_id: str, - request: Request, -) -> RunRecord: - """Create a RunRecord and launch the background agent task. - - Parameters - ---------- - body : RunCreateRequest - The validated request body (typed as Any to avoid circular import - with the router module that defines the Pydantic model). - thread_id : str - Target thread. - request : Request - FastAPI request — used to retrieve singletons from ``app.state``. - """ - bridge = get_stream_bridge(request) - run_mgr = get_run_manager(request) - run_ctx = get_run_context(request) - - disconnect = DisconnectMode.cancel if body.on_disconnect == "cancel" else DisconnectMode.continue_ - - try: - record = await run_mgr.create_or_reject( - thread_id, - body.assistant_id, - on_disconnect=disconnect, - metadata=body.metadata or {}, - kwargs={"input": body.input, "config": body.config}, - multitask_strategy=body.multitask_strategy, - ) - except ConflictError as exc: - raise HTTPException(status_code=409, detail=str(exc)) from exc - except UnsupportedStrategyError as exc: - raise HTTPException(status_code=501, detail=str(exc)) from exc - - # Upsert thread metadata so the thread appears in /threads/search, - # even for threads that were never explicitly created via POST /threads - # (e.g. stateless runs). - try: - existing = await run_ctx.thread_store.get(thread_id) - if existing is None: - await run_ctx.thread_store.create( - thread_id, - assistant_id=body.assistant_id, - metadata=body.metadata, - ) - else: - await run_ctx.thread_store.update_status(thread_id, "running") - except Exception: - logger.warning("Failed to upsert thread_meta for %s (non-fatal)", sanitize_log_param(thread_id)) - - agent_factory = resolve_agent_factory(body.assistant_id) - graph_input = normalize_input(body.input) - config = build_run_config(thread_id, body.config, body.metadata, assistant_id=body.assistant_id) - - # Merge DeerFlow-specific context overrides into configurable. - # The ``context`` field is a custom extension for the langgraph-compat layer - # that carries agent configuration (model_name, thinking_enabled, etc.). - # Only agent-relevant keys are forwarded; unknown keys (e.g. thread_id) are ignored. - context = getattr(body, "context", None) - if context: - _CONTEXT_CONFIGURABLE_KEYS = { - "model_name", - "mode", - "thinking_enabled", - "reasoning_effort", - "is_plan_mode", - "subagent_enabled", - "max_concurrent_subagents", - } - configurable = config.setdefault("configurable", {}) - for key in _CONTEXT_CONFIGURABLE_KEYS: - if key in context: - configurable.setdefault(key, context[key]) - - stream_modes = normalize_stream_modes(body.stream_mode) - - task = asyncio.create_task( - run_agent( - bridge, - run_mgr, - record, - ctx=run_ctx, - agent_factory=agent_factory, - graph_input=graph_input, - config=config, - stream_modes=stream_modes, - stream_subgraphs=body.stream_subgraphs, - interrupt_before=body.interrupt_before, - interrupt_after=body.interrupt_after, - ) - ) - record.task = task - - # Title sync is handled by worker.py's finally block which reads the - # title from the checkpoint and calls thread_store.update_display_name - # after the run completes. - - return record - - -async def sse_consumer( - bridge: StreamBridge, - record: RunRecord, - request: Request, - run_mgr: RunManager, -): - """Async generator that yields SSE frames from the bridge. - - The ``finally`` block implements ``on_disconnect`` semantics: - - ``cancel``: abort the background task on client disconnect. - - ``continue``: let the task run; events are discarded. - """ - last_event_id = request.headers.get("Last-Event-ID") - try: - async for entry in bridge.subscribe(record.run_id, last_event_id=last_event_id): - if await request.is_disconnected(): - break - - if entry is HEARTBEAT_SENTINEL: - yield ": heartbeat\n\n" - continue - - if entry is END_SENTINEL: - yield format_sse("end", None, event_id=entry.id or None) - return - - yield format_sse(entry.event, entry.data, event_id=entry.id or None) - - finally: - if record.status in (RunStatus.pending, RunStatus.running): - if record.on_disconnect == DisconnectMode.cancel: - await run_mgr.cancel(record.run_id) diff --git a/backend/langgraph.json b/backend/langgraph.json index 7d4df8efe..ed5e81e0c 100644 --- a/backend/langgraph.json +++ b/backend/langgraph.json @@ -6,7 +6,10 @@ "graphs": { "lead_agent": "deerflow.agents:make_lead_agent" }, + "auth": { + "path": "./app/plugins/auth/langgraph.py:auth" + }, "checkpointer": { - "path": "./packages/harness/deerflow/runtime/checkpointer/async_provider.py:make_checkpointer" + "path": "./packages/storage/store/persistence/async_provider.py:make_checkpointer" } } diff --git a/backend/pyproject.toml b/backend/pyproject.toml index bd3041227..2fa39927c 100644 --- a/backend/pyproject.toml +++ b/backend/pyproject.toml @@ -6,6 +6,7 @@ readme = "README.md" requires-python = ">=3.12" dependencies = [ "deerflow-harness", + "deerflow-storage", "fastapi>=0.115.0", "httpx>=0.28.0", "python-multipart>=0.0.20", @@ -20,6 +21,7 @@ dependencies = [ "bcrypt>=4.0.0", "pyjwt>=2.9.0", "email-validator>=2.0.0", + "scalar-fastapi>=1.8.2", ] [project.optional-dependencies] diff --git a/backend/uv.lock b/backend/uv.lock index 0dba58c59..d2694f1c0 100644 --- a/backend/uv.lock +++ b/backend/uv.lock @@ -805,6 +805,7 @@ source = { virtual = "." } dependencies = [ { name = "bcrypt" }, { name = "deerflow-harness" }, + { name = "deerflow-storage" }, { name = "email-validator" }, { name = "fastapi" }, { name = "httpx" }, @@ -814,6 +815,7 @@ dependencies = [ { name = "pyjwt" }, { name = "python-multipart" }, { name = "python-telegram-bot" }, + { name = "scalar-fastapi" }, { name = "slack-sdk" }, { name = "sse-starlette" }, { name = "uvicorn", extra = ["standard"] }, @@ -836,6 +838,7 @@ requires-dist = [ { name = "bcrypt", specifier = ">=4.0.0" }, { name = "deerflow-harness", editable = "packages/harness" }, { name = "deerflow-harness", extras = ["postgres"], marker = "extra == 'postgres'", editable = "packages/harness" }, + { name = "deerflow-storage", editable = "packages/storage" }, { name = "email-validator", specifier = ">=2.0.0" }, { name = "fastapi", specifier = ">=0.115.0" }, { name = "httpx", specifier = ">=0.28.0" }, @@ -845,6 +848,7 @@ requires-dist = [ { name = "pyjwt", specifier = ">=2.9.0" }, { name = "python-multipart", specifier = ">=0.0.20" }, { name = "python-telegram-bot", specifier = ">=21.0" }, + { name = "scalar-fastapi", specifier = ">=1.8.2" }, { name = "slack-sdk", specifier = ">=3.33.0" }, { name = "sse-starlette", specifier = ">=2.1.0" }, { name = "uvicorn", extras = ["standard"], specifier = ">=0.34.0" }, @@ -3966,6 +3970,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/c4/1c/1dbe51782c0e1e9cfce1d1004752672d2d4629ea46945d19d731ad772b3b/ruff-0.14.11-py3-none-win_arm64.whl", hash = "sha256:649fb6c9edd7f751db276ef42df1f3df41c38d67d199570ae2a7bd6cbc3590f0", size = 12938644, upload-time = "2026-01-08T19:11:50.027Z" }, ] +[[package]] +name = "scalar-fastapi" +version = "1.8.2" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/cc/81/c0c70776a3d7d371ee06d38d26a8d361c97439d46f79acb6d67cf6c760ad/scalar_fastapi-1.8.2.tar.gz", hash = "sha256:0de09b8c63f78c1052792faa200d740b2ccaeeb88ac54e7ea633ac4edc6fde82", size = 8371, upload-time = "2026-04-09T22:41:24.267Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/d8/b1/b9a482620479b2801a4dfa5c4c10b4cc280097e894114e0298f21a0fca55/scalar_fastapi-1.8.2-py3-none-any.whl", hash = "sha256:d96e2c8b3676491eaebb4ec8d9f4de77adb2374d86f87d321546fa6f084e8cb8", size = 7677, upload-time = "2026-04-09T22:41:23.209Z" }, +] + [[package]] name = "setuptools" version = "80.10.2"