diff --git a/backend/app/channels/manager.py b/backend/app/channels/manager.py index c09b13173..349fa1bfe 100644 --- a/backend/app/channels/manager.py +++ b/backend/app/channels/manager.py @@ -420,7 +420,13 @@ async def _ingest_inbound_files(thread_id: str, msg: InboundMessage) -> list[dic if not msg.files: return [] - from deerflow.uploads.manager import claim_unique_filename, ensure_uploads_dir, normalize_filename + from deerflow.uploads.manager import ( + UnsafeUploadPathError, + claim_unique_filename, + ensure_uploads_dir, + normalize_filename, + write_upload_file_no_symlink, + ) uploads_dir = ensure_uploads_dir(thread_id) seen_names = {entry.name for entry in uploads_dir.iterdir() if entry.is_file()} @@ -471,7 +477,10 @@ async def _ingest_inbound_files(thread_id: str, msg: InboundMessage) -> list[dic dest = uploads_dir / safe_name try: - dest.write_bytes(data) + dest = write_upload_file_no_symlink(uploads_dir, safe_name, data) + except UnsafeUploadPathError: + logger.warning("[Manager] skipping inbound file with unsafe destination: %s", safe_name) + continue except Exception: logger.exception("[Manager] failed to write inbound file: %s", dest) continue diff --git a/backend/app/gateway/routers/uploads.py b/backend/app/gateway/routers/uploads.py index 604a6e154..a4267f728 100644 --- a/backend/app/gateway/routers/uploads.py +++ b/backend/app/gateway/routers/uploads.py @@ -5,7 +5,7 @@ import os import stat from fastapi import APIRouter, Depends, File, HTTPException, Request, UploadFile -from pydantic import BaseModel +from pydantic import BaseModel, Field from app.gateway.authz import require_permission from app.gateway.deps import get_config @@ -15,12 +15,14 @@ from deerflow.runtime.user_context import get_effective_user_id from deerflow.sandbox.sandbox_provider import SandboxProvider, get_sandbox_provider from deerflow.uploads.manager import ( PathTraversalError, + UnsafeUploadPathError, delete_file_safe, enrich_file_listing, ensure_uploads_dir, get_uploads_dir, list_files_in_dir, normalize_filename, + open_upload_file_no_symlink, upload_artifact_url, upload_virtual_path, ) @@ -42,6 +44,7 @@ class UploadResponse(BaseModel): success: bool files: list[dict[str, str]] message: str + skipped_files: list[str] = Field(default_factory=list) class UploadLimits(BaseModel): @@ -116,17 +119,18 @@ def _cleanup_uploaded_paths(paths: list[os.PathLike[str] | str]) -> None: logger.warning("Failed to clean up upload path after rejected request: %s", path, exc_info=True) -async def _write_upload_file_streaming( +async def _write_upload_file_with_limits( file: UploadFile, - file_path: os.PathLike[str] | str, *, + uploads_dir: os.PathLike[str] | str, display_filename: str, max_single_file_size: int, max_total_size: int, total_size: int, -) -> tuple[int, int]: +) -> tuple[os.PathLike[str] | str, int, int]: file_size = 0 - with open(file_path, "wb") as output: + file_path, fh = open_upload_file_no_symlink(uploads_dir, display_filename) + try: while chunk := await file.read(UPLOAD_CHUNK_SIZE): file_size += len(chunk) total_size += len(chunk) @@ -134,8 +138,17 @@ async def _write_upload_file_streaming( raise HTTPException(status_code=413, detail=f"File too large: {display_filename}") if total_size > max_total_size: raise HTTPException(status_code=413, detail="Total upload size too large") - output.write(chunk) - return file_size, total_size + fh.write(chunk) + except Exception: + fh.close() + try: + os.unlink(file_path) + except FileNotFoundError: + pass + raise + else: + fh.close() + return file_path, file_size, total_size def _auto_convert_documents_enabled(app_config: AppConfig) -> bool: @@ -177,6 +190,7 @@ async def upload_files( uploaded_files = [] written_paths = [] sandbox_sync_targets = [] + skipped_files = [] total_size = 0 sandbox_provider = get_sandbox_provider() @@ -200,16 +214,15 @@ async def upload_files( continue try: - file_path = uploads_dir / safe_filename - written_paths.append(file_path) - file_size, total_size = await _write_upload_file_streaming( + file_path, file_size, total_size = await _write_upload_file_with_limits( file, - file_path, + uploads_dir=uploads_dir, display_filename=safe_filename, max_single_file_size=limits.max_file_size, max_total_size=limits.max_total_size, total_size=total_size, ) + written_paths.append(file_path) virtual_path = upload_virtual_path(safe_filename) @@ -246,6 +259,10 @@ async def upload_files( except HTTPException as e: _cleanup_uploaded_paths(written_paths) raise e + except UnsafeUploadPathError as e: + logger.warning("Skipping upload with unsafe destination %s: %s", file.filename, e) + skipped_files.append(safe_filename) + continue except Exception as e: logger.error(f"Failed to upload {file.filename}: {e}") _cleanup_uploaded_paths(written_paths) @@ -256,10 +273,15 @@ async def upload_files( _make_file_sandbox_writable(file_path) sandbox.update_file(virtual_path, file_path.read_bytes()) + message = f"Successfully uploaded {len(uploaded_files)} file(s)" + if skipped_files: + message += f"; skipped {len(skipped_files)} unsafe file(s)" + return UploadResponse( - success=True, + success=not skipped_files, files=uploaded_files, - message=f"Successfully uploaded {len(uploaded_files)} file(s)", + message=message, + skipped_files=skipped_files, ) diff --git a/backend/packages/harness/deerflow/uploads/manager.py b/backend/packages/harness/deerflow/uploads/manager.py index c36151b38..1a1b63f09 100644 --- a/backend/packages/harness/deerflow/uploads/manager.py +++ b/backend/packages/harness/deerflow/uploads/manager.py @@ -4,8 +4,10 @@ Pure business logic — no FastAPI/HTTP dependencies. Both Gateway and Client delegate to these functions. """ +import errno import os import re +import stat from pathlib import Path from urllib.parse import quote @@ -17,6 +19,10 @@ class PathTraversalError(ValueError): """Raised when a path escapes its allowed base directory.""" +class UnsafeUploadPathError(ValueError): + """Raised when an upload destination is not a safe regular file path.""" + + # thread_id must be alphanumeric, hyphens, underscores, or dots only. _SAFE_THREAD_ID = re.compile(r"^[a-zA-Z0-9._-]+$") @@ -109,6 +115,64 @@ def validate_path_traversal(path: Path, base: Path) -> None: raise PathTraversalError("Path traversal detected") from None +def open_upload_file_no_symlink(base_dir: Path, filename: str) -> tuple[Path, object]: + """Open an upload destination for safe streaming writes. + + Upload directories may be mounted into local sandboxes. A sandbox process can + therefore leave a symlink at a future upload filename. Normal ``Path.write_bytes`` + follows that link and can overwrite files outside the uploads directory with + gateway privileges. This helper rejects symlink destinations and uses + ``O_NOFOLLOW`` where available so the final path component cannot be raced into + a symlink between validation and open. + """ + safe_name = normalize_filename(filename) + dest = base_dir / safe_name + + try: + st = os.lstat(dest) + except FileNotFoundError: + st = None + + if st is not None and not stat.S_ISREG(st.st_mode): + raise UnsafeUploadPathError(f"Upload destination is not a regular file: {safe_name}") + + validate_path_traversal(dest, base_dir) + + if not hasattr(os, "O_NOFOLLOW"): + raise UnsafeUploadPathError("Upload writes require O_NOFOLLOW support") + + flags = os.O_WRONLY | os.O_CREAT | os.O_NOFOLLOW + if hasattr(os, "O_NONBLOCK"): + flags |= os.O_NONBLOCK + + try: + fd = os.open(dest, flags, 0o600) + except OSError as exc: + if exc.errno in {errno.ELOOP, errno.EISDIR, errno.ENOTDIR, errno.ENXIO, errno.EAGAIN}: + raise UnsafeUploadPathError(f"Unsafe upload destination: {safe_name}") from exc + raise + + try: + opened_stat = os.fstat(fd) + if not stat.S_ISREG(opened_stat.st_mode) or opened_stat.st_nlink != 1: + raise UnsafeUploadPathError(f"Upload destination is not an exclusive regular file: {safe_name}") + os.ftruncate(fd, 0) + fh = os.fdopen(fd, "wb") + fd = -1 + finally: + if fd >= 0: + os.close(fd) + return dest, fh + + +def write_upload_file_no_symlink(base_dir: Path, filename: str, data: bytes) -> Path: + """Write upload bytes without following a pre-existing destination symlink.""" + dest, fh = open_upload_file_no_symlink(base_dir, filename) + with fh: + fh.write(data) + return dest + + def list_files_in_dir(directory: Path) -> dict: """List files (not directories) in *directory*. diff --git a/backend/tests/test_channel_file_attachments.py b/backend/tests/test_channel_file_attachments.py index 7273b1c82..aa2e9b004 100644 --- a/backend/tests/test_channel_file_attachments.py +++ b/backend/tests/test_channel_file_attachments.py @@ -3,11 +3,12 @@ from __future__ import annotations import asyncio +import os from pathlib import Path from unittest.mock import MagicMock, patch from app.channels.base import Channel -from app.channels.message_bus import MessageBus, OutboundMessage, ResolvedAttachment +from app.channels.message_bus import InboundMessage, MessageBus, OutboundMessage, ResolvedAttachment def _run(coro): @@ -248,6 +249,109 @@ class TestResolveAttachments: assert result[0].filename == "data.csv" +# --------------------------------------------------------------------------- +# Inbound file ingestion tests +# --------------------------------------------------------------------------- + + +class TestInboundFileIngestion: + def test_rejects_preexisting_symlink_destination(self, tmp_path): + from app.channels import manager + + uploads_dir = tmp_path / "uploads" + uploads_dir.mkdir() + outside_file = tmp_path / "outside-created.txt" + (uploads_dir / "victim.txt").symlink_to(outside_file) + + msg = InboundMessage( + channel_name="test-channel", + chat_id="chat-1", + user_id="user-1", + text="see attachment", + files=[{"filename": "victim.txt", "url": "https://example.invalid/victim.txt"}], + ) + + async def fake_reader(file_info, client): + return b"attacker data" + + with ( + patch("deerflow.uploads.manager.ensure_uploads_dir", return_value=uploads_dir), + patch.dict(manager.INBOUND_FILE_READERS, {"test-channel": fake_reader}, clear=False), + ): + result = _run(manager._ingest_inbound_files("thread-1", msg)) + + assert result == [] + assert not outside_file.exists() + assert (uploads_dir / "victim.txt").is_symlink() + + def test_rejects_dangling_symlink_destination(self, tmp_path): + from app.channels import manager + + uploads_dir = tmp_path / "uploads" + uploads_dir.mkdir() + missing_target = tmp_path / "missing-created.txt" + (uploads_dir / "victim.txt").symlink_to(missing_target) + + msg = InboundMessage( + channel_name="test-channel", + chat_id="chat-1", + user_id="user-1", + text="see attachment", + files=[{"filename": "victim.txt", "url": "https://example.invalid/victim.txt"}], + ) + + async def fake_reader(file_info, client): + return b"attacker data" + + with ( + patch("deerflow.uploads.manager.ensure_uploads_dir", return_value=uploads_dir), + patch.dict(manager.INBOUND_FILE_READERS, {"test-channel": fake_reader}, clear=False), + ): + result = _run(manager._ingest_inbound_files("thread-1", msg)) + + assert result == [] + assert not missing_target.exists() + assert (uploads_dir / "victim.txt").is_symlink() + + def test_hardlinked_existing_file_is_not_overwritten(self, tmp_path): + from app.channels import manager + + uploads_dir = tmp_path / "uploads" + uploads_dir.mkdir() + outside_file = tmp_path / "outside-created.txt" + outside_file.write_text("protected", encoding="utf-8") + os.link(outside_file, uploads_dir / "victim.txt") + + msg = InboundMessage( + channel_name="test-channel", + chat_id="chat-1", + user_id="user-1", + text="see attachment", + files=[{"filename": "victim.txt", "url": "https://example.invalid/victim.txt"}], + ) + + async def fake_reader(file_info, client): + return b"new attachment data" + + with ( + patch("deerflow.uploads.manager.ensure_uploads_dir", return_value=uploads_dir), + patch.dict(manager.INBOUND_FILE_READERS, {"test-channel": fake_reader}, clear=False), + ): + result = _run(manager._ingest_inbound_files("thread-1", msg)) + + assert result == [ + { + "filename": "victim_1.txt", + "size": len(b"new attachment data"), + "path": "/mnt/user-data/uploads/victim_1.txt", + "is_image": False, + } + ] + assert outside_file.read_text(encoding="utf-8") == "protected" + assert (uploads_dir / "victim.txt").read_text(encoding="utf-8") == "protected" + assert (uploads_dir / "victim_1.txt").read_bytes() == b"new attachment data" + + # --------------------------------------------------------------------------- # Channel base class _on_outbound with attachments # --------------------------------------------------------------------------- diff --git a/backend/tests/test_uploads_manager.py b/backend/tests/test_uploads_manager.py index 64964c0b0..2cf1ae7fb 100644 --- a/backend/tests/test_uploads_manager.py +++ b/backend/tests/test_uploads_manager.py @@ -1,14 +1,20 @@ """Tests for deerflow.uploads.manager — shared upload management logic.""" +import errno +import os +from unittest.mock import patch + import pytest from deerflow.uploads.manager import ( PathTraversalError, + UnsafeUploadPathError, claim_unique_filename, delete_file_safe, list_files_in_dir, normalize_filename, validate_path_traversal, + write_upload_file_no_symlink, ) # --------------------------------------------------------------------------- @@ -97,6 +103,54 @@ class TestValidatePathTraversal: validate_path_traversal(link, tmp_path) +# --------------------------------------------------------------------------- +# write_upload_file_no_symlink +# --------------------------------------------------------------------------- + + +class TestWriteUploadFileNoSymlink: + def test_writes_new_file(self, tmp_path): + dest = write_upload_file_no_symlink(tmp_path, "notes.txt", b"hello") + + assert dest == tmp_path / "notes.txt" + assert dest.read_bytes() == b"hello" + + def test_overwrites_existing_regular_file_with_single_link(self, tmp_path): + dest = tmp_path / "notes.txt" + dest.write_bytes(b"old contents") + assert os.stat(dest).st_nlink == 1 + + result = write_upload_file_no_symlink(tmp_path, "notes.txt", b"new contents") + + assert result == dest + assert dest.read_bytes() == b"new contents" + assert os.stat(dest).st_nlink == 1 + + def test_fails_closed_without_no_follow_support(self, tmp_path, monkeypatch): + monkeypatch.delattr(os, "O_NOFOLLOW", raising=False) + + with pytest.raises(UnsafeUploadPathError, match="O_NOFOLLOW"): + write_upload_file_no_symlink(tmp_path, "notes.txt", b"hello") + + assert not (tmp_path / "notes.txt").exists() + + def test_open_uses_nonblocking_flag_when_available(self, tmp_path): + with patch("deerflow.uploads.manager.os.open", side_effect=OSError(errno.ENXIO, "no reader")) as open_mock: + with pytest.raises(UnsafeUploadPathError, match="Unsafe upload destination"): + write_upload_file_no_symlink(tmp_path, "pipe.txt", b"hello") + + flags = open_mock.call_args.args[1] + assert flags & os.O_NONBLOCK + + @pytest.mark.parametrize("open_errno", [errno.ENXIO, errno.EAGAIN]) + def test_nonblocking_special_file_open_errors_are_unsafe(self, tmp_path, open_errno): + with patch("deerflow.uploads.manager.os.open", side_effect=OSError(open_errno, "would block")): + with pytest.raises(UnsafeUploadPathError, match="Unsafe upload destination"): + write_upload_file_no_symlink(tmp_path, "pipe.txt", b"hello") + + assert not (tmp_path / "pipe.txt").exists() + + # --------------------------------------------------------------------------- # list_files_in_dir # --------------------------------------------------------------------------- diff --git a/backend/tests/test_uploads_router.py b/backend/tests/test_uploads_router.py index a2538ec40..4a778345f 100644 --- a/backend/tests/test_uploads_router.py +++ b/backend/tests/test_uploads_router.py @@ -1,4 +1,5 @@ import asyncio +import os import stat from io import BytesIO from pathlib import Path @@ -428,6 +429,105 @@ def test_upload_files_rejects_dotdot_and_dot_filenames(tmp_path): assert [f.name for f in thread_uploads_dir.iterdir()] == ["passwd"] +def test_upload_files_rejects_preexisting_symlink_destination(tmp_path): + thread_uploads_dir = tmp_path / "uploads" + thread_uploads_dir.mkdir(parents=True) + outside_file = tmp_path / "outside.txt" + outside_file.write_text("protected", encoding="utf-8") + (thread_uploads_dir / "victim.txt").symlink_to(outside_file) + + provider = MagicMock() + provider.uses_thread_data_mounts = True + + with ( + patch.object(uploads, "get_uploads_dir", return_value=thread_uploads_dir), + patch.object(uploads, "ensure_uploads_dir", return_value=thread_uploads_dir), + patch.object(uploads, "get_sandbox_provider", return_value=provider), + ): + file = UploadFile(filename="victim.txt", file=BytesIO(b"attacker upload")) + result = asyncio.run(uploads.upload_files("thread-local", files=[file])) + + assert result.success is False + assert result.files == [] + assert result.skipped_files == ["victim.txt"] + assert "skipped 1 unsafe file" in result.message + assert outside_file.read_text(encoding="utf-8") == "protected" + assert (thread_uploads_dir / "victim.txt").is_symlink() + + +def test_upload_files_rejects_dangling_symlink_destination(tmp_path): + thread_uploads_dir = tmp_path / "uploads" + thread_uploads_dir.mkdir(parents=True) + missing_target = tmp_path / "missing-target.txt" + (thread_uploads_dir / "victim.txt").symlink_to(missing_target) + + provider = MagicMock() + provider.uses_thread_data_mounts = True + + with ( + patch.object(uploads, "get_uploads_dir", return_value=thread_uploads_dir), + patch.object(uploads, "ensure_uploads_dir", return_value=thread_uploads_dir), + patch.object(uploads, "get_sandbox_provider", return_value=provider), + ): + file = UploadFile(filename="victim.txt", file=BytesIO(b"attacker upload")) + result = asyncio.run(uploads.upload_files("thread-local", files=[file])) + + assert result.success is False + assert result.files == [] + assert result.skipped_files == ["victim.txt"] + assert not missing_target.exists() + assert (thread_uploads_dir / "victim.txt").is_symlink() + + +def test_upload_files_rejects_hardlinked_destination_without_truncating(tmp_path): + thread_uploads_dir = tmp_path / "uploads" + thread_uploads_dir.mkdir(parents=True) + outside_file = tmp_path / "outside.txt" + outside_file.write_text("protected", encoding="utf-8") + os.link(outside_file, thread_uploads_dir / "victim.txt") + + provider = MagicMock() + provider.uses_thread_data_mounts = True + + with ( + patch.object(uploads, "get_uploads_dir", return_value=thread_uploads_dir), + patch.object(uploads, "ensure_uploads_dir", return_value=thread_uploads_dir), + patch.object(uploads, "get_sandbox_provider", return_value=provider), + ): + file = UploadFile(filename="victim.txt", file=BytesIO(b"attacker upload")) + result = asyncio.run(uploads.upload_files("thread-local", files=[file])) + + assert result.success is False + assert result.files == [] + assert result.skipped_files == ["victim.txt"] + assert outside_file.read_text(encoding="utf-8") == "protected" + assert (thread_uploads_dir / "victim.txt").read_text(encoding="utf-8") == "protected" + + +def test_upload_files_overwrites_existing_regular_file(tmp_path): + thread_uploads_dir = tmp_path / "uploads" + thread_uploads_dir.mkdir(parents=True) + existing_file = thread_uploads_dir / "notes.txt" + existing_file.write_bytes(b"old upload") + assert existing_file.stat().st_nlink == 1 + + provider = MagicMock() + provider.uses_thread_data_mounts = True + + with ( + patch.object(uploads, "get_uploads_dir", return_value=thread_uploads_dir), + patch.object(uploads, "ensure_uploads_dir", return_value=thread_uploads_dir), + patch.object(uploads, "get_sandbox_provider", return_value=provider), + ): + file = UploadFile(filename="notes.txt", file=BytesIO(b"new upload")) + result = asyncio.run(uploads.upload_files("thread-local", files=[file])) + + assert result.success is True + assert [file_info["filename"] for file_info in result.files] == ["notes.txt"] + assert existing_file.read_bytes() == b"new upload" + assert existing_file.stat().st_nlink == 1 + + def test_delete_uploaded_file_removes_generated_markdown_companion(tmp_path): thread_uploads_dir = tmp_path / "uploads" thread_uploads_dir.mkdir(parents=True)