diff --git a/backend/app/channels/feishu.py b/backend/app/channels/feishu.py index 5a80016f0..3bb83163d 100644 --- a/backend/app/channels/feishu.py +++ b/backend/app/channels/feishu.py @@ -9,11 +9,12 @@ import re import threading from typing import Any, Literal +from app.plugins.auth.security.actor_context import bind_user_actor_context from app.channels.base import Channel from app.channels.commands import KNOWN_CHANNEL_COMMANDS from app.channels.message_bus import InboundMessage, InboundMessageType, MessageBus, OutboundMessage, ResolvedAttachment from deerflow.config.paths import VIRTUAL_PATH_PREFIX, get_paths -from deerflow.runtime.user_context import get_effective_user_id +from deerflow.runtime.actor_context import get_effective_user_id from deerflow.sandbox.sandbox_provider import get_sandbox_provider logger = logging.getLogger(__name__) @@ -298,15 +299,35 @@ class FeishuChannel(Channel): text = msg.text for file in files: if file.get("image_key"): - virtual_path = await self._receive_single_file(msg.thread_ts, file["image_key"], "image", thread_id) + virtual_path = await self._receive_single_file( + msg.thread_ts, + file["image_key"], + "image", + thread_id, + user_id=msg.user_id, + ) text = text.replace("[image]", virtual_path, 1) elif file.get("file_key"): - virtual_path = await self._receive_single_file(msg.thread_ts, file["file_key"], "file", thread_id) + virtual_path = await self._receive_single_file( + msg.thread_ts, + file["file_key"], + "file", + thread_id, + user_id=msg.user_id, + ) text = text.replace("[file]", virtual_path, 1) msg.text = text return msg - async def _receive_single_file(self, message_id: str, file_key: str, type: Literal["image", "file"], thread_id: str) -> str: + async def _receive_single_file( + self, + message_id: str, + file_key: str, + type: Literal["image", "file"], + thread_id: str, + *, + user_id: str | None = None, + ) -> str: request = self._GetMessageResourceRequest.builder().message_id(message_id).file_key(file_key).type(type).build() def inner(): @@ -345,50 +366,51 @@ class FeishuChannel(Channel): return f"Failed to obtain the [{type}]" paths = get_paths() - user_id = get_effective_user_id() - paths.ensure_thread_dirs(thread_id, user_id=user_id) - uploads_dir = paths.sandbox_uploads_dir(thread_id, user_id=user_id).resolve() + with bind_user_actor_context(user_id): + effective_user_id = get_effective_user_id() + paths.ensure_thread_dirs(thread_id, user_id=effective_user_id) + uploads_dir = paths.sandbox_uploads_dir(thread_id, user_id=effective_user_id).resolve() - ext = "png" if type == "image" else "bin" - raw_filename = getattr(response, "file_name", "") or f"feishu_{file_key[-12:]}.{ext}" + ext = "png" if type == "image" else "bin" + raw_filename = getattr(response, "file_name", "") or f"feishu_{file_key[-12:]}.{ext}" - # Sanitize filename: preserve extension, replace path chars in name part - if "." in raw_filename: - name_part, ext = raw_filename.rsplit(".", 1) - name_part = re.sub(r"[./\\]", "_", name_part) - filename = f"{name_part}.{ext}" - else: - filename = re.sub(r"[./\\]", "_", raw_filename) - resolved_target = uploads_dir / filename + # Sanitize filename: preserve extension, replace path chars in name part + if "." in raw_filename: + name_part, ext = raw_filename.rsplit(".", 1) + name_part = re.sub(r"[./\\]", "_", name_part) + filename = f"{name_part}.{ext}" + else: + filename = re.sub(r"[./\\]", "_", raw_filename) + resolved_target = uploads_dir / filename - def down_load(): - # use thread_lock to avoid filename conflicts when writing - with self._thread_lock: - resolved_target.write_bytes(content) + def down_load(): + # use thread_lock to avoid filename conflicts when writing + with self._thread_lock: + resolved_target.write_bytes(content) - try: - await asyncio.to_thread(down_load) - except Exception: - logger.exception("[Feishu] failed to persist downloaded resource: %s, type=%s", resolved_target, type) - return f"Failed to obtain the [{type}]" + try: + await asyncio.to_thread(down_load) + except Exception: + logger.exception("[Feishu] failed to persist downloaded resource: %s, type=%s", resolved_target, type) + return f"Failed to obtain the [{type}]" - virtual_path = f"{VIRTUAL_PATH_PREFIX}/uploads/{resolved_target.name}" + virtual_path = f"{VIRTUAL_PATH_PREFIX}/uploads/{resolved_target.name}" - try: - sandbox_provider = get_sandbox_provider() - sandbox_id = sandbox_provider.acquire(thread_id) - if sandbox_id != "local": - sandbox = sandbox_provider.get(sandbox_id) - if sandbox is None: - logger.warning("[Feishu] sandbox not found for thread_id=%s", thread_id) - return f"Failed to obtain the [{type}]" - sandbox.update_file(virtual_path, content) - except Exception: - logger.exception("[Feishu] failed to sync resource into non-local sandbox: %s", virtual_path) - return f"Failed to obtain the [{type}]" + try: + sandbox_provider = get_sandbox_provider() + sandbox_id = sandbox_provider.acquire(thread_id) + if sandbox_id != "local": + sandbox = sandbox_provider.get(sandbox_id) + if sandbox is None: + logger.warning("[Feishu] sandbox not found for thread_id=%s", thread_id) + return f"Failed to obtain the [{type}]" + sandbox.update_file(virtual_path, content) + except Exception: + logger.exception("[Feishu] failed to sync resource into non-local sandbox: %s", virtual_path) + return f"Failed to obtain the [{type}]" - logger.info("[Feishu] downloaded resource mapped: file_key=%s -> %s", file_key, virtual_path) - return virtual_path + logger.info("[Feishu] downloaded resource mapped: file_key=%s -> %s", file_key, virtual_path) + return virtual_path # -- message formatting ------------------------------------------------ diff --git a/backend/app/channels/manager.py b/backend/app/channels/manager.py index 400d29d60..66690cbc3 100644 --- a/backend/app/channels/manager.py +++ b/backend/app/channels/manager.py @@ -14,10 +14,11 @@ from typing import Any import httpx from langgraph_sdk.errors import ConflictError +from app.plugins.auth.security.actor_context import bind_user_actor_context from app.channels.commands import KNOWN_CHANNEL_COMMANDS from app.channels.message_bus import InboundMessage, InboundMessageType, MessageBus, OutboundMessage, ResolvedAttachment from app.channels.store import ChannelStore -from deerflow.runtime.user_context import get_effective_user_id +from deerflow.runtime.actor_context import get_effective_user_id logger = logging.getLogger(__name__) @@ -328,7 +329,7 @@ def _format_artifact_text(artifacts: list[str]) -> str: _OUTPUTS_VIRTUAL_PREFIX = "/mnt/user-data/outputs/" -def _resolve_attachments(thread_id: str, artifacts: list[str]) -> list[ResolvedAttachment]: +def _resolve_attachments(thread_id: str, artifacts: list[str], *, user_id: str | None = None) -> list[ResolvedAttachment]: """Resolve virtual artifact paths to host filesystem paths with metadata. Only paths under ``/mnt/user-data/outputs/`` are accepted; any other @@ -342,39 +343,40 @@ def _resolve_attachments(thread_id: str, artifacts: list[str]) -> list[ResolvedA attachments: list[ResolvedAttachment] = [] paths = get_paths() - user_id = get_effective_user_id() - outputs_dir = paths.sandbox_outputs_dir(thread_id, user_id=user_id).resolve() - for virtual_path in artifacts: - # Security: only allow files from the agent outputs directory - if not virtual_path.startswith(_OUTPUTS_VIRTUAL_PREFIX): - logger.warning("[Manager] rejected non-outputs artifact path: %s", virtual_path) - continue - try: - actual = paths.resolve_virtual_path(thread_id, virtual_path, user_id=user_id) - # Verify the resolved path is actually under the outputs directory - # (guards against path-traversal even after prefix check) + with bind_user_actor_context(user_id): + effective_user_id = get_effective_user_id() + outputs_dir = paths.sandbox_outputs_dir(thread_id, user_id=effective_user_id).resolve() + for virtual_path in artifacts: + # Security: only allow files from the agent outputs directory + if not virtual_path.startswith(_OUTPUTS_VIRTUAL_PREFIX): + logger.warning("[Manager] rejected non-outputs artifact path: %s", virtual_path) + continue try: - actual.resolve().relative_to(outputs_dir) - except ValueError: - logger.warning("[Manager] artifact path escapes outputs dir: %s -> %s", virtual_path, actual) - continue - if not actual.is_file(): - logger.warning("[Manager] artifact not found on disk: %s -> %s", virtual_path, actual) - continue - mime, _ = mimetypes.guess_type(str(actual)) - mime = mime or "application/octet-stream" - attachments.append( - ResolvedAttachment( - virtual_path=virtual_path, - actual_path=actual, - filename=actual.name, - mime_type=mime, - size=actual.stat().st_size, - is_image=mime.startswith("image/"), + actual = paths.resolve_virtual_path(thread_id, virtual_path, user_id=effective_user_id) + # Verify the resolved path is actually under the outputs directory + # (guards against path-traversal even after prefix check) + try: + actual.resolve().relative_to(outputs_dir) + except ValueError: + logger.warning("[Manager] artifact path escapes outputs dir: %s -> %s", virtual_path, actual) + continue + if not actual.is_file(): + logger.warning("[Manager] artifact not found on disk: %s -> %s", virtual_path, actual) + continue + mime, _ = mimetypes.guess_type(str(actual)) + mime = mime or "application/octet-stream" + attachments.append( + ResolvedAttachment( + virtual_path=virtual_path, + actual_path=actual, + filename=actual.name, + mime_type=mime, + size=actual.stat().st_size, + is_image=mime.startswith("image/"), + ) ) - ) - except (ValueError, OSError) as exc: - logger.warning("[Manager] failed to resolve artifact %s: %s", virtual_path, exc) + except (ValueError, OSError) as exc: + logger.warning("[Manager] failed to resolve artifact %s: %s", virtual_path, exc) return attachments @@ -382,13 +384,15 @@ def _prepare_artifact_delivery( thread_id: str, response_text: str, artifacts: list[str], + *, + user_id: str | None = None, ) -> tuple[str, list[ResolvedAttachment]]: """Resolve attachments and append filename fallbacks to the text response.""" attachments: list[ResolvedAttachment] = [] if not artifacts: return response_text, attachments - attachments = _resolve_attachments(thread_id, artifacts) + attachments = _resolve_attachments(thread_id, artifacts, user_id=user_id) resolved_virtuals = {attachment.virtual_path for attachment in attachments} unresolved = [path for path in artifacts if path not in resolved_virtuals] @@ -411,7 +415,8 @@ async def _ingest_inbound_files(thread_id: str, msg: InboundMessage) -> list[dic from deerflow.uploads.manager import claim_unique_filename, ensure_uploads_dir, normalize_filename - uploads_dir = ensure_uploads_dir(thread_id) + with bind_user_actor_context(msg.user_id): + uploads_dir = ensure_uploads_dir(thread_id) seen_names = {entry.name for entry in uploads_dir.iterdir() if entry.is_file()} created: list[dict[str, Any]] = [] @@ -745,7 +750,12 @@ class ChannelManager: len(artifacts), ) - response_text, attachments = _prepare_artifact_delivery(thread_id, response_text, artifacts) + response_text, attachments = _prepare_artifact_delivery( + thread_id, + response_text, + artifacts, + user_id=msg.user_id, + ) if not response_text: if attachments: @@ -836,7 +846,12 @@ class ChannelManager: result = last_values if last_values is not None else {"messages": [{"type": "ai", "content": latest_text}]} response_text = _extract_response_text(result) artifacts = _extract_artifacts(result) - response_text, attachments = _prepare_artifact_delivery(thread_id, response_text, artifacts) + response_text, attachments = _prepare_artifact_delivery( + thread_id, + response_text, + artifacts, + user_id=msg.user_id, + ) if not response_text: if attachments: