diff --git a/.env.example b/.env.example index 89d169631..f443818b3 100644 --- a/.env.example +++ b/.env.example @@ -40,6 +40,8 @@ INFOQUEST_API_KEY=your-infoquest-api-key # # WECOM_BOT_ID=your-wecom-bot-id # WECOM_BOT_SECRET=your-wecom-bot-secret +# DINGTALK_CLIENT_ID=your-dingtalk-client-id +# DINGTALK_CLIENT_SECRET=your-dingtalk-client-secret # Set to "false" to disable Swagger UI, ReDoc, and OpenAPI schema in production # GATEWAY_ENABLE_DOCS=false diff --git a/README.md b/README.md index b64c75f97..c67fdc005 100644 --- a/README.md +++ b/README.md @@ -345,6 +345,7 @@ DeerFlow supports receiving tasks from messaging apps. Channels auto-start when | Feishu / Lark | WebSocket | Moderate | | WeChat | Tencent iLink (long-polling) | Moderate | | WeCom | WebSocket | Moderate | +| DingTalk | Stream Push (WebSocket) | Moderate | **Configuration in `config.yaml`:** @@ -414,6 +415,13 @@ channels: context: thinking_enabled: true subagent_enabled: true + + dingtalk: + enabled: true + client_id: $DINGTALK_CLIENT_ID # Client ID of your DingTalk application + client_secret: $DINGTALK_CLIENT_SECRET # Client Secret of your DingTalk application + allowed_users: [] # empty = allow all + card_template_id: "" # Optional: AI Card template ID for streaming typewriter effect ``` Notes: @@ -442,6 +450,10 @@ WECHAT_ILINK_BOT_ID=your_ilink_bot_id # WeCom WECOM_BOT_ID=your_bot_id WECOM_BOT_SECRET=your_bot_secret + +# DingTalk +DINGTALK_CLIENT_ID=your_client_id +DINGTALK_CLIENT_SECRET=your_client_secret ``` **Telegram Setup** @@ -480,6 +492,14 @@ WECOM_BOT_SECRET=your_bot_secret 4. Make sure backend dependencies include `wecom-aibot-python-sdk`. The channel uses a WebSocket long connection and does not require a public callback URL. 5. The current integration supports inbound text, image, and file messages. Final images/files generated by the agent are also sent back to the WeCom conversation. +**DingTalk Setup** + +1. Create a DingTalk application in the [DingTalk Developer Console](https://open.dingtalk.com/) and enable **Robot** capability. +2. Set the message receiving mode to **Stream Mode** in the robot configuration page. +3. Copy the `Client ID` and `Client Secret`, set `DINGTALK_CLIENT_ID` and `DINGTALK_CLIENT_SECRET` in `.env`, and enable the channel in `config.yaml`. +4. *(Optional)* To enable streaming AI Card replies (typewriter effect), create an **AI Card** template on the [DingTalk Card Platform](https://open.dingtalk.com/document/dingstart/typewriter-effect-streaming-ai-card), then set `card_template_id` in `config.yaml` to the template ID. You also need to apply for the `Card.Streaming.Write` and `Card.Instance.Write` permissions. + + When DeerFlow runs in Docker Compose, IM channels execute inside the `gateway` container. In that case, do not point `channels.langgraph_url` or `channels.gateway_url` at `localhost`; use container service names such as `http://gateway:8001/api` and `http://gateway:8001`, or set `DEER_FLOW_CHANNELS_LANGGRAPH_URL` and `DEER_FLOW_CHANNELS_GATEWAY_URL`. **Commands** diff --git a/README_fr.md b/README_fr.md index e7684a5f8..3b8dc3d41 100644 --- a/README_fr.md +++ b/README_fr.md @@ -290,6 +290,7 @@ DeerFlow peut recevoir des tâches depuis des applications de messagerie. Les ca | Telegram | Bot API (long-polling) | Facile | | Slack | Socket Mode | Modérée | | Feishu / Lark | WebSocket | Modérée | +| DingTalk | Stream Push (WebSocket) | Modérée | **Configuration dans `config.yaml` :** @@ -341,6 +342,13 @@ channels: context: thinking_enabled: true subagent_enabled: true + + dingtalk: + enabled: true + client_id: $DINGTALK_CLIENT_ID # ClientId depuis DingTalk Open Platform + client_secret: $DINGTALK_CLIENT_SECRET # ClientSecret depuis DingTalk Open Platform + allowed_users: [] # vide = tout le monde autorisé + card_template_id: "" # Optionnel : ID de modèle AI Card pour l'effet machine à écrire en streaming ``` Définissez les clés API correspondantes dans votre fichier `.env` : @@ -356,6 +364,10 @@ SLACK_APP_TOKEN=xapp-... # Feishu / Lark FEISHU_APP_ID=cli_xxxx FEISHU_APP_SECRET=your_app_secret + +# DingTalk +DINGTALK_CLIENT_ID=your_client_id +DINGTALK_CLIENT_SECRET=your_client_secret ``` **Configuration Telegram** @@ -378,6 +390,13 @@ FEISHU_APP_SECRET=your_app_secret 3. Dans **Events**, abonnez-vous à `im.message.receive_v1` et sélectionnez le mode **Long Connection**. 4. Copiez l'App ID et l'App Secret. Définissez `FEISHU_APP_ID` et `FEISHU_APP_SECRET` dans `.env` et activez le canal dans `config.yaml`. +**Configuration DingTalk** + +1. Créez une application sur [DingTalk Open Platform](https://open.dingtalk.com/) et activez la capacité **Robot**. +2. Dans la page de configuration du robot, définissez le mode de réception des messages sur **Stream**. +3. Copiez le `Client ID` et le `Client Secret`. Définissez `DINGTALK_CLIENT_ID` et `DINGTALK_CLIENT_SECRET` dans `.env` et activez le canal dans `config.yaml`. +4. *(Optionnel)* Pour activer les réponses en streaming AI Card (effet machine à écrire), créez un modèle **AI Card** sur la [plateforme de cartes DingTalk](https://open.dingtalk.com/document/dingstart/typewriter-effect-streaming-ai-card), puis définissez `card_template_id` dans `config.yaml` avec l'ID du modèle. Vous devez également demander les permissions `Card.Streaming.Write` et `Card.Instance.Write`. + **Commandes** Une fois un canal connecté, vous pouvez interagir avec DeerFlow directement depuis le chat : diff --git a/README_ja.md b/README_ja.md index 3e0ff4c85..d2ba81750 100644 --- a/README_ja.md +++ b/README_ja.md @@ -243,6 +243,7 @@ DeerFlowはメッセージングアプリからのタスク受信をサポート | Telegram | Bot API(ロングポーリング) | 簡単 | | Slack | Socket Mode | 中程度 | | Feishu / Lark | WebSocket | 中程度 | +| DingTalk | Stream Push(WebSocket) | 中程度 | **`config.yaml`での設定:** @@ -294,6 +295,13 @@ channels: context: thinking_enabled: true subagent_enabled: true + + dingtalk: + enabled: true + client_id: $DINGTALK_CLIENT_ID # DingTalk Open PlatformのClientId + client_secret: $DINGTALK_CLIENT_SECRET # DingTalk Open PlatformのClientSecret + allowed_users: [] # 空 = 全員許可 + card_template_id: "" # オプション:ストリーミングタイプライター効果用のAIカードテンプレートID ``` 対応するAPIキーを`.env`ファイルに設定します: @@ -309,6 +317,10 @@ SLACK_APP_TOKEN=xapp-... # Feishu / Lark FEISHU_APP_ID=cli_xxxx FEISHU_APP_SECRET=your_app_secret + +# DingTalk +DINGTALK_CLIENT_ID=your_client_id +DINGTALK_CLIENT_SECRET=your_client_secret ``` **Telegramのセットアップ** @@ -331,6 +343,13 @@ FEISHU_APP_SECRET=your_app_secret 3. **イベント**で`im.message.receive_v1`を購読し、**ロングコネクション**モードを選択。 4. App IDとApp Secretをコピー。`.env`に`FEISHU_APP_ID`と`FEISHU_APP_SECRET`を設定し、`config.yaml`でチャネルを有効にします。 +**DingTalkのセットアップ** + +1. [DingTalk Open Platform](https://open.dingtalk.com/)でアプリを作成し、**ロボット**機能を有効化します。 +2. ロボット設定ページでメッセージ受信モードを**Streamモード**に設定します。 +3. `Client ID`と`Client Secret`をコピー。`.env`に`DINGTALK_CLIENT_ID`と`DINGTALK_CLIENT_SECRET`を設定し、`config.yaml`でチャネルを有効にします。 +4. *(オプション)* ストリーミングAIカード返信(タイプライター効果)を有効にするには、[DingTalkカードプラットフォーム](https://open.dingtalk.com/document/dingstart/typewriter-effect-streaming-ai-card)で**AIカード**テンプレートを作成し、`config.yaml`の`card_template_id`にテンプレートIDを設定します。`Card.Streaming.Write` および `Card.Instance.Write` 権限の申請も必要です。 + **コマンド** チャネル接続後、チャットから直接DeerFlowと対話できます: diff --git a/README_ru.md b/README_ru.md index 6ee30ebc6..e74feaeac 100644 --- a/README_ru.md +++ b/README_ru.md @@ -256,6 +256,7 @@ DeerFlow принимает задачи прямо из мессенджеро | Telegram | Bot API (long-polling) | Просто | | Slack | Socket Mode | Средне | | Feishu / Lark | WebSocket | Средне | +| DingTalk | Stream Push (WebSocket) | Средне | **Конфигурация в `config.yaml`:** @@ -278,6 +279,13 @@ channels: enabled: true bot_token: $TELEGRAM_BOT_TOKEN allowed_users: [] + + dingtalk: + enabled: true + client_id: $DINGTALK_CLIENT_ID # ClientId с DingTalk Open Platform + client_secret: $DINGTALK_CLIENT_SECRET # ClientSecret с DingTalk Open Platform + allowed_users: [] # пусто = разрешить всем + card_template_id: "" # Опционально: ID шаблона AI Card для потокового эффекта печатной машинки ``` **Настройка Telegram** @@ -285,6 +293,13 @@ channels: 1. Напишите [@BotFather](https://t.me/BotFather), отправьте `/newbot` и скопируйте HTTP API-токен. 2. Укажите `TELEGRAM_BOT_TOKEN` в `.env` и включите канал в `config.yaml`. +**Настройка DingTalk** + +1. Создайте приложение на [DingTalk Open Platform](https://open.dingtalk.com/) и включите возможность **Робот**. +2. На странице настроек робота установите режим приёма сообщений на **Stream**. +3. Скопируйте `Client ID` и `Client Secret`. Укажите `DINGTALK_CLIENT_ID` и `DINGTALK_CLIENT_SECRET` в `.env` и включите канал в `config.yaml`. +4. *(Опционально)* Для включения потоковых ответов AI Card (эффект печатной машинки) создайте шаблон **AI Card** на [платформе карточек DingTalk](https://open.dingtalk.com/document/dingstart/typewriter-effect-streaming-ai-card), затем укажите `card_template_id` в `config.yaml` с ID шаблона. Также необходимо запросить разрешения `Card.Streaming.Write` и `Card.Instance.Write`. + **Доступные команды** | Команда | Описание | diff --git a/README_zh.md b/README_zh.md index f6043ff86..6e4a618c7 100644 --- a/README_zh.md +++ b/README_zh.md @@ -248,6 +248,7 @@ DeerFlow 支持从即时通讯应用接收任务。只要配置完成,对应 | Slack | Socket Mode | 中等 | | Feishu / Lark | WebSocket | 中等 | | 企业微信智能机器人 | WebSocket | 中等 | +| 钉钉 | Stream Push(WebSocket) | 中等 | **`config.yaml` 中的配置示例:** @@ -304,6 +305,13 @@ channels: context: thinking_enabled: true subagent_enabled: true + + dingtalk: + enabled: true + client_id: $DINGTALK_CLIENT_ID # 钉钉开放平台 ClientId + client_secret: $DINGTALK_CLIENT_SECRET # 钉钉开放平台 ClientSecret + allowed_users: [] # 留空表示允许所有人 + card_template_id: "" # 可选:AI 卡片模板 ID,用于流式打字机效果 ``` 说明: @@ -327,6 +335,10 @@ FEISHU_APP_SECRET=your_app_secret # 企业微信智能机器人 WECOM_BOT_ID=your_bot_id WECOM_BOT_SECRET=your_bot_secret + +# 钉钉 +DINGTALK_CLIENT_ID=your_client_id +DINGTALK_CLIENT_SECRET=your_client_secret ``` **Telegram 配置** @@ -357,6 +369,13 @@ WECOM_BOT_SECRET=your_bot_secret 4. 安装后端依赖时确保包含 `wecom-aibot-python-sdk`,渠道会通过 WebSocket 长连接接收消息,无需公网回调地址。 5. 当前支持文本、图片和文件入站消息;agent 生成的最终图片/文件也会回传到企业微信会话中。 +**钉钉配置** + +1. 在 [钉钉开放平台](https://open.dingtalk.com/) 创建应用,并启用 **机器人** 能力。 +2. 在机器人配置页面设置消息接收模式为 **Stream模式**。 +3. 复制 `Client ID` 和 `Client Secret`,在 `.env` 中设置 `DINGTALK_CLIENT_ID` 和 `DINGTALK_CLIENT_SECRET`,并在 `config.yaml` 中启用该渠道。 +4. *(可选)* 如需开启流式 AI 卡片回复(打字机效果),请在[钉钉卡片平台](https://open.dingtalk.com/document/dingstart/typewriter-effect-streaming-ai-card)创建 **AI 卡片**模板,然后在 `config.yaml` 中将 `card_template_id` 设为该模板 ID。同时需要申请 `Card.Streaming.Write` 和 `Card.Instance.Write` 权限。 + **命令** 渠道连接完成后,你可以直接在聊天窗口里和 DeerFlow 交互: diff --git a/backend/CLAUDE.md b/backend/CLAUDE.md index 5a786ac95..b185ce4a1 100644 --- a/backend/CLAUDE.md +++ b/backend/CLAUDE.md @@ -112,7 +112,7 @@ CI runs these regression tests for every pull request via [.github/workflows/bac The backend is split into two layers with a strict dependency direction: - **Harness** (`packages/harness/deerflow/`): Publishable agent framework package (`deerflow-harness`). Import prefix: `deerflow.*`. Contains agent orchestration, tools, sandbox, models, MCP, skills, config — everything needed to build and run agents. -- **App** (`app/`): Unpublished application code. Import prefix: `app.*`. Contains the FastAPI Gateway API and IM channel integrations (Feishu, Slack, Telegram). +- **App** (`app/`): Unpublished application code. Import prefix: `app.*`. Contains the FastAPI Gateway API and IM channel integrations (Feishu, Slack, Telegram, DingTalk). **Dependency rule**: App imports deerflow, but deerflow never imports app. This boundary is enforced by `tests/test_harness_boundary.py` which runs in CI. @@ -312,7 +312,8 @@ Proxied through nginx: `/api/langgraph/*` → LangGraph, all other `/api/*` → ### IM Channels System (`app/channels/`) -Bridges external messaging platforms (Feishu, Slack, Telegram) to the DeerFlow agent via Gateway's LangGraph-compatible API. +Bridges external messaging platforms (Feishu, Slack, Telegram, DingTalk) to the DeerFlow agent via the LangGraph Server. + **Architecture**: Channels communicate with Gateway through the `langgraph-sdk` HTTP client (same as the frontend), ensuring threads are created and managed server-side. The internal SDK client injects process-local internal auth plus a matching CSRF cookie/header pair so Gateway accepts state-changing thread/run requests from channel workers without relying on browser session cookies. @@ -322,7 +323,7 @@ Bridges external messaging platforms (Feishu, Slack, Telegram) to the DeerFlow a - `manager.py` - Core dispatcher: creates threads via `client.threads.create()`, routes commands, keeps Slack/Telegram on `client.runs.wait()`, and uses `client.runs.stream(["messages-tuple", "values"])` for Feishu incremental outbound updates - `base.py` - Abstract `Channel` base class (start/stop/send lifecycle) - `service.py` - Manages lifecycle of all configured channels from `config.yaml` -- `slack.py` / `feishu.py` / `telegram.py` - Platform-specific implementations (`feishu.py` tracks the running card `message_id` in memory and patches the same card in place) +- `slack.py` / `feishu.py` / `telegram.py` / `dingtalk.py` - Platform-specific implementations (`feishu.py` tracks the running card `message_id` in memory and patches the same card in place; `dingtalk.py` optionally uses AI Card streaming for in-place updates when `card_template_id` is configured) **Message Flow**: 1. External platform -> Channel impl -> `MessageBus.publish_inbound()` @@ -331,14 +332,16 @@ Bridges external messaging platforms (Feishu, Slack, Telegram) to the DeerFlow a 4. Feishu chat: `runs.stream()` → accumulate AI text → publish multiple outbound updates (`is_final=False`) → publish final outbound (`is_final=True`) 5. Slack/Telegram chat: `runs.wait()` → extract final response → publish outbound 6. Feishu channel sends one running reply card up front, then patches the same card for each outbound update (card JSON sets `config.update_multi=true` for Feishu's patch API requirement) -7. For commands (`/new`, `/status`, `/models`, `/memory`, `/help`): handle locally or query Gateway API -8. Outbound → channel callbacks → platform reply +7. DingTalk AI Card mode (when `card_template_id` configured): `runs.stream()` → create card with initial text → stream updates via `PUT /v1.0/card/streaming` → finalize on `is_final=True`. Falls back to `sampleMarkdown` if card creation or streaming fails +8. For commands (`/new`, `/status`, `/models`, `/memory`, `/help`): handle locally or query Gateway API +9. Outbound → channel callbacks → platform reply **Configuration** (`config.yaml` -> `channels`): - `langgraph_url` - LangGraph-compatible Gateway API base URL (default: `http://localhost:8001/api`) - `gateway_url` - Gateway API URL for auxiliary commands (default: `http://localhost:8001`) - In Docker Compose, IM channels run inside the `gateway` container, so `localhost` points back to that container. Use `http://gateway:8001/api` for `langgraph_url` and `http://gateway:8001` for `gateway_url`, or set `DEER_FLOW_CHANNELS_LANGGRAPH_URL` / `DEER_FLOW_CHANNELS_GATEWAY_URL`. -- Per-channel configs: `feishu` (app_id, app_secret), `slack` (bot_token, app_token), `telegram` (bot_token) +- Per-channel configs: `feishu` (app_id, app_secret), `slack` (bot_token, app_token), `telegram` (bot_token), `dingtalk` (client_id, client_secret, optional `card_template_id` for AI Card streaming) + ### Memory System (`packages/harness/deerflow/agents/memory/`) diff --git a/backend/app/channels/base.py b/backend/app/channels/base.py index 95aecf267..baf542c48 100644 --- a/backend/app/channels/base.py +++ b/backend/app/channels/base.py @@ -31,6 +31,10 @@ class Channel(ABC): def is_running(self) -> bool: return self._running + @property + def supports_streaming(self) -> bool: + return False + # -- lifecycle --------------------------------------------------------- @abstractmethod diff --git a/backend/app/channels/dingtalk.py b/backend/app/channels/dingtalk.py new file mode 100644 index 000000000..f2833d4ff --- /dev/null +++ b/backend/app/channels/dingtalk.py @@ -0,0 +1,740 @@ +"""DingTalk channel implementation.""" + +from __future__ import annotations + +import asyncio +import json +import logging +import re +import threading +import time +from pathlib import Path +from typing import Any + +import httpx + +from app.channels.base import Channel +from app.channels.commands import KNOWN_CHANNEL_COMMANDS +from app.channels.message_bus import InboundMessage, InboundMessageType, MessageBus, OutboundMessage, ResolvedAttachment + +logger = logging.getLogger(__name__) + +DINGTALK_API_BASE = "https://api.dingtalk.com" + +_TOKEN_REFRESH_MARGIN_SECONDS = 300 + +_CONVERSATION_TYPE_P2P = "1" +_CONVERSATION_TYPE_GROUP = "2" + +_MAX_UPLOAD_SIZE_BYTES = 20 * 1024 * 1024 + + +def _normalize_conversation_type(raw: Any) -> str: + """Normalize ``conversationType`` to ``"1"`` (P2P) or ``"2"`` (group). + + Stream payloads may send int or string values. + """ + if raw is None: + return _CONVERSATION_TYPE_P2P + s = str(raw).strip() + if s == _CONVERSATION_TYPE_GROUP: + return _CONVERSATION_TYPE_GROUP + return _CONVERSATION_TYPE_P2P + + +def _normalize_allowed_users(allowed_users: Any) -> set[str]: + if allowed_users is None: + return set() + if isinstance(allowed_users, str): + values = [allowed_users] + elif isinstance(allowed_users, (list, tuple, set)): + values = allowed_users + else: + logger.warning( + "DingTalk allowed_users should be a list of user IDs; treating %s as one string value", + type(allowed_users).__name__, + ) + values = [allowed_users] + return {str(uid) for uid in values if str(uid)} + + +def _is_dingtalk_command(text: str) -> bool: + if not text.startswith("/"): + return False + return text.split(maxsplit=1)[0].lower() in KNOWN_CHANNEL_COMMANDS + + +def _extract_text_from_rich_text(rich_text_list: list) -> str: + parts: list[str] = [] + for item in rich_text_list: + if isinstance(item, dict) and "text" in item: + parts.append(item["text"]) + return " ".join(parts) + + +_FENCED_CODE_BLOCK_RE = re.compile(r"```(\w*)\n(.*?)```", re.DOTALL) +_INLINE_CODE_RE = re.compile(r"`([^`\n]+)`") +_HORIZONTAL_RULE_RE = re.compile(r"^-{3,}$", re.MULTILINE) +_TABLE_SEPARATOR_RE = re.compile(r"^\|[-:| ]+\|$", re.MULTILINE) + + +def _convert_markdown_table(text: str) -> str: + # DingTalk sampleMarkdown does not render pipe-delimited tables. + lines = text.split("\n") + result: list[str] = [] + i = 0 + while i < len(lines): + line = lines[i] + # Detect table: header row followed by separator row + if i + 1 < len(lines) and line.strip().startswith("|") and _TABLE_SEPARATOR_RE.match(lines[i + 1].strip()): + headers = [h.strip() for h in line.strip().strip("|").split("|")] + i += 2 # skip header + separator + while i < len(lines) and lines[i].strip().startswith("|"): + cells = [c.strip() for c in lines[i].strip().strip("|").split("|")] + for h, c in zip(headers, cells): + result.append(f"> **{h}**: {c}") + result.append("") + i += 1 + else: + result.append(line) + i += 1 + return "\n".join(result) + + +def _adapt_markdown_for_dingtalk(text: str) -> str: + """Adapt markdown for DingTalk's limited sampleMarkdown renderer.""" + + def _code_block_to_quote(match: re.Match) -> str: + lang = match.group(1) + code = match.group(2).rstrip("\n") + prefix = f"> **{lang}**\n" if lang else "" + quoted_lines = "\n".join(f"> {line}" for line in code.split("\n")) + return f"{prefix}{quoted_lines}\n" + + text = _FENCED_CODE_BLOCK_RE.sub(_code_block_to_quote, text) + text = _INLINE_CODE_RE.sub(r"**\1**", text) + text = _convert_markdown_table(text) + text = _HORIZONTAL_RULE_RE.sub("───────────", text) + return text + + +class DingTalkChannel(Channel): + """DingTalk IM channel using Stream Push (WebSocket, no public IP needed).""" + + def __init__(self, bus: MessageBus, config: dict[str, Any]) -> None: + super().__init__(name="dingtalk", bus=bus, config=config) + self._thread: threading.Thread | None = None + self._main_loop: asyncio.AbstractEventLoop | None = None + self._client_id: str = "" + self._client_secret: str = "" + self._allowed_users: set[str] = _normalize_allowed_users(config.get("allowed_users")) + self._cached_token: str = "" + self._token_expires_at: float = 0.0 + self._token_lock = asyncio.Lock() + self._card_template_id: str = config.get("card_template_id", "") + self._card_track_ids: dict[str, str] = {} + self._dingtalk_client: Any = None + self._stream_client: Any = None + self._incoming_messages: dict[str, Any] = {} + self._incoming_messages_lock = threading.Lock() + self._card_repliers: dict[str, Any] = {} + + @property + def supports_streaming(self) -> bool: + return bool(self._card_template_id) + + async def start(self) -> None: + if self._running: + return + + try: + import dingtalk_stream # noqa: F401 + except ImportError: + logger.error("dingtalk-stream is not installed. Install it with: uv add dingtalk-stream") + return + + client_id = self.config.get("client_id", "") + client_secret = self.config.get("client_secret", "") + + if not client_id or not client_secret: + logger.error("DingTalk channel requires client_id and client_secret") + return + + self._client_id = client_id + self._client_secret = client_secret + self._main_loop = asyncio.get_running_loop() + + if self._card_template_id: + logger.info("[DingTalk] AI Card mode enabled (template=%s)", self._card_template_id) + + self._running = True + self.bus.subscribe_outbound(self._on_outbound) + + self._thread = threading.Thread( + target=self._run_stream, + args=(client_id, client_secret), + daemon=True, + ) + self._thread.start() + logger.info("DingTalk channel started") + + async def stop(self) -> None: + self._running = False + self.bus.unsubscribe_outbound(self._on_outbound) + + stream_client = self._stream_client + if stream_client is not None: + try: + if hasattr(stream_client, "disconnect"): + stream_client.disconnect() + except Exception: + logger.debug("[DingTalk] error disconnecting stream client", exc_info=True) + + self._dingtalk_client = None + self._stream_client = None + with self._incoming_messages_lock: + self._incoming_messages.clear() + self._card_repliers.clear() + self._card_track_ids.clear() + if self._thread: + self._thread.join(timeout=5) + self._thread = None + logger.info("DingTalk channel stopped") + + def _resolve_routing(self, msg: OutboundMessage) -> tuple[str, str, str]: + """Return (conversation_type, sender_staff_id, conversation_id). + + Uses msg.chat_id as the primary routing key; metadata as fallback. + """ + conversation_type = _normalize_conversation_type(msg.metadata.get("conversation_type")) + sender_staff_id = msg.metadata.get("sender_staff_id", "") + conversation_id = msg.metadata.get("conversation_id", "") + if conversation_type == _CONVERSATION_TYPE_GROUP: + conversation_id = msg.chat_id or conversation_id + else: + sender_staff_id = msg.chat_id or sender_staff_id + return conversation_type, sender_staff_id, conversation_id + + async def send(self, msg: OutboundMessage, *, _max_retries: int = 3) -> None: + conversation_type, sender_staff_id, conversation_id = self._resolve_routing(msg) + robot_code = self._client_id + + # Card mode: stream update to existing AI card + source_key = self._make_card_source_key_from_outbound(msg) + out_track_id = self._card_track_ids.get(source_key) + + # ``card_template_id`` enables ``runs.stream`` (non-final + final outbounds). + # If card creation failed, skip non-final chunks to avoid duplicate messages. + if self._card_template_id and not out_track_id and not msg.is_final: + return + + if out_track_id: + try: + await self._stream_update_card( + out_track_id, + msg.text, + is_finalize=msg.is_final, + ) + except Exception: + logger.warning("[DingTalk] card stream failed, falling back to sampleMarkdown") + if msg.is_final: + self._card_track_ids.pop(source_key, None) + self._card_repliers.pop(out_track_id, None) + await self._send_markdown_fallback(robot_code, conversation_type, sender_staff_id, conversation_id, msg.text) + return + if msg.is_final: + self._card_track_ids.pop(source_key, None) + self._card_repliers.pop(out_track_id, None) + return + + # Non-card mode: send sampleMarkdown with retry + last_exc: Exception | None = None + for attempt in range(_max_retries): + try: + if conversation_type == _CONVERSATION_TYPE_GROUP: + await self._send_group_message(robot_code, conversation_id, msg.text, at_user_ids=[sender_staff_id] if sender_staff_id else None) + else: + await self._send_p2p_message(robot_code, sender_staff_id, msg.text) + return + except Exception as exc: + last_exc = exc + if attempt < _max_retries - 1: + delay = 2**attempt + logger.warning( + "[DingTalk] send failed (attempt %d/%d), retrying in %ds: %s", + attempt + 1, + _max_retries, + delay, + exc, + ) + await asyncio.sleep(delay) + + logger.error("[DingTalk] send failed after %d attempts: %s", _max_retries, last_exc) + if last_exc is None: + raise RuntimeError("DingTalk send failed without an exception from any attempt") + raise last_exc + + async def _send_markdown_fallback( + self, + robot_code: str, + conversation_type: str, + sender_staff_id: str, + conversation_id: str, + text: str, + ) -> None: + try: + if conversation_type == _CONVERSATION_TYPE_GROUP: + await self._send_group_message(robot_code, conversation_id, text) + else: + await self._send_p2p_message(robot_code, sender_staff_id, text) + except Exception: + logger.exception("[DingTalk] markdown fallback also failed") + raise + + async def send_file(self, msg: OutboundMessage, attachment: ResolvedAttachment) -> bool: + if attachment.size > _MAX_UPLOAD_SIZE_BYTES: + logger.warning("[DingTalk] file too large (%d bytes), skipping: %s", attachment.size, attachment.filename) + return False + + conversation_type, sender_staff_id, conversation_id = self._resolve_routing(msg) + robot_code = self._client_id + + try: + media_id = await self._upload_media(attachment.actual_path, "image" if attachment.is_image else "file") + if not media_id: + return False + + if attachment.is_image: + msg_key = "sampleImageMsg" + msg_param = json.dumps({"photoURL": media_id}) + else: + msg_key = "sampleFile" + msg_param = json.dumps( + { + "fileUrl": media_id, + "fileName": attachment.filename, + "fileSize": str(attachment.size), + } + ) + + token = await self._get_access_token() + async with httpx.AsyncClient(timeout=httpx.Timeout(30.0)) as client: + if conversation_type == _CONVERSATION_TYPE_GROUP: + response = await client.post( + f"{DINGTALK_API_BASE}/v1.0/robot/groupMessages/send", + headers=self._api_headers(token), + json={ + "msgKey": msg_key, + "msgParam": msg_param, + "robotCode": robot_code, + "openConversationId": conversation_id, + }, + ) + else: + response = await client.post( + f"{DINGTALK_API_BASE}/v1.0/robot/oToMessages/batchSend", + headers=self._api_headers(token), + json={ + "msgKey": msg_key, + "msgParam": msg_param, + "robotCode": robot_code, + "userIds": [sender_staff_id], + }, + ) + response.raise_for_status() + + logger.info("[DingTalk] file sent: %s", attachment.filename) + return True + except (httpx.HTTPError, OSError, ValueError, TypeError, AttributeError): + logger.exception("[DingTalk] failed to send file: %s", attachment.filename) + return False + + # -- stream client (runs in dedicated thread) -------------------------- + + def _run_stream(self, client_id: str, client_secret: str) -> None: + try: + import dingtalk_stream + + credential = dingtalk_stream.Credential(client_id, client_secret) + client = dingtalk_stream.DingTalkStreamClient(credential) + self._stream_client = client + client.register_callback_handler( + dingtalk_stream.chatbot.ChatbotMessage.TOPIC, + _DingTalkMessageHandler(self), + ) + client.start_forever() + except Exception: + if self._running: + logger.exception("DingTalk Stream Push error") + finally: + self._stream_client = None + + def _on_chatbot_message(self, message: Any) -> None: + if not self._running: + return + try: + sender_staff_id = message.sender_staff_id or "" + conversation_type = _normalize_conversation_type(message.conversation_type) + conversation_id = message.conversation_id or "" + msg_id = message.message_id or "" + sender_nick = message.sender_nick or "" + + if self._allowed_users and sender_staff_id not in self._allowed_users: + logger.debug("[DingTalk] ignoring message from non-allowed user: %s", sender_staff_id) + return + + text = self._extract_text(message) + if not text: + logger.info("[DingTalk] empty text, ignoring message") + return + + logger.info( + "[DingTalk] parsed message: conv_type=%s, msg_id=%s, sender=%s(%s), text=%r", + conversation_type, + msg_id, + sender_staff_id, + sender_nick, + text[:100], + ) + + if _is_dingtalk_command(text): + msg_type = InboundMessageType.COMMAND + else: + msg_type = InboundMessageType.CHAT + + # P2P: topic_id=None (single thread per user, like Telegram private chat) + # Group: topic_id=msg_id (each new message starts a new topic, like Feishu) + topic_id: str | None = msg_id if conversation_type == _CONVERSATION_TYPE_GROUP else None + + # chat_id uses conversation_id for groups, sender_staff_id for P2P + chat_id = conversation_id if conversation_type == _CONVERSATION_TYPE_GROUP else sender_staff_id + + inbound = self._make_inbound( + chat_id=chat_id, + user_id=sender_staff_id, + text=text, + msg_type=msg_type, + thread_ts=msg_id, + metadata={ + "conversation_type": conversation_type, + "conversation_id": conversation_id, + "sender_staff_id": sender_staff_id, + "sender_nick": sender_nick, + "message_id": msg_id, + }, + ) + inbound.topic_id = topic_id + + if self._card_template_id: + source_key = self._make_card_source_key(inbound) + with self._incoming_messages_lock: + self._incoming_messages[source_key] = message + + if self._main_loop and self._main_loop.is_running(): + logger.info("[DingTalk] publishing inbound message to bus (type=%s, msg_id=%s)", msg_type.value, msg_id) + fut = asyncio.run_coroutine_threadsafe( + self._prepare_inbound(chat_id, inbound), + self._main_loop, + ) + fut.add_done_callback(lambda f, mid=msg_id: self._log_future_error(f, "prepare_inbound", mid)) + else: + logger.warning("[DingTalk] main loop not running, cannot publish inbound message") + except Exception: + logger.exception("[DingTalk] error processing chatbot message") + + @staticmethod + def _extract_text(message: Any) -> str: + msg_type = message.message_type + if msg_type == "text" and message.text: + return message.text.content.strip() + if msg_type == "richText" and message.rich_text_content: + return _extract_text_from_rich_text(message.rich_text_content.rich_text_list).strip() + return "" + + async def _prepare_inbound(self, chat_id: str, inbound: InboundMessage) -> None: + # Running reply must finish before publish_inbound so AI card tracks are + # registered before the manager emits streaming outbounds. + await self._send_running_reply(chat_id, inbound) + await self.bus.publish_inbound(inbound) + + async def _send_running_reply(self, chat_id: str, inbound: InboundMessage) -> None: + conversation_type = inbound.metadata.get("conversation_type", _CONVERSATION_TYPE_P2P) + sender_staff_id = inbound.metadata.get("sender_staff_id", "") + conversation_id = inbound.metadata.get("conversation_id", "") + text = "\u23f3 Working on it..." + + try: + if self._card_template_id: + source_key = self._make_card_source_key(inbound) + with self._incoming_messages_lock: + chatbot_message = self._incoming_messages.pop(source_key, None) + out_track_id = await self._create_and_deliver_card( + text, + chatbot_message=chatbot_message, + ) + if out_track_id: + self._card_track_ids[source_key] = out_track_id + logger.info("[DingTalk] AI card running reply sent for chat=%s", chat_id) + return + + robot_code = self._client_id + if conversation_type == _CONVERSATION_TYPE_GROUP: + await self._send_text_message_to_group(robot_code, conversation_id, text) + else: + await self._send_text_message_to_user(robot_code, sender_staff_id, text) + logger.info("[DingTalk] 'Working on it...' reply sent for chat=%s", chat_id) + except Exception: + logger.exception("[DingTalk] failed to send running reply for chat=%s", chat_id) + + # -- DingTalk API helpers ---------------------------------------------- + + async def _get_access_token(self) -> str: + if self._cached_token and time.monotonic() < self._token_expires_at: + return self._cached_token + async with self._token_lock: + if self._cached_token and time.monotonic() < self._token_expires_at: + return self._cached_token + async with httpx.AsyncClient(timeout=httpx.Timeout(10.0)) as client: + response = await client.post( + f"{DINGTALK_API_BASE}/v1.0/oauth2/accessToken", + json={"appKey": self._client_id, "appSecret": self._client_secret}, # DingTalk API field names + ) + response.raise_for_status() + data = response.json() + + if not isinstance(data, dict): + raise ValueError(f"DingTalk access token response must be a JSON object, got {type(data).__name__}") + + access_token = data.get("accessToken") + if not isinstance(access_token, str) or not access_token.strip(): + raise ValueError("DingTalk access token response did not contain a usable accessToken") + + raw_expires_in = data.get("expireIn", 7200) + try: + expires_in = int(raw_expires_in) + except (TypeError, ValueError): + logger.warning("[DingTalk] invalid expireIn value %r, using default 7200s", raw_expires_in) + expires_in = 7200 + + self._cached_token = access_token.strip() + self._token_expires_at = time.monotonic() + expires_in - _TOKEN_REFRESH_MARGIN_SECONDS + return self._cached_token + + @staticmethod + def _api_headers(token: str) -> dict[str, str]: + return { + "x-acs-dingtalk-access-token": token, + "Content-Type": "application/json", + } + + async def _send_text_message_to_user(self, robot_code: str, user_id: str, text: str) -> None: + token = await self._get_access_token() + async with httpx.AsyncClient(timeout=httpx.Timeout(30.0)) as client: + response = await client.post( + f"{DINGTALK_API_BASE}/v1.0/robot/oToMessages/batchSend", + headers=self._api_headers(token), + json={ + "msgKey": "sampleText", + "msgParam": json.dumps({"content": text}), + "robotCode": robot_code, + "userIds": [user_id], + }, + ) + response.raise_for_status() + + async def _send_text_message_to_group(self, robot_code: str, conversation_id: str, text: str) -> None: + token = await self._get_access_token() + async with httpx.AsyncClient(timeout=httpx.Timeout(30.0)) as client: + response = await client.post( + f"{DINGTALK_API_BASE}/v1.0/robot/groupMessages/send", + headers=self._api_headers(token), + json={ + "msgKey": "sampleText", + "msgParam": json.dumps({"content": text}), + "robotCode": robot_code, + "openConversationId": conversation_id, + }, + ) + response.raise_for_status() + + async def _send_p2p_message(self, robot_code: str, user_id: str, text: str) -> None: + text = _adapt_markdown_for_dingtalk(text) + token = await self._get_access_token() + async with httpx.AsyncClient(timeout=httpx.Timeout(30.0)) as client: + response = await client.post( + f"{DINGTALK_API_BASE}/v1.0/robot/oToMessages/batchSend", + headers=self._api_headers(token), + json={ + "msgKey": "sampleMarkdown", + "msgParam": json.dumps({"title": "DeerFlow", "text": text}), + "robotCode": robot_code, + "userIds": [user_id], + }, + ) + response.raise_for_status() + data = response.json() + if data.get("processQueryKey"): + logger.info("[DingTalk] P2P message sent to user=%s", user_id) + else: + logger.warning("[DingTalk] P2P send response: %s", data) + + async def _send_group_message( + self, + robot_code: str, + conversation_id: str, + text: str, + *, + at_user_ids: list[str] | None = None, # noqa: ARG002 + ) -> None: + # at_user_ids accepted for call-site compatibility but not passed to the API + # (sampleMarkdown does not support @mentions). + text = _adapt_markdown_for_dingtalk(text) + token = await self._get_access_token() + + async with httpx.AsyncClient(timeout=httpx.Timeout(30.0)) as client: + response = await client.post( + f"{DINGTALK_API_BASE}/v1.0/robot/groupMessages/send", + headers=self._api_headers(token), + json={ + "msgKey": "sampleMarkdown", + "msgParam": json.dumps({"title": "DeerFlow", "text": text}), + "robotCode": robot_code, + "openConversationId": conversation_id, + }, + ) + response.raise_for_status() + data = response.json() + if data.get("processQueryKey"): + logger.info("[DingTalk] group message sent to conversation=%s", conversation_id) + else: + logger.warning("[DingTalk] group send response: %s", data) + + # -- AI Card streaming helpers ------------------------------------------- + + def _make_card_source_key(self, inbound: InboundMessage) -> str: + m = inbound.metadata + return f"{m.get('conversation_type', '')}:{m.get('sender_staff_id', '')}:{m.get('conversation_id', '')}:{m.get('message_id', '')}" + + def _make_card_source_key_from_outbound(self, msg: OutboundMessage) -> str: + m = msg.metadata + correlation_id = m.get("message_id") or msg.thread_ts or "" + return f"{m.get('conversation_type', '')}:{m.get('sender_staff_id', '')}:{m.get('conversation_id', '')}:{correlation_id}" + + async def _create_and_deliver_card( + self, + initial_text: str, + *, + chatbot_message: Any = None, + ) -> str | None: + if self._dingtalk_client is None or chatbot_message is None: + logger.warning("[DingTalk] SDK client or chatbot_message unavailable, skipping AI card") + return None + + try: + from dingtalk_stream.card_replier import AICardReplier + except ImportError: + logger.warning("[DingTalk] dingtalk-stream card_replier not available") + return None + + try: + replier = AICardReplier(self._dingtalk_client, chatbot_message) + card_instance_id = await replier.async_create_and_deliver_card( + card_template_id=self._card_template_id, + card_data={"content": initial_text}, + ) + if not card_instance_id: + return None + + self._card_repliers[card_instance_id] = replier + logger.info("[DingTalk] AI card created: outTrackId=%s", card_instance_id) + return card_instance_id + except Exception: + logger.exception("[DingTalk] failed to create AI card") + return None + + async def _stream_update_card( + self, + out_track_id: str, + content: str, + *, + is_finalize: bool = False, + is_error: bool = False, + ) -> None: + replier = self._card_repliers.get(out_track_id) + if not replier: + raise RuntimeError(f"No AICardReplier found for track ID {out_track_id}") + + await replier.async_streaming( + card_instance_id=out_track_id, + content_key="content", + content_value=content, + append=False, + finished=is_finalize, + failed=is_error, + ) + + # -- media upload -------------------------------------------------------- + + async def _upload_media(self, file_path: str | Path, media_type: str) -> str | None: + try: + file_bytes = await asyncio.to_thread(Path(file_path).read_bytes) + token = await self._get_access_token() + async with httpx.AsyncClient(timeout=httpx.Timeout(60.0)) as client: + response = await client.post( + f"{DINGTALK_API_BASE}/v1.0/files/upload", + headers={"x-acs-dingtalk-access-token": token}, + files={"file": ("upload", file_bytes)}, + data={"type": media_type}, + ) + response.raise_for_status() + try: + payload = response.json() + except json.JSONDecodeError: + logger.exception("[DingTalk] failed to decode upload response JSON: %s", file_path) + return None + if not isinstance(payload, dict): + logger.warning("[DingTalk] unexpected upload response type %s for %s", type(payload).__name__, file_path) + return None + return payload.get("mediaId") + except (httpx.HTTPError, OSError): + logger.exception("[DingTalk] failed to upload media: %s", file_path) + return None + + @staticmethod + def _log_future_error(fut: Any, name: str, msg_id: str) -> None: + try: + exc = fut.exception() + if exc: + logger.error("[DingTalk] %s failed for msg_id=%s: %s", name, msg_id, exc) + except (asyncio.CancelledError, asyncio.InvalidStateError): + pass + + +class _DingTalkMessageHandler: + """Callback handler registered with dingtalk-stream.""" + + def __init__(self, channel: DingTalkChannel) -> None: + self._channel = channel + + def pre_start(self) -> None: + if hasattr(self, "dingtalk_client") and self.dingtalk_client is not None: + self._channel._dingtalk_client = self.dingtalk_client + + async def raw_process(self, callback_message: Any) -> Any: + import dingtalk_stream + from dingtalk_stream.frames import Headers + + code, message = await self.process(callback_message) + ack_message = dingtalk_stream.AckMessage() + ack_message.code = code + ack_message.headers.message_id = callback_message.headers.message_id + ack_message.headers.content_type = Headers.CONTENT_TYPE_APPLICATION_JSON + ack_message.data = {"response": message} + return ack_message + + async def process(self, callback: Any) -> tuple[int, str]: + import dingtalk_stream + + incoming_message = dingtalk_stream.ChatbotMessage.from_dict(callback.data) + self._channel._on_chatbot_message(incoming_message) + return dingtalk_stream.AckMessage.STATUS_OK, "OK" diff --git a/backend/app/channels/feishu.py b/backend/app/channels/feishu.py index 5a80016f0..75892d54d 100644 --- a/backend/app/channels/feishu.py +++ b/backend/app/channels/feishu.py @@ -63,6 +63,10 @@ class FeishuChannel(Channel): self._GetMessageResourceRequest = None self._thread_lock = threading.Lock() + @property + def supports_streaming(self) -> bool: + return True + async def start(self) -> None: if self._running: return diff --git a/backend/app/channels/manager.py b/backend/app/channels/manager.py index 5680943b0..c09b13173 100644 --- a/backend/app/channels/manager.py +++ b/backend/app/channels/manager.py @@ -38,6 +38,7 @@ STREAM_UPDATE_MIN_INTERVAL_SECONDS = 0.35 THREAD_BUSY_MESSAGE = "This conversation is already processing another request. Please wait for it to finish and try again." CHANNEL_CAPABILITIES = { + "dingtalk": {"supports_streaming": False}, "discord": {"supports_streaming": False}, "feishu": {"supports_streaming": True}, "slack": {"supports_streaming": False}, @@ -48,6 +49,13 @@ CHANNEL_CAPABILITIES = { InboundFileReader = Callable[[dict[str, Any], httpx.AsyncClient], Awaitable[bytes | None]] +_METADATA_DROP_KEYS = frozenset({"raw_message", "ref_msg"}) + + +def _slim_metadata(meta: dict[str, Any]) -> dict[str, Any]: + """Return a shallow copy of *meta* with known-large keys removed.""" + return {k: v for k, v in meta.items() if k not in _METADATA_DROP_KEYS} + INBOUND_FILE_READERS: dict[str, InboundFileReader] = {} @@ -543,6 +551,13 @@ class ChannelManager: @staticmethod def _channel_supports_streaming(channel_name: str) -> bool: + from .service import get_channel_service + + service = get_channel_service() + if service: + channel = service.get_channel(channel_name) + if channel is not None: + return channel.supports_streaming return CHANNEL_CAPABILITIES.get(channel_name, {}).get("supports_streaming", False) def _resolve_session_layer(self, msg: InboundMessage) -> tuple[dict[str, Any], dict[str, Any]]: @@ -772,6 +787,7 @@ class ChannelManager: artifacts=artifacts, attachments=attachments, thread_ts=msg.thread_ts, + metadata=_slim_metadata(msg.metadata), ) logger.info("[Manager] publishing outbound message to bus: channel=%s, chat_id=%s", msg.channel_name, msg.chat_id) await self.bus.publish_outbound(outbound) @@ -833,6 +849,7 @@ class ChannelManager: text=latest_text, is_final=False, thread_ts=msg.thread_ts, + metadata=_slim_metadata(msg.metadata), ) ) last_published_text = latest_text @@ -877,6 +894,7 @@ class ChannelManager: attachments=attachments, is_final=True, thread_ts=msg.thread_ts, + metadata=_slim_metadata(msg.metadata), ) ) @@ -935,6 +953,7 @@ class ChannelManager: thread_id=self.store.get_thread_id(msg.channel_name, msg.chat_id) or "", text=reply, thread_ts=msg.thread_ts, + metadata=_slim_metadata(msg.metadata), ) await self.bus.publish_outbound(outbound) @@ -968,5 +987,6 @@ class ChannelManager: thread_id=self.store.get_thread_id(msg.channel_name, msg.chat_id) or "", text=error_text, thread_ts=msg.thread_ts, + metadata=_slim_metadata(msg.metadata), ) await self.bus.publish_outbound(outbound) diff --git a/backend/app/channels/service.py b/backend/app/channels/service.py index 72414e651..4a3df9060 100644 --- a/backend/app/channels/service.py +++ b/backend/app/channels/service.py @@ -18,6 +18,7 @@ if TYPE_CHECKING: # Channel name → import path for lazy loading _CHANNEL_REGISTRY: dict[str, str] = { + "dingtalk": "app.channels.dingtalk:DingTalkChannel", "discord": "app.channels.discord:DiscordChannel", "feishu": "app.channels.feishu:FeishuChannel", "slack": "app.channels.slack:SlackChannel", @@ -28,6 +29,7 @@ _CHANNEL_REGISTRY: dict[str, str] = { # Keys that indicate a user has configured credentials for a channel. _CHANNEL_CREDENTIAL_KEYS: dict[str, list[str]] = { + "dingtalk": ["client_id", "client_secret"], "discord": ["bot_token"], "feishu": ["app_id", "app_secret"], "slack": ["bot_token", "app_token"], @@ -166,11 +168,16 @@ class ChannelService: try: channel = channel_cls(bus=self.bus, config=config) - await channel.start() self._channels[name] = channel + await channel.start() + if not channel.is_running: + self._channels.pop(name, None) + logger.error("Channel %s did not enter a running state after start()", name) + return False logger.info("Channel %s started", name) return True except Exception: + self._channels.pop(name, None) logger.exception("Failed to start channel %s", name) return False diff --git a/backend/app/channels/wecom.py b/backend/app/channels/wecom.py index 5a8948bd4..3e0cdb3d1 100644 --- a/backend/app/channels/wecom.py +++ b/backend/app/channels/wecom.py @@ -29,6 +29,10 @@ class WeComChannel(Channel): self._ws_stream_ids: dict[str, str] = {} self._working_message = "Working on it..." + @property + def supports_streaming(self) -> bool: + return True + def _clear_ws_context(self, thread_ts: str | None) -> None: if not thread_ts: return diff --git a/backend/pyproject.toml b/backend/pyproject.toml index 09fb518e9..64c6e74c3 100644 --- a/backend/pyproject.toml +++ b/backend/pyproject.toml @@ -17,6 +17,7 @@ dependencies = [ "langgraph-sdk>=0.1.51", "markdown-to-mrkdwn>=0.3.1", "wecom-aibot-python-sdk>=0.1.6", + "dingtalk-stream>=0.24.3", "bcrypt>=4.0.0", "pyjwt>=2.9.0", "email-validator>=2.0.0", diff --git a/backend/tests/test_channels.py b/backend/tests/test_channels.py index 7cf329bff..20c746928 100644 --- a/backend/tests/test_channels.py +++ b/backend/tests/test_channels.py @@ -462,6 +462,7 @@ class TestChannelManager: ) mock_channel = MagicMock() mock_channel.receive_file = AsyncMock(return_value=modified_msg) + mock_channel.supports_streaming = False mock_service = MagicMock() mock_service.get_channel.return_value = mock_channel monkeypatch.setattr("app.channels.service.get_channel_service", lambda: mock_service) @@ -535,6 +536,89 @@ class TestChannelManager: _run(go()) + def test_handle_chat_outbound_preserves_inbound_metadata(self): + """DingTalk (and similar) need inbound metadata on outbound sends (e.g. sender_staff_id).""" + from app.channels.manager import ChannelManager + + async def go(): + bus = MessageBus() + store = ChannelStore(path=Path(tempfile.mkdtemp()) / "store.json") + manager = ChannelManager(bus=bus, store=store) + outbound_received: list[OutboundMessage] = [] + + async def capture_outbound(msg: OutboundMessage) -> None: + outbound_received.append(msg) + + bus.subscribe_outbound(capture_outbound) + mock_client = _make_mock_langgraph_client() + manager._client = mock_client + await manager.start() + + meta = { + "sender_staff_id": "staff_001", + "conversation_type": "1", + "conversation_id": "conv_001", + } + inbound = InboundMessage( + channel_name="test", + chat_id="chat1", + user_id="user1", + text="hi", + metadata=meta, + ) + await bus.publish_inbound(inbound) + await _wait_for(lambda: len(outbound_received) >= 1) + await manager.stop() + + assert len(outbound_received) == 1 + assert outbound_received[0].metadata == meta + + _run(go()) + + def test_handle_chat_outbound_drops_large_metadata_keys(self): + """Large metadata keys like raw_message should be stripped from outbound messages.""" + from app.channels.manager import ChannelManager + + async def go(): + bus = MessageBus() + store = ChannelStore(path=Path(tempfile.mkdtemp()) / "store.json") + manager = ChannelManager(bus=bus, store=store) + outbound_received: list[OutboundMessage] = [] + + async def capture_outbound(msg: OutboundMessage) -> None: + outbound_received.append(msg) + + bus.subscribe_outbound(capture_outbound) + mock_client = _make_mock_langgraph_client() + manager._client = mock_client + await manager.start() + + meta = { + "sender_staff_id": "staff_001", + "conversation_type": "1", + "raw_message": {"huge": "payload" * 1000}, + "ref_msg": {"also": "large"}, + } + inbound = InboundMessage( + channel_name="test", + chat_id="chat1", + user_id="user1", + text="hi", + metadata=meta, + ) + await bus.publish_inbound(inbound) + await _wait_for(lambda: len(outbound_received) >= 1) + await manager.stop() + + assert len(outbound_received) == 1 + out_meta = outbound_received[0].metadata + assert "sender_staff_id" in out_meta + assert "conversation_type" in out_meta + assert "raw_message" not in out_meta + assert "ref_msg" not in out_meta + + _run(go()) + def test_handle_chat_uses_channel_session_overrides(self): from app.channels.manager import ChannelManager diff --git a/backend/tests/test_dingtalk_channel.py b/backend/tests/test_dingtalk_channel.py new file mode 100644 index 000000000..235de6db8 --- /dev/null +++ b/backend/tests/test_dingtalk_channel.py @@ -0,0 +1,1554 @@ +"""Tests for the DingTalk channel implementation.""" + +from __future__ import annotations + +import asyncio +from types import SimpleNamespace +from unittest.mock import AsyncMock, MagicMock + +import pytest + +from app.channels.commands import KNOWN_CHANNEL_COMMANDS +from app.channels.dingtalk import ( + _CONVERSATION_TYPE_GROUP, + _CONVERSATION_TYPE_P2P, + DingTalkChannel, + _adapt_markdown_for_dingtalk, + _convert_markdown_table, + _DingTalkMessageHandler, + _extract_text_from_rich_text, + _is_dingtalk_command, + _normalize_allowed_users, + _normalize_conversation_type, +) +from app.channels.message_bus import InboundMessageType, MessageBus, OutboundMessage + + +def _run(coro): + loop = asyncio.new_event_loop() + try: + return loop.run_until_complete(coro) + finally: + loop.close() + + +# --------------------------------------------------------------------------- +# Helper: build mock ChatbotMessage +# --------------------------------------------------------------------------- + + +def _make_chatbot_message( + *, + text: str = "hello", + message_type: str = "text", + conversation_type: str | int = _CONVERSATION_TYPE_P2P, + sender_staff_id: str = "user_001", + sender_nick: str = "Test User", + conversation_id: str = "conv_001", + message_id: str = "msg_001", + rich_text_list: list | None = None, +): + """Build a minimal mock object mimicking dingtalk_stream.ChatbotMessage.""" + msg = SimpleNamespace() + msg.message_type = message_type + msg.conversation_type = conversation_type + msg.sender_staff_id = sender_staff_id + msg.sender_nick = sender_nick + msg.conversation_id = conversation_id + msg.message_id = message_id + + if message_type == "text": + msg.text = SimpleNamespace(content=text) + msg.rich_text_content = None + elif message_type == "richText": + msg.text = None + msg.rich_text_content = SimpleNamespace(rich_text_list=rich_text_list or []) + else: + msg.text = None + msg.rich_text_content = None + + return msg + + +# --------------------------------------------------------------------------- +# _DingTalkMessageHandler SDK contract +# --------------------------------------------------------------------------- + + +class TestDingTalkMessageHandlerSdkContract: + def test_pre_start_exists_and_noop(self): + bus = MessageBus() + channel = DingTalkChannel(bus, config={}) + handler = _DingTalkMessageHandler(channel) + handler.pre_start() + + def test_raw_process_returns_ack(self): + pytest.importorskip("dingtalk_stream") + + async def go(): + bus = MessageBus() + channel = DingTalkChannel(bus, config={}) + channel._on_chatbot_message = MagicMock() + handler = _DingTalkMessageHandler(channel) + cb = MagicMock() + cb.headers.message_id = "mid-1" + cb.data = { + "msgtype": "text", + "text": {"content": "hi"}, + "senderStaffId": "u1", + "conversationType": "1", + "msgId": "m1", + } + ack = await handler.raw_process(cb) + assert ack.code == 200 + assert ack.headers.message_id == "mid-1" + assert ack.data == {"response": "OK"} + channel._on_chatbot_message.assert_called_once() + + _run(go()) + + +# --------------------------------------------------------------------------- +# _normalize_allowed_users tests +# --------------------------------------------------------------------------- + + +class TestNormalizeAllowedUsers: + def test_none_returns_empty(self): + assert _normalize_allowed_users(None) == set() + + def test_empty_list_returns_empty(self): + assert _normalize_allowed_users([]) == set() + + def test_list_of_strings(self): + result = _normalize_allowed_users(["user1", "user2"]) + assert result == {"user1", "user2"} + + def test_single_string(self): + result = _normalize_allowed_users("user1") + assert result == {"user1"} + + def test_numeric_values_converted_to_string(self): + result = _normalize_allowed_users([123, 456]) + assert result == {"123", "456"} + + def test_scalar_treated_as_single_value(self): + result = _normalize_allowed_users(12345) + assert result == {"12345"} + + +# --------------------------------------------------------------------------- +# _normalize_conversation_type tests +# --------------------------------------------------------------------------- + + +class TestNormalizeConversationType: + def test_group_int_or_str(self): + assert _normalize_conversation_type(2) == _CONVERSATION_TYPE_GROUP + assert _normalize_conversation_type("2") == _CONVERSATION_TYPE_GROUP + + def test_p2p_or_none(self): + assert _normalize_conversation_type(1) == _CONVERSATION_TYPE_P2P + assert _normalize_conversation_type(None) == _CONVERSATION_TYPE_P2P + + +# --------------------------------------------------------------------------- +# _is_dingtalk_command tests +# --------------------------------------------------------------------------- + + +class TestIsDingTalkCommand: + @pytest.mark.parametrize("command", sorted(KNOWN_CHANNEL_COMMANDS)) + def test_known_commands_recognized(self, command): + assert _is_dingtalk_command(command) is True + + @pytest.mark.parametrize( + "text", + [ + "/unknown", + "/mnt/user-data/outputs/report.md", + "hello", + "", + "not a command", + ], + ) + def test_non_commands_rejected(self, text): + assert _is_dingtalk_command(text) is False + + +# --------------------------------------------------------------------------- +# _extract_text_from_rich_text tests +# --------------------------------------------------------------------------- + + +class TestExtractTextFromRichText: + def test_single_text_item(self): + result = _extract_text_from_rich_text([{"text": "hello"}]) + assert result == "hello" + + def test_multiple_text_items(self): + result = _extract_text_from_rich_text([{"text": "hello"}, {"text": "world"}]) + assert result == "hello world" + + def test_non_text_items_ignored(self): + result = _extract_text_from_rich_text( + [ + {"downloadCode": "abc123"}, + {"text": "caption"}, + ] + ) + assert result == "caption" + + def test_empty_list(self): + assert _extract_text_from_rich_text([]) == "" + + +# --------------------------------------------------------------------------- +# DingTalkChannel._extract_text tests +# --------------------------------------------------------------------------- + + +class TestExtractText: + def test_plain_text(self): + msg = _make_chatbot_message(text="Hello World") + assert DingTalkChannel._extract_text(msg) == "Hello World" + + def test_plain_text_stripped(self): + msg = _make_chatbot_message(text=" Hello ") + assert DingTalkChannel._extract_text(msg) == "Hello" + + def test_rich_text(self): + msg = _make_chatbot_message( + message_type="richText", + rich_text_list=[{"text": "Part 1"}, {"text": "Part 2"}], + ) + assert DingTalkChannel._extract_text(msg) == "Part 1 Part 2" + + def test_unknown_type_returns_empty(self): + msg = _make_chatbot_message(message_type="picture") + assert DingTalkChannel._extract_text(msg) == "" + + +# --------------------------------------------------------------------------- +# DingTalkChannel._on_chatbot_message tests (inbound parsing) +# --------------------------------------------------------------------------- + + +class TestOnChatbotMessage: + def test_p2p_message_produces_correct_inbound(self): + async def go(): + bus = MessageBus() + bus.publish_inbound = AsyncMock() + channel = DingTalkChannel(bus, config={}) + channel._client_id = "test_key" + channel._main_loop = asyncio.get_event_loop() + channel._running = True + + msg = _make_chatbot_message( + text="hello from dingtalk", + conversation_type=_CONVERSATION_TYPE_P2P, + sender_staff_id="user_001", + message_id="msg_001", + ) + + channel._send_running_reply = AsyncMock() + channel._on_chatbot_message(msg) + + await asyncio.sleep(0.1) + + bus.publish_inbound.assert_awaited_once() + inbound = bus.publish_inbound.await_args.args[0] + assert inbound.channel_name == "dingtalk" + assert inbound.chat_id == "user_001" + assert inbound.user_id == "user_001" + assert inbound.text == "hello from dingtalk" + assert inbound.topic_id is None + assert inbound.metadata["conversation_type"] == _CONVERSATION_TYPE_P2P + assert inbound.metadata["sender_staff_id"] == "user_001" + + _run(go()) + + def test_group_message_produces_correct_inbound(self): + async def go(): + bus = MessageBus() + bus.publish_inbound = AsyncMock() + channel = DingTalkChannel(bus, config={}) + channel._client_id = "test_key" + channel._main_loop = asyncio.get_event_loop() + channel._running = True + + msg = _make_chatbot_message( + text="hello group", + conversation_type=_CONVERSATION_TYPE_GROUP, + sender_staff_id="user_002", + conversation_id="conv_group_001", + message_id="msg_group_001", + ) + + channel._send_running_reply = AsyncMock() + channel._on_chatbot_message(msg) + + await asyncio.sleep(0.1) + + bus.publish_inbound.assert_awaited_once() + inbound = bus.publish_inbound.await_args.args[0] + assert inbound.channel_name == "dingtalk" + assert inbound.chat_id == "conv_group_001" + assert inbound.user_id == "user_002" + assert inbound.text == "hello group" + assert inbound.topic_id == "msg_group_001" + assert inbound.metadata["conversation_type"] == _CONVERSATION_TYPE_GROUP + assert inbound.metadata["conversation_id"] == "conv_group_001" + + _run(go()) + + def test_group_message_integer_conversation_type_normalized(self): + """SDK may deliver conversationType as int 2 — must still route as group.""" + + async def go(): + bus = MessageBus() + bus.publish_inbound = AsyncMock() + channel = DingTalkChannel(bus, config={}) + channel._client_id = "test_key" + channel._main_loop = asyncio.get_event_loop() + channel._running = True + + msg = _make_chatbot_message( + text="hello group", + conversation_type=2, + sender_staff_id="user_002", + conversation_id="conv_group_001", + message_id="msg_group_002", + ) + + channel._send_running_reply = AsyncMock() + channel._on_chatbot_message(msg) + + await asyncio.sleep(0.1) + + bus.publish_inbound.assert_awaited_once() + inbound = bus.publish_inbound.await_args.args[0] + assert inbound.chat_id == "conv_group_001" + assert inbound.topic_id == "msg_group_002" + assert inbound.metadata["conversation_type"] == _CONVERSATION_TYPE_GROUP + + _run(go()) + + def test_command_classified_correctly(self): + async def go(): + bus = MessageBus() + bus.publish_inbound = AsyncMock() + channel = DingTalkChannel(bus, config={}) + channel._client_id = "test_key" + channel._main_loop = asyncio.get_event_loop() + channel._running = True + + msg = _make_chatbot_message(text="/help") + channel._send_running_reply = AsyncMock() + channel._on_chatbot_message(msg) + + await asyncio.sleep(0.1) + + bus.publish_inbound.assert_awaited_once() + inbound = bus.publish_inbound.await_args.args[0] + assert inbound.msg_type == InboundMessageType.COMMAND + + _run(go()) + + def test_non_command_classified_as_chat(self): + async def go(): + bus = MessageBus() + bus.publish_inbound = AsyncMock() + channel = DingTalkChannel(bus, config={}) + channel._client_id = "test_key" + channel._main_loop = asyncio.get_event_loop() + channel._running = True + + msg = _make_chatbot_message(text="just chatting") + channel._send_running_reply = AsyncMock() + channel._on_chatbot_message(msg) + + await asyncio.sleep(0.1) + + bus.publish_inbound.assert_awaited_once() + inbound = bus.publish_inbound.await_args.args[0] + assert inbound.msg_type == InboundMessageType.CHAT + + _run(go()) + + def test_empty_text_ignored(self): + async def go(): + bus = MessageBus() + bus.publish_inbound = AsyncMock() + channel = DingTalkChannel(bus, config={}) + channel._client_id = "test_key" + channel._main_loop = asyncio.get_event_loop() + channel._running = True + + msg = _make_chatbot_message(text=" ") + channel._on_chatbot_message(msg) + + await asyncio.sleep(0.1) + bus.publish_inbound.assert_not_awaited() + + _run(go()) + + +# --------------------------------------------------------------------------- +# allowed_users filtering tests +# --------------------------------------------------------------------------- + + +class TestAllowedUsersFiltering: + def test_allowed_user_passes(self): + async def go(): + bus = MessageBus() + bus.publish_inbound = AsyncMock() + channel = DingTalkChannel(bus, config={"allowed_users": ["user_001"]}) + channel._client_id = "test_key" + channel._main_loop = asyncio.get_event_loop() + channel._running = True + + msg = _make_chatbot_message(sender_staff_id="user_001") + channel._send_running_reply = AsyncMock() + channel._on_chatbot_message(msg) + + await asyncio.sleep(0.1) + bus.publish_inbound.assert_awaited_once() + + _run(go()) + + def test_non_allowed_user_blocked(self): + async def go(): + bus = MessageBus() + bus.publish_inbound = AsyncMock() + channel = DingTalkChannel(bus, config={"allowed_users": ["user_001"]}) + channel._client_id = "test_key" + channel._main_loop = asyncio.get_event_loop() + channel._running = True + + msg = _make_chatbot_message(sender_staff_id="user_blocked") + channel._on_chatbot_message(msg) + + await asyncio.sleep(0.1) + bus.publish_inbound.assert_not_awaited() + + _run(go()) + + def test_empty_allowed_users_allows_all(self): + async def go(): + bus = MessageBus() + bus.publish_inbound = AsyncMock() + channel = DingTalkChannel(bus, config={"allowed_users": []}) + channel._client_id = "test_key" + channel._main_loop = asyncio.get_event_loop() + channel._running = True + + msg = _make_chatbot_message(sender_staff_id="anyone") + channel._send_running_reply = AsyncMock() + channel._on_chatbot_message(msg) + + await asyncio.sleep(0.1) + bus.publish_inbound.assert_awaited_once() + + _run(go()) + + +# --------------------------------------------------------------------------- +# send routing tests (P2P vs Group) +# --------------------------------------------------------------------------- + + +class TestMarkdownFallbackPropagation: + def test_fallback_raises_on_failure(self): + async def go(): + bus = MessageBus() + channel = DingTalkChannel(bus, config={}) + channel._client_id = "test_key" + channel._cached_token = "tok" + channel._token_expires_at = float("inf") + + channel._send_p2p_message = AsyncMock(side_effect=ConnectionError("send failed")) + + with pytest.raises(ConnectionError, match="send failed"): + await channel._send_markdown_fallback("test_key", _CONVERSATION_TYPE_P2P, "user_001", "", "hello") + + _run(go()) + + +class TestSendRouting: + def test_p2p_send_uses_oto_endpoint(self): + async def go(): + bus = MessageBus() + channel = DingTalkChannel(bus, config={}) + channel._client_id = "test_key" + channel._client_secret = "test_secret" + + channel._send_p2p_message = AsyncMock() + channel._send_group_message = AsyncMock() + + msg = OutboundMessage( + channel_name="dingtalk", + chat_id="user_001", + thread_id="thread_001", + text="Hello P2P", + metadata={ + "conversation_type": _CONVERSATION_TYPE_P2P, + "sender_staff_id": "user_001", + "conversation_id": "", + }, + ) + + await channel.send(msg) + + channel._send_p2p_message.assert_awaited_once_with("test_key", "user_001", "Hello P2P") + channel._send_group_message.assert_not_awaited() + + _run(go()) + + def test_group_send_uses_group_endpoint(self): + async def go(): + bus = MessageBus() + channel = DingTalkChannel(bus, config={}) + channel._client_id = "test_key" + channel._client_secret = "test_secret" + + channel._send_p2p_message = AsyncMock() + channel._send_group_message = AsyncMock() + + msg = OutboundMessage( + channel_name="dingtalk", + chat_id="conv_001", + thread_id="thread_001", + text="Hello Group", + metadata={ + "conversation_type": _CONVERSATION_TYPE_GROUP, + "sender_staff_id": "user_001", + "conversation_id": "conv_001", + }, + ) + + await channel.send(msg) + + channel._send_group_message.assert_awaited_once_with("test_key", "conv_001", "Hello Group", at_user_ids=["user_001"]) + channel._send_p2p_message.assert_not_awaited() + + _run(go()) + + def test_default_metadata_uses_p2p(self): + async def go(): + bus = MessageBus() + channel = DingTalkChannel(bus, config={}) + channel._client_id = "test_key" + channel._client_secret = "test_secret" + + channel._send_p2p_message = AsyncMock() + channel._send_group_message = AsyncMock() + + msg = OutboundMessage( + channel_name="dingtalk", + chat_id="user_001", + thread_id="thread_001", + text="Hello", + metadata={}, + ) + + await channel.send(msg) + + channel._send_p2p_message.assert_awaited_once() + channel._send_group_message.assert_not_awaited() + + _run(go()) + + +# --------------------------------------------------------------------------- +# send retry tests +# --------------------------------------------------------------------------- + + +class TestSendRetry: + def test_retries_on_failure_then_succeeds(self): + async def go(): + bus = MessageBus() + channel = DingTalkChannel(bus, config={}) + channel._client_id = "test_key" + channel._client_secret = "test_secret" + + call_count = 0 + + async def flaky_send(robot_code, user_id, text): + nonlocal call_count + call_count += 1 + if call_count < 3: + raise ConnectionError("network error") + + channel._send_p2p_message = AsyncMock(side_effect=flaky_send) + + msg = OutboundMessage( + channel_name="dingtalk", + chat_id="user_001", + thread_id="thread_001", + text="hello", + metadata={"conversation_type": _CONVERSATION_TYPE_P2P, "sender_staff_id": "user_001"}, + ) + + await channel.send(msg) + assert call_count == 3 + + _run(go()) + + def test_raises_after_all_retries_exhausted(self): + async def go(): + bus = MessageBus() + channel = DingTalkChannel(bus, config={}) + channel._client_id = "test_key" + channel._client_secret = "test_secret" + + channel._send_p2p_message = AsyncMock(side_effect=ConnectionError("fail")) + + msg = OutboundMessage( + channel_name="dingtalk", + chat_id="user_001", + thread_id="thread_001", + text="hello", + metadata={"conversation_type": _CONVERSATION_TYPE_P2P, "sender_staff_id": "user_001"}, + ) + + with pytest.raises(ConnectionError): + await channel.send(msg) + + assert channel._send_p2p_message.await_count == 3 + + _run(go()) + + def test_raises_runtime_error_when_no_attempts_configured(self): + async def go(): + bus = MessageBus() + channel = DingTalkChannel(bus, config={}) + channel._client_id = "test_key" + channel._client_secret = "test_secret" + + msg = OutboundMessage( + channel_name="dingtalk", + chat_id="user_001", + thread_id="thread_001", + text="hello", + metadata={"conversation_type": _CONVERSATION_TYPE_P2P, "sender_staff_id": "user_001"}, + ) + + with pytest.raises(RuntimeError, match="without an exception"): + await channel.send(msg, _max_retries=0) + + _run(go()) + + +# --------------------------------------------------------------------------- +# topic_id mapping tests +# --------------------------------------------------------------------------- + + +class TestTopicIdMapping: + def test_p2p_topic_is_none(self): + async def go(): + bus = MessageBus() + bus.publish_inbound = AsyncMock() + channel = DingTalkChannel(bus, config={}) + channel._client_id = "test_key" + channel._main_loop = asyncio.get_event_loop() + channel._running = True + + msg = _make_chatbot_message( + conversation_type=_CONVERSATION_TYPE_P2P, + message_id="msg_p2p_001", + ) + channel._send_running_reply = AsyncMock() + channel._on_chatbot_message(msg) + + await asyncio.sleep(0.1) + inbound = bus.publish_inbound.await_args.args[0] + assert inbound.topic_id is None + + _run(go()) + + def test_group_topic_is_message_id(self): + async def go(): + bus = MessageBus() + bus.publish_inbound = AsyncMock() + channel = DingTalkChannel(bus, config={}) + channel._client_id = "test_key" + channel._main_loop = asyncio.get_event_loop() + channel._running = True + + msg = _make_chatbot_message( + conversation_type=_CONVERSATION_TYPE_GROUP, + message_id="msg_group_001", + conversation_id="conv_001", + ) + channel._send_running_reply = AsyncMock() + channel._on_chatbot_message(msg) + + await asyncio.sleep(0.1) + inbound = bus.publish_inbound.await_args.args[0] + assert inbound.topic_id == "msg_group_001" + + _run(go()) + + +# --------------------------------------------------------------------------- +# Token caching tests +# --------------------------------------------------------------------------- + + +class TestAccessTokenValidation: + def test_rejects_non_dict_response(self): + async def go(): + from unittest.mock import patch + + bus = MessageBus() + channel = DingTalkChannel(bus, config={}) + channel._client_id = "k" + channel._client_secret = "s" + + class FakeResponse: + def raise_for_status(self): + pass + + def json(self): + return "not a dict" + + class FakeClient: + async def __aenter__(self): + return self + + async def __aexit__(self, *a): + pass + + async def post(self, url, **kwargs): + return FakeResponse() + + with patch("app.channels.dingtalk.httpx.AsyncClient", return_value=FakeClient()): + with pytest.raises(ValueError, match="JSON object"): + await channel._get_access_token() + + _run(go()) + + def test_rejects_empty_access_token(self): + async def go(): + from unittest.mock import patch + + bus = MessageBus() + channel = DingTalkChannel(bus, config={}) + channel._client_id = "k" + channel._client_secret = "s" + + class FakeResponse: + def raise_for_status(self): + pass + + def json(self): + return {"accessToken": "", "expireIn": 7200} + + class FakeClient: + async def __aenter__(self): + return self + + async def __aexit__(self, *a): + pass + + async def post(self, url, **kwargs): + return FakeResponse() + + with patch("app.channels.dingtalk.httpx.AsyncClient", return_value=FakeClient()): + with pytest.raises(ValueError, match="usable accessToken"): + await channel._get_access_token() + + _run(go()) + + def test_invalid_expire_in_uses_default(self): + async def go(): + import time + from unittest.mock import patch + + bus = MessageBus() + channel = DingTalkChannel(bus, config={}) + channel._client_id = "k" + channel._client_secret = "s" + + class FakeResponse: + def raise_for_status(self): + pass + + def json(self): + return {"accessToken": "tok_ok", "expireIn": "invalid"} + + class FakeClient: + async def __aenter__(self): + return self + + async def __aexit__(self, *a): + pass + + async def post(self, url, **kwargs): + return FakeResponse() + + before = time.monotonic() + with patch("app.channels.dingtalk.httpx.AsyncClient", return_value=FakeClient()): + token = await channel._get_access_token() + + assert token == "tok_ok" + assert channel._token_expires_at > before + + _run(go()) + + +class TestTokenCaching: + def test_token_is_cached_across_calls(self): + async def go(): + from unittest.mock import patch + + bus = MessageBus() + channel = DingTalkChannel(bus, config={}) + channel._client_id = "test_key" + channel._client_secret = "test_secret" + + call_count = 0 + + class FakeResponse: + def raise_for_status(self): + pass + + def json(self): + return {"accessToken": "tok_abc", "expireIn": 7200} + + class FakeClient: + async def __aenter__(self): + return self + + async def __aexit__(self, *a): + pass + + async def post(self, url, **kwargs): + nonlocal call_count + call_count += 1 + return FakeResponse() + + with patch("app.channels.dingtalk.httpx.AsyncClient", return_value=FakeClient()): + t1 = await channel._get_access_token() + t2 = await channel._get_access_token() + + assert t1 == "tok_abc" + assert t2 == "tok_abc" + assert call_count == 1 + + _run(go()) + + +# --------------------------------------------------------------------------- +# Group message @ mention format tests +# --------------------------------------------------------------------------- + + +class TestGroupMessageMarkdownFormat: + def test_at_user_ids_still_use_markdown(self): + """groupMessages/send uses sampleMarkdown; @{userId} in body returns 400 so at_user_ids is ignored.""" + + async def go(): + from unittest.mock import patch + + bus = MessageBus() + channel = DingTalkChannel(bus, config={}) + channel._client_id = "test_key" + channel._client_secret = "test_secret" + channel._cached_token = "tok_test" + channel._token_expires_at = float("inf") + + captured_json: list[dict] = [] + + class FakeResponse: + def raise_for_status(self): + pass + + def json(self): + return {"processQueryKey": "ok"} + + class FakeClient: + async def __aenter__(self): + return self + + async def __aexit__(self, *a): + pass + + async def post(self, url, **kwargs): + captured_json.append(kwargs.get("json", {})) + return FakeResponse() + + with patch("app.channels.dingtalk.httpx.AsyncClient", return_value=FakeClient()): + await channel._send_group_message("bot", "conv1", "hello", at_user_ids=["staff_001"]) + + assert len(captured_json) == 1 + payload = captured_json[0] + assert payload["msgKey"] == "sampleMarkdown" + import json + + param = json.loads(payload["msgParam"]) + assert param["text"] == "hello" + assert "@" not in json.dumps(param) + + _run(go()) + + def test_no_at_user_ids_uses_markdown(self): + async def go(): + from unittest.mock import patch + + bus = MessageBus() + channel = DingTalkChannel(bus, config={}) + channel._client_id = "test_key" + channel._client_secret = "test_secret" + channel._cached_token = "tok_test" + channel._token_expires_at = float("inf") + + captured_json: list[dict] = [] + + class FakeResponse: + def raise_for_status(self): + pass + + def json(self): + return {"processQueryKey": "ok"} + + class FakeClient: + async def __aenter__(self): + return self + + async def __aexit__(self, *a): + pass + + async def post(self, url, **kwargs): + captured_json.append(kwargs.get("json", {})) + return FakeResponse() + + with patch("app.channels.dingtalk.httpx.AsyncClient", return_value=FakeClient()): + await channel._send_group_message("bot", "conv1", "hello") + + assert len(captured_json) == 1 + payload = captured_json[0] + assert payload["msgKey"] == "sampleMarkdown" + + _run(go()) + + +class TestAdaptMarkdownForDingtalk: + def test_fenced_code_block_to_blockquote(self): + text = "Hello\n```python\ndef foo():\n return 1\n```\nDone" + result = _adapt_markdown_for_dingtalk(text) + assert "```" not in result + assert "> **python**" in result + assert "> def foo():" in result + assert "> return 1" in result + + def test_fenced_code_block_no_language(self): + text = "```\nplain code\n```" + result = _adapt_markdown_for_dingtalk(text) + assert "```" not in result + assert "> plain code" in result + + def test_inline_code_to_bold(self): + text = "Use `pip install` to install" + result = _adapt_markdown_for_dingtalk(text) + assert result == "Use **pip install** to install" + + def test_horizontal_rule_to_unicode(self): + text = "Above\n---\nBelow" + result = _adapt_markdown_for_dingtalk(text) + assert "───────────" in result + assert "---" not in result + + def test_supported_markdown_preserved(self): + text = "# Title\n**bold** and *italic*\n- list item\n> quote\n[link](http://example.com)" + result = _adapt_markdown_for_dingtalk(text) + assert result == text + + def test_plain_text_unchanged(self): + text = "Hello world, no markdown here." + assert _adapt_markdown_for_dingtalk(text) == text + + def test_combined_elements(self): + text = "# Report\n\nRun `make test` then:\n\n```bash\npytest -v\n```\n\n---\n\nDone." + result = _adapt_markdown_for_dingtalk(text) + assert "# Report" in result + assert "**make test**" in result + assert "> **bash**" in result + assert "> pytest -v" in result + assert "───────────" in result + assert "Done." in result + + +class TestConvertMarkdownTable: + def test_simple_table(self): + text = "| Name | Age |\n|------|-----|\n| Alice | 30 |\n| Bob | 25 |" + result = _convert_markdown_table(text) + assert "> **Name**: Alice" in result + assert "> **Age**: 30" in result + assert "> **Name**: Bob" in result + assert "> **Age**: 25" in result + assert "|" not in result + + def test_table_with_surrounding_text(self): + text = "Results:\n\n| Key | Value |\n|-----|-------|\n| a | 1 |\n\nEnd." + result = _convert_markdown_table(text) + assert "Results:" in result + assert "> **Key**: a" in result + assert "> **Value**: 1" in result + assert "End." in result + + def test_no_table(self): + text = "Just plain text\nwith lines" + assert _convert_markdown_table(text) == text + + def test_alignment_separators(self): + text = "| Left | Center | Right |\n|:-----|:------:|------:|\n| a | b | c |" + result = _convert_markdown_table(text) + assert "> **Left**: a" in result + assert "> **Center**: b" in result + assert "> **Right**: c" in result + + +class TestUploadMediaValidation: + def test_non_dict_response_returns_none(self): + async def go(): + from unittest.mock import patch + + bus = MessageBus() + channel = DingTalkChannel(bus, config={}) + channel._client_id = "k" + channel._client_secret = "s" + channel._cached_token = "tok" + channel._token_expires_at = float("inf") + + class FakeResponse: + def raise_for_status(self): + pass + + def json(self): + return ["not", "a", "dict"] + + class FakeClient: + async def __aenter__(self): + return self + + async def __aexit__(self, *a): + pass + + async def post(self, url, **kwargs): + return FakeResponse() + + with patch("app.channels.dingtalk.httpx.AsyncClient", return_value=FakeClient()): + result = await channel._upload_media("/tmp/test.png", "image") + + assert result is None + + _run(go()) + + def test_json_decode_error_returns_none(self): + async def go(): + import json as json_mod + from unittest.mock import patch + + bus = MessageBus() + channel = DingTalkChannel(bus, config={}) + channel._client_id = "k" + channel._client_secret = "s" + channel._cached_token = "tok" + channel._token_expires_at = float("inf") + + class FakeResponse: + def raise_for_status(self): + pass + + def json(self): + raise json_mod.JSONDecodeError("err", "", 0) + + class FakeClient: + async def __aenter__(self): + return self + + async def __aexit__(self, *a): + pass + + async def post(self, url, **kwargs): + return FakeResponse() + + with patch("app.channels.dingtalk.httpx.AsyncClient", return_value=FakeClient()): + result = await channel._upload_media("/tmp/test.png", "image") + + assert result is None + + _run(go()) + + +class TestChannelRegistration: + def test_dingtalk_in_channel_registry(self): + from app.channels.service import _CHANNEL_REGISTRY + + assert "dingtalk" in _CHANNEL_REGISTRY + assert _CHANNEL_REGISTRY["dingtalk"] == "app.channels.dingtalk:DingTalkChannel" + + def test_dingtalk_in_credential_keys(self): + from app.channels.service import _CHANNEL_CREDENTIAL_KEYS + + assert "dingtalk" in _CHANNEL_CREDENTIAL_KEYS + assert "client_id" in _CHANNEL_CREDENTIAL_KEYS["dingtalk"] + assert "client_secret" in _CHANNEL_CREDENTIAL_KEYS["dingtalk"] + + def test_dingtalk_in_channel_capabilities(self): + from app.channels.manager import CHANNEL_CAPABILITIES + + assert "dingtalk" in CHANNEL_CAPABILITIES + assert CHANNEL_CAPABILITIES["dingtalk"]["supports_streaming"] is False + + +# --------------------------------------------------------------------------- +# AI Card streaming mode tests +# --------------------------------------------------------------------------- + + +class TestCardMode: + def test_card_mode_enabled_supports_streaming(self): + bus = MessageBus() + channel = DingTalkChannel(bus, config={"card_template_id": "tpl_123"}) + assert channel.supports_streaming is True + + def test_non_card_mode_no_streaming(self): + bus = MessageBus() + channel = DingTalkChannel(bus, config={}) + assert channel.supports_streaming is False + + def test_non_card_mode_unchanged(self): + bus = MessageBus() + channel = DingTalkChannel(bus, config={}) + assert channel._card_template_id == "" + assert channel._card_track_ids == {} + assert channel._card_repliers == {} + assert channel._incoming_messages == {} + assert channel._dingtalk_client is None + + def test_card_source_key_matches_inbound_using_message_id_metadata(self): + """Outbound correlation must match inbound ``message_id`` even if ``thread_ts`` drifts.""" + + async def go(): + bus = MessageBus() + channel = DingTalkChannel(bus, config={}) + inbound = channel._make_inbound( + chat_id="x", + user_id="u", + text="hi", + thread_ts="ts_fallback", + metadata={ + "conversation_type": _CONVERSATION_TYPE_P2P, + "sender_staff_id": "user_001", + "conversation_id": "", + "message_id": "msg_real", + }, + ) + out = OutboundMessage( + channel_name="dingtalk", + chat_id="x", + thread_id="t", + text="ok", + thread_ts="wrong_ts", + metadata=dict(inbound.metadata), + ) + assert channel._make_card_source_key(inbound) == channel._make_card_source_key_from_outbound(out) + + _run(go()) + + def test_running_reply_creates_card(self): + async def go(): + bus = MessageBus() + channel = DingTalkChannel(bus, config={"card_template_id": "tpl_123"}) + channel._client_id = "test_key" + + channel._create_and_deliver_card = AsyncMock(return_value="track_001") + + inbound = channel._make_inbound( + chat_id="user_001", + user_id="user_001", + text="hello", + metadata={ + "conversation_type": _CONVERSATION_TYPE_P2P, + "sender_staff_id": "user_001", + "conversation_id": "", + "message_id": "msg_001", + }, + ) + + mock_chatbot_msg = MagicMock() + source_key = channel._make_card_source_key(inbound) + channel._incoming_messages[source_key] = mock_chatbot_msg + + await channel._send_running_reply("user_001", inbound) + + channel._create_and_deliver_card.assert_awaited_once_with( + "\u23f3 Working on it...", + chatbot_message=mock_chatbot_msg, + ) + assert channel._card_track_ids[source_key] == "track_001" + + _run(go()) + + def test_send_streams_to_card(self): + async def go(): + bus = MessageBus() + channel = DingTalkChannel(bus, config={"card_template_id": "tpl_123"}) + channel._client_id = "test_key" + + channel._stream_update_card = AsyncMock() + + # Pre-populate card tracking + source_key = f"{_CONVERSATION_TYPE_P2P}:user_001::msg_001" + channel._card_track_ids[source_key] = "track_001" + + msg = OutboundMessage( + channel_name="dingtalk", + chat_id="user_001", + thread_id="thread_001", + text="Partial response...", + is_final=False, + thread_ts="msg_001", + metadata={ + "conversation_type": _CONVERSATION_TYPE_P2P, + "sender_staff_id": "user_001", + "conversation_id": "", + }, + ) + + await channel.send(msg) + + channel._stream_update_card.assert_awaited_once_with( + "track_001", + "Partial response...", + is_finalize=False, + ) + # Track ID should still exist (not final) + assert source_key in channel._card_track_ids + + _run(go()) + + def test_send_finalizes_card(self): + async def go(): + bus = MessageBus() + channel = DingTalkChannel(bus, config={"card_template_id": "tpl_123"}) + channel._client_id = "test_key" + + channel._stream_update_card = AsyncMock() + + source_key = f"{_CONVERSATION_TYPE_P2P}:user_001::msg_001" + channel._card_track_ids[source_key] = "track_001" + + msg = OutboundMessage( + channel_name="dingtalk", + chat_id="user_001", + thread_id="thread_001", + text="Final answer.", + is_final=True, + thread_ts="msg_001", + metadata={ + "conversation_type": _CONVERSATION_TYPE_P2P, + "sender_staff_id": "user_001", + "conversation_id": "", + }, + ) + + await channel.send(msg) + + channel._stream_update_card.assert_awaited_once_with( + "track_001", + "Final answer.", + is_finalize=True, + ) + # Track ID should be cleaned up after final + assert source_key not in channel._card_track_ids + + _run(go()) + + def test_card_mode_skips_markdown_adaptation(self): + async def go(): + bus = MessageBus() + channel = DingTalkChannel(bus, config={"card_template_id": "tpl_123"}) + channel._client_id = "test_key" + + raw_markdown = "```python\ndef foo():\n pass\n```" + captured_content: list[str] = [] + + async def capture_stream(out_track_id, content, *, is_finalize=False, is_error=False): + captured_content.append(content) + + channel._stream_update_card = AsyncMock(side_effect=capture_stream) + + source_key = f"{_CONVERSATION_TYPE_P2P}:user_001::msg_001" + channel._card_track_ids[source_key] = "track_001" + + msg = OutboundMessage( + channel_name="dingtalk", + chat_id="user_001", + thread_id="thread_001", + text=raw_markdown, + is_final=True, + thread_ts="msg_001", + metadata={ + "conversation_type": _CONVERSATION_TYPE_P2P, + "sender_staff_id": "user_001", + "conversation_id": "", + }, + ) + + await channel.send(msg) + + # Raw markdown should be passed through without adaptation + assert captured_content[0] == raw_markdown + + _run(go()) + + def test_card_fallback_on_creation_failure(self): + async def go(): + bus = MessageBus() + channel = DingTalkChannel(bus, config={"card_template_id": "tpl_123"}) + channel._client_id = "test_key" + + # Card creation returns None (failure) + channel._create_and_deliver_card = AsyncMock(return_value=None) + channel._send_text_message_to_user = AsyncMock() + + inbound = channel._make_inbound( + chat_id="user_001", + user_id="user_001", + text="hello", + metadata={ + "conversation_type": _CONVERSATION_TYPE_P2P, + "sender_staff_id": "user_001", + "conversation_id": "", + "message_id": "msg_001", + }, + ) + + source_key = channel._make_card_source_key(inbound) + channel._incoming_messages[source_key] = MagicMock() + + await channel._send_running_reply("user_001", inbound) + + # Should fall through to text message + channel._send_text_message_to_user.assert_awaited_once() + assert len(channel._card_track_ids) == 0 + + _run(go()) + + def test_send_skips_non_final_without_card_track_when_template_configured(self): + """Without a live card track, Manager streaming would duplicate sampleMarkdown sends.""" + + async def go(): + bus = MessageBus() + channel = DingTalkChannel(bus, config={"card_template_id": "tpl_123"}) + channel._client_id = "test_key" + channel._send_group_message = AsyncMock() + channel._send_p2p_message = AsyncMock() + + meta = { + "conversation_type": _CONVERSATION_TYPE_P2P, + "sender_staff_id": "user_001", + "conversation_id": "", + } + await channel.send( + OutboundMessage( + channel_name="dingtalk", + chat_id="user_001", + thread_id="t1", + text="partial", + is_final=False, + thread_ts="msg_001", + metadata=meta, + ) + ) + channel._send_p2p_message.assert_not_called() + channel._send_group_message.assert_not_called() + + await channel.send( + OutboundMessage( + channel_name="dingtalk", + chat_id="user_001", + thread_id="t1", + text="final answer", + is_final=True, + thread_ts="msg_001", + metadata=meta, + ) + ) + channel._send_p2p_message.assert_awaited_once() + + _run(go()) + + def test_card_fallback_on_stream_failure(self): + async def go(): + bus = MessageBus() + channel = DingTalkChannel(bus, config={"card_template_id": "tpl_123"}) + channel._client_id = "test_key" + + channel._stream_update_card = AsyncMock(side_effect=ConnectionError("stream failed")) + channel._send_markdown_fallback = AsyncMock() + + source_key = f"{_CONVERSATION_TYPE_P2P}:user_001::msg_001" + channel._card_track_ids[source_key] = "track_001" + + msg = OutboundMessage( + channel_name="dingtalk", + chat_id="user_001", + thread_id="thread_001", + text="Final answer.", + is_final=True, + thread_ts="msg_001", + metadata={ + "conversation_type": _CONVERSATION_TYPE_P2P, + "sender_staff_id": "user_001", + "conversation_id": "", + }, + ) + + await channel.send(msg) + + # Should fallback to markdown + channel._send_markdown_fallback.assert_awaited_once_with( + "test_key", + _CONVERSATION_TYPE_P2P, + "user_001", + "", + "Final answer.", + ) + # Track ID should be cleaned up + assert source_key not in channel._card_track_ids + + _run(go()) + + def test_pre_start_stores_dingtalk_client(self): + bus = MessageBus() + channel = DingTalkChannel(bus, config={}) + handler = _DingTalkMessageHandler(channel) + + mock_client = MagicMock() + handler.dingtalk_client = mock_client + handler.pre_start() + + assert channel._dingtalk_client is mock_client + + def test_chatbot_message_stored_for_card_mode(self): + bus = MessageBus() + channel = DingTalkChannel(bus, config={"card_template_id": "tpl_123"}) + + mock_message = MagicMock() + mock_message.sender_staff_id = "user_001" + mock_message.conversation_type = "1" + mock_message.conversation_id = "" + mock_message.message_id = "msg_001" + mock_message.sender_nick = "TestUser" + mock_message.message_type = "text" + mock_message.text = MagicMock(content="hello") + mock_message.rich_text_content = None + + channel._main_loop = MagicMock() + channel._main_loop.is_running.return_value = False + channel._allowed_users = set() + channel._running = True + + channel._on_chatbot_message(mock_message) + + assert len(channel._incoming_messages) == 1 + stored_msg = list(channel._incoming_messages.values())[0] + assert stored_msg is mock_message + + def test_card_replier_cleanup_on_final(self): + async def go(): + bus = MessageBus() + channel = DingTalkChannel(bus, config={"card_template_id": "tpl_123"}) + channel._client_id = "test_key" + + channel._stream_update_card = AsyncMock() + + source_key = f"{_CONVERSATION_TYPE_P2P}:user_001::msg_001" + channel._card_track_ids[source_key] = "track_001" + channel._card_repliers["track_001"] = MagicMock() + + msg = OutboundMessage( + channel_name="dingtalk", + chat_id="user_001", + thread_id="thread_001", + text="Final answer.", + is_final=True, + thread_ts="msg_001", + metadata={ + "conversation_type": _CONVERSATION_TYPE_P2P, + "sender_staff_id": "user_001", + "conversation_id": "", + }, + ) + + await channel.send(msg) + + assert source_key not in channel._card_track_ids + assert "track_001" not in channel._card_repliers + + _run(go()) + + def test_card_creation_without_sdk_client_returns_none(self): + async def go(): + bus = MessageBus() + channel = DingTalkChannel(bus, config={"card_template_id": "tpl_123"}) + channel._dingtalk_client = None + + result = await channel._create_and_deliver_card( + "test", + chatbot_message=MagicMock(), + ) + assert result is None + + _run(go()) + + def test_card_creation_without_chatbot_message_returns_none(self): + async def go(): + bus = MessageBus() + channel = DingTalkChannel(bus, config={"card_template_id": "tpl_123"}) + channel._dingtalk_client = MagicMock() + + result = await channel._create_and_deliver_card( + "test", + chatbot_message=None, + ) + assert result is None + + _run(go()) + + def test_stream_update_card_raises_without_replier(self): + async def go(): + bus = MessageBus() + channel = DingTalkChannel(bus, config={"card_template_id": "tpl_123"}) + + with pytest.raises(RuntimeError, match="No AICardReplier found"): + await channel._stream_update_card("nonexistent_track", "content") + + _run(go()) + + def test_stop_clears_card_state(self): + async def go(): + bus = MessageBus() + channel = DingTalkChannel(bus, config={"card_template_id": "tpl_123"}) + channel._running = True + channel._dingtalk_client = MagicMock() + channel._incoming_messages["key"] = MagicMock() + channel._card_repliers["track"] = MagicMock() + channel._card_track_ids["source"] = "track" + + await channel.stop() + + assert channel._dingtalk_client is None + assert channel._incoming_messages == {} + assert channel._card_repliers == {} + assert channel._card_track_ids == {} + + _run(go()) diff --git a/backend/uv.lock b/backend/uv.lock index e0ea058c0..378bbb842 100644 --- a/backend/uv.lock +++ b/backend/uv.lock @@ -746,6 +746,7 @@ source = { virtual = "." } dependencies = [ { name = "bcrypt" }, { name = "deerflow-harness" }, + { name = "dingtalk-stream" }, { name = "email-validator" }, { name = "fastapi" }, { name = "httpx" }, @@ -779,6 +780,7 @@ requires-dist = [ { name = "bcrypt", specifier = ">=4.0.0" }, { name = "deerflow-harness", editable = "packages/harness" }, { name = "deerflow-harness", extras = ["postgres"], marker = "extra == 'postgres'", editable = "packages/harness" }, + { name = "dingtalk-stream", specifier = ">=0.24.3" }, { name = "email-validator", specifier = ">=2.0.0" }, { name = "fastapi", specifier = ">=0.115.0" }, { name = "httpx", specifier = ">=0.28.0" }, @@ -908,6 +910,19 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/07/6c/aa3f2f849e01cb6a001cd8554a88d4c77c5c1a31c95bdf1cf9301e6d9ef4/defusedxml-0.7.1-py2.py3-none-any.whl", hash = "sha256:a352e7e428770286cc899e2542b6cdaedb2b4953ff269a210103ec58f6198a61", size = 25604, upload-time = "2021-03-08T10:59:24.45Z" }, ] +[[package]] +name = "dingtalk-stream" +version = "0.24.3" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "aiohttp" }, + { name = "requests" }, + { name = "websockets" }, +] +wheels = [ + { url = "https://files.pythonhosted.org/packages/4c/44/102dede3f371277598df6aa9725b82e3add068c729333c7a5dbc12764579/dingtalk_stream-0.24.3-py3-none-any.whl", hash = "sha256:2160403656985962878bf60cdf5adf41619f21067348e06f07a7c7eebf5943ad", size = 27813, upload-time = "2025-10-24T09:36:57.497Z" }, +] + [[package]] name = "distro" version = "1.9.0" diff --git a/config.example.yaml b/config.example.yaml index 0b72ab80f..a14ca5886 100644 --- a/config.example.yaml +++ b/config.example.yaml @@ -964,6 +964,13 @@ run_events: # enabled: false # bot_id: $WECOM_BOT_ID # bot_secret: $WECOM_BOT_SECRET +# +# dingtalk: +# enabled: false +# client_id: $DINGTALK_CLIENT_ID +# client_secret: $DINGTALK_CLIENT_SECRET +# allowed_users: [] # empty = allow all +# card_template_id: "" # Optional: AI Card template ID for streaming updates # ============================================================================ # Guardrails Configuration