mirror of
https://github.com/bytedance/deer-flow.git
synced 2026-04-25 11:18:22 +00:00
feat(channels): add Discord channel integration (#1806)
* feat(channels): add Discord channel integration Add a Discord bot channel following the existing Telegram/Slack pattern. The bot listens for messages, creates conversation threads, and relays responses back to Discord with 2000-char message splitting. - DiscordChannel extends Channel base class - Lazy imports discord.py with install hint - Thread-based conversations (each Discord thread maps to a DeerFlow thread) - Allowed guilds filter for access control - File attachment support via discord.File - Registered in service.py and manager.py Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * fix(channels): address Copilot review suggestions for Discord integration - Disable @everyone/@here mentions via AllowedMentions.none() - Add 10s timeout to client close to prevent shutdown hangs - Log publish_inbound errors via future callback instead of silently dropping - Open file handle on caller thread to avoid cross-thread ownership issues - Notify user in channel when thread creation fails Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * fix(discord): resolve lint errors in Discord channel - Replace asyncio.TimeoutError with builtin TimeoutError (UP041) - Remove extraneous f-string prefix (F541) - Apply ruff format Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * fix(tests): remove fake langgraph_sdk shim from test_discord_channel The module-level sys.modules.setdefault shim installed a fake langgraph_sdk.errors.ConflictError during pytest collection. Because pytest imports all test modules before running them, test_channels.py then imported the fake ConflictError instead of the real one. In test_handle_feishu_stream_conflict_sends_busy_message, the test constructs ConflictError(message, response=..., body=...). The fake only subclasses Exception (which takes no kwargs), so the construction raised TypeError. The manager's _is_thread_busy_error check then saw a TypeError instead of a ConflictError and fell through to the generic 'An error occurred' message. langgraph_sdk is a real dependency, so the shim is unnecessary. Removing it makes both test files import the same real ConflictError and the full suite pass (1773 passed, 15 skipped). --------- Co-authored-by: Matt Van Horn <455140+mvanhorn@users.noreply.github.com> Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com> Co-authored-by: Willem Jiang <willem.jiang@gmail.com>
This commit is contained in:
parent
dc50a7fdfb
commit
c4d273a68a
@ -24,6 +24,7 @@ INFOQUEST_API_KEY=your-infoquest-api-key
|
|||||||
# SLACK_BOT_TOKEN=your-slack-bot-token
|
# SLACK_BOT_TOKEN=your-slack-bot-token
|
||||||
# SLACK_APP_TOKEN=your-slack-app-token
|
# SLACK_APP_TOKEN=your-slack-app-token
|
||||||
# TELEGRAM_BOT_TOKEN=your-telegram-bot-token
|
# TELEGRAM_BOT_TOKEN=your-telegram-bot-token
|
||||||
|
# DISCORD_BOT_TOKEN=your-discord-bot-token
|
||||||
|
|
||||||
# Enable LangSmith to monitor and debug your LLM calls, agent runs, and tool executions.
|
# Enable LangSmith to monitor and debug your LLM calls, agent runs, and tool executions.
|
||||||
# LANGSMITH_TRACING=true
|
# LANGSMITH_TRACING=true
|
||||||
|
|||||||
273
backend/app/channels/discord.py
Normal file
273
backend/app/channels/discord.py
Normal file
@ -0,0 +1,273 @@
|
|||||||
|
"""Discord channel integration using discord.py."""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
import logging
|
||||||
|
import threading
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
from app.channels.base import Channel
|
||||||
|
from app.channels.message_bus import InboundMessageType, MessageBus, OutboundMessage, ResolvedAttachment
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
_DISCORD_MAX_MESSAGE_LEN = 2000
|
||||||
|
|
||||||
|
|
||||||
|
class DiscordChannel(Channel):
|
||||||
|
"""Discord bot channel.
|
||||||
|
|
||||||
|
Configuration keys (in ``config.yaml`` under ``channels.discord``):
|
||||||
|
- ``bot_token``: Discord Bot token.
|
||||||
|
- ``allowed_guilds``: (optional) List of allowed Discord guild IDs. Empty = allow all.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, bus: MessageBus, config: dict[str, Any]) -> None:
|
||||||
|
super().__init__(name="discord", bus=bus, config=config)
|
||||||
|
self._bot_token = str(config.get("bot_token", "")).strip()
|
||||||
|
self._allowed_guilds: set[int] = set()
|
||||||
|
for guild_id in config.get("allowed_guilds", []):
|
||||||
|
try:
|
||||||
|
self._allowed_guilds.add(int(guild_id))
|
||||||
|
except (TypeError, ValueError):
|
||||||
|
continue
|
||||||
|
|
||||||
|
self._client = None
|
||||||
|
self._thread: threading.Thread | None = None
|
||||||
|
self._discord_loop: asyncio.AbstractEventLoop | None = None
|
||||||
|
self._main_loop: asyncio.AbstractEventLoop | None = None
|
||||||
|
self._discord_module = None
|
||||||
|
|
||||||
|
async def start(self) -> None:
|
||||||
|
if self._running:
|
||||||
|
return
|
||||||
|
|
||||||
|
try:
|
||||||
|
import discord
|
||||||
|
except ImportError:
|
||||||
|
logger.error("discord.py is not installed. Install it with: uv add discord.py")
|
||||||
|
return
|
||||||
|
|
||||||
|
if not self._bot_token:
|
||||||
|
logger.error("Discord channel requires bot_token")
|
||||||
|
return
|
||||||
|
|
||||||
|
intents = discord.Intents.default()
|
||||||
|
intents.messages = True
|
||||||
|
intents.guilds = True
|
||||||
|
intents.message_content = True
|
||||||
|
|
||||||
|
client = discord.Client(
|
||||||
|
intents=intents,
|
||||||
|
allowed_mentions=discord.AllowedMentions.none(),
|
||||||
|
)
|
||||||
|
self._client = client
|
||||||
|
self._discord_module = discord
|
||||||
|
self._main_loop = asyncio.get_event_loop()
|
||||||
|
|
||||||
|
@client.event
|
||||||
|
async def on_message(message) -> None:
|
||||||
|
await self._on_message(message)
|
||||||
|
|
||||||
|
self._running = True
|
||||||
|
self.bus.subscribe_outbound(self._on_outbound)
|
||||||
|
|
||||||
|
self._thread = threading.Thread(target=self._run_client, daemon=True)
|
||||||
|
self._thread.start()
|
||||||
|
logger.info("Discord channel started")
|
||||||
|
|
||||||
|
async def stop(self) -> None:
|
||||||
|
self._running = False
|
||||||
|
self.bus.unsubscribe_outbound(self._on_outbound)
|
||||||
|
|
||||||
|
if self._client and self._discord_loop and self._discord_loop.is_running():
|
||||||
|
close_future = asyncio.run_coroutine_threadsafe(self._client.close(), self._discord_loop)
|
||||||
|
try:
|
||||||
|
await asyncio.wait_for(asyncio.wrap_future(close_future), timeout=10)
|
||||||
|
except TimeoutError:
|
||||||
|
logger.warning("[Discord] client close timed out after 10s")
|
||||||
|
except Exception:
|
||||||
|
logger.exception("[Discord] error while closing client")
|
||||||
|
|
||||||
|
if self._thread:
|
||||||
|
self._thread.join(timeout=10)
|
||||||
|
self._thread = None
|
||||||
|
|
||||||
|
self._client = None
|
||||||
|
self._discord_loop = None
|
||||||
|
self._discord_module = None
|
||||||
|
logger.info("Discord channel stopped")
|
||||||
|
|
||||||
|
async def send(self, msg: OutboundMessage) -> None:
|
||||||
|
target = await self._resolve_target(msg)
|
||||||
|
if target is None:
|
||||||
|
logger.error("[Discord] target not found for chat_id=%s thread_ts=%s", msg.chat_id, msg.thread_ts)
|
||||||
|
return
|
||||||
|
|
||||||
|
text = msg.text or ""
|
||||||
|
for chunk in self._split_text(text):
|
||||||
|
send_future = asyncio.run_coroutine_threadsafe(target.send(chunk), self._discord_loop)
|
||||||
|
await asyncio.wrap_future(send_future)
|
||||||
|
|
||||||
|
async def send_file(self, msg: OutboundMessage, attachment: ResolvedAttachment) -> bool:
|
||||||
|
target = await self._resolve_target(msg)
|
||||||
|
if target is None:
|
||||||
|
logger.error("[Discord] target not found for file upload chat_id=%s thread_ts=%s", msg.chat_id, msg.thread_ts)
|
||||||
|
return False
|
||||||
|
|
||||||
|
if self._discord_module is None:
|
||||||
|
return False
|
||||||
|
|
||||||
|
try:
|
||||||
|
fp = open(str(attachment.actual_path), "rb") # noqa: SIM115
|
||||||
|
file = self._discord_module.File(fp, filename=attachment.filename)
|
||||||
|
send_future = asyncio.run_coroutine_threadsafe(target.send(file=file), self._discord_loop)
|
||||||
|
await asyncio.wrap_future(send_future)
|
||||||
|
logger.info("[Discord] file uploaded: %s", attachment.filename)
|
||||||
|
return True
|
||||||
|
except Exception:
|
||||||
|
logger.exception("[Discord] failed to upload file: %s", attachment.filename)
|
||||||
|
return False
|
||||||
|
|
||||||
|
async def _on_message(self, message) -> None:
|
||||||
|
if not self._running or not self._client:
|
||||||
|
return
|
||||||
|
|
||||||
|
if message.author.bot:
|
||||||
|
return
|
||||||
|
|
||||||
|
if self._client.user and message.author.id == self._client.user.id:
|
||||||
|
return
|
||||||
|
|
||||||
|
guild = message.guild
|
||||||
|
if self._allowed_guilds:
|
||||||
|
if guild is None or guild.id not in self._allowed_guilds:
|
||||||
|
return
|
||||||
|
|
||||||
|
text = (message.content or "").strip()
|
||||||
|
if not text:
|
||||||
|
return
|
||||||
|
|
||||||
|
if self._discord_module is None:
|
||||||
|
return
|
||||||
|
|
||||||
|
if isinstance(message.channel, self._discord_module.Thread):
|
||||||
|
chat_id = str(message.channel.parent_id or message.channel.id)
|
||||||
|
thread_id = str(message.channel.id)
|
||||||
|
else:
|
||||||
|
thread = await self._create_thread(message)
|
||||||
|
if thread is None:
|
||||||
|
return
|
||||||
|
chat_id = str(message.channel.id)
|
||||||
|
thread_id = str(thread.id)
|
||||||
|
|
||||||
|
msg_type = InboundMessageType.COMMAND if text.startswith("/") else InboundMessageType.CHAT
|
||||||
|
inbound = self._make_inbound(
|
||||||
|
chat_id=chat_id,
|
||||||
|
user_id=str(message.author.id),
|
||||||
|
text=text,
|
||||||
|
msg_type=msg_type,
|
||||||
|
thread_ts=thread_id,
|
||||||
|
metadata={
|
||||||
|
"guild_id": str(guild.id) if guild else None,
|
||||||
|
"channel_id": str(message.channel.id),
|
||||||
|
"message_id": str(message.id),
|
||||||
|
},
|
||||||
|
)
|
||||||
|
inbound.topic_id = thread_id
|
||||||
|
|
||||||
|
if self._main_loop and self._main_loop.is_running():
|
||||||
|
future = asyncio.run_coroutine_threadsafe(self.bus.publish_inbound(inbound), self._main_loop)
|
||||||
|
future.add_done_callback(lambda f: logger.exception("[Discord] publish_inbound failed", exc_info=f.exception()) if f.exception() else None)
|
||||||
|
|
||||||
|
def _run_client(self) -> None:
|
||||||
|
self._discord_loop = asyncio.new_event_loop()
|
||||||
|
asyncio.set_event_loop(self._discord_loop)
|
||||||
|
try:
|
||||||
|
self._discord_loop.run_until_complete(self._client.start(self._bot_token))
|
||||||
|
except Exception:
|
||||||
|
if self._running:
|
||||||
|
logger.exception("Discord client error")
|
||||||
|
finally:
|
||||||
|
try:
|
||||||
|
if self._client and not self._client.is_closed():
|
||||||
|
self._discord_loop.run_until_complete(self._client.close())
|
||||||
|
except Exception:
|
||||||
|
logger.exception("Error during Discord shutdown")
|
||||||
|
|
||||||
|
async def _create_thread(self, message):
|
||||||
|
try:
|
||||||
|
thread_name = f"deerflow-{message.author.display_name}-{message.id}"[:100]
|
||||||
|
return await message.create_thread(name=thread_name)
|
||||||
|
except Exception:
|
||||||
|
logger.exception("[Discord] failed to create thread for message=%s (threads may be disabled or missing permissions)", message.id)
|
||||||
|
try:
|
||||||
|
await message.channel.send("Could not create a thread for your message. Please check that threads are enabled in this channel.")
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
return None
|
||||||
|
|
||||||
|
async def _resolve_target(self, msg: OutboundMessage):
|
||||||
|
if not self._client or not self._discord_loop:
|
||||||
|
return None
|
||||||
|
|
||||||
|
target_ids: list[str] = []
|
||||||
|
if msg.thread_ts:
|
||||||
|
target_ids.append(msg.thread_ts)
|
||||||
|
if msg.chat_id and msg.chat_id not in target_ids:
|
||||||
|
target_ids.append(msg.chat_id)
|
||||||
|
|
||||||
|
for raw_id in target_ids:
|
||||||
|
target = await self._get_channel_or_thread(raw_id)
|
||||||
|
if target is not None:
|
||||||
|
return target
|
||||||
|
return None
|
||||||
|
|
||||||
|
async def _get_channel_or_thread(self, raw_id: str):
|
||||||
|
if not self._client or not self._discord_loop:
|
||||||
|
return None
|
||||||
|
|
||||||
|
try:
|
||||||
|
target_id = int(raw_id)
|
||||||
|
except (TypeError, ValueError):
|
||||||
|
return None
|
||||||
|
|
||||||
|
get_future = asyncio.run_coroutine_threadsafe(self._fetch_channel(target_id), self._discord_loop)
|
||||||
|
try:
|
||||||
|
return await asyncio.wrap_future(get_future)
|
||||||
|
except Exception:
|
||||||
|
logger.exception("[Discord] failed to resolve target id=%s", raw_id)
|
||||||
|
return None
|
||||||
|
|
||||||
|
async def _fetch_channel(self, target_id: int):
|
||||||
|
if not self._client:
|
||||||
|
return None
|
||||||
|
|
||||||
|
channel = self._client.get_channel(target_id)
|
||||||
|
if channel is not None:
|
||||||
|
return channel
|
||||||
|
|
||||||
|
try:
|
||||||
|
return await self._client.fetch_channel(target_id)
|
||||||
|
except Exception:
|
||||||
|
return None
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _split_text(text: str) -> list[str]:
|
||||||
|
if not text:
|
||||||
|
return [""]
|
||||||
|
|
||||||
|
chunks: list[str] = []
|
||||||
|
remaining = text
|
||||||
|
while len(remaining) > _DISCORD_MAX_MESSAGE_LEN:
|
||||||
|
split_at = remaining.rfind("\n", 0, _DISCORD_MAX_MESSAGE_LEN)
|
||||||
|
if split_at <= 0:
|
||||||
|
split_at = _DISCORD_MAX_MESSAGE_LEN
|
||||||
|
chunks.append(remaining[:split_at])
|
||||||
|
remaining = remaining[split_at:].lstrip("\n")
|
||||||
|
|
||||||
|
if remaining:
|
||||||
|
chunks.append(remaining)
|
||||||
|
|
||||||
|
return chunks
|
||||||
@ -35,6 +35,7 @@ STREAM_UPDATE_MIN_INTERVAL_SECONDS = 0.35
|
|||||||
THREAD_BUSY_MESSAGE = "This conversation is already processing another request. Please wait for it to finish and try again."
|
THREAD_BUSY_MESSAGE = "This conversation is already processing another request. Please wait for it to finish and try again."
|
||||||
|
|
||||||
CHANNEL_CAPABILITIES = {
|
CHANNEL_CAPABILITIES = {
|
||||||
|
"discord": {"supports_streaming": False},
|
||||||
"feishu": {"supports_streaming": True},
|
"feishu": {"supports_streaming": True},
|
||||||
"slack": {"supports_streaming": False},
|
"slack": {"supports_streaming": False},
|
||||||
"telegram": {"supports_streaming": False},
|
"telegram": {"supports_streaming": False},
|
||||||
|
|||||||
@ -15,6 +15,7 @@ logger = logging.getLogger(__name__)
|
|||||||
|
|
||||||
# Channel name → import path for lazy loading
|
# Channel name → import path for lazy loading
|
||||||
_CHANNEL_REGISTRY: dict[str, str] = {
|
_CHANNEL_REGISTRY: dict[str, str] = {
|
||||||
|
"discord": "app.channels.discord:DiscordChannel",
|
||||||
"feishu": "app.channels.feishu:FeishuChannel",
|
"feishu": "app.channels.feishu:FeishuChannel",
|
||||||
"slack": "app.channels.slack:SlackChannel",
|
"slack": "app.channels.slack:SlackChannel",
|
||||||
"telegram": "app.channels.telegram:TelegramChannel",
|
"telegram": "app.channels.telegram:TelegramChannel",
|
||||||
|
|||||||
23
backend/tests/test_discord_channel.py
Normal file
23
backend/tests/test_discord_channel.py
Normal file
@ -0,0 +1,23 @@
|
|||||||
|
"""Tests for Discord channel integration wiring."""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from app.channels.discord import DiscordChannel
|
||||||
|
from app.channels.manager import CHANNEL_CAPABILITIES
|
||||||
|
from app.channels.message_bus import MessageBus
|
||||||
|
from app.channels.service import _CHANNEL_REGISTRY
|
||||||
|
|
||||||
|
|
||||||
|
def test_discord_channel_registered() -> None:
|
||||||
|
assert "discord" in _CHANNEL_REGISTRY
|
||||||
|
|
||||||
|
|
||||||
|
def test_discord_channel_capabilities() -> None:
|
||||||
|
assert "discord" in CHANNEL_CAPABILITIES
|
||||||
|
|
||||||
|
|
||||||
|
def test_discord_channel_init() -> None:
|
||||||
|
bus = MessageBus()
|
||||||
|
channel = DiscordChannel(bus=bus, config={"bot_token": "token"})
|
||||||
|
|
||||||
|
assert channel.name == "discord"
|
||||||
Loading…
x
Reference in New Issue
Block a user