diff --git a/README.md b/README.md index 6928ac412..f2ad090ed 100644 --- a/README.md +++ b/README.md @@ -368,6 +368,7 @@ DeerFlow supports receiving tasks from messaging apps. Channels auto-start when | Telegram | Bot API (long-polling) | Easy | | Slack | Socket Mode | Moderate | | Feishu / Lark | WebSocket | Moderate | +| WeChat | Tencent iLink (long-polling) | Moderate | | WeCom | WebSocket | Moderate | **Configuration in `config.yaml`:** @@ -412,6 +413,19 @@ channels: bot_token: $TELEGRAM_BOT_TOKEN allowed_users: [] # empty = allow all + wechat: + enabled: false + bot_token: $WECHAT_BOT_TOKEN + ilink_bot_id: $WECHAT_ILINK_BOT_ID + qrcode_login_enabled: true # optional: allow first-time QR bootstrap when bot_token is absent + allowed_users: [] # empty = allow all + polling_timeout: 35 + state_dir: ./.deer-flow/wechat/state + max_inbound_image_bytes: 20971520 + max_outbound_image_bytes: 20971520 + max_inbound_file_bytes: 52428800 + max_outbound_file_bytes: 52428800 + # Optional: per-channel / per-user session settings session: assistant_id: mobile-agent # custom agent names are also supported here @@ -445,6 +459,10 @@ SLACK_APP_TOKEN=xapp-... FEISHU_APP_ID=cli_xxxx FEISHU_APP_SECRET=your_app_secret +# WeChat iLink +WECHAT_BOT_TOKEN=your_ilink_bot_token +WECHAT_ILINK_BOT_ID=your_ilink_bot_id + # WeCom WECOM_BOT_ID=your_bot_id WECOM_BOT_SECRET=your_bot_secret @@ -470,6 +488,14 @@ WECOM_BOT_SECRET=your_bot_secret 3. Under **Events**, subscribe to `im.message.receive_v1` and select **Long Connection** mode. 4. Copy the App ID and App Secret. Set `FEISHU_APP_ID` and `FEISHU_APP_SECRET` in `.env` and enable the channel in `config.yaml`. +**WeChat Setup** + +1. Enable the `wechat` channel in `config.yaml`. +2. Either set `WECHAT_BOT_TOKEN` in `.env`, or set `qrcode_login_enabled: true` for first-time QR bootstrap. +3. When `bot_token` is absent and QR bootstrap is enabled, watch backend logs for the QR content returned by iLink and complete the binding flow. +4. After the QR flow succeeds, DeerFlow persists the acquired token under `state_dir` for later restarts. +5. For Docker Compose deployments, keep `state_dir` on a persistent volume so the `get_updates_buf` cursor and saved auth state survive restarts. + **WeCom Setup** 1. Create a bot on the WeCom AI Bot platform and obtain the `bot_id` and `bot_secret`. diff --git a/backend/app/channels/manager.py b/backend/app/channels/manager.py index 0d2a3a4ba..2410dcb64 100644 --- a/backend/app/channels/manager.py +++ b/backend/app/channels/manager.py @@ -8,6 +8,7 @@ import mimetypes import re import time from collections.abc import Awaitable, Callable, Mapping +from pathlib import Path from typing import Any import httpx @@ -37,6 +38,7 @@ CHANNEL_CAPABILITIES = { "feishu": {"supports_streaming": True}, "slack": {"supports_streaming": False}, "telegram": {"supports_streaming": False}, + "wechat": {"supports_streaming": False}, "wecom": {"supports_streaming": True}, } @@ -78,7 +80,24 @@ async def _read_wecom_inbound_file(file_info: dict[str, Any], client: httpx.Asyn return decrypt_file(data, aeskey) +async def _read_wechat_inbound_file(file_info: dict[str, Any], client: httpx.AsyncClient) -> bytes | None: + raw_path = file_info.get("path") + if isinstance(raw_path, str) and raw_path.strip(): + try: + return await asyncio.to_thread(Path(raw_path).read_bytes) + except OSError: + logger.exception("[Manager] failed to read WeChat inbound file from local path: %s", raw_path) + return None + + full_url = file_info.get("full_url") + if isinstance(full_url, str) and full_url.strip(): + return await _read_http_inbound_file({"url": full_url}, client) + + return None + + register_inbound_file_reader("wecom", _read_wecom_inbound_file) +register_inbound_file_reader("wechat", _read_wechat_inbound_file) class InvalidChannelSessionConfigError(ValueError): diff --git a/backend/app/channels/service.py b/backend/app/channels/service.py index 1906aef0b..046eab7ca 100644 --- a/backend/app/channels/service.py +++ b/backend/app/channels/service.py @@ -18,6 +18,7 @@ _CHANNEL_REGISTRY: dict[str, str] = { "feishu": "app.channels.feishu:FeishuChannel", "slack": "app.channels.slack:SlackChannel", "telegram": "app.channels.telegram:TelegramChannel", + "wechat": "app.channels.wechat:WechatChannel", "wecom": "app.channels.wecom:WeComChannel", } diff --git a/backend/app/channels/wechat.py b/backend/app/channels/wechat.py new file mode 100644 index 000000000..a8339c2e2 --- /dev/null +++ b/backend/app/channels/wechat.py @@ -0,0 +1,1370 @@ +"""WeChat channel — connects to iLink via long-polling.""" + +from __future__ import annotations + +import asyncio +import base64 +import binascii +import hashlib +import json +import logging +import mimetypes +import secrets +import time +from collections.abc import Mapping +from enum import IntEnum +from pathlib import Path +from typing import Any +from urllib.parse import quote + +import httpx +from cryptography.hazmat.primitives import padding +from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes + +from app.channels.base import Channel +from app.channels.message_bus import InboundMessageType, MessageBus, OutboundMessage, ResolvedAttachment + +logger = logging.getLogger(__name__) + + +class MessageItemType(IntEnum): + NONE = 0 + TEXT = 1 + IMAGE = 2 + VOICE = 3 + FILE = 4 + VIDEO = 5 + + +class UploadMediaType(IntEnum): + IMAGE = 1 + VIDEO = 2 + FILE = 3 + VOICE = 4 + + +def _build_ilink_client_version(version: str) -> str: + parts = [part.strip() for part in version.split(".")] + + def _part(index: int) -> int: + if index >= len(parts): + return 0 + try: + return max(0, min(int(parts[index] or 0), 0xFF)) + except ValueError: + return 0 + + major = _part(0) + minor = _part(1) + patch = _part(2) + return str((major << 16) | (minor << 8) | patch) + + +def _build_wechat_uin() -> str: + return base64.b64encode(str(secrets.randbits(32)).encode("utf-8")).decode("utf-8") + + +def _md5_hex(content: bytes) -> str: + return hashlib.md5(content).hexdigest() + + +def _encrypted_size_for_aes_128_ecb(plaintext_size: int) -> int: + if plaintext_size < 0: + raise ValueError("plaintext_size must be non-negative") + return ((plaintext_size // 16) + 1) * 16 + + +def _validate_aes_128_key(key: bytes) -> None: + if len(key) != 16: + raise ValueError("AES-128-ECB requires a 16-byte key") + + +def _encrypt_aes_128_ecb(content: bytes, key: bytes) -> bytes: + _validate_aes_128_key(key) + padder = padding.PKCS7(128).padder() + padded = padder.update(content) + padder.finalize() + cipher = Cipher(algorithms.AES(key), modes.ECB()) + encryptor = cipher.encryptor() + return encryptor.update(padded) + encryptor.finalize() + + +def _decrypt_aes_128_ecb(content: bytes, key: bytes) -> bytes: + _validate_aes_128_key(key) + cipher = Cipher(algorithms.AES(key), modes.ECB()) + decryptor = cipher.decryptor() + padded = decryptor.update(content) + decryptor.finalize() + unpadder = padding.PKCS7(128).unpadder() + return unpadder.update(padded) + unpadder.finalize() + + +def _safe_media_filename(prefix: str, extension: str, message_id: str | None = None, index: int | None = None) -> str: + safe_ext = extension if extension.startswith(".") else f".{extension}" if extension else "" + safe_msg = (message_id or "msg").replace("/", "_").replace("\\", "_") + suffix = f"-{index}" if index is not None else "" + return f"{prefix}-{safe_msg}{suffix}{safe_ext}" + + +def _build_cdn_upload_url(cdn_base_url: str, upload_param: str, filekey: str) -> str: + return f"{cdn_base_url.rstrip('/')}/upload?encrypted_query_param={quote(upload_param, safe='')}&filekey={quote(filekey, safe='')}" + + +def _encode_outbound_media_aes_key(aes_key: bytes) -> str: + return base64.b64encode(aes_key.hex().encode("utf-8")).decode("utf-8") + + +def _detect_image_extension_and_mime(content: bytes) -> tuple[str, str] | None: + if content.startswith(b"\x89PNG\r\n\x1a\n"): + return ".png", "image/png" + if content.startswith(b"\xff\xd8\xff"): + return ".jpg", "image/jpeg" + if content.startswith((b"GIF87a", b"GIF89a")): + return ".gif", "image/gif" + if len(content) >= 12 and content.startswith(b"RIFF") and content[8:12] == b"WEBP": + return ".webp", "image/webp" + if content.startswith(b"BM"): + return ".bmp", "image/bmp" + return None + + +class WechatChannel(Channel): + """WeChat iLink bot channel using long-polling. + + Configuration keys (in ``config.yaml`` under ``channels.wechat``): + - ``bot_token``: iLink bot token used for authenticated API calls. + - ``qrcode_login_enabled``: (optional) Allow first-time QR bootstrap when ``bot_token`` is missing. + - ``base_url``: (optional) iLink API base URL. + - ``allowed_users``: (optional) List of allowed iLink user IDs. Empty = allow all. + - ``polling_timeout``: (optional) Long-poll timeout in seconds. Default: 35. + - ``state_dir``: (optional) Directory used to persist the long-poll cursor. + """ + + DEFAULT_BASE_URL = "https://ilinkai.weixin.qq.com" + DEFAULT_CDN_BASE_URL = "https://novac2c.cdn.weixin.qq.com/c2c" + DEFAULT_CHANNEL_VERSION = "1.0" + DEFAULT_POLLING_TIMEOUT = 35.0 + DEFAULT_RETRY_DELAY = 5.0 + DEFAULT_QRCODE_POLL_INTERVAL = 2.0 + DEFAULT_QRCODE_POLL_TIMEOUT = 180.0 + DEFAULT_QRCODE_BOT_TYPE = 3 + DEFAULT_API_TIMEOUT = 15.0 + DEFAULT_CONFIG_TIMEOUT = 10.0 + DEFAULT_CDN_TIMEOUT = 30.0 + DEFAULT_IMAGE_DOWNLOAD_DIRNAME = "downloads" + DEFAULT_MAX_IMAGE_BYTES = 20 * 1024 * 1024 + DEFAULT_MAX_OUTBOUND_IMAGE_BYTES = 20 * 1024 * 1024 + DEFAULT_MAX_INBOUND_FILE_BYTES = 50 * 1024 * 1024 + DEFAULT_MAX_OUTBOUND_FILE_BYTES = 50 * 1024 * 1024 + DEFAULT_ALLOWED_FILE_EXTENSIONS = frozenset( + { + ".txt", + ".md", + ".pdf", + ".csv", + ".json", + ".yaml", + ".yml", + ".xml", + ".html", + ".log", + ".zip", + ".doc", + ".docx", + ".xls", + ".xlsx", + ".ppt", + ".pptx", + ".rtf", + ".py", + ".js", + ".ts", + ".tsx", + ".jsx", + ".java", + ".go", + ".rs", + ".c", + ".cpp", + ".h", + ".hpp", + ".sql", + ".sh", + ".bat", + ".ps1", + ".toml", + ".ini", + ".conf", + } + ) + DEFAULT_ALLOWED_FILE_MIME_TYPES = frozenset( + { + "application/pdf", + "application/json", + "application/xml", + "application/zip", + "application/x-zip-compressed", + "application/x-yaml", + "application/yaml", + "text/csv", + "application/msword", + "application/vnd.openxmlformats-officedocument.wordprocessingml.document", + "application/vnd.ms-excel", + "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", + "application/vnd.ms-powerpoint", + "application/vnd.openxmlformats-officedocument.presentationml.presentation", + "application/rtf", + } + ) + + def __init__(self, bus: MessageBus, config: dict[str, Any]) -> None: + super().__init__(name="wechat", bus=bus, config=config) + self._main_loop: asyncio.AbstractEventLoop | None = None + self._poll_task: asyncio.Task | None = None + self._client: httpx.AsyncClient | None = None + self._auth_lock = asyncio.Lock() + + self._base_url = str(config.get("base_url") or self.DEFAULT_BASE_URL).rstrip("/") + self._cdn_base_url = str(config.get("cdn_base_url") or self.DEFAULT_CDN_BASE_URL).rstrip("/") + self._channel_version = str(config.get("channel_version") or self.DEFAULT_CHANNEL_VERSION) + self._polling_timeout = self._coerce_float(config.get("polling_timeout"), self.DEFAULT_POLLING_TIMEOUT) + self._retry_delay = self._coerce_float(config.get("polling_retry_delay"), self.DEFAULT_RETRY_DELAY) + self._qrcode_poll_interval = self._coerce_float(config.get("qrcode_poll_interval"), self.DEFAULT_QRCODE_POLL_INTERVAL) + self._qrcode_poll_timeout = self._coerce_float(config.get("qrcode_poll_timeout"), self.DEFAULT_QRCODE_POLL_TIMEOUT) + self._qrcode_login_enabled = bool(config.get("qrcode_login_enabled", False)) + self._qrcode_bot_type = self._coerce_int(config.get("qrcode_bot_type"), self.DEFAULT_QRCODE_BOT_TYPE) + self._ilink_app_id = str(config.get("ilink_app_id") or "").strip() + self._route_tag = str(config.get("route_tag") or "").strip() + self._respect_server_longpoll_timeout = bool(config.get("respect_server_longpoll_timeout", True)) + self._max_inbound_image_bytes = self._coerce_int(config.get("max_inbound_image_bytes"), self.DEFAULT_MAX_IMAGE_BYTES) + self._max_outbound_image_bytes = self._coerce_int(config.get("max_outbound_image_bytes"), self.DEFAULT_MAX_OUTBOUND_IMAGE_BYTES) + self._max_inbound_file_bytes = self._coerce_int(config.get("max_inbound_file_bytes"), self.DEFAULT_MAX_INBOUND_FILE_BYTES) + self._max_outbound_file_bytes = self._coerce_int(config.get("max_outbound_file_bytes"), self.DEFAULT_MAX_OUTBOUND_FILE_BYTES) + self._allowed_file_extensions = self._coerce_str_set(config.get("allowed_file_extensions"), self.DEFAULT_ALLOWED_FILE_EXTENSIONS) + self._allowed_users: set[str] = {str(uid).strip() for uid in config.get("allowed_users", []) if str(uid).strip()} + self._bot_token = str(config.get("bot_token") or "").strip() + self._ilink_bot_id = str(config.get("ilink_bot_id") or "").strip() or None + self._auth_state: dict[str, Any] = {} + self._server_longpoll_timeout_seconds: float | None = None + + self._get_updates_buf = "" + self._context_tokens_by_chat: dict[str, str] = {} + self._context_tokens_by_thread: dict[str, str] = {} + + self._state_dir = self._resolve_state_dir(config.get("state_dir")) + self._cursor_path = self._state_dir / "wechat-getupdates.json" if self._state_dir else None + self._auth_path = self._state_dir / "wechat-auth.json" if self._state_dir else None + self._load_state() + + async def start(self) -> None: + if self._running: + return + + if not self._bot_token and not self._qrcode_login_enabled: + logger.error("WeChat channel requires bot_token or qrcode_login_enabled") + return + + self._main_loop = asyncio.get_running_loop() + if self._state_dir: + self._state_dir.mkdir(parents=True, exist_ok=True) + + await self._ensure_client() + self._running = True + self.bus.subscribe_outbound(self._on_outbound) + self._poll_task = self._main_loop.create_task(self._poll_loop()) + logger.info("WeChat channel started") + + async def stop(self) -> None: + self._running = False + self.bus.unsubscribe_outbound(self._on_outbound) + + if self._poll_task: + self._poll_task.cancel() + try: + await self._poll_task + except asyncio.CancelledError: + pass + self._poll_task = None + + if self._client is not None: + await self._client.aclose() + self._client = None + + logger.info("WeChat channel stopped") + + async def send(self, msg: OutboundMessage, *, _max_retries: int = 3) -> None: + text = msg.text.strip() + if not text: + return + + if not self._bot_token and not await self._ensure_authenticated(): + logger.warning("[WeChat] unable to authenticate before sending chat=%s", msg.chat_id) + return + + context_token = self._resolve_context_token(msg) + if not context_token: + logger.warning("[WeChat] missing context_token for chat=%s, dropping outbound message", msg.chat_id) + return + + await self._send_text_message( + chat_id=msg.chat_id, + context_token=context_token, + text=text, + client_id_prefix="deerflow", + max_retries=_max_retries, + ) + + async def _send_text_message( + self, + *, + chat_id: str, + context_token: str, + text: str, + client_id_prefix: str, + max_retries: int, + ) -> None: + payload = { + "msg": { + "from_user_id": "", + "to_user_id": chat_id, + "client_id": f"{client_id_prefix}_{int(time.time() * 1000)}_{secrets.token_hex(2)}", + "message_type": 2, + "message_state": 2, + "context_token": context_token, + "item_list": [ + { + "type": int(MessageItemType.TEXT), + "text_item": {"text": text}, + } + ], + }, + "base_info": self._base_info(), + } + + last_exc: Exception | None = None + for attempt in range(max_retries): + try: + data = await self._request_json("/ilink/bot/sendmessage", payload) + self._ensure_success(data, "sendmessage") + return + except Exception as exc: + last_exc = exc + if attempt < max_retries - 1: + delay = 2**attempt + logger.warning( + "[WeChat] send failed (attempt %d/%d), retrying in %ds: %s", + attempt + 1, + max_retries, + delay, + exc, + ) + await asyncio.sleep(delay) + + logger.error("[WeChat] send failed after %d attempts: %s", max_retries, last_exc) + raise last_exc # type: ignore[misc] + + async def send_file(self, msg: OutboundMessage, attachment: ResolvedAttachment) -> bool: + if attachment.is_image: + return await self._send_image_attachment(msg, attachment) + return await self._send_file_attachment(msg, attachment) + + async def _send_image_attachment(self, msg: OutboundMessage, attachment: ResolvedAttachment) -> bool: + if self._max_outbound_image_bytes > 0 and attachment.size > self._max_outbound_image_bytes: + logger.warning("[WeChat] outbound image too large (%d bytes), skipping: %s", attachment.size, attachment.filename) + return False + + if not self._bot_token and not await self._ensure_authenticated(): + logger.warning("[WeChat] unable to authenticate before sending image chat=%s", msg.chat_id) + return False + + context_token = self._resolve_context_token(msg) + if not context_token: + logger.warning("[WeChat] missing context_token for image chat=%s", msg.chat_id) + return False + + try: + plaintext = attachment.actual_path.read_bytes() + except OSError: + logger.exception("[WeChat] failed to read outbound image %s", attachment.actual_path) + return False + + aes_key = secrets.token_bytes(16) + filekey = _safe_media_filename("wechat-upload", attachment.actual_path.suffix or ".bin", message_id=msg.thread_id) + upload_request = self._build_upload_request( + filekey=filekey, + media_type=UploadMediaType.IMAGE, + to_user_id=msg.chat_id, + plaintext=plaintext, + aes_key=aes_key, + no_need_thumb=True, + ) + + try: + upload_data = await self._request_json( + "/ilink/bot/getuploadurl", + { + **upload_request, + "base_info": self._base_info(), + }, + ) + self._ensure_success(upload_data, "getuploadurl") + + upload_full_url = self._extract_upload_full_url(upload_data) + upload_param = self._extract_upload_param(upload_data) + upload_method = "POST" + if not upload_full_url: + if not upload_param: + logger.warning("[WeChat] getuploadurl returned no upload URL for image %s", attachment.filename) + return False + upload_full_url = _build_cdn_upload_url(self._cdn_base_url, upload_param, filekey) + + encrypted = _encrypt_aes_128_ecb(plaintext, aes_key) + download_param = await self._upload_cdn_bytes( + upload_full_url, + encrypted, + content_type=attachment.mime_type, + method=upload_method, + ) + if download_param: + upload_data = dict(upload_data) + upload_data["upload_param"] = download_param + + image_item = self._build_outbound_image_item(upload_data, aes_key, ciphertext_size=len(encrypted)) + send_payload = { + "msg": { + "from_user_id": "", + "to_user_id": msg.chat_id, + "client_id": f"deerflow_img_{int(time.time() * 1000)}", + "message_type": 2, + "message_state": 2, + "context_token": context_token, + "item_list": [ + { + "type": int(MessageItemType.IMAGE), + "image_item": image_item, + } + ], + }, + "base_info": self._base_info(), + } + response = await self._request_json("/ilink/bot/sendmessage", send_payload) + self._ensure_success(response, "sendmessage") + return True + except Exception: + logger.exception("[WeChat] failed to send image attachment %s", attachment.filename) + return False + + async def _send_file_attachment(self, msg: OutboundMessage, attachment: ResolvedAttachment) -> bool: + if not self._is_allowed_file_type(attachment.filename, attachment.mime_type): + logger.warning("[WeChat] outbound file type blocked, skipping: %s (%s)", attachment.filename, attachment.mime_type) + return False + + if self._max_outbound_file_bytes > 0 and attachment.size > self._max_outbound_file_bytes: + logger.warning("[WeChat] outbound file too large (%d bytes), skipping: %s", attachment.size, attachment.filename) + return False + + if not self._bot_token and not await self._ensure_authenticated(): + logger.warning("[WeChat] unable to authenticate before sending file chat=%s", msg.chat_id) + return False + + context_token = self._resolve_context_token(msg) + if not context_token: + logger.warning("[WeChat] missing context_token for file chat=%s", msg.chat_id) + return False + + try: + plaintext = attachment.actual_path.read_bytes() + except OSError: + logger.exception("[WeChat] failed to read outbound file %s", attachment.actual_path) + return False + + aes_key = secrets.token_bytes(16) + filekey = _safe_media_filename("wechat-file-upload", attachment.actual_path.suffix or ".bin", message_id=msg.thread_id) + upload_request = self._build_upload_request( + filekey=filekey, + media_type=UploadMediaType.FILE, + to_user_id=msg.chat_id, + plaintext=plaintext, + aes_key=aes_key, + no_need_thumb=True, + ) + + try: + upload_data = await self._request_json( + "/ilink/bot/getuploadurl", + { + **upload_request, + "base_info": self._base_info(), + }, + ) + self._ensure_success(upload_data, "getuploadurl") + + upload_full_url = self._extract_upload_full_url(upload_data) + upload_param = self._extract_upload_param(upload_data) + upload_method = "POST" + if not upload_full_url: + if not upload_param: + logger.warning("[WeChat] getuploadurl returned no upload URL for file %s", attachment.filename) + return False + upload_full_url = _build_cdn_upload_url(self._cdn_base_url, upload_param, filekey) + + encrypted = _encrypt_aes_128_ecb(plaintext, aes_key) + download_param = await self._upload_cdn_bytes( + upload_full_url, + encrypted, + content_type=attachment.mime_type, + method=upload_method, + ) + if download_param: + upload_data = dict(upload_data) + upload_data["upload_param"] = download_param + + file_item = self._build_outbound_file_item(upload_data, aes_key, attachment.filename, plaintext) + send_payload = { + "msg": { + "from_user_id": "", + "to_user_id": msg.chat_id, + "client_id": f"deerflow_file_{int(time.time() * 1000)}", + "message_type": 2, + "message_state": 2, + "context_token": context_token, + "item_list": [ + { + "type": int(MessageItemType.FILE), + "file_item": file_item, + } + ], + }, + "base_info": self._base_info(), + } + response = await self._request_json("/ilink/bot/sendmessage", send_payload) + self._ensure_success(response, "sendmessage") + return True + except Exception: + logger.exception("[WeChat] failed to send file attachment %s", attachment.filename) + return False + + async def _poll_loop(self) -> None: + while self._running: + try: + if not await self._ensure_authenticated(): + await asyncio.sleep(self._retry_delay) + continue + + data = await self._request_json( + "/ilink/bot/getupdates", + { + "get_updates_buf": self._get_updates_buf, + "base_info": self._base_info(), + }, + timeout=max(self._current_longpoll_timeout_seconds() + 5.0, 10.0), + ) + + ret = data.get("ret", 0) + if ret not in (0, None): + errcode = data.get("errcode") + if errcode == -14: + self._bot_token = "" + self._get_updates_buf = "" + self._save_state() + self._save_auth_state(status="expired", bot_token="") + logger.error("[WeChat] bot token expired; scan again or update bot_token and restart the channel") + self._running = False + break + logger.warning( + "[WeChat] getupdates returned ret=%s errcode=%s errmsg=%s", + ret, + errcode, + data.get("errmsg"), + ) + await asyncio.sleep(self._retry_delay) + continue + + self._update_longpoll_timeout(data) + + next_buf = data.get("get_updates_buf") + if isinstance(next_buf, str) and next_buf != self._get_updates_buf: + self._get_updates_buf = next_buf + self._save_state() + + for raw_message in data.get("msgs", []): + await self._handle_update(raw_message) + except asyncio.CancelledError: + raise + except Exception: + logger.exception("[WeChat] polling loop failed") + await asyncio.sleep(self._retry_delay) + + async def _handle_update(self, raw_message: Any) -> None: + if not isinstance(raw_message, dict): + return + if raw_message.get("message_type") != 1: + return + + chat_id = str(raw_message.get("from_user_id") or raw_message.get("ilink_user_id") or "").strip() + if not chat_id or not self._check_user(chat_id): + return + + text = self._extract_text(raw_message) + files = await self._extract_inbound_files(raw_message) + if not text and not files: + return + + context_token = str(raw_message.get("context_token") or "").strip() + thread_ts = context_token or str(raw_message.get("client_id") or raw_message.get("msg_id") or "").strip() or None + + if context_token: + self._context_tokens_by_chat[chat_id] = context_token + if thread_ts: + self._context_tokens_by_thread[thread_ts] = context_token + + inbound = self._make_inbound( + chat_id=chat_id, + user_id=chat_id, + text=text, + msg_type=InboundMessageType.COMMAND if text.startswith("/") else InboundMessageType.CHAT, + thread_ts=thread_ts, + files=files, + metadata={ + "context_token": context_token, + "ilink_user_id": chat_id, + "ref_msg": self._extract_ref_message(raw_message), + "raw_message": raw_message, + }, + ) + inbound.topic_id = None + await self.bus.publish_inbound(inbound) + + async def _ensure_authenticated(self) -> bool: + async with self._auth_lock: + if self._bot_token: + return True + + self._load_auth_state() + if self._bot_token: + return True + + if not self._qrcode_login_enabled: + return False + + try: + auth_state = await self._bind_via_qrcode() + except Exception: + logger.exception("[WeChat] QR code binding failed") + return False + return bool(auth_state.get("bot_token")) + + async def _bind_via_qrcode(self) -> dict[str, Any]: + qrcode_data = await self._request_public_get_json( + "/ilink/bot/get_bot_qrcode", + params={"bot_type": self._qrcode_bot_type}, + ) + qrcode = str(qrcode_data.get("qrcode") or "").strip() + if not qrcode: + raise RuntimeError("iLink get_bot_qrcode did not return qrcode") + + qrcode_img_content = str(qrcode_data.get("qrcode_img_content") or "").strip() + logger.warning("[WeChat] QR login required. qrcode=%s", qrcode) + if qrcode_img_content: + logger.warning("[WeChat] qrcode_img_content=%s", qrcode_img_content) + + self._save_auth_state( + status="pending", + qrcode=qrcode, + qrcode_img_content=qrcode_img_content or None, + ) + + deadline = time.monotonic() + max(self._qrcode_poll_timeout, 1.0) + while time.monotonic() < deadline: + status_data = await self._request_public_get_json( + "/ilink/bot/get_qrcode_status", + params={"qrcode": qrcode}, + ) + status = str(status_data.get("status") or "").strip().lower() + if status == "confirmed": + token = str(status_data.get("bot_token") or "").strip() + if not token: + raise RuntimeError("iLink QR confirmation succeeded without bot_token") + self._bot_token = token + ilink_bot_id = str(status_data.get("ilink_bot_id") or "").strip() or None + if ilink_bot_id: + self._ilink_bot_id = ilink_bot_id + + return self._save_auth_state( + status="confirmed", + bot_token=token, + ilink_bot_id=self._ilink_bot_id, + qrcode=qrcode, + qrcode_img_content=qrcode_img_content or None, + ) + + if status in {"expired", "canceled", "cancelled", "invalid", "failed"}: + self._save_auth_state( + status=status, + qrcode=qrcode, + qrcode_img_content=qrcode_img_content or None, + ) + raise RuntimeError(f"iLink QR code flow ended with status={status}") + + await asyncio.sleep(max(self._qrcode_poll_interval, 0.1)) + + self._save_auth_state( + status="timeout", + qrcode=qrcode, + qrcode_img_content=qrcode_img_content or None, + ) + raise TimeoutError("Timed out waiting for WeChat QR confirmation") + + async def _request_json(self, path: str, payload: dict[str, Any], *, timeout: float | None = None) -> dict[str, Any]: + client = await self._ensure_client() + response = await client.post( + f"{self._base_url}{path}", + json=payload, + headers=self._auth_headers(), + timeout=timeout or self.DEFAULT_API_TIMEOUT, + ) + response.raise_for_status() + data = response.json() + return data if isinstance(data, dict) else {} + + async def _request_public_get_json( + self, + path: str, + params: dict[str, Any] | None = None, + *, + timeout: float | None = None, + ) -> dict[str, Any]: + client = await self._ensure_client() + response = await client.get( + f"{self._base_url}{path}", + params=params, + headers=self._public_headers(), + timeout=timeout or self.DEFAULT_CONFIG_TIMEOUT, + ) + response.raise_for_status() + data = response.json() + return data if isinstance(data, dict) else {} + + async def _ensure_client(self) -> httpx.AsyncClient: + if self._client is None: + timeout = max(self._polling_timeout + 5.0, 10.0) + self._client = httpx.AsyncClient(timeout=timeout) + return self._client + + def _resolve_context_token(self, msg: OutboundMessage) -> str | None: + metadata_token = msg.metadata.get("context_token") + if isinstance(metadata_token, str) and metadata_token.strip(): + return metadata_token.strip() + if msg.thread_ts and msg.thread_ts in self._context_tokens_by_thread: + return self._context_tokens_by_thread[msg.thread_ts] + return self._context_tokens_by_chat.get(msg.chat_id) + + def _check_user(self, user_id: str) -> bool: + if not self._allowed_users: + return True + return user_id in self._allowed_users + + def _current_longpoll_timeout_seconds(self) -> float: + if self._respect_server_longpoll_timeout and self._server_longpoll_timeout_seconds is not None: + return self._server_longpoll_timeout_seconds + return self._polling_timeout + + def _update_longpoll_timeout(self, data: Mapping[str, Any]) -> None: + if not self._respect_server_longpoll_timeout: + return + raw_timeout = data.get("longpolling_timeout_ms") + if raw_timeout is None: + return + try: + timeout_ms = float(raw_timeout) + except (TypeError, ValueError): + return + if timeout_ms <= 0: + return + self._server_longpoll_timeout_seconds = timeout_ms / 1000.0 + + def _base_info(self) -> dict[str, str]: + return {"channel_version": self._channel_version} + + def _common_headers(self) -> dict[str, str]: + headers = { + "iLink-App-ClientVersion": _build_ilink_client_version(self._channel_version), + "X-WECHAT-UIN": _build_wechat_uin(), + } + if self._ilink_app_id: + headers["iLink-App-Id"] = self._ilink_app_id + if self._route_tag: + headers["SKRouteTag"] = self._route_tag + return headers + + def _public_headers(self) -> dict[str, str]: + return { + "Content-Type": "application/json", + **self._common_headers(), + } + + def _auth_headers(self) -> dict[str, str]: + headers = { + "Content-Type": "application/json", + "Authorization": f"Bearer {self._bot_token}", + "AuthorizationType": "ilink_bot_token", + **self._common_headers(), + } + return headers + + @staticmethod + def _extract_cdn_full_url(media: Mapping[str, Any] | None) -> str | None: + if not isinstance(media, Mapping): + return None + full_url = media.get("full_url") + return full_url.strip() if isinstance(full_url, str) and full_url.strip() else None + + @staticmethod + def _extract_upload_full_url(upload_data: Mapping[str, Any] | None) -> str | None: + if not isinstance(upload_data, Mapping): + return None + upload_full_url = upload_data.get("upload_full_url") + return upload_full_url.strip() if isinstance(upload_full_url, str) and upload_full_url.strip() else None + + @staticmethod + def _extract_upload_param(upload_data: Mapping[str, Any] | None) -> str | None: + if not isinstance(upload_data, Mapping): + return None + upload_param = upload_data.get("upload_param") + return upload_param.strip() if isinstance(upload_param, str) and upload_param.strip() else None + + def _build_upload_request( + self, + *, + filekey: str, + media_type: UploadMediaType, + to_user_id: str, + plaintext: bytes, + aes_key: bytes, + thumb_plaintext: bytes | None = None, + no_need_thumb: bool = False, + ) -> dict[str, Any]: + _validate_aes_128_key(aes_key) + payload: dict[str, Any] = { + "filekey": filekey, + "media_type": int(media_type), + "to_user_id": to_user_id, + "rawsize": len(plaintext), + "rawfilemd5": _md5_hex(plaintext), + "filesize": _encrypted_size_for_aes_128_ecb(len(plaintext)), + "aeskey": aes_key.hex(), + } + if thumb_plaintext is not None: + payload.update( + { + "thumb_rawsize": len(thumb_plaintext), + "thumb_rawfilemd5": _md5_hex(thumb_plaintext), + "thumb_filesize": _encrypted_size_for_aes_128_ecb(len(thumb_plaintext)), + } + ) + elif no_need_thumb: + payload["no_need_thumb"] = True + return payload + + async def _download_cdn_bytes(self, url: str, *, timeout: float | None = None) -> bytes: + client = await self._ensure_client() + response = await client.get(url, timeout=timeout or self.DEFAULT_CDN_TIMEOUT) + response.raise_for_status() + return response.content + + async def _upload_cdn_bytes( + self, + url: str, + content: bytes, + *, + content_type: str = "application/octet-stream", + timeout: float | None = None, + method: str = "PUT", + ) -> str | None: + client = await self._ensure_client() + request_kwargs = { + "content": content, + "headers": {"Content-Type": content_type}, + "timeout": timeout or self.DEFAULT_CDN_TIMEOUT, + } + if method.upper() == "POST": + response = await client.post(url, **request_kwargs) + else: + response = await client.put(url, **request_kwargs) + response.raise_for_status() + return response.headers.get("x-encrypted-param") + + def _build_outbound_image_item( + self, + upload_data: Mapping[str, Any], + aes_key: bytes, + *, + ciphertext_size: int, + ) -> dict[str, Any]: + encoded_aes_key = _encode_outbound_media_aes_key(aes_key) + media: dict[str, Any] = { + "aes_key": encoded_aes_key, + "encrypt_type": 1, + } + upload_param = upload_data.get("upload_param") + if isinstance(upload_param, str) and upload_param.strip(): + media["encrypt_query_param"] = upload_param.strip() + + return { + "media": media, + "mid_size": ciphertext_size, + } + + def _build_outbound_file_item( + self, + upload_data: Mapping[str, Any], + aes_key: bytes, + filename: str, + plaintext: bytes, + ) -> dict[str, Any]: + media: dict[str, Any] = { + "aes_key": _encode_outbound_media_aes_key(aes_key), + "encrypt_type": 1, + } + upload_param = upload_data.get("upload_param") + if isinstance(upload_param, str) and upload_param.strip(): + media["encrypt_query_param"] = upload_param.strip() + return { + "media": media, + "file_name": filename, + "md5": _md5_hex(plaintext), + "len": str(len(plaintext)), + } + + def _download_dir(self) -> Path | None: + if not self._state_dir: + return None + return self._state_dir / self.DEFAULT_IMAGE_DOWNLOAD_DIRNAME + + async def _extract_inbound_files(self, raw_message: Mapping[str, Any]) -> list[dict[str, Any]]: + files: list[dict[str, Any]] = [] + item_list = raw_message.get("item_list") + if not isinstance(item_list, list): + return files + + message_id = str(raw_message.get("message_id") or raw_message.get("msg_id") or raw_message.get("client_id") or "msg") + + for index, item in enumerate(item_list): + if not isinstance(item, Mapping): + continue + if item.get("type") == int(MessageItemType.IMAGE): + image_file = await self._extract_image_file(item, message_id=message_id, index=index) + if image_file: + files.append(image_file) + elif item.get("type") == int(MessageItemType.FILE): + file_info = await self._extract_file_item(item, message_id=message_id, index=index) + if file_info: + files.append(file_info) + return files + + async def _extract_image_file(self, item: Mapping[str, Any], *, message_id: str, index: int) -> dict[str, Any] | None: + image_item = item.get("image_item") + if not isinstance(image_item, Mapping): + return None + + media = image_item.get("media") + if not isinstance(media, Mapping): + return None + + full_url = self._extract_cdn_full_url(media) + if not full_url: + logger.warning("[WeChat] inbound image missing full_url, skipping message_id=%s", message_id) + return None + + aes_key = self._resolve_media_aes_key(item, image_item, media) + if not aes_key: + logger.warning( + "[WeChat] inbound image missing aes key, skipping message_id=%s diagnostics=%s", + message_id, + self._describe_media_key_state(item=item, item_payload=image_item, media=media), + ) + return None + + encrypted = await self._download_cdn_bytes(full_url) + decrypted = _decrypt_aes_128_ecb(encrypted, aes_key) + if self._max_inbound_image_bytes > 0 and len(decrypted) > self._max_inbound_image_bytes: + logger.warning("[WeChat] inbound image exceeds size limit (%d bytes), skipping message_id=%s", len(decrypted), message_id) + return None + + detected_image = _detect_image_extension_and_mime(decrypted) + image_extension = detected_image[0] if detected_image else ".jpg" + filename = _safe_media_filename("wechat-image", image_extension, message_id=message_id, index=index) + stored_path = self._stage_downloaded_file(filename, decrypted) + if stored_path is None: + return None + + mime_type = detected_image[1] if detected_image else mimetypes.guess_type(filename)[0] or "image/jpeg" + return { + "type": "image", + "filename": stored_path.name, + "size": len(decrypted), + "path": str(stored_path), + "mime_type": mime_type, + "source": "wechat", + "message_item_type": int(MessageItemType.IMAGE), + "full_url": full_url, + } + + async def _extract_file_item(self, item: Mapping[str, Any], *, message_id: str, index: int) -> dict[str, Any] | None: + file_item = item.get("file_item") + if not isinstance(file_item, Mapping): + return None + + media = file_item.get("media") + if not isinstance(media, Mapping): + return None + + full_url = self._extract_cdn_full_url(media) + if not full_url: + logger.warning("[WeChat] inbound file missing full_url, skipping message_id=%s", message_id) + return None + + aes_key = self._resolve_media_aes_key(item, file_item, media) + if not aes_key: + logger.warning( + "[WeChat] inbound file missing aes key, skipping message_id=%s diagnostics=%s", + message_id, + self._describe_media_key_state(item=item, item_payload=file_item, media=media), + ) + return None + + filename = self._normalize_inbound_filename(file_item.get("file_name"), default_prefix="wechat-file", message_id=message_id, index=index) + mime_type = mimetypes.guess_type(filename)[0] or "application/octet-stream" + if not self._is_allowed_file_type(filename, mime_type): + logger.warning("[WeChat] inbound file type blocked, skipping message_id=%s filename=%s", message_id, filename) + return None + + encrypted = await self._download_cdn_bytes(full_url) + decrypted = _decrypt_aes_128_ecb(encrypted, aes_key) + if self._max_inbound_file_bytes > 0 and len(decrypted) > self._max_inbound_file_bytes: + logger.warning("[WeChat] inbound file exceeds size limit (%d bytes), skipping message_id=%s", len(decrypted), message_id) + return None + + stored_path = self._stage_downloaded_file(filename, decrypted) + if stored_path is None: + return None + + return { + "type": "file", + "filename": stored_path.name, + "size": len(decrypted), + "path": str(stored_path), + "mime_type": mime_type, + "source": "wechat", + "message_item_type": int(MessageItemType.FILE), + "full_url": full_url, + } + + def _stage_downloaded_file(self, filename: str, content: bytes) -> Path | None: + download_dir = self._download_dir() + if download_dir is None: + return None + try: + download_dir.mkdir(parents=True, exist_ok=True) + path = download_dir / filename + path.write_bytes(content) + return path + except OSError: + logger.exception("[WeChat] failed to persist inbound media file %s", filename) + return None + + @staticmethod + def _decode_base64_aes_key(value: str) -> bytes | None: + candidate = value.strip() + if not candidate: + return None + + def _normalize_decoded(decoded: bytes) -> bytes | None: + try: + _validate_aes_128_key(decoded) + return decoded + except ValueError: + pass + + try: + decoded_text = decoded.decode("utf-8").strip().strip('"').strip("'") + except UnicodeDecodeError: + return None + + if not decoded_text: + return None + + try: + key = bytes.fromhex(decoded_text) + _validate_aes_128_key(key) + return key + except ValueError: + return None + + padded = candidate + ("=" * (-len(candidate) % 4)) + decoders = ( + lambda: base64.b64decode(padded, validate=True), + lambda: base64.urlsafe_b64decode(padded), + ) + for decoder in decoders: + try: + key = _normalize_decoded(decoder()) + if key is not None: + return key + except (ValueError, TypeError, binascii.Error): + continue + return None + + @classmethod + def _parse_aes_key_candidate(cls, value: Any, *, prefer_hex: bool) -> bytes | None: + if isinstance(value, bytes): + try: + _validate_aes_128_key(value) + return value + except ValueError: + return None + + if isinstance(value, bytearray): + return cls._parse_aes_key_candidate(bytes(value), prefer_hex=prefer_hex) + + if not isinstance(value, str) or not value.strip(): + return None + + raw = value.strip() + parsers = ( + (lambda: bytes.fromhex(raw), lambda key: _validate_aes_128_key(key)), + (lambda: cls._decode_base64_aes_key(raw), None), + ) + if not prefer_hex: + parsers = (parsers[1], parsers[0]) + + for decoder, validator in parsers: + try: + key = decoder() + if key is None: + continue + if validator is not None: + validator(key) + return key + except ValueError: + continue + return None + + @classmethod + def _resolve_media_aes_key(cls, *payloads: Mapping[str, Any]) -> bytes | None: + for payload in payloads: + if not isinstance(payload, Mapping): + continue + for key_name in ("aeskey", "aes_key_hex"): + key = cls._parse_aes_key_candidate(payload.get(key_name), prefer_hex=True) + if key: + return key + for key_name in ("aes_key", "aesKey", "encrypt_key", "encryptKey"): + key = cls._parse_aes_key_candidate(payload.get(key_name), prefer_hex=False) + if key: + return key + media = payload.get("media") + if isinstance(media, Mapping): + key = cls._resolve_media_aes_key(media) + if key: + return key + return None + + @staticmethod + def _describe_media_key_state( + *, + item: Mapping[str, Any] | None, + item_payload: Mapping[str, Any] | None, + media: Mapping[str, Any] | None, + ) -> dict[str, Any]: + def _interesting(mapping: Mapping[str, Any] | None) -> dict[str, Any]: + if not isinstance(mapping, Mapping): + return {} + details: dict[str, Any] = {} + for key in ( + "aeskey", + "aes_key", + "aesKey", + "aes_key_hex", + "encrypt_key", + "encryptKey", + "encrypt_query_param", + "encrypt_type", + "full_url", + "file_name", + ): + if key not in mapping: + continue + value = mapping.get(key) + if isinstance(value, str): + details[key] = f"str(len={len(value.strip())})" + elif value is not None: + details[key] = type(value).__name__ + else: + details[key] = None + return details + + return { + "item": _interesting(item), + "item_payload": _interesting(item_payload), + "media": _interesting(media), + } + + @staticmethod + def _extract_ref_message(raw_message: Mapping[str, Any]) -> dict[str, Any] | None: + item_list = raw_message.get("item_list") + if not isinstance(item_list, list): + return None + for item in item_list: + if not isinstance(item, Mapping): + continue + ref_msg = item.get("ref_msg") + if isinstance(ref_msg, Mapping): + return dict(ref_msg) + return None + + def _is_allowed_file_type(self, filename: str, mime_type: str) -> bool: + suffix = Path(filename).suffix.lower() + if self._allowed_file_extensions and suffix not in self._allowed_file_extensions: + return False + if mime_type.startswith("text/"): + return True + return mime_type in self.DEFAULT_ALLOWED_FILE_MIME_TYPES + + @staticmethod + def _normalize_inbound_filename(raw_filename: Any, *, default_prefix: str, message_id: str, index: int) -> str: + if isinstance(raw_filename, str) and raw_filename.strip(): + candidate = Path(raw_filename.strip()).name + if candidate: + return candidate + return _safe_media_filename(default_prefix, ".bin", message_id=message_id, index=index) + + def _ensure_success(self, data: dict[str, Any], operation: str) -> None: + ret = data.get("ret", 0) + if ret in (0, None): + return + errcode = data.get("errcode") + errmsg = data.get("errmsg") or data.get("msg") or "unknown error" + raise RuntimeError(f"iLink {operation} failed: ret={ret} errcode={errcode} errmsg={errmsg}") + + def _load_state(self) -> None: + self._load_auth_state() + if not self._cursor_path or not self._cursor_path.exists(): + return + try: + data = json.loads(self._cursor_path.read_text(encoding="utf-8")) + except (OSError, json.JSONDecodeError): + logger.warning("[WeChat] failed to read cursor state from %s", self._cursor_path) + return + cursor = data.get("get_updates_buf") + if isinstance(cursor, str): + self._get_updates_buf = cursor + + def _save_state(self) -> None: + if not self._cursor_path: + return + try: + self._cursor_path.parent.mkdir(parents=True, exist_ok=True) + self._cursor_path.write_text(json.dumps({"get_updates_buf": self._get_updates_buf}, ensure_ascii=False, indent=2), encoding="utf-8") + except OSError: + logger.warning("[WeChat] failed to persist cursor state to %s", self._cursor_path) + + def _load_auth_state(self) -> None: + if not self._auth_path or not self._auth_path.exists(): + return + try: + data = json.loads(self._auth_path.read_text(encoding="utf-8")) + except (OSError, json.JSONDecodeError): + logger.warning("[WeChat] failed to read auth state from %s", self._auth_path) + return + if not isinstance(data, dict): + return + self._auth_state = dict(data) + + if not self._bot_token: + token = data.get("bot_token") + if isinstance(token, str) and token.strip(): + self._bot_token = token.strip() + + if not self._ilink_bot_id: + ilink_bot_id = data.get("ilink_bot_id") + if isinstance(ilink_bot_id, str) and ilink_bot_id.strip(): + self._ilink_bot_id = ilink_bot_id.strip() + + def _save_auth_state( + self, + *, + status: str, + bot_token: str | None = None, + ilink_bot_id: str | None = None, + qrcode: str | None = None, + qrcode_img_content: str | None = None, + ) -> dict[str, Any]: + data = dict(self._auth_state) + data["status"] = status + data["updated_at"] = int(time.time()) + + if bot_token is not None: + if bot_token: + data["bot_token"] = bot_token + else: + data.pop("bot_token", None) + elif self._bot_token: + data["bot_token"] = self._bot_token + + resolved_ilink_bot_id = ilink_bot_id if ilink_bot_id is not None else self._ilink_bot_id + if resolved_ilink_bot_id: + data["ilink_bot_id"] = resolved_ilink_bot_id + + if qrcode is not None: + data["qrcode"] = qrcode + if qrcode_img_content is not None: + data["qrcode_img_content"] = qrcode_img_content + + self._auth_state = data + if self._auth_path: + try: + self._auth_path.parent.mkdir(parents=True, exist_ok=True) + self._auth_path.write_text(json.dumps(data, ensure_ascii=False, indent=2), encoding="utf-8") + except OSError: + logger.warning("[WeChat] failed to persist auth state to %s", self._auth_path) + return data + + @staticmethod + def _extract_text(raw_message: dict[str, Any]) -> str: + parts: list[str] = [] + for item in raw_message.get("item_list", []): + if not isinstance(item, dict) or item.get("type") != int(MessageItemType.TEXT): + continue + text_item = item.get("text_item") + if not isinstance(text_item, dict): + continue + text = text_item.get("text") + if isinstance(text, str) and text.strip(): + parts.append(text.strip()) + return "\n".join(parts) + + @staticmethod + def _resolve_state_dir(raw_state_dir: Any) -> Path | None: + if not isinstance(raw_state_dir, str) or not raw_state_dir.strip(): + return None + return Path(raw_state_dir).expanduser() + + @staticmethod + def _coerce_float(value: Any, default: float) -> float: + try: + return float(value) + except (TypeError, ValueError): + return default + + @staticmethod + def _coerce_int(value: Any, default: int) -> int: + try: + return int(value) + except (TypeError, ValueError): + return default + + @staticmethod + def _coerce_str_set(value: Any, default: frozenset[str]) -> set[str]: + if not isinstance(value, (list, tuple, set, frozenset)): + return set(default) + normalized = {str(item).strip().lower() if str(item).strip().startswith(".") else f".{str(item).strip().lower()}" for item in value if str(item).strip()} + return normalized or set(default) diff --git a/backend/tests/test_wechat_channel.py b/backend/tests/test_wechat_channel.py new file mode 100644 index 000000000..1843da4c2 --- /dev/null +++ b/backend/tests/test_wechat_channel.py @@ -0,0 +1,1253 @@ +"""Tests for the WeChat IM channel.""" + +from __future__ import annotations + +import asyncio +import base64 +import json +from pathlib import Path +from typing import Any + +from app.channels.message_bus import InboundMessageType, MessageBus, OutboundMessage + + +def _run(coro): + loop = asyncio.new_event_loop() + try: + return loop.run_until_complete(coro) + finally: + loop.close() + + +class _MockResponse: + def __init__(self, payload: dict[str, Any], content: bytes | None = None): + self._payload = payload + self.content = content or b"" + self.headers = payload.get("headers", {}) if isinstance(payload, dict) else {} + + def raise_for_status(self) -> None: + return None + + def json(self) -> dict[str, Any]: + return self._payload + + +class _MockAsyncClient: + def __init__( + self, + responses: list[dict[str, Any]] | None = None, + post_calls: list[dict[str, Any]] | None = None, + get_calls: list[dict[str, Any]] | None = None, + put_calls: list[dict[str, Any]] | None = None, + get_responses: list[dict[str, Any]] | None = None, + post_responses: list[dict[str, Any]] | None = None, + put_responses: list[dict[str, Any]] | None = None, + **kwargs, + ): + self._responses = list(responses or []) + self._post_responses = list(post_responses or self._responses) + self._get_responses = list(get_responses or []) + self._put_responses = list(put_responses or []) + self._post_calls = post_calls + self._get_calls = get_calls + self._put_calls = put_calls + self.kwargs = kwargs + + async def post( + self, + url: str, + json: dict[str, Any] | None = None, + headers: dict[str, Any] | None = None, + **kwargs, + ): + if self._post_calls is not None: + self._post_calls.append({"url": url, "json": json or {}, "headers": headers or {}, **kwargs}) + payload = self._post_responses.pop(0) if self._post_responses else {"ret": 0} + return _MockResponse(payload) + + async def get(self, url: str, params: dict[str, Any] | None = None, headers: dict[str, Any] | None = None, **kwargs): + if self._get_calls is not None: + self._get_calls.append({"url": url, "params": params or {}, "headers": headers or {}, **kwargs}) + payload = self._get_responses.pop(0) if self._get_responses else {"ret": 0} + return _MockResponse(payload) + + async def put(self, url: str, content: bytes, headers: dict[str, Any] | None = None, **kwargs): + if self._put_calls is not None: + self._put_calls.append({"url": url, "content": content, "headers": headers or {}, **kwargs}) + payload = self._put_responses.pop(0) if self._put_responses else {"ret": 0} + return _MockResponse(payload) + + async def aclose(self) -> None: + return None + + +def test_handle_update_publishes_private_chat_message(): + from app.channels.wechat import WechatChannel + + async def go(): + bus = MessageBus() + published = [] + + async def capture(msg): + published.append(msg) + + bus.publish_inbound = capture # type: ignore[method-assign] + + channel = WechatChannel(bus=bus, config={"bot_token": "test-token"}) + await channel._handle_update( + { + "message_type": 1, + "from_user_id": "wx-user-1", + "context_token": "ctx-1", + "item_list": [{"type": 1, "text_item": {"text": "hello from wechat"}}], + } + ) + + assert len(published) == 1 + inbound = published[0] + assert inbound.chat_id == "wx-user-1" + assert inbound.user_id == "wx-user-1" + assert inbound.text == "hello from wechat" + assert inbound.msg_type == InboundMessageType.CHAT + assert inbound.topic_id is None + assert inbound.metadata["context_token"] == "ctx-1" + assert channel._context_tokens_by_chat["wx-user-1"] == "ctx-1" + + _run(go()) + + +def test_handle_update_downloads_inbound_image(monkeypatch, tmp_path: Path): + from app.channels.wechat import WechatChannel + + async def go(): + bus = MessageBus() + published = [] + + async def capture(msg): + published.append(msg) + + bus.publish_inbound = capture # type: ignore[method-assign] + + plaintext = b"fake-image-bytes" + aes_key = b"1234567890abcdef" + + channel = WechatChannel(bus=bus, config={"bot_token": "test-token", "state_dir": str(tmp_path)}) + encrypted = channel.__class__.__dict__["_extract_image_file"].__globals__["_encrypt_aes_128_ecb"](plaintext, aes_key) + + async def _fake_download(_url: str, *, timeout: float | None = None): + return encrypted + + channel._download_cdn_bytes = _fake_download # type: ignore[method-assign] + + await channel._handle_update( + { + "message_type": 1, + "message_id": 101, + "from_user_id": "wx-user-1", + "context_token": "ctx-img-1", + "item_list": [ + { + "type": 2, + "image_item": { + "aeskey": aes_key.hex(), + "media": {"full_url": "https://cdn.example/image.bin"}, + }, + } + ], + } + ) + + assert len(published) == 1 + inbound = published[0] + assert inbound.text == "" + assert len(inbound.files) == 1 + file_info = inbound.files[0] + assert file_info["source"] == "wechat" + assert file_info["message_item_type"] == 2 + stored = Path(file_info["path"]) + assert stored.exists() + assert stored.read_bytes() == plaintext + + _run(go()) + + +def test_handle_update_downloads_inbound_png_with_png_extension(monkeypatch, tmp_path: Path): + from app.channels.wechat import WechatChannel + + async def go(): + bus = MessageBus() + published = [] + + async def capture(msg): + published.append(msg) + + bus.publish_inbound = capture # type: ignore[method-assign] + + plaintext = b"\x89PNG\r\n\x1a\n" + b"png-body" + aes_key = b"1234567890abcdef" + + channel = WechatChannel(bus=bus, config={"bot_token": "test-token", "state_dir": str(tmp_path)}) + encrypted = channel.__class__.__dict__["_extract_image_file"].__globals__["_encrypt_aes_128_ecb"](plaintext, aes_key) + + async def _fake_download(_url: str, *, timeout: float | None = None): + return encrypted + + channel._download_cdn_bytes = _fake_download # type: ignore[method-assign] + + await channel._handle_update( + { + "message_type": 1, + "message_id": 303, + "from_user_id": "wx-user-1", + "context_token": "ctx-img-png", + "item_list": [ + { + "type": 2, + "image_item": { + "aeskey": aes_key.hex(), + "media": {"full_url": "https://cdn.example/image.bin"}, + }, + } + ], + } + ) + + assert len(published) == 1 + file_info = published[0].files[0] + assert file_info["filename"].endswith(".png") + assert file_info["mime_type"] == "image/png" + + _run(go()) + + +def test_handle_update_preserves_text_and_ref_msg_with_image(monkeypatch, tmp_path: Path): + from app.channels.wechat import WechatChannel + + async def go(): + bus = MessageBus() + published = [] + + async def capture(msg): + published.append(msg) + + bus.publish_inbound = capture # type: ignore[method-assign] + + plaintext = b"img-2" + aes_key = b"1234567890abcdef" + channel = WechatChannel(bus=bus, config={"bot_token": "test-token", "state_dir": str(tmp_path)}) + encrypted = channel.__class__.__dict__["_extract_image_file"].__globals__["_encrypt_aes_128_ecb"](plaintext, aes_key) + + async def _fake_download(_url: str, *, timeout: float | None = None): + return encrypted + + channel._download_cdn_bytes = _fake_download # type: ignore[method-assign] + + await channel._handle_update( + { + "message_type": 1, + "message_id": 202, + "from_user_id": "wx-user-1", + "context_token": "ctx-img-2", + "item_list": [ + {"type": 1, "text_item": {"text": "look at this"}}, + { + "type": 2, + "ref_msg": {"title": "quoted", "message_item": {"type": 1}}, + "image_item": { + "aeskey": aes_key.hex(), + "media": {"full_url": "https://cdn.example/image2.bin"}, + }, + }, + ], + } + ) + + assert len(published) == 1 + inbound = published[0] + assert inbound.text == "look at this" + assert len(inbound.files) == 1 + assert inbound.metadata["ref_msg"]["title"] == "quoted" + + _run(go()) + + +def test_handle_update_skips_image_without_url_or_key(tmp_path: Path): + from app.channels.wechat import WechatChannel + + async def go(): + bus = MessageBus() + published = [] + + async def capture(msg): + published.append(msg) + + bus.publish_inbound = capture # type: ignore[method-assign] + + channel = WechatChannel(bus=bus, config={"bot_token": "test-token", "state_dir": str(tmp_path)}) + + await channel._handle_update( + { + "message_type": 1, + "from_user_id": "wx-user-1", + "context_token": "ctx-img-3", + "item_list": [ + { + "type": 2, + "image_item": {"media": {}}, + } + ], + } + ) + + assert published == [] + + _run(go()) + + +def test_handle_update_routes_slash_command_as_command(): + from app.channels.wechat import WechatChannel + + async def go(): + bus = MessageBus() + published = [] + + async def capture(msg): + published.append(msg) + + bus.publish_inbound = capture # type: ignore[method-assign] + + channel = WechatChannel(bus=bus, config={"bot_token": "test-token"}) + await channel._handle_update( + { + "message_type": 1, + "from_user_id": "wx-user-1", + "context_token": "ctx-2", + "item_list": [{"type": 1, "text_item": {"text": "/status"}}], + } + ) + + assert len(published) == 1 + assert published[0].msg_type == InboundMessageType.COMMAND + + _run(go()) + + +def test_allowed_users_filter_blocks_non_whitelisted_sender(): + from app.channels.wechat import WechatChannel + + async def go(): + bus = MessageBus() + published = [] + + async def capture(msg): + published.append(msg) + + bus.publish_inbound = capture # type: ignore[method-assign] + + channel = WechatChannel(bus=bus, config={"bot_token": "test-token", "allowed_users": ["allowed-user"]}) + await channel._handle_update( + { + "message_type": 1, + "from_user_id": "blocked-user", + "context_token": "ctx-3", + "item_list": [{"type": 1, "text_item": {"text": "hello"}}], + } + ) + + assert published == [] + + _run(go()) + + +def test_send_uses_cached_context_token(monkeypatch): + from app.channels.wechat import WechatChannel + + async def go(): + post_calls: list[dict[str, Any]] = [] + + def _client_factory(*args, **kwargs): + return _MockAsyncClient(responses=[{"ret": 0}], post_calls=post_calls, **kwargs) + + monkeypatch.setattr("app.channels.wechat.httpx.AsyncClient", _client_factory) + + channel = WechatChannel(bus=MessageBus(), config={"bot_token": "bot-token"}) + channel._context_tokens_by_chat["wx-user-1"] = "ctx-send" + + await channel.send( + OutboundMessage( + channel_name="wechat", + chat_id="wx-user-1", + thread_id="thread-1", + text="reply text", + ) + ) + + assert len(post_calls) == 1 + assert post_calls[0]["url"].endswith("/ilink/bot/sendmessage") + assert post_calls[0]["json"]["msg"]["to_user_id"] == "wx-user-1" + assert post_calls[0]["json"]["msg"]["context_token"] == "ctx-send" + assert post_calls[0]["headers"]["Authorization"] == "Bearer bot-token" + assert post_calls[0]["headers"]["AuthorizationType"] == "ilink_bot_token" + assert "X-WECHAT-UIN" in post_calls[0]["headers"] + assert "iLink-App-ClientVersion" in post_calls[0]["headers"] + + _run(go()) + + +def test_send_skips_when_context_token_missing(monkeypatch): + from app.channels.wechat import WechatChannel + + async def go(): + post_calls: list[dict[str, Any]] = [] + + def _client_factory(*args, **kwargs): + return _MockAsyncClient(responses=[{"ret": 0}], post_calls=post_calls, **kwargs) + + monkeypatch.setattr("app.channels.wechat.httpx.AsyncClient", _client_factory) + + channel = WechatChannel(bus=MessageBus(), config={"bot_token": "bot-token"}) + await channel.send( + OutboundMessage( + channel_name="wechat", + chat_id="wx-user-1", + thread_id="thread-1", + text="reply text", + ) + ) + + assert post_calls == [] + + _run(go()) + + +def test_protocol_helpers_build_expected_values(): + from app.channels.wechat import ( + MessageItemType, + UploadMediaType, + _build_ilink_client_version, + _build_wechat_uin, + _encrypted_size_for_aes_128_ecb, + ) + + assert int(MessageItemType.TEXT) == 1 + assert int(UploadMediaType.FILE) == 3 + assert _build_ilink_client_version("1.0.11") == str((1 << 16) | 11) + + encoded = _build_wechat_uin() + decoded = base64.b64decode(encoded).decode("utf-8") + assert decoded.isdigit() + + assert _encrypted_size_for_aes_128_ecb(0) == 16 + assert _encrypted_size_for_aes_128_ecb(1) == 16 + assert _encrypted_size_for_aes_128_ecb(16) == 32 + + +def test_aes_roundtrip_encrypts_and_decrypts(): + from app.channels.wechat import _decrypt_aes_128_ecb, _encrypt_aes_128_ecb + + key = b"1234567890abcdef" + plaintext = b"hello-wechat-media" + + encrypted = _encrypt_aes_128_ecb(plaintext, key) + assert encrypted != plaintext + + decrypted = _decrypt_aes_128_ecb(encrypted, key) + assert decrypted == plaintext + + +def test_build_upload_request_supports_no_need_thumb(): + from app.channels.wechat import UploadMediaType, WechatChannel + + channel = WechatChannel(bus=MessageBus(), config={"bot_token": "bot-token"}) + payload = channel._build_upload_request( + filekey="file-key-1", + media_type=UploadMediaType.IMAGE, + to_user_id="wx-user-1", + plaintext=b"image-bytes", + aes_key=b"1234567890abcdef", + no_need_thumb=True, + ) + + assert payload["filekey"] == "file-key-1" + assert payload["media_type"] == 1 + assert payload["to_user_id"] == "wx-user-1" + assert payload["rawsize"] == len(b"image-bytes") + assert payload["filesize"] >= len(b"image-bytes") + assert payload["no_need_thumb"] is True + assert payload["aeskey"] == b"1234567890abcdef".hex() + + +def test_send_file_uploads_and_sends_image(monkeypatch, tmp_path: Path): + from app.channels.message_bus import ResolvedAttachment + from app.channels.wechat import WechatChannel + + async def go(): + post_calls: list[dict[str, Any]] = [] + put_calls: list[dict[str, Any]] = [] + + def _client_factory(*args, **kwargs): + return _MockAsyncClient( + post_calls=post_calls, + put_calls=put_calls, + post_responses=[ + { + "ret": 0, + "upload_param": "enc-query-original", + "thumb_upload_param": "enc-query-thumb", + "upload_full_url": "https://cdn.example/upload-original", + }, + {"ret": 0}, + ], + **kwargs, + ) + + monkeypatch.setattr("app.channels.wechat.httpx.AsyncClient", _client_factory) + + image_path = tmp_path / "chart.png" + image_path.write_bytes(b"png-binary-data") + + channel = WechatChannel(bus=MessageBus(), config={"bot_token": "bot-token"}) + channel._context_tokens_by_chat["wx-user-1"] = "ctx-image-send" + + ok = await channel.send_file( + OutboundMessage( + channel_name="wechat", + chat_id="wx-user-1", + thread_id="thread-1", + text="reply text", + ), + ResolvedAttachment( + virtual_path="/mnt/user-data/outputs/chart.png", + actual_path=image_path, + filename="chart.png", + mime_type="image/png", + size=image_path.stat().st_size, + is_image=True, + ), + ) + + assert ok is True + assert len(post_calls) == 3 + assert post_calls[0]["url"].endswith("/ilink/bot/getuploadurl") + assert post_calls[0]["json"]["media_type"] == 1 + assert post_calls[0]["json"]["no_need_thumb"] is True + assert len(put_calls) == 0 + assert post_calls[1]["url"] == "https://cdn.example/upload-original" + assert post_calls[2]["url"].endswith("/ilink/bot/sendmessage") + image_item = post_calls[2]["json"]["msg"]["item_list"][0]["image_item"] + assert image_item["media"]["encrypt_query_param"] == "enc-query-original" + assert image_item["media"]["encrypt_type"] == 1 + assert image_item["mid_size"] > 0 + assert "thumb_media" not in image_item + assert "aeskey" not in image_item + assert base64.b64decode(image_item["media"]["aes_key"]).decode("utf-8") == post_calls[0]["json"]["aeskey"] + + _run(go()) + + +def test_send_file_returns_false_without_upload_full_url(monkeypatch, tmp_path: Path): + from app.channels.message_bus import ResolvedAttachment + from app.channels.wechat import WechatChannel + + async def go(): + post_calls: list[dict[str, Any]] = [] + + def _client_factory(*args, **kwargs): + return _MockAsyncClient( + post_calls=post_calls, + post_responses=[ + {"ret": 0, "upload_param": "enc-query-only"}, + {"ret": 0}, + ], + **kwargs, + ) + + monkeypatch.setattr("app.channels.wechat.httpx.AsyncClient", _client_factory) + + image_path = tmp_path / "chart.png" + image_path.write_bytes(b"png-binary-data") + + channel = WechatChannel(bus=MessageBus(), config={"bot_token": "bot-token"}) + channel._context_tokens_by_chat["wx-user-1"] = "ctx-image-send" + + ok = await channel.send_file( + OutboundMessage(channel_name="wechat", chat_id="wx-user-1", thread_id="thread-1", text="reply text"), + ResolvedAttachment( + virtual_path="/mnt/user-data/outputs/chart.png", + actual_path=image_path, + filename="chart.png", + mime_type="image/png", + size=image_path.stat().st_size, + is_image=True, + ), + ) + + assert ok is True + assert len(post_calls) == 3 + assert post_calls[1]["url"].startswith("https://novac2c.cdn.weixin.qq.com/c2c/upload?") + assert post_calls[2]["url"].endswith("/ilink/bot/sendmessage") + image_item = post_calls[2]["json"]["msg"]["item_list"][0]["image_item"] + assert image_item["media"]["encrypt_query_param"] == "enc-query-only" + assert image_item["media"]["encrypt_type"] == 1 + + _run(go()) + + +def test_send_file_prefers_cdn_response_header_for_image(monkeypatch, tmp_path: Path): + from app.channels.message_bus import ResolvedAttachment + from app.channels.wechat import WechatChannel + + async def go(): + post_calls: list[dict[str, Any]] = [] + + def _client_factory(*args, **kwargs): + return _MockAsyncClient( + post_calls=post_calls, + post_responses=[ + {"ret": 0, "upload_param": "enc-query-original", "thumb_upload_param": "enc-query-thumb"}, + {"ret": 0, "headers": {"x-encrypted-param": "enc-query-downloaded"}}, + {"ret": 0}, + ], + **kwargs, + ) + + monkeypatch.setattr("app.channels.wechat.httpx.AsyncClient", _client_factory) + + image_path = tmp_path / "chart.png" + image_path.write_bytes(b"png-binary-data") + + channel = WechatChannel(bus=MessageBus(), config={"bot_token": "bot-token"}) + channel._context_tokens_by_chat["wx-user-1"] = "ctx-image-send" + + ok = await channel.send_file( + OutboundMessage(channel_name="wechat", chat_id="wx-user-1", thread_id="thread-1", text="reply text"), + ResolvedAttachment( + virtual_path="/mnt/user-data/outputs/chart.png", + actual_path=image_path, + filename="chart.png", + mime_type="image/png", + size=image_path.stat().st_size, + is_image=True, + ), + ) + + assert ok is True + assert post_calls[1]["url"].startswith("https://novac2c.cdn.weixin.qq.com/c2c/upload?") + image_item = post_calls[2]["json"]["msg"]["item_list"][0]["image_item"] + assert image_item["media"]["encrypt_query_param"] == "enc-query-downloaded" + assert image_item["media"]["encrypt_type"] == 1 + assert "thumb_media" not in image_item + assert "aeskey" not in image_item + + _run(go()) + + +def test_send_file_skips_non_image(monkeypatch, tmp_path: Path): + from app.channels.message_bus import ResolvedAttachment + from app.channels.wechat import WechatChannel + + async def go(): + post_calls: list[dict[str, Any]] = [] + + def _client_factory(*args, **kwargs): + return _MockAsyncClient(post_calls=post_calls, **kwargs) + + monkeypatch.setattr("app.channels.wechat.httpx.AsyncClient", _client_factory) + + file_path = tmp_path / "notes.txt" + file_path.write_text("hello") + + channel = WechatChannel(bus=MessageBus(), config={"bot_token": "bot-token"}) + ok = await channel.send_file( + OutboundMessage(channel_name="wechat", chat_id="wx-user-1", thread_id="thread-1", text="reply text"), + ResolvedAttachment( + virtual_path="/mnt/user-data/outputs/notes.txt", + actual_path=file_path, + filename="notes.txt", + mime_type="text/plain", + size=file_path.stat().st_size, + is_image=False, + ), + ) + + assert ok is False + assert post_calls == [] + + _run(go()) + + +def test_send_file_uploads_and_sends_regular_file(monkeypatch, tmp_path: Path): + from app.channels.message_bus import ResolvedAttachment + from app.channels.wechat import WechatChannel + + async def go(): + post_calls: list[dict[str, Any]] = [] + put_calls: list[dict[str, Any]] = [] + + def _client_factory(*args, **kwargs): + return _MockAsyncClient( + post_calls=post_calls, + put_calls=put_calls, + post_responses=[ + { + "ret": 0, + "upload_param": "enc-query-file", + "upload_full_url": "https://cdn.example/upload-file", + }, + {"ret": 0}, + ], + **kwargs, + ) + + monkeypatch.setattr("app.channels.wechat.httpx.AsyncClient", _client_factory) + + file_path = tmp_path / "report.pdf" + file_path.write_bytes(b"%PDF-1.4 fake") + + channel = WechatChannel(bus=MessageBus(), config={"bot_token": "bot-token"}) + channel._context_tokens_by_chat["wx-user-1"] = "ctx-file-send" + + ok = await channel.send_file( + OutboundMessage(channel_name="wechat", chat_id="wx-user-1", thread_id="thread-1", text="reply text"), + ResolvedAttachment( + virtual_path="/mnt/user-data/outputs/report.pdf", + actual_path=file_path, + filename="report.pdf", + mime_type="application/pdf", + size=file_path.stat().st_size, + is_image=False, + ), + ) + + assert ok is True + assert len(post_calls) == 3 + assert post_calls[0]["url"].endswith("/ilink/bot/getuploadurl") + assert post_calls[0]["json"]["media_type"] == 3 + assert post_calls[0]["json"]["no_need_thumb"] is True + assert len(put_calls) == 0 + assert post_calls[1]["url"] == "https://cdn.example/upload-file" + assert post_calls[2]["url"].endswith("/ilink/bot/sendmessage") + file_item = post_calls[2]["json"]["msg"]["item_list"][0]["file_item"] + assert file_item["media"]["encrypt_query_param"] == "enc-query-file" + assert file_item["file_name"] == "report.pdf" + assert file_item["media"]["encrypt_type"] == 1 + assert base64.b64decode(file_item["media"]["aes_key"]).decode("utf-8") == post_calls[0]["json"]["aeskey"] + + _run(go()) + + +def test_send_regular_file_uses_cdn_upload_fallback_when_upload_full_url_missing(monkeypatch, tmp_path: Path): + from app.channels.message_bus import ResolvedAttachment + from app.channels.wechat import WechatChannel + + async def go(): + post_calls: list[dict[str, Any]] = [] + + def _client_factory(*args, **kwargs): + return _MockAsyncClient( + post_calls=post_calls, + post_responses=[ + {"ret": 0, "upload_param": "enc-query-file"}, + {"ret": 0, "headers": {"x-encrypted-param": "enc-query-file-final"}}, + {"ret": 0}, + ], + **kwargs, + ) + + monkeypatch.setattr("app.channels.wechat.httpx.AsyncClient", _client_factory) + + file_path = tmp_path / "report.pdf" + file_path.write_bytes(b"%PDF-1.4 fake") + + channel = WechatChannel(bus=MessageBus(), config={"bot_token": "bot-token"}) + channel._context_tokens_by_chat["wx-user-1"] = "ctx-file-send" + + ok = await channel.send_file( + OutboundMessage(channel_name="wechat", chat_id="wx-user-1", thread_id="thread-1", text="reply text"), + ResolvedAttachment( + virtual_path="/mnt/user-data/outputs/report.pdf", + actual_path=file_path, + filename="report.pdf", + mime_type="application/pdf", + size=file_path.stat().st_size, + is_image=False, + ), + ) + + assert ok is True + assert post_calls[1]["url"].startswith("https://novac2c.cdn.weixin.qq.com/c2c/upload?") + assert post_calls[2]["url"].endswith("/ilink/bot/sendmessage") + file_item = post_calls[2]["json"]["msg"]["item_list"][0]["file_item"] + assert file_item["media"]["encrypt_query_param"] == "enc-query-file-final" + assert file_item["media"]["encrypt_type"] == 1 + + _run(go()) + + +def test_send_image_uses_post_even_when_upload_full_url_present(monkeypatch, tmp_path: Path): + from app.channels.message_bus import ResolvedAttachment + from app.channels.wechat import WechatChannel + + async def go(): + post_calls: list[dict[str, Any]] = [] + put_calls: list[dict[str, Any]] = [] + + def _client_factory(*args, **kwargs): + return _MockAsyncClient( + post_calls=post_calls, + put_calls=put_calls, + post_responses=[ + { + "ret": 0, + "upload_param": "enc-query-original", + "thumb_upload_param": "enc-query-thumb", + "upload_full_url": "https://cdn.example/upload-original", + }, + {"ret": 0, "headers": {"x-encrypted-param": "enc-query-downloaded"}}, + {"ret": 0}, + ], + **kwargs, + ) + + monkeypatch.setattr("app.channels.wechat.httpx.AsyncClient", _client_factory) + + image_path = tmp_path / "chart.png" + image_path.write_bytes(b"png-binary-data") + + channel = WechatChannel(bus=MessageBus(), config={"bot_token": "bot-token"}) + channel._context_tokens_by_chat["wx-user-1"] = "ctx-image-send" + + ok = await channel.send_file( + OutboundMessage(channel_name="wechat", chat_id="wx-user-1", thread_id="thread-1", text="reply text"), + ResolvedAttachment( + virtual_path="/mnt/user-data/outputs/chart.png", + actual_path=image_path, + filename="chart.png", + mime_type="image/png", + size=image_path.stat().st_size, + is_image=True, + ), + ) + + assert ok is True + assert len(put_calls) == 0 + assert post_calls[1]["url"] == "https://cdn.example/upload-original" + + _run(go()) + + +def test_send_file_blocks_disallowed_regular_file(monkeypatch, tmp_path: Path): + from app.channels.message_bus import ResolvedAttachment + from app.channels.wechat import WechatChannel + + async def go(): + post_calls: list[dict[str, Any]] = [] + + def _client_factory(*args, **kwargs): + return _MockAsyncClient(post_calls=post_calls, **kwargs) + + monkeypatch.setattr("app.channels.wechat.httpx.AsyncClient", _client_factory) + + file_path = tmp_path / "malware.exe" + file_path.write_bytes(b"MZ") + + channel = WechatChannel(bus=MessageBus(), config={"bot_token": "bot-token"}) + channel._context_tokens_by_chat["wx-user-1"] = "ctx-file-send" + + ok = await channel.send_file( + OutboundMessage(channel_name="wechat", chat_id="wx-user-1", thread_id="thread-1", text="reply text"), + ResolvedAttachment( + virtual_path="/mnt/user-data/outputs/malware.exe", + actual_path=file_path, + filename="malware.exe", + mime_type="application/octet-stream", + size=file_path.stat().st_size, + is_image=False, + ), + ) + + assert ok is False + assert post_calls == [] + + _run(go()) + + +def test_handle_update_downloads_inbound_file(monkeypatch, tmp_path: Path): + from app.channels.wechat import WechatChannel + + async def go(): + bus = MessageBus() + published = [] + + async def capture(msg): + published.append(msg) + + bus.publish_inbound = capture # type: ignore[method-assign] + + plaintext = b"hello,file" + aes_key = b"1234567890abcdef" + + channel = WechatChannel(bus=bus, config={"bot_token": "test-token", "state_dir": str(tmp_path)}) + encrypted = channel.__class__.__dict__["_extract_file_item"].__globals__["_encrypt_aes_128_ecb"](plaintext, aes_key) + + async def _fake_download(_url: str, *, timeout: float | None = None): + return encrypted + + channel._download_cdn_bytes = _fake_download # type: ignore[method-assign] + + await channel._handle_update( + { + "message_type": 1, + "message_id": 303, + "from_user_id": "wx-user-1", + "context_token": "ctx-file-1", + "item_list": [ + { + "type": 4, + "file_item": { + "file_name": "report.pdf", + "aeskey": aes_key.hex(), + "media": {"full_url": "https://cdn.example/report.bin"}, + }, + } + ], + } + ) + + assert len(published) == 1 + inbound = published[0] + assert inbound.text == "" + assert len(inbound.files) == 1 + file_info = inbound.files[0] + assert file_info["message_item_type"] == 4 + stored = Path(file_info["path"]) + assert stored.exists() + assert stored.read_bytes() == plaintext + + _run(go()) + + +def test_handle_update_downloads_inbound_file_with_media_aeskey_hex(monkeypatch, tmp_path: Path): + from app.channels.wechat import WechatChannel + + async def go(): + bus = MessageBus() + published = [] + + async def capture(msg): + published.append(msg) + + bus.publish_inbound = capture # type: ignore[method-assign] + + plaintext = b"hello,file" + aes_key = b"1234567890abcdef" + + channel = WechatChannel(bus=bus, config={"bot_token": "test-token", "state_dir": str(tmp_path)}) + encrypted = channel.__class__.__dict__["_extract_file_item"].__globals__["_encrypt_aes_128_ecb"](plaintext, aes_key) + + async def _fake_download(_url: str, *, timeout: float | None = None): + return encrypted + + channel._download_cdn_bytes = _fake_download # type: ignore[method-assign] + + await channel._handle_update( + { + "message_type": 1, + "message_id": 304, + "from_user_id": "wx-user-1", + "context_token": "ctx-file-1b", + "item_list": [ + { + "type": 4, + "file_item": { + "file_name": "report.pdf", + "media": { + "full_url": "https://cdn.example/report.bin", + "aeskey": aes_key.hex(), + }, + }, + } + ], + } + ) + + assert len(published) == 1 + assert published[0].files[0]["filename"] == "report.pdf" + + _run(go()) + + +def test_handle_update_downloads_inbound_file_with_unpadded_item_aes_key(monkeypatch, tmp_path: Path): + from app.channels.wechat import WechatChannel + + async def go(): + bus = MessageBus() + published = [] + + async def capture(msg): + published.append(msg) + + bus.publish_inbound = capture # type: ignore[method-assign] + + plaintext = b"hello,file" + aes_key = b"1234567890abcdef" + encoded_key = base64.b64encode(aes_key).decode("utf-8").rstrip("=") + + channel = WechatChannel(bus=bus, config={"bot_token": "test-token", "state_dir": str(tmp_path)}) + encrypted = channel.__class__.__dict__["_extract_file_item"].__globals__["_encrypt_aes_128_ecb"](plaintext, aes_key) + + async def _fake_download(_url: str, *, timeout: float | None = None): + return encrypted + + channel._download_cdn_bytes = _fake_download # type: ignore[method-assign] + + await channel._handle_update( + { + "message_type": 1, + "message_id": 305, + "from_user_id": "wx-user-1", + "context_token": "ctx-file-1c", + "item_list": [ + { + "type": 4, + "aesKey": encoded_key, + "file_item": { + "file_name": "report.pdf", + "media": {"full_url": "https://cdn.example/report.bin"}, + }, + } + ], + } + ) + + assert len(published) == 1 + assert published[0].files[0]["filename"] == "report.pdf" + + _run(go()) + + +def test_handle_update_downloads_inbound_file_with_media_aes_key_base64_of_hex(monkeypatch, tmp_path: Path): + from app.channels.wechat import WechatChannel + + async def go(): + bus = MessageBus() + published = [] + + async def capture(msg): + published.append(msg) + + bus.publish_inbound = capture # type: ignore[method-assign] + + plaintext = b"hello,file" + aes_key = b"1234567890abcdef" + encoded_hex_key = base64.b64encode(aes_key.hex().encode("utf-8")).decode("utf-8") + + channel = WechatChannel(bus=bus, config={"bot_token": "test-token", "state_dir": str(tmp_path)}) + encrypted = channel.__class__.__dict__["_extract_file_item"].__globals__["_encrypt_aes_128_ecb"](plaintext, aes_key) + + async def _fake_download(_url: str, *, timeout: float | None = None): + return encrypted + + channel._download_cdn_bytes = _fake_download # type: ignore[method-assign] + + await channel._handle_update( + { + "message_type": 1, + "message_id": 306, + "from_user_id": "wx-user-1", + "context_token": "ctx-file-1d", + "item_list": [ + { + "type": 4, + "file_item": { + "file_name": "report.pdf", + "media": { + "full_url": "https://cdn.example/report.bin", + "aes_key": encoded_hex_key, + }, + }, + } + ], + } + ) + + assert len(published) == 1 + assert published[0].files[0]["filename"] == "report.pdf" + + _run(go()) + + +def test_handle_update_skips_disallowed_inbound_file(monkeypatch, tmp_path: Path): + from app.channels.wechat import WechatChannel + + async def go(): + bus = MessageBus() + published = [] + + async def capture(msg): + published.append(msg) + + bus.publish_inbound = capture # type: ignore[method-assign] + + plaintext = b"MZ" + aes_key = b"1234567890abcdef" + + channel = WechatChannel(bus=bus, config={"bot_token": "test-token", "state_dir": str(tmp_path)}) + encrypted = channel.__class__.__dict__["_extract_file_item"].__globals__["_encrypt_aes_128_ecb"](plaintext, aes_key) + + async def _fake_download(_url: str, *, timeout: float | None = None): + return encrypted + + channel._download_cdn_bytes = _fake_download # type: ignore[method-assign] + + await channel._handle_update( + { + "message_type": 1, + "message_id": 404, + "from_user_id": "wx-user-1", + "context_token": "ctx-file-2", + "item_list": [ + { + "type": 4, + "file_item": { + "file_name": "malware.exe", + "aeskey": aes_key.hex(), + "media": {"full_url": "https://cdn.example/bad.bin"}, + }, + } + ], + } + ) + + assert published == [] + + _run(go()) + + +def test_poll_loop_updates_server_timeout(monkeypatch): + from app.channels.wechat import WechatChannel + + async def go(): + post_calls: list[dict[str, Any]] = [] + + def _client_factory(*args, **kwargs): + return _MockAsyncClient( + post_calls=post_calls, + post_responses=[ + { + "ret": 0, + "msgs": [ + { + "message_type": 1, + "from_user_id": "wx-user-1", + "context_token": "ctx-1", + "item_list": [{"type": 1, "text_item": {"text": "hello"}}], + } + ], + "get_updates_buf": "cursor-next", + "longpolling_timeout_ms": 42000, + } + ], + **kwargs, + ) + + monkeypatch.setattr("app.channels.wechat.httpx.AsyncClient", _client_factory) + + channel = WechatChannel(bus=MessageBus(), config={"bot_token": "bot-token"}) + channel._running = True + + async def _fake_handle_update(_raw): + channel._running = False + return None + + channel._handle_update = _fake_handle_update # type: ignore[method-assign] + + await channel._poll_loop() + + assert channel._get_updates_buf == "cursor-next" + assert channel._server_longpoll_timeout_seconds == 42.0 + assert post_calls[0]["url"].endswith("/ilink/bot/getupdates") + + _run(go()) + + +def test_state_cursor_is_loaded_from_disk(tmp_path: Path): + from app.channels.wechat import WechatChannel + + state_dir = tmp_path / "wechat-state" + state_dir.mkdir(parents=True, exist_ok=True) + (state_dir / "wechat-getupdates.json").write_text( + json.dumps({"get_updates_buf": "cursor-123"}, ensure_ascii=False), + encoding="utf-8", + ) + + channel = WechatChannel( + bus=MessageBus(), + config={"bot_token": "bot-token", "state_dir": str(state_dir)}, + ) + + assert channel._get_updates_buf == "cursor-123" + + +def test_auth_state_is_loaded_from_disk(tmp_path: Path): + from app.channels.wechat import WechatChannel + + state_dir = tmp_path / "wechat-state" + state_dir.mkdir(parents=True, exist_ok=True) + (state_dir / "wechat-auth.json").write_text( + json.dumps({"status": "confirmed", "bot_token": "saved-token", "ilink_bot_id": "bot-1"}, ensure_ascii=False), + encoding="utf-8", + ) + + channel = WechatChannel( + bus=MessageBus(), + config={"state_dir": str(state_dir), "qrcode_login_enabled": True}, + ) + + assert channel._bot_token == "saved-token" + assert channel._ilink_bot_id == "bot-1" + + +def test_qrcode_login_binds_and_persists_auth_state(monkeypatch, tmp_path: Path): + from app.channels.wechat import WechatChannel + + async def go(): + get_calls: list[dict[str, Any]] = [] + + def _client_factory(*args, **kwargs): + return _MockAsyncClient( + get_calls=get_calls, + get_responses=[ + {"qrcode": "qr-123", "qrcode_img_content": "https://example.com/qr.png"}, + {"status": "confirmed", "bot_token": "bound-token", "ilink_bot_id": "bot-99"}, + ], + **kwargs, + ) + + monkeypatch.setattr("app.channels.wechat.httpx.AsyncClient", _client_factory) + + state_dir = tmp_path / "wechat-state" + channel = WechatChannel( + bus=MessageBus(), + config={ + "state_dir": str(state_dir), + "qrcode_login_enabled": True, + "qrcode_poll_interval": 0.01, + "qrcode_poll_timeout": 1, + }, + ) + + ok = await channel._ensure_authenticated() + + assert ok is True + assert channel._bot_token == "bound-token" + assert channel._ilink_bot_id == "bot-99" + assert get_calls[0]["url"].endswith("/ilink/bot/get_bot_qrcode") + assert get_calls[1]["url"].endswith("/ilink/bot/get_qrcode_status") + + auth_state = json.loads((state_dir / "wechat-auth.json").read_text(encoding="utf-8")) + assert auth_state["status"] == "confirmed" + assert auth_state["bot_token"] == "bound-token" + assert auth_state["ilink_bot_id"] == "bot-99" + + _run(go()) diff --git a/config.example.yaml b/config.example.yaml index 9c93fd6c9..c22ad9b9d 100644 --- a/config.example.yaml +++ b/config.example.yaml @@ -794,6 +794,36 @@ checkpointer: # bot_token: $TELEGRAM_BOT_TOKEN # allowed_users: [] # empty = allow all # +# wechat: +# enabled: false +# bot_token: $WECHAT_BOT_TOKEN +# ilink_bot_id: $WECHAT_ILINK_BOT_ID +# # Optional: allow first-time QR bootstrap when bot_token is absent +# qrcode_login_enabled: true +# # Optional: sent as iLink-App-Id header when provided +# ilink_app_id: "" +# # Optional: sent as SKRouteTag header when provided +# route_tag: "" +# allowed_users: [] # empty = allow all +# # Optional: long-polling timeout in seconds +# polling_timeout: 35 +# # Optional: QR poll interval in seconds when qrcode_login_enabled is true +# qrcode_poll_interval: 2 +# # Optional: QR bootstrap timeout in seconds +# qrcode_poll_timeout: 180 +# # Optional: persist getupdates cursor under the gateway container volume +# state_dir: ./.deer-flow/wechat/state +# # Optional: max inbound image size in bytes before skipping download +# max_inbound_image_bytes: 20971520 +# # Optional: max outbound image size in bytes before skipping upload +# max_outbound_image_bytes: 20971520 +# # Optional: max inbound file size in bytes before skipping download +# max_inbound_file_bytes: 52428800 +# # Optional: max outbound file size in bytes before skipping upload +# max_outbound_file_bytes: 52428800 +# # Optional: allowed file extensions for regular file receive/send +# allowed_file_extensions: [".txt", ".md", ".pdf", ".csv", ".json", ".yaml", ".yml", ".xml", ".html", ".log", ".zip", ".doc", ".docx", ".xls", ".xlsx", ".ppt", ".pptx", ".rtf"] +# # # Optional: channel-level session overrides # session: # assistant_id: mobile-agent # custom agent names are supported here too