From 88e535269ec1b4ec06ee3ad7f6143ddea27305ab Mon Sep 17 00:00:00 2001 From: JilongSun <965640067@qq.com> Date: Mon, 6 Apr 2026 22:14:12 +0800 Subject: [PATCH] Feature/feishu receive file (#1608) * feat(feishu): add channel file materialization hook for inbound messages - Introduce Channel.receive_file(msg, thread_id) as a base method for file materialization; default is no-op. - Implement FeishuChannel.receive_file to download files/images from Feishu messages, save to sandbox, and inject virtual paths into msg.text. - Update ChannelManager to call receive_file for any channel if msg.files is present, enabling downstream model access to user-uploaded files. - No impact on Slack/Telegram or other channels (they inherit the default no-op). * style(backend): format code with ruff for lint compliance - Auto-formatted packages/harness/deerflow/agents/factory.py and tests/test_create_deerflow_agent.py using `ruff format` - Ensured both files conform to project linting standards - Fixes CI lint check failures caused by code style issues * fix(feishu): handle file write operation asynchronously to prevent blocking * fix(feishu): rename GetMessageResourceRequest to _GetMessageResourceRequest and remove redundant code * test(feishu): add tests for receive_file method and placeholder replacement * fix(manager): remove unnecessary type casting for channel retrieval * fix(feishu): update logging messages to reflect resource handling instead of image * fix(feishu): sanitize filename by replacing invalid characters in file uploads * fix(feishu): improve filename sanitization and reorder image key handling in message processing * fix(feishu): add thread lock to prevent filename conflicts during file downloads * fix(test): correct bad merge in test_feishu_parser.py * chore: run ruff and apply formatting cleanup fix(feishu): preserve rich-text attachment order and improve fallback filename handling --- backend/app/channels/base.py | 18 +++ backend/app/channels/feishu.py | 149 +++++++++++++++++- backend/app/channels/manager.py | 12 ++ backend/app/channels/service.py | 5 + .../tests/test_channel_file_attachments.py | 25 +++ backend/tests/test_channels.py | 56 +++++++ backend/tests/test_feishu_parser.py | 71 ++++++++- 7 files changed, 331 insertions(+), 5 deletions(-) diff --git a/backend/app/channels/base.py b/backend/app/channels/base.py index d92365379..95aecf267 100644 --- a/backend/app/channels/base.py +++ b/backend/app/channels/base.py @@ -106,3 +106,21 @@ class Channel(ABC): logger.warning("[%s] file upload skipped for %s", self.name, attachment.filename) except Exception: logger.exception("[%s] failed to upload file %s", self.name, attachment.filename) + + async def receive_file(self, msg: InboundMessage, thread_id: str) -> InboundMessage: + """ + Optionally process and materialize inbound file attachments for this channel. + + By default, this method does nothing and simply returns the original message. + Subclasses (e.g. FeishuChannel) may override this to download files (images, documents, etc) + referenced in msg.files, save them to the sandbox, and update msg.text to include + the sandbox file paths for downstream model consumption. + + Args: + msg: The inbound message, possibly containing file metadata in msg.files. + thread_id: The resolved DeerFlow thread ID for sandbox path context. + + Returns: + The (possibly modified) InboundMessage, with text and/or files updated as needed. + """ + return msg diff --git a/backend/app/channels/feishu.py b/backend/app/channels/feishu.py index 6492d28e5..c2a637ff9 100644 --- a/backend/app/channels/feishu.py +++ b/backend/app/channels/feishu.py @@ -5,12 +5,15 @@ from __future__ import annotations import asyncio import json import logging +import re import threading -from typing import Any +from typing import Any, Literal from app.channels.base import Channel from app.channels.commands import KNOWN_CHANNEL_COMMANDS -from app.channels.message_bus import InboundMessageType, MessageBus, OutboundMessage, ResolvedAttachment +from app.channels.message_bus import InboundMessage, InboundMessageType, MessageBus, OutboundMessage, ResolvedAttachment +from deerflow.config.paths import VIRTUAL_PATH_PREFIX, get_paths +from deerflow.sandbox.sandbox_provider import get_sandbox_provider logger = logging.getLogger(__name__) @@ -56,6 +59,8 @@ class FeishuChannel(Channel): self._CreateFileRequestBody = None self._CreateImageRequest = None self._CreateImageRequestBody = None + self._GetMessageResourceRequest = None + self._thread_lock = threading.Lock() async def start(self) -> None: if self._running: @@ -73,6 +78,7 @@ class FeishuChannel(Channel): CreateMessageRequest, CreateMessageRequestBody, Emoji, + GetMessageResourceRequest, PatchMessageRequest, PatchMessageRequestBody, ReplyMessageRequest, @@ -96,6 +102,7 @@ class FeishuChannel(Channel): self._CreateFileRequestBody = CreateFileRequestBody self._CreateImageRequest = CreateImageRequest self._CreateImageRequestBody = CreateImageRequestBody + self._GetMessageResourceRequest = GetMessageResourceRequest app_id = self.config.get("app_id", "") app_secret = self.config.get("app_secret", "") @@ -275,6 +282,112 @@ class FeishuChannel(Channel): raise RuntimeError(f"Feishu file upload failed: code={response.code}, msg={response.msg}") return response.data.file_key + async def receive_file(self, msg: InboundMessage, thread_id: str) -> InboundMessage: + """Download a Feishu file into the thread uploads directory. + + Returns the sandbox virtual path when the image is persisted successfully. + """ + if not msg.thread_ts: + logger.warning("[Feishu] received file message without thread_ts, cannot associate with conversation: %s", msg) + return msg + files = msg.files + if not files: + logger.warning("[Feishu] received message with no files: %s", msg) + return msg + text = msg.text + for file in files: + if file.get("image_key"): + virtual_path = await self._receive_single_file(msg.thread_ts, file["image_key"], "image", thread_id) + text = text.replace("[image]", virtual_path, 1) + elif file.get("file_key"): + virtual_path = await self._receive_single_file(msg.thread_ts, file["file_key"], "file", thread_id) + text = text.replace("[file]", virtual_path, 1) + msg.text = text + return msg + + async def _receive_single_file(self, message_id: str, file_key: str, type: Literal["image", "file"], thread_id: str) -> str: + request = self._GetMessageResourceRequest.builder().message_id(message_id).file_key(file_key).type(type).build() + + def inner(): + return self._api_client.im.v1.message_resource.get(request) + + try: + response = await asyncio.to_thread(inner) + except Exception: + logger.exception("[Feishu] resource get request failed for resource_key=%s type=%s", file_key, type) + return f"Failed to obtain the [{type}]" + + if not response.success(): + logger.warning( + "[Feishu] resource get failed: resource_key=%s, type=%s, code=%s, msg=%s, log_id=%s ", + file_key, + type, + response.code, + response.msg, + response.get_log_id(), + ) + return f"Failed to obtain the [{type}]" + + image_stream = getattr(response, "file", None) + if image_stream is None: + logger.warning("[Feishu] resource get returned no file stream: resource_key=%s, type=%s", file_key, type) + return f"Failed to obtain the [{type}]" + + try: + content: bytes = await asyncio.to_thread(image_stream.read) + except Exception: + logger.exception("[Feishu] failed to read resource stream: resource_key=%s, type=%s", file_key, type) + return f"Failed to obtain the [{type}]" + + if not content: + logger.warning("[Feishu] empty resource content: resource_key=%s, type=%s", file_key, type) + return f"Failed to obtain the [{type}]" + + paths = get_paths() + paths.ensure_thread_dirs(thread_id) + uploads_dir = paths.sandbox_uploads_dir(thread_id).resolve() + + ext = "png" if type == "image" else "bin" + raw_filename = getattr(response, "file_name", "") or f"feishu_{file_key[-12:]}.{ext}" + + # Sanitize filename: preserve extension, replace path chars in name part + if "." in raw_filename: + name_part, ext = raw_filename.rsplit(".", 1) + name_part = re.sub(r"[./\\]", "_", name_part) + filename = f"{name_part}.{ext}" + else: + filename = re.sub(r"[./\\]", "_", raw_filename) + resolved_target = uploads_dir / filename + + def down_load(): + # use thread_lock to avoid filename conflicts when writing + with self._thread_lock: + resolved_target.write_bytes(content) + + try: + await asyncio.to_thread(down_load) + except Exception: + logger.exception("[Feishu] failed to persist downloaded resource: %s, type=%s", resolved_target, type) + return f"Failed to obtain the [{type}]" + + virtual_path = f"{VIRTUAL_PATH_PREFIX}/uploads/{resolved_target.name}" + + try: + sandbox_provider = get_sandbox_provider() + sandbox_id = sandbox_provider.acquire(thread_id) + if sandbox_id != "local": + sandbox = sandbox_provider.get(sandbox_id) + if sandbox is None: + logger.warning("[Feishu] sandbox not found for thread_id=%s", thread_id) + return f"Failed to obtain the [{type}]" + sandbox.update_file(virtual_path, content) + except Exception: + logger.exception("[Feishu] failed to sync resource into non-local sandbox: %s", virtual_path) + return f"Failed to obtain the [{type}]" + + logger.info("[Feishu] downloaded resource mapped: file_key=%s -> %s", file_key, virtual_path) + return virtual_path + # -- message formatting ------------------------------------------------ @staticmethod @@ -479,9 +592,28 @@ class FeishuChannel(Channel): # Parse message content content = json.loads(message.content) + # files_list store the any-file-key in feishu messages, which can be used to download the file content later + # In Feishu channel, image_keys are independent of file_keys. + # The file_key includes files, videos, and audio, but does not include stickers. + files_list = [] + if "text" in content: # Handle plain text messages text = content["text"] + elif "file_key" in content: + file_key = content.get("file_key") + if isinstance(file_key, str) and file_key: + files_list.append({"file_key": file_key}) + text = "[file]" + else: + text = "" + elif "image_key" in content: + image_key = content.get("image_key") + if isinstance(image_key, str) and image_key: + files_list.append({"image_key": image_key}) + text = "[image]" + else: + text = "" elif "content" in content and isinstance(content["content"], list): # Handle rich-text messages with a top-level "content" list (e.g., topic groups/posts) text_paragraphs: list[str] = [] @@ -495,6 +627,16 @@ class FeishuChannel(Channel): text_value = element.get("text", "") if text_value: paragraph_text_parts.append(text_value) + elif element.get("tag") == "img": + image_key = element.get("image_key") + if isinstance(image_key, str) and image_key: + files_list.append({"image_key": image_key}) + paragraph_text_parts.append("[image]") + elif element.get("tag") in ("file", "media"): + file_key = element.get("file_key") + if isinstance(file_key, str) and file_key: + files_list.append({"file_key": file_key}) + paragraph_text_parts.append("[file]") if paragraph_text_parts: # Join text segments within a paragraph with spaces to avoid "helloworld" text_paragraphs.append(" ".join(paragraph_text_parts)) @@ -514,7 +656,7 @@ class FeishuChannel(Channel): text[:100] if text else "", ) - if not text: + if not (text or files_list): logger.info("[Feishu] empty text, ignoring message") return @@ -534,6 +676,7 @@ class FeishuChannel(Channel): text=text, msg_type=msg_type, thread_ts=msg_id, + files=files_list, metadata={"message_id": msg_id, "root_id": root_id}, ) inbound.topic_id = topic_id diff --git a/backend/app/channels/manager.py b/backend/app/channels/manager.py index 96e9c755e..0d2a3a4ba 100644 --- a/backend/app/channels/manager.py +++ b/backend/app/channels/manager.py @@ -675,6 +675,18 @@ class ChannelManager: thread_id = await self._create_thread(client, msg) assistant_id, run_config, run_context = self._resolve_run_params(msg, thread_id) + + # If the inbound message contains file attachments, let the channel + # materialize (download) them and update msg.text to include sandbox file paths. + # This enables downstream models to access user-uploaded files by path. + # Channels that do not support file download will simply return the original message. + if msg.files: + from .service import get_channel_service + + service = get_channel_service() + channel = service.get_channel(msg.channel_name) if service else None + logger.info("[Manager] preparing receive file context for %d attachments", len(msg.files)) + msg = await channel.receive_file(msg, thread_id) if channel else msg if extra_context: run_context.update(extra_context) diff --git a/backend/app/channels/service.py b/backend/app/channels/service.py index 672b67295..1906aef0b 100644 --- a/backend/app/channels/service.py +++ b/backend/app/channels/service.py @@ -6,6 +6,7 @@ import logging import os from typing import Any +from app.channels.base import Channel from app.channels.manager import DEFAULT_GATEWAY_URL, DEFAULT_LANGGRAPH_URL, ChannelManager from app.channels.message_bus import MessageBus from app.channels.store import ChannelStore @@ -164,6 +165,10 @@ class ChannelService: "channels": channels_status, } + def get_channel(self, name: str) -> Channel | None: + """Return a running channel instance by name when available.""" + return self._channels.get(name) + # -- singleton access ------------------------------------------------------- diff --git a/backend/tests/test_channel_file_attachments.py b/backend/tests/test_channel_file_attachments.py index 60bc58e35..2843a9cd0 100644 --- a/backend/tests/test_channel_file_attachments.py +++ b/backend/tests/test_channel_file_attachments.py @@ -276,6 +276,31 @@ class _DummyChannel(Channel): class TestBaseChannelOnOutbound: + def test_default_receive_file_returns_original_message(self): + """The base Channel.receive_file returns the original message unchanged.""" + + class MinimalChannel(Channel): + async def start(self): + pass + + async def stop(self): + pass + + async def send(self, msg): + pass + + from app.channels.message_bus import InboundMessage + + bus = MessageBus() + ch = MinimalChannel(name="minimal", bus=bus, config={}) + msg = InboundMessage(channel_name="minimal", chat_id="c1", user_id="u1", text="hello", files=[{"file_key": "k1"}]) + + result = _run(ch.receive_file(msg, "thread-1")) + + assert result is msg + assert result.text == "hello" + assert result.files == [{"file_key": "k1"}] + def test_send_file_called_for_each_attachment(self, tmp_path): """_on_outbound sends text first, then uploads each attachment.""" bus = MessageBus() diff --git a/backend/tests/test_channels.py b/backend/tests/test_channels.py index aaa5997b9..7fc412653 100644 --- a/backend/tests/test_channels.py +++ b/backend/tests/test_channels.py @@ -414,6 +414,62 @@ def _make_async_iterator(items): class TestChannelManager: + def test_handle_chat_calls_channel_receive_file_for_inbound_files(self, monkeypatch): + from app.channels.manager import ChannelManager + + async def go(): + bus = MessageBus() + store = ChannelStore(path=Path(tempfile.mkdtemp()) / "store.json") + manager = ChannelManager(bus=bus, store=store) + + outbound_received = [] + + async def capture_outbound(msg): + outbound_received.append(msg) + + bus.subscribe_outbound(capture_outbound) + + mock_client = _make_mock_langgraph_client() + manager._client = mock_client + + modified_msg = InboundMessage( + channel_name="test", + chat_id="chat1", + user_id="user1", + text="with /mnt/user-data/uploads/demo.png", + files=[{"image_key": "img_1"}], + ) + mock_channel = MagicMock() + mock_channel.receive_file = AsyncMock(return_value=modified_msg) + mock_service = MagicMock() + mock_service.get_channel.return_value = mock_channel + monkeypatch.setattr("app.channels.service.get_channel_service", lambda: mock_service) + + await manager.start() + + inbound = InboundMessage( + channel_name="test", + chat_id="chat1", + user_id="user1", + text="hi [image]", + files=[{"image_key": "img_1"}], + ) + await bus.publish_inbound(inbound) + await _wait_for(lambda: len(outbound_received) >= 1) + await manager.stop() + + mock_channel.receive_file.assert_awaited_once() + called_msg, called_thread_id = mock_channel.receive_file.await_args.args + assert called_msg.text == "hi [image]" + assert isinstance(called_thread_id, str) + assert called_thread_id + + mock_client.runs.wait.assert_called_once() + run_call_args = mock_client.runs.wait.call_args + assert run_call_args[1]["input"]["messages"][0]["content"] == "with /mnt/user-data/uploads/demo.png" + + _run(go()) + def test_handle_chat_creates_thread(self): from app.channels.manager import ChannelManager diff --git a/backend/tests/test_feishu_parser.py b/backend/tests/test_feishu_parser.py index 7a1fd9fc7..202862fb1 100644 --- a/backend/tests/test_feishu_parser.py +++ b/backend/tests/test_feishu_parser.py @@ -1,11 +1,20 @@ +import asyncio import json -from unittest.mock import MagicMock +from unittest.mock import AsyncMock, MagicMock import pytest from app.channels.commands import KNOWN_CHANNEL_COMMANDS from app.channels.feishu import FeishuChannel -from app.channels.message_bus import MessageBus +from app.channels.message_bus import InboundMessage, MessageBus + + +def _run(coro): + loop = asyncio.new_event_loop() + try: + return loop.run_until_complete(coro) + finally: + loop.close() def test_feishu_on_message_plain_text(): @@ -71,6 +80,64 @@ def test_feishu_on_message_rich_text(): assert "\n\n" in parsed_text +def test_feishu_receive_file_replaces_placeholders_in_order(): + async def go(): + bus = MessageBus() + channel = FeishuChannel(bus, {"app_id": "test", "app_secret": "test"}) + + msg = InboundMessage( + channel_name="feishu", + chat_id="chat_1", + user_id="user_1", + text="before [image] middle [file] after", + thread_ts="msg_1", + files=[{"image_key": "img_key"}, {"file_key": "file_key"}], + ) + + channel._receive_single_file = AsyncMock(side_effect=["/mnt/user-data/uploads/a.png", "/mnt/user-data/uploads/b.pdf"]) + + result = await channel.receive_file(msg, "thread_1") + + assert result.text == "before /mnt/user-data/uploads/a.png middle /mnt/user-data/uploads/b.pdf after" + + _run(go()) + + +def test_feishu_on_message_extracts_image_and_file_keys(): + bus = MessageBus() + channel = FeishuChannel(bus, {"app_id": "test", "app_secret": "test"}) + + event = MagicMock() + event.event.message.chat_id = "chat_1" + event.event.message.message_id = "msg_1" + event.event.message.root_id = None + event.event.sender.sender_id.open_id = "user_1" + + # Rich text with one image and one file element. + event.event.message.content = json.dumps( + { + "content": [ + [ + {"tag": "text", "text": "See"}, + {"tag": "img", "image_key": "img_123"}, + {"tag": "file", "file_key": "file_456"}, + ] + ] + } + ) + + with pytest.MonkeyPatch.context() as m: + mock_make_inbound = MagicMock() + m.setattr(channel, "_make_inbound", mock_make_inbound) + channel._on_message(event) + + mock_make_inbound.assert_called_once() + files = mock_make_inbound.call_args[1]["files"] + assert files == [{"image_key": "img_123"}, {"file_key": "file_456"}] + assert "[image]" in mock_make_inbound.call_args[1]["text"] + assert "[file]" in mock_make_inbound.call_args[1]["text"] + + @pytest.mark.parametrize("command", sorted(KNOWN_CHANNEL_COMMANDS)) def test_feishu_recognizes_all_known_slash_commands(command): """Every entry in KNOWN_CHANNEL_COMMANDS must be classified as a command."""