Feature/feishu receive file (#1608)

* feat(feishu): add channel file materialization hook for inbound messages

- Introduce Channel.receive_file(msg, thread_id) as a base method for file materialization; default is no-op.
- Implement FeishuChannel.receive_file to download files/images from Feishu messages, save to sandbox, and inject virtual paths into msg.text.
- Update ChannelManager to call receive_file for any channel if msg.files is present, enabling downstream model access to user-uploaded files.
- No impact on Slack/Telegram or other channels (they inherit the default no-op).

* style(backend): format code with ruff for lint compliance

- Auto-formatted packages/harness/deerflow/agents/factory.py and tests/test_create_deerflow_agent.py using `ruff format`
- Ensured both files conform to project linting standards
- Fixes CI lint check failures caused by code style issues

* fix(feishu): handle file write operation asynchronously to prevent blocking

* fix(feishu): rename GetMessageResourceRequest to _GetMessageResourceRequest and remove redundant code

* test(feishu): add tests for receive_file method and placeholder replacement

* fix(manager): remove unnecessary type casting for channel retrieval

* fix(feishu): update logging messages to reflect resource handling instead of image

* fix(feishu): sanitize filename by replacing invalid characters in file uploads

* fix(feishu): improve filename sanitization and reorder image key handling in message processing

* fix(feishu): add thread lock to prevent filename conflicts during file downloads

* fix(test): correct bad merge in test_feishu_parser.py

* chore: run ruff and apply formatting cleanup
fix(feishu): preserve rich-text attachment order and improve fallback filename handling
This commit is contained in:
JilongSun 2026-04-06 22:14:12 +08:00 committed by GitHub
parent 888f7bfb9d
commit 88e535269e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 331 additions and 5 deletions

View File

@ -106,3 +106,21 @@ class Channel(ABC):
logger.warning("[%s] file upload skipped for %s", self.name, attachment.filename)
except Exception:
logger.exception("[%s] failed to upload file %s", self.name, attachment.filename)
async def receive_file(self, msg: InboundMessage, thread_id: str) -> InboundMessage:
"""
Optionally process and materialize inbound file attachments for this channel.
By default, this method does nothing and simply returns the original message.
Subclasses (e.g. FeishuChannel) may override this to download files (images, documents, etc)
referenced in msg.files, save them to the sandbox, and update msg.text to include
the sandbox file paths for downstream model consumption.
Args:
msg: The inbound message, possibly containing file metadata in msg.files.
thread_id: The resolved DeerFlow thread ID for sandbox path context.
Returns:
The (possibly modified) InboundMessage, with text and/or files updated as needed.
"""
return msg

View File

@ -5,12 +5,15 @@ from __future__ import annotations
import asyncio
import json
import logging
import re
import threading
from typing import Any
from typing import Any, Literal
from app.channels.base import Channel
from app.channels.commands import KNOWN_CHANNEL_COMMANDS
from app.channels.message_bus import InboundMessageType, MessageBus, OutboundMessage, ResolvedAttachment
from app.channels.message_bus import InboundMessage, InboundMessageType, MessageBus, OutboundMessage, ResolvedAttachment
from deerflow.config.paths import VIRTUAL_PATH_PREFIX, get_paths
from deerflow.sandbox.sandbox_provider import get_sandbox_provider
logger = logging.getLogger(__name__)
@ -56,6 +59,8 @@ class FeishuChannel(Channel):
self._CreateFileRequestBody = None
self._CreateImageRequest = None
self._CreateImageRequestBody = None
self._GetMessageResourceRequest = None
self._thread_lock = threading.Lock()
async def start(self) -> None:
if self._running:
@ -73,6 +78,7 @@ class FeishuChannel(Channel):
CreateMessageRequest,
CreateMessageRequestBody,
Emoji,
GetMessageResourceRequest,
PatchMessageRequest,
PatchMessageRequestBody,
ReplyMessageRequest,
@ -96,6 +102,7 @@ class FeishuChannel(Channel):
self._CreateFileRequestBody = CreateFileRequestBody
self._CreateImageRequest = CreateImageRequest
self._CreateImageRequestBody = CreateImageRequestBody
self._GetMessageResourceRequest = GetMessageResourceRequest
app_id = self.config.get("app_id", "")
app_secret = self.config.get("app_secret", "")
@ -275,6 +282,112 @@ class FeishuChannel(Channel):
raise RuntimeError(f"Feishu file upload failed: code={response.code}, msg={response.msg}")
return response.data.file_key
async def receive_file(self, msg: InboundMessage, thread_id: str) -> InboundMessage:
"""Download a Feishu file into the thread uploads directory.
Returns the sandbox virtual path when the image is persisted successfully.
"""
if not msg.thread_ts:
logger.warning("[Feishu] received file message without thread_ts, cannot associate with conversation: %s", msg)
return msg
files = msg.files
if not files:
logger.warning("[Feishu] received message with no files: %s", msg)
return msg
text = msg.text
for file in files:
if file.get("image_key"):
virtual_path = await self._receive_single_file(msg.thread_ts, file["image_key"], "image", thread_id)
text = text.replace("[image]", virtual_path, 1)
elif file.get("file_key"):
virtual_path = await self._receive_single_file(msg.thread_ts, file["file_key"], "file", thread_id)
text = text.replace("[file]", virtual_path, 1)
msg.text = text
return msg
async def _receive_single_file(self, message_id: str, file_key: str, type: Literal["image", "file"], thread_id: str) -> str:
request = self._GetMessageResourceRequest.builder().message_id(message_id).file_key(file_key).type(type).build()
def inner():
return self._api_client.im.v1.message_resource.get(request)
try:
response = await asyncio.to_thread(inner)
except Exception:
logger.exception("[Feishu] resource get request failed for resource_key=%s type=%s", file_key, type)
return f"Failed to obtain the [{type}]"
if not response.success():
logger.warning(
"[Feishu] resource get failed: resource_key=%s, type=%s, code=%s, msg=%s, log_id=%s ",
file_key,
type,
response.code,
response.msg,
response.get_log_id(),
)
return f"Failed to obtain the [{type}]"
image_stream = getattr(response, "file", None)
if image_stream is None:
logger.warning("[Feishu] resource get returned no file stream: resource_key=%s, type=%s", file_key, type)
return f"Failed to obtain the [{type}]"
try:
content: bytes = await asyncio.to_thread(image_stream.read)
except Exception:
logger.exception("[Feishu] failed to read resource stream: resource_key=%s, type=%s", file_key, type)
return f"Failed to obtain the [{type}]"
if not content:
logger.warning("[Feishu] empty resource content: resource_key=%s, type=%s", file_key, type)
return f"Failed to obtain the [{type}]"
paths = get_paths()
paths.ensure_thread_dirs(thread_id)
uploads_dir = paths.sandbox_uploads_dir(thread_id).resolve()
ext = "png" if type == "image" else "bin"
raw_filename = getattr(response, "file_name", "") or f"feishu_{file_key[-12:]}.{ext}"
# Sanitize filename: preserve extension, replace path chars in name part
if "." in raw_filename:
name_part, ext = raw_filename.rsplit(".", 1)
name_part = re.sub(r"[./\\]", "_", name_part)
filename = f"{name_part}.{ext}"
else:
filename = re.sub(r"[./\\]", "_", raw_filename)
resolved_target = uploads_dir / filename
def down_load():
# use thread_lock to avoid filename conflicts when writing
with self._thread_lock:
resolved_target.write_bytes(content)
try:
await asyncio.to_thread(down_load)
except Exception:
logger.exception("[Feishu] failed to persist downloaded resource: %s, type=%s", resolved_target, type)
return f"Failed to obtain the [{type}]"
virtual_path = f"{VIRTUAL_PATH_PREFIX}/uploads/{resolved_target.name}"
try:
sandbox_provider = get_sandbox_provider()
sandbox_id = sandbox_provider.acquire(thread_id)
if sandbox_id != "local":
sandbox = sandbox_provider.get(sandbox_id)
if sandbox is None:
logger.warning("[Feishu] sandbox not found for thread_id=%s", thread_id)
return f"Failed to obtain the [{type}]"
sandbox.update_file(virtual_path, content)
except Exception:
logger.exception("[Feishu] failed to sync resource into non-local sandbox: %s", virtual_path)
return f"Failed to obtain the [{type}]"
logger.info("[Feishu] downloaded resource mapped: file_key=%s -> %s", file_key, virtual_path)
return virtual_path
# -- message formatting ------------------------------------------------
@staticmethod
@ -479,9 +592,28 @@ class FeishuChannel(Channel):
# Parse message content
content = json.loads(message.content)
# files_list store the any-file-key in feishu messages, which can be used to download the file content later
# In Feishu channel, image_keys are independent of file_keys.
# The file_key includes files, videos, and audio, but does not include stickers.
files_list = []
if "text" in content:
# Handle plain text messages
text = content["text"]
elif "file_key" in content:
file_key = content.get("file_key")
if isinstance(file_key, str) and file_key:
files_list.append({"file_key": file_key})
text = "[file]"
else:
text = ""
elif "image_key" in content:
image_key = content.get("image_key")
if isinstance(image_key, str) and image_key:
files_list.append({"image_key": image_key})
text = "[image]"
else:
text = ""
elif "content" in content and isinstance(content["content"], list):
# Handle rich-text messages with a top-level "content" list (e.g., topic groups/posts)
text_paragraphs: list[str] = []
@ -495,6 +627,16 @@ class FeishuChannel(Channel):
text_value = element.get("text", "")
if text_value:
paragraph_text_parts.append(text_value)
elif element.get("tag") == "img":
image_key = element.get("image_key")
if isinstance(image_key, str) and image_key:
files_list.append({"image_key": image_key})
paragraph_text_parts.append("[image]")
elif element.get("tag") in ("file", "media"):
file_key = element.get("file_key")
if isinstance(file_key, str) and file_key:
files_list.append({"file_key": file_key})
paragraph_text_parts.append("[file]")
if paragraph_text_parts:
# Join text segments within a paragraph with spaces to avoid "helloworld"
text_paragraphs.append(" ".join(paragraph_text_parts))
@ -514,7 +656,7 @@ class FeishuChannel(Channel):
text[:100] if text else "",
)
if not text:
if not (text or files_list):
logger.info("[Feishu] empty text, ignoring message")
return
@ -534,6 +676,7 @@ class FeishuChannel(Channel):
text=text,
msg_type=msg_type,
thread_ts=msg_id,
files=files_list,
metadata={"message_id": msg_id, "root_id": root_id},
)
inbound.topic_id = topic_id

View File

@ -675,6 +675,18 @@ class ChannelManager:
thread_id = await self._create_thread(client, msg)
assistant_id, run_config, run_context = self._resolve_run_params(msg, thread_id)
# If the inbound message contains file attachments, let the channel
# materialize (download) them and update msg.text to include sandbox file paths.
# This enables downstream models to access user-uploaded files by path.
# Channels that do not support file download will simply return the original message.
if msg.files:
from .service import get_channel_service
service = get_channel_service()
channel = service.get_channel(msg.channel_name) if service else None
logger.info("[Manager] preparing receive file context for %d attachments", len(msg.files))
msg = await channel.receive_file(msg, thread_id) if channel else msg
if extra_context:
run_context.update(extra_context)

View File

@ -6,6 +6,7 @@ import logging
import os
from typing import Any
from app.channels.base import Channel
from app.channels.manager import DEFAULT_GATEWAY_URL, DEFAULT_LANGGRAPH_URL, ChannelManager
from app.channels.message_bus import MessageBus
from app.channels.store import ChannelStore
@ -164,6 +165,10 @@ class ChannelService:
"channels": channels_status,
}
def get_channel(self, name: str) -> Channel | None:
"""Return a running channel instance by name when available."""
return self._channels.get(name)
# -- singleton access -------------------------------------------------------

View File

@ -276,6 +276,31 @@ class _DummyChannel(Channel):
class TestBaseChannelOnOutbound:
def test_default_receive_file_returns_original_message(self):
"""The base Channel.receive_file returns the original message unchanged."""
class MinimalChannel(Channel):
async def start(self):
pass
async def stop(self):
pass
async def send(self, msg):
pass
from app.channels.message_bus import InboundMessage
bus = MessageBus()
ch = MinimalChannel(name="minimal", bus=bus, config={})
msg = InboundMessage(channel_name="minimal", chat_id="c1", user_id="u1", text="hello", files=[{"file_key": "k1"}])
result = _run(ch.receive_file(msg, "thread-1"))
assert result is msg
assert result.text == "hello"
assert result.files == [{"file_key": "k1"}]
def test_send_file_called_for_each_attachment(self, tmp_path):
"""_on_outbound sends text first, then uploads each attachment."""
bus = MessageBus()

View File

@ -414,6 +414,62 @@ def _make_async_iterator(items):
class TestChannelManager:
def test_handle_chat_calls_channel_receive_file_for_inbound_files(self, monkeypatch):
from app.channels.manager import ChannelManager
async def go():
bus = MessageBus()
store = ChannelStore(path=Path(tempfile.mkdtemp()) / "store.json")
manager = ChannelManager(bus=bus, store=store)
outbound_received = []
async def capture_outbound(msg):
outbound_received.append(msg)
bus.subscribe_outbound(capture_outbound)
mock_client = _make_mock_langgraph_client()
manager._client = mock_client
modified_msg = InboundMessage(
channel_name="test",
chat_id="chat1",
user_id="user1",
text="with /mnt/user-data/uploads/demo.png",
files=[{"image_key": "img_1"}],
)
mock_channel = MagicMock()
mock_channel.receive_file = AsyncMock(return_value=modified_msg)
mock_service = MagicMock()
mock_service.get_channel.return_value = mock_channel
monkeypatch.setattr("app.channels.service.get_channel_service", lambda: mock_service)
await manager.start()
inbound = InboundMessage(
channel_name="test",
chat_id="chat1",
user_id="user1",
text="hi [image]",
files=[{"image_key": "img_1"}],
)
await bus.publish_inbound(inbound)
await _wait_for(lambda: len(outbound_received) >= 1)
await manager.stop()
mock_channel.receive_file.assert_awaited_once()
called_msg, called_thread_id = mock_channel.receive_file.await_args.args
assert called_msg.text == "hi [image]"
assert isinstance(called_thread_id, str)
assert called_thread_id
mock_client.runs.wait.assert_called_once()
run_call_args = mock_client.runs.wait.call_args
assert run_call_args[1]["input"]["messages"][0]["content"] == "with /mnt/user-data/uploads/demo.png"
_run(go())
def test_handle_chat_creates_thread(self):
from app.channels.manager import ChannelManager

View File

@ -1,11 +1,20 @@
import asyncio
import json
from unittest.mock import MagicMock
from unittest.mock import AsyncMock, MagicMock
import pytest
from app.channels.commands import KNOWN_CHANNEL_COMMANDS
from app.channels.feishu import FeishuChannel
from app.channels.message_bus import MessageBus
from app.channels.message_bus import InboundMessage, MessageBus
def _run(coro):
loop = asyncio.new_event_loop()
try:
return loop.run_until_complete(coro)
finally:
loop.close()
def test_feishu_on_message_plain_text():
@ -71,6 +80,64 @@ def test_feishu_on_message_rich_text():
assert "\n\n" in parsed_text
def test_feishu_receive_file_replaces_placeholders_in_order():
async def go():
bus = MessageBus()
channel = FeishuChannel(bus, {"app_id": "test", "app_secret": "test"})
msg = InboundMessage(
channel_name="feishu",
chat_id="chat_1",
user_id="user_1",
text="before [image] middle [file] after",
thread_ts="msg_1",
files=[{"image_key": "img_key"}, {"file_key": "file_key"}],
)
channel._receive_single_file = AsyncMock(side_effect=["/mnt/user-data/uploads/a.png", "/mnt/user-data/uploads/b.pdf"])
result = await channel.receive_file(msg, "thread_1")
assert result.text == "before /mnt/user-data/uploads/a.png middle /mnt/user-data/uploads/b.pdf after"
_run(go())
def test_feishu_on_message_extracts_image_and_file_keys():
bus = MessageBus()
channel = FeishuChannel(bus, {"app_id": "test", "app_secret": "test"})
event = MagicMock()
event.event.message.chat_id = "chat_1"
event.event.message.message_id = "msg_1"
event.event.message.root_id = None
event.event.sender.sender_id.open_id = "user_1"
# Rich text with one image and one file element.
event.event.message.content = json.dumps(
{
"content": [
[
{"tag": "text", "text": "See"},
{"tag": "img", "image_key": "img_123"},
{"tag": "file", "file_key": "file_456"},
]
]
}
)
with pytest.MonkeyPatch.context() as m:
mock_make_inbound = MagicMock()
m.setattr(channel, "_make_inbound", mock_make_inbound)
channel._on_message(event)
mock_make_inbound.assert_called_once()
files = mock_make_inbound.call_args[1]["files"]
assert files == [{"image_key": "img_123"}, {"file_key": "file_456"}]
assert "[image]" in mock_make_inbound.call_args[1]["text"]
assert "[file]" in mock_make_inbound.call_args[1]["text"]
@pytest.mark.parametrize("command", sorted(KNOWN_CHANNEL_COMMANDS))
def test_feishu_recognizes_all_known_slash_commands(command):
"""Every entry in KNOWN_CHANNEL_COMMANDS must be classified as a command."""