feat(paths): add user-aware path methods with optional user_id parameter

Add _validate_user_id(), user_dir(), user_memory_file(), user_agent_memory_file()
and optional keyword-only user_id parameter to all thread-related path methods.
When user_id is provided, paths resolve under users/{user_id}/threads/{thread_id}/;
when omitted, legacy layout is preserved for backward compatibility.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
rayhpeng 2026-04-12 10:50:11 +08:00
parent 3540e157f1
commit e8f087cb37
2 changed files with 231 additions and 34 deletions

View File

@ -7,6 +7,7 @@ from pathlib import Path, PureWindowsPath
VIRTUAL_PATH_PREFIX = "/mnt/user-data"
_SAFE_THREAD_ID_RE = re.compile(r"^[A-Za-z0-9_\-]+$")
_SAFE_USER_ID_RE = re.compile(r"^[A-Za-z0-9_\-]+$")
def _default_local_base_dir() -> Path:
@ -22,6 +23,13 @@ def _validate_thread_id(thread_id: str) -> str:
return thread_id
def _validate_user_id(user_id: str) -> str:
"""Validate a user ID before using it in filesystem paths."""
if not _SAFE_USER_ID_RE.match(user_id):
raise ValueError(f"Invalid user_id {user_id!r}: only alphanumeric characters, hyphens, and underscores are allowed.")
return user_id
def _join_host_path(base: str, *parts: str) -> str:
"""Join host filesystem path segments while preserving native style.
@ -134,44 +142,63 @@ class Paths:
"""Per-agent memory file: `{base_dir}/agents/{name}/memory.json`."""
return self.agent_dir(name) / "memory.json"
def thread_dir(self, thread_id: str) -> Path:
def user_dir(self, user_id: str) -> Path:
"""Directory for a specific user: `{base_dir}/users/{user_id}/`."""
return self.base_dir / "users" / _validate_user_id(user_id)
def user_memory_file(self, user_id: str) -> Path:
"""Per-user memory file: `{base_dir}/users/{user_id}/memory.json`."""
return self.user_dir(user_id) / "memory.json"
def user_agent_memory_file(self, user_id: str, agent_name: str) -> Path:
"""Per-user per-agent memory: `{base_dir}/users/{user_id}/agents/{name}/memory.json`."""
return self.user_dir(user_id) / "agents" / agent_name.lower() / "memory.json"
def thread_dir(self, thread_id: str, *, user_id: str | None = None) -> Path:
"""
Host path for a thread's data: `{base_dir}/threads/{thread_id}/`
Host path for a thread's data.
When *user_id* is provided:
`{base_dir}/users/{user_id}/threads/{thread_id}/`
Otherwise (legacy layout):
`{base_dir}/threads/{thread_id}/`
This directory contains a `user-data/` subdirectory that is mounted
as `/mnt/user-data/` inside the sandbox.
Raises:
ValueError: If `thread_id` contains unsafe characters (path separators
or `..`) that could cause directory traversal.
ValueError: If `thread_id` or `user_id` contains unsafe characters (path
separators or `..`) that could cause directory traversal.
"""
if user_id is not None:
return self.user_dir(user_id) / "threads" / _validate_thread_id(thread_id)
return self.base_dir / "threads" / _validate_thread_id(thread_id)
def sandbox_work_dir(self, thread_id: str) -> Path:
def sandbox_work_dir(self, thread_id: str, *, user_id: str | None = None) -> Path:
"""
Host path for the agent's workspace directory.
Host: `{base_dir}/threads/{thread_id}/user-data/workspace/`
Sandbox: `/mnt/user-data/workspace/`
"""
return self.thread_dir(thread_id) / "user-data" / "workspace"
return self.thread_dir(thread_id, user_id=user_id) / "user-data" / "workspace"
def sandbox_uploads_dir(self, thread_id: str) -> Path:
def sandbox_uploads_dir(self, thread_id: str, *, user_id: str | None = None) -> Path:
"""
Host path for user-uploaded files.
Host: `{base_dir}/threads/{thread_id}/user-data/uploads/`
Sandbox: `/mnt/user-data/uploads/`
"""
return self.thread_dir(thread_id) / "user-data" / "uploads"
return self.thread_dir(thread_id, user_id=user_id) / "user-data" / "uploads"
def sandbox_outputs_dir(self, thread_id: str) -> Path:
def sandbox_outputs_dir(self, thread_id: str, *, user_id: str | None = None) -> Path:
"""
Host path for agent-generated artifacts.
Host: `{base_dir}/threads/{thread_id}/user-data/outputs/`
Sandbox: `/mnt/user-data/outputs/`
"""
return self.thread_dir(thread_id) / "user-data" / "outputs"
return self.thread_dir(thread_id, user_id=user_id) / "user-data" / "outputs"
def acp_workspace_dir(self, thread_id: str) -> Path:
def acp_workspace_dir(self, thread_id: str, *, user_id: str | None = None) -> Path:
"""
Host path for the ACP workspace of a specific thread.
Host: `{base_dir}/threads/{thread_id}/acp-workspace/`
@ -180,41 +207,43 @@ class Paths:
Each thread gets its own isolated ACP workspace so that concurrent
sessions cannot read each other's ACP agent outputs.
"""
return self.thread_dir(thread_id) / "acp-workspace"
return self.thread_dir(thread_id, user_id=user_id) / "acp-workspace"
def sandbox_user_data_dir(self, thread_id: str) -> Path:
def sandbox_user_data_dir(self, thread_id: str, *, user_id: str | None = None) -> Path:
"""
Host path for the user-data root.
Host: `{base_dir}/threads/{thread_id}/user-data/`
Sandbox: `/mnt/user-data/`
"""
return self.thread_dir(thread_id) / "user-data"
return self.thread_dir(thread_id, user_id=user_id) / "user-data"
def host_thread_dir(self, thread_id: str) -> str:
def host_thread_dir(self, thread_id: str, *, user_id: str | None = None) -> str:
"""Host path for a thread directory, preserving Windows path syntax."""
if user_id is not None:
return _join_host_path(self._host_base_dir_str(), "users", _validate_user_id(user_id), "threads", _validate_thread_id(thread_id))
return _join_host_path(self._host_base_dir_str(), "threads", _validate_thread_id(thread_id))
def host_sandbox_user_data_dir(self, thread_id: str) -> str:
def host_sandbox_user_data_dir(self, thread_id: str, *, user_id: str | None = None) -> str:
"""Host path for a thread's user-data root."""
return _join_host_path(self.host_thread_dir(thread_id), "user-data")
return _join_host_path(self.host_thread_dir(thread_id, user_id=user_id), "user-data")
def host_sandbox_work_dir(self, thread_id: str) -> str:
def host_sandbox_work_dir(self, thread_id: str, *, user_id: str | None = None) -> str:
"""Host path for the workspace mount source."""
return _join_host_path(self.host_sandbox_user_data_dir(thread_id), "workspace")
return _join_host_path(self.host_sandbox_user_data_dir(thread_id, user_id=user_id), "workspace")
def host_sandbox_uploads_dir(self, thread_id: str) -> str:
def host_sandbox_uploads_dir(self, thread_id: str, *, user_id: str | None = None) -> str:
"""Host path for the uploads mount source."""
return _join_host_path(self.host_sandbox_user_data_dir(thread_id), "uploads")
return _join_host_path(self.host_sandbox_user_data_dir(thread_id, user_id=user_id), "uploads")
def host_sandbox_outputs_dir(self, thread_id: str) -> str:
def host_sandbox_outputs_dir(self, thread_id: str, *, user_id: str | None = None) -> str:
"""Host path for the outputs mount source."""
return _join_host_path(self.host_sandbox_user_data_dir(thread_id), "outputs")
return _join_host_path(self.host_sandbox_user_data_dir(thread_id, user_id=user_id), "outputs")
def host_acp_workspace_dir(self, thread_id: str) -> str:
def host_acp_workspace_dir(self, thread_id: str, *, user_id: str | None = None) -> str:
"""Host path for the ACP workspace mount source."""
return _join_host_path(self.host_thread_dir(thread_id), "acp-workspace")
return _join_host_path(self.host_thread_dir(thread_id, user_id=user_id), "acp-workspace")
def ensure_thread_dirs(self, thread_id: str) -> None:
def ensure_thread_dirs(self, thread_id: str, *, user_id: str | None = None) -> None:
"""Create all standard sandbox directories for a thread.
Directories are created with mode 0o777 so that sandbox containers
@ -228,24 +257,24 @@ class Paths:
ACP agent invocation.
"""
for d in [
self.sandbox_work_dir(thread_id),
self.sandbox_uploads_dir(thread_id),
self.sandbox_outputs_dir(thread_id),
self.acp_workspace_dir(thread_id),
self.sandbox_work_dir(thread_id, user_id=user_id),
self.sandbox_uploads_dir(thread_id, user_id=user_id),
self.sandbox_outputs_dir(thread_id, user_id=user_id),
self.acp_workspace_dir(thread_id, user_id=user_id),
]:
d.mkdir(parents=True, exist_ok=True)
d.chmod(0o777)
def delete_thread_dir(self, thread_id: str) -> None:
def delete_thread_dir(self, thread_id: str, *, user_id: str | None = None) -> None:
"""Delete all persisted data for a thread.
The operation is idempotent: missing thread directories are ignored.
"""
thread_dir = self.thread_dir(thread_id)
thread_dir = self.thread_dir(thread_id, user_id=user_id)
if thread_dir.exists():
shutil.rmtree(thread_dir)
def resolve_virtual_path(self, thread_id: str, virtual_path: str) -> Path:
def resolve_virtual_path(self, thread_id: str, virtual_path: str, *, user_id: str | None = None) -> Path:
"""Resolve a sandbox virtual path to the actual host filesystem path.
Args:
@ -253,6 +282,7 @@ class Paths:
virtual_path: Virtual path as seen inside the sandbox, e.g.
``/mnt/user-data/outputs/report.pdf``.
Leading slashes are stripped before matching.
user_id: Optional user ID for user-scoped path resolution.
Returns:
The resolved absolute host filesystem path.
@ -270,7 +300,7 @@ class Paths:
raise ValueError(f"Path must start with /{prefix}")
relative = stripped[len(prefix) :].lstrip("/")
base = self.sandbox_user_data_dir(thread_id).resolve()
base = self.sandbox_user_data_dir(thread_id, user_id=user_id).resolve()
actual = (base / relative).resolve()
try:

View File

@ -0,0 +1,167 @@
"""Tests for user-scoped path resolution in Paths."""
import pytest
from pathlib import Path
from deerflow.config.paths import Paths
@pytest.fixture
def paths(tmp_path: Path) -> Paths:
return Paths(tmp_path)
class TestValidateUserId:
def test_valid_user_id(self, paths: Paths):
d = paths.user_dir("u-abc-123")
assert d == paths.base_dir / "users" / "u-abc-123"
def test_rejects_path_traversal(self, paths: Paths):
with pytest.raises(ValueError, match="Invalid user_id"):
paths.user_dir("../escape")
def test_rejects_slash(self, paths: Paths):
with pytest.raises(ValueError, match="Invalid user_id"):
paths.user_dir("foo/bar")
def test_rejects_empty(self, paths: Paths):
with pytest.raises(ValueError, match="Invalid user_id"):
paths.user_dir("")
class TestUserDir:
def test_user_dir(self, paths: Paths):
assert paths.user_dir("alice") == paths.base_dir / "users" / "alice"
class TestUserMemoryFile:
def test_user_memory_file(self, paths: Paths):
assert paths.user_memory_file("bob") == paths.base_dir / "users" / "bob" / "memory.json"
class TestUserAgentMemoryFile:
def test_user_agent_memory_file(self, paths: Paths):
expected = paths.base_dir / "users" / "bob" / "agents" / "myagent" / "memory.json"
assert paths.user_agent_memory_file("bob", "myagent") == expected
def test_user_agent_memory_file_lowercases_name(self, paths: Paths):
expected = paths.base_dir / "users" / "bob" / "agents" / "myagent" / "memory.json"
assert paths.user_agent_memory_file("bob", "MyAgent") == expected
class TestUserThreadDir:
def test_user_thread_dir(self, paths: Paths):
expected = paths.base_dir / "users" / "u1" / "threads" / "t1"
assert paths.thread_dir("t1", user_id="u1") == expected
def test_thread_dir_no_user_id_falls_back_to_legacy(self, paths: Paths):
expected = paths.base_dir / "threads" / "t1"
assert paths.thread_dir("t1") == expected
class TestUserSandboxDirs:
def test_sandbox_work_dir(self, paths: Paths):
expected = paths.base_dir / "users" / "u1" / "threads" / "t1" / "user-data" / "workspace"
assert paths.sandbox_work_dir("t1", user_id="u1") == expected
def test_sandbox_uploads_dir(self, paths: Paths):
expected = paths.base_dir / "users" / "u1" / "threads" / "t1" / "user-data" / "uploads"
assert paths.sandbox_uploads_dir("t1", user_id="u1") == expected
def test_sandbox_outputs_dir(self, paths: Paths):
expected = paths.base_dir / "users" / "u1" / "threads" / "t1" / "user-data" / "outputs"
assert paths.sandbox_outputs_dir("t1", user_id="u1") == expected
def test_sandbox_user_data_dir(self, paths: Paths):
expected = paths.base_dir / "users" / "u1" / "threads" / "t1" / "user-data"
assert paths.sandbox_user_data_dir("t1", user_id="u1") == expected
def test_acp_workspace_dir(self, paths: Paths):
expected = paths.base_dir / "users" / "u1" / "threads" / "t1" / "acp-workspace"
assert paths.acp_workspace_dir("t1", user_id="u1") == expected
def test_legacy_sandbox_work_dir(self, paths: Paths):
expected = paths.base_dir / "threads" / "t1" / "user-data" / "workspace"
assert paths.sandbox_work_dir("t1") == expected
class TestHostPathsWithUserId:
def test_host_thread_dir_with_user_id(self, paths: Paths):
result = paths.host_thread_dir("t1", user_id="u1")
assert "users" in result
assert "u1" in result
assert "threads" in result
assert "t1" in result
def test_host_thread_dir_legacy(self, paths: Paths):
result = paths.host_thread_dir("t1")
assert "threads" in result
assert "t1" in result
assert "users" not in result
def test_host_sandbox_user_data_dir_with_user_id(self, paths: Paths):
result = paths.host_sandbox_user_data_dir("t1", user_id="u1")
assert "users" in result
assert "user-data" in result
def test_host_sandbox_work_dir_with_user_id(self, paths: Paths):
result = paths.host_sandbox_work_dir("t1", user_id="u1")
assert "workspace" in result
def test_host_sandbox_uploads_dir_with_user_id(self, paths: Paths):
result = paths.host_sandbox_uploads_dir("t1", user_id="u1")
assert "uploads" in result
def test_host_sandbox_outputs_dir_with_user_id(self, paths: Paths):
result = paths.host_sandbox_outputs_dir("t1", user_id="u1")
assert "outputs" in result
def test_host_acp_workspace_dir_with_user_id(self, paths: Paths):
result = paths.host_acp_workspace_dir("t1", user_id="u1")
assert "acp-workspace" in result
class TestEnsureAndDeleteWithUserId:
def test_ensure_thread_dirs_creates_user_scoped(self, paths: Paths):
paths.ensure_thread_dirs("t1", user_id="u1")
assert paths.sandbox_work_dir("t1", user_id="u1").is_dir()
assert paths.sandbox_uploads_dir("t1", user_id="u1").is_dir()
assert paths.sandbox_outputs_dir("t1", user_id="u1").is_dir()
assert paths.acp_workspace_dir("t1", user_id="u1").is_dir()
def test_delete_thread_dir_removes_user_scoped(self, paths: Paths):
paths.ensure_thread_dirs("t1", user_id="u1")
assert paths.thread_dir("t1", user_id="u1").exists()
paths.delete_thread_dir("t1", user_id="u1")
assert not paths.thread_dir("t1", user_id="u1").exists()
def test_delete_thread_dir_idempotent(self, paths: Paths):
paths.delete_thread_dir("nonexistent", user_id="u1") # should not raise
def test_ensure_thread_dirs_legacy_still_works(self, paths: Paths):
paths.ensure_thread_dirs("t1")
assert paths.sandbox_work_dir("t1").is_dir()
def test_user_scoped_and_legacy_are_independent(self, paths: Paths):
paths.ensure_thread_dirs("t1", user_id="u1")
paths.ensure_thread_dirs("t1")
# Both exist independently
assert paths.thread_dir("t1", user_id="u1").exists()
assert paths.thread_dir("t1").exists()
# Delete one doesn't affect the other
paths.delete_thread_dir("t1", user_id="u1")
assert not paths.thread_dir("t1", user_id="u1").exists()
assert paths.thread_dir("t1").exists()
class TestResolveVirtualPathWithUserId:
def test_resolve_virtual_path_with_user_id(self, paths: Paths):
paths.ensure_thread_dirs("t1", user_id="u1")
result = paths.resolve_virtual_path("t1", "/mnt/user-data/workspace/file.txt", user_id="u1")
expected_base = paths.sandbox_user_data_dir("t1", user_id="u1").resolve()
assert str(result).startswith(str(expected_base))
def test_resolve_virtual_path_legacy(self, paths: Paths):
paths.ensure_thread_dirs("t1")
result = paths.resolve_virtual_path("t1", "/mnt/user-data/workspace/file.txt")
expected_base = paths.sandbox_user_data_dir("t1").resolve()
assert str(result).startswith(str(expected_base))