diff --git a/backend/app/channels/manager.py b/backend/app/channels/manager.py index 9b3783177..8ebe7a748 100644 --- a/backend/app/channels/manager.py +++ b/backend/app/channels/manager.py @@ -10,6 +10,8 @@ import time from collections.abc import Mapping from typing import Any +from langgraph_sdk.errors import ConflictError + from app.channels.message_bus import InboundMessage, InboundMessageType, MessageBus, OutboundMessage, ResolvedAttachment from app.channels.store import ChannelStore @@ -27,6 +29,7 @@ DEFAULT_RUN_CONTEXT: dict[str, Any] = { "subagent_enabled": False, } STREAM_UPDATE_MIN_INTERVAL_SECONDS = 0.35 +THREAD_BUSY_MESSAGE = "This conversation is already processing another request. Please wait for it to finish and try again." CHANNEL_CAPABILITIES = { "feishu": {"supports_streaming": True}, @@ -39,6 +42,14 @@ class InvalidChannelSessionConfigError(ValueError): """Raised when IM channel session overrides contain invalid agent config.""" +def _is_thread_busy_error(exc: BaseException | None) -> bool: + if exc is None: + return False + if isinstance(exc, ConflictError): + return True + return "already running a task" in str(exc) + + def _as_dict(value: Any) -> dict[str, Any]: return dict(value) if isinstance(value, Mapping) else {} @@ -606,6 +617,7 @@ class ChannelManager: config=run_config, context=run_context, stream_mode=["messages-tuple", "values"], + multitask_strategy="reject", ): event = getattr(chunk, "event", "") data = getattr(chunk, "data", None) @@ -641,7 +653,10 @@ class ChannelManager: last_publish_at = now except Exception as exc: stream_error = exc - logger.exception("[Manager] streaming error: thread_id=%s", thread_id) + if _is_thread_busy_error(exc): + logger.warning("[Manager] thread busy (concurrent run rejected): thread_id=%s", thread_id) + else: + logger.exception("[Manager] streaming error: thread_id=%s", thread_id) finally: result = last_values if last_values is not None else {"messages": [{"type": "ai", "content": latest_text}]} response_text = _extract_response_text(result) @@ -652,7 +667,10 @@ class ChannelManager: if attachments: response_text = _format_artifact_text([attachment.virtual_path for attachment in attachments]) elif stream_error: - response_text = "An error occurred while processing your request. Please try again." + if _is_thread_busy_error(stream_error): + response_text = THREAD_BUSY_MESSAGE + else: + response_text = "An error occurred while processing your request. Please try again." else: response_text = latest_text or "(No response from agent)" diff --git a/backend/tests/test_channels.py b/backend/tests/test_channels.py index ed5ac093e..7ec5c4149 100644 --- a/backend/tests/test_channels.py +++ b/backend/tests/test_channels.py @@ -727,6 +727,60 @@ class TestChannelManager: _run(go()) + def test_handle_feishu_stream_conflict_sends_busy_message(self, monkeypatch): + import httpx + from langgraph_sdk.errors import ConflictError + + from app.channels.manager import THREAD_BUSY_MESSAGE, ChannelManager + + monkeypatch.setattr("app.channels.manager.STREAM_UPDATE_MIN_INTERVAL_SECONDS", 0.0) + + async def go(): + bus = MessageBus() + store = ChannelStore(path=Path(tempfile.mkdtemp()) / "store.json") + manager = ChannelManager(bus=bus, store=store) + + outbound_received = [] + + async def capture_outbound(msg): + outbound_received.append(msg) + + bus.subscribe_outbound(capture_outbound) + + async def _conflict_stream(): + request = httpx.Request("POST", "http://127.0.0.1:2024/runs") + response = httpx.Response(409, request=request) + raise ConflictError( + "Thread is already running a task. Wait for it to finish or choose a different multitask strategy.", + response=response, + body={"message": "Thread is already running a task. Wait for it to finish or choose a different multitask strategy."}, + ) + yield # pragma: no cover + + mock_client = _make_mock_langgraph_client() + mock_client.runs.stream = MagicMock(return_value=_conflict_stream()) + manager._client = mock_client + + await manager.start() + + inbound = InboundMessage( + channel_name="feishu", + chat_id="chat1", + user_id="user1", + text="hi", + thread_ts="om-source-1", + ) + await bus.publish_inbound(inbound) + await _wait_for(lambda: any(m.is_final for m in outbound_received)) + await manager.stop() + + final_msgs = [m for m in outbound_received if m.is_final] + assert len(final_msgs) == 1 + assert final_msgs[0].text == THREAD_BUSY_MESSAGE + assert final_msgs[0].thread_ts == "om-source-1" + + _run(go()) + def test_handle_command_help(self): from app.channels.manager import ChannelManager