"""Test configuration for the backend test suite. Sets up sys.path and pre-mocks modules that would cause circular import issues when unit-testing lightweight config/registry code in isolation. """ from __future__ import annotations import importlib.util import sys from pathlib import Path from types import SimpleNamespace from unittest.mock import MagicMock import pytest from support.detectors.blocking_io import BlockingIOProbe, detect_blocking_io # Make 'app' and 'deerflow' importable from any working directory sys.path.insert(0, str(Path(__file__).parent.parent)) sys.path.insert(0, str(Path(__file__).resolve().parents[2] / "scripts")) _BACKEND_ROOT = Path(__file__).resolve().parents[1] _blocking_io_probe = BlockingIOProbe(_BACKEND_ROOT) _BLOCKING_IO_DETECTOR_ATTR = "_blocking_io_detector" # Break the circular import chain that exists in production code: # deerflow.subagents.__init__ # -> .executor (SubagentExecutor, SubagentResult) # -> deerflow.agents.thread_state # -> deerflow.agents.__init__ # -> lead_agent.agent # -> subagent_limit_middleware # -> deerflow.subagents.executor <-- circular! # # By injecting a mock for deerflow.subagents.executor *before* any test module # triggers the import, __init__.py's "from .executor import ..." succeeds # immediately without running the real executor module. _executor_mock = MagicMock() _executor_mock.SubagentExecutor = MagicMock _executor_mock.SubagentResult = MagicMock _executor_mock.SubagentStatus = MagicMock _executor_mock.MAX_CONCURRENT_SUBAGENTS = 3 _executor_mock.get_background_task_result = MagicMock() sys.modules["deerflow.subagents.executor"] = _executor_mock @pytest.fixture() def provisioner_module(): """Load docker/provisioner/app.py as an importable test module. Shared by test_provisioner_kubeconfig and test_provisioner_pvc_volumes so that any change to the provisioner entry-point path or module name only needs to be updated in one place. """ repo_root = Path(__file__).resolve().parents[2] module_path = repo_root / "docker" / "provisioner" / "app.py" spec = importlib.util.spec_from_file_location("provisioner_app_test", module_path) assert spec is not None assert spec.loader is not None module = importlib.util.module_from_spec(spec) spec.loader.exec_module(module) return module @pytest.fixture() def blocking_io_detector(): """Fail a focused test if blocking calls run on the event loop thread.""" with detect_blocking_io(fail_on_exit=True) as detector: yield detector def pytest_addoption(parser: pytest.Parser) -> None: group = parser.getgroup("blocking-io") group.addoption( "--detect-blocking-io", action="store_true", default=False, help="Collect blocking calls made while an asyncio event loop is running and report a summary.", ) group.addoption( "--detect-blocking-io-fail", action="store_true", default=False, help="Set a failing exit status when --detect-blocking-io records violations.", ) def pytest_configure(config: pytest.Config) -> None: config.addinivalue_line("markers", "no_blocking_io_probe: skip the optional blocking IO probe") def pytest_sessionstart(session: pytest.Session) -> None: if _blocking_io_probe_enabled(session.config): _blocking_io_probe.clear() @pytest.hookimpl(hookwrapper=True) def pytest_runtest_call(item: pytest.Item): if not _blocking_io_probe_enabled(item.config) or _blocking_io_probe_skipped(item): yield return detector = detect_blocking_io(fail_on_exit=False, stack_limit=18) detector.__enter__() setattr(item, _BLOCKING_IO_DETECTOR_ATTR, detector) yield @pytest.hookimpl(hookwrapper=True) def pytest_runtest_teardown(item: pytest.Item): yield detector = getattr(item, _BLOCKING_IO_DETECTOR_ATTR, None) if detector is None: return try: detector.__exit__(None, None, None) _blocking_io_probe.record(item.nodeid, detector.violations) finally: delattr(item, _BLOCKING_IO_DETECTOR_ATTR) def pytest_sessionfinish(session: pytest.Session) -> None: if _blocking_io_fail_enabled(session.config) and _blocking_io_probe.violation_count and session.exitstatus == pytest.ExitCode.OK: session.exitstatus = pytest.ExitCode.TESTS_FAILED def pytest_terminal_summary(terminalreporter: pytest.TerminalReporter) -> None: if not _blocking_io_probe_enabled(terminalreporter.config): return header, *details = _blocking_io_probe.format_summary().splitlines() terminalreporter.write_sep("=", header) for line in details: terminalreporter.write_line(line) def _blocking_io_probe_enabled(config: pytest.Config) -> bool: return bool(config.getoption("--detect-blocking-io") or config.getoption("--detect-blocking-io-fail")) def _blocking_io_fail_enabled(config: pytest.Config) -> bool: return bool(config.getoption("--detect-blocking-io-fail")) def _blocking_io_probe_skipped(item: pytest.Item) -> bool: return item.path.name == "test_blocking_io_detector.py" or item.get_closest_marker("no_blocking_io_probe") is not None # --------------------------------------------------------------------------- # Auto-set user context for every test unless marked no_auto_user # --------------------------------------------------------------------------- # # Repository methods read ``user_id`` from a contextvar by default # (see ``deerflow.runtime.user_context``). Without this fixture, every # pre-existing persistence test would raise RuntimeError because the # contextvar is unset. The fixture sets a default test user on every # test; tests that explicitly want to verify behaviour *without* a user # context should mark themselves ``@pytest.mark.no_auto_user``. @pytest.fixture(autouse=True) def _reset_skill_storage_singleton(): """Reset the SkillStorage singleton between tests to prevent cross-test contamination.""" try: from deerflow.skills.storage import reset_skill_storage except ImportError: yield return reset_skill_storage() try: yield finally: reset_skill_storage() @pytest.fixture(autouse=True) def _auto_user_context(request): """Inject a default ``test-user-autouse`` into the contextvar. Opt-out via ``@pytest.mark.no_auto_user``. Uses lazy import so that tests which don't touch the persistence layer never pay the cost of importing runtime.user_context. """ if request.node.get_closest_marker("no_auto_user"): yield return try: from deerflow.runtime.user_context import ( reset_current_user, set_current_user, ) except ImportError: yield return user = SimpleNamespace(id="test-user-autouse", email="test@local") token = set_current_user(user) try: yield finally: reset_current_user(token)