diff --git a/backend/packages/harness/deerflow/community/aio_sandbox/aio_sandbox_provider.py b/backend/packages/harness/deerflow/community/aio_sandbox/aio_sandbox_provider.py index cd2d382ae..75fb0b5ca 100644 --- a/backend/packages/harness/deerflow/community/aio_sandbox/aio_sandbox_provider.py +++ b/backend/packages/harness/deerflow/community/aio_sandbox/aio_sandbox_provider.py @@ -26,7 +26,7 @@ except ImportError: # pragma: no cover - Windows fallback import msvcrt from deerflow.config import get_app_config -from deerflow.config.paths import VIRTUAL_PATH_PREFIX, Paths, get_paths +from deerflow.config.paths import VIRTUAL_PATH_PREFIX, get_paths from deerflow.sandbox.sandbox import Sandbox from deerflow.sandbox.sandbox_provider import SandboxProvider @@ -214,17 +214,13 @@ class AioSandboxProvider(SandboxProvider): paths = get_paths() paths.ensure_thread_dirs(thread_id) - # host_paths resolves to the host-side base dir when DEER_FLOW_HOST_BASE_DIR - # is set, otherwise falls back to the container's own base dir (native mode). - host_paths = Paths(base_dir=paths.host_base_dir) - return [ - (str(host_paths.sandbox_work_dir(thread_id)), f"{VIRTUAL_PATH_PREFIX}/workspace", False), - (str(host_paths.sandbox_uploads_dir(thread_id)), f"{VIRTUAL_PATH_PREFIX}/uploads", False), - (str(host_paths.sandbox_outputs_dir(thread_id)), f"{VIRTUAL_PATH_PREFIX}/outputs", False), + (paths.host_sandbox_work_dir(thread_id), f"{VIRTUAL_PATH_PREFIX}/workspace", False), + (paths.host_sandbox_uploads_dir(thread_id), f"{VIRTUAL_PATH_PREFIX}/uploads", False), + (paths.host_sandbox_outputs_dir(thread_id), f"{VIRTUAL_PATH_PREFIX}/outputs", False), # ACP workspace: read-only inside the sandbox (lead agent reads results; # the ACP subprocess writes from the host side, not from within the container). - (str(host_paths.acp_workspace_dir(thread_id)), "/mnt/acp-workspace", True), + (paths.host_acp_workspace_dir(thread_id), "/mnt/acp-workspace", True), ] @staticmethod diff --git a/backend/packages/harness/deerflow/config/paths.py b/backend/packages/harness/deerflow/config/paths.py index 9b45f5bea..8b9cbc24c 100644 --- a/backend/packages/harness/deerflow/config/paths.py +++ b/backend/packages/harness/deerflow/config/paths.py @@ -1,7 +1,7 @@ import os import re import shutil -from pathlib import Path +from pathlib import Path, PureWindowsPath # Virtual path prefix seen by agents inside the sandbox VIRTUAL_PATH_PREFIX = "/mnt/user-data" @@ -9,6 +9,41 @@ VIRTUAL_PATH_PREFIX = "/mnt/user-data" _SAFE_THREAD_ID_RE = re.compile(r"^[A-Za-z0-9_\-]+$") +def _validate_thread_id(thread_id: str) -> str: + """Validate a thread ID before using it in filesystem paths.""" + if not _SAFE_THREAD_ID_RE.match(thread_id): + raise ValueError(f"Invalid thread_id {thread_id!r}: only alphanumeric characters, hyphens, and underscores are allowed.") + return thread_id + + +def _join_host_path(base: str, *parts: str) -> str: + """Join host filesystem path segments while preserving native style. + + Docker Desktop on Windows expects bind mount sources to stay in Windows + path form (for example ``C:\\repo\\backend\\.deer-flow``). Using + ``Path(base) / ...`` on a POSIX host can accidentally rewrite those paths + with mixed separators, so this helper preserves the original style. + """ + if not parts: + return base + + if re.match(r"^[A-Za-z]:[\\/]", base) or base.startswith("\\\\") or "\\" in base: + result = PureWindowsPath(base) + for part in parts: + result /= part + return str(result) + + result = Path(base) + for part in parts: + result /= part + return str(result) + + +def join_host_path(base: str, *parts: str) -> str: + """Join host filesystem path segments while preserving native style.""" + return _join_host_path(base, *parts) + + class Paths: """ Centralized path configuration for DeerFlow application data. @@ -54,6 +89,12 @@ class Paths: return Path(env) return self.base_dir + def _host_base_dir_str(self) -> str: + """Return the host base dir as a raw string for bind mounts.""" + if env := os.getenv("DEER_FLOW_HOST_BASE_DIR"): + return env + return str(self.base_dir) + @property def base_dir(self) -> Path: """Root directory for all application data.""" @@ -103,9 +144,7 @@ class Paths: ValueError: If `thread_id` contains unsafe characters (path separators or `..`) that could cause directory traversal. """ - if not _SAFE_THREAD_ID_RE.match(thread_id): - raise ValueError(f"Invalid thread_id {thread_id!r}: only alphanumeric characters, hyphens, and underscores are allowed.") - return self.base_dir / "threads" / thread_id + return self.base_dir / "threads" / _validate_thread_id(thread_id) def sandbox_work_dir(self, thread_id: str) -> Path: """ @@ -150,6 +189,30 @@ class Paths: """ return self.thread_dir(thread_id) / "user-data" + def host_thread_dir(self, thread_id: str) -> str: + """Host path for a thread directory, preserving Windows path syntax.""" + return _join_host_path(self._host_base_dir_str(), "threads", _validate_thread_id(thread_id)) + + def host_sandbox_user_data_dir(self, thread_id: str) -> str: + """Host path for a thread's user-data root.""" + return _join_host_path(self.host_thread_dir(thread_id), "user-data") + + def host_sandbox_work_dir(self, thread_id: str) -> str: + """Host path for the workspace mount source.""" + return _join_host_path(self.host_sandbox_user_data_dir(thread_id), "workspace") + + def host_sandbox_uploads_dir(self, thread_id: str) -> str: + """Host path for the uploads mount source.""" + return _join_host_path(self.host_sandbox_user_data_dir(thread_id), "uploads") + + def host_sandbox_outputs_dir(self, thread_id: str) -> str: + """Host path for the outputs mount source.""" + return _join_host_path(self.host_sandbox_user_data_dir(thread_id), "outputs") + + def host_acp_workspace_dir(self, thread_id: str) -> str: + """Host path for the ACP workspace mount source.""" + return _join_host_path(self.host_thread_dir(thread_id), "acp-workspace") + def ensure_thread_dirs(self, thread_id: str) -> None: """Create all standard sandbox directories for a thread. diff --git a/backend/packages/harness/deerflow/runtime/runs/manager.py b/backend/packages/harness/deerflow/runtime/runs/manager.py index 42b0372a2..abe090372 100644 --- a/backend/packages/harness/deerflow/runtime/runs/manager.py +++ b/backend/packages/harness/deerflow/runtime/runs/manager.py @@ -81,11 +81,7 @@ class RunManager: async def list_by_thread(self, thread_id: str) -> list[RunRecord]: """Return all runs for a given thread, newest first.""" async with self._lock: - return sorted( - (r for r in self._runs.values() if r.thread_id == thread_id), - key=lambda r: r.created_at, - reverse=True, - ) + return list(reversed([r for r in self._runs.values() if r.thread_id == thread_id])) async def set_status(self, run_id: str, status: RunStatus, *, error: str | None = None) -> None: """Transition a run to a new status.""" diff --git a/backend/tests/test_aio_sandbox_provider.py b/backend/tests/test_aio_sandbox_provider.py index 1569edbfc..e797cf7e3 100644 --- a/backend/tests/test_aio_sandbox_provider.py +++ b/backend/tests/test_aio_sandbox_provider.py @@ -5,7 +5,7 @@ from unittest.mock import MagicMock, patch import pytest -from deerflow.config.paths import Paths +from deerflow.config.paths import Paths, join_host_path # ── ensure_thread_dirs ─────────────────────────────────────────────────────── @@ -31,6 +31,13 @@ def test_ensure_thread_dirs_acp_workspace_is_world_writable(tmp_path): assert mode == oct(0o777) +def test_host_thread_dir_rejects_invalid_thread_id(tmp_path): + paths = Paths(base_dir=tmp_path) + + with pytest.raises(ValueError, match="Invalid thread_id"): + paths.host_thread_dir("../escape") + + # ── _get_thread_mounts ─────────────────────────────────────────────────────── @@ -75,6 +82,30 @@ def test_get_thread_mounts_includes_user_data_dirs(tmp_path, monkeypatch): assert "/mnt/user-data/outputs" in container_paths +def test_join_host_path_preserves_windows_drive_letter_style(): + base = r"C:\Users\demo\deer-flow\backend\.deer-flow" + + joined = join_host_path(base, "threads", "thread-9", "user-data", "outputs") + + assert joined == r"C:\Users\demo\deer-flow\backend\.deer-flow\threads\thread-9\user-data\outputs" + + +def test_get_thread_mounts_preserves_windows_host_path_style(tmp_path, monkeypatch): + """Docker bind mount sources must keep Windows-style paths intact.""" + aio_mod = importlib.import_module("deerflow.community.aio_sandbox.aio_sandbox_provider") + monkeypatch.setenv("DEER_FLOW_HOST_BASE_DIR", r"C:\Users\demo\deer-flow\backend\.deer-flow") + monkeypatch.setattr(aio_mod, "get_paths", lambda: Paths(base_dir=tmp_path)) + + mounts = aio_mod.AioSandboxProvider._get_thread_mounts("thread-10") + + container_paths = {container_path: host_path for host_path, container_path, _ in mounts} + + assert container_paths["/mnt/user-data/workspace"] == r"C:\Users\demo\deer-flow\backend\.deer-flow\threads\thread-10\user-data\workspace" + assert container_paths["/mnt/user-data/uploads"] == r"C:\Users\demo\deer-flow\backend\.deer-flow\threads\thread-10\user-data\uploads" + assert container_paths["/mnt/user-data/outputs"] == r"C:\Users\demo\deer-flow\backend\.deer-flow\threads\thread-10\user-data\outputs" + assert container_paths["/mnt/acp-workspace"] == r"C:\Users\demo\deer-flow\backend\.deer-flow\threads\thread-10\acp-workspace" + + def test_discover_or_create_only_unlocks_when_lock_succeeds(tmp_path, monkeypatch): """Unlock should not run if exclusive locking itself fails.""" aio_mod = importlib.import_module("deerflow.community.aio_sandbox.aio_sandbox_provider") diff --git a/backend/tests/test_docker_sandbox_mode_detection.py b/backend/tests/test_docker_sandbox_mode_detection.py index 109057cdd..7191e4d7f 100644 --- a/backend/tests/test_docker_sandbox_mode_detection.py +++ b/backend/tests/test_docker_sandbox_mode_detection.py @@ -11,9 +11,16 @@ import pytest REPO_ROOT = Path(__file__).resolve().parents[2] SCRIPT_PATH = REPO_ROOT / "scripts" / "docker.sh" -BASH_EXECUTABLE = which("bash") or r"C:\Program Files\Git\bin\bash.exe" +BASH_CANDIDATES = [ + Path(r"C:\Program Files\Git\bin\bash.exe"), + Path(which("bash")) if which("bash") else None, +] +BASH_EXECUTABLE = next( + (str(path) for path in BASH_CANDIDATES if path is not None and path.exists() and "WindowsApps" not in str(path)), + None, +) -if not Path(BASH_EXECUTABLE).exists(): +if BASH_EXECUTABLE is None: pytestmark = pytest.mark.skip(reason="bash is required for docker.sh detection tests") @@ -21,13 +28,14 @@ def _detect_mode_with_config(config_content: str) -> str: """Write config content into a temp project root and execute detect_sandbox_mode.""" with tempfile.TemporaryDirectory() as tmpdir: tmp_root = Path(tmpdir) - (tmp_root / "config.yaml").write_text(config_content) + (tmp_root / "config.yaml").write_text(config_content, encoding="utf-8") command = f"source '{SCRIPT_PATH}' && PROJECT_ROOT='{tmp_root}' && detect_sandbox_mode" output = subprocess.check_output( [BASH_EXECUTABLE, "-lc", command], text=True, + encoding="utf-8", ).strip() return output @@ -37,7 +45,11 @@ def test_detect_mode_defaults_to_local_when_config_missing(): """No config file should default to local mode.""" with tempfile.TemporaryDirectory() as tmpdir: command = f"source '{SCRIPT_PATH}' && PROJECT_ROOT='{tmpdir}' && detect_sandbox_mode" - output = subprocess.check_output([BASH_EXECUTABLE, "-lc", command], text=True).strip() + output = subprocess.check_output( + [BASH_EXECUTABLE, "-lc", command], + text=True, + encoding="utf-8", + ).strip() assert output == "local" diff --git a/docker/provisioner/app.py b/docker/provisioner/app.py index f9cdfa36c..ba2dbb37b 100644 --- a/docker/provisioner/app.py +++ b/docker/provisioner/app.py @@ -31,6 +31,7 @@ from __future__ import annotations import logging import os +import re import time from contextlib import asynccontextmanager @@ -39,7 +40,7 @@ from fastapi import FastAPI, HTTPException from kubernetes import client as k8s_client from kubernetes import config as k8s_config from kubernetes.client.rest import ApiException -from pydantic import BaseModel +from pydantic import BaseModel, Field # Suppress only the InsecureRequestWarning from urllib3 urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) @@ -59,6 +60,7 @@ SANDBOX_IMAGE = os.environ.get( ) SKILLS_HOST_PATH = os.environ.get("SKILLS_HOST_PATH", "/skills") THREADS_HOST_PATH = os.environ.get("THREADS_HOST_PATH", "/.deer-flow/threads") +SAFE_THREAD_ID_PATTERN = r"^[A-Za-z0-9_\-]+$" # Path to the kubeconfig *inside* the provisioner container. # Typically the host's ~/.kube/config is mounted here. @@ -69,6 +71,36 @@ KUBECONFIG_PATH = os.environ.get("KUBECONFIG_PATH", "/root/.kube/config") # is ``host.docker.internal``; on Linux it may be the host's LAN IP. NODE_HOST = os.environ.get("NODE_HOST", "host.docker.internal") + +def join_host_path(base: str, *parts: str) -> str: + """Join host filesystem path segments while preserving native style.""" + if not parts: + return base + + if re.match(r"^[A-Za-z]:[\\/]", base) or base.startswith("\\\\") or "\\" in base: + from pathlib import PureWindowsPath + + result = PureWindowsPath(base) + for part in parts: + result /= part + return str(result) + + from pathlib import Path + + result = Path(base) + for part in parts: + result /= part + return str(result) + + +def _validate_thread_id(thread_id: str) -> str: + if not re.match(SAFE_THREAD_ID_PATTERN, thread_id): + raise ValueError( + "Invalid thread_id: only alphanumeric characters, hyphens, and underscores are allowed." + ) + return thread_id + + # ── K8s client setup ──────────────────────────────────────────────────── core_v1: k8s_client.CoreV1Api | None = None @@ -186,7 +218,7 @@ app = FastAPI(title="DeerFlow Sandbox Provisioner", lifespan=lifespan) class CreateSandboxRequest(BaseModel): sandbox_id: str - thread_id: str + thread_id: str = Field(pattern=SAFE_THREAD_ID_PATTERN) class SandboxResponse(BaseModel): @@ -213,6 +245,7 @@ def _sandbox_url(node_port: int) -> str: def _build_pod(sandbox_id: str, thread_id: str) -> k8s_client.V1Pod: """Construct a Pod manifest for a single sandbox.""" + thread_id = _validate_thread_id(thread_id) return k8s_client.V1Pod( metadata=k8s_client.V1ObjectMeta( name=_pod_name(sandbox_id), @@ -298,7 +331,7 @@ def _build_pod(sandbox_id: str, thread_id: str) -> k8s_client.V1Pod: k8s_client.V1Volume( name="user-data", host_path=k8s_client.V1HostPathVolumeSource( - path=f"{THREADS_HOST_PATH}/{thread_id}/user-data", + path=join_host_path(THREADS_HOST_PATH, thread_id, "user-data"), type="DirectoryOrCreate", ), ),