feat: support wecom channel (#1390)

* feat: support wecom channel

* fix: sending file to client

Signed-off-by: fengxusong <7008971+fengxsong@users.noreply.github.com>

* test: add unit tests for wecom channel

Signed-off-by: fengxusong <7008971+fengxsong@users.noreply.github.com>

* docs: add example configs and setup docs

Signed-off-by: fengxusong <7008971+fengxsong@users.noreply.github.com>

* revert pypi default index setting

Signed-off-by: fengxusong <7008971+fengxsong@users.noreply.github.com>

* revert: keeping codes in harness untouched

Signed-off-by: fengxusong <7008971+fengxsong@users.noreply.github.com>

* fix: format issue

Signed-off-by: fengxusong <7008971+fengxsong@users.noreply.github.com>

* fix: resolve Copilot comments

Signed-off-by: fengxusong <7008971+fengxsong@users.noreply.github.com>

---------

Signed-off-by: fengxusong <7008971+fengxsong@users.noreply.github.com>
Co-authored-by: Willem Jiang <willem.jiang@gmail.com>
This commit is contained in:
fengxsong 2026-04-04 11:28:35 +08:00 committed by GitHub
parent 6473d38917
commit 19809800f1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 771 additions and 3 deletions

View File

@ -32,3 +32,5 @@ INFOQUEST_API_KEY=your-infoquest-api-key
# GitHub API Token # GitHub API Token
# GITHUB_TOKEN=your-github-token # GITHUB_TOKEN=your-github-token
# WECOM_BOT_ID=your-wecom-bot-id
# WECOM_BOT_SECRET=your-wecom-bot-secret

View File

@ -74,6 +74,8 @@ DeerFlow has newly integrated the intelligent search and crawling toolset indepe
- [Embedded Python Client](#embedded-python-client) - [Embedded Python Client](#embedded-python-client)
- [Documentation](#documentation) - [Documentation](#documentation)
- [⚠️ Security Notice](#-security-notice) - [⚠️ Security Notice](#-security-notice)
- [Improper Deployment May Introduce Security Risks](#improper-deployment-may-introduce-security-risks)
- [Security Recommendations](#security-recommendations)
- [Contributing](#contributing) - [Contributing](#contributing)
- [License](#license) - [License](#license)
- [Acknowledgments](#acknowledgments) - [Acknowledgments](#acknowledgments)
@ -305,6 +307,7 @@ DeerFlow supports receiving tasks from messaging apps. Channels auto-start when
| Telegram | Bot API (long-polling) | Easy | | Telegram | Bot API (long-polling) | Easy |
| Slack | Socket Mode | Moderate | | Slack | Socket Mode | Moderate |
| Feishu / Lark | WebSocket | Moderate | | Feishu / Lark | WebSocket | Moderate |
| WeCom | WebSocket | Moderate |
**Configuration in `config.yaml`:** **Configuration in `config.yaml`:**
@ -332,6 +335,11 @@ channels:
# domain: https://open.feishu.cn # China (default) # domain: https://open.feishu.cn # China (default)
# domain: https://open.larksuite.com # International # domain: https://open.larksuite.com # International
wecom:
enabled: true
bot_id: $WECOM_BOT_ID
bot_secret: $WECOM_BOT_SECRET
slack: slack:
enabled: true enabled: true
bot_token: $SLACK_BOT_TOKEN # xoxb-... bot_token: $SLACK_BOT_TOKEN # xoxb-...
@ -375,6 +383,10 @@ SLACK_APP_TOKEN=xapp-...
# Feishu / Lark # Feishu / Lark
FEISHU_APP_ID=cli_xxxx FEISHU_APP_ID=cli_xxxx
FEISHU_APP_SECRET=your_app_secret FEISHU_APP_SECRET=your_app_secret
# WeCom
WECOM_BOT_ID=your_bot_id
WECOM_BOT_SECRET=your_bot_secret
``` ```
**Telegram Setup** **Telegram Setup**
@ -397,6 +409,14 @@ FEISHU_APP_SECRET=your_app_secret
3. Under **Events**, subscribe to `im.message.receive_v1` and select **Long Connection** mode. 3. Under **Events**, subscribe to `im.message.receive_v1` and select **Long Connection** mode.
4. Copy the App ID and App Secret. Set `FEISHU_APP_ID` and `FEISHU_APP_SECRET` in `.env` and enable the channel in `config.yaml`. 4. Copy the App ID and App Secret. Set `FEISHU_APP_ID` and `FEISHU_APP_SECRET` in `.env` and enable the channel in `config.yaml`.
**WeCom Setup**
1. Create a bot on the WeCom AI Bot platform and obtain the `bot_id` and `bot_secret`.
2. Enable `channels.wecom` in `config.yaml` and fill in `bot_id` / `bot_secret`.
3. Set `WECOM_BOT_ID` and `WECOM_BOT_SECRET` in `.env`.
4. Make sure backend dependencies include `wecom-aibot-python-sdk`. The channel uses a WebSocket long connection and does not require a public callback URL.
5. The current integration supports inbound text, image, and file messages. Final images/files generated by the agent are also sent back to the WeCom conversation.
When DeerFlow runs in Docker Compose, IM channels execute inside the `gateway` container. In that case, do not point `channels.langgraph_url` or `channels.gateway_url` at `localhost`; use container service names such as `http://langgraph:2024` and `http://gateway:8001`, or set `DEER_FLOW_CHANNELS_LANGGRAPH_URL` and `DEER_FLOW_CHANNELS_GATEWAY_URL`. When DeerFlow runs in Docker Compose, IM channels execute inside the `gateway` container. In that case, do not point `channels.langgraph_url` or `channels.gateway_url` at `localhost`; use container service names such as `http://langgraph:2024` and `http://gateway:8001`, or set `DEER_FLOW_CHANNELS_LANGGRAPH_URL` and `DEER_FLOW_CHANNELS_GATEWAY_URL`.
**Commands** **Commands**

View File

@ -232,6 +232,7 @@ DeerFlow 支持从即时通讯应用接收任务。只要配置完成,对应
| Telegram | Bot APIlong-polling | 简单 | | Telegram | Bot APIlong-polling | 简单 |
| Slack | Socket Mode | 中等 | | Slack | Socket Mode | 中等 |
| Feishu / Lark | WebSocket | 中等 | | Feishu / Lark | WebSocket | 中等 |
| 企业微信智能机器人 | WebSocket | 中等 |
**`config.yaml` 中的配置示例:** **`config.yaml` 中的配置示例:**
@ -259,6 +260,11 @@ channels:
# domain: https://open.feishu.cn # 国内版(默认) # domain: https://open.feishu.cn # 国内版(默认)
# domain: https://open.larksuite.com # 国际版 # domain: https://open.larksuite.com # 国际版
wecom:
enabled: true
bot_id: $WECOM_BOT_ID
bot_secret: $WECOM_BOT_SECRET
slack: slack:
enabled: true enabled: true
bot_token: $SLACK_BOT_TOKEN # xoxb-... bot_token: $SLACK_BOT_TOKEN # xoxb-...
@ -302,6 +308,10 @@ SLACK_APP_TOKEN=xapp-...
# Feishu / Lark # Feishu / Lark
FEISHU_APP_ID=cli_xxxx FEISHU_APP_ID=cli_xxxx
FEISHU_APP_SECRET=your_app_secret FEISHU_APP_SECRET=your_app_secret
# 企业微信智能机器人
WECOM_BOT_ID=your_bot_id
WECOM_BOT_SECRET=your_bot_secret
``` ```
**Telegram 配置** **Telegram 配置**
@ -324,6 +334,14 @@ FEISHU_APP_SECRET=your_app_secret
3. 在 **事件订阅** 中订阅 `im.message.receive_v1`,连接方式选择 **长连接** 3. 在 **事件订阅** 中订阅 `im.message.receive_v1`,连接方式选择 **长连接**
4. 复制 App ID 和 App Secret`.env` 中设置 `FEISHU_APP_ID``FEISHU_APP_SECRET`,并在 `config.yaml` 中启用该渠道。 4. 复制 App ID 和 App Secret`.env` 中设置 `FEISHU_APP_ID``FEISHU_APP_SECRET`,并在 `config.yaml` 中启用该渠道。
**企业微信智能机器人配置**
1. 在企业微信智能机器人平台创建机器人,获取 `bot_id``bot_secret`
2. 在 `config.yaml` 中启用 `channels.wecom`,并填入 `bot_id` / `bot_secret`
3. 在 `.env` 中设置 `WECOM_BOT_ID``WECOM_BOT_SECRET`
4. 安装后端依赖时确保包含 `wecom-aibot-python-sdk`,渠道会通过 WebSocket 长连接接收消息,无需公网回调地址。
5. 当前支持文本、图片和文件入站消息agent 生成的最终图片/文件也会回传到企业微信会话中。
**命令** **命令**
渠道连接完成后,你可以直接在聊天窗口里和 DeerFlow 交互: 渠道连接完成后,你可以直接在聊天窗口里和 DeerFlow 交互:

View File

@ -7,9 +7,10 @@ import logging
import mimetypes import mimetypes
import re import re
import time import time
from collections.abc import Mapping from collections.abc import Awaitable, Callable, Mapping
from typing import Any from typing import Any
import httpx
from langgraph_sdk.errors import ConflictError from langgraph_sdk.errors import ConflictError
from app.channels.commands import KNOWN_CHANNEL_COMMANDS from app.channels.commands import KNOWN_CHANNEL_COMMANDS
@ -36,8 +37,49 @@ CHANNEL_CAPABILITIES = {
"feishu": {"supports_streaming": True}, "feishu": {"supports_streaming": True},
"slack": {"supports_streaming": False}, "slack": {"supports_streaming": False},
"telegram": {"supports_streaming": False}, "telegram": {"supports_streaming": False},
"wecom": {"supports_streaming": True},
} }
InboundFileReader = Callable[[dict[str, Any], httpx.AsyncClient], Awaitable[bytes | None]]
INBOUND_FILE_READERS: dict[str, InboundFileReader] = {}
def register_inbound_file_reader(channel_name: str, reader: InboundFileReader) -> None:
INBOUND_FILE_READERS[channel_name] = reader
async def _read_http_inbound_file(file_info: dict[str, Any], client: httpx.AsyncClient) -> bytes | None:
url = file_info.get("url")
if not isinstance(url, str) or not url:
return None
resp = await client.get(url)
resp.raise_for_status()
return resp.content
async def _read_wecom_inbound_file(file_info: dict[str, Any], client: httpx.AsyncClient) -> bytes | None:
data = await _read_http_inbound_file(file_info, client)
if data is None:
return None
aeskey = file_info.get("aeskey") if isinstance(file_info.get("aeskey"), str) else None
if not aeskey:
return data
try:
from aibot.crypto_utils import decrypt_file
except Exception:
logger.exception("[Manager] failed to import WeCom decrypt_file")
return None
return decrypt_file(data, aeskey)
register_inbound_file_reader("wecom", _read_wecom_inbound_file)
class InvalidChannelSessionConfigError(ValueError): class InvalidChannelSessionConfigError(ValueError):
"""Raised when IM channel session overrides contain invalid agent config.""" """Raised when IM channel session overrides contain invalid agent config."""
@ -342,6 +384,105 @@ def _prepare_artifact_delivery(
return response_text, attachments return response_text, attachments
async def _ingest_inbound_files(thread_id: str, msg: InboundMessage) -> list[dict[str, Any]]:
if not msg.files:
return []
from deerflow.uploads.manager import claim_unique_filename, ensure_uploads_dir, normalize_filename
uploads_dir = ensure_uploads_dir(thread_id)
seen_names = {entry.name for entry in uploads_dir.iterdir() if entry.is_file()}
created: list[dict[str, Any]] = []
file_reader = INBOUND_FILE_READERS.get(msg.channel_name, _read_http_inbound_file)
async with httpx.AsyncClient(timeout=httpx.Timeout(20.0)) as client:
for idx, f in enumerate(msg.files):
if not isinstance(f, dict):
continue
ftype = f.get("type") if isinstance(f.get("type"), str) else "file"
filename = f.get("filename") if isinstance(f.get("filename"), str) else ""
try:
data = await file_reader(f, client)
except Exception:
logger.exception(
"[Manager] failed to read inbound file: channel=%s, file=%s",
msg.channel_name,
f.get("url") or filename or idx,
)
continue
if data is None:
logger.warning(
"[Manager] inbound file reader returned no data: channel=%s, file=%s",
msg.channel_name,
f.get("url") or filename or idx,
)
continue
if not filename:
ext = ".bin"
if ftype == "image":
ext = ".png"
filename = f"{msg.thread_ts or 'msg'}_{idx}{ext}"
try:
safe_name = claim_unique_filename(normalize_filename(filename), seen_names)
except ValueError:
logger.warning(
"[Manager] skipping inbound file with unsafe filename: channel=%s, file=%r",
msg.channel_name,
filename,
)
continue
dest = uploads_dir / safe_name
try:
dest.write_bytes(data)
except Exception:
logger.exception("[Manager] failed to write inbound file: %s", dest)
continue
created.append(
{
"filename": safe_name,
"size": len(data),
"path": f"/mnt/user-data/uploads/{safe_name}",
"is_image": ftype == "image",
}
)
return created
def _format_uploaded_files_block(files: list[dict[str, Any]]) -> str:
lines = [
"<uploaded_files>",
"The following files were uploaded in this message:",
"",
]
if not files:
lines.append("(empty)")
else:
for f in files:
filename = f.get("filename", "")
size = int(f.get("size") or 0)
size_kb = size / 1024 if size else 0
size_str = f"{size_kb:.1f} KB" if size_kb < 1024 else f"{size_kb / 1024:.1f} MB"
path = f.get("path", "")
is_image = bool(f.get("is_image"))
file_kind = "image" if is_image else "file"
lines.append(f"- {filename} ({size_str})")
lines.append(f" Type: {file_kind}")
lines.append(f" Path: {path}")
lines.append("")
lines.append("Use `read_file` for text-based files and documents.")
lines.append("Use `view_image` for image files (jpg, jpeg, png, webp) so the model can inspect the image content.")
lines.append("</uploaded_files>")
return "\n".join(lines)
class ChannelManager: class ChannelManager:
"""Core dispatcher that bridges IM channels to the DeerFlow agent. """Core dispatcher that bridges IM channels to the DeerFlow agent.
@ -536,6 +677,11 @@ class ChannelManager:
assistant_id, run_config, run_context = self._resolve_run_params(msg, thread_id) assistant_id, run_config, run_context = self._resolve_run_params(msg, thread_id)
if extra_context: if extra_context:
run_context.update(extra_context) run_context.update(extra_context)
uploaded = await _ingest_inbound_files(thread_id, msg)
if uploaded:
msg.text = f"{_format_uploaded_files_block(uploaded)}\n\n{msg.text}".strip()
if self._channel_supports_streaming(msg.channel_name): if self._channel_supports_streaming(msg.channel_name):
await self._handle_streaming_chat( await self._handle_streaming_chat(
client, client,

View File

@ -17,6 +17,7 @@ _CHANNEL_REGISTRY: dict[str, str] = {
"feishu": "app.channels.feishu:FeishuChannel", "feishu": "app.channels.feishu:FeishuChannel",
"slack": "app.channels.slack:SlackChannel", "slack": "app.channels.slack:SlackChannel",
"telegram": "app.channels.telegram:TelegramChannel", "telegram": "app.channels.telegram:TelegramChannel",
"wecom": "app.channels.wecom:WeComChannel",
} }
_CHANNELS_LANGGRAPH_URL_ENV = "DEER_FLOW_CHANNELS_LANGGRAPH_URL" _CHANNELS_LANGGRAPH_URL_ENV = "DEER_FLOW_CHANNELS_LANGGRAPH_URL"

View File

@ -0,0 +1,394 @@
from __future__ import annotations
import asyncio
import base64
import hashlib
import logging
from collections.abc import Awaitable, Callable
from typing import Any, cast
from app.channels.base import Channel
from app.channels.message_bus import (
InboundMessageType,
MessageBus,
OutboundMessage,
ResolvedAttachment,
)
logger = logging.getLogger(__name__)
class WeComChannel(Channel):
def __init__(self, bus: MessageBus, config: dict[str, Any]) -> None:
super().__init__(name="wecom", bus=bus, config=config)
self._bot_id: str | None = None
self._bot_secret: str | None = None
self._ws_client = None
self._ws_task: asyncio.Task | None = None
self._ws_frames: dict[str, dict[str, Any]] = {}
self._ws_stream_ids: dict[str, str] = {}
self._working_message = "Working on it..."
def _clear_ws_context(self, thread_ts: str | None) -> None:
if not thread_ts:
return
self._ws_frames.pop(thread_ts, None)
self._ws_stream_ids.pop(thread_ts, None)
async def _send_ws_upload_command(self, req_id: str, body: dict[str, Any], cmd: str) -> dict[str, Any]:
if not self._ws_client:
raise RuntimeError("WeCom WebSocket client is not available")
ws_manager = getattr(self._ws_client, "_ws_manager", None)
send_reply = getattr(ws_manager, "send_reply", None)
if not callable(send_reply):
raise RuntimeError("Installed wecom-aibot-python-sdk does not expose the WebSocket media upload API expected by DeerFlow. Use wecom-aibot-python-sdk==0.1.6 or update the adapter.")
send_reply_async = cast(Callable[[str, dict[str, Any], str], Awaitable[dict[str, Any]]], send_reply)
return await send_reply_async(req_id, body, cmd)
async def start(self) -> None:
if self._running:
return
bot_id = self.config.get("bot_id")
bot_secret = self.config.get("bot_secret")
working_message = self.config.get("working_message")
self._bot_id = bot_id if isinstance(bot_id, str) and bot_id else None
self._bot_secret = bot_secret if isinstance(bot_secret, str) and bot_secret else None
self._working_message = working_message if isinstance(working_message, str) and working_message else "Working on it..."
if not self._bot_id or not self._bot_secret:
logger.error("WeCom channel requires bot_id and bot_secret")
return
try:
from aibot import WSClient, WSClientOptions
except ImportError:
logger.error("wecom-aibot-python-sdk is not installed. Install it with: uv add wecom-aibot-python-sdk")
return
else:
self._ws_client = WSClient(WSClientOptions(bot_id=self._bot_id, secret=self._bot_secret, logger=logger))
self._ws_client.on("message.text", self._on_ws_text)
self._ws_client.on("message.mixed", self._on_ws_mixed)
self._ws_client.on("message.image", self._on_ws_image)
self._ws_client.on("message.file", self._on_ws_file)
self._ws_task = asyncio.create_task(self._ws_client.connect())
self._running = True
self.bus.subscribe_outbound(self._on_outbound)
logger.info("WeCom channel started")
async def stop(self) -> None:
self._running = False
self.bus.unsubscribe_outbound(self._on_outbound)
if self._ws_task:
try:
self._ws_task.cancel()
except Exception:
pass
self._ws_task = None
if self._ws_client:
try:
self._ws_client.disconnect()
except Exception:
pass
self._ws_client = None
self._ws_frames.clear()
self._ws_stream_ids.clear()
logger.info("WeCom channel stopped")
async def send(self, msg: OutboundMessage, *, _max_retries: int = 3) -> None:
if self._ws_client:
await self._send_ws(msg, _max_retries=_max_retries)
return
logger.warning("[WeCom] send called but WebSocket client is not available")
async def _on_outbound(self, msg: OutboundMessage) -> None:
if msg.channel_name != self.name:
return
try:
await self.send(msg)
except Exception:
logger.exception("Failed to send outbound message on channel %s", self.name)
if msg.is_final:
self._clear_ws_context(msg.thread_ts)
return
for attachment in msg.attachments:
try:
success = await self.send_file(msg, attachment)
if not success:
logger.warning("[%s] file upload skipped for %s", self.name, attachment.filename)
except Exception:
logger.exception("[%s] failed to upload file %s", self.name, attachment.filename)
if msg.is_final:
self._clear_ws_context(msg.thread_ts)
async def send_file(self, msg: OutboundMessage, attachment: ResolvedAttachment) -> bool:
if not msg.is_final:
return True
if not self._ws_client:
return False
if not msg.thread_ts:
return False
frame = self._ws_frames.get(msg.thread_ts)
if not frame:
return False
media_type = "image" if attachment.is_image else "file"
size_limit = 2 * 1024 * 1024 if attachment.is_image else 20 * 1024 * 1024
if attachment.size > size_limit:
logger.warning(
"[WeCom] %s too large (%d bytes), skipping: %s",
media_type,
attachment.size,
attachment.filename,
)
return False
try:
media_id = await self._upload_media_ws(
media_type=media_type,
filename=attachment.filename,
path=str(attachment.actual_path),
size=attachment.size,
)
if not media_id:
return False
body = {media_type: {"media_id": media_id}, "msgtype": media_type}
await self._ws_client.reply(frame, body)
logger.debug("[WeCom] %s sent via ws: %s", media_type, attachment.filename)
return True
except Exception:
logger.exception("[WeCom] failed to upload/send file via ws: %s", attachment.filename)
return False
async def _on_ws_text(self, frame: dict[str, Any]) -> None:
body = frame.get("body", {}) or {}
text = ((body.get("text") or {}).get("content") or "").strip()
quote = body.get("quote", {}).get("text", {}).get("content", "").strip()
if not text and not quote:
return
await self._publish_ws_inbound(frame, text + (f"\nQuote message: {quote}" if quote else ""))
async def _on_ws_mixed(self, frame: dict[str, Any]) -> None:
body = frame.get("body", {}) or {}
mixed = body.get("mixed") or {}
items = mixed.get("msg_item") or []
parts: list[str] = []
files: list[dict[str, Any]] = []
for item in items:
item_type = (item or {}).get("msgtype")
if item_type == "text":
content = (((item or {}).get("text") or {}).get("content") or "").strip()
if content:
parts.append(content)
elif item_type in ("image", "file"):
payload = (item or {}).get(item_type) or {}
url = payload.get("url")
aeskey = payload.get("aeskey")
if isinstance(url, str) and url:
files.append(
{
"type": item_type,
"url": url,
"aeskey": (aeskey if isinstance(aeskey, str) and aeskey else None),
}
)
text = "\n\n".join(parts).strip()
if not text and not files:
return
if not text:
text = "receive image/file"
await self._publish_ws_inbound(frame, text, files=files)
async def _on_ws_image(self, frame: dict[str, Any]) -> None:
body = frame.get("body", {}) or {}
image = body.get("image") or {}
url = image.get("url")
aeskey = image.get("aeskey")
if not isinstance(url, str) or not url:
return
await self._publish_ws_inbound(
frame,
"receive image ",
files=[
{
"type": "image",
"url": url,
"aeskey": aeskey if isinstance(aeskey, str) and aeskey else None,
}
],
)
async def _on_ws_file(self, frame: dict[str, Any]) -> None:
body = frame.get("body", {}) or {}
file_obj = body.get("file") or {}
url = file_obj.get("url")
aeskey = file_obj.get("aeskey")
if not isinstance(url, str) or not url:
return
await self._publish_ws_inbound(
frame,
"receive file",
files=[
{
"type": "file",
"url": url,
"aeskey": aeskey if isinstance(aeskey, str) and aeskey else None,
}
],
)
async def _publish_ws_inbound(
self,
frame: dict[str, Any],
text: str,
*,
files: list[dict[str, Any]] | None = None,
) -> None:
if not self._ws_client:
return
try:
from aibot import generate_req_id
except Exception:
return
body = frame.get("body", {}) or {}
msg_id = body.get("msgid")
if not msg_id:
return
user_id = (body.get("from") or {}).get("userid")
inbound_type = InboundMessageType.COMMAND if text.startswith("/") else InboundMessageType.CHAT
inbound = self._make_inbound(
chat_id=user_id, # keep user's conversation in memory
user_id=user_id,
text=text,
msg_type=inbound_type,
thread_ts=msg_id,
files=files or [],
metadata={"aibotid": body.get("aibotid"), "chattype": body.get("chattype")},
)
inbound.topic_id = user_id # keep the same thread
stream_id = generate_req_id("stream")
self._ws_frames[msg_id] = frame
self._ws_stream_ids[msg_id] = stream_id
try:
await self._ws_client.reply_stream(frame, stream_id, self._working_message, False)
except Exception:
pass
await self.bus.publish_inbound(inbound)
async def _send_ws(self, msg: OutboundMessage, *, _max_retries: int = 3) -> None:
if not self._ws_client:
return
try:
from aibot import generate_req_id
except Exception:
generate_req_id = None
if msg.thread_ts and msg.thread_ts in self._ws_frames:
frame = self._ws_frames[msg.thread_ts]
stream_id = self._ws_stream_ids.get(msg.thread_ts)
if not stream_id and generate_req_id:
stream_id = generate_req_id("stream")
self._ws_stream_ids[msg.thread_ts] = stream_id
if not stream_id:
return
last_exc: Exception | None = None
for attempt in range(_max_retries):
try:
await self._ws_client.reply_stream(frame, stream_id, msg.text, bool(msg.is_final))
return
except Exception as exc:
last_exc = exc
if attempt < _max_retries - 1:
await asyncio.sleep(2**attempt)
if last_exc:
raise last_exc
body = {"msgtype": "markdown", "markdown": {"content": msg.text}}
last_exc = None
for attempt in range(_max_retries):
try:
await self._ws_client.send_message(msg.chat_id, body)
return
except Exception as exc:
last_exc = exc
if attempt < _max_retries - 1:
await asyncio.sleep(2**attempt)
if last_exc:
raise last_exc
async def _upload_media_ws(
self,
*,
media_type: str,
filename: str,
path: str,
size: int,
) -> str | None:
if not self._ws_client:
return None
try:
from aibot import generate_req_id
except Exception:
return None
chunk_size = 512 * 1024
total_chunks = (size + chunk_size - 1) // chunk_size
if total_chunks < 1 or total_chunks > 100:
logger.warning("[WeCom] invalid total_chunks=%d for %s", total_chunks, filename)
return None
md5_hasher = hashlib.md5()
with open(path, "rb") as f:
for chunk in iter(lambda: f.read(1024 * 1024), b""):
md5_hasher.update(chunk)
md5 = md5_hasher.hexdigest()
init_req_id = generate_req_id("aibot_upload_media_init")
init_body = {
"type": media_type,
"filename": filename,
"total_size": int(size),
"total_chunks": int(total_chunks),
"md5": md5,
}
init_ack = await self._send_ws_upload_command(init_req_id, init_body, "aibot_upload_media_init")
upload_id = (init_ack.get("body") or {}).get("upload_id")
if not upload_id:
logger.warning("[WeCom] upload init returned no upload_id: %s", init_ack)
return None
with open(path, "rb") as f:
for idx in range(total_chunks):
data = f.read(chunk_size)
if not data:
break
chunk_req_id = generate_req_id("aibot_upload_media_chunk")
chunk_body = {
"upload_id": upload_id,
"chunk_index": int(idx),
"base64_data": base64.b64encode(data).decode("utf-8"),
}
await self._send_ws_upload_command(chunk_req_id, chunk_body, "aibot_upload_media_chunk")
finish_req_id = generate_req_id("aibot_upload_media_finish")
finish_ack = await self._send_ws_upload_command(finish_req_id, {"upload_id": upload_id}, "aibot_upload_media_finish")
media_id = (finish_ack.get("body") or {}).get("media_id")
if not media_id:
logger.warning("[WeCom] upload finish returned no media_id: %s", finish_ack)
return None
return media_id

View File

@ -16,6 +16,7 @@ dependencies = [
"python-telegram-bot>=21.0", "python-telegram-bot>=21.0",
"langgraph-sdk>=0.1.51", "langgraph-sdk>=0.1.51",
"markdown-to-mrkdwn>=0.3.1", "markdown-to-mrkdwn>=0.3.1",
"wecom-aibot-python-sdk>=0.1.6",
] ]
[dependency-groups] [dependency-groups]

View File

@ -12,7 +12,7 @@ from unittest.mock import AsyncMock, MagicMock
import pytest import pytest
from app.channels.base import Channel from app.channels.base import Channel
from app.channels.message_bus import InboundMessage, InboundMessageType, MessageBus, OutboundMessage from app.channels.message_bus import InboundMessage, InboundMessageType, MessageBus, OutboundMessage, ResolvedAttachment
from app.channels.store import ChannelStore from app.channels.store import ChannelStore
@ -1718,6 +1718,159 @@ class TestFeishuChannel:
_run(go()) _run(go())
class TestWeComChannel:
def test_publish_ws_inbound_starts_stream_and_publishes_message(self, monkeypatch):
from app.channels.wecom import WeComChannel
async def go():
bus = MessageBus()
bus.publish_inbound = AsyncMock()
channel = WeComChannel(bus, config={})
channel._ws_client = SimpleNamespace(reply_stream=AsyncMock())
monkeypatch.setitem(
__import__("sys").modules,
"aibot",
SimpleNamespace(generate_req_id=lambda prefix: "stream-1"),
)
frame = {
"body": {
"msgid": "msg-1",
"from": {"userid": "user-1"},
"aibotid": "bot-1",
"chattype": "single",
}
}
files = [{"type": "image", "url": "https://example.com/image.png"}]
await channel._publish_ws_inbound(frame, "hello", files=files)
channel._ws_client.reply_stream.assert_awaited_once_with(frame, "stream-1", "Working on it...", False)
bus.publish_inbound.assert_awaited_once()
inbound = bus.publish_inbound.await_args.args[0]
assert inbound.channel_name == "wecom"
assert inbound.chat_id == "user-1"
assert inbound.user_id == "user-1"
assert inbound.text == "hello"
assert inbound.thread_ts == "msg-1"
assert inbound.topic_id == "user-1"
assert inbound.files == files
assert inbound.metadata == {"aibotid": "bot-1", "chattype": "single"}
assert channel._ws_frames["msg-1"] is frame
assert channel._ws_stream_ids["msg-1"] == "stream-1"
_run(go())
def test_publish_ws_inbound_uses_configured_working_message(self, monkeypatch):
from app.channels.wecom import WeComChannel
async def go():
bus = MessageBus()
bus.publish_inbound = AsyncMock()
channel = WeComChannel(bus, config={"working_message": "Please wait..."})
channel._ws_client = SimpleNamespace(reply_stream=AsyncMock())
channel._working_message = "Please wait..."
monkeypatch.setitem(
__import__("sys").modules,
"aibot",
SimpleNamespace(generate_req_id=lambda prefix: "stream-1"),
)
frame = {
"body": {
"msgid": "msg-1",
"from": {"userid": "user-1"},
}
}
await channel._publish_ws_inbound(frame, "hello")
channel._ws_client.reply_stream.assert_awaited_once_with(frame, "stream-1", "Please wait...", False)
_run(go())
def test_on_outbound_sends_attachment_before_clearing_context(self, tmp_path):
from app.channels.wecom import WeComChannel
async def go():
bus = MessageBus()
channel = WeComChannel(bus, config={})
frame = {"body": {"msgid": "msg-1"}}
ws_client = SimpleNamespace(
reply_stream=AsyncMock(),
reply=AsyncMock(),
)
channel._ws_client = ws_client
channel._ws_frames["msg-1"] = frame
channel._ws_stream_ids["msg-1"] = "stream-1"
channel._upload_media_ws = AsyncMock(return_value="media-1")
attachment_path = tmp_path / "image.png"
attachment_path.write_bytes(b"png")
attachment = ResolvedAttachment(
virtual_path="/mnt/user-data/outputs/image.png",
actual_path=attachment_path,
filename="image.png",
mime_type="image/png",
size=attachment_path.stat().st_size,
is_image=True,
)
msg = OutboundMessage(
channel_name="wecom",
chat_id="user-1",
thread_id="thread-1",
text="done",
attachments=[attachment],
is_final=True,
thread_ts="msg-1",
)
await channel._on_outbound(msg)
ws_client.reply_stream.assert_awaited_once_with(frame, "stream-1", "done", True)
channel._upload_media_ws.assert_awaited_once_with(
media_type="image",
filename="image.png",
path=str(attachment_path),
size=attachment.size,
)
ws_client.reply.assert_awaited_once_with(frame, {"image": {"media_id": "media-1"}, "msgtype": "image"})
assert "msg-1" not in channel._ws_frames
assert "msg-1" not in channel._ws_stream_ids
_run(go())
def test_send_falls_back_to_send_message_without_thread_context(self):
from app.channels.wecom import WeComChannel
async def go():
bus = MessageBus()
channel = WeComChannel(bus, config={})
channel._ws_client = SimpleNamespace(send_message=AsyncMock())
msg = OutboundMessage(
channel_name="wecom",
chat_id="user-1",
thread_id="thread-1",
text="hello",
thread_ts=None,
)
await channel.send(msg)
channel._ws_client.send_message.assert_awaited_once_with(
"user-1",
{"msgtype": "markdown", "markdown": {"content": "hello"}},
)
_run(go())
class TestChannelService: class TestChannelService:
def test_get_status_no_channels(self): def test_get_status_no_channels(self):
from app.channels.service import ChannelService from app.channels.service import ChannelService

30
backend/uv.lock generated
View File

@ -681,6 +681,7 @@ dependencies = [
{ name = "slack-sdk" }, { name = "slack-sdk" },
{ name = "sse-starlette" }, { name = "sse-starlette" },
{ name = "uvicorn", extra = ["standard"] }, { name = "uvicorn", extra = ["standard"] },
{ name = "wecom-aibot-python-sdk" },
] ]
[package.dev-dependencies] [package.dev-dependencies]
@ -702,6 +703,7 @@ requires-dist = [
{ name = "slack-sdk", specifier = ">=3.33.0" }, { name = "slack-sdk", specifier = ">=3.33.0" },
{ name = "sse-starlette", specifier = ">=2.1.0" }, { name = "sse-starlette", specifier = ">=2.1.0" },
{ name = "uvicorn", extras = ["standard"], specifier = ">=0.34.0" }, { name = "uvicorn", extras = ["standard"], specifier = ">=0.34.0" },
{ name = "wecom-aibot-python-sdk", specifier = ">=0.1.6" },
] ]
[package.metadata.requires-dev] [package.metadata.requires-dev]
@ -2906,6 +2908,18 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/a6/53/d78dc063216e62fc55f6b2eebb447f6a4b0a59f55c8406376f76bf959b08/pydub-0.25.1-py2.py3-none-any.whl", hash = "sha256:65617e33033874b59d87db603aa1ed450633288aefead953b30bded59cb599a6", size = 32327, upload-time = "2021-03-10T02:09:53.503Z" }, { url = "https://files.pythonhosted.org/packages/a6/53/d78dc063216e62fc55f6b2eebb447f6a4b0a59f55c8406376f76bf959b08/pydub-0.25.1-py2.py3-none-any.whl", hash = "sha256:65617e33033874b59d87db603aa1ed450633288aefead953b30bded59cb599a6", size = 32327, upload-time = "2021-03-10T02:09:53.503Z" },
] ]
[[package]]
name = "pyee"
version = "13.0.1"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "typing-extensions" },
]
sdist = { url = "https://files.pythonhosted.org/packages/8b/04/e7c1fe4dc78a6fdbfd6c337b1c3732ff543b8a397683ab38378447baa331/pyee-13.0.1.tar.gz", hash = "sha256:0b931f7c14535667ed4c7e0d531716368715e860b988770fc7eb8578d1f67fc8", size = 31655, upload-time = "2026-02-14T21:12:28.044Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/a0/c4/b4d4827c93ef43c01f599ef31453ccc1c132b353284fc6c87d535c233129/pyee-13.0.1-py3-none-any.whl", hash = "sha256:af2f8fede4171ef667dfded53f96e2ed0d6e6bd7ee3bb46437f77e3b57689228", size = 15659, upload-time = "2026-02-14T21:12:26.263Z" },
]
[[package]] [[package]]
name = "pygments" name = "pygments"
version = "2.19.2" version = "2.19.2"
@ -3884,6 +3898,22 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/6f/28/258ebab549c2bf3e64d2b0217b973467394a9cea8c42f70418ca2c5d0d2e/websockets-16.0-py3-none-any.whl", hash = "sha256:1637db62fad1dc833276dded54215f2c7fa46912301a24bd94d45d46a011ceec", size = 171598, upload-time = "2026-01-10T09:23:45.395Z" }, { url = "https://files.pythonhosted.org/packages/6f/28/258ebab549c2bf3e64d2b0217b973467394a9cea8c42f70418ca2c5d0d2e/websockets-16.0-py3-none-any.whl", hash = "sha256:1637db62fad1dc833276dded54215f2c7fa46912301a24bd94d45d46a011ceec", size = 171598, upload-time = "2026-01-10T09:23:45.395Z" },
] ]
[[package]]
name = "wecom-aibot-python-sdk"
version = "1.0.2"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "aiohttp" },
{ name = "certifi" },
{ name = "cryptography" },
{ name = "pyee" },
{ name = "websockets" },
]
sdist = { url = "https://files.pythonhosted.org/packages/23/b4/df93b46006e5c1900703aefa59004e6d524a4e73ba56ae73bcce24ff4184/wecom_aibot_python_sdk-1.0.2.tar.gz", hash = "sha256:f8cd9920c0b6cb88bf8a50742fca1e834e5c49e06c3ae861d0f128672c17697b", size = 31706, upload-time = "2026-03-23T07:44:53.949Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/ee/39/f2fab475f15d5bf596c4fa998ddd321b1400bcc6ae2e73d3e935db939379/wecom_aibot_python_sdk-1.0.2-py3-none-any.whl", hash = "sha256:03df207c72021157506647cd9f4ee51b865a7f37d3b5df7f7af1b1c7e677db84", size = 23228, upload-time = "2026-03-23T07:44:52.555Z" },
]
[[package]] [[package]]
name = "wrapt" name = "wrapt"
version = "1.17.3" version = "1.17.3"

View File

@ -232,7 +232,6 @@ models:
# supports_vision: true # supports_vision: true
# supports_thinking: true # supports_thinking: true
# Example: OpenRouter (OpenAI-compatible) # Example: OpenRouter (OpenAI-compatible)
# OpenRouter models use the same ChatOpenAI + base_url pattern as other OpenAI-compatible gateways. # OpenRouter models use the same ChatOpenAI + base_url pattern as other OpenAI-compatible gateways.
# - name: openrouter-gemini-2.5-flash # - name: openrouter-gemini-2.5-flash
@ -687,6 +686,10 @@ checkpointer:
# context: # context:
# thinking_enabled: true # thinking_enabled: true
# subagent_enabled: true # subagent_enabled: true
# wecom:
# enabled: false
# bot_id: $WECOM_BOT_ID
# bot_secret: $WECOM_BOT_SECRET
# ============================================================================ # ============================================================================
# Guardrails Configuration # Guardrails Configuration