fix(sandbox): prevent local custom mount symlink escapes (#2558)

* fix(sandbox): prevent local custom mount symlink escapes

Fixes #2506

* fix(sandbox): harden custom mount symlink handling

* fix(sandbox): format internal symlink directory listings
This commit is contained in:
DanielWalnut 2026-04-28 11:59:46 +08:00 committed by GitHub
parent 707ed328dd
commit 39c5da94f3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 242 additions and 23 deletions

View File

@ -22,6 +22,13 @@ def list_dir(path: str, max_depth: int = 2) -> list[str]:
if not root_path.is_dir():
return result
def _is_within_root(candidate: Path) -> bool:
try:
candidate.relative_to(root_path)
return True
except ValueError:
return False
def _traverse(current_path: Path, current_depth: int) -> None:
"""Recursively traverse directories up to max_depth."""
if current_depth > max_depth:
@ -32,8 +39,23 @@ def list_dir(path: str, max_depth: int = 2) -> list[str]:
if should_ignore_name(item.name):
continue
if item.is_symlink():
try:
item_resolved = item.resolve()
if not _is_within_root(item_resolved):
continue
except OSError:
continue
post_fix = "/" if item_resolved.is_dir() else ""
result.append(str(item_resolved) + post_fix)
continue
item_resolved = item.resolve()
if not _is_within_root(item_resolved):
continue
post_fix = "/" if item.is_dir() else ""
result.append(str(item.resolve()) + post_fix)
result.append(str(item_resolved) + post_fix)
# Recurse into subdirectories if not at max depth
if item.is_dir() and current_depth < max_depth:

View File

@ -5,6 +5,7 @@ import shutil
import subprocess
from dataclasses import dataclass
from pathlib import Path
from typing import NamedTuple
from deerflow.sandbox.local.list_dir import list_dir
from deerflow.sandbox.sandbox import Sandbox
@ -20,6 +21,11 @@ class PathMapping:
read_only: bool = False
class ResolvedPath(NamedTuple):
path: str
mapping: PathMapping | None
class LocalSandbox(Sandbox):
@staticmethod
def _shell_name(shell: str) -> str:
@ -91,7 +97,23 @@ class LocalSandbox(Sandbox):
return best_mapping.read_only
def _resolve_path(self, path: str) -> str:
def _find_path_mapping(self, path: str) -> tuple[PathMapping, str] | None:
path_str = str(path)
for mapping in sorted(self.path_mappings, key=lambda m: len(m.container_path.rstrip("/") or "/"), reverse=True):
container_path = mapping.container_path.rstrip("/") or "/"
if container_path == "/":
if path_str.startswith("/"):
return mapping, path_str.lstrip("/")
continue
if path_str == container_path or path_str.startswith(container_path + "/"):
relative = path_str[len(container_path) :].lstrip("/")
return mapping, relative
return None
def _resolve_path_with_mapping(self, path: str) -> ResolvedPath:
"""
Resolve container path to actual local path using mappings.
@ -99,22 +121,30 @@ class LocalSandbox(Sandbox):
path: Path that might be a container path
Returns:
Resolved local path
Resolved local path and the matched mapping, if any
"""
path_str = str(path)
# Try each mapping (longest prefix first for more specific matches)
for mapping in sorted(self.path_mappings, key=lambda m: len(m.container_path), reverse=True):
container_path = mapping.container_path
local_path = mapping.local_path
if path_str == container_path or path_str.startswith(container_path + "/"):
# Replace the container path prefix with local path
relative = path_str[len(container_path) :].lstrip("/")
resolved = str(Path(local_path) / relative) if relative else local_path
return resolved
mapping_match = self._find_path_mapping(path_str)
if mapping_match is None:
return ResolvedPath(path_str, None)
# No mapping found, return original path
return path_str
mapping, relative = mapping_match
local_root = Path(mapping.local_path).resolve()
resolved_path = (local_root / relative).resolve() if relative else local_root
try:
resolved_path.relative_to(local_root)
except ValueError as exc:
raise PermissionError(errno.EACCES, "Access denied: path escapes mounted directory", path_str) from exc
return ResolvedPath(str(resolved_path), mapping)
def _resolve_path(self, path: str) -> str:
return self._resolve_path_with_mapping(path).path
def _is_resolved_path_read_only(self, resolved: ResolvedPath) -> bool:
return bool(resolved.mapping and resolved.mapping.read_only) or self._is_read_only_path(resolved.path)
def _reverse_resolve_path(self, path: str) -> str:
"""
@ -309,8 +339,14 @@ class LocalSandbox(Sandbox):
def list_dir(self, path: str, max_depth=2) -> list[str]:
resolved_path = self._resolve_path(path)
entries = list_dir(resolved_path, max_depth)
# Reverse resolve local paths back to container paths in output
return [self._reverse_resolve_paths_in_output(entry) for entry in entries]
# Reverse resolve local paths back to container paths and preserve
# list_dir's trailing "/" marker for directories.
result: list[str] = []
for entry in entries:
is_dir = entry.endswith(("/", "\\"))
reversed_entry = self._reverse_resolve_path(entry.rstrip("/\\")) if is_dir else self._reverse_resolve_path(entry)
result.append(f"{reversed_entry}/" if is_dir and not reversed_entry.endswith("/") else reversed_entry)
return result
def read_file(self, path: str) -> str:
resolved_path = self._resolve_path(path)
@ -329,8 +365,9 @@ class LocalSandbox(Sandbox):
raise type(e)(e.errno, e.strerror, path) from None
def write_file(self, path: str, content: str, append: bool = False) -> None:
resolved_path = self._resolve_path(path)
if self._is_read_only_path(resolved_path):
resolved = self._resolve_path_with_mapping(path)
resolved_path = resolved.path
if self._is_resolved_path_read_only(resolved):
raise OSError(errno.EROFS, "Read-only file system", path)
try:
dir_path = os.path.dirname(resolved_path)
@ -384,8 +421,9 @@ class LocalSandbox(Sandbox):
], truncated
def update_file(self, path: str, content: bytes) -> None:
resolved_path = self._resolve_path(path)
if self._is_read_only_path(resolved_path):
resolved = self._resolve_path_with_mapping(path)
resolved_path = resolved.path
if self._is_resolved_path_read_only(resolved):
raise OSError(errno.EROFS, "Read-only file system", path)
try:
dir_path = os.path.dirname(resolved_path)

View File

@ -1,4 +1,5 @@
import errno
from pathlib import Path
from types import SimpleNamespace
from unittest.mock import patch
@ -8,6 +9,13 @@ from deerflow.sandbox.local.local_sandbox import LocalSandbox, PathMapping
from deerflow.sandbox.local.local_sandbox_provider import LocalSandboxProvider
def _symlink_to(target, link, *, target_is_directory=False):
try:
link.symlink_to(target, target_is_directory=target_is_directory)
except (NotImplementedError, OSError) as exc:
pytest.skip(f"symlinks are not available: {exc}")
class TestPathMapping:
def test_path_mapping_dataclass(self):
mapping = PathMapping(container_path="/mnt/skills", local_path="/home/user/skills", read_only=True)
@ -29,7 +37,7 @@ class TestLocalSandboxPathResolution:
],
)
resolved = sandbox._resolve_path("/mnt/skills")
assert resolved == "/home/user/skills"
assert resolved == str(Path("/home/user/skills").resolve())
def test_resolve_path_nested_path(self):
sandbox = LocalSandbox(
@ -39,7 +47,7 @@ class TestLocalSandboxPathResolution:
],
)
resolved = sandbox._resolve_path("/mnt/skills/agent/prompt.py")
assert resolved == "/home/user/skills/agent/prompt.py"
assert resolved == str(Path("/home/user/skills/agent/prompt.py").resolve())
def test_resolve_path_no_mapping(self):
sandbox = LocalSandbox(
@ -61,7 +69,7 @@ class TestLocalSandboxPathResolution:
)
resolved = sandbox._resolve_path("/mnt/skills/file.py")
# Should match /mnt/skills first (longer prefix)
assert resolved == "/home/user/skills/file.py"
assert resolved == str(Path("/home/user/skills/file.py").resolve())
def test_reverse_resolve_path_exact_match(self, tmp_path):
skills_dir = tmp_path / "skills"
@ -175,6 +183,157 @@ class TestReadOnlyPath:
assert exc_info.value.errno == errno.EROFS
class TestSymlinkEscapes:
def test_read_file_blocks_symlink_escape_from_mount(self, tmp_path):
mount_dir = tmp_path / "mount"
mount_dir.mkdir()
outside_dir = tmp_path / "outside"
outside_dir.mkdir()
(outside_dir / "secret.txt").write_text("secret")
_symlink_to(outside_dir, mount_dir / "escape", target_is_directory=True)
sandbox = LocalSandbox(
"test",
[
PathMapping(container_path="/mnt/data", local_path=str(mount_dir), read_only=False),
],
)
with pytest.raises(PermissionError) as exc_info:
sandbox.read_file("/mnt/data/escape/secret.txt")
assert exc_info.value.errno == errno.EACCES
def test_write_file_blocks_symlink_escape_from_mount(self, tmp_path):
mount_dir = tmp_path / "mount"
mount_dir.mkdir()
outside_dir = tmp_path / "outside"
outside_dir.mkdir()
victim = outside_dir / "victim.txt"
victim.write_text("original")
_symlink_to(outside_dir, mount_dir / "escape", target_is_directory=True)
sandbox = LocalSandbox(
"test",
[
PathMapping(container_path="/mnt/data", local_path=str(mount_dir), read_only=False),
],
)
with pytest.raises(PermissionError) as exc_info:
sandbox.write_file("/mnt/data/escape/victim.txt", "changed")
assert exc_info.value.errno == errno.EACCES
assert victim.read_text() == "original"
def test_write_file_uses_matched_read_only_mount_for_symlink_target(self, tmp_path):
repo_dir = tmp_path / "repo"
repo_dir.mkdir()
writable_dir = repo_dir / "writable"
writable_dir.mkdir()
_symlink_to(writable_dir, repo_dir / "link-to-writable", target_is_directory=True)
sandbox = LocalSandbox(
"test",
[
PathMapping(container_path="/mnt/repo", local_path=str(repo_dir), read_only=True),
PathMapping(container_path="/mnt/repo/writable", local_path=str(writable_dir), read_only=False),
],
)
with pytest.raises(OSError) as exc_info:
sandbox.write_file("/mnt/repo/link-to-writable/file.txt", "bypass")
assert exc_info.value.errno == errno.EROFS
assert not (writable_dir / "file.txt").exists()
def test_list_dir_does_not_follow_symlink_escape_from_mount(self, tmp_path):
mount_dir = tmp_path / "mount"
mount_dir.mkdir()
outside_dir = tmp_path / "outside"
outside_dir.mkdir()
(outside_dir / "secret.txt").write_text("secret")
_symlink_to(outside_dir, mount_dir / "escape", target_is_directory=True)
(mount_dir / "visible.txt").write_text("visible")
sandbox = LocalSandbox(
"test",
[
PathMapping(container_path="/mnt/data", local_path=str(mount_dir), read_only=False),
],
)
entries = sandbox.list_dir("/mnt/data", max_depth=2)
assert "/mnt/data/visible.txt" in entries
assert all("secret.txt" not in entry for entry in entries)
assert all("outside" not in entry for entry in entries)
def test_list_dir_formats_internal_directory_symlink_like_directory(self, tmp_path):
mount_dir = tmp_path / "mount"
nested_dir = mount_dir / "nested"
linked_dir = nested_dir / "linked-dir"
linked_dir.mkdir(parents=True)
_symlink_to(linked_dir, mount_dir / "dir-link", target_is_directory=True)
sandbox = LocalSandbox(
"test",
[
PathMapping(container_path="/mnt/data", local_path=str(mount_dir), read_only=False),
],
)
entries = sandbox.list_dir("/mnt/data", max_depth=1)
assert "/mnt/data/nested/" in entries
assert "/mnt/data/nested/linked-dir/" in entries
assert "/mnt/data/dir-link" not in entries
def test_write_file_blocks_symlink_into_nested_read_only_mount(self, tmp_path):
repo_dir = tmp_path / "repo"
repo_dir.mkdir()
protected_dir = repo_dir / "protected"
protected_dir.mkdir()
_symlink_to(protected_dir, repo_dir / "link-to-protected", target_is_directory=True)
sandbox = LocalSandbox(
"test",
[
PathMapping(container_path="/mnt/repo", local_path=str(repo_dir), read_only=False),
PathMapping(container_path="/mnt/repo/protected", local_path=str(protected_dir), read_only=True),
],
)
with pytest.raises(OSError) as exc_info:
sandbox.write_file("/mnt/repo/link-to-protected/file.txt", "bypass")
assert exc_info.value.errno == errno.EROFS
assert not (protected_dir / "file.txt").exists()
def test_update_file_blocks_symlink_into_nested_read_only_mount(self, tmp_path):
repo_dir = tmp_path / "repo"
repo_dir.mkdir()
protected_dir = repo_dir / "protected"
protected_dir.mkdir()
existing = protected_dir / "file.txt"
existing.write_bytes(b"original")
_symlink_to(protected_dir, repo_dir / "link-to-protected", target_is_directory=True)
sandbox = LocalSandbox(
"test",
[
PathMapping(container_path="/mnt/repo", local_path=str(repo_dir), read_only=False),
PathMapping(container_path="/mnt/repo/protected", local_path=str(protected_dir), read_only=True),
],
)
with pytest.raises(OSError) as exc_info:
sandbox.update_file("/mnt/repo/link-to-protected/file.txt", b"changed")
assert exc_info.value.errno == errno.EROFS
assert existing.read_bytes() == b"original"
class TestMultipleMounts:
def test_multiple_read_write_mounts(self, tmp_path):
skills_dir = tmp_path / "skills"