mirror of
https://github.com/bytedance/deer-flow.git
synced 2026-04-28 20:58:16 +00:00
* fix(harness): constrain view_image to thread data paths
  Fixes #2530
* fix(harness): address view_image review findings
* style(harness): format view_image changes
* fix(harness): address view_image review comments
165 lines
5.6 KiB
Python
165 lines
5.6 KiB
Python
import base64
|
|
import importlib
|
|
import os
|
|
from pathlib import Path
|
|
from types import SimpleNamespace
|
|
|
|
import pytest
|
|
|
|
from deerflow.tools.builtins.view_image_tool import view_image_tool
|
|
|
|
# Module object of the tool, looked up by dotted name. The tests below use it
# as the monkeypatch target for the module-level ``_MAX_IMAGE_BYTES`` setting.
# NOTE(review): presumably imported via importlib because the name
# ``view_image_tool`` imported above refers to the tool object rather than
# the module -- confirm.
view_image_module = importlib.import_module("deerflow.tools.builtins.view_image_tool")
|
|
|
|
# Raw bytes of a tiny (1x1) PNG, decoded from base64. Used as the valid image
# fixture written to disk by the tests below.
PNG_BYTES = base64.b64decode("iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAYAAAAfFcSJAAAADUlEQVR42mNk+M9QDwADhgGAWjR9awAAAABJRU5ErkJggg==")
|
|
|
|
|
|
def _make_thread_data(tmp_path: Path, thread_id: str = "thread-1") -> dict[str, str]:
    """Create the per-thread user-data directory tree and return its paths.

    Builds ``<tmp_path>/threads/<thread_id>/user-data/`` with the
    ``workspace``, ``uploads`` and ``outputs`` sub-directories on disk.

    Args:
        tmp_path: Root directory under which the tree is created.
        thread_id: Name of the thread directory. Defaults to ``"thread-1"``.

    Returns:
        A mapping with the keys ``workspace_path``, ``uploads_path`` and
        ``outputs_path`` (in that order), each holding the created directory
        as a string.
    """
    user_data = tmp_path / "threads" / thread_id / "user-data"
    thread_data: dict[str, str] = {}
    for name in ("workspace", "uploads", "outputs"):
        directory = user_data / name
        # exist_ok keeps the helper idempotent: calling it twice for the same
        # root must not fail with FileExistsError.
        directory.mkdir(parents=True, exist_ok=True)
        thread_data[f"{name}_path"] = str(directory)
    return thread_data
|
|
|
|
|
|
def _make_runtime(thread_data: dict[str, str], *, thread_id: str = "thread-1") -> SimpleNamespace:
    """Build a minimal stand-in for the runtime object handed to the tool.

    Args:
        thread_data: Path mapping stored (by reference, not copied) under
            ``state["thread_data"]``.
        thread_id: Value stored under ``context["thread_id"]``. Defaults to
            ``"thread-1"``.

    Returns:
        A ``SimpleNamespace`` exposing ``state``, ``context`` and an empty
        ``config`` dict as attributes.
    """
    return SimpleNamespace(
        state={"thread_data": thread_data},
        context={"thread_id": thread_id},
        config={},
    )
|
|
|
|
|
|
def _message_content(result) -> str:
    """Return the ``content`` of the first message in ``result.update["messages"]``."""
    messages = result.update["messages"]
    first_message = messages[0]
    return first_message.content
|
|
|
|
|
|
def test_view_image_rejects_external_absolute_path(tmp_path: Path) -> None:
    """An absolute path outside the thread's data directories is refused.

    The image is written directly under ``tmp_path`` (not inside the
    workspace/uploads/outputs tree) and passed to the tool by its real path.
    The tool must answer with the "Only image paths under /mnt/user-data"
    message and must not add ``viewed_images`` to the update.
    """
    external_file = tmp_path / "outside.png"
    external_file.write_bytes(PNG_BYTES)
    runtime = _make_runtime(_make_thread_data(tmp_path))

    outcome = view_image_tool.func(
        runtime=runtime,
        image_path=str(external_file),
        tool_call_id="tc-external",
    )

    message = _message_content(outcome)
    assert "Only image paths under /mnt/user-data" in message
    assert "viewed_images" not in outcome.update
|
|
|
|
|
|
def test_view_image_reads_virtual_uploads_path(tmp_path: Path) -> None:
    """A virtual ``/mnt/user-data/uploads`` path is read from the uploads dir.

    The PNG is written into the thread's real uploads directory and requested
    through its virtual path. The tool must report success and expose the
    image, keyed by the virtual path, with its base64 payload and MIME type.
    """
    virtual_path = "/mnt/user-data/uploads/sample.png"
    thread_data = _make_thread_data(tmp_path)
    Path(thread_data["uploads_path"], "sample.png").write_bytes(PNG_BYTES)

    outcome = view_image_tool.func(
        runtime=_make_runtime(thread_data),
        image_path=virtual_path,
        tool_call_id="tc-uploads",
    )

    assert _message_content(outcome) == "Successfully read image"

    entry = outcome.update["viewed_images"][virtual_path]
    expected_payload = base64.b64encode(PNG_BYTES).decode("utf-8")
    assert entry["base64"] == expected_payload
    assert entry["mime_type"] == "image/png"
|
|
|
|
|
|
def test_view_image_rejects_spoofed_extension(tmp_path: Path) -> None:
    """A ``.png`` file whose bytes are not image data is refused.

    The uploads directory receives a file named ``not-really.png`` containing
    plain text bytes. The tool must answer with a "contents do not match"
    message and must not add ``viewed_images`` to the update.
    """
    thread_data = _make_thread_data(tmp_path)
    fake_image = Path(thread_data["uploads_path"], "not-really.png")
    fake_image.write_bytes(b"not an image")
    runtime = _make_runtime(thread_data)

    outcome = view_image_tool.func(
        runtime=runtime,
        image_path="/mnt/user-data/uploads/not-really.png",
        tool_call_id="tc-spoofed",
    )

    message = _message_content(outcome)
    assert "contents do not match" in message
    assert "viewed_images" not in outcome.update
|
|
|
|
|
|
def test_view_image_rejects_mismatched_magic_bytes(tmp_path: Path) -> None:
    """A ``.png`` file that starts with JPEG magic bytes is refused.

    The file is named ``jpeg-named-png.png`` but begins with ``FF D8 FF E0``.
    The tool's message must mention that the "file extension indicates
    image/png", and ``viewed_images`` must be absent from the update.
    """
    jpeg_like_payload = b"\xff\xd8\xff\xe0fake-jpeg"
    thread_data = _make_thread_data(tmp_path)
    Path(thread_data["uploads_path"], "jpeg-named-png.png").write_bytes(jpeg_like_payload)

    outcome = view_image_tool.func(
        runtime=_make_runtime(thread_data),
        image_path="/mnt/user-data/uploads/jpeg-named-png.png",
        tool_call_id="tc-mismatch",
    )

    message = _message_content(outcome)
    assert "file extension indicates image/png" in message
    assert "viewed_images" not in outcome.update
|
|
|
|
|
|
def test_view_image_rejects_oversized_image(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
    """An image larger than the module's size limit is refused.

    ``_MAX_IMAGE_BYTES`` on the tool module is patched to one byte less than
    the fixture PNG, so the otherwise valid image exceeds the limit. The tool
    must answer with "Image file is too large" and leave ``viewed_images``
    out of the update.
    """
    thread_data = _make_thread_data(tmp_path)
    Path(thread_data["uploads_path"], "sample.png").write_bytes(PNG_BYTES)

    # Shrink the limit rather than writing a huge file.
    limit_just_below_fixture = len(PNG_BYTES) - 1
    monkeypatch.setattr(view_image_module, "_MAX_IMAGE_BYTES", limit_just_below_fixture)

    outcome = view_image_tool.func(
        runtime=_make_runtime(thread_data),
        image_path="/mnt/user-data/uploads/sample.png",
        tool_call_id="tc-oversized",
    )

    message = _message_content(outcome)
    assert "Image file is too large" in message
    assert "viewed_images" not in outcome.update
|
|
|
|
|
|
def test_view_image_sanitizes_read_errors(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
    """Read failures are reported without leaking the real host path.

    ``builtins.open`` is patched so that opening the fixture image raises a
    ``PermissionError`` whose text contains the real on-disk path. The tool's
    message must contain "Error reading image file" and the virtual path, but
    neither the real file path nor the real uploads directory, and
    ``viewed_images`` must be absent from the update.
    """
    thread_data = _make_thread_data(tmp_path)
    image_path = Path(thread_data["uploads_path"]) / "sample.png"
    image_path.write_bytes(PNG_BYTES)

    real_open = open
    target = image_path.resolve()

    def _open(file, *args, **kwargs):
        # Fail only for the fixture image; every other open() made while the
        # tool runs is forwarded to the real implementation so unrelated code
        # is not broken by the patch.
        try:
            is_target = Path(file).resolve() == target
        except (TypeError, ValueError, OSError):
            # Not a path-like argument (e.g. a file descriptor) or not
            # resolvable: cannot be the fixture image.
            is_target = False
        if not is_target:
            return real_open(file, *args, **kwargs)
        raise PermissionError(f"permission denied: {image_path}")

    monkeypatch.setattr("builtins.open", _open)

    result = view_image_tool.func(
        runtime=_make_runtime(thread_data),
        image_path="/mnt/user-data/uploads/sample.png",
        tool_call_id="tc-read-error",
    )

    message = _message_content(result)
    assert "Error reading image file" in message
    # The host-side locations must not leak into the message...
    assert str(image_path) not in message
    assert str(Path(thread_data["uploads_path"])) not in message
    # ...while the virtual path the caller supplied is still reported.
    assert "/mnt/user-data/uploads/sample.png" in message
    assert "viewed_images" not in result.update
|
|
|
|
|
|
@pytest.mark.skipif(os.name == "nt", reason="symlink semantics differ on Windows")
def test_view_image_rejects_uploads_symlink_escape(tmp_path: Path) -> None:
    """A symlink inside uploads that points outside the thread tree is refused.

    ``escape.png`` in the uploads directory links to a PNG stored directly
    under ``tmp_path``. The tool must answer with a "path traversal" message
    and must not add ``viewed_images`` to the update. The test is skipped on
    Windows and whenever the symlink cannot be created.
    """
    thread_data = _make_thread_data(tmp_path)

    escape_target = tmp_path / "outside-target.png"
    escape_target.write_bytes(PNG_BYTES)

    symlink = Path(thread_data["uploads_path"], "escape.png")
    try:
        symlink.symlink_to(escape_target)
    except OSError as exc:
        # Some environments forbid symlink creation; nothing to test then.
        pytest.skip(f"symlink creation failed: {exc}")

    outcome = view_image_tool.func(
        runtime=_make_runtime(thread_data),
        image_path="/mnt/user-data/uploads/escape.png",
        tool_call_id="tc-symlink",
    )

    message = _message_content(outcome)
    assert "path traversal" in message
    assert "viewed_images" not in outcome.update
|