diff --git a/.env.example b/.env.example index 59b93dd1b..c4eb63c91 100644 --- a/.env.example +++ b/.env.example @@ -32,3 +32,5 @@ INFOQUEST_API_KEY=your-infoquest-api-key # GitHub API Token # GITHUB_TOKEN=your-github-token +# WECOM_BOT_ID=your-wecom-bot-id +# WECOM_BOT_SECRET=your-wecom-bot-secret diff --git a/README.md b/README.md index 48d56070b..1c1f6dcdf 100644 --- a/README.md +++ b/README.md @@ -74,6 +74,8 @@ DeerFlow has newly integrated the intelligent search and crawling toolset indepe - [Embedded Python Client](#embedded-python-client) - [Documentation](#documentation) - [⚠️ Security Notice](#️-security-notice) + - [Improper Deployment May Introduce Security Risks](#improper-deployment-may-introduce-security-risks) + - [Security Recommendations](#security-recommendations) - [Contributing](#contributing) - [License](#license) - [Acknowledgments](#acknowledgments) @@ -305,6 +307,7 @@ DeerFlow supports receiving tasks from messaging apps. Channels auto-start when | Telegram | Bot API (long-polling) | Easy | | Slack | Socket Mode | Moderate | | Feishu / Lark | WebSocket | Moderate | +| WeCom | WebSocket | Moderate | **Configuration in `config.yaml`:** @@ -332,6 +335,11 @@ channels: # domain: https://open.feishu.cn # China (default) # domain: https://open.larksuite.com # International + wecom: + enabled: true + bot_id: $WECOM_BOT_ID + bot_secret: $WECOM_BOT_SECRET + slack: enabled: true bot_token: $SLACK_BOT_TOKEN # xoxb-... @@ -375,6 +383,10 @@ SLACK_APP_TOKEN=xapp-... # Feishu / Lark FEISHU_APP_ID=cli_xxxx FEISHU_APP_SECRET=your_app_secret + +# WeCom +WECOM_BOT_ID=your_bot_id +WECOM_BOT_SECRET=your_bot_secret ``` **Telegram Setup** @@ -397,6 +409,14 @@ FEISHU_APP_SECRET=your_app_secret 3. Under **Events**, subscribe to `im.message.receive_v1` and select **Long Connection** mode. 4. Copy the App ID and App Secret. Set `FEISHU_APP_ID` and `FEISHU_APP_SECRET` in `.env` and enable the channel in `config.yaml`. +**WeCom Setup** + +1. 
Create a bot on the WeCom AI Bot platform and obtain the `bot_id` and `bot_secret`. +2. Enable `channels.wecom` in `config.yaml` and fill in `bot_id` / `bot_secret`. +3. Set `WECOM_BOT_ID` and `WECOM_BOT_SECRET` in `.env`. +4. Make sure backend dependencies include `wecom-aibot-python-sdk`. The channel uses a WebSocket long connection and does not require a public callback URL. +5. The current integration supports inbound text, image, and file messages. Final images/files generated by the agent are also sent back to the WeCom conversation. + When DeerFlow runs in Docker Compose, IM channels execute inside the `gateway` container. In that case, do not point `channels.langgraph_url` or `channels.gateway_url` at `localhost`; use container service names such as `http://langgraph:2024` and `http://gateway:8001`, or set `DEER_FLOW_CHANNELS_LANGGRAPH_URL` and `DEER_FLOW_CHANNELS_GATEWAY_URL`. **Commands** diff --git a/README_zh.md b/README_zh.md index cbb3b5601..9b832eb0d 100644 --- a/README_zh.md +++ b/README_zh.md @@ -232,6 +232,7 @@ DeerFlow 支持从即时通讯应用接收任务。只要配置完成,对应 | Telegram | Bot API(long-polling) | 简单 | | Slack | Socket Mode | 中等 | | Feishu / Lark | WebSocket | 中等 | +| 企业微信智能机器人 | WebSocket | 中等 | **`config.yaml` 中的配置示例:** @@ -259,6 +260,11 @@ channels: # domain: https://open.feishu.cn # 国内版(默认) # domain: https://open.larksuite.com # 国际版 + wecom: + enabled: true + bot_id: $WECOM_BOT_ID + bot_secret: $WECOM_BOT_SECRET + slack: enabled: true bot_token: $SLACK_BOT_TOKEN # xoxb-... @@ -302,6 +308,10 @@ SLACK_APP_TOKEN=xapp-... # Feishu / Lark FEISHU_APP_ID=cli_xxxx FEISHU_APP_SECRET=your_app_secret + +# 企业微信智能机器人 +WECOM_BOT_ID=your_bot_id +WECOM_BOT_SECRET=your_bot_secret ``` **Telegram 配置** @@ -324,6 +334,14 @@ FEISHU_APP_SECRET=your_app_secret 3. 在 **事件订阅** 中订阅 `im.message.receive_v1`,连接方式选择 **长连接**。 4. 复制 App ID 和 App Secret,在 `.env` 中设置 `FEISHU_APP_ID` 和 `FEISHU_APP_SECRET`,并在 `config.yaml` 中启用该渠道。 +**企业微信智能机器人配置** + +1. 
在企业微信智能机器人平台创建机器人,获取 `bot_id` 和 `bot_secret`。 +2. 在 `config.yaml` 中启用 `channels.wecom`,并填入 `bot_id` / `bot_secret`。 +3. 在 `.env` 中设置 `WECOM_BOT_ID` 和 `WECOM_BOT_SECRET`。 +4. 安装后端依赖时确保包含 `wecom-aibot-python-sdk`,渠道会通过 WebSocket 长连接接收消息,无需公网回调地址。 +5. 当前支持文本、图片和文件入站消息;agent 生成的最终图片/文件也会回传到企业微信会话中。 + **命令** 渠道连接完成后,你可以直接在聊天窗口里和 DeerFlow 交互: diff --git a/backend/app/channels/manager.py b/backend/app/channels/manager.py index ab63100fa..96e9c755e 100644 --- a/backend/app/channels/manager.py +++ b/backend/app/channels/manager.py @@ -7,9 +7,10 @@ import logging import mimetypes import re import time -from collections.abc import Mapping +from collections.abc import Awaitable, Callable, Mapping from typing import Any +import httpx from langgraph_sdk.errors import ConflictError from app.channels.commands import KNOWN_CHANNEL_COMMANDS @@ -36,8 +37,49 @@ CHANNEL_CAPABILITIES = { "feishu": {"supports_streaming": True}, "slack": {"supports_streaming": False}, "telegram": {"supports_streaming": False}, + "wecom": {"supports_streaming": True}, } +InboundFileReader = Callable[[dict[str, Any], httpx.AsyncClient], Awaitable[bytes | None]] + + +INBOUND_FILE_READERS: dict[str, InboundFileReader] = {} + + +def register_inbound_file_reader(channel_name: str, reader: InboundFileReader) -> None: + INBOUND_FILE_READERS[channel_name] = reader + + +async def _read_http_inbound_file(file_info: dict[str, Any], client: httpx.AsyncClient) -> bytes | None: + url = file_info.get("url") + if not isinstance(url, str) or not url: + return None + + resp = await client.get(url) + resp.raise_for_status() + return resp.content + + +async def _read_wecom_inbound_file(file_info: dict[str, Any], client: httpx.AsyncClient) -> bytes | None: + data = await _read_http_inbound_file(file_info, client) + if data is None: + return None + + aeskey = file_info.get("aeskey") if isinstance(file_info.get("aeskey"), str) else None + if not aeskey: + return data + + try: + from aibot.crypto_utils import decrypt_file + 
except Exception: + logger.exception("[Manager] failed to import WeCom decrypt_file") + return None + + return decrypt_file(data, aeskey) + + +register_inbound_file_reader("wecom", _read_wecom_inbound_file) + class InvalidChannelSessionConfigError(ValueError): """Raised when IM channel session overrides contain invalid agent config.""" @@ -342,6 +384,105 @@ def _prepare_artifact_delivery( return response_text, attachments +async def _ingest_inbound_files(thread_id: str, msg: InboundMessage) -> list[dict[str, Any]]: + if not msg.files: + return [] + + from deerflow.uploads.manager import claim_unique_filename, ensure_uploads_dir, normalize_filename + + uploads_dir = ensure_uploads_dir(thread_id) + seen_names = {entry.name for entry in uploads_dir.iterdir() if entry.is_file()} + + created: list[dict[str, Any]] = [] + file_reader = INBOUND_FILE_READERS.get(msg.channel_name, _read_http_inbound_file) + async with httpx.AsyncClient(timeout=httpx.Timeout(20.0)) as client: + for idx, f in enumerate(msg.files): + if not isinstance(f, dict): + continue + + ftype = f.get("type") if isinstance(f.get("type"), str) else "file" + filename = f.get("filename") if isinstance(f.get("filename"), str) else "" + + try: + data = await file_reader(f, client) + except Exception: + logger.exception( + "[Manager] failed to read inbound file: channel=%s, file=%s", + msg.channel_name, + f.get("url") or filename or idx, + ) + continue + + if data is None: + logger.warning( + "[Manager] inbound file reader returned no data: channel=%s, file=%s", + msg.channel_name, + f.get("url") or filename or idx, + ) + continue + + if not filename: + ext = ".bin" + if ftype == "image": + ext = ".png" + filename = f"{msg.thread_ts or 'msg'}_{idx}{ext}" + + try: + safe_name = claim_unique_filename(normalize_filename(filename), seen_names) + except ValueError: + logger.warning( + "[Manager] skipping inbound file with unsafe filename: channel=%s, file=%r", + msg.channel_name, + filename, + ) + continue + + dest = 
uploads_dir / safe_name +        try: +            dest.write_bytes(data) +        except Exception: +            logger.exception("[Manager] failed to write inbound file: %s", dest) +            continue + +        created.append( +            { +                "filename": safe_name, +                "size": len(data), +                "path": f"/mnt/user-data/uploads/{safe_name}", +                "is_image": ftype == "image", +            } +        ) + +    return created + + +def _format_uploaded_files_block(files: list[dict[str, Any]]) -> str: +    lines = [ +        "", +        "The following files were uploaded in this message:", +        "", +    ] +    if not files: +        lines.append("(empty)") +    else: +        for f in files: +            filename = f.get("filename", "") +            size = int(f.get("size") or 0) +            size_kb = size / 1024 if size else 0 +            size_str = f"{size_kb:.1f} KB" if size_kb < 1024 else f"{size_kb / 1024:.1f} MB" +            path = f.get("path", "") +            is_image = bool(f.get("is_image")) +            file_kind = "image" if is_image else "file" +            lines.append(f"- {filename} ({size_str})") +            lines.append(f"  Type: {file_kind}") +            lines.append(f"  Path: {path}") +            lines.append("") +    lines.append("Use `read_file` for text-based files and documents.") +    lines.append("Use `view_image` for image files (jpg, jpeg, png, webp) so the model can inspect the image content.") +    lines.append("") +    return "\n".join(lines) + + class ChannelManager: """Core dispatcher that bridges IM channels to the DeerFlow agent. 
@@ -536,6 +677,11 @@ class ChannelManager: assistant_id, run_config, run_context = self._resolve_run_params(msg, thread_id) if extra_context: run_context.update(extra_context) + + uploaded = await _ingest_inbound_files(thread_id, msg) + if uploaded: + msg.text = f"{_format_uploaded_files_block(uploaded)}\n\n{msg.text}".strip() + if self._channel_supports_streaming(msg.channel_name): await self._handle_streaming_chat( client, diff --git a/backend/app/channels/service.py b/backend/app/channels/service.py index 5c4b0d252..672b67295 100644 --- a/backend/app/channels/service.py +++ b/backend/app/channels/service.py @@ -17,6 +17,7 @@ _CHANNEL_REGISTRY: dict[str, str] = { "feishu": "app.channels.feishu:FeishuChannel", "slack": "app.channels.slack:SlackChannel", "telegram": "app.channels.telegram:TelegramChannel", + "wecom": "app.channels.wecom:WeComChannel", } _CHANNELS_LANGGRAPH_URL_ENV = "DEER_FLOW_CHANNELS_LANGGRAPH_URL" diff --git a/backend/app/channels/wecom.py b/backend/app/channels/wecom.py new file mode 100644 index 000000000..5a8948bd4 --- /dev/null +++ b/backend/app/channels/wecom.py @@ -0,0 +1,394 @@ +from __future__ import annotations + +import asyncio +import base64 +import hashlib +import logging +from collections.abc import Awaitable, Callable +from typing import Any, cast + +from app.channels.base import Channel +from app.channels.message_bus import ( + InboundMessageType, + MessageBus, + OutboundMessage, + ResolvedAttachment, +) + +logger = logging.getLogger(__name__) + + +class WeComChannel(Channel): + def __init__(self, bus: MessageBus, config: dict[str, Any]) -> None: + super().__init__(name="wecom", bus=bus, config=config) + self._bot_id: str | None = None + self._bot_secret: str | None = None + self._ws_client = None + self._ws_task: asyncio.Task | None = None + self._ws_frames: dict[str, dict[str, Any]] = {} + self._ws_stream_ids: dict[str, str] = {} + self._working_message = "Working on it..." 
+ + def _clear_ws_context(self, thread_ts: str | None) -> None: + if not thread_ts: + return + self._ws_frames.pop(thread_ts, None) + self._ws_stream_ids.pop(thread_ts, None) + + async def _send_ws_upload_command(self, req_id: str, body: dict[str, Any], cmd: str) -> dict[str, Any]: + if not self._ws_client: + raise RuntimeError("WeCom WebSocket client is not available") + + ws_manager = getattr(self._ws_client, "_ws_manager", None) + send_reply = getattr(ws_manager, "send_reply", None) + if not callable(send_reply): + raise RuntimeError("Installed wecom-aibot-python-sdk does not expose the WebSocket media upload API expected by DeerFlow. Use wecom-aibot-python-sdk==0.1.6 or update the adapter.") + + send_reply_async = cast(Callable[[str, dict[str, Any], str], Awaitable[dict[str, Any]]], send_reply) + return await send_reply_async(req_id, body, cmd) + + async def start(self) -> None: + if self._running: + return + + bot_id = self.config.get("bot_id") + bot_secret = self.config.get("bot_secret") + working_message = self.config.get("working_message") + + self._bot_id = bot_id if isinstance(bot_id, str) and bot_id else None + self._bot_secret = bot_secret if isinstance(bot_secret, str) and bot_secret else None + self._working_message = working_message if isinstance(working_message, str) and working_message else "Working on it..." + + if not self._bot_id or not self._bot_secret: + logger.error("WeCom channel requires bot_id and bot_secret") + return + + try: + from aibot import WSClient, WSClientOptions + except ImportError: + logger.error("wecom-aibot-python-sdk is not installed. 
Install it with: uv add wecom-aibot-python-sdk") + return + else: + self._ws_client = WSClient(WSClientOptions(bot_id=self._bot_id, secret=self._bot_secret, logger=logger)) + self._ws_client.on("message.text", self._on_ws_text) + self._ws_client.on("message.mixed", self._on_ws_mixed) + self._ws_client.on("message.image", self._on_ws_image) + self._ws_client.on("message.file", self._on_ws_file) + self._ws_task = asyncio.create_task(self._ws_client.connect()) + + self._running = True + self.bus.subscribe_outbound(self._on_outbound) + logger.info("WeCom channel started") + + async def stop(self) -> None: + self._running = False + self.bus.unsubscribe_outbound(self._on_outbound) + if self._ws_task: + try: + self._ws_task.cancel() + except Exception: + pass + self._ws_task = None + if self._ws_client: + try: + self._ws_client.disconnect() + except Exception: + pass + self._ws_client = None + self._ws_frames.clear() + self._ws_stream_ids.clear() + logger.info("WeCom channel stopped") + + async def send(self, msg: OutboundMessage, *, _max_retries: int = 3) -> None: + if self._ws_client: + await self._send_ws(msg, _max_retries=_max_retries) + return + logger.warning("[WeCom] send called but WebSocket client is not available") + + async def _on_outbound(self, msg: OutboundMessage) -> None: + if msg.channel_name != self.name: + return + + try: + await self.send(msg) + except Exception: + logger.exception("Failed to send outbound message on channel %s", self.name) + if msg.is_final: + self._clear_ws_context(msg.thread_ts) + return + + for attachment in msg.attachments: + try: + success = await self.send_file(msg, attachment) + if not success: + logger.warning("[%s] file upload skipped for %s", self.name, attachment.filename) + except Exception: + logger.exception("[%s] failed to upload file %s", self.name, attachment.filename) + + if msg.is_final: + self._clear_ws_context(msg.thread_ts) + + async def send_file(self, msg: OutboundMessage, attachment: ResolvedAttachment) -> 
bool: + if not msg.is_final: + return True + if not self._ws_client: + return False + if not msg.thread_ts: + return False + frame = self._ws_frames.get(msg.thread_ts) + if not frame: + return False + + media_type = "image" if attachment.is_image else "file" + size_limit = 2 * 1024 * 1024 if attachment.is_image else 20 * 1024 * 1024 + if attachment.size > size_limit: + logger.warning( + "[WeCom] %s too large (%d bytes), skipping: %s", + media_type, + attachment.size, + attachment.filename, + ) + return False + + try: + media_id = await self._upload_media_ws( + media_type=media_type, + filename=attachment.filename, + path=str(attachment.actual_path), + size=attachment.size, + ) + if not media_id: + return False + + body = {media_type: {"media_id": media_id}, "msgtype": media_type} + await self._ws_client.reply(frame, body) + logger.debug("[WeCom] %s sent via ws: %s", media_type, attachment.filename) + return True + except Exception: + logger.exception("[WeCom] failed to upload/send file via ws: %s", attachment.filename) + return False + + async def _on_ws_text(self, frame: dict[str, Any]) -> None: + body = frame.get("body", {}) or {} + text = ((body.get("text") or {}).get("content") or "").strip() + quote = body.get("quote", {}).get("text", {}).get("content", "").strip() + if not text and not quote: + return + await self._publish_ws_inbound(frame, text + (f"\nQuote message: {quote}" if quote else "")) + + async def _on_ws_mixed(self, frame: dict[str, Any]) -> None: + body = frame.get("body", {}) or {} + mixed = body.get("mixed") or {} + items = mixed.get("msg_item") or [] + parts: list[str] = [] + files: list[dict[str, Any]] = [] + for item in items: + item_type = (item or {}).get("msgtype") + if item_type == "text": + content = (((item or {}).get("text") or {}).get("content") or "").strip() + if content: + parts.append(content) + elif item_type in ("image", "file"): + payload = (item or {}).get(item_type) or {} + url = payload.get("url") + aeskey = 
payload.get("aeskey") + if isinstance(url, str) and url: + files.append( + { + "type": item_type, + "url": url, + "aeskey": (aeskey if isinstance(aeskey, str) and aeskey else None), + } + ) + text = "\n\n".join(parts).strip() + if not text and not files: + return + if not text: + text = "(receive image/file)" + await self._publish_ws_inbound(frame, text, files=files) + + async def _on_ws_image(self, frame: dict[str, Any]) -> None: + body = frame.get("body", {}) or {} + image = body.get("image") or {} + url = image.get("url") + aeskey = image.get("aeskey") + if not isinstance(url, str) or not url: + return + await self._publish_ws_inbound( + frame, + "(receive image )", + files=[ + { + "type": "image", + "url": url, + "aeskey": aeskey if isinstance(aeskey, str) and aeskey else None, + } + ], + ) + + async def _on_ws_file(self, frame: dict[str, Any]) -> None: + body = frame.get("body", {}) or {} + file_obj = body.get("file") or {} + url = file_obj.get("url") + aeskey = file_obj.get("aeskey") + if not isinstance(url, str) or not url: + return + await self._publish_ws_inbound( + frame, + "(receive file)", + files=[ + { + "type": "file", + "url": url, + "aeskey": aeskey if isinstance(aeskey, str) and aeskey else None, + } + ], + ) + + async def _publish_ws_inbound( + self, + frame: dict[str, Any], + text: str, + *, + files: list[dict[str, Any]] | None = None, + ) -> None: + if not self._ws_client: + return + try: + from aibot import generate_req_id + except Exception: + return + + body = frame.get("body", {}) or {} + msg_id = body.get("msgid") + if not msg_id: + return + + user_id = (body.get("from") or {}).get("userid") + + inbound_type = InboundMessageType.COMMAND if text.startswith("/") else InboundMessageType.CHAT + inbound = self._make_inbound( + chat_id=user_id, # keep user's conversation in memory + user_id=user_id, + text=text, + msg_type=inbound_type, + thread_ts=msg_id, + files=files or [], + metadata={"aibotid": body.get("aibotid"), "chattype": 
body.get("chattype")}, + ) + inbound.topic_id = user_id # keep the same thread + + stream_id = generate_req_id("stream") + self._ws_frames[msg_id] = frame + self._ws_stream_ids[msg_id] = stream_id + + try: + await self._ws_client.reply_stream(frame, stream_id, self._working_message, False) + except Exception: + pass + + await self.bus.publish_inbound(inbound) + + async def _send_ws(self, msg: OutboundMessage, *, _max_retries: int = 3) -> None: + if not self._ws_client: + return + try: + from aibot import generate_req_id + except Exception: + generate_req_id = None + + if msg.thread_ts and msg.thread_ts in self._ws_frames: + frame = self._ws_frames[msg.thread_ts] + stream_id = self._ws_stream_ids.get(msg.thread_ts) + if not stream_id and generate_req_id: + stream_id = generate_req_id("stream") + self._ws_stream_ids[msg.thread_ts] = stream_id + if not stream_id: + return + + last_exc: Exception | None = None + for attempt in range(_max_retries): + try: + await self._ws_client.reply_stream(frame, stream_id, msg.text, bool(msg.is_final)) + return + except Exception as exc: + last_exc = exc + if attempt < _max_retries - 1: + await asyncio.sleep(2**attempt) + if last_exc: + raise last_exc + + body = {"msgtype": "markdown", "markdown": {"content": msg.text}} + last_exc = None + for attempt in range(_max_retries): + try: + await self._ws_client.send_message(msg.chat_id, body) + return + except Exception as exc: + last_exc = exc + if attempt < _max_retries - 1: + await asyncio.sleep(2**attempt) + if last_exc: + raise last_exc + + async def _upload_media_ws( + self, + *, + media_type: str, + filename: str, + path: str, + size: int, + ) -> str | None: + if not self._ws_client: + return None + try: + from aibot import generate_req_id + except Exception: + return None + + chunk_size = 512 * 1024 + total_chunks = (size + chunk_size - 1) // chunk_size + if total_chunks < 1 or total_chunks > 100: + logger.warning("[WeCom] invalid total_chunks=%d for %s", total_chunks, filename) + 
return None + + md5_hasher = hashlib.md5() + with open(path, "rb") as f: + for chunk in iter(lambda: f.read(1024 * 1024), b""): + md5_hasher.update(chunk) + md5 = md5_hasher.hexdigest() + + init_req_id = generate_req_id("aibot_upload_media_init") + init_body = { + "type": media_type, + "filename": filename, + "total_size": int(size), + "total_chunks": int(total_chunks), + "md5": md5, + } + init_ack = await self._send_ws_upload_command(init_req_id, init_body, "aibot_upload_media_init") + upload_id = (init_ack.get("body") or {}).get("upload_id") + if not upload_id: + logger.warning("[WeCom] upload init returned no upload_id: %s", init_ack) + return None + + with open(path, "rb") as f: + for idx in range(total_chunks): + data = f.read(chunk_size) + if not data: + break + chunk_req_id = generate_req_id("aibot_upload_media_chunk") + chunk_body = { + "upload_id": upload_id, + "chunk_index": int(idx), + "base64_data": base64.b64encode(data).decode("utf-8"), + } + await self._send_ws_upload_command(chunk_req_id, chunk_body, "aibot_upload_media_chunk") + + finish_req_id = generate_req_id("aibot_upload_media_finish") + finish_ack = await self._send_ws_upload_command(finish_req_id, {"upload_id": upload_id}, "aibot_upload_media_finish") + media_id = (finish_ack.get("body") or {}).get("media_id") + if not media_id: + logger.warning("[WeCom] upload finish returned no media_id: %s", finish_ack) + return None + return media_id diff --git a/backend/pyproject.toml b/backend/pyproject.toml index c94b46825..11d1065b3 100644 --- a/backend/pyproject.toml +++ b/backend/pyproject.toml @@ -16,6 +16,7 @@ dependencies = [ "python-telegram-bot>=21.0", "langgraph-sdk>=0.1.51", "markdown-to-mrkdwn>=0.3.1", + "wecom-aibot-python-sdk>=0.1.6", ] [dependency-groups] diff --git a/backend/tests/test_channels.py b/backend/tests/test_channels.py index fb58bcfb4..8e7546ded 100644 --- a/backend/tests/test_channels.py +++ b/backend/tests/test_channels.py @@ -12,7 +12,7 @@ from unittest.mock import 
AsyncMock, MagicMock import pytest from app.channels.base import Channel -from app.channels.message_bus import InboundMessage, InboundMessageType, MessageBus, OutboundMessage +from app.channels.message_bus import InboundMessage, InboundMessageType, MessageBus, OutboundMessage, ResolvedAttachment from app.channels.store import ChannelStore @@ -1718,6 +1718,159 @@ class TestFeishuChannel: _run(go()) +class TestWeComChannel: + def test_publish_ws_inbound_starts_stream_and_publishes_message(self, monkeypatch): + from app.channels.wecom import WeComChannel + + async def go(): + bus = MessageBus() + bus.publish_inbound = AsyncMock() + channel = WeComChannel(bus, config={}) + channel._ws_client = SimpleNamespace(reply_stream=AsyncMock()) + + monkeypatch.setitem( + __import__("sys").modules, + "aibot", + SimpleNamespace(generate_req_id=lambda prefix: "stream-1"), + ) + + frame = { + "body": { + "msgid": "msg-1", + "from": {"userid": "user-1"}, + "aibotid": "bot-1", + "chattype": "single", + } + } + files = [{"type": "image", "url": "https://example.com/image.png"}] + + await channel._publish_ws_inbound(frame, "hello", files=files) + + channel._ws_client.reply_stream.assert_awaited_once_with(frame, "stream-1", "Working on it...", False) + bus.publish_inbound.assert_awaited_once() + + inbound = bus.publish_inbound.await_args.args[0] + assert inbound.channel_name == "wecom" + assert inbound.chat_id == "user-1" + assert inbound.user_id == "user-1" + assert inbound.text == "hello" + assert inbound.thread_ts == "msg-1" + assert inbound.topic_id == "user-1" + assert inbound.files == files + assert inbound.metadata == {"aibotid": "bot-1", "chattype": "single"} + assert channel._ws_frames["msg-1"] is frame + assert channel._ws_stream_ids["msg-1"] == "stream-1" + + _run(go()) + + def test_publish_ws_inbound_uses_configured_working_message(self, monkeypatch): + from app.channels.wecom import WeComChannel + + async def go(): + bus = MessageBus() + bus.publish_inbound = AsyncMock() + 
channel = WeComChannel(bus, config={"working_message": "Please wait..."}) + channel._ws_client = SimpleNamespace(reply_stream=AsyncMock()) + channel._working_message = "Please wait..." + + monkeypatch.setitem( + __import__("sys").modules, + "aibot", + SimpleNamespace(generate_req_id=lambda prefix: "stream-1"), + ) + + frame = { + "body": { + "msgid": "msg-1", + "from": {"userid": "user-1"}, + } + } + + await channel._publish_ws_inbound(frame, "hello") + + channel._ws_client.reply_stream.assert_awaited_once_with(frame, "stream-1", "Please wait...", False) + + _run(go()) + + def test_on_outbound_sends_attachment_before_clearing_context(self, tmp_path): + from app.channels.wecom import WeComChannel + + async def go(): + bus = MessageBus() + channel = WeComChannel(bus, config={}) + + frame = {"body": {"msgid": "msg-1"}} + ws_client = SimpleNamespace( + reply_stream=AsyncMock(), + reply=AsyncMock(), + ) + channel._ws_client = ws_client + channel._ws_frames["msg-1"] = frame + channel._ws_stream_ids["msg-1"] = "stream-1" + channel._upload_media_ws = AsyncMock(return_value="media-1") + + attachment_path = tmp_path / "image.png" + attachment_path.write_bytes(b"png") + attachment = ResolvedAttachment( + virtual_path="/mnt/user-data/outputs/image.png", + actual_path=attachment_path, + filename="image.png", + mime_type="image/png", + size=attachment_path.stat().st_size, + is_image=True, + ) + + msg = OutboundMessage( + channel_name="wecom", + chat_id="user-1", + thread_id="thread-1", + text="done", + attachments=[attachment], + is_final=True, + thread_ts="msg-1", + ) + + await channel._on_outbound(msg) + + ws_client.reply_stream.assert_awaited_once_with(frame, "stream-1", "done", True) + channel._upload_media_ws.assert_awaited_once_with( + media_type="image", + filename="image.png", + path=str(attachment_path), + size=attachment.size, + ) + ws_client.reply.assert_awaited_once_with(frame, {"image": {"media_id": "media-1"}, "msgtype": "image"}) + assert "msg-1" not in 
channel._ws_frames + assert "msg-1" not in channel._ws_stream_ids + + _run(go()) + + def test_send_falls_back_to_send_message_without_thread_context(self): + from app.channels.wecom import WeComChannel + + async def go(): + bus = MessageBus() + channel = WeComChannel(bus, config={}) + channel._ws_client = SimpleNamespace(send_message=AsyncMock()) + + msg = OutboundMessage( + channel_name="wecom", + chat_id="user-1", + thread_id="thread-1", + text="hello", + thread_ts=None, + ) + + await channel.send(msg) + + channel._ws_client.send_message.assert_awaited_once_with( + "user-1", + {"msgtype": "markdown", "markdown": {"content": "hello"}}, + ) + + _run(go()) + + class TestChannelService: def test_get_status_no_channels(self): from app.channels.service import ChannelService diff --git a/backend/uv.lock b/backend/uv.lock index 73e4d9694..7a40d5656 100644 --- a/backend/uv.lock +++ b/backend/uv.lock @@ -681,6 +681,7 @@ dependencies = [ { name = "slack-sdk" }, { name = "sse-starlette" }, { name = "uvicorn", extra = ["standard"] }, + { name = "wecom-aibot-python-sdk" }, ] [package.dev-dependencies] @@ -702,6 +703,7 @@ requires-dist = [ { name = "slack-sdk", specifier = ">=3.33.0" }, { name = "sse-starlette", specifier = ">=2.1.0" }, { name = "uvicorn", extras = ["standard"], specifier = ">=0.34.0" }, + { name = "wecom-aibot-python-sdk", specifier = ">=0.1.6" }, ] [package.metadata.requires-dev] @@ -2906,6 +2908,18 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/a6/53/d78dc063216e62fc55f6b2eebb447f6a4b0a59f55c8406376f76bf959b08/pydub-0.25.1-py2.py3-none-any.whl", hash = "sha256:65617e33033874b59d87db603aa1ed450633288aefead953b30bded59cb599a6", size = 32327, upload-time = "2021-03-10T02:09:53.503Z" }, ] +[[package]] +name = "pyee" +version = "13.0.1" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "typing-extensions" }, +] +sdist = { url = 
"https://files.pythonhosted.org/packages/8b/04/e7c1fe4dc78a6fdbfd6c337b1c3732ff543b8a397683ab38378447baa331/pyee-13.0.1.tar.gz", hash = "sha256:0b931f7c14535667ed4c7e0d531716368715e860b988770fc7eb8578d1f67fc8", size = 31655, upload-time = "2026-02-14T21:12:28.044Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/a0/c4/b4d4827c93ef43c01f599ef31453ccc1c132b353284fc6c87d535c233129/pyee-13.0.1-py3-none-any.whl", hash = "sha256:af2f8fede4171ef667dfded53f96e2ed0d6e6bd7ee3bb46437f77e3b57689228", size = 15659, upload-time = "2026-02-14T21:12:26.263Z" }, +] + [[package]] name = "pygments" version = "2.19.2" @@ -3884,6 +3898,22 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/6f/28/258ebab549c2bf3e64d2b0217b973467394a9cea8c42f70418ca2c5d0d2e/websockets-16.0-py3-none-any.whl", hash = "sha256:1637db62fad1dc833276dded54215f2c7fa46912301a24bd94d45d46a011ceec", size = 171598, upload-time = "2026-01-10T09:23:45.395Z" }, ] +[[package]] +name = "wecom-aibot-python-sdk" +version = "1.0.2" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "aiohttp" }, + { name = "certifi" }, + { name = "cryptography" }, + { name = "pyee" }, + { name = "websockets" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/23/b4/df93b46006e5c1900703aefa59004e6d524a4e73ba56ae73bcce24ff4184/wecom_aibot_python_sdk-1.0.2.tar.gz", hash = "sha256:f8cd9920c0b6cb88bf8a50742fca1e834e5c49e06c3ae861d0f128672c17697b", size = 31706, upload-time = "2026-03-23T07:44:53.949Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/ee/39/f2fab475f15d5bf596c4fa998ddd321b1400bcc6ae2e73d3e935db939379/wecom_aibot_python_sdk-1.0.2-py3-none-any.whl", hash = "sha256:03df207c72021157506647cd9f4ee51b865a7f37d3b5df7f7af1b1c7e677db84", size = 23228, upload-time = "2026-03-23T07:44:52.555Z" }, +] + [[package]] name = "wrapt" version = "1.17.3" diff --git a/config.example.yaml b/config.example.yaml index 0ca11f418..f68a574e5 100644 --- a/config.example.yaml 
+++ b/config.example.yaml @@ -232,7 +232,6 @@ models: # supports_vision: true # supports_thinking: true - # Example: OpenRouter (OpenAI-compatible) # OpenRouter models use the same ChatOpenAI + base_url pattern as other OpenAI-compatible gateways. # - name: openrouter-gemini-2.5-flash @@ -687,6 +686,10 @@ checkpointer: # context: # thinking_enabled: true # subagent_enabled: true +# wecom: +# enabled: false +# bot_id: $WECOM_BOT_ID +# bot_secret: $WECOM_BOT_SECRET # ============================================================================ # Guardrails Configuration