fix(sandbox): add startup reconciliation to prevent orphaned container leaks (#1976)

* fix(sandbox): add startup reconciliation to prevent orphaned container leaks

Sandbox containers were never cleaned up when the managing process restarted,
because all lifecycle tracking lived in in-memory dictionaries. This adds
startup reconciliation that enumerates running containers via `docker ps` and
either destroys orphans (age > idle_timeout) or adopts them into the warm pool.

Closes #1972

* fix(sandbox): address Copilot review — adopt-all strategy, improved error handling

- Reconciliation now adopts all containers into warm pool unconditionally,
  letting the idle checker decide cleanup. Avoids destroying containers
  that another concurrent process may still be using.
- list_running() logs stderr on docker ps failure and catches
  FileNotFoundError/OSError.
- Signal handler test restores SIGTERM/SIGINT in addition to SIGHUP.
- E2E test docstring corrected to match actual coverage scope.

* fix(sandbox): address maintainer review — batch inspect, lock tightening, import hygiene

- _reconcile_orphans(): merge check-and-insert into a single lock acquisition
  per container to eliminate the TOCTOU window.
- list_running(): batch the per-container docker inspect into a single call.
  Total subprocess calls drop from 2N+1 to 2 (one ps + one batch inspect).
  Parse port and created_at from the inspect JSON payload.
- Extract _parse_docker_timestamp() and _extract_host_port() as module-level
  pure helpers and test them directly.
- Move datetime/json imports to module top level.
- _make_provider_for_reconciliation(): document the __new__ bypass and the
  lockstep coupling to AioSandboxProvider.__init__.
- Add assertion that list_running() makes exactly ONE inspect call.
This commit is contained in:
Xinmin Zeng 2026-04-09 17:21:23 +08:00 committed by GitHub
parent 140907ce1d
commit 0b6fa8b9e1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 1020 additions and 4 deletions

View File

@ -112,6 +112,9 @@ class AioSandboxProvider(SandboxProvider):
atexit.register(self.shutdown)
self._register_signal_handlers()
# Reconcile orphaned containers from previous process lifecycles
self._reconcile_orphans()
# Start idle checker if enabled
if self._config.get("idle_timeout", DEFAULT_IDLE_TIMEOUT) > 0:
self._start_idle_checker()
@ -175,6 +178,51 @@ class AioSandboxProvider(SandboxProvider):
resolved[key] = str(value)
return resolved
# ── Startup reconciliation ────────────────────────────────────────────
def _reconcile_orphans(self) -> None:
"""Reconcile orphaned containers left by previous process lifecycles.
On startup, enumerate all running containers matching our prefix
and adopt them all into the warm pool. The idle checker will reclaim
containers that nobody re-acquires within ``idle_timeout``.
All containers are adopted unconditionally because we cannot
distinguish "orphaned" from "actively used by another process"
based on age alone ``idle_timeout`` represents inactivity, not
uptime. Adopting into the warm pool and letting the idle checker
decide avoids destroying containers that a concurrent process may
still be using.
This closes the fundamental gap where in-memory state loss (process
restart, crash, SIGKILL) leaves Docker containers running forever.
"""
try:
running = self._backend.list_running()
except Exception as e:
logger.warning(f"Failed to enumerate running containers during startup reconciliation: {e}")
return
if not running:
return
current_time = time.time()
adopted = 0
for info in running:
age = current_time - info.created_at if info.created_at > 0 else float("inf")
# Single lock acquisition per container: atomic check-and-insert.
# Avoids a TOCTOU window between the "already tracked?" check and
# the warm-pool insert.
with self._lock:
if info.sandbox_id in self._sandboxes or info.sandbox_id in self._warm_pool:
continue
self._warm_pool[info.sandbox_id] = (info, current_time)
adopted += 1
logger.info(f"Adopted container {info.sandbox_id} into warm pool (age: {age:.0f}s)")
logger.info(f"Startup reconciliation complete: {adopted} adopted into warm pool, {len(running)} total found")
# ── Deterministic ID ─────────────────────────────────────────────────
@staticmethod
@ -316,13 +364,23 @@ class AioSandboxProvider(SandboxProvider):
# ── Signal handling ──────────────────────────────────────────────────
def _register_signal_handlers(self) -> None:
"""Register signal handlers for graceful shutdown."""
"""Register signal handlers for graceful shutdown.
Handles SIGTERM, SIGINT, and SIGHUP (terminal close) to ensure
sandbox containers are cleaned up even when the user closes the terminal.
"""
self._original_sigterm = signal.getsignal(signal.SIGTERM)
self._original_sigint = signal.getsignal(signal.SIGINT)
self._original_sighup = signal.getsignal(signal.SIGHUP) if hasattr(signal, "SIGHUP") else None
def signal_handler(signum, frame):
self.shutdown()
original = self._original_sigterm if signum == signal.SIGTERM else self._original_sigint
if signum == signal.SIGTERM:
original = self._original_sigterm
elif hasattr(signal, "SIGHUP") and signum == signal.SIGHUP:
original = self._original_sighup
else:
original = self._original_sigint
if callable(original):
original(signum, frame)
elif original == signal.SIG_DFL:
@ -332,6 +390,8 @@ class AioSandboxProvider(SandboxProvider):
try:
signal.signal(signal.SIGTERM, signal_handler)
signal.signal(signal.SIGINT, signal_handler)
if hasattr(signal, "SIGHUP"):
signal.signal(signal.SIGHUP, signal_handler)
except ValueError:
logger.debug("Could not register signal handlers (not main thread)")

View File

@ -96,3 +96,19 @@ class SandboxBackend(ABC):
SandboxInfo if found and healthy, None otherwise.
"""
...
def list_running(self) -> list[SandboxInfo]:
"""Enumerate all running sandboxes managed by this backend.
Used for startup reconciliation: when the process restarts, it needs
to discover containers started by previous processes so they can be
adopted into the warm pool or destroyed if idle too long.
The default implementation returns an empty list, which is correct
for backends that don't manage local containers (e.g., RemoteSandboxBackend
delegates lifecycle to the provisioner which handles its own cleanup).
Returns:
A list of SandboxInfo for all currently running sandboxes.
"""
return []

View File

@ -6,9 +6,11 @@ Handles container lifecycle, port allocation, and cross-process container discov
from __future__ import annotations
import json
import logging
import os
import subprocess
from datetime import datetime
from deerflow.utils.network import get_free_port, release_port
@ -18,6 +20,52 @@ from .sandbox_info import SandboxInfo
logger = logging.getLogger(__name__)
def _parse_docker_timestamp(raw: str) -> float:
"""Parse Docker's ISO 8601 timestamp into a Unix epoch float.
Docker returns timestamps with nanosecond precision and a trailing ``Z``
(e.g. ``2026-04-08T01:22:50.123456789Z``). Python's ``fromisoformat``
accepts at most microseconds and (pre-3.11) does not accept ``Z``, so the
string is normalized before parsing. Returns ``0.0`` on empty input or
parse failure so callers can use ``0.0`` as a sentinel for "unknown age".
"""
if not raw:
return 0.0
try:
s = raw.strip()
if "." in s:
dot_pos = s.index(".")
tz_start = dot_pos + 1
while tz_start < len(s) and s[tz_start].isdigit():
tz_start += 1
frac = s[dot_pos + 1 : tz_start][:6] # truncate to microseconds
tz_suffix = s[tz_start:]
s = s[: dot_pos + 1] + frac + tz_suffix
if s.endswith("Z"):
s = s[:-1] + "+00:00"
return datetime.fromisoformat(s).timestamp()
except (ValueError, TypeError) as e:
logger.debug(f"Could not parse docker timestamp {raw!r}: {e}")
return 0.0
def _extract_host_port(inspect_entry: dict, container_port: int) -> int | None:
"""Extract the host port mapped to ``container_port/tcp`` from a docker inspect entry.
Returns None if the container has no port mapping for that port.
"""
try:
ports = (inspect_entry.get("NetworkSettings") or {}).get("Ports") or {}
bindings = ports.get(f"{container_port}/tcp") or []
if bindings:
host_port = bindings[0].get("HostPort")
if host_port:
return int(host_port)
except (ValueError, TypeError, AttributeError):
pass
return None
def _format_container_mount(runtime: str, host_path: str, container_path: str, read_only: bool) -> list[str]:
"""Format a bind-mount argument for the selected runtime.
@ -172,8 +220,12 @@ class LocalContainerBackend(SandboxBackend):
def destroy(self, info: SandboxInfo) -> None:
"""Stop the container and release its port."""
if info.container_id:
self._stop_container(info.container_id)
# Prefer container_id, fall back to container_name (both accepted by docker stop).
# This ensures containers discovered via list_running() (which only has the name)
# can also be stopped.
stop_target = info.container_id or info.container_name
if stop_target:
self._stop_container(stop_target)
# Extract port from sandbox_url for release
try:
from urllib.parse import urlparse
@ -222,6 +274,129 @@ class LocalContainerBackend(SandboxBackend):
container_name=container_name,
)
def list_running(self) -> list[SandboxInfo]:
"""Enumerate all running containers matching the configured prefix.
Uses a single ``docker ps`` call to list container names, then a
single batched ``docker inspect`` call to retrieve creation timestamp
and port mapping for all containers at once. Total subprocess calls:
2 (down from 2N+1 in the naive per-container approach).
Note: Docker's ``--filter name=`` performs *substring* matching,
so a secondary ``startswith`` check is applied to ensure only
containers with the exact prefix are included.
Containers without port mappings are still included (with empty
sandbox_url) so that startup reconciliation can adopt orphans
regardless of their port state.
"""
# Step 1: enumerate container names via docker ps
try:
result = subprocess.run(
[
self._runtime,
"ps",
"--filter",
f"name={self._container_prefix}-",
"--format",
"{{.Names}}",
],
capture_output=True,
text=True,
timeout=10,
)
if result.returncode != 0:
stderr = (result.stderr or "").strip()
logger.warning(
"Failed to list running containers with %s ps (returncode=%s, stderr=%s)",
self._runtime,
result.returncode,
stderr or "<empty>",
)
return []
if not result.stdout.strip():
return []
except (subprocess.CalledProcessError, subprocess.TimeoutExpired, FileNotFoundError, OSError) as e:
logger.warning(f"Failed to list running containers: {e}")
return []
# Filter to names matching our exact prefix (docker filter is substring-based)
container_names = [name.strip() for name in result.stdout.strip().splitlines() if name.strip().startswith(self._container_prefix + "-")]
if not container_names:
return []
# Step 2: batched docker inspect — single subprocess call for all containers
inspections = self._batch_inspect(container_names)
infos: list[SandboxInfo] = []
sandbox_host = os.environ.get("DEER_FLOW_SANDBOX_HOST", "localhost")
for container_name in container_names:
data = inspections.get(container_name)
if data is None:
# Container disappeared between ps and inspect, or inspect failed
continue
created_at, host_port = data
sandbox_id = container_name[len(self._container_prefix) + 1 :]
sandbox_url = f"http://{sandbox_host}:{host_port}" if host_port else ""
infos.append(
SandboxInfo(
sandbox_id=sandbox_id,
sandbox_url=sandbox_url,
container_name=container_name,
created_at=created_at,
)
)
logger.info(f"Found {len(infos)} running sandbox container(s)")
return infos
def _batch_inspect(self, container_names: list[str]) -> dict[str, tuple[float, int | None]]:
"""Batch-inspect containers in a single subprocess call.
Returns a mapping of ``container_name -> (created_at, host_port)``.
Missing containers or parse failures are silently dropped from the result.
"""
if not container_names:
return {}
try:
result = subprocess.run(
[self._runtime, "inspect", *container_names],
capture_output=True,
text=True,
timeout=15,
)
except (subprocess.CalledProcessError, subprocess.TimeoutExpired, FileNotFoundError, OSError) as e:
logger.warning(f"Failed to batch-inspect containers: {e}")
return {}
if result.returncode != 0:
stderr = (result.stderr or "").strip()
logger.warning(
"Failed to batch-inspect containers with %s inspect (returncode=%s, stderr=%s)",
self._runtime,
result.returncode,
stderr or "<empty>",
)
return {}
try:
payload = json.loads(result.stdout or "[]")
except json.JSONDecodeError as e:
logger.warning(f"Failed to parse docker inspect output as JSON: {e}")
return {}
out: dict[str, tuple[float, int | None]] = {}
for entry in payload:
# ``Name`` is prefixed with ``/`` in the docker inspect response
name = (entry.get("Name") or "").lstrip("/")
if not name:
continue
created_at = _parse_docker_timestamp(entry.get("Created", ""))
host_port = _extract_host_port(entry, 8080)
out[name] = (created_at, host_port)
return out
# ── Container operations ─────────────────────────────────────────────
def _start_container(

View File

@ -0,0 +1,550 @@
"""Tests for sandbox container orphan reconciliation on startup.
Covers:
- SandboxBackend.list_running() default behavior
- LocalContainerBackend.list_running() with mocked docker commands
- _parse_docker_timestamp() / _extract_host_port() helpers
- AioSandboxProvider._reconcile_orphans() decision logic
- SIGHUP signal handler registration
"""
import importlib
import json
import signal
import threading
import time
from datetime import UTC, datetime
from unittest.mock import MagicMock
import pytest
from deerflow.community.aio_sandbox.sandbox_info import SandboxInfo
# ── SandboxBackend.list_running() default ────────────────────────────────────
def test_backend_list_running_default_returns_empty():
"""Base SandboxBackend.list_running() returns empty list (backward compat for RemoteSandboxBackend)."""
from deerflow.community.aio_sandbox.backend import SandboxBackend
class StubBackend(SandboxBackend):
def create(self, thread_id, sandbox_id, extra_mounts=None):
pass
def destroy(self, info):
pass
def is_alive(self, info):
return False
def discover(self, sandbox_id):
return None
backend = StubBackend()
assert backend.list_running() == []
# ── Helpers ──────────────────────────────────────────────────────────────────
def _make_local_backend():
"""Create a LocalContainerBackend with minimal config."""
from deerflow.community.aio_sandbox.local_backend import LocalContainerBackend
return LocalContainerBackend(
image="test-image:latest",
base_port=8080,
container_prefix="deer-flow-sandbox",
config_mounts=[],
environment={},
)
def _make_inspect_entry(name: str, created: str, host_port: str | None = None) -> dict:
"""Build a minimal docker inspect JSON entry matching the real schema."""
ports: dict = {}
if host_port is not None:
ports["8080/tcp"] = [{"HostIp": "0.0.0.0", "HostPort": host_port}]
return {
"Name": f"/{name}", # docker inspect prefixes names with "/"
"Created": created,
"NetworkSettings": {"Ports": ports},
}
def _mock_ps_and_inspect(monkeypatch, ps_output: str, inspect_payload: list | None):
"""Patch subprocess.run to serve fixed ps + inspect responses."""
import subprocess
def mock_run(cmd, **kwargs):
result = MagicMock()
if len(cmd) >= 2 and cmd[1] == "ps":
result.returncode = 0
result.stdout = ps_output
result.stderr = ""
return result
if len(cmd) >= 2 and cmd[1] == "inspect":
if inspect_payload is None:
result.returncode = 1
result.stdout = ""
result.stderr = "inspect failed"
return result
result.returncode = 0
result.stdout = json.dumps(inspect_payload)
result.stderr = ""
return result
result.returncode = 1
result.stdout = ""
result.stderr = "unexpected command"
return result
monkeypatch.setattr(subprocess, "run", mock_run)
# ── LocalContainerBackend.list_running() ─────────────────────────────────────
def test_list_running_returns_containers(monkeypatch):
"""list_running should enumerate containers via docker ps and batch-inspect them."""
backend = _make_local_backend()
monkeypatch.setattr(backend, "_runtime", "docker")
_mock_ps_and_inspect(
monkeypatch,
ps_output="deer-flow-sandbox-abc12345\ndeer-flow-sandbox-def67890\n",
inspect_payload=[
_make_inspect_entry("deer-flow-sandbox-abc12345", "2026-04-08T01:22:50.000000000Z", "8081"),
_make_inspect_entry("deer-flow-sandbox-def67890", "2026-04-08T02:22:50.000000000Z", "8082"),
],
)
infos = backend.list_running()
assert len(infos) == 2
ids = {info.sandbox_id for info in infos}
assert ids == {"abc12345", "def67890"}
urls = {info.sandbox_url for info in infos}
assert "http://localhost:8081" in urls
assert "http://localhost:8082" in urls
def test_list_running_empty_when_no_containers(monkeypatch):
"""list_running should return empty list when docker ps returns nothing."""
backend = _make_local_backend()
monkeypatch.setattr(backend, "_runtime", "docker")
_mock_ps_and_inspect(monkeypatch, ps_output="", inspect_payload=[])
assert backend.list_running() == []
def test_list_running_skips_non_matching_names(monkeypatch):
"""list_running should skip containers whose names don't match the prefix pattern."""
backend = _make_local_backend()
monkeypatch.setattr(backend, "_runtime", "docker")
_mock_ps_and_inspect(
monkeypatch,
ps_output="deer-flow-sandbox-abc12345\nsome-other-container\n",
inspect_payload=[
_make_inspect_entry("deer-flow-sandbox-abc12345", "2026-04-08T01:22:50Z", "8081"),
],
)
infos = backend.list_running()
assert len(infos) == 1
assert infos[0].sandbox_id == "abc12345"
def test_list_running_includes_containers_without_port(monkeypatch):
"""Containers without a port mapping should still be listed (with empty URL)."""
backend = _make_local_backend()
monkeypatch.setattr(backend, "_runtime", "docker")
_mock_ps_and_inspect(
monkeypatch,
ps_output="deer-flow-sandbox-abc12345\n",
inspect_payload=[
_make_inspect_entry("deer-flow-sandbox-abc12345", "2026-04-08T01:22:50Z", host_port=None),
],
)
infos = backend.list_running()
assert len(infos) == 1
assert infos[0].sandbox_id == "abc12345"
assert infos[0].sandbox_url == ""
def test_list_running_handles_docker_failure(monkeypatch):
"""list_running should return empty list when docker ps fails."""
backend = _make_local_backend()
monkeypatch.setattr(backend, "_runtime", "docker")
import subprocess
def mock_run(cmd, **kwargs):
result = MagicMock()
result.returncode = 1
result.stdout = ""
result.stderr = "daemon not running"
return result
monkeypatch.setattr(subprocess, "run", mock_run)
assert backend.list_running() == []
def test_list_running_handles_inspect_failure(monkeypatch):
"""list_running should return empty list when batch inspect fails."""
backend = _make_local_backend()
monkeypatch.setattr(backend, "_runtime", "docker")
_mock_ps_and_inspect(
monkeypatch,
ps_output="deer-flow-sandbox-abc12345\n",
inspect_payload=None, # Signals inspect failure
)
assert backend.list_running() == []
def test_list_running_handles_malformed_inspect_json(monkeypatch):
"""list_running should return empty list when docker inspect emits invalid JSON."""
backend = _make_local_backend()
monkeypatch.setattr(backend, "_runtime", "docker")
import subprocess
def mock_run(cmd, **kwargs):
result = MagicMock()
if len(cmd) >= 2 and cmd[1] == "ps":
result.returncode = 0
result.stdout = "deer-flow-sandbox-abc12345\n"
result.stderr = ""
else:
result.returncode = 0
result.stdout = "this is not json"
result.stderr = ""
return result
monkeypatch.setattr(subprocess, "run", mock_run)
assert backend.list_running() == []
def test_list_running_uses_single_batch_inspect_call(monkeypatch):
"""list_running should issue exactly ONE docker inspect call regardless of container count."""
backend = _make_local_backend()
monkeypatch.setattr(backend, "_runtime", "docker")
inspect_call_count = {"count": 0}
import subprocess
def mock_run(cmd, **kwargs):
result = MagicMock()
if len(cmd) >= 2 and cmd[1] == "ps":
result.returncode = 0
result.stdout = "deer-flow-sandbox-a\ndeer-flow-sandbox-b\ndeer-flow-sandbox-c\n"
result.stderr = ""
return result
if len(cmd) >= 2 and cmd[1] == "inspect":
inspect_call_count["count"] += 1
# Expect all three names passed in a single call
assert cmd[2:] == ["deer-flow-sandbox-a", "deer-flow-sandbox-b", "deer-flow-sandbox-c"]
result.returncode = 0
result.stdout = json.dumps(
[
_make_inspect_entry("deer-flow-sandbox-a", "2026-04-08T01:22:50Z", "8081"),
_make_inspect_entry("deer-flow-sandbox-b", "2026-04-08T01:22:50Z", "8082"),
_make_inspect_entry("deer-flow-sandbox-c", "2026-04-08T01:22:50Z", "8083"),
]
)
result.stderr = ""
return result
result.returncode = 1
result.stdout = ""
return result
monkeypatch.setattr(subprocess, "run", mock_run)
infos = backend.list_running()
assert len(infos) == 3
assert inspect_call_count["count"] == 1 # ← The core performance assertion
# ── _parse_docker_timestamp() ────────────────────────────────────────────────
def test_parse_docker_timestamp_with_nanoseconds():
"""Should correctly parse Docker's ISO 8601 timestamp with nanoseconds."""
from deerflow.community.aio_sandbox.local_backend import _parse_docker_timestamp
ts = _parse_docker_timestamp("2026-04-08T01:22:50.123456789Z")
assert ts > 0
expected = datetime(2026, 4, 8, 1, 22, 50, tzinfo=UTC).timestamp()
assert abs(ts - expected) < 1.0
def test_parse_docker_timestamp_without_fractional_seconds():
"""Should parse plain ISO 8601 timestamps without fractional seconds."""
from deerflow.community.aio_sandbox.local_backend import _parse_docker_timestamp
ts = _parse_docker_timestamp("2026-04-08T01:22:50Z")
expected = datetime(2026, 4, 8, 1, 22, 50, tzinfo=UTC).timestamp()
assert abs(ts - expected) < 1.0
def test_parse_docker_timestamp_empty_returns_zero():
from deerflow.community.aio_sandbox.local_backend import _parse_docker_timestamp
assert _parse_docker_timestamp("") == 0.0
assert _parse_docker_timestamp("not a timestamp") == 0.0
# ── _extract_host_port() ─────────────────────────────────────────────────────
def test_extract_host_port_returns_mapped_port():
from deerflow.community.aio_sandbox.local_backend import _extract_host_port
entry = {"NetworkSettings": {"Ports": {"8080/tcp": [{"HostIp": "0.0.0.0", "HostPort": "8081"}]}}}
assert _extract_host_port(entry, 8080) == 8081
def test_extract_host_port_returns_none_when_unmapped():
from deerflow.community.aio_sandbox.local_backend import _extract_host_port
entry = {"NetworkSettings": {"Ports": {}}}
assert _extract_host_port(entry, 8080) is None
def test_extract_host_port_handles_missing_fields():
from deerflow.community.aio_sandbox.local_backend import _extract_host_port
assert _extract_host_port({}, 8080) is None
assert _extract_host_port({"NetworkSettings": None}, 8080) is None
# ── AioSandboxProvider._reconcile_orphans() ──────────────────────────────────
def _make_provider_for_reconciliation():
"""Build a minimal AioSandboxProvider without triggering __init__ side effects.
WARNING: This helper intentionally bypasses ``__init__`` via ``__new__`` so
tests don't depend on Docker or touch the real idle-checker thread. The
downside is that this helper is tightly coupled to the set of attributes
set up in ``AioSandboxProvider.__init__``. If ``__init__`` gains a new
attribute that ``_reconcile_orphans`` (or other methods under test) reads,
this helper must be updated in lockstep otherwise tests will fail with a
confusing ``AttributeError`` instead of a meaningful assertion failure.
"""
aio_mod = importlib.import_module("deerflow.community.aio_sandbox.aio_sandbox_provider")
provider = aio_mod.AioSandboxProvider.__new__(aio_mod.AioSandboxProvider)
provider._lock = threading.Lock()
provider._sandboxes = {}
provider._sandbox_infos = {}
provider._thread_sandboxes = {}
provider._thread_locks = {}
provider._last_activity = {}
provider._warm_pool = {}
provider._shutdown_called = False
provider._idle_checker_stop = threading.Event()
provider._idle_checker_thread = None
provider._config = {
"idle_timeout": 600,
"replicas": 3,
}
provider._backend = MagicMock()
return provider
def test_reconcile_adopts_old_containers_into_warm_pool():
"""All containers are adopted into warm pool regardless of age — idle checker handles cleanup."""
provider = _make_provider_for_reconciliation()
now = time.time()
old_info = SandboxInfo(
sandbox_id="old12345",
sandbox_url="http://localhost:8081",
container_name="deer-flow-sandbox-old12345",
created_at=now - 1200, # 20 minutes old, > 600s idle_timeout
)
provider._backend.list_running.return_value = [old_info]
provider._reconcile_orphans()
# Should NOT destroy directly — let idle checker handle it
provider._backend.destroy.assert_not_called()
assert "old12345" in provider._warm_pool
def test_reconcile_adopts_young_containers():
"""Young containers are adopted into warm pool for potential reuse."""
provider = _make_provider_for_reconciliation()
now = time.time()
young_info = SandboxInfo(
sandbox_id="young123",
sandbox_url="http://localhost:8082",
container_name="deer-flow-sandbox-young123",
created_at=now - 60, # 1 minute old, < 600s idle_timeout
)
provider._backend.list_running.return_value = [young_info]
provider._reconcile_orphans()
provider._backend.destroy.assert_not_called()
assert "young123" in provider._warm_pool
adopted_info, release_ts = provider._warm_pool["young123"]
assert adopted_info.sandbox_id == "young123"
def test_reconcile_mixed_containers_all_adopted():
"""All containers (old and young) are adopted into warm pool."""
provider = _make_provider_for_reconciliation()
now = time.time()
old_info = SandboxInfo(
sandbox_id="old_one",
sandbox_url="http://localhost:8081",
container_name="deer-flow-sandbox-old_one",
created_at=now - 1200,
)
young_info = SandboxInfo(
sandbox_id="young_one",
sandbox_url="http://localhost:8082",
container_name="deer-flow-sandbox-young_one",
created_at=now - 60,
)
provider._backend.list_running.return_value = [old_info, young_info]
provider._reconcile_orphans()
provider._backend.destroy.assert_not_called()
assert "old_one" in provider._warm_pool
assert "young_one" in provider._warm_pool
def test_reconcile_skips_already_tracked_containers():
"""Containers already in _sandboxes or _warm_pool should be skipped."""
provider = _make_provider_for_reconciliation()
now = time.time()
existing_info = SandboxInfo(
sandbox_id="existing1",
sandbox_url="http://localhost:8081",
container_name="deer-flow-sandbox-existing1",
created_at=now - 1200,
)
# Pre-populate _sandboxes to simulate already-tracked container
provider._sandboxes["existing1"] = MagicMock()
provider._backend.list_running.return_value = [existing_info]
provider._reconcile_orphans()
provider._backend.destroy.assert_not_called()
# The pre-populated sandbox should NOT be moved into warm pool
assert "existing1" not in provider._warm_pool
def test_reconcile_handles_backend_failure():
"""Reconciliation should not crash if backend.list_running() fails."""
provider = _make_provider_for_reconciliation()
provider._backend.list_running.side_effect = RuntimeError("docker not available")
# Should not raise
provider._reconcile_orphans()
assert provider._warm_pool == {}
def test_reconcile_no_running_containers():
"""Reconciliation with no running containers is a no-op."""
provider = _make_provider_for_reconciliation()
provider._backend.list_running.return_value = []
provider._reconcile_orphans()
provider._backend.destroy.assert_not_called()
assert provider._warm_pool == {}
def test_reconcile_multiple_containers_all_adopted():
"""Multiple containers should all be adopted into warm pool."""
provider = _make_provider_for_reconciliation()
now = time.time()
info1 = SandboxInfo(sandbox_id="cont_one", sandbox_url="http://localhost:8081", created_at=now - 1200)
info2 = SandboxInfo(sandbox_id="cont_two", sandbox_url="http://localhost:8082", created_at=now - 1200)
provider._backend.list_running.return_value = [info1, info2]
provider._reconcile_orphans()
provider._backend.destroy.assert_not_called()
assert "cont_one" in provider._warm_pool
assert "cont_two" in provider._warm_pool
def test_reconcile_zero_created_at_adopted():
"""Containers with created_at=0 (unknown age) should still be adopted into warm pool."""
provider = _make_provider_for_reconciliation()
info = SandboxInfo(sandbox_id="unknown1", sandbox_url="http://localhost:8081", created_at=0.0)
provider._backend.list_running.return_value = [info]
provider._reconcile_orphans()
provider._backend.destroy.assert_not_called()
assert "unknown1" in provider._warm_pool
def test_reconcile_idle_timeout_zero_adopts_all():
"""When idle_timeout=0 (disabled), all containers are still adopted into warm pool."""
provider = _make_provider_for_reconciliation()
provider._config["idle_timeout"] = 0
now = time.time()
old_info = SandboxInfo(sandbox_id="old_one", sandbox_url="http://localhost:8081", created_at=now - 7200)
young_info = SandboxInfo(sandbox_id="young_one", sandbox_url="http://localhost:8082", created_at=now - 60)
provider._backend.list_running.return_value = [old_info, young_info]
provider._reconcile_orphans()
provider._backend.destroy.assert_not_called()
assert "old_one" in provider._warm_pool
assert "young_one" in provider._warm_pool
# ── SIGHUP signal handler ───────────────────────────────────────────────────
def test_sighup_handler_registered():
"""SIGHUP handler should be registered on Unix systems."""
if not hasattr(signal, "SIGHUP"):
pytest.skip("SIGHUP not available on this platform")
provider = _make_provider_for_reconciliation()
# Save original handlers for ALL signals we'll modify
original_sighup = signal.getsignal(signal.SIGHUP)
original_sigterm = signal.getsignal(signal.SIGTERM)
original_sigint = signal.getsignal(signal.SIGINT)
try:
aio_mod = importlib.import_module("deerflow.community.aio_sandbox.aio_sandbox_provider")
provider._original_sighup = original_sighup
provider._original_sigterm = original_sigterm
provider._original_sigint = original_sigint
provider.shutdown = MagicMock()
aio_mod.AioSandboxProvider._register_signal_handlers(provider)
# Verify SIGHUP handler is no longer the default
handler = signal.getsignal(signal.SIGHUP)
assert handler != signal.SIG_DFL, "SIGHUP handler should be registered"
finally:
# Restore ALL original handlers to avoid leaking state across tests
signal.signal(signal.SIGHUP, original_sighup)
signal.signal(signal.SIGTERM, original_sigterm)
signal.signal(signal.SIGINT, original_sigint)

View File

@ -0,0 +1,215 @@
"""Docker-backed sandbox container lifecycle and cleanup tests.
This test module requires Docker to be running. It exercises the container
backend behavior behind sandbox lifecycle management and verifies that test
containers are created, observed, and explicitly cleaned up correctly.
The coverage here is limited to direct backend/container operations used by
the reconciliation flow. It does not simulate a process restart by creating
a new ``AioSandboxProvider`` instance or assert provider startup orphan
reconciliation end-to-end that logic is covered by unit tests in
``test_sandbox_orphan_reconciliation.py``.
Run with: PYTHONPATH=. uv run pytest tests/test_sandbox_orphan_reconciliation_e2e.py -v -s
Requires: Docker running locally
"""
import subprocess
import time
import pytest
def _docker_available() -> bool:
try:
result = subprocess.run(["docker", "info"], capture_output=True, timeout=5)
return result.returncode == 0
except (FileNotFoundError, subprocess.TimeoutExpired):
return False
def _container_running(container_name: str) -> bool:
result = subprocess.run(
["docker", "inspect", "-f", "{{.State.Running}}", container_name],
capture_output=True,
text=True,
timeout=5,
)
return result.returncode == 0 and result.stdout.strip().lower() == "true"
def _stop_container(container_name: str) -> None:
subprocess.run(["docker", "stop", container_name], capture_output=True, timeout=15)
# Use a lightweight image for testing to avoid pulling the heavy sandbox image
E2E_TEST_IMAGE = "busybox:latest"
E2E_PREFIX = "deer-flow-sandbox-e2e-test"
@pytest.fixture(autouse=True)
def cleanup_test_containers():
"""Ensure all test containers are cleaned up after the test."""
yield
# Cleanup: stop any remaining test containers
result = subprocess.run(
["docker", "ps", "-a", "--filter", f"name={E2E_PREFIX}-", "--format", "{{.Names}}"],
capture_output=True,
text=True,
timeout=10,
)
for name in result.stdout.strip().splitlines():
name = name.strip()
if name:
subprocess.run(["docker", "rm", "-f", name], capture_output=True, timeout=10)
@pytest.mark.skipif(not _docker_available(), reason="Docker not available")
class TestOrphanReconciliationE2E:
"""E2E tests for orphan container reconciliation."""
def test_orphan_container_destroyed_on_startup(self):
"""Core issue scenario: container from a previous process is destroyed on new process init.
Steps:
1. Start a container manually (simulating previous process)
2. Create a LocalContainerBackend with matching prefix
3. Call list_running() should find the container
4. Simulate _reconcile_orphans() logic container should be destroyed
"""
container_name = f"{E2E_PREFIX}-orphan01"
# Step 1: Start a container (simulating previous process lifecycle)
result = subprocess.run(
["docker", "run", "--rm", "-d", "--name", container_name, E2E_TEST_IMAGE, "sleep", "3600"],
capture_output=True,
text=True,
timeout=30,
)
assert result.returncode == 0, f"Failed to start test container: {result.stderr}"
try:
assert _container_running(container_name), "Test container should be running"
# Step 2: Create backend and list running containers
from deerflow.community.aio_sandbox.local_backend import LocalContainerBackend
backend = LocalContainerBackend(
image=E2E_TEST_IMAGE,
base_port=9990,
container_prefix=E2E_PREFIX,
config_mounts=[],
environment={},
)
# Step 3: list_running should find our container
running = backend.list_running()
found_ids = {info.sandbox_id for info in running}
assert "orphan01" in found_ids, f"Should find orphan01, got: {found_ids}"
# Step 4: Simulate reconciliation — this container's created_at is recent,
# so with a very short idle_timeout it would be destroyed
orphan_info = next(info for info in running if info.sandbox_id == "orphan01")
assert orphan_info.created_at > 0, "created_at should be parsed from docker inspect"
# Destroy it (simulating what _reconcile_orphans does for old containers)
backend.destroy(orphan_info)
# Give Docker a moment to stop the container
time.sleep(1)
# Verify container is gone
assert not _container_running(container_name), "Orphan container should be stopped after destroy"
finally:
# Safety cleanup
_stop_container(container_name)
def test_multiple_orphans_all_cleaned(self):
"""Multiple orphaned containers are all found and can be cleaned up."""
containers = []
try:
# Start 3 containers
for i in range(3):
name = f"{E2E_PREFIX}-multi{i:02d}"
result = subprocess.run(
["docker", "run", "--rm", "-d", "--name", name, E2E_TEST_IMAGE, "sleep", "3600"],
capture_output=True,
text=True,
timeout=30,
)
assert result.returncode == 0, f"Failed to start {name}: {result.stderr}"
containers.append(name)
from deerflow.community.aio_sandbox.local_backend import LocalContainerBackend
backend = LocalContainerBackend(
image=E2E_TEST_IMAGE,
base_port=9990,
container_prefix=E2E_PREFIX,
config_mounts=[],
environment={},
)
running = backend.list_running()
found_ids = {info.sandbox_id for info in running}
assert "multi00" in found_ids
assert "multi01" in found_ids
assert "multi02" in found_ids
# Destroy all
for info in running:
backend.destroy(info)
time.sleep(1)
# Verify all gone
for name in containers:
assert not _container_running(name), f"{name} should be stopped"
finally:
for name in containers:
_stop_container(name)
def test_list_running_ignores_unrelated_containers(self):
"""Containers with different prefixes should not be listed."""
unrelated_name = "unrelated-test-container"
our_name = f"{E2E_PREFIX}-ours001"
try:
# Start an unrelated container
subprocess.run(
["docker", "run", "--rm", "-d", "--name", unrelated_name, E2E_TEST_IMAGE, "sleep", "3600"],
capture_output=True,
timeout=30,
)
# Start our container
subprocess.run(
["docker", "run", "--rm", "-d", "--name", our_name, E2E_TEST_IMAGE, "sleep", "3600"],
capture_output=True,
timeout=30,
)
from deerflow.community.aio_sandbox.local_backend import LocalContainerBackend
backend = LocalContainerBackend(
image=E2E_TEST_IMAGE,
base_port=9990,
container_prefix=E2E_PREFIX,
config_mounts=[],
environment={},
)
running = backend.list_running()
found_ids = {info.sandbox_id for info in running}
# Should find ours but not unrelated
assert "ours001" in found_ids
# "unrelated-test-container" doesn't match "deer-flow-sandbox-e2e-test-" prefix
for info in running:
assert not info.sandbox_id.startswith("unrelated")
finally:
_stop_container(unrelated_name)
_stop_container(our_name)