diff --git a/backend/app/channels/service.py b/backend/app/channels/service.py index 8042733c2..72414e651 100644 --- a/backend/app/channels/service.py +++ b/backend/app/channels/service.py @@ -4,7 +4,7 @@ from __future__ import annotations import logging import os -from typing import Any +from typing import TYPE_CHECKING, Any from app.channels.base import Channel from app.channels.manager import DEFAULT_GATEWAY_URL, DEFAULT_LANGGRAPH_URL, ChannelManager @@ -13,6 +13,9 @@ from app.channels.store import ChannelStore logger = logging.getLogger(__name__) +if TYPE_CHECKING: + from deerflow.config.app_config import AppConfig + # Channel name → import path for lazy loading _CHANNEL_REGISTRY: dict[str, str] = { "discord": "app.channels.discord:DiscordChannel", @@ -75,14 +78,15 @@ class ChannelService: self._running = False @classmethod - def from_app_config(cls) -> ChannelService: + def from_app_config(cls, app_config: AppConfig | None = None) -> ChannelService: """Create a ChannelService from the application config.""" - from deerflow.config.app_config import get_app_config + if app_config is None: + from deerflow.config.app_config import get_app_config - config = get_app_config() + app_config = get_app_config() channels_config = {} # extra fields are allowed by AppConfig (extra="allow") - extra = config.model_extra or {} + extra = app_config.model_extra or {} if "channels" in extra: channels_config = extra["channels"] return cls(channels_config=channels_config) @@ -201,12 +205,12 @@ def get_channel_service() -> ChannelService | None: return _channel_service -async def start_channel_service() -> ChannelService: +async def start_channel_service(app_config: AppConfig | None = None) -> ChannelService: """Create and start the global ChannelService from app config.""" global _channel_service if _channel_service is not None: return _channel_service - _channel_service = ChannelService.from_app_config() + _channel_service = ChannelService.from_app_config(app_config) await _channel_service.start() return _channel_service diff --git a/backend/app/gateway/app.py b/backend/app/gateway/app.py index 852c787fd..6fff53d24 100644 --- a/backend/app/gateway/app.py +++ b/backend/app/gateway/app.py @@ -28,7 +28,10 @@ from app.gateway.routers import ( threads, uploads, ) -from deerflow.config.app_config import get_app_config +from deerflow.config import app_config as deerflow_app_config + +AppConfig = deerflow_app_config.AppConfig +get_app_config = deerflow_app_config.get_app_config # Configure logging logging.basicConfig( @@ -160,7 +163,7 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]: # Load config and check necessary environment variables at startup try: - get_app_config() + app.state.config = get_app_config() logger.info("Configuration loaded successfully") except Exception as e: error_msg = f"Failed to load configuration during gateway startup: {e}" @@ -181,7 +184,7 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]: try: from app.channels.service import start_channel_service - channel_service = await start_channel_service() + channel_service = await start_channel_service(app.state.config) logger.info("Channel service started: %s", channel_service.get_status()) except Exception: logger.exception("No IM channels configured or channel service failed to start") diff --git a/backend/app/gateway/deps.py b/backend/app/gateway/deps.py index 20da78af9..a076e28ff 100644 --- a/backend/app/gateway/deps.py +++ b/backend/app/gateway/deps.py @@ -15,6 +15,7 @@ from typing import TYPE_CHECKING, TypeVar, cast from fastapi import FastAPI, HTTPException, Request from langgraph.types import Checkpointer +from deerflow.config.app_config import AppConfig from deerflow.persistence.feedback import FeedbackRepository from deerflow.runtime import RunContext, RunManager, StreamBridge from deerflow.runtime.events.store.base import RunEventStore @@ -29,6 +30,14 @@ if TYPE_CHECKING: T = TypeVar("T") +def get_config(request: Request) -> AppConfig: + """Return the app-scoped ``AppConfig`` stored on ``app.state``.""" + config = getattr(request.app.state, "config", None) + if config is None: + raise HTTPException(status_code=503, detail="Configuration not available") + return config + + @asynccontextmanager async def langgraph_runtime(app: FastAPI) -> AsyncGenerator[None, None]: """Bootstrap and tear down all LangGraph runtime singletons. @@ -38,22 +47,24 @@ async def langgraph_runtime(app: FastAPI) -> AsyncGenerator[None, None]: async with langgraph_runtime(app): yield """ - from deerflow.config import get_app_config from deerflow.persistence.engine import close_engine, get_session_factory, init_engine_from_config from deerflow.runtime import make_store, make_stream_bridge from deerflow.runtime.checkpointer.async_provider import make_checkpointer from deerflow.runtime.events.store import make_run_event_store async with AsyncExitStack() as stack: - app.state.stream_bridge = await stack.enter_async_context(make_stream_bridge()) + config = getattr(app.state, "config", None) + if config is None: + raise RuntimeError("langgraph_runtime() requires app.state.config to be initialized") + + app.state.stream_bridge = await stack.enter_async_context(make_stream_bridge(config)) # Initialize persistence engine BEFORE checkpointer so that # auto-create-database logic runs first (postgres backend). - config = get_app_config() await init_engine_from_config(config.database) - app.state.checkpointer = await stack.enter_async_context(make_checkpointer()) - app.state.store = await stack.enter_async_context(make_store()) + app.state.checkpointer = await stack.enter_async_context(make_checkpointer(config)) + app.state.store = await stack.enter_async_context(make_store(config)) # Initialize repositories — one get_session_factory() call for all. sf = get_session_factory() @@ -130,13 +141,12 @@ def get_run_context(request: Request) -> RunContext: Returns a *base* context with infrastructure dependencies. """ - from deerflow.config import get_app_config - + config = get_config(request) return RunContext( checkpointer=get_checkpointer(request), store=get_store(request), event_store=get_run_event_store(request), - run_events_config=getattr(get_app_config(), "run_events", None), + run_events_config=getattr(config, "run_events", None), thread_store=get_thread_store(request), ) diff --git a/backend/packages/harness/deerflow/config/app_config.py b/backend/packages/harness/deerflow/config/app_config.py index 5e31c3e37..df6b82708 100644 --- a/backend/packages/harness/deerflow/config/app_config.py +++ b/backend/packages/harness/deerflow/config/app_config.py @@ -73,7 +73,7 @@ class AppConfig(BaseModel): subagents: SubagentsAppConfig = Field(default_factory=SubagentsAppConfig, description="Subagent runtime configuration") guardrails: GuardrailsConfig = Field(default_factory=GuardrailsConfig, description="Guardrail middleware configuration") circuit_breaker: CircuitBreakerConfig = Field(default_factory=CircuitBreakerConfig, description="LLM circuit breaker configuration") - model_config = ConfigDict(extra="allow", frozen=False) + model_config = ConfigDict(extra="allow") database: DatabaseConfig = Field(default_factory=DatabaseConfig, description="Unified database backend configuration") run_events: RunEventsConfig = Field(default_factory=RunEventsConfig, description="Run event storage configuration") checkpointer: CheckpointerConfig | None = Field(default=None, description="Checkpointer configuration") @@ -292,6 +292,9 @@ class AppConfig(BaseModel): return next((group for group in self.tool_groups if group.name == name), None) +# Compatibility singleton layer for code paths that have not yet been +# migrated to explicit ``AppConfig`` threading. New composition roots should +# prefer constructing ``AppConfig`` once and passing it down directly. _app_config: AppConfig | None = None _app_config_path: Path | None = None _app_config_mtime: float | None = None diff --git a/backend/packages/harness/deerflow/runtime/checkpointer/async_provider.py b/backend/packages/harness/deerflow/runtime/checkpointer/async_provider.py index 21c747b45..9a04cb1af 100644 --- a/backend/packages/harness/deerflow/runtime/checkpointer/async_provider.py +++ b/backend/packages/harness/deerflow/runtime/checkpointer/async_provider.py @@ -24,7 +24,7 @@ from collections.abc import AsyncIterator from langgraph.types import Checkpointer -from deerflow.config.app_config import get_app_config +from deerflow.config.app_config import AppConfig, get_app_config from deerflow.runtime.checkpointer.provider import ( POSTGRES_CONN_REQUIRED, POSTGRES_INSTALL, @@ -123,11 +123,11 @@ async def _async_checkpointer_from_database(db_config) -> AsyncIterator[Checkpoi @contextlib.asynccontextmanager -async def make_checkpointer() -> AsyncIterator[Checkpointer]: +async def make_checkpointer(app_config: AppConfig | None = None) -> AsyncIterator[Checkpointer]: """Async context manager that yields a checkpointer for the caller's lifetime. Resources are opened on enter and closed on exit -- no global state:: - async with make_checkpointer() as checkpointer: + async with make_checkpointer(app_config) as checkpointer: app.state.checkpointer = checkpointer Yields an ``InMemorySaver`` when no checkpointer is configured in *config.yaml*. @@ -138,16 +138,17 @@ async def make_checkpointer() -> AsyncIterator[Checkpointer]: 3. Default InMemorySaver """ - config = get_app_config() + if app_config is None: + app_config = get_app_config() # Legacy: standalone checkpointer config takes precedence - if config.checkpointer is not None: - async with _async_checkpointer(config.checkpointer) as saver: + if app_config.checkpointer is not None: + async with _async_checkpointer(app_config.checkpointer) as saver: yield saver return # Unified database config - db_config = getattr(config, "database", None) + db_config = getattr(app_config, "database", None) if db_config is not None and db_config.backend != "memory": async with _async_checkpointer_from_database(db_config) as saver: yield saver diff --git a/backend/packages/harness/deerflow/runtime/store/async_provider.py b/backend/packages/harness/deerflow/runtime/store/async_provider.py index 68cd107c8..bc1f07eba 100644 --- a/backend/packages/harness/deerflow/runtime/store/async_provider.py +++ b/backend/packages/harness/deerflow/runtime/store/async_provider.py @@ -23,7 +23,7 @@ from collections.abc import AsyncIterator from langgraph.store.base import BaseStore -from deerflow.config.app_config import get_app_config +from deerflow.config.app_config import AppConfig, get_app_config from deerflow.runtime.store.provider import POSTGRES_CONN_REQUIRED, POSTGRES_STORE_INSTALL, SQLITE_STORE_INSTALL, ensure_sqlite_parent_dir, resolve_sqlite_conn_str logger = logging.getLogger(__name__) @@ -86,7 +86,7 @@ async def _async_store(config) -> AsyncIterator[BaseStore]: @contextlib.asynccontextmanager -async def make_store() -> AsyncIterator[BaseStore]: +async def make_store(app_config: AppConfig | None = None) -> AsyncIterator[BaseStore]: """Async context manager that yields a Store whose backend matches the configured checkpointer. @@ -94,20 +94,21 @@ async def make_store() -> AsyncIterator[BaseStore]: :func:`deerflow.runtime.checkpointer.async_provider.make_checkpointer` so that both singletons always use the same persistence technology:: - async with make_store() as store: + async with make_store(app_config) as store: app.state.store = store Yields an :class:`~langgraph.store.memory.InMemoryStore` when no ``checkpointer`` section is configured (emits a WARNING in that case). """ - config = get_app_config() + if app_config is None: + app_config = get_app_config() - if config.checkpointer is None: + if app_config.checkpointer is None: from langgraph.store.memory import InMemoryStore logger.warning("No 'checkpointer' section in config.yaml — using InMemoryStore for the store. Thread list will be lost on server restart. Configure a sqlite or postgres backend for persistence.") yield InMemoryStore() return - async with _async_store(config.checkpointer) as store: + async with _async_store(app_config.checkpointer) as store: yield store diff --git a/backend/packages/harness/deerflow/runtime/stream_bridge/async_provider.py b/backend/packages/harness/deerflow/runtime/stream_bridge/async_provider.py index f35b7d639..929e29977 100644 --- a/backend/packages/harness/deerflow/runtime/stream_bridge/async_provider.py +++ b/backend/packages/harness/deerflow/runtime/stream_bridge/async_provider.py @@ -17,6 +17,7 @@ import contextlib import logging from collections.abc import AsyncIterator +from deerflow.config.app_config import AppConfig from deerflow.config.stream_bridge_config import get_stream_bridge_config from .base import StreamBridge @@ -25,14 +26,16 @@ logger = logging.getLogger(__name__) @contextlib.asynccontextmanager -async def make_stream_bridge(config=None) -> AsyncIterator[StreamBridge]: +async def make_stream_bridge(app_config: AppConfig | None = None) -> AsyncIterator[StreamBridge]: """Async context manager that yields a :class:`StreamBridge`. Falls back to :class:`MemoryStreamBridge` when no configuration is provided and nothing is set globally. """ - if config is None: + if app_config is None: config = get_stream_bridge_config() + else: + config = app_config.stream_bridge if config is None or config.type == "memory": from deerflow.runtime.stream_bridge.memory import MemoryStreamBridge diff --git a/backend/tests/test_channels.py b/backend/tests/test_channels.py index 779b75b08..7cf329bff 100644 --- a/backend/tests/test_channels.py +++ b/backend/tests/test_channels.py @@ -2032,6 +2032,22 @@ class TestChannelService: assert service.manager._langgraph_url == "http://custom-gateway:8001/api" assert service.manager._gateway_url == "http://custom-gateway:8001" + def test_from_app_config_uses_explicit_config(self): + from app.channels.service import ChannelService + + app_config = SimpleNamespace( + model_extra={ + "channels": { + "telegram": {"enabled": False}, + } + } + ) + + with patch("deerflow.config.app_config.get_app_config", side_effect=AssertionError("should not read global config")): + service = ChannelService.from_app_config(app_config) + + assert service._config == {"telegram": {"enabled": False}} + def test_disabled_channel_with_string_creds_emits_warning(self, caplog): """Warning is emitted when a channel has string credentials but enabled=false.""" import logging diff --git a/backend/tests/test_gateway_deps_config.py b/backend/tests/test_gateway_deps_config.py new file mode 100644 index 000000000..70f9124b6 --- /dev/null +++ b/backend/tests/test_gateway_deps_config.py @@ -0,0 +1,41 @@ +from __future__ import annotations + +from fastapi import Depends, FastAPI +from fastapi.testclient import TestClient + +from app.gateway.deps import get_config +from deerflow.config.app_config import AppConfig +from deerflow.config.sandbox_config import SandboxConfig + + +def test_get_config_returns_app_state_config(): + """get_config should return the exact AppConfig stored on app.state.""" + app = FastAPI() + config = AppConfig(sandbox=SandboxConfig(use="test")) + app.state.config = config + + @app.get("/probe") + def probe(cfg: AppConfig = Depends(get_config)): + return {"same_identity": cfg is config, "log_level": cfg.log_level} + + client = TestClient(app) + response = client.get("/probe") + + assert response.status_code == 200 + assert response.json() == {"same_identity": True, "log_level": "info"} + + +def test_get_config_reads_updated_app_state(): + """Swapping app.state.config should be visible to the dependency.""" + app = FastAPI() + app.state.config = AppConfig(sandbox=SandboxConfig(use="test"), log_level="info") + + @app.get("/log-level") + def log_level(cfg: AppConfig = Depends(get_config)): + return {"level": cfg.log_level} + + client = TestClient(app) + assert client.get("/log-level").json() == {"level": "info"} + + app.state.config = app.state.config.model_copy(update={"log_level": "debug"}) + assert client.get("/log-level").json() == {"level": "debug"}