diff --git a/backend/packages/harness/deerflow/community/aio_sandbox/aio_sandbox_provider.py b/backend/packages/harness/deerflow/community/aio_sandbox/aio_sandbox_provider.py index 75fb0b5ca..5bc3c3981 100644 --- a/backend/packages/harness/deerflow/community/aio_sandbox/aio_sandbox_provider.py +++ b/backend/packages/harness/deerflow/community/aio_sandbox/aio_sandbox_provider.py @@ -112,6 +112,9 @@ class AioSandboxProvider(SandboxProvider): atexit.register(self.shutdown) self._register_signal_handlers() + # Reconcile orphaned containers from previous process lifecycles + self._reconcile_orphans() + # Start idle checker if enabled if self._config.get("idle_timeout", DEFAULT_IDLE_TIMEOUT) > 0: self._start_idle_checker() @@ -175,6 +178,51 @@ class AioSandboxProvider(SandboxProvider): resolved[key] = str(value) return resolved + # ── Startup reconciliation ──────────────────────────────────────────── + + def _reconcile_orphans(self) -> None: + """Reconcile orphaned containers left by previous process lifecycles. + + On startup, enumerate all running containers matching our prefix + and adopt them all into the warm pool. The idle checker will reclaim + containers that nobody re-acquires within ``idle_timeout``. + + All containers are adopted unconditionally because we cannot + distinguish "orphaned" from "actively used by another process" + based on age alone — ``idle_timeout`` represents inactivity, not + uptime. Adopting into the warm pool and letting the idle checker + decide avoids destroying containers that a concurrent process may + still be using. + + This closes the fundamental gap where in-memory state loss (process + restart, crash, SIGKILL) leaves Docker containers running forever. 
+ """ + try: + running = self._backend.list_running() + except Exception as e: + logger.warning(f"Failed to enumerate running containers during startup reconciliation: {e}") + return + + if not running: + return + + current_time = time.time() + adopted = 0 + + for info in running: + age = current_time - info.created_at if info.created_at > 0 else float("inf") + # Single lock acquisition per container: atomic check-and-insert. + # Avoids a TOCTOU window between the "already tracked?" check and + # the warm-pool insert. + with self._lock: + if info.sandbox_id in self._sandboxes or info.sandbox_id in self._warm_pool: + continue + self._warm_pool[info.sandbox_id] = (info, current_time) + adopted += 1 + logger.info(f"Adopted container {info.sandbox_id} into warm pool (age: {age:.0f}s)") + + logger.info(f"Startup reconciliation complete: {adopted} adopted into warm pool, {len(running)} total found") + # ── Deterministic ID ───────────────────────────────────────────────── @staticmethod @@ -316,13 +364,23 @@ class AioSandboxProvider(SandboxProvider): # ── Signal handling ────────────────────────────────────────────────── def _register_signal_handlers(self) -> None: - """Register signal handlers for graceful shutdown.""" + """Register signal handlers for graceful shutdown. + + Handles SIGTERM, SIGINT, and SIGHUP (terminal close) to ensure + sandbox containers are cleaned up even when the user closes the terminal. 
+ """ self._original_sigterm = signal.getsignal(signal.SIGTERM) self._original_sigint = signal.getsignal(signal.SIGINT) + self._original_sighup = signal.getsignal(signal.SIGHUP) if hasattr(signal, "SIGHUP") else None def signal_handler(signum, frame): self.shutdown() - original = self._original_sigterm if signum == signal.SIGTERM else self._original_sigint + if signum == signal.SIGTERM: + original = self._original_sigterm + elif hasattr(signal, "SIGHUP") and signum == signal.SIGHUP: + original = self._original_sighup + else: + original = self._original_sigint if callable(original): original(signum, frame) elif original == signal.SIG_DFL: @@ -332,6 +390,8 @@ class AioSandboxProvider(SandboxProvider): try: signal.signal(signal.SIGTERM, signal_handler) signal.signal(signal.SIGINT, signal_handler) + if hasattr(signal, "SIGHUP"): + signal.signal(signal.SIGHUP, signal_handler) except ValueError: logger.debug("Could not register signal handlers (not main thread)") diff --git a/backend/packages/harness/deerflow/community/aio_sandbox/backend.py b/backend/packages/harness/deerflow/community/aio_sandbox/backend.py index 62ac7c288..0200ba783 100644 --- a/backend/packages/harness/deerflow/community/aio_sandbox/backend.py +++ b/backend/packages/harness/deerflow/community/aio_sandbox/backend.py @@ -96,3 +96,19 @@ class SandboxBackend(ABC): SandboxInfo if found and healthy, None otherwise. """ ... + + def list_running(self) -> list[SandboxInfo]: + """Enumerate all running sandboxes managed by this backend. + + Used for startup reconciliation: when the process restarts, it needs + to discover containers started by previous processes so they can be + adopted into the warm pool or destroyed if idle too long. + + The default implementation returns an empty list, which is correct + for backends that don't manage local containers (e.g., RemoteSandboxBackend + delegates lifecycle to the provisioner which handles its own cleanup). 
+ + Returns: + A list of SandboxInfo for all currently running sandboxes. + """ + return [] diff --git a/backend/packages/harness/deerflow/community/aio_sandbox/local_backend.py b/backend/packages/harness/deerflow/community/aio_sandbox/local_backend.py index 52124ab73..4b680df2d 100644 --- a/backend/packages/harness/deerflow/community/aio_sandbox/local_backend.py +++ b/backend/packages/harness/deerflow/community/aio_sandbox/local_backend.py @@ -6,9 +6,11 @@ Handles container lifecycle, port allocation, and cross-process container discov from __future__ import annotations +import json import logging import os import subprocess +from datetime import datetime from deerflow.utils.network import get_free_port, release_port @@ -18,6 +20,52 @@ from .sandbox_info import SandboxInfo logger = logging.getLogger(__name__) +def _parse_docker_timestamp(raw: str) -> float: + """Parse Docker's ISO 8601 timestamp into a Unix epoch float. + + Docker returns timestamps with nanosecond precision and a trailing ``Z`` + (e.g. ``2026-04-08T01:22:50.123456789Z``). Python's ``fromisoformat`` + accepts at most microseconds and (pre-3.11) does not accept ``Z``, so the + string is normalized before parsing. Returns ``0.0`` on empty input or + parse failure so callers can use ``0.0`` as a sentinel for "unknown age". + """ + if not raw: + return 0.0 + try: + s = raw.strip() + if "." 
in s: + dot_pos = s.index(".") + tz_start = dot_pos + 1 + while tz_start < len(s) and s[tz_start].isdigit(): + tz_start += 1 + frac = s[dot_pos + 1 : tz_start][:6] # truncate to microseconds + tz_suffix = s[tz_start:] + s = s[: dot_pos + 1] + frac + tz_suffix + if s.endswith("Z"): + s = s[:-1] + "+00:00" + return datetime.fromisoformat(s).timestamp() + except (ValueError, TypeError) as e: + logger.debug(f"Could not parse docker timestamp {raw!r}: {e}") + return 0.0 + + +def _extract_host_port(inspect_entry: dict, container_port: int) -> int | None: + """Extract the host port mapped to ``container_port/tcp`` from a docker inspect entry. + + Returns None if the container has no port mapping for that port. + """ + try: + ports = (inspect_entry.get("NetworkSettings") or {}).get("Ports") or {} + bindings = ports.get(f"{container_port}/tcp") or [] + if bindings: + host_port = bindings[0].get("HostPort") + if host_port: + return int(host_port) + except (ValueError, TypeError, AttributeError): + pass + return None + + def _format_container_mount(runtime: str, host_path: str, container_path: str, read_only: bool) -> list[str]: """Format a bind-mount argument for the selected runtime. @@ -172,8 +220,12 @@ class LocalContainerBackend(SandboxBackend): def destroy(self, info: SandboxInfo) -> None: """Stop the container and release its port.""" - if info.container_id: - self._stop_container(info.container_id) + # Prefer container_id, fall back to container_name (both accepted by docker stop). + # This ensures containers discovered via list_running() (which only has the name) + # can also be stopped. 
+ stop_target = info.container_id or info.container_name + if stop_target: + self._stop_container(stop_target) # Extract port from sandbox_url for release try: from urllib.parse import urlparse @@ -222,6 +274,129 @@ class LocalContainerBackend(SandboxBackend): container_name=container_name, ) + def list_running(self) -> list[SandboxInfo]: + """Enumerate all running containers matching the configured prefix. + + Uses a single ``docker ps`` call to list container names, then a + single batched ``docker inspect`` call to retrieve creation timestamp + and port mapping for all containers at once. Total subprocess calls: + 2 (down from 2N+1 in the naive per-container approach). + + Note: Docker's ``--filter name=`` performs *substring* matching, + so a secondary ``startswith`` check is applied to ensure only + containers with the exact prefix are included. + + Containers without port mappings are still included (with empty + sandbox_url) so that startup reconciliation can adopt orphans + regardless of their port state. 
+ """ + # Step 1: enumerate container names via docker ps + try: + result = subprocess.run( + [ + self._runtime, + "ps", + "--filter", + f"name={self._container_prefix}-", + "--format", + "{{.Names}}", + ], + capture_output=True, + text=True, + timeout=10, + ) + if result.returncode != 0: + stderr = (result.stderr or "").strip() + logger.warning( + "Failed to list running containers with %s ps (returncode=%s, stderr=%s)", + self._runtime, + result.returncode, + stderr or "", + ) + return [] + if not result.stdout.strip(): + return [] + except (subprocess.CalledProcessError, subprocess.TimeoutExpired, FileNotFoundError, OSError) as e: + logger.warning(f"Failed to list running containers: {e}") + return [] + + # Filter to names matching our exact prefix (docker filter is substring-based) + container_names = [name.strip() for name in result.stdout.strip().splitlines() if name.strip().startswith(self._container_prefix + "-")] + if not container_names: + return [] + + # Step 2: batched docker inspect — single subprocess call for all containers + inspections = self._batch_inspect(container_names) + + infos: list[SandboxInfo] = [] + sandbox_host = os.environ.get("DEER_FLOW_SANDBOX_HOST", "localhost") + for container_name in container_names: + data = inspections.get(container_name) + if data is None: + # Container disappeared between ps and inspect, or inspect failed + continue + created_at, host_port = data + sandbox_id = container_name[len(self._container_prefix) + 1 :] + sandbox_url = f"http://{sandbox_host}:{host_port}" if host_port else "" + + infos.append( + SandboxInfo( + sandbox_id=sandbox_id, + sandbox_url=sandbox_url, + container_name=container_name, + created_at=created_at, + ) + ) + + logger.info(f"Found {len(infos)} running sandbox container(s)") + return infos + + def _batch_inspect(self, container_names: list[str]) -> dict[str, tuple[float, int | None]]: + """Batch-inspect containers in a single subprocess call. 
+ + Returns a mapping of ``container_name -> (created_at, host_port)``. + Missing containers or parse failures are silently dropped from the result. + """ + if not container_names: + return {} + try: + result = subprocess.run( + [self._runtime, "inspect", *container_names], + capture_output=True, + text=True, + timeout=15, + ) + except (subprocess.CalledProcessError, subprocess.TimeoutExpired, FileNotFoundError, OSError) as e: + logger.warning(f"Failed to batch-inspect containers: {e}") + return {} + + if result.returncode != 0: + stderr = (result.stderr or "").strip() + logger.warning( + "Failed to batch-inspect containers with %s inspect (returncode=%s, stderr=%s)", + self._runtime, + result.returncode, + stderr or "", + ) + return {} + + try: + payload = json.loads(result.stdout or "[]") + except json.JSONDecodeError as e: + logger.warning(f"Failed to parse docker inspect output as JSON: {e}") + return {} + + out: dict[str, tuple[float, int | None]] = {} + for entry in payload: + # ``Name`` is prefixed with ``/`` in the docker inspect response + name = (entry.get("Name") or "").lstrip("/") + if not name: + continue + created_at = _parse_docker_timestamp(entry.get("Created", "")) + host_port = _extract_host_port(entry, 8080) + out[name] = (created_at, host_port) + return out + # ── Container operations ───────────────────────────────────────────── def _start_container( diff --git a/backend/tests/test_sandbox_orphan_reconciliation.py b/backend/tests/test_sandbox_orphan_reconciliation.py new file mode 100644 index 000000000..b01ad5004 --- /dev/null +++ b/backend/tests/test_sandbox_orphan_reconciliation.py @@ -0,0 +1,550 @@ +"""Tests for sandbox container orphan reconciliation on startup. 
+ +Covers: +- SandboxBackend.list_running() default behavior +- LocalContainerBackend.list_running() with mocked docker commands +- _parse_docker_timestamp() / _extract_host_port() helpers +- AioSandboxProvider._reconcile_orphans() decision logic +- SIGHUP signal handler registration +""" + +import importlib +import json +import signal +import threading +import time +from datetime import UTC, datetime +from unittest.mock import MagicMock + +import pytest + +from deerflow.community.aio_sandbox.sandbox_info import SandboxInfo + +# ── SandboxBackend.list_running() default ──────────────────────────────────── + + +def test_backend_list_running_default_returns_empty(): + """Base SandboxBackend.list_running() returns empty list (backward compat for RemoteSandboxBackend).""" + from deerflow.community.aio_sandbox.backend import SandboxBackend + + class StubBackend(SandboxBackend): + def create(self, thread_id, sandbox_id, extra_mounts=None): + pass + + def destroy(self, info): + pass + + def is_alive(self, info): + return False + + def discover(self, sandbox_id): + return None + + backend = StubBackend() + assert backend.list_running() == [] + + +# ── Helpers ────────────────────────────────────────────────────────────────── + + +def _make_local_backend(): + """Create a LocalContainerBackend with minimal config.""" + from deerflow.community.aio_sandbox.local_backend import LocalContainerBackend + + return LocalContainerBackend( + image="test-image:latest", + base_port=8080, + container_prefix="deer-flow-sandbox", + config_mounts=[], + environment={}, + ) + + +def _make_inspect_entry(name: str, created: str, host_port: str | None = None) -> dict: + """Build a minimal docker inspect JSON entry matching the real schema.""" + ports: dict = {} + if host_port is not None: + ports["8080/tcp"] = [{"HostIp": "0.0.0.0", "HostPort": host_port}] + return { + "Name": f"/{name}", # docker inspect prefixes names with "/" + "Created": created, + "NetworkSettings": {"Ports": ports}, + } + 
+ +def _mock_ps_and_inspect(monkeypatch, ps_output: str, inspect_payload: list | None): + """Patch subprocess.run to serve fixed ps + inspect responses.""" + import subprocess + + def mock_run(cmd, **kwargs): + result = MagicMock() + if len(cmd) >= 2 and cmd[1] == "ps": + result.returncode = 0 + result.stdout = ps_output + result.stderr = "" + return result + if len(cmd) >= 2 and cmd[1] == "inspect": + if inspect_payload is None: + result.returncode = 1 + result.stdout = "" + result.stderr = "inspect failed" + return result + result.returncode = 0 + result.stdout = json.dumps(inspect_payload) + result.stderr = "" + return result + result.returncode = 1 + result.stdout = "" + result.stderr = "unexpected command" + return result + + monkeypatch.setattr(subprocess, "run", mock_run) + + +# ── LocalContainerBackend.list_running() ───────────────────────────────────── + + +def test_list_running_returns_containers(monkeypatch): + """list_running should enumerate containers via docker ps and batch-inspect them.""" + backend = _make_local_backend() + monkeypatch.setattr(backend, "_runtime", "docker") + + _mock_ps_and_inspect( + monkeypatch, + ps_output="deer-flow-sandbox-abc12345\ndeer-flow-sandbox-def67890\n", + inspect_payload=[ + _make_inspect_entry("deer-flow-sandbox-abc12345", "2026-04-08T01:22:50.000000000Z", "8081"), + _make_inspect_entry("deer-flow-sandbox-def67890", "2026-04-08T02:22:50.000000000Z", "8082"), + ], + ) + + infos = backend.list_running() + + assert len(infos) == 2 + ids = {info.sandbox_id for info in infos} + assert ids == {"abc12345", "def67890"} + urls = {info.sandbox_url for info in infos} + assert "http://localhost:8081" in urls + assert "http://localhost:8082" in urls + + +def test_list_running_empty_when_no_containers(monkeypatch): + """list_running should return empty list when docker ps returns nothing.""" + backend = _make_local_backend() + monkeypatch.setattr(backend, "_runtime", "docker") + _mock_ps_and_inspect(monkeypatch, ps_output="", 
inspect_payload=[]) + + assert backend.list_running() == [] + + +def test_list_running_skips_non_matching_names(monkeypatch): + """list_running should skip containers whose names don't match the prefix pattern.""" + backend = _make_local_backend() + monkeypatch.setattr(backend, "_runtime", "docker") + + _mock_ps_and_inspect( + monkeypatch, + ps_output="deer-flow-sandbox-abc12345\nsome-other-container\n", + inspect_payload=[ + _make_inspect_entry("deer-flow-sandbox-abc12345", "2026-04-08T01:22:50Z", "8081"), + ], + ) + + infos = backend.list_running() + assert len(infos) == 1 + assert infos[0].sandbox_id == "abc12345" + + +def test_list_running_includes_containers_without_port(monkeypatch): + """Containers without a port mapping should still be listed (with empty URL).""" + backend = _make_local_backend() + monkeypatch.setattr(backend, "_runtime", "docker") + + _mock_ps_and_inspect( + monkeypatch, + ps_output="deer-flow-sandbox-abc12345\n", + inspect_payload=[ + _make_inspect_entry("deer-flow-sandbox-abc12345", "2026-04-08T01:22:50Z", host_port=None), + ], + ) + + infos = backend.list_running() + assert len(infos) == 1 + assert infos[0].sandbox_id == "abc12345" + assert infos[0].sandbox_url == "" + + +def test_list_running_handles_docker_failure(monkeypatch): + """list_running should return empty list when docker ps fails.""" + backend = _make_local_backend() + monkeypatch.setattr(backend, "_runtime", "docker") + + import subprocess + + def mock_run(cmd, **kwargs): + result = MagicMock() + result.returncode = 1 + result.stdout = "" + result.stderr = "daemon not running" + return result + + monkeypatch.setattr(subprocess, "run", mock_run) + + assert backend.list_running() == [] + + +def test_list_running_handles_inspect_failure(monkeypatch): + """list_running should return empty list when batch inspect fails.""" + backend = _make_local_backend() + monkeypatch.setattr(backend, "_runtime", "docker") + + _mock_ps_and_inspect( + monkeypatch, + 
ps_output="deer-flow-sandbox-abc12345\n", + inspect_payload=None, # Signals inspect failure + ) + + assert backend.list_running() == [] + + +def test_list_running_handles_malformed_inspect_json(monkeypatch): + """list_running should return empty list when docker inspect emits invalid JSON.""" + backend = _make_local_backend() + monkeypatch.setattr(backend, "_runtime", "docker") + + import subprocess + + def mock_run(cmd, **kwargs): + result = MagicMock() + if len(cmd) >= 2 and cmd[1] == "ps": + result.returncode = 0 + result.stdout = "deer-flow-sandbox-abc12345\n" + result.stderr = "" + else: + result.returncode = 0 + result.stdout = "this is not json" + result.stderr = "" + return result + + monkeypatch.setattr(subprocess, "run", mock_run) + + assert backend.list_running() == [] + + +def test_list_running_uses_single_batch_inspect_call(monkeypatch): + """list_running should issue exactly ONE docker inspect call regardless of container count.""" + backend = _make_local_backend() + monkeypatch.setattr(backend, "_runtime", "docker") + + inspect_call_count = {"count": 0} + + import subprocess + + def mock_run(cmd, **kwargs): + result = MagicMock() + if len(cmd) >= 2 and cmd[1] == "ps": + result.returncode = 0 + result.stdout = "deer-flow-sandbox-a\ndeer-flow-sandbox-b\ndeer-flow-sandbox-c\n" + result.stderr = "" + return result + if len(cmd) >= 2 and cmd[1] == "inspect": + inspect_call_count["count"] += 1 + # Expect all three names passed in a single call + assert cmd[2:] == ["deer-flow-sandbox-a", "deer-flow-sandbox-b", "deer-flow-sandbox-c"] + result.returncode = 0 + result.stdout = json.dumps( + [ + _make_inspect_entry("deer-flow-sandbox-a", "2026-04-08T01:22:50Z", "8081"), + _make_inspect_entry("deer-flow-sandbox-b", "2026-04-08T01:22:50Z", "8082"), + _make_inspect_entry("deer-flow-sandbox-c", "2026-04-08T01:22:50Z", "8083"), + ] + ) + result.stderr = "" + return result + result.returncode = 1 + result.stdout = "" + return result + + 
monkeypatch.setattr(subprocess, "run", mock_run) + + infos = backend.list_running() + assert len(infos) == 3 + assert inspect_call_count["count"] == 1 # ← The core performance assertion + + +# ── _parse_docker_timestamp() ──────────────────────────────────────────────── + + +def test_parse_docker_timestamp_with_nanoseconds(): + """Should correctly parse Docker's ISO 8601 timestamp with nanoseconds.""" + from deerflow.community.aio_sandbox.local_backend import _parse_docker_timestamp + + ts = _parse_docker_timestamp("2026-04-08T01:22:50.123456789Z") + assert ts > 0 + expected = datetime(2026, 4, 8, 1, 22, 50, tzinfo=UTC).timestamp() + assert abs(ts - expected) < 1.0 + + +def test_parse_docker_timestamp_without_fractional_seconds(): + """Should parse plain ISO 8601 timestamps without fractional seconds.""" + from deerflow.community.aio_sandbox.local_backend import _parse_docker_timestamp + + ts = _parse_docker_timestamp("2026-04-08T01:22:50Z") + expected = datetime(2026, 4, 8, 1, 22, 50, tzinfo=UTC).timestamp() + assert abs(ts - expected) < 1.0 + + +def test_parse_docker_timestamp_empty_returns_zero(): + from deerflow.community.aio_sandbox.local_backend import _parse_docker_timestamp + + assert _parse_docker_timestamp("") == 0.0 + assert _parse_docker_timestamp("not a timestamp") == 0.0 + + +# ── _extract_host_port() ───────────────────────────────────────────────────── + + +def test_extract_host_port_returns_mapped_port(): + from deerflow.community.aio_sandbox.local_backend import _extract_host_port + + entry = {"NetworkSettings": {"Ports": {"8080/tcp": [{"HostIp": "0.0.0.0", "HostPort": "8081"}]}}} + assert _extract_host_port(entry, 8080) == 8081 + + +def test_extract_host_port_returns_none_when_unmapped(): + from deerflow.community.aio_sandbox.local_backend import _extract_host_port + + entry = {"NetworkSettings": {"Ports": {}}} + assert _extract_host_port(entry, 8080) is None + + +def test_extract_host_port_handles_missing_fields(): + from 
deerflow.community.aio_sandbox.local_backend import _extract_host_port + + assert _extract_host_port({}, 8080) is None + assert _extract_host_port({"NetworkSettings": None}, 8080) is None + + +# ── AioSandboxProvider._reconcile_orphans() ────────────────────────────────── + + +def _make_provider_for_reconciliation(): + """Build a minimal AioSandboxProvider without triggering __init__ side effects. + + WARNING: This helper intentionally bypasses ``__init__`` via ``__new__`` so + tests don't depend on Docker or touch the real idle-checker thread. The + downside is that this helper is tightly coupled to the set of attributes + set up in ``AioSandboxProvider.__init__``. If ``__init__`` gains a new + attribute that ``_reconcile_orphans`` (or other methods under test) reads, + this helper must be updated in lockstep — otherwise tests will fail with a + confusing ``AttributeError`` instead of a meaningful assertion failure. + """ + aio_mod = importlib.import_module("deerflow.community.aio_sandbox.aio_sandbox_provider") + provider = aio_mod.AioSandboxProvider.__new__(aio_mod.AioSandboxProvider) + provider._lock = threading.Lock() + provider._sandboxes = {} + provider._sandbox_infos = {} + provider._thread_sandboxes = {} + provider._thread_locks = {} + provider._last_activity = {} + provider._warm_pool = {} + provider._shutdown_called = False + provider._idle_checker_stop = threading.Event() + provider._idle_checker_thread = None + provider._config = { + "idle_timeout": 600, + "replicas": 3, + } + provider._backend = MagicMock() + return provider + + +def test_reconcile_adopts_old_containers_into_warm_pool(): + """All containers are adopted into warm pool regardless of age — idle checker handles cleanup.""" + provider = _make_provider_for_reconciliation() + now = time.time() + + old_info = SandboxInfo( + sandbox_id="old12345", + sandbox_url="http://localhost:8081", + container_name="deer-flow-sandbox-old12345", + created_at=now - 1200, # 20 minutes old, > 600s idle_timeout 
+ ) + provider._backend.list_running.return_value = [old_info] + + provider._reconcile_orphans() + + # Should NOT destroy directly — let idle checker handle it + provider._backend.destroy.assert_not_called() + assert "old12345" in provider._warm_pool + + +def test_reconcile_adopts_young_containers(): + """Young containers are adopted into warm pool for potential reuse.""" + provider = _make_provider_for_reconciliation() + now = time.time() + + young_info = SandboxInfo( + sandbox_id="young123", + sandbox_url="http://localhost:8082", + container_name="deer-flow-sandbox-young123", + created_at=now - 60, # 1 minute old, < 600s idle_timeout + ) + provider._backend.list_running.return_value = [young_info] + + provider._reconcile_orphans() + + provider._backend.destroy.assert_not_called() + assert "young123" in provider._warm_pool + adopted_info, release_ts = provider._warm_pool["young123"] + assert adopted_info.sandbox_id == "young123" + + +def test_reconcile_mixed_containers_all_adopted(): + """All containers (old and young) are adopted into warm pool.""" + provider = _make_provider_for_reconciliation() + now = time.time() + + old_info = SandboxInfo( + sandbox_id="old_one", + sandbox_url="http://localhost:8081", + container_name="deer-flow-sandbox-old_one", + created_at=now - 1200, + ) + young_info = SandboxInfo( + sandbox_id="young_one", + sandbox_url="http://localhost:8082", + container_name="deer-flow-sandbox-young_one", + created_at=now - 60, + ) + provider._backend.list_running.return_value = [old_info, young_info] + + provider._reconcile_orphans() + + provider._backend.destroy.assert_not_called() + assert "old_one" in provider._warm_pool + assert "young_one" in provider._warm_pool + + +def test_reconcile_skips_already_tracked_containers(): + """Containers already in _sandboxes or _warm_pool should be skipped.""" + provider = _make_provider_for_reconciliation() + now = time.time() + + existing_info = SandboxInfo( + sandbox_id="existing1", + 
sandbox_url="http://localhost:8081", + container_name="deer-flow-sandbox-existing1", + created_at=now - 1200, + ) + # Pre-populate _sandboxes to simulate already-tracked container + provider._sandboxes["existing1"] = MagicMock() + provider._backend.list_running.return_value = [existing_info] + + provider._reconcile_orphans() + + provider._backend.destroy.assert_not_called() + # The pre-populated sandbox should NOT be moved into warm pool + assert "existing1" not in provider._warm_pool + + +def test_reconcile_handles_backend_failure(): + """Reconciliation should not crash if backend.list_running() fails.""" + provider = _make_provider_for_reconciliation() + provider._backend.list_running.side_effect = RuntimeError("docker not available") + + # Should not raise + provider._reconcile_orphans() + + assert provider._warm_pool == {} + + +def test_reconcile_no_running_containers(): + """Reconciliation with no running containers is a no-op.""" + provider = _make_provider_for_reconciliation() + provider._backend.list_running.return_value = [] + + provider._reconcile_orphans() + + provider._backend.destroy.assert_not_called() + assert provider._warm_pool == {} + + +def test_reconcile_multiple_containers_all_adopted(): + """Multiple containers should all be adopted into warm pool.""" + provider = _make_provider_for_reconciliation() + now = time.time() + + info1 = SandboxInfo(sandbox_id="cont_one", sandbox_url="http://localhost:8081", created_at=now - 1200) + info2 = SandboxInfo(sandbox_id="cont_two", sandbox_url="http://localhost:8082", created_at=now - 1200) + + provider._backend.list_running.return_value = [info1, info2] + + provider._reconcile_orphans() + + provider._backend.destroy.assert_not_called() + assert "cont_one" in provider._warm_pool + assert "cont_two" in provider._warm_pool + + +def test_reconcile_zero_created_at_adopted(): + """Containers with created_at=0 (unknown age) should still be adopted into warm pool.""" + provider = 
_make_provider_for_reconciliation() + + info = SandboxInfo(sandbox_id="unknown1", sandbox_url="http://localhost:8081", created_at=0.0) + provider._backend.list_running.return_value = [info] + + provider._reconcile_orphans() + + provider._backend.destroy.assert_not_called() + assert "unknown1" in provider._warm_pool + + +def test_reconcile_idle_timeout_zero_adopts_all(): + """When idle_timeout=0 (disabled), all containers are still adopted into warm pool.""" + provider = _make_provider_for_reconciliation() + provider._config["idle_timeout"] = 0 + now = time.time() + + old_info = SandboxInfo(sandbox_id="old_one", sandbox_url="http://localhost:8081", created_at=now - 7200) + young_info = SandboxInfo(sandbox_id="young_one", sandbox_url="http://localhost:8082", created_at=now - 60) + provider._backend.list_running.return_value = [old_info, young_info] + + provider._reconcile_orphans() + + provider._backend.destroy.assert_not_called() + assert "old_one" in provider._warm_pool + assert "young_one" in provider._warm_pool + + +# ── SIGHUP signal handler ─────────────────────────────────────────────────── + + +def test_sighup_handler_registered(): + """SIGHUP handler should be registered on Unix systems.""" + if not hasattr(signal, "SIGHUP"): + pytest.skip("SIGHUP not available on this platform") + + provider = _make_provider_for_reconciliation() + + # Save original handlers for ALL signals we'll modify + original_sighup = signal.getsignal(signal.SIGHUP) + original_sigterm = signal.getsignal(signal.SIGTERM) + original_sigint = signal.getsignal(signal.SIGINT) + try: + aio_mod = importlib.import_module("deerflow.community.aio_sandbox.aio_sandbox_provider") + provider._original_sighup = original_sighup + provider._original_sigterm = original_sigterm + provider._original_sigint = original_sigint + provider.shutdown = MagicMock() + + aio_mod.AioSandboxProvider._register_signal_handlers(provider) + + # Verify SIGHUP handler is no longer the default + handler = 
signal.getsignal(signal.SIGHUP) + assert handler != signal.SIG_DFL, "SIGHUP handler should be registered" + finally: + # Restore ALL original handlers to avoid leaking state across tests + signal.signal(signal.SIGHUP, original_sighup) + signal.signal(signal.SIGTERM, original_sigterm) + signal.signal(signal.SIGINT, original_sigint) diff --git a/backend/tests/test_sandbox_orphan_reconciliation_e2e.py b/backend/tests/test_sandbox_orphan_reconciliation_e2e.py new file mode 100644 index 000000000..07f11eddd --- /dev/null +++ b/backend/tests/test_sandbox_orphan_reconciliation_e2e.py @@ -0,0 +1,215 @@ +"""Docker-backed sandbox container lifecycle and cleanup tests. + +This test module requires Docker to be running. It exercises the container +backend behavior behind sandbox lifecycle management and verifies that test +containers are created, observed, and explicitly cleaned up correctly. + +The coverage here is limited to direct backend/container operations used by +the reconciliation flow. It does not simulate a process restart by creating +a new ``AioSandboxProvider`` instance or assert provider startup orphan +reconciliation end-to-end — that logic is covered by unit tests in +``test_sandbox_orphan_reconciliation.py``. + +Run with: PYTHONPATH=. 
uv run pytest tests/test_sandbox_orphan_reconciliation_e2e.py -v -s +Requires: Docker running locally +""" + +import subprocess +import time + +import pytest + + +def _docker_available() -> bool: + try: + result = subprocess.run(["docker", "info"], capture_output=True, timeout=5) + return result.returncode == 0 + except (FileNotFoundError, subprocess.TimeoutExpired): + return False + + +def _container_running(container_name: str) -> bool: + result = subprocess.run( + ["docker", "inspect", "-f", "{{.State.Running}}", container_name], + capture_output=True, + text=True, + timeout=5, + ) + return result.returncode == 0 and result.stdout.strip().lower() == "true" + + +def _stop_container(container_name: str) -> None: + subprocess.run(["docker", "stop", container_name], capture_output=True, timeout=15) + + +# Use a lightweight image for testing to avoid pulling the heavy sandbox image +E2E_TEST_IMAGE = "busybox:latest" +E2E_PREFIX = "deer-flow-sandbox-e2e-test" + + +@pytest.fixture(autouse=True) +def cleanup_test_containers(): + """Ensure all test containers are cleaned up after the test.""" + yield + # Cleanup: stop any remaining test containers + result = subprocess.run( + ["docker", "ps", "-a", "--filter", f"name={E2E_PREFIX}-", "--format", "{{.Names}}"], + capture_output=True, + text=True, + timeout=10, + ) + for name in result.stdout.strip().splitlines(): + name = name.strip() + if name: + subprocess.run(["docker", "rm", "-f", name], capture_output=True, timeout=10) + + +@pytest.mark.skipif(not _docker_available(), reason="Docker not available") +class TestOrphanReconciliationE2E: + """E2E tests for orphan container reconciliation.""" + + def test_orphan_container_destroyed_on_startup(self): + """Core issue scenario: container from a previous process is destroyed on new process init. + + Steps: + 1. Start a container manually (simulating previous process) + 2. Create a LocalContainerBackend with matching prefix + 3. 
Call list_running() → should find the container
+        4. Destroy it via the backend → container should be stopped
+        """
+        container_name = f"{E2E_PREFIX}-orphan01"
+
+        # Step 1: Start a container (simulating previous process lifecycle)
+        result = subprocess.run(
+            ["docker", "run", "--rm", "-d", "--name", container_name, E2E_TEST_IMAGE, "sleep", "3600"],
+            capture_output=True,
+            text=True,
+            timeout=30,
+        )
+        assert result.returncode == 0, f"Failed to start test container: {result.stderr}"
+
+        try:
+            assert _container_running(container_name), "Test container should be running"
+
+            # Step 2: Create backend and list running containers
+            from deerflow.community.aio_sandbox.local_backend import LocalContainerBackend
+
+            backend = LocalContainerBackend(
+                image=E2E_TEST_IMAGE,
+                base_port=9990,
+                container_prefix=E2E_PREFIX,
+                config_mounts=[],
+                environment={},
+            )
+
+            # Step 3: list_running should find our container
+            running = backend.list_running()
+            found_ids = {info.sandbox_id for info in running}
+            assert "orphan01" in found_ids, f"Should find orphan01, got: {found_ids}"
+
+            # Step 4: Exercise the backend destroy path (startup reconciliation only
+            # adopts; the idle checker later destroys adoptees nobody re-acquires)
+            orphan_info = next(info for info in running if info.sandbox_id == "orphan01")
+            assert orphan_info.created_at > 0, "created_at should be parsed from docker inspect"
+
+            # Destroy it (the reclaim step the idle checker runs for unclaimed orphans)
+            backend.destroy(orphan_info)
+
+            # Give Docker a moment to stop the container
+            time.sleep(1)
+
+            # Verify container is gone
+            assert not _container_running(container_name), "Orphan container should be stopped after destroy"
+
+        finally:
+            # Safety cleanup
+            _stop_container(container_name)
+
+    def test_multiple_orphans_all_cleaned(self):
+        """Multiple orphaned containers are all found and can be cleaned up."""
+        containers = []
+        try:
+            # Start 3 containers
+            for i in range(3):
+                name = 
f"{E2E_PREFIX}-multi{i:02d}" + result = subprocess.run( + ["docker", "run", "--rm", "-d", "--name", name, E2E_TEST_IMAGE, "sleep", "3600"], + capture_output=True, + text=True, + timeout=30, + ) + assert result.returncode == 0, f"Failed to start {name}: {result.stderr}" + containers.append(name) + + from deerflow.community.aio_sandbox.local_backend import LocalContainerBackend + + backend = LocalContainerBackend( + image=E2E_TEST_IMAGE, + base_port=9990, + container_prefix=E2E_PREFIX, + config_mounts=[], + environment={}, + ) + + running = backend.list_running() + found_ids = {info.sandbox_id for info in running} + + assert "multi00" in found_ids + assert "multi01" in found_ids + assert "multi02" in found_ids + + # Destroy all + for info in running: + backend.destroy(info) + + time.sleep(1) + + # Verify all gone + for name in containers: + assert not _container_running(name), f"{name} should be stopped" + + finally: + for name in containers: + _stop_container(name) + + def test_list_running_ignores_unrelated_containers(self): + """Containers with different prefixes should not be listed.""" + unrelated_name = "unrelated-test-container" + our_name = f"{E2E_PREFIX}-ours001" + + try: + # Start an unrelated container + subprocess.run( + ["docker", "run", "--rm", "-d", "--name", unrelated_name, E2E_TEST_IMAGE, "sleep", "3600"], + capture_output=True, + timeout=30, + ) + # Start our container + subprocess.run( + ["docker", "run", "--rm", "-d", "--name", our_name, E2E_TEST_IMAGE, "sleep", "3600"], + capture_output=True, + timeout=30, + ) + + from deerflow.community.aio_sandbox.local_backend import LocalContainerBackend + + backend = LocalContainerBackend( + image=E2E_TEST_IMAGE, + base_port=9990, + container_prefix=E2E_PREFIX, + config_mounts=[], + environment={}, + ) + + running = backend.list_running() + found_ids = {info.sandbox_id for info in running} + + # Should find ours but not unrelated + assert "ours001" in found_ids + # "unrelated-test-container" doesn't match 
"deer-flow-sandbox-e2e-test-" prefix + for info in running: + assert not info.sandbox_id.startswith("unrelated") + + finally: + _stop_container(unrelated_name) + _stop_container(our_name)