diff --git a/backend/packages/harness/deerflow/agents/middlewares/tool_output_budget_middleware.py b/backend/packages/harness/deerflow/agents/middlewares/tool_output_budget_middleware.py index 3e5785e84..22b1a59f2 100644 --- a/backend/packages/harness/deerflow/agents/middlewares/tool_output_budget_middleware.py +++ b/backend/packages/harness/deerflow/agents/middlewares/tool_output_budget_middleware.py @@ -11,10 +11,11 @@ from __future__ import annotations import asyncio import logging import os +import shlex import uuid from collections.abc import Awaitable, Callable from dataclasses import replace as dc_replace -from typing import Any, override +from typing import TYPE_CHECKING, Any, override from langchain.agents import AgentState from langchain.agents.middleware import AgentMiddleware @@ -24,9 +25,19 @@ from langgraph.prebuilt.tool_node import ToolCallRequest from langgraph.types import Command from deerflow.config.tool_output_config import ToolOutputConfig +from deerflow.sandbox.sandbox_provider import get_sandbox_provider + +if TYPE_CHECKING: + from deerflow.sandbox.sandbox import Sandbox logger = logging.getLogger(__name__) +# Virtual outputs root inside the sandbox. Host-mounted sandboxes map this to +# the thread outputs dir on the host; for non-mounted (remote) sandboxes the +# same path is written directly into the sandbox filesystem so the model's +# ``read_file`` tool can read it back (issue #3416). +_VIRTUAL_OUTPUTS_BASE = "/mnt/user-data/outputs" + def _default_config() -> ToolOutputConfig: return ToolOutputConfig() @@ -94,6 +105,18 @@ def _sanitize_tool_name(name: str) -> str: return safe or "unknown" +def _build_externalized_filename(*, tool_name: str, tool_call_id: str) -> str: + """Build the on-disk filename for an externalized tool output. + + Shared by the host-disk and sandbox externalization paths so both + produce the identical naming scheme. + """ + safe_name = _sanitize_tool_name(tool_name) + ext = _EXT_MAP.get(tool_name, "txt") + short_id = uuid.uuid4().hex[:12] + return f"{safe_name}-{short_id}.{ext}" + + def _externalize( content: str, *, @@ -111,10 +134,7 @@ def _externalize( except OSError: return None - safe_name = _sanitize_tool_name(tool_name) - ext = _EXT_MAP.get(tool_name, "txt") - short_id = uuid.uuid4().hex[:12] - filename = f"{safe_name}-{short_id}.{ext}" + filename = _build_externalized_filename(tool_name=tool_name, tool_call_id=tool_call_id) filepath = os.path.join(storage_dir, filename) if not os.path.abspath(filepath).startswith(os.path.abspath(storage_dir)): @@ -126,8 +146,56 @@ def _externalize( except OSError: return None - virtual_base = "/mnt/user-data/outputs" - return f"{virtual_base}/{storage_subdir}/{filename}" + return f"{_VIRTUAL_OUTPUTS_BASE}/{storage_subdir}/{filename}" + + +def _externalize_to_sandbox( + content: str, + *, + tool_name: str, + tool_call_id: str, + storage_subdir: str, + sandbox: Sandbox, +) -> str | None: + """Write *content* into the sandbox filesystem and return the virtual path. + + Used when the sandbox does not use thread-data mounts (e.g. a remote AIO + sandbox): the host-side :func:`_externalize` virtual path would not exist + inside the sandbox, so the model's ``read_file`` tool could not read it + back (issue #3416). Returns the same virtual-path contract on success, or + ``None`` to signal the caller to fall back to inline truncation. + """ + if os.path.isabs(storage_subdir) or ".." in storage_subdir: + return None + filename = _build_externalized_filename(tool_name=tool_name, tool_call_id=tool_call_id) + virtual_dir = f"{_VIRTUAL_OUTPUTS_BASE}/{storage_subdir}" + virtual_path = f"{virtual_dir}/{filename}" + try: + # AIO sandbox write_file does NOT create parent directories, so create + # them explicitly before writing. execute_command returns its stdout + # verbatim (including an "Error: ..." string on failure) rather than + # raising, so we cannot rely on exception propagation here. + sandbox.execute_command(f"mkdir -p {shlex.quote(virtual_dir)}") + sandbox.write_file(virtual_path, content) + # Validate the file landed: execute_command may have silently failed + # to create the directory, and write_file backends differ. Refuse to + # hand the model an unreadable read_file path. + check = sandbox.execute_command(f"test -s {shlex.quote(virtual_path)} && echo OK || echo MISSING") + if not isinstance(check, str) or check.strip() != "OK": + logger.warning( + "Sandbox externalize validation failed: path=%s, check=%r", + virtual_path, + check, + ) + return None + except Exception: + logger.exception( + "Failed to externalize %s output to sandbox (call_id=%s)", + tool_name, + tool_call_id, + ) + return None + return virtual_path # --------------------------------------------------------------------------- @@ -227,6 +295,33 @@ def _resolve_outputs_path(request: ToolCallRequest) -> str | None: return outputs_path if isinstance(outputs_path, str) else None +def _resolve_sandbox(request: ToolCallRequest) -> Sandbox | None: + """Resolve the active sandbox for the current tool call, or ``None``. + + Reads the sandbox_id that ``SandboxMiddleware`` (and the sandbox tools + themselves) write into ``runtime.state["sandbox"]``. We intentionally do + NOT call ``provider.acquire`` here: acquiring a sandbox can trigger + blocking remote I/O, and this resolver runs on every tool call. Tools + that do not use a sandbox (``web_search``, MCP, ...) will return ``None`` + here, which is fine -- the caller falls back to inline truncation. + """ + runtime = getattr(request, "runtime", None) + state = getattr(runtime, "state", None) + if not isinstance(state, dict): + return None + sandbox_state = state.get("sandbox") + if not isinstance(sandbox_state, dict): + return None + sandbox_id = sandbox_state.get("sandbox_id") + if not sandbox_id: + return None + try: + return get_sandbox_provider().get(sandbox_id) + except Exception: + logger.exception("Failed to look up sandbox %s for tool-output externalization", sandbox_id) + return None + + def _budget_content( content: str, *, @@ -234,6 +329,7 @@ def _budget_content( tool_call_id: str, outputs_path: str | None, config: ToolOutputConfig, + sandbox: Sandbox | None = None, ) -> str | None: """Apply budget to *content*. Returns ``None`` if no change needed.""" threshold = config.tool_overrides.get(tool_name, config.externalize_min_chars) @@ -242,14 +338,50 @@ def _budget_content( if len(content) <= threshold and len(content) <= config.fallback_max_chars: return None - if threshold > 0 and len(content) > threshold and outputs_path: - virtual_path = _externalize( - content, - tool_name=tool_name, - tool_call_id=tool_call_id, - outputs_path=outputs_path, - storage_subdir=config.storage_subdir, - ) + if threshold > 0 and len(content) > threshold: + virtual_path: str | None = None + # Decide persistence target based on what's available, without touching + # the sandbox provider unless a sandbox was actually resolved for this + # call. This keeps the legacy host-disk path provider-free, so callers + # without a configured sandbox (and CI environments without a + # config.yaml) continue to externalize to the host as before. + if sandbox is not None: + provider = None + try: + provider = get_sandbox_provider() + except Exception: + logger.exception("Failed to get sandbox provider for tool-output externalization; falling back to inline truncation") + if provider is not None and getattr(provider, "uses_thread_data_mounts", False): + # Host-mounted sandbox: host outputs path is bind-mounted into + # the sandbox at the same virtual path, so writing host-side is + # equivalent. Preserve the original behavior to avoid extra + # sandbox round-trips. + if outputs_path: + virtual_path = _externalize( + content, + tool_name=tool_name, + tool_call_id=tool_call_id, + outputs_path=outputs_path, + storage_subdir=config.storage_subdir, + ) + else: + virtual_path = _externalize_to_sandbox( + content, + tool_name=tool_name, + tool_call_id=tool_call_id, + storage_subdir=config.storage_subdir, + sandbox=sandbox, + ) + elif outputs_path: + # No sandbox in this call (legacy / non-sandbox tools): write to + # host outputs path directly, no provider needed. + virtual_path = _externalize( + content, + tool_name=tool_name, + tool_call_id=tool_call_id, + outputs_path=outputs_path, + storage_subdir=config.storage_subdir, + ) if virtual_path is not None: logger.info( "Externalized %s output (%d chars) to %s", @@ -288,7 +420,12 @@ def _budget_content( # --------------------------------------------------------------------------- -def _patch_tool_message(msg: ToolMessage, config: ToolOutputConfig, outputs_path: str | None) -> ToolMessage: +def _patch_tool_message( + msg: ToolMessage, + config: ToolOutputConfig, + outputs_path: str | None, + sandbox: Sandbox | None = None, +) -> ToolMessage: """Apply budget to a single ToolMessage. Returns the original if unchanged.""" tool_name = msg.name or "unknown" if tool_name in config.exempt_tools: @@ -304,6 +441,7 @@ def _patch_tool_message(msg: ToolMessage, config: ToolOutputConfig, outputs_path tool_call_id=msg.tool_call_id or "", outputs_path=outputs_path, config=config, + sandbox=sandbox, ) if replacement is None: return msg @@ -355,10 +493,15 @@ def _needs_budget(result: ToolMessage | Command, config: ToolOutputConfig) -> bo return False -def _patch_result(result: ToolMessage | Command, config: ToolOutputConfig, outputs_path: str | None) -> ToolMessage | Command: +def _patch_result( + result: ToolMessage | Command, + config: ToolOutputConfig, + outputs_path: str | None, + sandbox: Sandbox | None = None, +) -> ToolMessage | Command: """Apply budget to a tool call result (ToolMessage or Command).""" if isinstance(result, ToolMessage): - return _patch_tool_message(result, config, outputs_path) + return _patch_tool_message(result, config, outputs_path, sandbox) update = getattr(result, "update", None) if not isinstance(update, dict): @@ -372,7 +515,7 @@ def _patch_result(result: ToolMessage | Command, config: ToolOutputConfig, outpu changed = False for msg in messages: if isinstance(msg, ToolMessage): - patched = _patch_tool_message(msg, config, outputs_path) + patched = _patch_tool_message(msg, config, outputs_path, sandbox) if patched is not msg: changed = True new_messages.append(patched) @@ -392,6 +535,11 @@ def _patch_model_messages(messages: list[Any], config: ToolOutputConfig) -> list ToolMessage exceeds the budget — the common case once every result has already been budgeted at tool-call time, so a long history is not rebuilt on every model call. + + Historical messages do not get a ``sandbox`` argument: any oversized tool + message in history was already budgeted (and possibly externalized) at + tool-call time, so the only thing left for the history path to do is + inline fallback truncation, which needs no sandbox. """ if not any(isinstance(msg, ToolMessage) and _tool_message_over_budget(msg, config) for msg in messages): return None @@ -442,7 +590,8 @@ class ToolOutputBudgetMiddleware(AgentMiddleware[AgentState]): if not _needs_budget(result, self._config): return result outputs_path = _resolve_outputs_path(request) - return _patch_result(result, self._config, outputs_path) + sandbox = _resolve_sandbox(request) + return _patch_result(result, self._config, outputs_path, sandbox) @override async def awrap_tool_call( @@ -456,7 +605,12 @@ class ToolOutputBudgetMiddleware(AgentMiddleware[AgentState]): if not _needs_budget(result, self._config): return result outputs_path = _resolve_outputs_path(request) - return await asyncio.to_thread(_patch_result, result, self._config, outputs_path) + # _resolve_sandbox only touches runtime.state and the provider's + # in-memory sandbox registry, so it is safe to call on the event + # loop. The actual sandbox I/O (mkdir/write/test) happens inside + # _patch_result, which is offloaded to a worker thread below. + sandbox = _resolve_sandbox(request) + return await asyncio.to_thread(_patch_result, result, self._config, outputs_path, sandbox) # -- model call hooks (historical message truncation) ------------------ diff --git a/backend/tests/test_tool_output_budget_middleware.py b/backend/tests/test_tool_output_budget_middleware.py index d6ec51052..33f9b00f9 100644 --- a/backend/tests/test_tool_output_budget_middleware.py +++ b/backend/tests/test_tool_output_budget_middleware.py @@ -121,11 +121,17 @@ class TestExternalize: assert f.read() == "full content here" def test_returns_none_on_invalid_path(self): + # ``/dev/null`` is a character device on both Linux and macOS, so + # ``os.makedirs`` cannot create any subdirectory under it for any + # user (including root). The previously-used ``/nonexistent/...`` + # path was silently created by ``mkdir -p`` when the test process + # ran as root inside the CI container, which made this test fail + # in CI independently of the externalization logic under test. path = _externalize( "data", tool_name="test", tool_call_id="tc-1", - outputs_path="/nonexistent/path/that/should/not/exist", + outputs_path="/dev/null/cannot-mkdir-here", storage_subdir=".tool-results", ) assert path is None @@ -370,7 +376,7 @@ class TestWrapToolCallFallback: mw = ToolOutputBudgetMiddleware(config=config) content = "x" * 500 msg = _tm(content, name="tool") - req = _make_request(outputs_path="/nonexistent/impossible/path") + req = _make_request(outputs_path="/dev/null/cannot-mkdir-here") result = mw.wrap_tool_call(req, lambda _: msg) @@ -888,3 +894,331 @@ class TestConfigVersion: assert tool_output["enabled"] is True assert tool_output["externalize_min_chars"] == 12000 assert "read_file" in tool_output["exempt_tools"] + + +# =========================================================================== +# externalize into sandbox for non-mounted (remote) sandboxes +# =========================================================================== + + +class _FakeSandbox: + """In-memory stand-in for a Sandbox. Records calls and supports failure injection.""" + + def __init__(self, *, write_ok: bool = True, check_result: str = "OK") -> None: + self.commands: list[str] = [] + self.writes: list[tuple[str, str]] = [] + self._write_ok = write_ok + self._check_result = check_result + + def execute_command(self, command: str) -> str: + self.commands.append(command) + if command.startswith("test -s"): + return self._check_result + return "" + + def write_file(self, path: str, content: str, append: bool = False) -> None: + if not self._write_ok: + raise RuntimeError("simulated write failure") + self.writes.append((path, content)) + + +class _FakeProvider: + """Minimal SandboxProvider stand-in for monkeypatching get_sandbox_provider.""" + + def __init__(self, *, uses_thread_data_mounts: bool, sandbox: _FakeSandbox | None = None) -> None: + self.uses_thread_data_mounts = uses_thread_data_mounts + self._sandbox = sandbox + + def get(self, sandbox_id: str): + return self._sandbox + + +class TestExternalizeToSandbox: + def test_writes_and_returns_virtual_path(self): + from deerflow.agents.middlewares.tool_output_budget_middleware import ( + _externalize_to_sandbox, + ) + + sb = _FakeSandbox() + result = _externalize_to_sandbox( + "x" * 100, + tool_name="bash", + tool_call_id="tc-1", + storage_subdir=".tool-results", + sandbox=sb, + ) + assert result is not None + assert result.startswith("/mnt/user-data/outputs/.tool-results/bash-") + assert result.endswith(".log") + assert any(c.startswith("mkdir -p ") for c in sb.commands) + assert any(c.startswith("test -s ") for c in sb.commands) + assert sb.writes and sb.writes[0][0] == result + assert sb.writes[0][1] == "x" * 100 + + def test_returns_none_when_write_raises(self): + from deerflow.agents.middlewares.tool_output_budget_middleware import ( + _externalize_to_sandbox, + ) + + result = _externalize_to_sandbox( + "x" * 100, + tool_name="web_fetch", + tool_call_id="tc-2", + storage_subdir=".tool-results", + sandbox=_FakeSandbox(write_ok=False), + ) + assert result is None + + def test_returns_none_when_validation_fails(self): + from deerflow.agents.middlewares.tool_output_budget_middleware import ( + _externalize_to_sandbox, + ) + + result = _externalize_to_sandbox( + "x" * 100, + tool_name="bash", + tool_call_id="tc-3", + storage_subdir=".tool-results", + sandbox=_FakeSandbox(check_result="MISSING"), + ) + assert result is None + + def test_rejects_unsafe_storage_subdir(self): + from deerflow.agents.middlewares.tool_output_budget_middleware import ( + _externalize_to_sandbox, + ) + + sb = _FakeSandbox() + assert ( + _externalize_to_sandbox( + "x" * 100, + tool_name="bash", + tool_call_id="tc-4", + storage_subdir="../escape", + sandbox=sb, + ) + is None + ) + assert ( + _externalize_to_sandbox( + "x" * 100, + tool_name="bash", + tool_call_id="tc-5", + storage_subdir="/abs/path", + sandbox=sb, + ) + is None + ) + # Sandbox must not be touched when the subdir is rejected up-front. + assert sb.commands == [] + assert sb.writes == [] + + def test_default_extension_for_unknown_tool(self): + from deerflow.agents.middlewares.tool_output_budget_middleware import ( + _externalize_to_sandbox, + ) + + result = _externalize_to_sandbox( + "data", + tool_name="unknown_tool", + tool_call_id="tc-6", + storage_subdir=".tool-results", + sandbox=_FakeSandbox(), + ) + assert result is not None and result.endswith(".txt") + + +class TestBudgetContentSandboxDispatch: + """_budget_content must branch on uses_thread_data_mounts (issue #3416).""" + + def test_mounted_sandbox_uses_host_disk(self, monkeypatch, tmp_path): + from deerflow.agents.middlewares import tool_output_budget_middleware as mod + + sb = _FakeSandbox() + monkeypatch.setattr( + mod, + "get_sandbox_provider", + lambda: _FakeProvider(uses_thread_data_mounts=True, sandbox=sb), + ) + config = ToolOutputConfig(externalize_min_chars=50, preview_head_chars=20, preview_tail_chars=10) + result = mod._budget_content( + "x" * 500, + tool_name="remote_executor", + tool_call_id="tc-m", + outputs_path=str(tmp_path), + config=config, + sandbox=sb, + ) + assert result is not None + assert "Full remote_executor output saved to /mnt/user-data/outputs/" in result + # Mounted path must NOT touch the sandbox. + assert sb.commands == [] + assert sb.writes == [] + # And the host file must exist. + storage_dir = tmp_path / ".tool-results" + assert storage_dir.is_dir() + assert len(list(storage_dir.iterdir())) == 1 + + def test_non_mounted_sandbox_writes_to_sandbox(self, monkeypatch, tmp_path): + from deerflow.agents.middlewares import tool_output_budget_middleware as mod + + sb = _FakeSandbox() + monkeypatch.setattr( + mod, + "get_sandbox_provider", + lambda: _FakeProvider(uses_thread_data_mounts=False, sandbox=sb), + ) + config = ToolOutputConfig(externalize_min_chars=50, preview_head_chars=20, preview_tail_chars=10) + result = mod._budget_content( + "x" * 500, + tool_name="remote_executor", + tool_call_id="tc-n", + outputs_path=str(tmp_path), # present, but ignored on non-mounted path + config=config, + sandbox=sb, + ) + assert result is not None + assert "Full remote_executor output saved to /mnt/user-data/outputs/" in result + # Non-mounted path MUST write into the sandbox. + assert sb.writes and sb.writes[0][1] == "x" * 500 + # And MUST NOT touch the host. + assert not (tmp_path / ".tool-results").exists() + + def test_non_mounted_without_sandbox_falls_back(self, monkeypatch): + from deerflow.agents.middlewares import tool_output_budget_middleware as mod + + monkeypatch.setattr( + mod, + "get_sandbox_provider", + lambda: _FakeProvider(uses_thread_data_mounts=False, sandbox=None), + ) + config = ToolOutputConfig( + externalize_min_chars=50, + fallback_max_chars=500, + fallback_head_chars=100, + fallback_tail_chars=50, + ) + result = mod._budget_content( + "x" * 5000, + tool_name="web_search", + tool_call_id="tc-fb", + outputs_path=None, + config=config, + sandbox=None, + ) + assert result is not None + assert "Persistent storage unavailable" in result + + +class TestResolveSandbox: + def test_returns_none_when_no_state(self): + from deerflow.agents.middlewares.tool_output_budget_middleware import _resolve_sandbox + + req = SimpleNamespace(runtime=None) + assert _resolve_sandbox(req) is None + + def test_returns_none_when_state_has_no_sandbox(self): + from deerflow.agents.middlewares.tool_output_budget_middleware import _resolve_sandbox + + req = SimpleNamespace(runtime=SimpleNamespace(state={})) + assert _resolve_sandbox(req) is None + + def test_returns_none_when_sandbox_id_missing(self): + from deerflow.agents.middlewares.tool_output_budget_middleware import _resolve_sandbox + + req = SimpleNamespace(runtime=SimpleNamespace(state={"sandbox": {}})) + assert _resolve_sandbox(req) is None + + def test_returns_sandbox_from_provider(self, monkeypatch): + from deerflow.agents.middlewares import tool_output_budget_middleware as mod + + sb = _FakeSandbox() + monkeypatch.setattr( + mod, + "get_sandbox_provider", + lambda: _FakeProvider(uses_thread_data_mounts=False, sandbox=sb), + ) + req = SimpleNamespace(runtime=SimpleNamespace(state={"sandbox": {"sandbox_id": "sb-1"}})) + assert mod._resolve_sandbox(req) is sb + + def test_returns_none_on_provider_exception(self, monkeypatch): + from deerflow.agents.middlewares import tool_output_budget_middleware as mod + + class _Boom: + def get(self, sandbox_id): + raise RuntimeError("boom") + + monkeypatch.setattr(mod, "get_sandbox_provider", lambda: _Boom()) + req = SimpleNamespace(runtime=SimpleNamespace(state={"sandbox": {"sandbox_id": "sb-x"}})) + assert mod._resolve_sandbox(req) is None + + +class TestWrapToolCallSandboxIntegration: + """End-to-end via wrap_tool_call for the non-mounted path (issue #3416).""" + + def test_oversized_output_lands_in_sandbox_not_host(self, monkeypatch, tmp_path): + from deerflow.agents.middlewares import tool_output_budget_middleware as mod + + sb = _FakeSandbox() + monkeypatch.setattr( + mod, + "get_sandbox_provider", + lambda: _FakeProvider(uses_thread_data_mounts=False, sandbox=sb), + ) + + config = ToolOutputConfig(externalize_min_chars=50, preview_head_chars=20, preview_tail_chars=10) + mw = ToolOutputBudgetMiddleware(config=config) + content = "x" * 500 + msg = _tm(content, name="remote_executor") + # Request carries BOTH outputs_path (host) AND a sandbox_id; the + # non-mounted branch must ignore outputs_path and write into sandbox. + req = SimpleNamespace( + tool_call={"name": "remote_executor", "id": "tc-1"}, + runtime=SimpleNamespace( + state={ + "thread_data": {"outputs_path": str(tmp_path)}, + "sandbox": {"sandbox_id": "sb-1"}, + } + ), + ) + + result = mw.wrap_tool_call(req, lambda _: msg) + + assert isinstance(result, ToolMessage) + assert "Full remote_executor output saved to /mnt/user-data/outputs/" in result.content + assert sb.writes and sb.writes[0][1] == content + # Host disk must not have been written. + assert not (tmp_path / ".tool-results").exists() + + +class TestBudgetContentNoSandboxNoProviderCall: + """Without a sandbox, _budget_content must NOT call get_sandbox_provider. + + This is the legacy host-disk path (and the CI-without-config.yaml path): + touching the provider would raise and force inline fallback, regressing + issue #3416's fix and breaking environments that never opt into sandbox. + """ + + def test_no_provider_call_when_sandbox_absent(self, monkeypatch, tmp_path): + from deerflow.agents.middlewares import tool_output_budget_middleware as mod + + called = {"n": 0} + + def boom(): + called["n"] += 1 + raise RuntimeError("provider must not be called on the legacy path") + + monkeypatch.setattr(mod, "get_sandbox_provider", boom) + config = ToolOutputConfig(externalize_min_chars=50, preview_head_chars=20, preview_tail_chars=10) + result = mod._budget_content( + "x" * 500, + tool_name="remote_executor", + tool_call_id="tc-legacy", + outputs_path=str(tmp_path), + config=config, + sandbox=None, + ) + assert result is not None + assert "Full remote_executor output saved to /mnt/user-data/outputs/" in result + assert called["n"] == 0 + assert (tmp_path / ".tool-results").is_dir()