From 08afdcb907f149312f31827aa1a96eeaa67b85f9 Mon Sep 17 00:00:00 2001 From: He Wang Date: Thu, 30 Apr 2026 11:25:33 +0800 Subject: [PATCH] feat(channels): add DingTalk channel integration (#2628) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat(channels): add DingTalk channel integration Add a new DingTalk messaging channel using the dingtalk-stream SDK with Stream Push (WebSocket), requiring no public IP. Supports both plain sampleMarkdown replies and optional AI Card streaming for a typewriter effect when card_template_id is configured. - Add DingTalkChannel implementation with token management, message routing, allowed_users filtering, and markdown adaptation - Register dingtalk in channel service registry and capability map - Propagate inbound metadata to outbound messages in ChannelManager for DingTalk sender context (sender_staff_id, conversation_type) - Add dingtalk-stream dependency to pyproject.toml - Add configuration examples in config.example.yaml and .env.example - Update all README translations with setup instructions - Add comprehensive test suite (test_dingtalk_channel.py) and metadata propagation test in test_channels.py - Update backend CLAUDE.md to document DingTalk channel Co-Authored-By: Claude Opus 4.6 * fix(channels): address PR review feedback for DingTalk integration - Replace runtime mutation of CHANNEL_CAPABILITIES with a `supports_streaming` property on the Channel base class, overridden by DingTalkChannel, FeishuChannel, and WeComChannel - Store stream client reference and attempt graceful disconnect in stop(); guard _on_chatbot_message with _running check to prevent post-stop message processing - Use msg.chat_id as the primary routing key in send/send_file via a shared _resolve_routing helper, with metadata as fallback - Fix process() return type annotation from tuple[str, str] to tuple[int, str] to match AckMessage.STATUS_OK - Protect _incoming_messages with threading.Lock for cross-thread safety between the Stream Push thread and the asyncio loop - Re-add Docker Compose URL guidance removed during DingTalk setup docs addition in README.md - Fix incomplete sentence in README_zh.md (missing verb "启用") Co-Authored-By: Claude Opus 4.6 * fix(docs): restore plain paragraph format for Docker Compose note Co-Authored-By: Claude Opus 4.6 * fix(channels): fix isinstance TypeError and add file size guard in DingTalk channel Use tuple syntax for isinstance() type check to avoid runtime TypeError with PEP 604 union types. Add upload size limit (20MB) before reading files into memory. Narrow exception handlers to specific types. Co-Authored-By: Claude Opus 4.6 * fix(channels): propagate markdown fallback errors and validate access token response - Re-raise exceptions in _send_markdown_fallback to prevent partial deliveries (files sent without accompanying text) - Validate _get_access_token response: reject non-dict bodies, empty tokens, and coerce invalid expireIn to a safe default - Add tests for both fixes Co-Authored-By: Claude Opus 4.6 * fix(channels): validate upload response and broaden send_file exception handling - Validate _upload_media JSON response: handle JSONDecodeError and non-dict payloads gracefully by returning None - Broaden send_file exception tuple to include TypeError and AttributeError for unexpected JSON shapes Co-Authored-By: Claude Opus 4.6 * fix(channels): fix streaming race on channel registration and slim outbound metadata - Register channel in service before calling start() to avoid race where background receiver publishes inbound before registration, causing manager to fall back to static CHANNEL_CAPABILITIES - Strip known-large metadata keys (raw_message, ref_msg) from outbound messages to prevent memory bloat from propagated inbound payloads Co-Authored-By: Claude Opus 4.6 * Update service.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * Update CLAUDE.md Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --------- Co-authored-by: Claude Opus 4.6 Co-authored-by: Willem Jiang Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- .env.example | 2 + README.md | 20 + README_fr.md | 19 + README_ja.md | 19 + README_ru.md | 15 + README_zh.md | 19 + backend/CLAUDE.md | 15 +- backend/app/channels/base.py | 4 + backend/app/channels/dingtalk.py | 740 +++++++++++ backend/app/channels/feishu.py | 4 + backend/app/channels/manager.py | 20 + backend/app/channels/service.py | 9 +- backend/app/channels/wecom.py | 4 + backend/pyproject.toml | 1 + backend/tests/test_channels.py | 84 ++ backend/tests/test_dingtalk_channel.py | 1554 ++++++++++++++++++++++++ backend/uv.lock | 15 + config.example.yaml | 7 + 18 files changed, 2544 insertions(+), 7 deletions(-) create mode 100644 backend/app/channels/dingtalk.py create mode 100644 backend/tests/test_dingtalk_channel.py diff --git a/.env.example b/.env.example index 89d169631..f443818b3 100644 --- a/.env.example +++ b/.env.example @@ -40,6 +40,8 @@ INFOQUEST_API_KEY=your-infoquest-api-key # # WECOM_BOT_ID=your-wecom-bot-id # WECOM_BOT_SECRET=your-wecom-bot-secret +# DINGTALK_CLIENT_ID=your-dingtalk-client-id +# DINGTALK_CLIENT_SECRET=your-dingtalk-client-secret # Set to "false" to disable Swagger UI, ReDoc, and OpenAPI schema in production # GATEWAY_ENABLE_DOCS=false diff --git a/README.md b/README.md index b64c75f97..c67fdc005 100644 --- a/README.md +++ b/README.md @@ -345,6 +345,7 @@ DeerFlow supports receiving tasks from messaging apps. Channels auto-start when | Feishu / Lark | WebSocket | Moderate | | WeChat | Tencent iLink (long-polling) | Moderate | | WeCom | WebSocket | Moderate | +| DingTalk | Stream Push (WebSocket) | Moderate | **Configuration in `config.yaml`:** @@ -414,6 +415,13 @@ channels: context: thinking_enabled: true subagent_enabled: true + + dingtalk: + enabled: true + client_id: $DINGTALK_CLIENT_ID # Client ID of your DingTalk application + client_secret: $DINGTALK_CLIENT_SECRET # Client Secret of your DingTalk application + allowed_users: [] # empty = allow all + card_template_id: "" # Optional: AI Card template ID for streaming typewriter effect ``` Notes: @@ -442,6 +450,10 @@ WECHAT_ILINK_BOT_ID=your_ilink_bot_id # WeCom WECOM_BOT_ID=your_bot_id WECOM_BOT_SECRET=your_bot_secret + +# DingTalk +DINGTALK_CLIENT_ID=your_client_id +DINGTALK_CLIENT_SECRET=your_client_secret ``` **Telegram Setup** @@ -480,6 +492,14 @@ WECOM_BOT_SECRET=your_bot_secret 4. Make sure backend dependencies include `wecom-aibot-python-sdk`. The channel uses a WebSocket long connection and does not require a public callback URL. 5. The current integration supports inbound text, image, and file messages. Final images/files generated by the agent are also sent back to the WeCom conversation. +**DingTalk Setup** + +1. Create a DingTalk application in the [DingTalk Developer Console](https://open.dingtalk.com/) and enable **Robot** capability. +2. Set the message receiving mode to **Stream Mode** in the robot configuration page. +3. Copy the `Client ID` and `Client Secret`, set `DINGTALK_CLIENT_ID` and `DINGTALK_CLIENT_SECRET` in `.env`, and enable the channel in `config.yaml`. +4. *(Optional)* To enable streaming AI Card replies (typewriter effect), create an **AI Card** template on the [DingTalk Card Platform](https://open.dingtalk.com/document/dingstart/typewriter-effect-streaming-ai-card), then set `card_template_id` in `config.yaml` to the template ID. You also need to apply for the `Card.Streaming.Write` and `Card.Instance.Write` permissions. + + When DeerFlow runs in Docker Compose, IM channels execute inside the `gateway` container. In that case, do not point `channels.langgraph_url` or `channels.gateway_url` at `localhost`; use container service names such as `http://gateway:8001/api` and `http://gateway:8001`, or set `DEER_FLOW_CHANNELS_LANGGRAPH_URL` and `DEER_FLOW_CHANNELS_GATEWAY_URL`. **Commands** diff --git a/README_fr.md b/README_fr.md index e7684a5f8..3b8dc3d41 100644 --- a/README_fr.md +++ b/README_fr.md @@ -290,6 +290,7 @@ DeerFlow peut recevoir des tâches depuis des applications de messagerie. Les ca | Telegram | Bot API (long-polling) | Facile | | Slack | Socket Mode | Modérée | | Feishu / Lark | WebSocket | Modérée | +| DingTalk | Stream Push (WebSocket) | Modérée | **Configuration dans `config.yaml` :** @@ -341,6 +342,13 @@ channels: context: thinking_enabled: true subagent_enabled: true + + dingtalk: + enabled: true + client_id: $DINGTALK_CLIENT_ID # ClientId depuis DingTalk Open Platform + client_secret: $DINGTALK_CLIENT_SECRET # ClientSecret depuis DingTalk Open Platform + allowed_users: [] # vide = tout le monde autorisé + card_template_id: "" # Optionnel : ID de modèle AI Card pour l'effet machine à écrire en streaming ``` Définissez les clés API correspondantes dans votre fichier `.env` : @@ -356,6 +364,10 @@ SLACK_APP_TOKEN=xapp-... # Feishu / Lark FEISHU_APP_ID=cli_xxxx FEISHU_APP_SECRET=your_app_secret + +# DingTalk +DINGTALK_CLIENT_ID=your_client_id +DINGTALK_CLIENT_SECRET=your_client_secret ``` **Configuration Telegram** @@ -378,6 +390,13 @@ FEISHU_APP_SECRET=your_app_secret 3. Dans **Events**, abonnez-vous à `im.message.receive_v1` et sélectionnez le mode **Long Connection**. 4. Copiez l'App ID et l'App Secret. Définissez `FEISHU_APP_ID` et `FEISHU_APP_SECRET` dans `.env` et activez le canal dans `config.yaml`. +**Configuration DingTalk** + +1. Créez une application sur [DingTalk Open Platform](https://open.dingtalk.com/) et activez la capacité **Robot**. +2. Dans la page de configuration du robot, définissez le mode de réception des messages sur **Stream**. +3. Copiez le `Client ID` et le `Client Secret`. Définissez `DINGTALK_CLIENT_ID` et `DINGTALK_CLIENT_SECRET` dans `.env` et activez le canal dans `config.yaml`. +4. *(Optionnel)* Pour activer les réponses en streaming AI Card (effet machine à écrire), créez un modèle **AI Card** sur la [plateforme de cartes DingTalk](https://open.dingtalk.com/document/dingstart/typewriter-effect-streaming-ai-card), puis définissez `card_template_id` dans `config.yaml` avec l'ID du modèle. Vous devez également demander les permissions `Card.Streaming.Write` et `Card.Instance.Write`. + **Commandes** Une fois un canal connecté, vous pouvez interagir avec DeerFlow directement depuis le chat : diff --git a/README_ja.md b/README_ja.md index 3e0ff4c85..d2ba81750 100644 --- a/README_ja.md +++ b/README_ja.md @@ -243,6 +243,7 @@ DeerFlowはメッセージングアプリからのタスク受信をサポート | Telegram | Bot API(ロングポーリング) | 簡単 | | Slack | Socket Mode | 中程度 | | Feishu / Lark | WebSocket | 中程度 | +| DingTalk | Stream Push(WebSocket) | 中程度 | **`config.yaml`での設定:** @@ -294,6 +295,13 @@ channels: context: thinking_enabled: true subagent_enabled: true + + dingtalk: + enabled: true + client_id: $DINGTALK_CLIENT_ID # DingTalk Open PlatformのClientId + client_secret: $DINGTALK_CLIENT_SECRET # DingTalk Open PlatformのClientSecret + allowed_users: [] # 空 = 全員許可 + card_template_id: "" # オプション:ストリーミングタイプライター効果用のAIカードテンプレートID ``` 対応するAPIキーを`.env`ファイルに設定します: @@ -309,6 +317,10 @@ SLACK_APP_TOKEN=xapp-... # Feishu / Lark FEISHU_APP_ID=cli_xxxx FEISHU_APP_SECRET=your_app_secret + +# DingTalk +DINGTALK_CLIENT_ID=your_client_id +DINGTALK_CLIENT_SECRET=your_client_secret ``` **Telegramのセットアップ** @@ -331,6 +343,13 @@ FEISHU_APP_SECRET=your_app_secret 3. **イベント**で`im.message.receive_v1`を購読し、**ロングコネクション**モードを選択。 4. App IDとApp Secretをコピー。`.env`に`FEISHU_APP_ID`と`FEISHU_APP_SECRET`を設定し、`config.yaml`でチャネルを有効にします。 +**DingTalkのセットアップ** + +1. [DingTalk Open Platform](https://open.dingtalk.com/)でアプリを作成し、**ロボット**機能を有効化します。 +2. ロボット設定ページでメッセージ受信モードを**Streamモード**に設定します。 +3. `Client ID`と`Client Secret`をコピー。`.env`に`DINGTALK_CLIENT_ID`と`DINGTALK_CLIENT_SECRET`を設定し、`config.yaml`でチャネルを有効にします。 +4. *(オプション)* ストリーミングAIカード返信(タイプライター効果)を有効にするには、[DingTalkカードプラットフォーム](https://open.dingtalk.com/document/dingstart/typewriter-effect-streaming-ai-card)で**AIカード**テンプレートを作成し、`config.yaml`の`card_template_id`にテンプレートIDを設定します。`Card.Streaming.Write` および `Card.Instance.Write` 権限の申請も必要です。 + **コマンド** チャネル接続後、チャットから直接DeerFlowと対話できます: diff --git a/README_ru.md b/README_ru.md index 6ee30ebc6..e74feaeac 100644 --- a/README_ru.md +++ b/README_ru.md @@ -256,6 +256,7 @@ DeerFlow принимает задачи прямо из мессенджеро | Telegram | Bot API (long-polling) | Просто | | Slack | Socket Mode | Средне | | Feishu / Lark | WebSocket | Средне | +| DingTalk | Stream Push (WebSocket) | Средне | **Конфигурация в `config.yaml`:** @@ -278,6 +279,13 @@ channels: enabled: true bot_token: $TELEGRAM_BOT_TOKEN allowed_users: [] + + dingtalk: + enabled: true + client_id: $DINGTALK_CLIENT_ID # ClientId с DingTalk Open Platform + client_secret: $DINGTALK_CLIENT_SECRET # ClientSecret с DingTalk Open Platform + allowed_users: [] # пусто = разрешить всем + card_template_id: "" # Опционально: ID шаблона AI Card для потокового эффекта печатной машинки ``` **Настройка Telegram** @@ -285,6 +293,13 @@ channels: 1. Напишите [@BotFather](https://t.me/BotFather), отправьте `/newbot` и скопируйте HTTP API-токен. 2. Укажите `TELEGRAM_BOT_TOKEN` в `.env` и включите канал в `config.yaml`. +**Настройка DingTalk** + +1. Создайте приложение на [DingTalk Open Platform](https://open.dingtalk.com/) и включите возможность **Робот**. +2. На странице настроек робота установите режим приёма сообщений на **Stream**. +3. Скопируйте `Client ID` и `Client Secret`. Укажите `DINGTALK_CLIENT_ID` и `DINGTALK_CLIENT_SECRET` в `.env` и включите канал в `config.yaml`. +4. *(Опционально)* Для включения потоковых ответов AI Card (эффект печатной машинки) создайте шаблон **AI Card** на [платформе карточек DingTalk](https://open.dingtalk.com/document/dingstart/typewriter-effect-streaming-ai-card), затем укажите `card_template_id` в `config.yaml` с ID шаблона. Также необходимо запросить разрешения `Card.Streaming.Write` и `Card.Instance.Write`. + **Доступные команды** | Команда | Описание | diff --git a/README_zh.md b/README_zh.md index f6043ff86..6e4a618c7 100644 --- a/README_zh.md +++ b/README_zh.md @@ -248,6 +248,7 @@ DeerFlow 支持从即时通讯应用接收任务。只要配置完成,对应 | Slack | Socket Mode | 中等 | | Feishu / Lark | WebSocket | 中等 | | 企业微信智能机器人 | WebSocket | 中等 | +| 钉钉 | Stream Push(WebSocket) | 中等 | **`config.yaml` 中的配置示例:** @@ -304,6 +305,13 @@ channels: context: thinking_enabled: true subagent_enabled: true + + dingtalk: + enabled: true + client_id: $DINGTALK_CLIENT_ID # 钉钉开放平台 ClientId + client_secret: $DINGTALK_CLIENT_SECRET # 钉钉开放平台 ClientSecret + allowed_users: [] # 留空表示允许所有人 + card_template_id: "" # 可选:AI 卡片模板 ID,用于流式打字机效果 ``` 说明: @@ -327,6 +335,10 @@ FEISHU_APP_SECRET=your_app_secret # 企业微信智能机器人 WECOM_BOT_ID=your_bot_id WECOM_BOT_SECRET=your_bot_secret + +# 钉钉 +DINGTALK_CLIENT_ID=your_client_id +DINGTALK_CLIENT_SECRET=your_client_secret ``` **Telegram 配置** @@ -357,6 +369,13 @@ WECOM_BOT_SECRET=your_bot_secret 4. 安装后端依赖时确保包含 `wecom-aibot-python-sdk`,渠道会通过 WebSocket 长连接接收消息,无需公网回调地址。 5. 当前支持文本、图片和文件入站消息;agent 生成的最终图片/文件也会回传到企业微信会话中。 +**钉钉配置** + +1. 在 [钉钉开放平台](https://open.dingtalk.com/) 创建应用,并启用 **机器人** 能力。 +2. 在机器人配置页面设置消息接收模式为 **Stream模式**。 +3. 复制 `Client ID` 和 `Client Secret`,在 `.env` 中设置 `DINGTALK_CLIENT_ID` 和 `DINGTALK_CLIENT_SECRET`,并在 `config.yaml` 中启用该渠道。 +4. *(可选)* 如需开启流式 AI 卡片回复(打字机效果),请在[钉钉卡片平台](https://open.dingtalk.com/document/dingstart/typewriter-effect-streaming-ai-card)创建 **AI 卡片**模板,然后在 `config.yaml` 中将 `card_template_id` 设为该模板 ID。同时需要申请 `Card.Streaming.Write` 和 `Card.Instance.Write` 权限。 + **命令** 渠道连接完成后,你可以直接在聊天窗口里和 DeerFlow 交互: diff --git a/backend/CLAUDE.md b/backend/CLAUDE.md index 5a786ac95..b185ce4a1 100644 --- a/backend/CLAUDE.md +++ b/backend/CLAUDE.md @@ -112,7 +112,7 @@ CI runs these regression tests for every pull request via [.github/workflows/bac The backend is split into two layers with a strict dependency direction: - **Harness** (`packages/harness/deerflow/`): Publishable agent framework package (`deerflow-harness`). Import prefix: `deerflow.*`. Contains agent orchestration, tools, sandbox, models, MCP, skills, config — everything needed to build and run agents. -- **App** (`app/`): Unpublished application code. Import prefix: `app.*`. Contains the FastAPI Gateway API and IM channel integrations (Feishu, Slack, Telegram). +- **App** (`app/`): Unpublished application code. Import prefix: `app.*`. Contains the FastAPI Gateway API and IM channel integrations (Feishu, Slack, Telegram, DingTalk). **Dependency rule**: App imports deerflow, but deerflow never imports app. This boundary is enforced by `tests/test_harness_boundary.py` which runs in CI. @@ -312,7 +312,8 @@ Proxied through nginx: `/api/langgraph/*` → LangGraph, all other `/api/*` → ### IM Channels System (`app/channels/`) -Bridges external messaging platforms (Feishu, Slack, Telegram) to the DeerFlow agent via Gateway's LangGraph-compatible API. +Bridges external messaging platforms (Feishu, Slack, Telegram, DingTalk) to the DeerFlow agent via the LangGraph Server. + **Architecture**: Channels communicate with Gateway through the `langgraph-sdk` HTTP client (same as the frontend), ensuring threads are created and managed server-side. The internal SDK client injects process-local internal auth plus a matching CSRF cookie/header pair so Gateway accepts state-changing thread/run requests from channel workers without relying on browser session cookies. @@ -322,7 +323,7 @@ Bridges external messaging platforms (Feishu, Slack, Telegram) to the DeerFlow a - `manager.py` - Core dispatcher: creates threads via `client.threads.create()`, routes commands, keeps Slack/Telegram on `client.runs.wait()`, and uses `client.runs.stream(["messages-tuple", "values"])` for Feishu incremental outbound updates - `base.py` - Abstract `Channel` base class (start/stop/send lifecycle) - `service.py` - Manages lifecycle of all configured channels from `config.yaml` -- `slack.py` / `feishu.py` / `telegram.py` - Platform-specific implementations (`feishu.py` tracks the running card `message_id` in memory and patches the same card in place) +- `slack.py` / `feishu.py` / `telegram.py` / `dingtalk.py` - Platform-specific implementations (`feishu.py` tracks the running card `message_id` in memory and patches the same card in place; `dingtalk.py` optionally uses AI Card streaming for in-place updates when `card_template_id` is configured) **Message Flow**: 1. External platform -> Channel impl -> `MessageBus.publish_inbound()` @@ -331,14 +332,16 @@ Bridges external messaging platforms (Feishu, Slack, Telegram) to the DeerFlow a 4. Feishu chat: `runs.stream()` → accumulate AI text → publish multiple outbound updates (`is_final=False`) → publish final outbound (`is_final=True`) 5. Slack/Telegram chat: `runs.wait()` → extract final response → publish outbound 6. Feishu channel sends one running reply card up front, then patches the same card for each outbound update (card JSON sets `config.update_multi=true` for Feishu's patch API requirement) -7. For commands (`/new`, `/status`, `/models`, `/memory`, `/help`): handle locally or query Gateway API -8. Outbound → channel callbacks → platform reply +7. DingTalk AI Card mode (when `card_template_id` configured): `runs.stream()` → create card with initial text → stream updates via `PUT /v1.0/card/streaming` → finalize on `is_final=True`. Falls back to `sampleMarkdown` if card creation or streaming fails +8. For commands (`/new`, `/status`, `/models`, `/memory`, `/help`): handle locally or query Gateway API +9. Outbound → channel callbacks → platform reply **Configuration** (`config.yaml` -> `channels`): - `langgraph_url` - LangGraph-compatible Gateway API base URL (default: `http://localhost:8001/api`) - `gateway_url` - Gateway API URL for auxiliary commands (default: `http://localhost:8001`) - In Docker Compose, IM channels run inside the `gateway` container, so `localhost` points back to that container. Use `http://gateway:8001/api` for `langgraph_url` and `http://gateway:8001` for `gateway_url`, or set `DEER_FLOW_CHANNELS_LANGGRAPH_URL` / `DEER_FLOW_CHANNELS_GATEWAY_URL`. -- Per-channel configs: `feishu` (app_id, app_secret), `slack` (bot_token, app_token), `telegram` (bot_token) +- Per-channel configs: `feishu` (app_id, app_secret), `slack` (bot_token, app_token), `telegram` (bot_token), `dingtalk` (client_id, client_secret, optional `card_template_id` for AI Card streaming) + ### Memory System (`packages/harness/deerflow/agents/memory/`) diff --git a/backend/app/channels/base.py b/backend/app/channels/base.py index 95aecf267..baf542c48 100644 --- a/backend/app/channels/base.py +++ b/backend/app/channels/base.py @@ -31,6 +31,10 @@ class Channel(ABC): def is_running(self) -> bool: return self._running + @property + def supports_streaming(self) -> bool: + return False + # -- lifecycle --------------------------------------------------------- @abstractmethod diff --git a/backend/app/channels/dingtalk.py b/backend/app/channels/dingtalk.py new file mode 100644 index 000000000..f2833d4ff --- /dev/null +++ b/backend/app/channels/dingtalk.py @@ -0,0 +1,740 @@ +"""DingTalk channel implementation.""" + +from __future__ import annotations + +import asyncio +import json +import logging +import re +import threading +import time +from pathlib import Path +from typing import Any + +import httpx + +from app.channels.base import Channel +from app.channels.commands import KNOWN_CHANNEL_COMMANDS +from app.channels.message_bus import InboundMessage, InboundMessageType, MessageBus, OutboundMessage, ResolvedAttachment + +logger = logging.getLogger(__name__) + +DINGTALK_API_BASE = "https://api.dingtalk.com" + +_TOKEN_REFRESH_MARGIN_SECONDS = 300 + +_CONVERSATION_TYPE_P2P = "1" +_CONVERSATION_TYPE_GROUP = "2" + +_MAX_UPLOAD_SIZE_BYTES = 20 * 1024 * 1024 + + +def _normalize_conversation_type(raw: Any) -> str: + """Normalize ``conversationType`` to ``"1"`` (P2P) or ``"2"`` (group). + + Stream payloads may send int or string values. + """ + if raw is None: + return _CONVERSATION_TYPE_P2P + s = str(raw).strip() + if s == _CONVERSATION_TYPE_GROUP: + return _CONVERSATION_TYPE_GROUP + return _CONVERSATION_TYPE_P2P + + +def _normalize_allowed_users(allowed_users: Any) -> set[str]: + if allowed_users is None: + return set() + if isinstance(allowed_users, str): + values = [allowed_users] + elif isinstance(allowed_users, (list, tuple, set)): + values = allowed_users + else: + logger.warning( + "DingTalk allowed_users should be a list of user IDs; treating %s as one string value", + type(allowed_users).__name__, + ) + values = [allowed_users] + return {str(uid) for uid in values if str(uid)} + + +def _is_dingtalk_command(text: str) -> bool: + if not text.startswith("/"): + return False + return text.split(maxsplit=1)[0].lower() in KNOWN_CHANNEL_COMMANDS + + +def _extract_text_from_rich_text(rich_text_list: list) -> str: + parts: list[str] = [] + for item in rich_text_list: + if isinstance(item, dict) and "text" in item: + parts.append(item["text"]) + return " ".join(parts) + + +_FENCED_CODE_BLOCK_RE = re.compile(r"```(\w*)\n(.*?)```", re.DOTALL) +_INLINE_CODE_RE = re.compile(r"`([^`\n]+)`") +_HORIZONTAL_RULE_RE = re.compile(r"^-{3,}$", re.MULTILINE) +_TABLE_SEPARATOR_RE = re.compile(r"^\|[-:| ]+\|$", re.MULTILINE) + + +def _convert_markdown_table(text: str) -> str: + # DingTalk sampleMarkdown does not render pipe-delimited tables. + lines = text.split("\n") + result: list[str] = [] + i = 0 + while i < len(lines): + line = lines[i] + # Detect table: header row followed by separator row + if i + 1 < len(lines) and line.strip().startswith("|") and _TABLE_SEPARATOR_RE.match(lines[i + 1].strip()): + headers = [h.strip() for h in line.strip().strip("|").split("|")] + i += 2 # skip header + separator + while i < len(lines) and lines[i].strip().startswith("|"): + cells = [c.strip() for c in lines[i].strip().strip("|").split("|")] + for h, c in zip(headers, cells): + result.append(f"> **{h}**: {c}") + result.append("") + i += 1 + else: + result.append(line) + i += 1 + return "\n".join(result) + + +def _adapt_markdown_for_dingtalk(text: str) -> str: + """Adapt markdown for DingTalk's limited sampleMarkdown renderer.""" + + def _code_block_to_quote(match: re.Match) -> str: + lang = match.group(1) + code = match.group(2).rstrip("\n") + prefix = f"> **{lang}**\n" if lang else "" + quoted_lines = "\n".join(f"> {line}" for line in code.split("\n")) + return f"{prefix}{quoted_lines}\n" + + text = _FENCED_CODE_BLOCK_RE.sub(_code_block_to_quote, text) + text = _INLINE_CODE_RE.sub(r"**\1**", text) + text = _convert_markdown_table(text) + text = _HORIZONTAL_RULE_RE.sub("───────────", text) + return text + + +class DingTalkChannel(Channel): + """DingTalk IM channel using Stream Push (WebSocket, no public IP needed).""" + + def __init__(self, bus: MessageBus, config: dict[str, Any]) -> None: + super().__init__(name="dingtalk", bus=bus, config=config) + self._thread: threading.Thread | None = None + self._main_loop: asyncio.AbstractEventLoop | None = None + self._client_id: str = "" + self._client_secret: str = "" + self._allowed_users: set[str] = _normalize_allowed_users(config.get("allowed_users")) + self._cached_token: str = "" + self._token_expires_at: float = 0.0 + self._token_lock = asyncio.Lock() + self._card_template_id: str = config.get("card_template_id", "") + self._card_track_ids: dict[str, str] = {} + self._dingtalk_client: Any = None + self._stream_client: Any = None + self._incoming_messages: dict[str, Any] = {} + self._incoming_messages_lock = threading.Lock() + self._card_repliers: dict[str, Any] = {} + + @property + def supports_streaming(self) -> bool: + return bool(self._card_template_id) + + async def start(self) -> None: + if self._running: + return + + try: + import dingtalk_stream # noqa: F401 + except ImportError: + logger.error("dingtalk-stream is not installed. Install it with: uv add dingtalk-stream") + return + + client_id = self.config.get("client_id", "") + client_secret = self.config.get("client_secret", "") + + if not client_id or not client_secret: + logger.error("DingTalk channel requires client_id and client_secret") + return + + self._client_id = client_id + self._client_secret = client_secret + self._main_loop = asyncio.get_running_loop() + + if self._card_template_id: + logger.info("[DingTalk] AI Card mode enabled (template=%s)", self._card_template_id) + + self._running = True + self.bus.subscribe_outbound(self._on_outbound) + + self._thread = threading.Thread( + target=self._run_stream, + args=(client_id, client_secret), + daemon=True, + ) + self._thread.start() + logger.info("DingTalk channel started") + + async def stop(self) -> None: + self._running = False + self.bus.unsubscribe_outbound(self._on_outbound) + + stream_client = self._stream_client + if stream_client is not None: + try: + if hasattr(stream_client, "disconnect"): + stream_client.disconnect() + except Exception: + logger.debug("[DingTalk] error disconnecting stream client", exc_info=True) + + self._dingtalk_client = None + self._stream_client = None + with self._incoming_messages_lock: + self._incoming_messages.clear() + self._card_repliers.clear() + self._card_track_ids.clear() + if self._thread: + self._thread.join(timeout=5) + self._thread = None + logger.info("DingTalk channel stopped") + + def _resolve_routing(self, msg: OutboundMessage) -> tuple[str, str, str]: + """Return (conversation_type, sender_staff_id, conversation_id). + + Uses msg.chat_id as the primary routing key; metadata as fallback. + """ + conversation_type = _normalize_conversation_type(msg.metadata.get("conversation_type")) + sender_staff_id = msg.metadata.get("sender_staff_id", "") + conversation_id = msg.metadata.get("conversation_id", "") + if conversation_type == _CONVERSATION_TYPE_GROUP: + conversation_id = msg.chat_id or conversation_id + else: + sender_staff_id = msg.chat_id or sender_staff_id + return conversation_type, sender_staff_id, conversation_id + + async def send(self, msg: OutboundMessage, *, _max_retries: int = 3) -> None: + conversation_type, sender_staff_id, conversation_id = self._resolve_routing(msg) + robot_code = self._client_id + + # Card mode: stream update to existing AI card + source_key = self._make_card_source_key_from_outbound(msg) + out_track_id = self._card_track_ids.get(source_key) + + # ``card_template_id`` enables ``runs.stream`` (non-final + final outbounds). + # If card creation failed, skip non-final chunks to avoid duplicate messages. + if self._card_template_id and not out_track_id and not msg.is_final: + return + + if out_track_id: + try: + await self._stream_update_card( + out_track_id, + msg.text, + is_finalize=msg.is_final, + ) + except Exception: + logger.warning("[DingTalk] card stream failed, falling back to sampleMarkdown") + if msg.is_final: + self._card_track_ids.pop(source_key, None) + self._card_repliers.pop(out_track_id, None) + await self._send_markdown_fallback(robot_code, conversation_type, sender_staff_id, conversation_id, msg.text) + return + if msg.is_final: + self._card_track_ids.pop(source_key, None) + self._card_repliers.pop(out_track_id, None) + return + + # Non-card mode: send sampleMarkdown with retry + last_exc: Exception | None = None + for attempt in range(_max_retries): + try: + if conversation_type == _CONVERSATION_TYPE_GROUP: + await self._send_group_message(robot_code, conversation_id, msg.text, at_user_ids=[sender_staff_id] if sender_staff_id else None) + else: + await self._send_p2p_message(robot_code, sender_staff_id, msg.text) + return + except Exception as exc: + last_exc = exc + if attempt < _max_retries - 1: + delay = 2**attempt + logger.warning( + "[DingTalk] send failed (attempt %d/%d), retrying in %ds: %s", + attempt + 1, + _max_retries, + delay, + exc, + ) + await asyncio.sleep(delay) + + logger.error("[DingTalk] send failed after %d attempts: %s", _max_retries, last_exc) + if last_exc is None: + raise RuntimeError("DingTalk send failed without an exception from any attempt") + raise last_exc + + async def _send_markdown_fallback( + self, + robot_code: str, + conversation_type: str, + sender_staff_id: str, + conversation_id: str, + text: str, + ) -> None: + try: + if conversation_type == _CONVERSATION_TYPE_GROUP: + await self._send_group_message(robot_code, conversation_id, text) + else: + await self._send_p2p_message(robot_code, sender_staff_id, text) + except Exception: + logger.exception("[DingTalk] markdown fallback also failed") + raise + + async def send_file(self, msg: OutboundMessage, attachment: ResolvedAttachment) -> bool: + if attachment.size > _MAX_UPLOAD_SIZE_BYTES: + logger.warning("[DingTalk] file too large (%d bytes), skipping: %s", attachment.size, attachment.filename) + return False + + conversation_type, sender_staff_id, conversation_id = self._resolve_routing(msg) + robot_code = self._client_id + + try: + media_id = await self._upload_media(attachment.actual_path, "image" if attachment.is_image else "file") + if not media_id: + return False + + if attachment.is_image: + msg_key = "sampleImageMsg" + msg_param = json.dumps({"photoURL": media_id}) + else: + msg_key = "sampleFile" + msg_param = json.dumps( + { + "fileUrl": media_id, + "fileName": attachment.filename, + "fileSize": str(attachment.size), + } + ) + + token = await self._get_access_token() + async with httpx.AsyncClient(timeout=httpx.Timeout(30.0)) as client: + if conversation_type == _CONVERSATION_TYPE_GROUP: + response = await client.post( + f"{DINGTALK_API_BASE}/v1.0/robot/groupMessages/send", + headers=self._api_headers(token), + json={ + "msgKey": msg_key, + "msgParam": msg_param, + "robotCode": robot_code, + "openConversationId": conversation_id, + }, + ) + else: + response = await client.post( + f"{DINGTALK_API_BASE}/v1.0/robot/oToMessages/batchSend", + headers=self._api_headers(token), + json={ + "msgKey": msg_key, + "msgParam": msg_param, + "robotCode": robot_code, + "userIds": [sender_staff_id], + }, + ) + response.raise_for_status() + + logger.info("[DingTalk] file sent: %s", attachment.filename) + return True + except (httpx.HTTPError, OSError, ValueError, TypeError, AttributeError): + logger.exception("[DingTalk] failed to send file: %s", attachment.filename) + return False + + # -- stream client (runs in dedicated thread) -------------------------- + + def _run_stream(self, client_id: str, client_secret: str) -> None: + try: + import dingtalk_stream + + credential = dingtalk_stream.Credential(client_id, client_secret) + client = dingtalk_stream.DingTalkStreamClient(credential) + self._stream_client = client + client.register_callback_handler( + dingtalk_stream.chatbot.ChatbotMessage.TOPIC, + _DingTalkMessageHandler(self), + ) + client.start_forever() + except Exception: + if self._running: + logger.exception("DingTalk Stream Push error") + finally: + self._stream_client = None + + def _on_chatbot_message(self, message: Any) -> None: + if not self._running: + return + try: + sender_staff_id = message.sender_staff_id or "" + conversation_type = _normalize_conversation_type(message.conversation_type) + conversation_id = message.conversation_id or "" + msg_id = message.message_id or "" + sender_nick = message.sender_nick or "" + + if self._allowed_users and sender_staff_id not in self._allowed_users: + logger.debug("[DingTalk] ignoring message from non-allowed user: %s", sender_staff_id) + return + + text = self._extract_text(message) + if not text: + logger.info("[DingTalk] empty text, ignoring message") + return + + logger.info( + "[DingTalk] parsed message: conv_type=%s, msg_id=%s, sender=%s(%s), text=%r", + conversation_type, + msg_id, + sender_staff_id, + sender_nick, + text[:100], + ) + + if _is_dingtalk_command(text): + msg_type = InboundMessageType.COMMAND + else: + msg_type = InboundMessageType.CHAT + + # P2P: topic_id=None (single thread per user, like Telegram private chat) + # Group: topic_id=msg_id (each new message starts a new topic, like Feishu) + topic_id: str | None = msg_id if conversation_type == _CONVERSATION_TYPE_GROUP else None + + # chat_id uses conversation_id for groups, sender_staff_id for P2P + chat_id = conversation_id if conversation_type == _CONVERSATION_TYPE_GROUP else sender_staff_id + + inbound = self._make_inbound( + chat_id=chat_id, + user_id=sender_staff_id, + text=text, + msg_type=msg_type, + thread_ts=msg_id, + metadata={ + "conversation_type": conversation_type, + "conversation_id": conversation_id, + "sender_staff_id": sender_staff_id, + "sender_nick": sender_nick, + "message_id": msg_id, + }, + ) + inbound.topic_id = topic_id + + if self._card_template_id: + source_key = self._make_card_source_key(inbound) + with self._incoming_messages_lock: + self._incoming_messages[source_key] = message + + if self._main_loop and self._main_loop.is_running(): + logger.info("[DingTalk] publishing inbound message to bus (type=%s, msg_id=%s)", msg_type.value, msg_id) + fut = asyncio.run_coroutine_threadsafe( + self._prepare_inbound(chat_id, inbound), + self._main_loop, + ) + fut.add_done_callback(lambda f, mid=msg_id: self._log_future_error(f, "prepare_inbound", mid)) + else: + logger.warning("[DingTalk] main loop not running, cannot publish inbound message") + except Exception: + logger.exception("[DingTalk] error processing chatbot message") + + @staticmethod + def _extract_text(message: Any) -> str: + msg_type = message.message_type + if msg_type == "text" and message.text: + return message.text.content.strip() + if msg_type == "richText" and message.rich_text_content: + return _extract_text_from_rich_text(message.rich_text_content.rich_text_list).strip() + return "" + + async def _prepare_inbound(self, chat_id: str, inbound: InboundMessage) -> None: + # Running reply must finish before publish_inbound so AI card tracks are + # registered before the manager emits streaming outbounds. + await self._send_running_reply(chat_id, inbound) + await self.bus.publish_inbound(inbound) + + async def _send_running_reply(self, chat_id: str, inbound: InboundMessage) -> None: + conversation_type = inbound.metadata.get("conversation_type", _CONVERSATION_TYPE_P2P) + sender_staff_id = inbound.metadata.get("sender_staff_id", "") + conversation_id = inbound.metadata.get("conversation_id", "") + text = "\u23f3 Working on it..." + + try: + if self._card_template_id: + source_key = self._make_card_source_key(inbound) + with self._incoming_messages_lock: + chatbot_message = self._incoming_messages.pop(source_key, None) + out_track_id = await self._create_and_deliver_card( + text, + chatbot_message=chatbot_message, + ) + if out_track_id: + self._card_track_ids[source_key] = out_track_id + logger.info("[DingTalk] AI card running reply sent for chat=%s", chat_id) + return + + robot_code = self._client_id + if conversation_type == _CONVERSATION_TYPE_GROUP: + await self._send_text_message_to_group(robot_code, conversation_id, text) + else: + await self._send_text_message_to_user(robot_code, sender_staff_id, text) + logger.info("[DingTalk] 'Working on it...' reply sent for chat=%s", chat_id) + except Exception: + logger.exception("[DingTalk] failed to send running reply for chat=%s", chat_id) + + # -- DingTalk API helpers ---------------------------------------------- + + async def _get_access_token(self) -> str: + if self._cached_token and time.monotonic() < self._token_expires_at: + return self._cached_token + async with self._token_lock: + if self._cached_token and time.monotonic() < self._token_expires_at: + return self._cached_token + async with httpx.AsyncClient(timeout=httpx.Timeout(10.0)) as client: + response = await client.post( + f"{DINGTALK_API_BASE}/v1.0/oauth2/accessToken", + json={"appKey": self._client_id, "appSecret": self._client_secret}, # DingTalk API field names + ) + response.raise_for_status() + data = response.json() + + if not isinstance(data, dict): + raise ValueError(f"DingTalk access token response must be a JSON object, got {type(data).__name__}") + + access_token = data.get("accessToken") + if not isinstance(access_token, str) or not access_token.strip(): + raise ValueError("DingTalk access token response did not contain a usable accessToken") + + raw_expires_in = data.get("expireIn", 7200) + try: + expires_in = int(raw_expires_in) + except (TypeError, ValueError): + logger.warning("[DingTalk] invalid expireIn value %r, using default 7200s", raw_expires_in) + expires_in = 7200 + + self._cached_token = access_token.strip() + self._token_expires_at = time.monotonic() + expires_in - _TOKEN_REFRESH_MARGIN_SECONDS + return self._cached_token + + @staticmethod + def _api_headers(token: str) -> dict[str, str]: + return { + "x-acs-dingtalk-access-token": token, + "Content-Type": "application/json", + } + + async def _send_text_message_to_user(self, robot_code: str, user_id: str, text: str) -> None: + token = await self._get_access_token() + async with httpx.AsyncClient(timeout=httpx.Timeout(30.0)) as client: + response = await client.post( + f"{DINGTALK_API_BASE}/v1.0/robot/oToMessages/batchSend", + headers=self._api_headers(token), + json={ + "msgKey": "sampleText", + "msgParam": json.dumps({"content": text}), + "robotCode": robot_code, + "userIds": [user_id], + }, + ) + response.raise_for_status() + + async def _send_text_message_to_group(self, robot_code: str, conversation_id: str, text: str) -> None: + token = await self._get_access_token() + async with httpx.AsyncClient(timeout=httpx.Timeout(30.0)) as client: + response = await client.post( + f"{DINGTALK_API_BASE}/v1.0/robot/groupMessages/send", + headers=self._api_headers(token), + json={ + "msgKey": "sampleText", + "msgParam": json.dumps({"content": text}), + "robotCode": robot_code, + "openConversationId": conversation_id, + }, + ) + response.raise_for_status() + + async def _send_p2p_message(self, robot_code: str, user_id: str, text: str) -> None: + text = _adapt_markdown_for_dingtalk(text) + token = await self._get_access_token() + async with httpx.AsyncClient(timeout=httpx.Timeout(30.0)) as client: + response = await client.post( + f"{DINGTALK_API_BASE}/v1.0/robot/oToMessages/batchSend", + headers=self._api_headers(token), + json={ + "msgKey": "sampleMarkdown", + "msgParam": json.dumps({"title": "DeerFlow", "text": text}), + "robotCode": robot_code, + "userIds": [user_id], + }, + ) + response.raise_for_status() + data = response.json() + if data.get("processQueryKey"): + logger.info("[DingTalk] P2P message sent to user=%s", user_id) + else: + logger.warning("[DingTalk] P2P send response: %s", data) + + async def _send_group_message( + self, + robot_code: str, + conversation_id: str, + text: str, + *, + at_user_ids: list[str] | None = None, # noqa: ARG002 + ) -> None: + # at_user_ids accepted for call-site compatibility but not passed to the API + # (sampleMarkdown does not support @mentions). + text = _adapt_markdown_for_dingtalk(text) + token = await self._get_access_token() + + async with httpx.AsyncClient(timeout=httpx.Timeout(30.0)) as client: + response = await client.post( + f"{DINGTALK_API_BASE}/v1.0/robot/groupMessages/send", + headers=self._api_headers(token), + json={ + "msgKey": "sampleMarkdown", + "msgParam": json.dumps({"title": "DeerFlow", "text": text}), + "robotCode": robot_code, + "openConversationId": conversation_id, + }, + ) + response.raise_for_status() + data = response.json() + if data.get("processQueryKey"): + logger.info("[DingTalk] group message sent to conversation=%s", conversation_id) + else: + logger.warning("[DingTalk] group send response: %s", data) + + # -- AI Card streaming helpers ------------------------------------------- + + def _make_card_source_key(self, inbound: InboundMessage) -> str: + m = inbound.metadata + return f"{m.get('conversation_type', '')}:{m.get('sender_staff_id', '')}:{m.get('conversation_id', '')}:{m.get('message_id', '')}" + + def _make_card_source_key_from_outbound(self, msg: OutboundMessage) -> str: + m = msg.metadata + correlation_id = m.get("message_id") or msg.thread_ts or "" + return f"{m.get('conversation_type', '')}:{m.get('sender_staff_id', '')}:{m.get('conversation_id', '')}:{correlation_id}" + + async def _create_and_deliver_card( + self, + initial_text: str, + *, + chatbot_message: Any = None, + ) -> str | None: + if self._dingtalk_client is None or chatbot_message is None: + logger.warning("[DingTalk] SDK client or chatbot_message unavailable, skipping AI card") + return None + + try: + from dingtalk_stream.card_replier import AICardReplier + except ImportError: + logger.warning("[DingTalk] dingtalk-stream card_replier not available") + return None + + try: + replier = AICardReplier(self._dingtalk_client, chatbot_message) + card_instance_id = await replier.async_create_and_deliver_card( + card_template_id=self._card_template_id, + card_data={"content": initial_text}, + ) + if not card_instance_id: + return None + + self._card_repliers[card_instance_id] = replier + logger.info("[DingTalk] AI card created: outTrackId=%s", card_instance_id) + return card_instance_id + except Exception: + logger.exception("[DingTalk] failed to create AI card") + return None + + async def _stream_update_card( + self, + out_track_id: str, + content: str, + *, + is_finalize: bool = False, + is_error: bool = False, + ) -> None: + replier = self._card_repliers.get(out_track_id) + if not replier: + raise RuntimeError(f"No AICardReplier found for track ID {out_track_id}") + + await replier.async_streaming( + card_instance_id=out_track_id, + content_key="content", + content_value=content, + append=False, + finished=is_finalize, + failed=is_error, + ) + + # -- media upload -------------------------------------------------------- + + async def _upload_media(self, file_path: str | Path, media_type: str) -> str | None: + try: + file_bytes = await asyncio.to_thread(Path(file_path).read_bytes) + token = await self._get_access_token() + async with httpx.AsyncClient(timeout=httpx.Timeout(60.0)) as client: + response = await client.post( + f"{DINGTALK_API_BASE}/v1.0/files/upload", + headers={"x-acs-dingtalk-access-token": token}, + files={"file": ("upload", file_bytes)}, + data={"type": media_type}, + ) + response.raise_for_status() + try: + payload = response.json() + except json.JSONDecodeError: + logger.exception("[DingTalk] failed to decode upload response JSON: %s", file_path) + return None + if not isinstance(payload, dict): + logger.warning("[DingTalk] unexpected upload response type %s for %s", type(payload).__name__, file_path) + return None + return payload.get("mediaId") + except (httpx.HTTPError, OSError): + logger.exception("[DingTalk] failed to upload media: %s", file_path) + return None + + @staticmethod + def _log_future_error(fut: Any, name: str, msg_id: str) -> None: + try: + exc = fut.exception() + if exc: + logger.error("[DingTalk] %s failed for msg_id=%s: %s", name, msg_id, exc) + except (asyncio.CancelledError, asyncio.InvalidStateError): + pass + + +class _DingTalkMessageHandler: + """Callback handler registered with dingtalk-stream.""" + + def __init__(self, channel: DingTalkChannel) -> None: + self._channel = channel + + def pre_start(self) -> None: + if hasattr(self, "dingtalk_client") and self.dingtalk_client is not None: + self._channel._dingtalk_client = self.dingtalk_client + + async def raw_process(self, callback_message: Any) -> Any: + import dingtalk_stream + from dingtalk_stream.frames import Headers + + code, message = await self.process(callback_message) + ack_message = dingtalk_stream.AckMessage() + ack_message.code = code + ack_message.headers.message_id = callback_message.headers.message_id + ack_message.headers.content_type = Headers.CONTENT_TYPE_APPLICATION_JSON + ack_message.data = {"response": message} + return ack_message + + async def process(self, callback: Any) -> tuple[int, str]: + import dingtalk_stream + + incoming_message = dingtalk_stream.ChatbotMessage.from_dict(callback.data) + self._channel._on_chatbot_message(incoming_message) + return dingtalk_stream.AckMessage.STATUS_OK, "OK" diff --git a/backend/app/channels/feishu.py b/backend/app/channels/feishu.py index 5a80016f0..75892d54d 100644 --- a/backend/app/channels/feishu.py +++ b/backend/app/channels/feishu.py @@ -63,6 +63,10 @@ class FeishuChannel(Channel): self._GetMessageResourceRequest = None self._thread_lock = threading.Lock() + @property + def supports_streaming(self) -> bool: + return True + async def start(self) -> None: if self._running: return diff --git a/backend/app/channels/manager.py b/backend/app/channels/manager.py index 5680943b0..c09b13173 100644 --- a/backend/app/channels/manager.py +++ b/backend/app/channels/manager.py @@ -38,6 +38,7 @@ STREAM_UPDATE_MIN_INTERVAL_SECONDS = 0.35 THREAD_BUSY_MESSAGE = "This conversation is already processing another request. Please wait for it to finish and try again." CHANNEL_CAPABILITIES = { + "dingtalk": {"supports_streaming": False}, "discord": {"supports_streaming": False}, "feishu": {"supports_streaming": True}, "slack": {"supports_streaming": False}, @@ -48,6 +49,13 @@ CHANNEL_CAPABILITIES = { InboundFileReader = Callable[[dict[str, Any], httpx.AsyncClient], Awaitable[bytes | None]] +_METADATA_DROP_KEYS = frozenset({"raw_message", "ref_msg"}) + + +def _slim_metadata(meta: dict[str, Any]) -> dict[str, Any]: + """Return a shallow copy of *meta* with known-large keys removed.""" + return {k: v for k, v in meta.items() if k not in _METADATA_DROP_KEYS} + INBOUND_FILE_READERS: dict[str, InboundFileReader] = {} @@ -543,6 +551,13 @@ class ChannelManager: @staticmethod def _channel_supports_streaming(channel_name: str) -> bool: + from .service import get_channel_service + + service = get_channel_service() + if service: + channel = service.get_channel(channel_name) + if channel is not None: + return channel.supports_streaming return CHANNEL_CAPABILITIES.get(channel_name, {}).get("supports_streaming", False) def _resolve_session_layer(self, msg: InboundMessage) -> tuple[dict[str, Any], dict[str, Any]]: @@ -772,6 +787,7 @@ class ChannelManager: artifacts=artifacts, attachments=attachments, thread_ts=msg.thread_ts, + metadata=_slim_metadata(msg.metadata), ) logger.info("[Manager] publishing outbound message to bus: channel=%s, chat_id=%s", msg.channel_name, msg.chat_id) await self.bus.publish_outbound(outbound) @@ -833,6 +849,7 @@ class ChannelManager: text=latest_text, is_final=False, thread_ts=msg.thread_ts, + metadata=_slim_metadata(msg.metadata), ) ) last_published_text = latest_text @@ -877,6 +894,7 @@ class ChannelManager: attachments=attachments, is_final=True, thread_ts=msg.thread_ts, + metadata=_slim_metadata(msg.metadata), ) ) @@ -935,6 +953,7 @@ class ChannelManager: thread_id=self.store.get_thread_id(msg.channel_name, msg.chat_id) or "", text=reply, thread_ts=msg.thread_ts, + metadata=_slim_metadata(msg.metadata), ) await self.bus.publish_outbound(outbound) @@ -968,5 +987,6 @@ class ChannelManager: thread_id=self.store.get_thread_id(msg.channel_name, msg.chat_id) or "", text=error_text, thread_ts=msg.thread_ts, + metadata=_slim_metadata(msg.metadata), ) await self.bus.publish_outbound(outbound) diff --git a/backend/app/channels/service.py b/backend/app/channels/service.py index 72414e651..4a3df9060 100644 --- a/backend/app/channels/service.py +++ b/backend/app/channels/service.py @@ -18,6 +18,7 @@ if TYPE_CHECKING: # Channel name → import path for lazy loading _CHANNEL_REGISTRY: dict[str, str] = { + "dingtalk": "app.channels.dingtalk:DingTalkChannel", "discord": "app.channels.discord:DiscordChannel", "feishu": "app.channels.feishu:FeishuChannel", "slack": "app.channels.slack:SlackChannel", @@ -28,6 +29,7 @@ _CHANNEL_REGISTRY: dict[str, str] = { # Keys that indicate a user has configured credentials for a channel. _CHANNEL_CREDENTIAL_KEYS: dict[str, list[str]] = { + "dingtalk": ["client_id", "client_secret"], "discord": ["bot_token"], "feishu": ["app_id", "app_secret"], "slack": ["bot_token", "app_token"], @@ -166,11 +168,16 @@ class ChannelService: try: channel = channel_cls(bus=self.bus, config=config) - await channel.start() self._channels[name] = channel + await channel.start() + if not channel.is_running: + self._channels.pop(name, None) + logger.error("Channel %s did not enter a running state after start()", name) + return False logger.info("Channel %s started", name) return True except Exception: + self._channels.pop(name, None) logger.exception("Failed to start channel %s", name) return False diff --git a/backend/app/channels/wecom.py b/backend/app/channels/wecom.py index 5a8948bd4..3e0cdb3d1 100644 --- a/backend/app/channels/wecom.py +++ b/backend/app/channels/wecom.py @@ -29,6 +29,10 @@ class WeComChannel(Channel): self._ws_stream_ids: dict[str, str] = {} self._working_message = "Working on it..." + @property + def supports_streaming(self) -> bool: + return True + def _clear_ws_context(self, thread_ts: str | None) -> None: if not thread_ts: return diff --git a/backend/pyproject.toml b/backend/pyproject.toml index 09fb518e9..64c6e74c3 100644 --- a/backend/pyproject.toml +++ b/backend/pyproject.toml @@ -17,6 +17,7 @@ dependencies = [ "langgraph-sdk>=0.1.51", "markdown-to-mrkdwn>=0.3.1", "wecom-aibot-python-sdk>=0.1.6", + "dingtalk-stream>=0.24.3", "bcrypt>=4.0.0", "pyjwt>=2.9.0", "email-validator>=2.0.0", diff --git a/backend/tests/test_channels.py b/backend/tests/test_channels.py index 7cf329bff..20c746928 100644 --- a/backend/tests/test_channels.py +++ b/backend/tests/test_channels.py @@ -462,6 +462,7 @@ class TestChannelManager: ) mock_channel = MagicMock() mock_channel.receive_file = AsyncMock(return_value=modified_msg) + mock_channel.supports_streaming = False mock_service = MagicMock() mock_service.get_channel.return_value = mock_channel monkeypatch.setattr("app.channels.service.get_channel_service", lambda: mock_service) @@ -535,6 +536,89 @@ class TestChannelManager: _run(go()) + def test_handle_chat_outbound_preserves_inbound_metadata(self): + """DingTalk (and similar) need inbound metadata on outbound sends (e.g. sender_staff_id).""" + from app.channels.manager import ChannelManager + + async def go(): + bus = MessageBus() + store = ChannelStore(path=Path(tempfile.mkdtemp()) / "store.json") + manager = ChannelManager(bus=bus, store=store) + outbound_received: list[OutboundMessage] = [] + + async def capture_outbound(msg: OutboundMessage) -> None: + outbound_received.append(msg) + + bus.subscribe_outbound(capture_outbound) + mock_client = _make_mock_langgraph_client() + manager._client = mock_client + await manager.start() + + meta = { + "sender_staff_id": "staff_001", + "conversation_type": "1", + "conversation_id": "conv_001", + } + inbound = InboundMessage( + channel_name="test", + chat_id="chat1", + user_id="user1", + text="hi", + metadata=meta, + ) + await bus.publish_inbound(inbound) + await _wait_for(lambda: len(outbound_received) >= 1) + await manager.stop() + + assert len(outbound_received) == 1 + assert outbound_received[0].metadata == meta + + _run(go()) + + def test_handle_chat_outbound_drops_large_metadata_keys(self): + """Large metadata keys like raw_message should be stripped from outbound messages.""" + from app.channels.manager import ChannelManager + + async def go(): + bus = MessageBus() + store = ChannelStore(path=Path(tempfile.mkdtemp()) / "store.json") + manager = ChannelManager(bus=bus, store=store) + outbound_received: list[OutboundMessage] = [] + + async def capture_outbound(msg: OutboundMessage) -> None: + outbound_received.append(msg) + + bus.subscribe_outbound(capture_outbound) + mock_client = _make_mock_langgraph_client() + manager._client = mock_client + await manager.start() + + meta = { + "sender_staff_id": "staff_001", + "conversation_type": "1", + "raw_message": {"huge": "payload" * 1000}, + "ref_msg": {"also": "large"}, + } + inbound = InboundMessage( + channel_name="test", + chat_id="chat1", + user_id="user1", + text="hi", + metadata=meta, + ) + await bus.publish_inbound(inbound) + await _wait_for(lambda: len(outbound_received) >= 1) + await manager.stop() + + assert len(outbound_received) == 1 + out_meta = outbound_received[0].metadata + assert "sender_staff_id" in out_meta + assert "conversation_type" in out_meta + assert "raw_message" not in out_meta + assert "ref_msg" not in out_meta + + _run(go()) + def test_handle_chat_uses_channel_session_overrides(self): from app.channels.manager import ChannelManager diff --git a/backend/tests/test_dingtalk_channel.py b/backend/tests/test_dingtalk_channel.py new file mode 100644 index 000000000..235de6db8 --- /dev/null +++ b/backend/tests/test_dingtalk_channel.py @@ -0,0 +1,1554 @@ +"""Tests for the DingTalk channel implementation.""" + +from __future__ import annotations + +import asyncio +from types import SimpleNamespace +from unittest.mock import AsyncMock, MagicMock + +import pytest + +from app.channels.commands import KNOWN_CHANNEL_COMMANDS +from app.channels.dingtalk import ( + _CONVERSATION_TYPE_GROUP, + _CONVERSATION_TYPE_P2P, + DingTalkChannel, + _adapt_markdown_for_dingtalk, + _convert_markdown_table, + _DingTalkMessageHandler, + _extract_text_from_rich_text, + _is_dingtalk_command, + _normalize_allowed_users, + _normalize_conversation_type, +) +from app.channels.message_bus import InboundMessageType, MessageBus, OutboundMessage + + +def _run(coro): + loop = asyncio.new_event_loop() + try: + return loop.run_until_complete(coro) + finally: + loop.close() + + +# --------------------------------------------------------------------------- +# Helper: build mock ChatbotMessage +# --------------------------------------------------------------------------- + + +def _make_chatbot_message( + *, + text: str = "hello", + message_type: str = "text", + conversation_type: str | int = _CONVERSATION_TYPE_P2P, + sender_staff_id: str = "user_001", + sender_nick: str = "Test User", + conversation_id: str = "conv_001", + message_id: str = "msg_001", + rich_text_list: list | None = None, +): + """Build a minimal mock object mimicking dingtalk_stream.ChatbotMessage.""" + msg = SimpleNamespace() + msg.message_type = message_type + msg.conversation_type = conversation_type + msg.sender_staff_id = sender_staff_id + msg.sender_nick = sender_nick + msg.conversation_id = conversation_id + msg.message_id = message_id + + if message_type == "text": + msg.text = SimpleNamespace(content=text) + msg.rich_text_content = None + elif message_type == "richText": + msg.text = None + msg.rich_text_content = SimpleNamespace(rich_text_list=rich_text_list or []) + else: + msg.text = None + msg.rich_text_content = None + + return msg + + +# --------------------------------------------------------------------------- +# _DingTalkMessageHandler SDK contract +# --------------------------------------------------------------------------- + + +class TestDingTalkMessageHandlerSdkContract: + def test_pre_start_exists_and_noop(self): + bus = MessageBus() + channel = DingTalkChannel(bus, config={}) + handler = _DingTalkMessageHandler(channel) + handler.pre_start() + + def test_raw_process_returns_ack(self): + pytest.importorskip("dingtalk_stream") + + async def go(): + bus = MessageBus() + channel = DingTalkChannel(bus, config={}) + channel._on_chatbot_message = MagicMock() + handler = _DingTalkMessageHandler(channel) + cb = MagicMock() + cb.headers.message_id = "mid-1" + cb.data = { + "msgtype": "text", + "text": {"content": "hi"}, + "senderStaffId": "u1", + "conversationType": "1", + "msgId": "m1", + } + ack = await handler.raw_process(cb) + assert ack.code == 200 + assert ack.headers.message_id == "mid-1" + assert ack.data == {"response": "OK"} + channel._on_chatbot_message.assert_called_once() + + _run(go()) + + +# --------------------------------------------------------------------------- +# _normalize_allowed_users tests +# --------------------------------------------------------------------------- + + +class TestNormalizeAllowedUsers: + def test_none_returns_empty(self): + assert _normalize_allowed_users(None) == set() + + def test_empty_list_returns_empty(self): + assert _normalize_allowed_users([]) == set() + + def test_list_of_strings(self): + result = _normalize_allowed_users(["user1", "user2"]) + assert result == {"user1", "user2"} + + def test_single_string(self): + result = _normalize_allowed_users("user1") + assert result == {"user1"} + + def test_numeric_values_converted_to_string(self): + result = _normalize_allowed_users([123, 456]) + assert result == {"123", "456"} + + def test_scalar_treated_as_single_value(self): + result = _normalize_allowed_users(12345) + assert result == {"12345"} + + +# --------------------------------------------------------------------------- +# _normalize_conversation_type tests +# --------------------------------------------------------------------------- + + +class TestNormalizeConversationType: + def test_group_int_or_str(self): + assert _normalize_conversation_type(2) == _CONVERSATION_TYPE_GROUP + assert _normalize_conversation_type("2") == _CONVERSATION_TYPE_GROUP + + def test_p2p_or_none(self): + assert _normalize_conversation_type(1) == _CONVERSATION_TYPE_P2P + assert _normalize_conversation_type(None) == _CONVERSATION_TYPE_P2P + + +# --------------------------------------------------------------------------- +# _is_dingtalk_command tests +# --------------------------------------------------------------------------- + + +class TestIsDingTalkCommand: + @pytest.mark.parametrize("command", sorted(KNOWN_CHANNEL_COMMANDS)) + def test_known_commands_recognized(self, command): + assert _is_dingtalk_command(command) is True + + @pytest.mark.parametrize( + "text", + [ + "/unknown", + "/mnt/user-data/outputs/report.md", + "hello", + "", + "not a command", + ], + ) + def test_non_commands_rejected(self, text): + assert _is_dingtalk_command(text) is False + + +# --------------------------------------------------------------------------- +# _extract_text_from_rich_text tests +# --------------------------------------------------------------------------- + + +class TestExtractTextFromRichText: + def test_single_text_item(self): + result = _extract_text_from_rich_text([{"text": "hello"}]) + assert result == "hello" + + def test_multiple_text_items(self): + result = _extract_text_from_rich_text([{"text": "hello"}, {"text": "world"}]) + assert result == "hello world" + + def test_non_text_items_ignored(self): + result = _extract_text_from_rich_text( + [ + {"downloadCode": "abc123"}, + {"text": "caption"}, + ] + ) + assert result == "caption" + + def test_empty_list(self): + assert _extract_text_from_rich_text([]) == "" + + +# --------------------------------------------------------------------------- +# DingTalkChannel._extract_text tests +# --------------------------------------------------------------------------- + + +class TestExtractText: + def test_plain_text(self): + msg = _make_chatbot_message(text="Hello World") + assert DingTalkChannel._extract_text(msg) == "Hello World" + + def test_plain_text_stripped(self): + msg = _make_chatbot_message(text=" Hello ") + assert DingTalkChannel._extract_text(msg) == "Hello" + + def test_rich_text(self): + msg = _make_chatbot_message( + message_type="richText", + rich_text_list=[{"text": "Part 1"}, {"text": "Part 2"}], + ) + assert DingTalkChannel._extract_text(msg) == "Part 1 Part 2" + + def test_unknown_type_returns_empty(self): + msg = _make_chatbot_message(message_type="picture") + assert DingTalkChannel._extract_text(msg) == "" + + +# --------------------------------------------------------------------------- +# DingTalkChannel._on_chatbot_message tests (inbound parsing) +# --------------------------------------------------------------------------- + + +class TestOnChatbotMessage: + def test_p2p_message_produces_correct_inbound(self): + async def go(): + bus = MessageBus() + bus.publish_inbound = AsyncMock() + channel = DingTalkChannel(bus, config={}) + channel._client_id = "test_key" + channel._main_loop = asyncio.get_event_loop() + channel._running = True + + msg = _make_chatbot_message( + text="hello from dingtalk", + conversation_type=_CONVERSATION_TYPE_P2P, + sender_staff_id="user_001", + message_id="msg_001", + ) + + channel._send_running_reply = AsyncMock() + channel._on_chatbot_message(msg) + + await asyncio.sleep(0.1) + + bus.publish_inbound.assert_awaited_once() + inbound = bus.publish_inbound.await_args.args[0] + assert inbound.channel_name == "dingtalk" + assert inbound.chat_id == "user_001" + assert inbound.user_id == "user_001" + assert inbound.text == "hello from dingtalk" + assert inbound.topic_id is None + assert inbound.metadata["conversation_type"] == _CONVERSATION_TYPE_P2P + assert inbound.metadata["sender_staff_id"] == "user_001" + + _run(go()) + + def test_group_message_produces_correct_inbound(self): + async def go(): + bus = MessageBus() + bus.publish_inbound = AsyncMock() + channel = DingTalkChannel(bus, config={}) + channel._client_id = "test_key" + channel._main_loop = asyncio.get_event_loop() + channel._running = True + + msg = _make_chatbot_message( + text="hello group", + conversation_type=_CONVERSATION_TYPE_GROUP, + sender_staff_id="user_002", + conversation_id="conv_group_001", + message_id="msg_group_001", + ) + + channel._send_running_reply = AsyncMock() + channel._on_chatbot_message(msg) + + await asyncio.sleep(0.1) + + bus.publish_inbound.assert_awaited_once() + inbound = bus.publish_inbound.await_args.args[0] + assert inbound.channel_name == "dingtalk" + assert inbound.chat_id == "conv_group_001" + assert inbound.user_id == "user_002" + assert inbound.text == "hello group" + assert inbound.topic_id == "msg_group_001" + assert inbound.metadata["conversation_type"] == _CONVERSATION_TYPE_GROUP + assert inbound.metadata["conversation_id"] == "conv_group_001" + + _run(go()) + + def test_group_message_integer_conversation_type_normalized(self): + """SDK may deliver conversationType as int 2 — must still route as group.""" + + async def go(): + bus = MessageBus() + bus.publish_inbound = AsyncMock() + channel = DingTalkChannel(bus, config={}) + channel._client_id = "test_key" + channel._main_loop = asyncio.get_event_loop() + channel._running = True + + msg = _make_chatbot_message( + text="hello group", + conversation_type=2, + sender_staff_id="user_002", + conversation_id="conv_group_001", + message_id="msg_group_002", + ) + + channel._send_running_reply = AsyncMock() + channel._on_chatbot_message(msg) + + await asyncio.sleep(0.1) + + bus.publish_inbound.assert_awaited_once() + inbound = bus.publish_inbound.await_args.args[0] + assert inbound.chat_id == "conv_group_001" + assert inbound.topic_id == "msg_group_002" + assert inbound.metadata["conversation_type"] == _CONVERSATION_TYPE_GROUP + + _run(go()) + + def test_command_classified_correctly(self): + async def go(): + bus = MessageBus() + bus.publish_inbound = AsyncMock() + channel = DingTalkChannel(bus, config={}) + channel._client_id = "test_key" + channel._main_loop = asyncio.get_event_loop() + channel._running = True + + msg = _make_chatbot_message(text="/help") + channel._send_running_reply = AsyncMock() + channel._on_chatbot_message(msg) + + await asyncio.sleep(0.1) + + bus.publish_inbound.assert_awaited_once() + inbound = bus.publish_inbound.await_args.args[0] + assert inbound.msg_type == InboundMessageType.COMMAND + + _run(go()) + + def test_non_command_classified_as_chat(self): + async def go(): + bus = MessageBus() + bus.publish_inbound = AsyncMock() + channel = DingTalkChannel(bus, config={}) + channel._client_id = "test_key" + channel._main_loop = asyncio.get_event_loop() + channel._running = True + + msg = _make_chatbot_message(text="just chatting") + channel._send_running_reply = AsyncMock() + channel._on_chatbot_message(msg) + + await asyncio.sleep(0.1) + + bus.publish_inbound.assert_awaited_once() + inbound = bus.publish_inbound.await_args.args[0] + assert inbound.msg_type == InboundMessageType.CHAT + + _run(go()) + + def test_empty_text_ignored(self): + async def go(): + bus = MessageBus() + bus.publish_inbound = AsyncMock() + channel = DingTalkChannel(bus, config={}) + channel._client_id = "test_key" + channel._main_loop = asyncio.get_event_loop() + channel._running = True + + msg = _make_chatbot_message(text=" ") + channel._on_chatbot_message(msg) + + await asyncio.sleep(0.1) + bus.publish_inbound.assert_not_awaited() + + _run(go()) + + +# --------------------------------------------------------------------------- +# allowed_users filtering tests +# --------------------------------------------------------------------------- + + +class TestAllowedUsersFiltering: + def test_allowed_user_passes(self): + async def go(): + bus = MessageBus() + bus.publish_inbound = AsyncMock() + channel = DingTalkChannel(bus, config={"allowed_users": ["user_001"]}) + channel._client_id = "test_key" + channel._main_loop = asyncio.get_event_loop() + channel._running = True + + msg = _make_chatbot_message(sender_staff_id="user_001") + channel._send_running_reply = AsyncMock() + channel._on_chatbot_message(msg) + + await asyncio.sleep(0.1) + bus.publish_inbound.assert_awaited_once() + + _run(go()) + + def test_non_allowed_user_blocked(self): + async def go(): + bus = MessageBus() + bus.publish_inbound = AsyncMock() + channel = DingTalkChannel(bus, config={"allowed_users": ["user_001"]}) + channel._client_id = "test_key" + channel._main_loop = asyncio.get_event_loop() + channel._running = True + + msg = _make_chatbot_message(sender_staff_id="user_blocked") + channel._on_chatbot_message(msg) + + await asyncio.sleep(0.1) + bus.publish_inbound.assert_not_awaited() + + _run(go()) + + def test_empty_allowed_users_allows_all(self): + async def go(): + bus = MessageBus() + bus.publish_inbound = AsyncMock() + channel = DingTalkChannel(bus, config={"allowed_users": []}) + channel._client_id = "test_key" + channel._main_loop = asyncio.get_event_loop() + channel._running = True + + msg = _make_chatbot_message(sender_staff_id="anyone") + channel._send_running_reply = AsyncMock() + channel._on_chatbot_message(msg) + + await asyncio.sleep(0.1) + bus.publish_inbound.assert_awaited_once() + + _run(go()) + + +# --------------------------------------------------------------------------- +# send routing tests (P2P vs Group) +# --------------------------------------------------------------------------- + + +class TestMarkdownFallbackPropagation: + def test_fallback_raises_on_failure(self): + async def go(): + bus = MessageBus() + channel = DingTalkChannel(bus, config={}) + channel._client_id = "test_key" + channel._cached_token = "tok" + channel._token_expires_at = float("inf") + + channel._send_p2p_message = AsyncMock(side_effect=ConnectionError("send failed")) + + with pytest.raises(ConnectionError, match="send failed"): + await channel._send_markdown_fallback("test_key", _CONVERSATION_TYPE_P2P, "user_001", "", "hello") + + _run(go()) + + +class TestSendRouting: + def test_p2p_send_uses_oto_endpoint(self): + async def go(): + bus = MessageBus() + channel = DingTalkChannel(bus, config={}) + channel._client_id = "test_key" + channel._client_secret = "test_secret" + + channel._send_p2p_message = AsyncMock() + channel._send_group_message = AsyncMock() + + msg = OutboundMessage( + channel_name="dingtalk", + chat_id="user_001", + thread_id="thread_001", + text="Hello P2P", + metadata={ + "conversation_type": _CONVERSATION_TYPE_P2P, + "sender_staff_id": "user_001", + "conversation_id": "", + }, + ) + + await channel.send(msg) + + channel._send_p2p_message.assert_awaited_once_with("test_key", "user_001", "Hello P2P") + channel._send_group_message.assert_not_awaited() + + _run(go()) + + def test_group_send_uses_group_endpoint(self): + async def go(): + bus = MessageBus() + channel = DingTalkChannel(bus, config={}) + channel._client_id = "test_key" + channel._client_secret = "test_secret" + + channel._send_p2p_message = AsyncMock() + channel._send_group_message = AsyncMock() + + msg = OutboundMessage( + channel_name="dingtalk", + chat_id="conv_001", + thread_id="thread_001", + text="Hello Group", + metadata={ + "conversation_type": _CONVERSATION_TYPE_GROUP, + "sender_staff_id": "user_001", + "conversation_id": "conv_001", + }, + ) + + await channel.send(msg) + + channel._send_group_message.assert_awaited_once_with("test_key", "conv_001", "Hello Group", at_user_ids=["user_001"]) + channel._send_p2p_message.assert_not_awaited() + + _run(go()) + + def test_default_metadata_uses_p2p(self): + async def go(): + bus = MessageBus() + channel = DingTalkChannel(bus, config={}) + channel._client_id = "test_key" + channel._client_secret = "test_secret" + + channel._send_p2p_message = AsyncMock() + channel._send_group_message = AsyncMock() + + msg = OutboundMessage( + channel_name="dingtalk", + chat_id="user_001", + thread_id="thread_001", + text="Hello", + metadata={}, + ) + + await channel.send(msg) + + channel._send_p2p_message.assert_awaited_once() + channel._send_group_message.assert_not_awaited() + + _run(go()) + + +# --------------------------------------------------------------------------- +# send retry tests +# --------------------------------------------------------------------------- + + +class TestSendRetry: + def test_retries_on_failure_then_succeeds(self): + async def go(): + bus = MessageBus() + channel = DingTalkChannel(bus, config={}) + channel._client_id = "test_key" + channel._client_secret = "test_secret" + + call_count = 0 + + async def flaky_send(robot_code, user_id, text): + nonlocal call_count + call_count += 1 + if call_count < 3: + raise ConnectionError("network error") + + channel._send_p2p_message = AsyncMock(side_effect=flaky_send) + + msg = OutboundMessage( + channel_name="dingtalk", + chat_id="user_001", + thread_id="thread_001", + text="hello", + metadata={"conversation_type": _CONVERSATION_TYPE_P2P, "sender_staff_id": "user_001"}, + ) + + await channel.send(msg) + assert call_count == 3 + + _run(go()) + + def test_raises_after_all_retries_exhausted(self): + async def go(): + bus = MessageBus() + channel = DingTalkChannel(bus, config={}) + channel._client_id = "test_key" + channel._client_secret = "test_secret" + + channel._send_p2p_message = AsyncMock(side_effect=ConnectionError("fail")) + + msg = OutboundMessage( + channel_name="dingtalk", + chat_id="user_001", + thread_id="thread_001", + text="hello", + metadata={"conversation_type": _CONVERSATION_TYPE_P2P, "sender_staff_id": "user_001"}, + ) + + with pytest.raises(ConnectionError): + await channel.send(msg) + + assert channel._send_p2p_message.await_count == 3 + + _run(go()) + + def test_raises_runtime_error_when_no_attempts_configured(self): + async def go(): + bus = MessageBus() + channel = DingTalkChannel(bus, config={}) + channel._client_id = "test_key" + channel._client_secret = "test_secret" + + msg = OutboundMessage( + channel_name="dingtalk", + chat_id="user_001", + thread_id="thread_001", + text="hello", + metadata={"conversation_type": _CONVERSATION_TYPE_P2P, "sender_staff_id": "user_001"}, + ) + + with pytest.raises(RuntimeError, match="without an exception"): + await channel.send(msg, _max_retries=0) + + _run(go()) + + +# --------------------------------------------------------------------------- +# topic_id mapping tests +# --------------------------------------------------------------------------- + + +class TestTopicIdMapping: + def test_p2p_topic_is_none(self): + async def go(): + bus = MessageBus() + bus.publish_inbound = AsyncMock() + channel = DingTalkChannel(bus, config={}) + channel._client_id = "test_key" + channel._main_loop = asyncio.get_event_loop() + channel._running = True + + msg = _make_chatbot_message( + conversation_type=_CONVERSATION_TYPE_P2P, + message_id="msg_p2p_001", + ) + channel._send_running_reply = AsyncMock() + channel._on_chatbot_message(msg) + + await asyncio.sleep(0.1) + inbound = bus.publish_inbound.await_args.args[0] + assert inbound.topic_id is None + + _run(go()) + + def test_group_topic_is_message_id(self): + async def go(): + bus = MessageBus() + bus.publish_inbound = AsyncMock() + channel = DingTalkChannel(bus, config={}) + channel._client_id = "test_key" + channel._main_loop = asyncio.get_event_loop() + channel._running = True + + msg = _make_chatbot_message( + conversation_type=_CONVERSATION_TYPE_GROUP, + message_id="msg_group_001", + conversation_id="conv_001", + ) + channel._send_running_reply = AsyncMock() + channel._on_chatbot_message(msg) + + await asyncio.sleep(0.1) + inbound = bus.publish_inbound.await_args.args[0] + assert inbound.topic_id == "msg_group_001" + + _run(go()) + + +# --------------------------------------------------------------------------- +# Token caching tests +# --------------------------------------------------------------------------- + + +class TestAccessTokenValidation: + def test_rejects_non_dict_response(self): + async def go(): + from unittest.mock import patch + + bus = MessageBus() + channel = DingTalkChannel(bus, config={}) + channel._client_id = "k" + channel._client_secret = "s" + + class FakeResponse: + def raise_for_status(self): + pass + + def json(self): + return "not a dict" + + class FakeClient: + async def __aenter__(self): + return self + + async def __aexit__(self, *a): + pass + + async def post(self, url, **kwargs): + return FakeResponse() + + with patch("app.channels.dingtalk.httpx.AsyncClient", return_value=FakeClient()): + with pytest.raises(ValueError, match="JSON object"): + await channel._get_access_token() + + _run(go()) + + def test_rejects_empty_access_token(self): + async def go(): + from unittest.mock import patch + + bus = MessageBus() + channel = DingTalkChannel(bus, config={}) + channel._client_id = "k" + channel._client_secret = "s" + + class FakeResponse: + def raise_for_status(self): + pass + + def json(self): + return {"accessToken": "", "expireIn": 7200} + + class FakeClient: + async def __aenter__(self): + return self + + async def __aexit__(self, *a): + pass + + async def post(self, url, **kwargs): + return FakeResponse() + + with patch("app.channels.dingtalk.httpx.AsyncClient", return_value=FakeClient()): + with pytest.raises(ValueError, match="usable accessToken"): + await channel._get_access_token() + + _run(go()) + + def test_invalid_expire_in_uses_default(self): + async def go(): + import time + from unittest.mock import patch + + bus = MessageBus() + channel = DingTalkChannel(bus, config={}) + channel._client_id = "k" + channel._client_secret = "s" + + class FakeResponse: + def raise_for_status(self): + pass + + def json(self): + return {"accessToken": "tok_ok", "expireIn": "invalid"} + + class FakeClient: + async def __aenter__(self): + return self + + async def __aexit__(self, *a): + pass + + async def post(self, url, **kwargs): + return FakeResponse() + + before = time.monotonic() + with patch("app.channels.dingtalk.httpx.AsyncClient", return_value=FakeClient()): + token = await channel._get_access_token() + + assert token == "tok_ok" + assert channel._token_expires_at > before + + _run(go()) + + +class TestTokenCaching: + def test_token_is_cached_across_calls(self): + async def go(): + from unittest.mock import patch + + bus = MessageBus() + channel = DingTalkChannel(bus, config={}) + channel._client_id = "test_key" + channel._client_secret = "test_secret" + + call_count = 0 + + class FakeResponse: + def raise_for_status(self): + pass + + def json(self): + return {"accessToken": "tok_abc", "expireIn": 7200} + + class FakeClient: + async def __aenter__(self): + return self + + async def __aexit__(self, *a): + pass + + async def post(self, url, **kwargs): + nonlocal call_count + call_count += 1 + return FakeResponse() + + with patch("app.channels.dingtalk.httpx.AsyncClient", return_value=FakeClient()): + t1 = await channel._get_access_token() + t2 = await channel._get_access_token() + + assert t1 == "tok_abc" + assert t2 == "tok_abc" + assert call_count == 1 + + _run(go()) + + +# --------------------------------------------------------------------------- +# Group message @ mention format tests +# --------------------------------------------------------------------------- + + +class TestGroupMessageMarkdownFormat: + def test_at_user_ids_still_use_markdown(self): + """groupMessages/send uses sampleMarkdown; @{userId} in body returns 400 so at_user_ids is ignored.""" + + async def go(): + from unittest.mock import patch + + bus = MessageBus() + channel = DingTalkChannel(bus, config={}) + channel._client_id = "test_key" + channel._client_secret = "test_secret" + channel._cached_token = "tok_test" + channel._token_expires_at = float("inf") + + captured_json: list[dict] = [] + + class FakeResponse: + def raise_for_status(self): + pass + + def json(self): + return {"processQueryKey": "ok"} + + class FakeClient: + async def __aenter__(self): + return self + + async def __aexit__(self, *a): + pass + + async def post(self, url, **kwargs): + captured_json.append(kwargs.get("json", {})) + return FakeResponse() + + with patch("app.channels.dingtalk.httpx.AsyncClient", return_value=FakeClient()): + await channel._send_group_message("bot", "conv1", "hello", at_user_ids=["staff_001"]) + + assert len(captured_json) == 1 + payload = captured_json[0] + assert payload["msgKey"] == "sampleMarkdown" + import json + + param = json.loads(payload["msgParam"]) + assert param["text"] == "hello" + assert "@" not in json.dumps(param) + + _run(go()) + + def test_no_at_user_ids_uses_markdown(self): + async def go(): + from unittest.mock import patch + + bus = MessageBus() + channel = DingTalkChannel(bus, config={}) + channel._client_id = "test_key" + channel._client_secret = "test_secret" + channel._cached_token = "tok_test" + channel._token_expires_at = float("inf") + + captured_json: list[dict] = [] + + class FakeResponse: + def raise_for_status(self): + pass + + def json(self): + return {"processQueryKey": "ok"} + + class FakeClient: + async def __aenter__(self): + return self + + async def __aexit__(self, *a): + pass + + async def post(self, url, **kwargs): + captured_json.append(kwargs.get("json", {})) + return FakeResponse() + + with patch("app.channels.dingtalk.httpx.AsyncClient", return_value=FakeClient()): + await channel._send_group_message("bot", "conv1", "hello") + + assert len(captured_json) == 1 + payload = captured_json[0] + assert payload["msgKey"] == "sampleMarkdown" + + _run(go()) + + +class TestAdaptMarkdownForDingtalk: + def test_fenced_code_block_to_blockquote(self): + text = "Hello\n```python\ndef foo():\n return 1\n```\nDone" + result = _adapt_markdown_for_dingtalk(text) + assert "```" not in result + assert "> **python**" in result + assert "> def foo():" in result + assert "> return 1" in result + + def test_fenced_code_block_no_language(self): + text = "```\nplain code\n```" + result = _adapt_markdown_for_dingtalk(text) + assert "```" not in result + assert "> plain code" in result + + def test_inline_code_to_bold(self): + text = "Use `pip install` to install" + result = _adapt_markdown_for_dingtalk(text) + assert result == "Use **pip install** to install" + + def test_horizontal_rule_to_unicode(self): + text = "Above\n---\nBelow" + result = _adapt_markdown_for_dingtalk(text) + assert "───────────" in result + assert "---" not in result + + def test_supported_markdown_preserved(self): + text = "# Title\n**bold** and *italic*\n- list item\n> quote\n[link](http://example.com)" + result = _adapt_markdown_for_dingtalk(text) + assert result == text + + def test_plain_text_unchanged(self): + text = "Hello world, no markdown here." + assert _adapt_markdown_for_dingtalk(text) == text + + def test_combined_elements(self): + text = "# Report\n\nRun `make test` then:\n\n```bash\npytest -v\n```\n\n---\n\nDone." + result = _adapt_markdown_for_dingtalk(text) + assert "# Report" in result + assert "**make test**" in result + assert "> **bash**" in result + assert "> pytest -v" in result + assert "───────────" in result + assert "Done." in result + + +class TestConvertMarkdownTable: + def test_simple_table(self): + text = "| Name | Age |\n|------|-----|\n| Alice | 30 |\n| Bob | 25 |" + result = _convert_markdown_table(text) + assert "> **Name**: Alice" in result + assert "> **Age**: 30" in result + assert "> **Name**: Bob" in result + assert "> **Age**: 25" in result + assert "|" not in result + + def test_table_with_surrounding_text(self): + text = "Results:\n\n| Key | Value |\n|-----|-------|\n| a | 1 |\n\nEnd." + result = _convert_markdown_table(text) + assert "Results:" in result + assert "> **Key**: a" in result + assert "> **Value**: 1" in result + assert "End." in result + + def test_no_table(self): + text = "Just plain text\nwith lines" + assert _convert_markdown_table(text) == text + + def test_alignment_separators(self): + text = "| Left | Center | Right |\n|:-----|:------:|------:|\n| a | b | c |" + result = _convert_markdown_table(text) + assert "> **Left**: a" in result + assert "> **Center**: b" in result + assert "> **Right**: c" in result + + +class TestUploadMediaValidation: + def test_non_dict_response_returns_none(self): + async def go(): + from unittest.mock import patch + + bus = MessageBus() + channel = DingTalkChannel(bus, config={}) + channel._client_id = "k" + channel._client_secret = "s" + channel._cached_token = "tok" + channel._token_expires_at = float("inf") + + class FakeResponse: + def raise_for_status(self): + pass + + def json(self): + return ["not", "a", "dict"] + + class FakeClient: + async def __aenter__(self): + return self + + async def __aexit__(self, *a): + pass + + async def post(self, url, **kwargs): + return FakeResponse() + + with patch("app.channels.dingtalk.httpx.AsyncClient", return_value=FakeClient()): + result = await channel._upload_media("/tmp/test.png", "image") + + assert result is None + + _run(go()) + + def test_json_decode_error_returns_none(self): + async def go(): + import json as json_mod + from unittest.mock import patch + + bus = MessageBus() + channel = DingTalkChannel(bus, config={}) + channel._client_id = "k" + channel._client_secret = "s" + channel._cached_token = "tok" + channel._token_expires_at = float("inf") + + class FakeResponse: + def raise_for_status(self): + pass + + def json(self): + raise json_mod.JSONDecodeError("err", "", 0) + + class FakeClient: + async def __aenter__(self): + return self + + async def __aexit__(self, *a): + pass + + async def post(self, url, **kwargs): + return FakeResponse() + + with patch("app.channels.dingtalk.httpx.AsyncClient", return_value=FakeClient()): + result = await channel._upload_media("/tmp/test.png", "image") + + assert result is None + + _run(go()) + + +class TestChannelRegistration: + def test_dingtalk_in_channel_registry(self): + from app.channels.service import _CHANNEL_REGISTRY + + assert "dingtalk" in _CHANNEL_REGISTRY + assert _CHANNEL_REGISTRY["dingtalk"] == "app.channels.dingtalk:DingTalkChannel" + + def test_dingtalk_in_credential_keys(self): + from app.channels.service import _CHANNEL_CREDENTIAL_KEYS + + assert "dingtalk" in _CHANNEL_CREDENTIAL_KEYS + assert "client_id" in _CHANNEL_CREDENTIAL_KEYS["dingtalk"] + assert "client_secret" in _CHANNEL_CREDENTIAL_KEYS["dingtalk"] + + def test_dingtalk_in_channel_capabilities(self): + from app.channels.manager import CHANNEL_CAPABILITIES + + assert "dingtalk" in CHANNEL_CAPABILITIES + assert CHANNEL_CAPABILITIES["dingtalk"]["supports_streaming"] is False + + +# --------------------------------------------------------------------------- +# AI Card streaming mode tests +# --------------------------------------------------------------------------- + + +class TestCardMode: + def test_card_mode_enabled_supports_streaming(self): + bus = MessageBus() + channel = DingTalkChannel(bus, config={"card_template_id": "tpl_123"}) + assert channel.supports_streaming is True + + def test_non_card_mode_no_streaming(self): + bus = MessageBus() + channel = DingTalkChannel(bus, config={}) + assert channel.supports_streaming is False + + def test_non_card_mode_unchanged(self): + bus = MessageBus() + channel = DingTalkChannel(bus, config={}) + assert channel._card_template_id == "" + assert channel._card_track_ids == {} + assert channel._card_repliers == {} + assert channel._incoming_messages == {} + assert channel._dingtalk_client is None + + def test_card_source_key_matches_inbound_using_message_id_metadata(self): + """Outbound correlation must match inbound ``message_id`` even if ``thread_ts`` drifts.""" + + async def go(): + bus = MessageBus() + channel = DingTalkChannel(bus, config={}) + inbound = channel._make_inbound( + chat_id="x", + user_id="u", + text="hi", + thread_ts="ts_fallback", + metadata={ + "conversation_type": _CONVERSATION_TYPE_P2P, + "sender_staff_id": "user_001", + "conversation_id": "", + "message_id": "msg_real", + }, + ) + out = OutboundMessage( + channel_name="dingtalk", + chat_id="x", + thread_id="t", + text="ok", + thread_ts="wrong_ts", + metadata=dict(inbound.metadata), + ) + assert channel._make_card_source_key(inbound) == channel._make_card_source_key_from_outbound(out) + + _run(go()) + + def test_running_reply_creates_card(self): + async def go(): + bus = MessageBus() + channel = DingTalkChannel(bus, config={"card_template_id": "tpl_123"}) + channel._client_id = "test_key" + + channel._create_and_deliver_card = AsyncMock(return_value="track_001") + + inbound = channel._make_inbound( + chat_id="user_001", + user_id="user_001", + text="hello", + metadata={ + "conversation_type": _CONVERSATION_TYPE_P2P, + "sender_staff_id": "user_001", + "conversation_id": "", + "message_id": "msg_001", + }, + ) + + mock_chatbot_msg = MagicMock() + source_key = channel._make_card_source_key(inbound) + channel._incoming_messages[source_key] = mock_chatbot_msg + + await channel._send_running_reply("user_001", inbound) + + channel._create_and_deliver_card.assert_awaited_once_with( + "\u23f3 Working on it...", + chatbot_message=mock_chatbot_msg, + ) + assert channel._card_track_ids[source_key] == "track_001" + + _run(go()) + + def test_send_streams_to_card(self): + async def go(): + bus = MessageBus() + channel = DingTalkChannel(bus, config={"card_template_id": "tpl_123"}) + channel._client_id = "test_key" + + channel._stream_update_card = AsyncMock() + + # Pre-populate card tracking + source_key = f"{_CONVERSATION_TYPE_P2P}:user_001::msg_001" + channel._card_track_ids[source_key] = "track_001" + + msg = OutboundMessage( + channel_name="dingtalk", + chat_id="user_001", + thread_id="thread_001", + text="Partial response...", + is_final=False, + thread_ts="msg_001", + metadata={ + "conversation_type": _CONVERSATION_TYPE_P2P, + "sender_staff_id": "user_001", + "conversation_id": "", + }, + ) + + await channel.send(msg) + + channel._stream_update_card.assert_awaited_once_with( + "track_001", + "Partial response...", + is_finalize=False, + ) + # Track ID should still exist (not final) + assert source_key in channel._card_track_ids + + _run(go()) + + def test_send_finalizes_card(self): + async def go(): + bus = MessageBus() + channel = DingTalkChannel(bus, config={"card_template_id": "tpl_123"}) + channel._client_id = "test_key" + + channel._stream_update_card = AsyncMock() + + source_key = f"{_CONVERSATION_TYPE_P2P}:user_001::msg_001" + channel._card_track_ids[source_key] = "track_001" + + msg = OutboundMessage( + channel_name="dingtalk", + chat_id="user_001", + thread_id="thread_001", + text="Final answer.", + is_final=True, + thread_ts="msg_001", + metadata={ + "conversation_type": _CONVERSATION_TYPE_P2P, + "sender_staff_id": "user_001", + "conversation_id": "", + }, + ) + + await channel.send(msg) + + channel._stream_update_card.assert_awaited_once_with( + "track_001", + "Final answer.", + is_finalize=True, + ) + # Track ID should be cleaned up after final + assert source_key not in channel._card_track_ids + + _run(go()) + + def test_card_mode_skips_markdown_adaptation(self): + async def go(): + bus = MessageBus() + channel = DingTalkChannel(bus, config={"card_template_id": "tpl_123"}) + channel._client_id = "test_key" + + raw_markdown = "```python\ndef foo():\n pass\n```" + captured_content: list[str] = [] + + async def capture_stream(out_track_id, content, *, is_finalize=False, is_error=False): + captured_content.append(content) + + channel._stream_update_card = AsyncMock(side_effect=capture_stream) + + source_key = f"{_CONVERSATION_TYPE_P2P}:user_001::msg_001" + channel._card_track_ids[source_key] = "track_001" + + msg = OutboundMessage( + channel_name="dingtalk", + chat_id="user_001", + thread_id="thread_001", + text=raw_markdown, + is_final=True, + thread_ts="msg_001", + metadata={ + "conversation_type": _CONVERSATION_TYPE_P2P, + "sender_staff_id": "user_001", + "conversation_id": "", + }, + ) + + await channel.send(msg) + + # Raw markdown should be passed through without adaptation + assert captured_content[0] == raw_markdown + + _run(go()) + + def test_card_fallback_on_creation_failure(self): + async def go(): + bus = MessageBus() + channel = DingTalkChannel(bus, config={"card_template_id": "tpl_123"}) + channel._client_id = "test_key" + + # Card creation returns None (failure) + channel._create_and_deliver_card = AsyncMock(return_value=None) + channel._send_text_message_to_user = AsyncMock() + + inbound = channel._make_inbound( + chat_id="user_001", + user_id="user_001", + text="hello", + metadata={ + "conversation_type": _CONVERSATION_TYPE_P2P, + "sender_staff_id": "user_001", + "conversation_id": "", + "message_id": "msg_001", + }, + ) + + source_key = channel._make_card_source_key(inbound) + channel._incoming_messages[source_key] = MagicMock() + + await channel._send_running_reply("user_001", inbound) + + # Should fall through to text message + channel._send_text_message_to_user.assert_awaited_once() + assert len(channel._card_track_ids) == 0 + + _run(go()) + + def test_send_skips_non_final_without_card_track_when_template_configured(self): + """Without a live card track, Manager streaming would duplicate sampleMarkdown sends.""" + + async def go(): + bus = MessageBus() + channel = DingTalkChannel(bus, config={"card_template_id": "tpl_123"}) + channel._client_id = "test_key" + channel._send_group_message = AsyncMock() + channel._send_p2p_message = AsyncMock() + + meta = { + "conversation_type": _CONVERSATION_TYPE_P2P, + "sender_staff_id": "user_001", + "conversation_id": "", + } + await channel.send( + OutboundMessage( + channel_name="dingtalk", + chat_id="user_001", + thread_id="t1", + text="partial", + is_final=False, + thread_ts="msg_001", + metadata=meta, + ) + ) + channel._send_p2p_message.assert_not_called() + channel._send_group_message.assert_not_called() + + await channel.send( + OutboundMessage( + channel_name="dingtalk", + chat_id="user_001", + thread_id="t1", + text="final answer", + is_final=True, + thread_ts="msg_001", + metadata=meta, + ) + ) + channel._send_p2p_message.assert_awaited_once() + + _run(go()) + + def test_card_fallback_on_stream_failure(self): + async def go(): + bus = MessageBus() + channel = DingTalkChannel(bus, config={"card_template_id": "tpl_123"}) + channel._client_id = "test_key" + + channel._stream_update_card = AsyncMock(side_effect=ConnectionError("stream failed")) + channel._send_markdown_fallback = AsyncMock() + + source_key = f"{_CONVERSATION_TYPE_P2P}:user_001::msg_001" + channel._card_track_ids[source_key] = "track_001" + + msg = OutboundMessage( + channel_name="dingtalk", + chat_id="user_001", + thread_id="thread_001", + text="Final answer.", + is_final=True, + thread_ts="msg_001", + metadata={ + "conversation_type": _CONVERSATION_TYPE_P2P, + "sender_staff_id": "user_001", + "conversation_id": "", + }, + ) + + await channel.send(msg) + + # Should fallback to markdown + channel._send_markdown_fallback.assert_awaited_once_with( + "test_key", + _CONVERSATION_TYPE_P2P, + "user_001", + "", + "Final answer.", + ) + # Track ID should be cleaned up + assert source_key not in channel._card_track_ids + + _run(go()) + + def test_pre_start_stores_dingtalk_client(self): + bus = MessageBus() + channel = DingTalkChannel(bus, config={}) + handler = _DingTalkMessageHandler(channel) + + mock_client = MagicMock() + handler.dingtalk_client = mock_client + handler.pre_start() + + assert channel._dingtalk_client is mock_client + + def test_chatbot_message_stored_for_card_mode(self): + bus = MessageBus() + channel = DingTalkChannel(bus, config={"card_template_id": "tpl_123"}) + + mock_message = MagicMock() + mock_message.sender_staff_id = "user_001" + mock_message.conversation_type = "1" + mock_message.conversation_id = "" + mock_message.message_id = "msg_001" + mock_message.sender_nick = "TestUser" + mock_message.message_type = "text" + mock_message.text = MagicMock(content="hello") + mock_message.rich_text_content = None + + channel._main_loop = MagicMock() + channel._main_loop.is_running.return_value = False + channel._allowed_users = set() + channel._running = True + + channel._on_chatbot_message(mock_message) + + assert len(channel._incoming_messages) == 1 + stored_msg = list(channel._incoming_messages.values())[0] + assert stored_msg is mock_message + + def test_card_replier_cleanup_on_final(self): + async def go(): + bus = MessageBus() + channel = DingTalkChannel(bus, config={"card_template_id": "tpl_123"}) + channel._client_id = "test_key" + + channel._stream_update_card = AsyncMock() + + source_key = f"{_CONVERSATION_TYPE_P2P}:user_001::msg_001" + channel._card_track_ids[source_key] = "track_001" + channel._card_repliers["track_001"] = MagicMock() + + msg = OutboundMessage( + channel_name="dingtalk", + chat_id="user_001", + thread_id="thread_001", + text="Final answer.", + is_final=True, + thread_ts="msg_001", + metadata={ + "conversation_type": _CONVERSATION_TYPE_P2P, + "sender_staff_id": "user_001", + "conversation_id": "", + }, + ) + + await channel.send(msg) + + assert source_key not in channel._card_track_ids + assert "track_001" not in channel._card_repliers + + _run(go()) + + def test_card_creation_without_sdk_client_returns_none(self): + async def go(): + bus = MessageBus() + channel = DingTalkChannel(bus, config={"card_template_id": "tpl_123"}) + channel._dingtalk_client = None + + result = await channel._create_and_deliver_card( + "test", + chatbot_message=MagicMock(), + ) + assert result is None + + _run(go()) + + def test_card_creation_without_chatbot_message_returns_none(self): + async def go(): + bus = MessageBus() + channel = DingTalkChannel(bus, config={"card_template_id": "tpl_123"}) + channel._dingtalk_client = MagicMock() + + result = await channel._create_and_deliver_card( + "test", + chatbot_message=None, + ) + assert result is None + + _run(go()) + + def test_stream_update_card_raises_without_replier(self): + async def go(): + bus = MessageBus() + channel = DingTalkChannel(bus, config={"card_template_id": "tpl_123"}) + + with pytest.raises(RuntimeError, match="No AICardReplier found"): + await channel._stream_update_card("nonexistent_track", "content") + + _run(go()) + + def test_stop_clears_card_state(self): + async def go(): + bus = MessageBus() + channel = DingTalkChannel(bus, config={"card_template_id": "tpl_123"}) + channel._running = True + channel._dingtalk_client = MagicMock() + channel._incoming_messages["key"] = MagicMock() + channel._card_repliers["track"] = MagicMock() + channel._card_track_ids["source"] = "track" + + await channel.stop() + + assert channel._dingtalk_client is None + assert channel._incoming_messages == {} + assert channel._card_repliers == {} + assert channel._card_track_ids == {} + + _run(go()) diff --git a/backend/uv.lock b/backend/uv.lock index e0ea058c0..378bbb842 100644 --- a/backend/uv.lock +++ b/backend/uv.lock @@ -746,6 +746,7 @@ source = { virtual = "." } dependencies = [ { name = "bcrypt" }, { name = "deerflow-harness" }, + { name = "dingtalk-stream" }, { name = "email-validator" }, { name = "fastapi" }, { name = "httpx" }, @@ -779,6 +780,7 @@ requires-dist = [ { name = "bcrypt", specifier = ">=4.0.0" }, { name = "deerflow-harness", editable = "packages/harness" }, { name = "deerflow-harness", extras = ["postgres"], marker = "extra == 'postgres'", editable = "packages/harness" }, + { name = "dingtalk-stream", specifier = ">=0.24.3" }, { name = "email-validator", specifier = ">=2.0.0" }, { name = "fastapi", specifier = ">=0.115.0" }, { name = "httpx", specifier = ">=0.28.0" }, @@ -908,6 +910,19 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/07/6c/aa3f2f849e01cb6a001cd8554a88d4c77c5c1a31c95bdf1cf9301e6d9ef4/defusedxml-0.7.1-py2.py3-none-any.whl", hash = "sha256:a352e7e428770286cc899e2542b6cdaedb2b4953ff269a210103ec58f6198a61", size = 25604, upload-time = "2021-03-08T10:59:24.45Z" }, ] +[[package]] +name = "dingtalk-stream" +version = "0.24.3" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "aiohttp" }, + { name = "requests" }, + { name = "websockets" }, +] +wheels = [ + { url = "https://files.pythonhosted.org/packages/4c/44/102dede3f371277598df6aa9725b82e3add068c729333c7a5dbc12764579/dingtalk_stream-0.24.3-py3-none-any.whl", hash = "sha256:2160403656985962878bf60cdf5adf41619f21067348e06f07a7c7eebf5943ad", size = 27813, upload-time = "2025-10-24T09:36:57.497Z" }, +] + [[package]] name = "distro" version = "1.9.0" diff --git a/config.example.yaml b/config.example.yaml index 0b72ab80f..a14ca5886 100644 --- a/config.example.yaml +++ b/config.example.yaml @@ -964,6 +964,13 @@ run_events: # enabled: false # bot_id: $WECOM_BOT_ID # bot_secret: $WECOM_BOT_SECRET +# +# dingtalk: +# enabled: false +# client_id: $DINGTALK_CLIENT_ID +# client_secret: $DINGTALK_CLIENT_SECRET +# allowed_users: [] # empty = allow all +# card_template_id: "" # Optional: AI Card template ID for streaming updates # ============================================================================ # Guardrails Configuration