mirror of
https://github.com/bytedance/deer-flow.git
synced 2026-04-27 04:08:30 +00:00
feat(isolation): wire user_id through all Paths and memory callsites
Pass user_id=get_effective_user_id() at every callsite that invokes Paths methods or memory functions, enabling per-user filesystem isolation throughout the harness and app layers. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
parent
9af2f3e73c
commit
7ce9333200
@ -13,6 +13,7 @@ from app.channels.base import Channel
|
||||
from app.channels.commands import KNOWN_CHANNEL_COMMANDS
|
||||
from app.channels.message_bus import InboundMessage, InboundMessageType, MessageBus, OutboundMessage, ResolvedAttachment
|
||||
from deerflow.config.paths import VIRTUAL_PATH_PREFIX, get_paths
|
||||
from deerflow.runtime.user_context import get_effective_user_id
|
||||
from deerflow.sandbox.sandbox_provider import get_sandbox_provider
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@ -344,8 +345,9 @@ class FeishuChannel(Channel):
|
||||
return f"Failed to obtain the [{type}]"
|
||||
|
||||
paths = get_paths()
|
||||
paths.ensure_thread_dirs(thread_id)
|
||||
uploads_dir = paths.sandbox_uploads_dir(thread_id).resolve()
|
||||
user_id = get_effective_user_id()
|
||||
paths.ensure_thread_dirs(thread_id, user_id=user_id)
|
||||
uploads_dir = paths.sandbox_uploads_dir(thread_id, user_id=user_id).resolve()
|
||||
|
||||
ext = "png" if type == "image" else "bin"
|
||||
raw_filename = getattr(response, "file_name", "") or f"feishu_{file_key[-12:]}.{ext}"
|
||||
|
||||
@ -17,6 +17,7 @@ from langgraph_sdk.errors import ConflictError
|
||||
from app.channels.commands import KNOWN_CHANNEL_COMMANDS
|
||||
from app.channels.message_bus import InboundMessage, InboundMessageType, MessageBus, OutboundMessage, ResolvedAttachment
|
||||
from app.channels.store import ChannelStore
|
||||
from deerflow.runtime.user_context import get_effective_user_id
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@ -341,14 +342,15 @@ def _resolve_attachments(thread_id: str, artifacts: list[str]) -> list[ResolvedA
|
||||
|
||||
attachments: list[ResolvedAttachment] = []
|
||||
paths = get_paths()
|
||||
outputs_dir = paths.sandbox_outputs_dir(thread_id).resolve()
|
||||
user_id = get_effective_user_id()
|
||||
outputs_dir = paths.sandbox_outputs_dir(thread_id, user_id=user_id).resolve()
|
||||
for virtual_path in artifacts:
|
||||
# Security: only allow files from the agent outputs directory
|
||||
if not virtual_path.startswith(_OUTPUTS_VIRTUAL_PREFIX):
|
||||
logger.warning("[Manager] rejected non-outputs artifact path: %s", virtual_path)
|
||||
continue
|
||||
try:
|
||||
actual = paths.resolve_virtual_path(thread_id, virtual_path)
|
||||
actual = paths.resolve_virtual_path(thread_id, virtual_path, user_id=user_id)
|
||||
# Verify the resolved path is actually under the outputs directory
|
||||
# (guards against path-traversal even after prefix check)
|
||||
try:
|
||||
|
||||
@ -5,6 +5,7 @@ from pathlib import Path
|
||||
from fastapi import HTTPException
|
||||
|
||||
from deerflow.config.paths import get_paths
|
||||
from deerflow.runtime.user_context import get_effective_user_id
|
||||
|
||||
|
||||
def resolve_thread_virtual_path(thread_id: str, virtual_path: str) -> Path:
|
||||
@ -22,7 +23,7 @@ def resolve_thread_virtual_path(thread_id: str, virtual_path: str) -> Path:
|
||||
HTTPException: If the path is invalid or outside allowed directories.
|
||||
"""
|
||||
try:
|
||||
return get_paths().resolve_virtual_path(thread_id, virtual_path)
|
||||
return get_paths().resolve_virtual_path(thread_id, virtual_path, user_id=get_effective_user_id())
|
||||
except ValueError as e:
|
||||
status = 403 if "traversal" in str(e) else 400
|
||||
raise HTTPException(status_code=status, detail=str(e))
|
||||
|
||||
@ -13,6 +13,7 @@ from deerflow.agents.memory.updater import (
|
||||
update_memory_fact,
|
||||
)
|
||||
from deerflow.config.memory_config import get_memory_config
|
||||
from deerflow.runtime.user_context import get_effective_user_id
|
||||
|
||||
router = APIRouter(prefix="/api", tags=["memory"])
|
||||
|
||||
@ -147,7 +148,7 @@ async def get_memory() -> MemoryResponse:
|
||||
}
|
||||
```
|
||||
"""
|
||||
memory_data = get_memory_data()
|
||||
memory_data = get_memory_data(user_id=get_effective_user_id())
|
||||
return MemoryResponse(**memory_data)
|
||||
|
||||
|
||||
@ -167,7 +168,7 @@ async def reload_memory() -> MemoryResponse:
|
||||
Returns:
|
||||
The reloaded memory data.
|
||||
"""
|
||||
memory_data = reload_memory_data()
|
||||
memory_data = reload_memory_data(user_id=get_effective_user_id())
|
||||
return MemoryResponse(**memory_data)
|
||||
|
||||
|
||||
@ -181,7 +182,7 @@ async def reload_memory() -> MemoryResponse:
|
||||
async def clear_memory() -> MemoryResponse:
|
||||
"""Clear all persisted memory data."""
|
||||
try:
|
||||
memory_data = clear_memory_data()
|
||||
memory_data = clear_memory_data(user_id=get_effective_user_id())
|
||||
except OSError as exc:
|
||||
raise HTTPException(status_code=500, detail="Failed to clear memory data.") from exc
|
||||
|
||||
@ -202,6 +203,7 @@ async def create_memory_fact_endpoint(request: FactCreateRequest) -> MemoryRespo
|
||||
content=request.content,
|
||||
category=request.category,
|
||||
confidence=request.confidence,
|
||||
user_id=get_effective_user_id(),
|
||||
)
|
||||
except ValueError as exc:
|
||||
raise _map_memory_fact_value_error(exc) from exc
|
||||
@ -221,7 +223,7 @@ async def create_memory_fact_endpoint(request: FactCreateRequest) -> MemoryRespo
|
||||
async def delete_memory_fact_endpoint(fact_id: str) -> MemoryResponse:
|
||||
"""Delete a single fact from memory by fact id."""
|
||||
try:
|
||||
memory_data = delete_memory_fact(fact_id)
|
||||
memory_data = delete_memory_fact(fact_id, user_id=get_effective_user_id())
|
||||
except KeyError as exc:
|
||||
raise HTTPException(status_code=404, detail=f"Memory fact '{fact_id}' not found.") from exc
|
||||
except OSError as exc:
|
||||
@ -245,6 +247,7 @@ async def update_memory_fact_endpoint(fact_id: str, request: FactPatchRequest) -
|
||||
content=request.content,
|
||||
category=request.category,
|
||||
confidence=request.confidence,
|
||||
user_id=get_effective_user_id(),
|
||||
)
|
||||
except ValueError as exc:
|
||||
raise _map_memory_fact_value_error(exc) from exc
|
||||
@ -265,7 +268,7 @@ async def update_memory_fact_endpoint(fact_id: str, request: FactPatchRequest) -
|
||||
)
|
||||
async def export_memory() -> MemoryResponse:
|
||||
"""Export the current memory data."""
|
||||
memory_data = get_memory_data()
|
||||
memory_data = get_memory_data(user_id=get_effective_user_id())
|
||||
return MemoryResponse(**memory_data)
|
||||
|
||||
|
||||
@ -279,7 +282,7 @@ async def export_memory() -> MemoryResponse:
|
||||
async def import_memory(request: MemoryResponse) -> MemoryResponse:
|
||||
"""Import and persist memory data."""
|
||||
try:
|
||||
memory_data = import_memory_data(request.model_dump())
|
||||
memory_data = import_memory_data(request.model_dump(), user_id=get_effective_user_id())
|
||||
except OSError as exc:
|
||||
raise HTTPException(status_code=500, detail="Failed to import memory data.") from exc
|
||||
|
||||
@ -337,7 +340,7 @@ async def get_memory_status() -> MemoryStatusResponse:
|
||||
Combined memory configuration and current data.
|
||||
"""
|
||||
config = get_memory_config()
|
||||
memory_data = get_memory_data()
|
||||
memory_data = get_memory_data(user_id=get_effective_user_id())
|
||||
|
||||
return MemoryStatusResponse(
|
||||
config=MemoryConfigResponse(
|
||||
|
||||
@ -26,6 +26,7 @@ from app.gateway.deps import get_checkpointer, get_current_user, get_feedback_re
|
||||
from app.gateway.utils import sanitize_log_param
|
||||
from deerflow.config.paths import Paths, get_paths
|
||||
from deerflow.runtime import serialize_channel_values
|
||||
from deerflow.runtime.user_context import get_effective_user_id
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
router = APIRouter(prefix="/api/threads", tags=["threads"])
|
||||
@ -143,11 +144,11 @@ class ThreadHistoryRequest(BaseModel):
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def _delete_thread_data(thread_id: str, paths: Paths | None = None) -> ThreadDeleteResponse:
|
||||
def _delete_thread_data(thread_id: str, paths: Paths | None = None, *, user_id: str | None = None) -> ThreadDeleteResponse:
|
||||
"""Delete local persisted filesystem data for a thread."""
|
||||
path_manager = paths or get_paths()
|
||||
try:
|
||||
path_manager.delete_thread_dir(thread_id)
|
||||
path_manager.delete_thread_dir(thread_id, user_id=user_id)
|
||||
except ValueError as exc:
|
||||
raise HTTPException(status_code=422, detail=str(exc)) from exc
|
||||
except FileNotFoundError:
|
||||
@ -198,7 +199,7 @@ async def delete_thread_data(thread_id: str, request: Request) -> ThreadDeleteRe
|
||||
from app.gateway.deps import get_thread_store
|
||||
|
||||
# Clean local filesystem
|
||||
response = _delete_thread_data(thread_id)
|
||||
response = _delete_thread_data(thread_id, user_id=get_effective_user_id())
|
||||
|
||||
# Remove checkpoints (best-effort)
|
||||
checkpointer = getattr(request.app.state, "checkpointer", None)
|
||||
|
||||
@ -9,6 +9,7 @@ from pydantic import BaseModel
|
||||
|
||||
from app.gateway.authz import require_permission
|
||||
from deerflow.config.paths import get_paths
|
||||
from deerflow.runtime.user_context import get_effective_user_id
|
||||
from deerflow.sandbox.sandbox_provider import get_sandbox_provider
|
||||
from deerflow.uploads.manager import (
|
||||
PathTraversalError,
|
||||
@ -69,7 +70,7 @@ async def upload_files(
|
||||
uploads_dir = ensure_uploads_dir(thread_id)
|
||||
except ValueError as e:
|
||||
raise HTTPException(status_code=400, detail=str(e))
|
||||
sandbox_uploads = get_paths().sandbox_uploads_dir(thread_id)
|
||||
sandbox_uploads = get_paths().sandbox_uploads_dir(thread_id, user_id=get_effective_user_id())
|
||||
uploaded_files = []
|
||||
|
||||
sandbox_provider = get_sandbox_provider()
|
||||
@ -147,7 +148,7 @@ async def list_uploaded_files(thread_id: str, request: Request) -> dict:
|
||||
enrich_file_listing(result, thread_id)
|
||||
|
||||
# Gateway additionally includes the sandbox-relative path.
|
||||
sandbox_uploads = get_paths().sandbox_uploads_dir(thread_id)
|
||||
sandbox_uploads = get_paths().sandbox_uploads_dir(thread_id, user_id=get_effective_user_id())
|
||||
for f in result["files"]:
|
||||
f["path"] = str(sandbox_uploads / f["filename"])
|
||||
|
||||
|
||||
@ -519,12 +519,13 @@ def _get_memory_context(agent_name: str | None = None) -> str:
|
||||
try:
|
||||
from deerflow.agents.memory import format_memory_for_injection, get_memory_data
|
||||
from deerflow.config.memory_config import get_memory_config
|
||||
from deerflow.runtime.user_context import get_effective_user_id
|
||||
|
||||
config = get_memory_config()
|
||||
if not config.enabled or not config.injection_enabled:
|
||||
return ""
|
||||
|
||||
memory_data = get_memory_data(agent_name)
|
||||
memory_data = get_memory_data(agent_name, user_id=get_effective_user_id())
|
||||
memory_content = format_memory_for_injection(memory_data, max_tokens=config.max_injection_tokens)
|
||||
|
||||
if not memory_content.strip():
|
||||
|
||||
@ -8,6 +8,7 @@ from langgraph.runtime import Runtime
|
||||
|
||||
from deerflow.agents.thread_state import ThreadDataState
|
||||
from deerflow.config.paths import Paths, get_paths
|
||||
from deerflow.runtime.user_context import get_effective_user_id
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@ -46,32 +47,34 @@ class ThreadDataMiddleware(AgentMiddleware[ThreadDataMiddlewareState]):
|
||||
self._paths = Paths(base_dir) if base_dir else get_paths()
|
||||
self._lazy_init = lazy_init
|
||||
|
||||
def _get_thread_paths(self, thread_id: str) -> dict[str, str]:
|
||||
def _get_thread_paths(self, thread_id: str, user_id: str | None = None) -> dict[str, str]:
|
||||
"""Get the paths for a thread's data directories.
|
||||
|
||||
Args:
|
||||
thread_id: The thread ID.
|
||||
user_id: Optional user ID for per-user path isolation.
|
||||
|
||||
Returns:
|
||||
Dictionary with workspace_path, uploads_path, and outputs_path.
|
||||
"""
|
||||
return {
|
||||
"workspace_path": str(self._paths.sandbox_work_dir(thread_id)),
|
||||
"uploads_path": str(self._paths.sandbox_uploads_dir(thread_id)),
|
||||
"outputs_path": str(self._paths.sandbox_outputs_dir(thread_id)),
|
||||
"workspace_path": str(self._paths.sandbox_work_dir(thread_id, user_id=user_id)),
|
||||
"uploads_path": str(self._paths.sandbox_uploads_dir(thread_id, user_id=user_id)),
|
||||
"outputs_path": str(self._paths.sandbox_outputs_dir(thread_id, user_id=user_id)),
|
||||
}
|
||||
|
||||
def _create_thread_directories(self, thread_id: str) -> dict[str, str]:
|
||||
def _create_thread_directories(self, thread_id: str, user_id: str | None = None) -> dict[str, str]:
|
||||
"""Create the thread data directories.
|
||||
|
||||
Args:
|
||||
thread_id: The thread ID.
|
||||
user_id: Optional user ID for per-user path isolation.
|
||||
|
||||
Returns:
|
||||
Dictionary with the created directory paths.
|
||||
"""
|
||||
self._paths.ensure_thread_dirs(thread_id)
|
||||
return self._get_thread_paths(thread_id)
|
||||
self._paths.ensure_thread_dirs(thread_id, user_id=user_id)
|
||||
return self._get_thread_paths(thread_id, user_id=user_id)
|
||||
|
||||
@override
|
||||
def before_agent(self, state: ThreadDataMiddlewareState, runtime: Runtime) -> dict | None:
|
||||
@ -84,12 +87,14 @@ class ThreadDataMiddleware(AgentMiddleware[ThreadDataMiddlewareState]):
|
||||
if thread_id is None:
|
||||
raise ValueError("Thread ID is required in runtime context or config.configurable")
|
||||
|
||||
user_id = get_effective_user_id()
|
||||
|
||||
if self._lazy_init:
|
||||
# Lazy initialization: only compute paths, don't create directories
|
||||
paths = self._get_thread_paths(thread_id)
|
||||
paths = self._get_thread_paths(thread_id, user_id=user_id)
|
||||
else:
|
||||
# Eager initialization: create directories immediately
|
||||
paths = self._create_thread_directories(thread_id)
|
||||
paths = self._create_thread_directories(thread_id, user_id=user_id)
|
||||
logger.debug("Created thread data directories for thread %s", thread_id)
|
||||
|
||||
return {
|
||||
|
||||
@ -10,6 +10,7 @@ from langchain_core.messages import HumanMessage
|
||||
from langgraph.runtime import Runtime
|
||||
|
||||
from deerflow.config.paths import Paths, get_paths
|
||||
from deerflow.runtime.user_context import get_effective_user_id
|
||||
from deerflow.utils.file_conversion import extract_outline
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@ -221,7 +222,7 @@ class UploadsMiddleware(AgentMiddleware[UploadsMiddlewareState]):
|
||||
thread_id = get_config().get("configurable", {}).get("thread_id")
|
||||
except RuntimeError:
|
||||
pass # get_config() raises outside a runnable context (e.g. unit tests)
|
||||
uploads_dir = self._paths.sandbox_uploads_dir(thread_id) if thread_id else None
|
||||
uploads_dir = self._paths.sandbox_uploads_dir(thread_id, user_id=get_effective_user_id()) if thread_id else None
|
||||
|
||||
# Get newly uploaded files from the current message's additional_kwargs.files
|
||||
new_files = self._files_from_kwargs(last_message, uploads_dir) or []
|
||||
|
||||
@ -40,6 +40,7 @@ from deerflow.config.app_config import get_app_config, reload_app_config
|
||||
from deerflow.config.extensions_config import ExtensionsConfig, SkillStateConfig, get_extensions_config, reload_extensions_config
|
||||
from deerflow.config.paths import get_paths
|
||||
from deerflow.models import create_chat_model
|
||||
from deerflow.runtime.user_context import get_effective_user_id
|
||||
from deerflow.skills.installer import install_skill_from_archive
|
||||
from deerflow.uploads.manager import (
|
||||
claim_unique_filename,
|
||||
@ -769,19 +770,19 @@ class DeerFlowClient:
|
||||
"""
|
||||
from deerflow.agents.memory.updater import get_memory_data
|
||||
|
||||
return get_memory_data()
|
||||
return get_memory_data(user_id=get_effective_user_id())
|
||||
|
||||
def export_memory(self) -> dict:
|
||||
"""Export current memory data for backup or transfer."""
|
||||
from deerflow.agents.memory.updater import get_memory_data
|
||||
|
||||
return get_memory_data()
|
||||
return get_memory_data(user_id=get_effective_user_id())
|
||||
|
||||
def import_memory(self, memory_data: dict) -> dict:
|
||||
"""Import and persist full memory data."""
|
||||
from deerflow.agents.memory.updater import import_memory_data
|
||||
|
||||
return import_memory_data(memory_data)
|
||||
return import_memory_data(memory_data, user_id=get_effective_user_id())
|
||||
|
||||
def get_model(self, name: str) -> dict | None:
|
||||
"""Get a specific model's configuration by name.
|
||||
@ -956,13 +957,13 @@ class DeerFlowClient:
|
||||
"""
|
||||
from deerflow.agents.memory.updater import reload_memory_data
|
||||
|
||||
return reload_memory_data()
|
||||
return reload_memory_data(user_id=get_effective_user_id())
|
||||
|
||||
def clear_memory(self) -> dict:
|
||||
"""Clear all persisted memory data."""
|
||||
from deerflow.agents.memory.updater import clear_memory_data
|
||||
|
||||
return clear_memory_data()
|
||||
return clear_memory_data(user_id=get_effective_user_id())
|
||||
|
||||
def create_memory_fact(self, content: str, category: str = "context", confidence: float = 0.5) -> dict:
|
||||
"""Create a single fact manually."""
|
||||
@ -1179,7 +1180,7 @@ class DeerFlowClient:
|
||||
ValueError: If the path is invalid.
|
||||
"""
|
||||
try:
|
||||
actual = get_paths().resolve_virtual_path(thread_id, path)
|
||||
actual = get_paths().resolve_virtual_path(thread_id, path, user_id=get_effective_user_id())
|
||||
except ValueError as exc:
|
||||
if "traversal" in str(exc):
|
||||
from deerflow.uploads.manager import PathTraversalError
|
||||
|
||||
@ -27,6 +27,7 @@ except ImportError: # pragma: no cover - Windows fallback
|
||||
|
||||
from deerflow.config import get_app_config
|
||||
from deerflow.config.paths import VIRTUAL_PATH_PREFIX, get_paths
|
||||
from deerflow.runtime.user_context import get_effective_user_id
|
||||
from deerflow.sandbox.sandbox import Sandbox
|
||||
from deerflow.sandbox.sandbox_provider import SandboxProvider
|
||||
|
||||
@ -260,15 +261,16 @@ class AioSandboxProvider(SandboxProvider):
|
||||
mounted Docker socket (DooD), the host Docker daemon can resolve the paths.
|
||||
"""
|
||||
paths = get_paths()
|
||||
paths.ensure_thread_dirs(thread_id)
|
||||
user_id = get_effective_user_id()
|
||||
paths.ensure_thread_dirs(thread_id, user_id=user_id)
|
||||
|
||||
return [
|
||||
(paths.host_sandbox_work_dir(thread_id), f"{VIRTUAL_PATH_PREFIX}/workspace", False),
|
||||
(paths.host_sandbox_uploads_dir(thread_id), f"{VIRTUAL_PATH_PREFIX}/uploads", False),
|
||||
(paths.host_sandbox_outputs_dir(thread_id), f"{VIRTUAL_PATH_PREFIX}/outputs", False),
|
||||
(paths.host_sandbox_work_dir(thread_id, user_id=user_id), f"{VIRTUAL_PATH_PREFIX}/workspace", False),
|
||||
(paths.host_sandbox_uploads_dir(thread_id, user_id=user_id), f"{VIRTUAL_PATH_PREFIX}/uploads", False),
|
||||
(paths.host_sandbox_outputs_dir(thread_id, user_id=user_id), f"{VIRTUAL_PATH_PREFIX}/outputs", False),
|
||||
# ACP workspace: read-only inside the sandbox (lead agent reads results;
|
||||
# the ACP subprocess writes from the host side, not from within the container).
|
||||
(paths.host_acp_workspace_dir(thread_id), "/mnt/acp-workspace", True),
|
||||
(paths.host_acp_workspace_dir(thread_id, user_id=user_id), "/mnt/acp-workspace", True),
|
||||
]
|
||||
|
||||
@staticmethod
|
||||
@ -480,8 +482,9 @@ class AioSandboxProvider(SandboxProvider):
|
||||
across multiple processes, preventing container-name conflicts.
|
||||
"""
|
||||
paths = get_paths()
|
||||
paths.ensure_thread_dirs(thread_id)
|
||||
lock_path = paths.thread_dir(thread_id) / f"{sandbox_id}.lock"
|
||||
user_id = get_effective_user_id()
|
||||
paths.ensure_thread_dirs(thread_id, user_id=user_id)
|
||||
lock_path = paths.thread_dir(thread_id, user_id=user_id) / f"{sandbox_id}.lock"
|
||||
|
||||
with open(lock_path, "a", encoding="utf-8") as lock_file:
|
||||
locked = False
|
||||
|
||||
@ -200,8 +200,9 @@ def _get_acp_workspace_host_path(thread_id: str | None = None) -> str | None:
|
||||
if thread_id is not None:
|
||||
try:
|
||||
from deerflow.config.paths import get_paths
|
||||
from deerflow.runtime.user_context import get_effective_user_id
|
||||
|
||||
host_path = get_paths().acp_workspace_dir(thread_id)
|
||||
host_path = get_paths().acp_workspace_dir(thread_id, user_id=get_effective_user_id())
|
||||
if host_path.exists():
|
||||
return str(host_path)
|
||||
except Exception:
|
||||
|
||||
@ -33,11 +33,12 @@ def _get_work_dir(thread_id: str | None) -> str:
|
||||
An absolute physical filesystem path to use as the working directory.
|
||||
"""
|
||||
from deerflow.config.paths import get_paths
|
||||
from deerflow.runtime.user_context import get_effective_user_id
|
||||
|
||||
paths = get_paths()
|
||||
if thread_id:
|
||||
try:
|
||||
work_dir = paths.acp_workspace_dir(thread_id)
|
||||
work_dir = paths.acp_workspace_dir(thread_id, user_id=get_effective_user_id())
|
||||
except ValueError:
|
||||
logger.warning("Invalid thread_id %r for ACP workspace, falling back to global", thread_id)
|
||||
work_dir = paths.base_dir / "acp-workspace"
|
||||
|
||||
@ -8,6 +8,7 @@ from langgraph.typing import ContextT
|
||||
|
||||
from deerflow.agents.thread_state import ThreadState
|
||||
from deerflow.config.paths import VIRTUAL_PATH_PREFIX, get_paths
|
||||
from deerflow.runtime.user_context import get_effective_user_id
|
||||
|
||||
OUTPUTS_VIRTUAL_PREFIX = f"{VIRTUAL_PATH_PREFIX}/outputs"
|
||||
|
||||
@ -47,7 +48,7 @@ def _normalize_presented_filepath(
|
||||
virtual_prefix = VIRTUAL_PATH_PREFIX.lstrip("/")
|
||||
|
||||
if stripped == virtual_prefix or stripped.startswith(virtual_prefix + "/"):
|
||||
actual_path = get_paths().resolve_virtual_path(thread_id, filepath)
|
||||
actual_path = get_paths().resolve_virtual_path(thread_id, filepath, user_id=get_effective_user_id())
|
||||
else:
|
||||
actual_path = Path(filepath).expanduser().resolve()
|
||||
|
||||
|
||||
@ -10,6 +10,7 @@ from pathlib import Path
|
||||
from urllib.parse import quote
|
||||
|
||||
from deerflow.config.paths import VIRTUAL_PATH_PREFIX, get_paths
|
||||
from deerflow.runtime.user_context import get_effective_user_id
|
||||
|
||||
|
||||
class PathTraversalError(ValueError):
|
||||
@ -33,7 +34,7 @@ def validate_thread_id(thread_id: str) -> None:
|
||||
def get_uploads_dir(thread_id: str) -> Path:
|
||||
"""Return the uploads directory path for a thread (no side effects)."""
|
||||
validate_thread_id(thread_id)
|
||||
return get_paths().sandbox_uploads_dir(thread_id)
|
||||
return get_paths().sandbox_uploads_dir(thread_id, user_id=get_effective_user_id())
|
||||
|
||||
|
||||
def ensure_uploads_dir(thread_id: str) -> Path:
|
||||
|
||||
@ -57,6 +57,7 @@ def test_get_thread_mounts_includes_acp_workspace(tmp_path, monkeypatch):
|
||||
"""_get_thread_mounts must include /mnt/acp-workspace (read-only) for docker sandbox."""
|
||||
aio_mod = importlib.import_module("deerflow.community.aio_sandbox.aio_sandbox_provider")
|
||||
monkeypatch.setattr(aio_mod, "get_paths", lambda: Paths(base_dir=tmp_path))
|
||||
monkeypatch.setattr(aio_mod, "get_effective_user_id", lambda: None)
|
||||
|
||||
mounts = aio_mod.AioSandboxProvider._get_thread_mounts("thread-3")
|
||||
|
||||
@ -95,6 +96,7 @@ def test_get_thread_mounts_preserves_windows_host_path_style(tmp_path, monkeypat
|
||||
aio_mod = importlib.import_module("deerflow.community.aio_sandbox.aio_sandbox_provider")
|
||||
monkeypatch.setenv("DEER_FLOW_HOST_BASE_DIR", r"C:\Users\demo\deer-flow\backend\.deer-flow")
|
||||
monkeypatch.setattr(aio_mod, "get_paths", lambda: Paths(base_dir=tmp_path))
|
||||
monkeypatch.setattr(aio_mod, "get_effective_user_id", lambda: None)
|
||||
|
||||
mounts = aio_mod.AioSandboxProvider._get_thread_mounts("thread-10")
|
||||
|
||||
|
||||
@ -231,7 +231,7 @@ class TestResolveAttachments:
|
||||
mock_paths = MagicMock()
|
||||
mock_paths.sandbox_outputs_dir.return_value = outputs_dir
|
||||
|
||||
def resolve_side_effect(tid, vpath):
|
||||
def resolve_side_effect(tid, vpath, *, user_id=None):
|
||||
if "data.csv" in vpath:
|
||||
return good_file
|
||||
return tmp_path / "missing.txt"
|
||||
|
||||
@ -1241,7 +1241,10 @@ class TestMemoryManagement:
|
||||
with patch("deerflow.agents.memory.updater.import_memory_data", return_value=imported) as mock_import:
|
||||
result = client.import_memory(imported)
|
||||
|
||||
mock_import.assert_called_once_with(imported)
|
||||
assert mock_import.call_count == 1
|
||||
call_args = mock_import.call_args
|
||||
assert call_args.args == (imported,)
|
||||
assert "user_id" in call_args.kwargs
|
||||
assert result == imported
|
||||
|
||||
def test_reload_memory(self, client):
|
||||
@ -1487,9 +1490,12 @@ class TestUploads:
|
||||
|
||||
class TestArtifacts:
|
||||
def test_get_artifact(self, client):
|
||||
from deerflow.runtime.user_context import get_effective_user_id
|
||||
|
||||
with tempfile.TemporaryDirectory() as tmp:
|
||||
paths = Paths(base_dir=tmp)
|
||||
outputs = paths.sandbox_outputs_dir("t1")
|
||||
user_id = get_effective_user_id()
|
||||
outputs = paths.sandbox_outputs_dir("t1", user_id=user_id)
|
||||
outputs.mkdir(parents=True)
|
||||
(outputs / "result.txt").write_text("artifact content")
|
||||
|
||||
@ -1500,9 +1506,12 @@ class TestArtifacts:
|
||||
assert "text" in mime
|
||||
|
||||
def test_get_artifact_not_found(self, client):
|
||||
from deerflow.runtime.user_context import get_effective_user_id
|
||||
|
||||
with tempfile.TemporaryDirectory() as tmp:
|
||||
paths = Paths(base_dir=tmp)
|
||||
paths.sandbox_user_data_dir("t1").mkdir(parents=True)
|
||||
user_id = get_effective_user_id()
|
||||
paths.sandbox_outputs_dir("t1", user_id=user_id).mkdir(parents=True)
|
||||
|
||||
with patch("deerflow.client.get_paths", return_value=paths):
|
||||
with pytest.raises(FileNotFoundError):
|
||||
@ -1513,9 +1522,12 @@ class TestArtifacts:
|
||||
client.get_artifact("t1", "bad/path/file.txt")
|
||||
|
||||
def test_get_artifact_path_traversal(self, client):
|
||||
from deerflow.runtime.user_context import get_effective_user_id
|
||||
|
||||
with tempfile.TemporaryDirectory() as tmp:
|
||||
paths = Paths(base_dir=tmp)
|
||||
paths.sandbox_user_data_dir("t1").mkdir(parents=True)
|
||||
user_id = get_effective_user_id()
|
||||
paths.sandbox_outputs_dir("t1", user_id=user_id).mkdir(parents=True)
|
||||
|
||||
with patch("deerflow.client.get_paths", return_value=paths):
|
||||
with pytest.raises(PathTraversalError):
|
||||
@ -1699,13 +1711,16 @@ class TestScenarioFileLifecycle:
|
||||
|
||||
def test_upload_then_read_artifact(self, client):
|
||||
"""Upload a file, simulate agent producing artifact, read it back."""
|
||||
from deerflow.runtime.user_context import get_effective_user_id
|
||||
|
||||
with tempfile.TemporaryDirectory() as tmp:
|
||||
tmp_path = Path(tmp)
|
||||
uploads_dir = tmp_path / "uploads"
|
||||
uploads_dir.mkdir()
|
||||
|
||||
paths = Paths(base_dir=tmp_path)
|
||||
outputs_dir = paths.sandbox_outputs_dir("t-artifact")
|
||||
user_id = get_effective_user_id()
|
||||
outputs_dir = paths.sandbox_outputs_dir("t-artifact", user_id=user_id)
|
||||
outputs_dir.mkdir(parents=True)
|
||||
|
||||
# Upload phase
|
||||
@ -1955,11 +1970,14 @@ class TestScenarioThreadIsolation:
|
||||
|
||||
def test_artifacts_isolated_per_thread(self, client):
|
||||
"""Artifacts in thread-A are not accessible from thread-B."""
|
||||
from deerflow.runtime.user_context import get_effective_user_id
|
||||
|
||||
with tempfile.TemporaryDirectory() as tmp:
|
||||
paths = Paths(base_dir=tmp)
|
||||
outputs_a = paths.sandbox_outputs_dir("thread-a")
|
||||
user_id = get_effective_user_id()
|
||||
outputs_a = paths.sandbox_outputs_dir("thread-a", user_id=user_id)
|
||||
outputs_a.mkdir(parents=True)
|
||||
paths.sandbox_user_data_dir("thread-b").mkdir(parents=True)
|
||||
paths.sandbox_outputs_dir("thread-b", user_id=user_id).mkdir(parents=True)
|
||||
(outputs_a / "result.txt").write_text("thread-a artifact")
|
||||
|
||||
with patch("deerflow.client.get_paths", return_value=paths):
|
||||
@ -2864,9 +2882,12 @@ class TestUploadDeleteSymlink:
|
||||
class TestArtifactHardening:
|
||||
def test_artifact_directory_rejected(self, client):
|
||||
"""get_artifact rejects paths that resolve to a directory."""
|
||||
from deerflow.runtime.user_context import get_effective_user_id
|
||||
|
||||
with tempfile.TemporaryDirectory() as tmp:
|
||||
paths = Paths(base_dir=tmp)
|
||||
subdir = paths.sandbox_outputs_dir("t1") / "subdir"
|
||||
user_id = get_effective_user_id()
|
||||
subdir = paths.sandbox_outputs_dir("t1", user_id=user_id) / "subdir"
|
||||
subdir.mkdir(parents=True)
|
||||
|
||||
with patch("deerflow.client.get_paths", return_value=paths):
|
||||
@ -2875,9 +2896,12 @@ class TestArtifactHardening:
|
||||
|
||||
def test_artifact_leading_slash_stripped(self, client):
|
||||
"""Paths with leading slash are handled correctly."""
|
||||
from deerflow.runtime.user_context import get_effective_user_id
|
||||
|
||||
with tempfile.TemporaryDirectory() as tmp:
|
||||
paths = Paths(base_dir=tmp)
|
||||
outputs = paths.sandbox_outputs_dir("t1")
|
||||
user_id = get_effective_user_id()
|
||||
outputs = paths.sandbox_outputs_dir("t1", user_id=user_id)
|
||||
outputs.mkdir(parents=True)
|
||||
(outputs / "file.txt").write_text("content")
|
||||
|
||||
@ -2991,9 +3015,12 @@ class TestBugArtifactPrefixMatchTooLoose:
|
||||
|
||||
def test_exact_prefix_without_subpath_accepted(self, client):
|
||||
"""Bare 'mnt/user-data' is accepted (will later fail as directory, not at prefix)."""
|
||||
from deerflow.runtime.user_context import get_effective_user_id
|
||||
|
||||
with tempfile.TemporaryDirectory() as tmp:
|
||||
paths = Paths(base_dir=tmp)
|
||||
paths.sandbox_user_data_dir("t1").mkdir(parents=True)
|
||||
user_id = get_effective_user_id()
|
||||
paths.sandbox_outputs_dir("t1", user_id=user_id).mkdir(parents=True)
|
||||
|
||||
with patch("deerflow.client.get_paths", return_value=paths):
|
||||
# Accepted at prefix check, but fails because it's a directory.
|
||||
|
||||
@ -262,8 +262,9 @@ class TestFileUploadIntegration:
|
||||
|
||||
# Physically exists
|
||||
from deerflow.config.paths import get_paths
|
||||
from deerflow.runtime.user_context import get_effective_user_id
|
||||
|
||||
assert (get_paths().sandbox_uploads_dir(tid) / "readme.txt").exists()
|
||||
assert (get_paths().sandbox_uploads_dir(tid, user_id=get_effective_user_id()) / "readme.txt").exists()
|
||||
|
||||
def test_upload_duplicate_rename(self, e2e_env, tmp_path):
|
||||
"""Uploading two files with the same name auto-renames the second."""
|
||||
@ -472,12 +473,13 @@ class TestArtifactAccess:
|
||||
def test_get_artifact_happy_path(self, e2e_env):
|
||||
"""Write a file to outputs, then read it back via get_artifact()."""
|
||||
from deerflow.config.paths import get_paths
|
||||
from deerflow.runtime.user_context import get_effective_user_id
|
||||
|
||||
c = DeerFlowClient(checkpointer=None, thinking_enabled=False)
|
||||
tid = str(uuid.uuid4())
|
||||
|
||||
# Create an output file in the thread's outputs directory
|
||||
outputs_dir = get_paths().sandbox_outputs_dir(tid)
|
||||
outputs_dir = get_paths().sandbox_outputs_dir(tid, user_id=get_effective_user_id())
|
||||
outputs_dir.mkdir(parents=True, exist_ok=True)
|
||||
(outputs_dir / "result.txt").write_text("hello artifact")
|
||||
|
||||
@ -488,11 +490,12 @@ class TestArtifactAccess:
|
||||
def test_get_artifact_nested_path(self, e2e_env):
|
||||
"""Artifacts in subdirectories are accessible."""
|
||||
from deerflow.config.paths import get_paths
|
||||
from deerflow.runtime.user_context import get_effective_user_id
|
||||
|
||||
c = DeerFlowClient(checkpointer=None, thinking_enabled=False)
|
||||
tid = str(uuid.uuid4())
|
||||
|
||||
outputs_dir = get_paths().sandbox_outputs_dir(tid)
|
||||
outputs_dir = get_paths().sandbox_outputs_dir(tid, user_id=get_effective_user_id())
|
||||
sub = outputs_dir / "charts"
|
||||
sub.mkdir(parents=True, exist_ok=True)
|
||||
(sub / "data.json").write_text('{"x": 1}')
|
||||
|
||||
@ -152,8 +152,10 @@ def test_get_work_dir_uses_base_dir_when_no_thread_id(monkeypatch, tmp_path):
|
||||
def test_get_work_dir_uses_per_thread_path_when_thread_id_given(monkeypatch, tmp_path):
|
||||
"""P1.1: _get_work_dir(thread_id) uses {base_dir}/threads/{thread_id}/acp-workspace/."""
|
||||
from deerflow.config import paths as paths_module
|
||||
from deerflow.runtime import user_context as uc_module
|
||||
|
||||
monkeypatch.setattr(paths_module, "get_paths", lambda: paths_module.Paths(base_dir=tmp_path))
|
||||
monkeypatch.setattr(uc_module, "get_effective_user_id", lambda: None)
|
||||
result = _get_work_dir("thread-abc-123")
|
||||
expected = tmp_path / "threads" / "thread-abc-123" / "acp-workspace"
|
||||
assert result == str(expected)
|
||||
@ -310,8 +312,10 @@ async def test_invoke_acp_agent_uses_fixed_acp_workspace(monkeypatch, tmp_path):
|
||||
async def test_invoke_acp_agent_uses_per_thread_workspace_when_thread_id_in_config(monkeypatch, tmp_path):
|
||||
"""P1.1: When thread_id is in the RunnableConfig, ACP agent uses per-thread workspace."""
|
||||
from deerflow.config import paths as paths_module
|
||||
from deerflow.runtime import user_context as uc_module
|
||||
|
||||
monkeypatch.setattr(paths_module, "get_paths", lambda: paths_module.Paths(base_dir=tmp_path))
|
||||
monkeypatch.setattr(uc_module, "get_effective_user_id", lambda: None)
|
||||
|
||||
monkeypatch.setattr(
|
||||
"deerflow.config.extensions_config.ExtensionsConfig.from_file",
|
||||
|
||||
@ -258,12 +258,13 @@ def test_update_memory_fact_route_preserves_omitted_fields() -> None:
|
||||
)
|
||||
|
||||
assert response.status_code == 200
|
||||
update_fact.assert_called_once_with(
|
||||
fact_id="fact_edit",
|
||||
content="User prefers spaces",
|
||||
category=None,
|
||||
confidence=None,
|
||||
)
|
||||
assert update_fact.call_count == 1
|
||||
call_kwargs = update_fact.call_args.kwargs
|
||||
assert call_kwargs.get("fact_id") == "fact_edit"
|
||||
assert call_kwargs.get("content") == "User prefers spaces"
|
||||
assert call_kwargs.get("category") is None
|
||||
assert call_kwargs.get("confidence") is None
|
||||
assert "user_id" in call_kwargs
|
||||
assert response.json()["facts"] == updated_memory["facts"]
|
||||
|
||||
|
||||
|
||||
@ -38,7 +38,7 @@ def test_present_files_keeps_virtual_outputs_path(tmp_path, monkeypatch):
|
||||
monkeypatch.setattr(
|
||||
present_file_tool_module,
|
||||
"get_paths",
|
||||
lambda: SimpleNamespace(resolve_virtual_path=lambda thread_id, path: artifact_path),
|
||||
lambda: SimpleNamespace(resolve_virtual_path=lambda thread_id, path, *, user_id=None: artifact_path),
|
||||
)
|
||||
|
||||
result = present_file_tool_module.present_file_tool.func(
|
||||
|
||||
@ -50,10 +50,13 @@ def test_delete_thread_data_rejects_invalid_thread_id(tmp_path):
|
||||
|
||||
|
||||
def test_delete_thread_route_cleans_thread_directory(tmp_path):
|
||||
from deerflow.runtime.user_context import get_effective_user_id
|
||||
|
||||
paths = Paths(tmp_path)
|
||||
thread_dir = paths.thread_dir("thread-route")
|
||||
paths.sandbox_work_dir("thread-route").mkdir(parents=True, exist_ok=True)
|
||||
(paths.sandbox_work_dir("thread-route") / "notes.txt").write_text("hello", encoding="utf-8")
|
||||
user_id = get_effective_user_id()
|
||||
thread_dir = paths.thread_dir("thread-route", user_id=user_id)
|
||||
paths.sandbox_work_dir("thread-route", user_id=user_id).mkdir(parents=True, exist_ok=True)
|
||||
(paths.sandbox_work_dir("thread-route", user_id=user_id) / "notes.txt").write_text("hello", encoding="utf-8")
|
||||
|
||||
app = make_authed_test_app()
|
||||
app.include_router(threads.router)
|
||||
|
||||
@ -34,7 +34,9 @@ def _runtime(thread_id: str | None = THREAD_ID) -> MagicMock:
|
||||
|
||||
|
||||
def _uploads_dir(tmp_path: Path, thread_id: str = THREAD_ID) -> Path:
|
||||
d = Paths(str(tmp_path)).sandbox_uploads_dir(thread_id)
|
||||
from deerflow.runtime.user_context import get_effective_user_id
|
||||
|
||||
d = Paths(str(tmp_path)).sandbox_uploads_dir(thread_id, user_id=get_effective_user_id())
|
||||
d.mkdir(parents=True, exist_ok=True)
|
||||
return d
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user