mirror of
https://github.com/bytedance/deer-flow.git
synced 2026-04-25 11:18:22 +00:00
fix Windows Docker sandbox path mounting (#1634)
* fix windows docker sandbox paths * fix windows sandbox mount validation * fix backend checks for windows sandbox path PR
This commit is contained in:
parent
c2f7be37b3
commit
3ff15423d6
@ -26,7 +26,7 @@ except ImportError: # pragma: no cover - Windows fallback
|
||||
import msvcrt
|
||||
|
||||
from deerflow.config import get_app_config
|
||||
from deerflow.config.paths import VIRTUAL_PATH_PREFIX, Paths, get_paths
|
||||
from deerflow.config.paths import VIRTUAL_PATH_PREFIX, get_paths
|
||||
from deerflow.sandbox.sandbox import Sandbox
|
||||
from deerflow.sandbox.sandbox_provider import SandboxProvider
|
||||
|
||||
@ -214,17 +214,13 @@ class AioSandboxProvider(SandboxProvider):
|
||||
paths = get_paths()
|
||||
paths.ensure_thread_dirs(thread_id)
|
||||
|
||||
# host_paths resolves to the host-side base dir when DEER_FLOW_HOST_BASE_DIR
|
||||
# is set, otherwise falls back to the container's own base dir (native mode).
|
||||
host_paths = Paths(base_dir=paths.host_base_dir)
|
||||
|
||||
return [
|
||||
(str(host_paths.sandbox_work_dir(thread_id)), f"{VIRTUAL_PATH_PREFIX}/workspace", False),
|
||||
(str(host_paths.sandbox_uploads_dir(thread_id)), f"{VIRTUAL_PATH_PREFIX}/uploads", False),
|
||||
(str(host_paths.sandbox_outputs_dir(thread_id)), f"{VIRTUAL_PATH_PREFIX}/outputs", False),
|
||||
(paths.host_sandbox_work_dir(thread_id), f"{VIRTUAL_PATH_PREFIX}/workspace", False),
|
||||
(paths.host_sandbox_uploads_dir(thread_id), f"{VIRTUAL_PATH_PREFIX}/uploads", False),
|
||||
(paths.host_sandbox_outputs_dir(thread_id), f"{VIRTUAL_PATH_PREFIX}/outputs", False),
|
||||
# ACP workspace: read-only inside the sandbox (lead agent reads results;
|
||||
# the ACP subprocess writes from the host side, not from within the container).
|
||||
(str(host_paths.acp_workspace_dir(thread_id)), "/mnt/acp-workspace", True),
|
||||
(paths.host_acp_workspace_dir(thread_id), "/mnt/acp-workspace", True),
|
||||
]
|
||||
|
||||
@staticmethod
|
||||
|
||||
@ -1,7 +1,7 @@
|
||||
import os
|
||||
import re
|
||||
import shutil
|
||||
from pathlib import Path
|
||||
from pathlib import Path, PureWindowsPath
|
||||
|
||||
# Virtual path prefix seen by agents inside the sandbox
|
||||
VIRTUAL_PATH_PREFIX = "/mnt/user-data"
|
||||
@ -9,6 +9,41 @@ VIRTUAL_PATH_PREFIX = "/mnt/user-data"
|
||||
_SAFE_THREAD_ID_RE = re.compile(r"^[A-Za-z0-9_\-]+$")
|
||||
|
||||
|
||||
def _validate_thread_id(thread_id: str) -> str:
|
||||
"""Validate a thread ID before using it in filesystem paths."""
|
||||
if not _SAFE_THREAD_ID_RE.match(thread_id):
|
||||
raise ValueError(f"Invalid thread_id {thread_id!r}: only alphanumeric characters, hyphens, and underscores are allowed.")
|
||||
return thread_id
|
||||
|
||||
|
||||
def _join_host_path(base: str, *parts: str) -> str:
|
||||
"""Join host filesystem path segments while preserving native style.
|
||||
|
||||
Docker Desktop on Windows expects bind mount sources to stay in Windows
|
||||
path form (for example ``C:\\repo\\backend\\.deer-flow``). Using
|
||||
``Path(base) / ...`` on a POSIX host can accidentally rewrite those paths
|
||||
with mixed separators, so this helper preserves the original style.
|
||||
"""
|
||||
if not parts:
|
||||
return base
|
||||
|
||||
if re.match(r"^[A-Za-z]:[\\/]", base) or base.startswith("\\\\") or "\\" in base:
|
||||
result = PureWindowsPath(base)
|
||||
for part in parts:
|
||||
result /= part
|
||||
return str(result)
|
||||
|
||||
result = Path(base)
|
||||
for part in parts:
|
||||
result /= part
|
||||
return str(result)
|
||||
|
||||
|
||||
def join_host_path(base: str, *parts: str) -> str:
|
||||
"""Join host filesystem path segments while preserving native style."""
|
||||
return _join_host_path(base, *parts)
|
||||
|
||||
|
||||
class Paths:
|
||||
"""
|
||||
Centralized path configuration for DeerFlow application data.
|
||||
@ -54,6 +89,12 @@ class Paths:
|
||||
return Path(env)
|
||||
return self.base_dir
|
||||
|
||||
def _host_base_dir_str(self) -> str:
|
||||
"""Return the host base dir as a raw string for bind mounts."""
|
||||
if env := os.getenv("DEER_FLOW_HOST_BASE_DIR"):
|
||||
return env
|
||||
return str(self.base_dir)
|
||||
|
||||
@property
|
||||
def base_dir(self) -> Path:
|
||||
"""Root directory for all application data."""
|
||||
@ -103,9 +144,7 @@ class Paths:
|
||||
ValueError: If `thread_id` contains unsafe characters (path separators
|
||||
or `..`) that could cause directory traversal.
|
||||
"""
|
||||
if not _SAFE_THREAD_ID_RE.match(thread_id):
|
||||
raise ValueError(f"Invalid thread_id {thread_id!r}: only alphanumeric characters, hyphens, and underscores are allowed.")
|
||||
return self.base_dir / "threads" / thread_id
|
||||
return self.base_dir / "threads" / _validate_thread_id(thread_id)
|
||||
|
||||
def sandbox_work_dir(self, thread_id: str) -> Path:
|
||||
"""
|
||||
@ -150,6 +189,30 @@ class Paths:
|
||||
"""
|
||||
return self.thread_dir(thread_id) / "user-data"
|
||||
|
||||
def host_thread_dir(self, thread_id: str) -> str:
|
||||
"""Host path for a thread directory, preserving Windows path syntax."""
|
||||
return _join_host_path(self._host_base_dir_str(), "threads", _validate_thread_id(thread_id))
|
||||
|
||||
def host_sandbox_user_data_dir(self, thread_id: str) -> str:
|
||||
"""Host path for a thread's user-data root."""
|
||||
return _join_host_path(self.host_thread_dir(thread_id), "user-data")
|
||||
|
||||
def host_sandbox_work_dir(self, thread_id: str) -> str:
|
||||
"""Host path for the workspace mount source."""
|
||||
return _join_host_path(self.host_sandbox_user_data_dir(thread_id), "workspace")
|
||||
|
||||
def host_sandbox_uploads_dir(self, thread_id: str) -> str:
|
||||
"""Host path for the uploads mount source."""
|
||||
return _join_host_path(self.host_sandbox_user_data_dir(thread_id), "uploads")
|
||||
|
||||
def host_sandbox_outputs_dir(self, thread_id: str) -> str:
|
||||
"""Host path for the outputs mount source."""
|
||||
return _join_host_path(self.host_sandbox_user_data_dir(thread_id), "outputs")
|
||||
|
||||
def host_acp_workspace_dir(self, thread_id: str) -> str:
|
||||
"""Host path for the ACP workspace mount source."""
|
||||
return _join_host_path(self.host_thread_dir(thread_id), "acp-workspace")
|
||||
|
||||
def ensure_thread_dirs(self, thread_id: str) -> None:
|
||||
"""Create all standard sandbox directories for a thread.
|
||||
|
||||
|
||||
@ -81,11 +81,7 @@ class RunManager:
|
||||
async def list_by_thread(self, thread_id: str) -> list[RunRecord]:
|
||||
"""Return all runs for a given thread, newest first."""
|
||||
async with self._lock:
|
||||
return sorted(
|
||||
(r for r in self._runs.values() if r.thread_id == thread_id),
|
||||
key=lambda r: r.created_at,
|
||||
reverse=True,
|
||||
)
|
||||
return list(reversed([r for r in self._runs.values() if r.thread_id == thread_id]))
|
||||
|
||||
async def set_status(self, run_id: str, status: RunStatus, *, error: str | None = None) -> None:
|
||||
"""Transition a run to a new status."""
|
||||
|
||||
@ -5,7 +5,7 @@ from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
from deerflow.config.paths import Paths
|
||||
from deerflow.config.paths import Paths, join_host_path
|
||||
|
||||
# ── ensure_thread_dirs ───────────────────────────────────────────────────────
|
||||
|
||||
@ -31,6 +31,13 @@ def test_ensure_thread_dirs_acp_workspace_is_world_writable(tmp_path):
|
||||
assert mode == oct(0o777)
|
||||
|
||||
|
||||
def test_host_thread_dir_rejects_invalid_thread_id(tmp_path):
|
||||
paths = Paths(base_dir=tmp_path)
|
||||
|
||||
with pytest.raises(ValueError, match="Invalid thread_id"):
|
||||
paths.host_thread_dir("../escape")
|
||||
|
||||
|
||||
# ── _get_thread_mounts ───────────────────────────────────────────────────────
|
||||
|
||||
|
||||
@ -75,6 +82,30 @@ def test_get_thread_mounts_includes_user_data_dirs(tmp_path, monkeypatch):
|
||||
assert "/mnt/user-data/outputs" in container_paths
|
||||
|
||||
|
||||
def test_join_host_path_preserves_windows_drive_letter_style():
|
||||
base = r"C:\Users\demo\deer-flow\backend\.deer-flow"
|
||||
|
||||
joined = join_host_path(base, "threads", "thread-9", "user-data", "outputs")
|
||||
|
||||
assert joined == r"C:\Users\demo\deer-flow\backend\.deer-flow\threads\thread-9\user-data\outputs"
|
||||
|
||||
|
||||
def test_get_thread_mounts_preserves_windows_host_path_style(tmp_path, monkeypatch):
|
||||
"""Docker bind mount sources must keep Windows-style paths intact."""
|
||||
aio_mod = importlib.import_module("deerflow.community.aio_sandbox.aio_sandbox_provider")
|
||||
monkeypatch.setenv("DEER_FLOW_HOST_BASE_DIR", r"C:\Users\demo\deer-flow\backend\.deer-flow")
|
||||
monkeypatch.setattr(aio_mod, "get_paths", lambda: Paths(base_dir=tmp_path))
|
||||
|
||||
mounts = aio_mod.AioSandboxProvider._get_thread_mounts("thread-10")
|
||||
|
||||
container_paths = {container_path: host_path for host_path, container_path, _ in mounts}
|
||||
|
||||
assert container_paths["/mnt/user-data/workspace"] == r"C:\Users\demo\deer-flow\backend\.deer-flow\threads\thread-10\user-data\workspace"
|
||||
assert container_paths["/mnt/user-data/uploads"] == r"C:\Users\demo\deer-flow\backend\.deer-flow\threads\thread-10\user-data\uploads"
|
||||
assert container_paths["/mnt/user-data/outputs"] == r"C:\Users\demo\deer-flow\backend\.deer-flow\threads\thread-10\user-data\outputs"
|
||||
assert container_paths["/mnt/acp-workspace"] == r"C:\Users\demo\deer-flow\backend\.deer-flow\threads\thread-10\acp-workspace"
|
||||
|
||||
|
||||
def test_discover_or_create_only_unlocks_when_lock_succeeds(tmp_path, monkeypatch):
|
||||
"""Unlock should not run if exclusive locking itself fails."""
|
||||
aio_mod = importlib.import_module("deerflow.community.aio_sandbox.aio_sandbox_provider")
|
||||
|
||||
@ -11,9 +11,16 @@ import pytest
|
||||
|
||||
REPO_ROOT = Path(__file__).resolve().parents[2]
|
||||
SCRIPT_PATH = REPO_ROOT / "scripts" / "docker.sh"
|
||||
BASH_EXECUTABLE = which("bash") or r"C:\Program Files\Git\bin\bash.exe"
|
||||
BASH_CANDIDATES = [
|
||||
Path(r"C:\Program Files\Git\bin\bash.exe"),
|
||||
Path(which("bash")) if which("bash") else None,
|
||||
]
|
||||
BASH_EXECUTABLE = next(
|
||||
(str(path) for path in BASH_CANDIDATES if path is not None and path.exists() and "WindowsApps" not in str(path)),
|
||||
None,
|
||||
)
|
||||
|
||||
if not Path(BASH_EXECUTABLE).exists():
|
||||
if BASH_EXECUTABLE is None:
|
||||
pytestmark = pytest.mark.skip(reason="bash is required for docker.sh detection tests")
|
||||
|
||||
|
||||
@ -21,13 +28,14 @@ def _detect_mode_with_config(config_content: str) -> str:
|
||||
"""Write config content into a temp project root and execute detect_sandbox_mode."""
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
tmp_root = Path(tmpdir)
|
||||
(tmp_root / "config.yaml").write_text(config_content)
|
||||
(tmp_root / "config.yaml").write_text(config_content, encoding="utf-8")
|
||||
|
||||
command = f"source '{SCRIPT_PATH}' && PROJECT_ROOT='{tmp_root}' && detect_sandbox_mode"
|
||||
|
||||
output = subprocess.check_output(
|
||||
[BASH_EXECUTABLE, "-lc", command],
|
||||
text=True,
|
||||
encoding="utf-8",
|
||||
).strip()
|
||||
|
||||
return output
|
||||
@ -37,7 +45,11 @@ def test_detect_mode_defaults_to_local_when_config_missing():
|
||||
"""No config file should default to local mode."""
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
command = f"source '{SCRIPT_PATH}' && PROJECT_ROOT='{tmpdir}' && detect_sandbox_mode"
|
||||
output = subprocess.check_output([BASH_EXECUTABLE, "-lc", command], text=True).strip()
|
||||
output = subprocess.check_output(
|
||||
[BASH_EXECUTABLE, "-lc", command],
|
||||
text=True,
|
||||
encoding="utf-8",
|
||||
).strip()
|
||||
|
||||
assert output == "local"
|
||||
|
||||
|
||||
@ -31,6 +31,7 @@ from __future__ import annotations
|
||||
|
||||
import logging
|
||||
import os
|
||||
import re
|
||||
import time
|
||||
from contextlib import asynccontextmanager
|
||||
|
||||
@ -39,7 +40,7 @@ from fastapi import FastAPI, HTTPException
|
||||
from kubernetes import client as k8s_client
|
||||
from kubernetes import config as k8s_config
|
||||
from kubernetes.client.rest import ApiException
|
||||
from pydantic import BaseModel
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
# Suppress only the InsecureRequestWarning from urllib3
|
||||
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
|
||||
@ -59,6 +60,7 @@ SANDBOX_IMAGE = os.environ.get(
|
||||
)
|
||||
SKILLS_HOST_PATH = os.environ.get("SKILLS_HOST_PATH", "/skills")
|
||||
THREADS_HOST_PATH = os.environ.get("THREADS_HOST_PATH", "/.deer-flow/threads")
|
||||
SAFE_THREAD_ID_PATTERN = r"^[A-Za-z0-9_\-]+$"
|
||||
|
||||
# Path to the kubeconfig *inside* the provisioner container.
|
||||
# Typically the host's ~/.kube/config is mounted here.
|
||||
@ -69,6 +71,36 @@ KUBECONFIG_PATH = os.environ.get("KUBECONFIG_PATH", "/root/.kube/config")
|
||||
# is ``host.docker.internal``; on Linux it may be the host's LAN IP.
|
||||
NODE_HOST = os.environ.get("NODE_HOST", "host.docker.internal")
|
||||
|
||||
|
||||
def join_host_path(base: str, *parts: str) -> str:
|
||||
"""Join host filesystem path segments while preserving native style."""
|
||||
if not parts:
|
||||
return base
|
||||
|
||||
if re.match(r"^[A-Za-z]:[\\/]", base) or base.startswith("\\\\") or "\\" in base:
|
||||
from pathlib import PureWindowsPath
|
||||
|
||||
result = PureWindowsPath(base)
|
||||
for part in parts:
|
||||
result /= part
|
||||
return str(result)
|
||||
|
||||
from pathlib import Path
|
||||
|
||||
result = Path(base)
|
||||
for part in parts:
|
||||
result /= part
|
||||
return str(result)
|
||||
|
||||
|
||||
def _validate_thread_id(thread_id: str) -> str:
|
||||
if not re.match(SAFE_THREAD_ID_PATTERN, thread_id):
|
||||
raise ValueError(
|
||||
"Invalid thread_id: only alphanumeric characters, hyphens, and underscores are allowed."
|
||||
)
|
||||
return thread_id
|
||||
|
||||
|
||||
# ── K8s client setup ────────────────────────────────────────────────────
|
||||
|
||||
core_v1: k8s_client.CoreV1Api | None = None
|
||||
@ -186,7 +218,7 @@ app = FastAPI(title="DeerFlow Sandbox Provisioner", lifespan=lifespan)
|
||||
|
||||
class CreateSandboxRequest(BaseModel):
|
||||
sandbox_id: str
|
||||
thread_id: str
|
||||
thread_id: str = Field(pattern=SAFE_THREAD_ID_PATTERN)
|
||||
|
||||
|
||||
class SandboxResponse(BaseModel):
|
||||
@ -213,6 +245,7 @@ def _sandbox_url(node_port: int) -> str:
|
||||
|
||||
def _build_pod(sandbox_id: str, thread_id: str) -> k8s_client.V1Pod:
|
||||
"""Construct a Pod manifest for a single sandbox."""
|
||||
thread_id = _validate_thread_id(thread_id)
|
||||
return k8s_client.V1Pod(
|
||||
metadata=k8s_client.V1ObjectMeta(
|
||||
name=_pod_name(sandbox_id),
|
||||
@ -298,7 +331,7 @@ def _build_pod(sandbox_id: str, thread_id: str) -> k8s_client.V1Pod:
|
||||
k8s_client.V1Volume(
|
||||
name="user-data",
|
||||
host_path=k8s_client.V1HostPathVolumeSource(
|
||||
path=f"{THREADS_HOST_PATH}/{thread_id}/user-data",
|
||||
path=join_host_path(THREADS_HOST_PATH, thread_id, "user-data"),
|
||||
type="DirectoryOrCreate",
|
||||
),
|
||||
),
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user