deer-flow/backend/tests/test_channel_file_attachments.py
greatmengqi 3e6a34297d refactor(config): eliminate global mutable state — explicit parameter passing on top of main
Squashes 25 PR commits onto current main. AppConfig becomes a pure value
object with no ambient lookup. Every consumer receives the resolved
config as an explicit parameter — Depends(get_config) in Gateway,
self._app_config in DeerFlowClient, runtime.context.app_config in agent
runs, AppConfig.from_file() at the LangGraph Server registration
boundary.

Phase 1 — frozen data + typed context

- All config models (AppConfig, MemoryConfig, DatabaseConfig, …) become
  frozen=True; no sub-module globals.
- AppConfig.from_file() is pure (no side-effect singleton loaders).
- Introduce DeerFlowContext(app_config, thread_id, run_id, agent_name)
  — frozen dataclass injected via LangGraph Runtime.
- Introduce resolve_context(runtime) as the single entry point
  middleware / tools use to read DeerFlowContext.
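
The Phase 1 shape can be sketched with standard-library dataclasses. This is a minimal illustration, not the project's code: the AppConfig stand-in and its log_level field are hypothetical, the field types on DeerFlowContext are assumed (only the field names come from the list above), SimpleNamespace stands in for the LangGraph Runtime, and the choice of TypeError for a non-typed context is an assumption.

```python
from __future__ import annotations

from dataclasses import FrozenInstanceError, dataclass
from types import SimpleNamespace


@dataclass(frozen=True)
class AppConfig:
    """Stand-in for the real AppConfig; log_level is a hypothetical field."""

    log_level: str = "info"


@dataclass(frozen=True)
class DeerFlowContext:
    """Field names follow the commit message; the types are assumed."""

    app_config: AppConfig
    thread_id: str
    run_id: str | None = None
    agent_name: str | None = None


def resolve_context(runtime) -> DeerFlowContext:
    """Return the typed context and reject anything else (assumed behavior)."""
    ctx = getattr(runtime, "context", None)
    if not isinstance(ctx, DeerFlowContext):
        raise TypeError(f"expected DeerFlowContext, got {type(ctx).__name__}")
    return ctx


# SimpleNamespace stands in for the LangGraph Runtime object.
runtime = SimpleNamespace(
    context=DeerFlowContext(AppConfig(), thread_id="t1", run_id="r1")
)
ctx = resolve_context(runtime)
```

Because both dataclasses are frozen, assigning to ctx.thread_id raises FrozenInstanceError, so a context handed to middleware or tools cannot be mutated behind the caller's back.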

Phase 2 — pure explicit parameter passing

- Gateway: app.state.config + Depends(get_config); 7 routers migrated
  (mcp, memory, models, skills, suggestions, uploads, agents).
- DeerFlowClient: __init__(config=...) captures config locally.
- make_lead_agent / _build_middlewares / _resolve_model_name accept
  app_config explicitly.
- RunContext.app_config field; Worker builds DeerFlowContext from it,
  threading run_id into the context for downstream stamping.
- Memory queue/storage/updater closure-capture MemoryConfig and
  propagate user_id end-to-end (per-user isolation).
- Sandbox/skills/community/factories/tools thread app_config.
- resolve_context() rejects non-typed runtime.context.
- Test suite migrated off AppConfig.current() monkey-patches.
- AppConfig.current() classmethod deleted.
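
As a rough sketch of what explicit parameter passing looks like once the ambient lookup is gone: the Client class, resolve_model_name function, and default_model field below are hypothetical stand-ins in the spirit of DeerFlowClient and _resolve_model_name, not their real signatures.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class AppConfig:
    """Stand-in config; default_model is a hypothetical field."""

    default_model: str = "model-a"


def resolve_model_name(app_config: AppConfig, requested: str | None = None) -> str:
    """Use the requested model, else fall back to the config passed in."""
    return requested or app_config.default_model


class Client:
    """Captures its config at construction instead of looking it up globally."""

    def __init__(self, config: AppConfig) -> None:
        self._app_config = config

    def model_for(self, requested: str | None = None) -> str:
        return resolve_model_name(self._app_config, requested)


# Two clients with different configs coexist; nothing is patched globally.
a = Client(AppConfig())
b = Client(AppConfig(default_model="model-b"))
```

Tests written against this shape build an AppConfig and hand it in, which is why the AppConfig.current() monkey-patches become unnecessary.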

Merging main surfaced conflicting architecture decisions, each resolved in the PR's favor:

- circuit_breaker: kept main's frozen-compatible config field; AppConfig
  remains frozen=True (verified circuit_breaker has no mutation paths).
- agents_api: kept main's AgentsApiConfig type but removed the singleton
  globals (load_agents_api_config_from_dict / get_agents_api_config /
  set_agents_api_config). 8 routes in agents.py now read via
  Depends(get_config).
- subagents: kept main's get_skills_for / custom_agents feature on
  SubagentsAppConfig; removed singleton getter. registry.py now reads
  app_config.subagents directly.
- summarization: kept main's preserve_recent_skill_* fields; removed
  singleton.
- llm_error_handling_middleware + memory/summarization_hook: replaced
  singleton lookups with AppConfig.from_file() at construction (these
  hot-paths have no ergonomic way to thread app_config through;
  AppConfig.from_file is a pure load).
- worker.py + thread_data_middleware.py: DeerFlowContext.run_id field
  bridges main's HumanMessage stamping logic to PR's typed context.

Trade-offs (follow-up work):

- main's #2138 (async memory updater) reverted to PR's sync
  implementation. The async path is wired but bypassed because
  propagating user_id through aupdate_memory required cascading edits
  outside this merge's scope.
- tests/test_subagent_skills_config.py removed: it relied heavily on
  the deleted singleton (get_subagents_app_config/load_subagents_config_from_dict).
  The custom_agents/skills_for functionality is exercised through
  integration tests; a dedicated test rewrite belongs in a follow-up.

Verification: backend test suite — 2560 passed, 4 skipped, 84 failures.
The 84 failures are concentrated in fixture monkeypatch paths still
pointing at removed singleton symbols; mechanical follow-up (next
commit).
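
One plausible way such failures surface, assuming the fixtures use unittest.mock: patching a name that no longer exists on its module raises AttributeError before the test body runs. The module and symbol names below are hypothetical.

```python
import types
from unittest.mock import patch

# Hypothetical module that once exposed a singleton getter named get_config.
# Only the pure loader remains after the refactor.
config_module = types.ModuleType("config_module")
config_module.load_config = lambda: {"name": "from-file"}


def patch_removed_symbol() -> str:
    """Try to patch a name that is no longer defined on the module."""
    try:
        with patch.object(config_module, "get_config", return_value={}):
            return "patched"
    except AttributeError as exc:
        return f"AttributeError: {exc}"


outcome = patch_removed_symbol()
```

The mechanical fix in that case is to drop the patch and pass a constructed config into the code under test.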
2026-04-26 21:45:02 +08:00


"""Tests for channel file attachment support (ResolvedAttachment, resolution, send_file)."""
from __future__ import annotations
import asyncio
from pathlib import Path
from unittest.mock import MagicMock, patch
from app.channels.base import Channel
from app.channels.message_bus import MessageBus, OutboundMessage, ResolvedAttachment
def _run(coro):
"""Run an async coroutine synchronously."""
loop = asyncio.new_event_loop()
try:
return loop.run_until_complete(coro)
finally:
loop.close()


# ---------------------------------------------------------------------------
# ResolvedAttachment tests
# ---------------------------------------------------------------------------


class TestResolvedAttachment:
    def test_basic_construction(self, tmp_path):
        f = tmp_path / "test.pdf"
        f.write_bytes(b"PDF content")
        att = ResolvedAttachment(
            virtual_path="/mnt/user-data/outputs/test.pdf",
            actual_path=f,
            filename="test.pdf",
            mime_type="application/pdf",
            size=11,
            is_image=False,
        )
        assert att.filename == "test.pdf"
        assert att.is_image is False
        assert att.size == 11

    def test_image_detection(self, tmp_path):
        f = tmp_path / "photo.png"
        f.write_bytes(b"\x89PNG")
        att = ResolvedAttachment(
            virtual_path="/mnt/user-data/outputs/photo.png",
            actual_path=f,
            filename="photo.png",
            mime_type="image/png",
            size=4,
            is_image=True,
        )
        assert att.is_image is True


# ---------------------------------------------------------------------------
# OutboundMessage.attachments field tests
# ---------------------------------------------------------------------------


class TestOutboundMessageAttachments:
    def test_default_empty_attachments(self):
        msg = OutboundMessage(
            channel_name="test",
            chat_id="c1",
            thread_id="t1",
            text="hello",
        )
        assert msg.attachments == []

    def test_attachments_populated(self, tmp_path):
        f = tmp_path / "file.txt"
        f.write_text("content")
        att = ResolvedAttachment(
            virtual_path="/mnt/user-data/outputs/file.txt",
            actual_path=f,
            filename="file.txt",
            mime_type="text/plain",
            size=7,
            is_image=False,
        )
        msg = OutboundMessage(
            channel_name="test",
            chat_id="c1",
            thread_id="t1",
            text="hello",
            attachments=[att],
        )
        assert len(msg.attachments) == 1
        assert msg.attachments[0].filename == "file.txt"


# ---------------------------------------------------------------------------
# _resolve_attachments tests
# ---------------------------------------------------------------------------


class TestResolveAttachments:
    def test_resolves_existing_file(self, tmp_path):
        """Successfully resolves a virtual path to an existing file."""
        from app.channels.manager import _resolve_attachments

        # Create the directory structure: threads/{thread_id}/user-data/outputs/
        thread_id = "test-thread-123"
        outputs_dir = tmp_path / "threads" / thread_id / "user-data" / "outputs"
        outputs_dir.mkdir(parents=True)
        test_file = outputs_dir / "report.pdf"
        test_file.write_bytes(b"%PDF-1.4 fake content")

        mock_paths = MagicMock()
        mock_paths.resolve_virtual_path.return_value = test_file
        mock_paths.sandbox_outputs_dir.return_value = outputs_dir

        with patch("deerflow.config.paths.get_paths", return_value=mock_paths):
            result = _resolve_attachments(thread_id, ["/mnt/user-data/outputs/report.pdf"])

        assert len(result) == 1
        assert result[0].filename == "report.pdf"
        assert result[0].mime_type == "application/pdf"
        assert result[0].is_image is False
        assert result[0].size == len(b"%PDF-1.4 fake content")

    def test_resolves_image_file(self, tmp_path):
        """Images are detected by MIME type."""
        from app.channels.manager import _resolve_attachments

        thread_id = "test-thread"
        outputs_dir = tmp_path / "threads" / thread_id / "user-data" / "outputs"
        outputs_dir.mkdir(parents=True)
        img = outputs_dir / "chart.png"
        img.write_bytes(b"\x89PNG fake image")

        mock_paths = MagicMock()
        mock_paths.resolve_virtual_path.return_value = img
        mock_paths.sandbox_outputs_dir.return_value = outputs_dir

        with patch("deerflow.config.paths.get_paths", return_value=mock_paths):
            result = _resolve_attachments(thread_id, ["/mnt/user-data/outputs/chart.png"])

        assert len(result) == 1
        assert result[0].is_image is True
        assert result[0].mime_type == "image/png"

    def test_skips_missing_file(self, tmp_path):
        """Missing files are skipped with a warning."""
        from app.channels.manager import _resolve_attachments

        outputs_dir = tmp_path / "outputs"
        outputs_dir.mkdir()

        mock_paths = MagicMock()
        mock_paths.resolve_virtual_path.return_value = outputs_dir / "nonexistent.txt"
        mock_paths.sandbox_outputs_dir.return_value = outputs_dir

        with patch("deerflow.config.paths.get_paths", return_value=mock_paths):
            result = _resolve_attachments("t1", ["/mnt/user-data/outputs/nonexistent.txt"])

        assert result == []

    def test_skips_invalid_path(self):
        """Invalid paths (ValueError from resolve) are skipped."""
        from app.channels.manager import _resolve_attachments

        mock_paths = MagicMock()
        mock_paths.resolve_virtual_path.side_effect = ValueError("bad path")

        with patch("deerflow.config.paths.get_paths", return_value=mock_paths):
            result = _resolve_attachments("t1", ["/invalid/path"])

        assert result == []

    def test_rejects_uploads_path(self):
        """Paths under /mnt/user-data/uploads/ are rejected (security)."""
        from app.channels.manager import _resolve_attachments

        mock_paths = MagicMock()

        with patch("deerflow.config.paths.get_paths", return_value=mock_paths):
            result = _resolve_attachments("t1", ["/mnt/user-data/uploads/secret.pdf"])

        assert result == []
        mock_paths.resolve_virtual_path.assert_not_called()

    def test_rejects_workspace_path(self):
        """Paths under /mnt/user-data/workspace/ are rejected (security)."""
        from app.channels.manager import _resolve_attachments

        mock_paths = MagicMock()

        with patch("deerflow.config.paths.get_paths", return_value=mock_paths):
            result = _resolve_attachments("t1", ["/mnt/user-data/workspace/config.py"])

        assert result == []
        mock_paths.resolve_virtual_path.assert_not_called()

    def test_rejects_path_traversal_escape(self, tmp_path):
        """Paths that escape the outputs directory after resolution are rejected."""
        from app.channels.manager import _resolve_attachments

        thread_id = "t1"
        outputs_dir = tmp_path / "threads" / thread_id / "user-data" / "outputs"
        outputs_dir.mkdir(parents=True)

        # Simulate a resolved path that escapes outside the outputs directory
        escaped_file = tmp_path / "threads" / thread_id / "user-data" / "uploads" / "stolen.txt"
        escaped_file.parent.mkdir(parents=True, exist_ok=True)
        escaped_file.write_text("sensitive")

        mock_paths = MagicMock()
        mock_paths.resolve_virtual_path.return_value = escaped_file
        mock_paths.sandbox_outputs_dir.return_value = outputs_dir

        with patch("deerflow.config.paths.get_paths", return_value=mock_paths):
            result = _resolve_attachments(thread_id, ["/mnt/user-data/outputs/../uploads/stolen.txt"])

        assert result == []

    def test_multiple_artifacts_partial_resolution(self, tmp_path):
        """Mixed valid/invalid artifacts: only valid ones are returned."""
        from app.channels.manager import _resolve_attachments

        thread_id = "t1"
        outputs_dir = tmp_path / "outputs"
        outputs_dir.mkdir()
        good_file = outputs_dir / "data.csv"
        good_file.write_text("a,b,c")

        mock_paths = MagicMock()
        mock_paths.sandbox_outputs_dir.return_value = outputs_dir

        def resolve_side_effect(tid, vpath, *, user_id=None):
            if "data.csv" in vpath:
                return good_file
            return tmp_path / "missing.txt"

        mock_paths.resolve_virtual_path.side_effect = resolve_side_effect

        with patch("deerflow.config.paths.get_paths", return_value=mock_paths):
            result = _resolve_attachments(
                thread_id,
                ["/mnt/user-data/outputs/data.csv", "/mnt/user-data/outputs/missing.txt"],
            )

        assert len(result) == 1
        assert result[0].filename == "data.csv"


# ---------------------------------------------------------------------------
# Channel base class _on_outbound with attachments
# ---------------------------------------------------------------------------


class _DummyChannel(Channel):
    """Concrete channel for testing the base class behavior."""

    def __init__(self, bus):
        super().__init__(name="dummy", bus=bus, config={})
        self.sent_messages: list[OutboundMessage] = []
        self.sent_files: list[tuple[OutboundMessage, ResolvedAttachment]] = []

    async def start(self):
        pass

    async def stop(self):
        pass

    async def send(self, msg: OutboundMessage) -> None:
        self.sent_messages.append(msg)

    async def send_file(self, msg: OutboundMessage, attachment: ResolvedAttachment) -> bool:
        self.sent_files.append((msg, attachment))
        return True


class TestBaseChannelOnOutbound:
    def test_default_receive_file_returns_original_message(self):
        """The base Channel.receive_file returns the original message unchanged."""

        class MinimalChannel(Channel):
            async def start(self):
                pass

            async def stop(self):
                pass

            async def send(self, msg):
                pass

        from app.channels.message_bus import InboundMessage

        bus = MessageBus()
        ch = MinimalChannel(name="minimal", bus=bus, config={})
        msg = InboundMessage(channel_name="minimal", chat_id="c1", user_id="u1", text="hello", files=[{"file_key": "k1"}])

        result = _run(ch.receive_file(msg, "thread-1"))

        assert result is msg
        assert result.text == "hello"
        assert result.files == [{"file_key": "k1"}]

    def test_send_file_called_for_each_attachment(self, tmp_path):
        """_on_outbound sends text first, then uploads each attachment."""
        bus = MessageBus()
        ch = _DummyChannel(bus)

        f1 = tmp_path / "a.txt"
        f1.write_text("aaa")
        f2 = tmp_path / "b.png"
        f2.write_bytes(b"\x89PNG")

        att1 = ResolvedAttachment("/mnt/user-data/outputs/a.txt", f1, "a.txt", "text/plain", 3, False)
        att2 = ResolvedAttachment("/mnt/user-data/outputs/b.png", f2, "b.png", "image/png", 4, True)

        msg = OutboundMessage(
            channel_name="dummy",
            chat_id="c1",
            thread_id="t1",
            text="Here are your files",
            attachments=[att1, att2],
        )

        _run(ch._on_outbound(msg))

        assert len(ch.sent_messages) == 1
        assert len(ch.sent_files) == 2
        assert ch.sent_files[0][1].filename == "a.txt"
        assert ch.sent_files[1][1].filename == "b.png"

    def test_no_attachments_no_send_file(self):
        """When there are no attachments, send_file is not called."""
        bus = MessageBus()
        ch = _DummyChannel(bus)

        msg = OutboundMessage(
            channel_name="dummy",
            chat_id="c1",
            thread_id="t1",
            text="No files here",
        )

        _run(ch._on_outbound(msg))

        assert len(ch.sent_messages) == 1
        assert len(ch.sent_files) == 0

    def test_send_file_failure_does_not_block_others(self, tmp_path):
        """If one attachment upload fails, remaining attachments still get sent."""
        bus = MessageBus()
        ch = _DummyChannel(bus)

        # Override send_file to fail on first call, succeed on second
        call_count = 0
        original_send_file = ch.send_file

        async def flaky_send_file(msg, att):
            nonlocal call_count
            call_count += 1
            if call_count == 1:
                raise RuntimeError("upload failed")
            return await original_send_file(msg, att)

        ch.send_file = flaky_send_file  # type: ignore

        f1 = tmp_path / "fail.txt"
        f1.write_text("x")
        f2 = tmp_path / "ok.txt"
        f2.write_text("y")

        att1 = ResolvedAttachment("/mnt/user-data/outputs/fail.txt", f1, "fail.txt", "text/plain", 1, False)
        att2 = ResolvedAttachment("/mnt/user-data/outputs/ok.txt", f2, "ok.txt", "text/plain", 1, False)

        msg = OutboundMessage(
            channel_name="dummy",
            chat_id="c1",
            thread_id="t1",
            text="files",
            attachments=[att1, att2],
        )

        _run(ch._on_outbound(msg))

        # First upload failed, second succeeded
        assert len(ch.sent_files) == 1
        assert ch.sent_files[0][1].filename == "ok.txt"

    def test_send_raises_skips_file_uploads(self, tmp_path):
        """When send() raises, file uploads are skipped entirely."""
        bus = MessageBus()
        ch = _DummyChannel(bus)

        async def failing_send(msg):
            raise RuntimeError("network error")

        ch.send = failing_send  # type: ignore

        f = tmp_path / "a.pdf"
        f.write_bytes(b"%PDF")
        att = ResolvedAttachment("/mnt/user-data/outputs/a.pdf", f, "a.pdf", "application/pdf", 4, False)

        msg = OutboundMessage(
            channel_name="dummy",
            chat_id="c1",
            thread_id="t1",
            text="Here is the file",
            attachments=[att],
        )

        _run(ch._on_outbound(msg))

        # send() raised, so send_file should never be called
        assert len(ch.sent_files) == 0

    def test_default_send_file_returns_false(self):
        """The base Channel.send_file returns False by default."""

        class MinimalChannel(Channel):
            async def start(self):
                pass

            async def stop(self):
                pass

            async def send(self, msg):
                pass

        bus = MessageBus()
        ch = MinimalChannel(name="minimal", bus=bus, config={})
        att = ResolvedAttachment("/x", Path("/x"), "x", "text/plain", 0, False)
        msg = OutboundMessage(channel_name="minimal", chat_id="c", thread_id="t", text="t")

        result = _run(ch.send_file(msg, att))

        assert result is False


# ---------------------------------------------------------------------------
# ChannelManager artifact resolution integration
# ---------------------------------------------------------------------------


class TestManagerArtifactResolution:
    def test_handle_chat_populates_attachments(self):
        """Verify _resolve_attachments is importable and works with the manager module."""
        from app.channels.manager import _resolve_attachments

        # Basic smoke test: empty artifacts returns empty list
        mock_paths = MagicMock()

        with patch("deerflow.config.paths.get_paths", return_value=mock_paths):
            result = _resolve_attachments("t1", [])

        assert result == []

    def test_format_artifact_text_for_unresolved(self):
        """_format_artifact_text produces expected output."""
        from app.channels.manager import _format_artifact_text

        assert "report.pdf" in _format_artifact_text(["/mnt/user-data/outputs/report.pdf"])

        result = _format_artifact_text(["/mnt/user-data/outputs/a.txt", "/mnt/user-data/outputs/b.txt"])
        assert "a.txt" in result
        assert "b.txt" in result