mirror of
https://github.com/bytedance/deer-flow.git
synced 2026-06-09 17:12:01 +00:00
* fix(sandbox): close AioSandbox HTTP client during provider teardown (#2872) AioSandbox allocates a host-side agent_sandbox client (wrapping an httpx.Client) in __init__, but AioSandboxProvider.release/destroy/shutdown only popped provider state and tore down the backend container — the client/transport owned by each cached AioSandbox was never explicitly closed, accumulating unreclaimed sockets in long-running services. - Add AioSandbox.close(): best-effort, idempotent close of the wrapped httpx_client (falls back to top-level client.close()); errors are logged but never raised so backend cleanup is never blocked. - AioSandboxProvider.release()/destroy() now close the cached AioSandbox before dropping it; shutdown() inherits this via destroy(). * fix(sandbox): close the real httpx.Client owned by AioSandbox (#2872) The previous close() only walked one level (wrapper.httpx_client), which resolves to the Fern-generated HttpClient wrapper that has no close(). The real socket-owning httpx.Client lives one level deeper at _client_wrapper.httpx_client.httpx_client, so the close path never fired and host-side sockets still leaked. Resolve the real httpx.Client with graceful degradation; clear self._client under the lock for use-after-close and concurrent double-close safety; mark provider release()/destroy() try/except as defense-in-depth; rewrite TestClose against the real nested structure to lock down the original no-op bug.
This commit is contained in:
parent
d9f4724950
commit
5dc2d6cbf5
@ -39,11 +39,63 @@ class AioSandbox(Sandbox):
|
||||
self._client = AioSandboxClient(base_url=base_url, timeout=600)
|
||||
self._home_dir = home_dir
|
||||
self._lock = threading.Lock()
|
||||
self._closed = False
|
||||
|
||||
@property
def base_url(self) -> str:
    """Return the base URL stored on this sandbox instance (read-only)."""
    url: str = self._base_url
    return url
|
||||
|
||||
def close(self) -> None:
    """Best-effort close of the host-side HTTP client owned by this sandbox.

    The agent_sandbox SDK is Fern-generated and exposes no ``close()`` /
    ``__exit__``, so we reach the socket-owning ``httpx.Client`` explicitly
    through its attribute chain::

        Sandbox._client_wrapper -> SyncClientWrapper
            .httpx_client       -> Fern HttpClient (a wrapper, NOT httpx.Client)
                .httpx_client   -> httpx.Client  <- the real socket owner

    Closing it releases pooled sockets so long-running provider lifecycles
    do not accumulate unreclaimed host-side resources (#2872).

    Resolution is most-specific-first with graceful degradation: the first
    layer exposing a *callable* ``close`` wins, so a future top-level
    ``Sandbox.close()`` is picked up automatically. A layer whose ``close``
    attribute exists but is not callable is skipped rather than selected, so
    it cannot shadow a real closable layer further up the chain.

    Idempotent, thread-safe, and non-fatal: only the first call does any
    work, and failures during teardown are logged and swallowed so
    provider/backend cleanup is never blocked.

    Returns:
        None.
    """
    with self._lock:
        if self._closed:
            return
        self._closed = True
        client = self._client
        # Drop the reference under the lock for use-after-close safety: any
        # later command on this instance fails loudly instead of reusing a
        # half-closed client.
        self._client = None

    if client is None:
        return

    # Walk from the real httpx.Client up to the top-level client, picking the
    # first object whose close attribute can actually be called. getattr()
    # with a default also covers the layers that resolved to None.
    wrapper = getattr(client, "_client_wrapper", None)
    fern_http = getattr(wrapper, "httpx_client", None)
    real_httpx = getattr(fern_http, "httpx_client", None)
    target = next(
        (c for c in (real_httpx, fern_http, client) if callable(getattr(c, "close", None))),
        None,
    )
    if target is None:
        logger.debug("AioSandbox %s: no closable client found, nothing to release", self.id)
        return

    try:
        target.close()
    except Exception as e:
        # Deliberately broad: teardown must never propagate client errors.
        logger.warning(f"Error closing AioSandbox client for {self.id}: {e}")
|
||||
|
||||
@property
|
||||
def home_dir(self) -> str:
|
||||
"""Get the home directory inside the sandbox."""
|
||||
|
||||
@ -790,14 +790,20 @@ class AioSandboxProvider(SandboxProvider):
|
||||
thread on its next turn without a cold-start. The container will only be
|
||||
stopped when the replicas limit forces eviction or during shutdown.
|
||||
|
||||
The host-side HTTP client owned by the cached ``AioSandbox`` instance is
|
||||
closed before the instance is dropped (#2872). The warm-pool entry only
|
||||
stores ``SandboxInfo``, so a fresh ``AioSandbox`` (and a fresh client)
|
||||
is constructed if the container is later reclaimed.
|
||||
|
||||
Args:
|
||||
sandbox_id: The ID of the sandbox to release.
|
||||
"""
|
||||
info = None
|
||||
sandbox = None
|
||||
thread_ids_to_remove: list[str] = []
|
||||
|
||||
with self._lock:
|
||||
self._sandboxes.pop(sandbox_id, None)
|
||||
sandbox = self._sandboxes.pop(sandbox_id, None)
|
||||
info = self._sandbox_infos.pop(sandbox_id, None)
|
||||
thread_ids_to_remove = [tid for tid, sid in self._thread_sandboxes.items() if sid == sandbox_id]
|
||||
for tid in thread_ids_to_remove:
|
||||
@ -807,6 +813,15 @@ class AioSandboxProvider(SandboxProvider):
|
||||
if info and sandbox_id not in self._warm_pool:
|
||||
self._warm_pool[sandbox_id] = (info, time.time())
|
||||
|
||||
if sandbox is not None:
|
||||
# Defense-in-depth: close() already swallows its own errors; this
|
||||
# guard only protects against a future close() that misbehaves, so
|
||||
# host-side client cleanup can never block parking in the warm pool.
|
||||
try:
|
||||
sandbox.close()
|
||||
except Exception as e:
|
||||
logger.warning(f"Error closing sandbox {sandbox_id} during release: {e}")
|
||||
|
||||
logger.info(f"Released sandbox {sandbox_id} to warm pool (container still running)")
|
||||
|
||||
def destroy(self, sandbox_id: str) -> None:
|
||||
@ -815,14 +830,19 @@ class AioSandboxProvider(SandboxProvider):
|
||||
Unlike release(), this actually stops the container. Use this for
|
||||
explicit cleanup, capacity-driven eviction, or shutdown.
|
||||
|
||||
The host-side HTTP client owned by the cached ``AioSandbox`` instance is
|
||||
closed alongside backend/container destruction so no client/socket
|
||||
resources leak (#2872).
|
||||
|
||||
Args:
|
||||
sandbox_id: The ID of the sandbox to destroy.
|
||||
"""
|
||||
info = None
|
||||
sandbox = None
|
||||
thread_ids_to_remove: list[str] = []
|
||||
|
||||
with self._lock:
|
||||
self._sandboxes.pop(sandbox_id, None)
|
||||
sandbox = self._sandboxes.pop(sandbox_id, None)
|
||||
info = self._sandbox_infos.pop(sandbox_id, None)
|
||||
thread_ids_to_remove = [tid for tid, sid in self._thread_sandboxes.items() if sid == sandbox_id]
|
||||
for tid in thread_ids_to_remove:
|
||||
@ -834,6 +854,15 @@ class AioSandboxProvider(SandboxProvider):
|
||||
else:
|
||||
self._warm_pool.pop(sandbox_id, None)
|
||||
|
||||
if sandbox is not None:
|
||||
# Defense-in-depth: close() already swallows its own errors; this
|
||||
# guard only protects against a future close() that misbehaves, so
|
||||
# host-side client cleanup can never block container destruction.
|
||||
try:
|
||||
sandbox.close()
|
||||
except Exception as e:
|
||||
logger.warning(f"Error closing sandbox {sandbox_id} during destroy: {e}")
|
||||
|
||||
if info:
|
||||
self._backend.destroy(info)
|
||||
logger.info(f"Destroyed sandbox {sandbox_id}")
|
||||
|
||||
@ -318,3 +318,76 @@ class TestDownloadFile:
|
||||
result = sandbox.download_file("/mnt/user-data/outputs/single.bin")
|
||||
|
||||
assert result == b"single-chunk"
|
||||
|
||||
|
||||
class TestClose:
    """Exercise AioSandbox.close() teardown of the host-side HTTP client (#2872)."""

    @staticmethod
    def _wire_fern_chain(sandbox):
        """Install a Fern-shaped client chain on *sandbox* and return its innermost client.

        Layout attached to ``sandbox._client``::

            _client_wrapper.httpx_client              -> wrapper layer with NO close()
            _client_wrapper.httpx_client.httpx_client -> mock exposing only close()
        """
        innermost = MagicMock(spec=["close"])
        middle = SimpleNamespace(httpx_client=innermost)  # deliberately no close() here
        sandbox._client._client_wrapper = SimpleNamespace(httpx_client=middle)
        return innermost

    def test_close_calls_real_nested_httpx_client(self, sandbox):
        """The innermost client of the chain is the one that gets closed.

        The middle layer has no close(), so a lookup that stops one level
        down (the original bug) would close nothing at all.
        """
        innermost = self._wire_fern_chain(sandbox)

        sandbox.close()

        innermost.close.assert_called_once_with()

    def test_close_clears_client_reference(self, sandbox):
        """close() drops the client reference and marks the sandbox closed."""
        self._wire_fern_chain(sandbox)

        sandbox.close()

        assert sandbox._client is None
        assert sandbox._closed is True

    def test_close_is_idempotent(self, sandbox):
        """Repeated close() calls reach the underlying client exactly once."""
        innermost = self._wire_fern_chain(sandbox)

        for _ in range(3):
            sandbox.close()

        assert innermost.close.call_count == 1

    def test_close_swallows_exceptions(self, sandbox, caplog):
        """An error raised by the client's close() is logged, not propagated."""
        innermost = self._wire_fern_chain(sandbox)
        innermost.close.side_effect = RuntimeError("teardown boom")

        with caplog.at_level("WARNING"):
            sandbox.close()

        assert "Error closing AioSandbox client" in caplog.text

    def test_close_falls_back_to_client_close(self, sandbox):
        """With no nested chain reachable, the top-level client's close() is used."""
        # Stub exposing nothing but a top-level close().
        top_level_only = MagicMock(spec=["close"])
        sandbox._client = top_level_only

        sandbox.close()

        top_level_only.close.assert_called_once_with()

    def test_close_when_no_close_attr_does_not_raise(self, sandbox):
        """A client with no close attribute anywhere is tolerated silently."""
        sandbox._client = SimpleNamespace()  # neither close nor _client_wrapper

        sandbox.close()  # must not raise

        assert sandbox._client is None
|
||||
|
||||
@ -348,3 +348,89 @@ def test_remote_backend_create_forwards_effective_user_id(monkeypatch):
|
||||
"thread_id": "thread-42",
|
||||
"user_id": "user-7",
|
||||
}
|
||||
|
||||
|
||||
# ── Sandbox client teardown (#2872) ──────────────────────────────────────────
|
||||
|
||||
|
||||
def _make_provider_with_active_sandbox(tmp_path, sandbox_id: str):
    """Return ``(provider, sandbox, aio_mod)`` with one active sandbox registered.

    The provider comes from ``_make_provider`` and has its lifecycle state
    overwritten so release/destroy/shutdown can run against it: one
    ``SandboxInfo`` entry and one ``MagicMock`` sandbox under *sandbox_id*,
    an empty warm pool, no thread mappings, and a backend whose ``destroy``
    is a ``MagicMock``.
    """
    aio_mod = importlib.import_module("deerflow.community.aio_sandbox.aio_sandbox_provider")
    provider = _make_provider(tmp_path)

    active = MagicMock()
    active.id = sandbox_id
    active.close = MagicMock()

    state = {
        "_lock": aio_mod.threading.Lock(),
        "_warm_pool": {},
        "_sandbox_infos": {
            sandbox_id: aio_mod.SandboxInfo(sandbox_id=sandbox_id, sandbox_url="http://sandbox-host"),
        },
        "_thread_sandboxes": {},
        "_last_activity": {sandbox_id: 0.0},
        "_shutdown_called": False,
        "_idle_checker_thread": None,
        "_backend": SimpleNamespace(destroy=MagicMock()),
        "_sandboxes": {sandbox_id: active},
    }
    for attr_name, value in state.items():
        setattr(provider, attr_name, value)

    return provider, active, aio_mod
|
||||
|
||||
|
||||
def test_release_closes_cached_sandbox_client(tmp_path):
    """release() closes the cached sandbox's host-side client (#2872)."""
    sid = "sandbox-rel"
    provider, cached, _unused = _make_provider_with_active_sandbox(tmp_path, sid)

    provider.release(sid)

    cached.close.assert_called_once_with()
    # The entry leaves the active map and is parked in the warm pool.
    assert sid in provider._warm_pool
    assert sid not in provider._sandboxes
|
||||
|
||||
|
||||
def test_destroy_closes_cached_sandbox_client(tmp_path):
    """destroy() must close the host-side client before backend container teardown (#2872).

    Checks that both steps happen and that they happen in that order, so a
    regression that tears the container down first is caught.
    """
    provider, sandbox, _ = _make_provider_with_active_sandbox(tmp_path, "sandbox-destroy")
    backend_destroy = provider._backend.destroy

    # Record the relative order of the two teardown steps.
    order: list[str] = []
    sandbox.close.side_effect = lambda: order.append("close")
    backend_destroy.side_effect = lambda *args, **kwargs: order.append("backend.destroy")

    provider.destroy("sandbox-destroy")

    sandbox.close.assert_called_once_with()
    backend_destroy.assert_called_once()
    assert order == ["close", "backend.destroy"]
    assert "sandbox-destroy" not in provider._sandboxes
    assert "sandbox-destroy" not in provider._sandbox_infos
|
||||
|
||||
|
||||
def test_shutdown_closes_all_active_sandbox_clients(tmp_path):
    """shutdown() closes the cached sandbox client and destroys its backend (#2872)."""
    provider, cached, _unused = _make_provider_with_active_sandbox(tmp_path, "sandbox-shut")
    backend_destroy = provider._backend.destroy

    provider.shutdown()

    cached.close.assert_called_once_with()
    backend_destroy.assert_called_once()
    assert provider._sandboxes == {}
|
||||
|
||||
|
||||
def test_release_swallows_close_errors(tmp_path, caplog):
    """release() logs a failing sandbox.close() and still completes."""
    sid = "sandbox-rel-err"
    provider, cached, _unused = _make_provider_with_active_sandbox(tmp_path, sid)
    cached.close.side_effect = RuntimeError("boom")

    with caplog.at_level("WARNING"):
        provider.release(sid)

    logged = caplog.text
    assert "Error closing sandbox sandbox-rel-err during release" in logged
    # The close failure did not stop the entry from reaching the warm pool.
    assert sid in provider._warm_pool
|
||||
|
||||
|
||||
def test_destroy_swallows_close_errors_and_still_destroys_backend(tmp_path, caplog):
    """destroy() logs a failing sandbox.close() and still destroys the backend."""
    sid = "sandbox-dest-err"
    provider, cached, _unused = _make_provider_with_active_sandbox(tmp_path, sid)
    cached.close.side_effect = RuntimeError("boom")
    backend_destroy = provider._backend.destroy

    with caplog.at_level("WARNING"):
        provider.destroy(sid)

    logged = caplog.text
    assert "Error closing sandbox sandbox-dest-err during destroy" in logged
    backend_destroy.assert_called_once()
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user