fix(channel): reject concurrent same-thread runs (#1465) (#1475)

* fix(channel): reject concurrent same-thread runs (#1465)

* Apply suggestions from code review

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* fix(lint): sort imports in manager.py and test_channels.py

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* fix(channel): widen _is_thread_busy_error to BaseException and downgrade busy log to warning

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

---------

Co-authored-by: Willem Jiang <willem.jiang@gmail.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Nan Gao 2026-03-29 09:55:47 +08:00 committed by GitHub
parent 18e3487888
commit 89183ae76a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 74 additions and 2 deletions

View File

@ -10,6 +10,8 @@ import time
from collections.abc import Mapping
from typing import Any
from langgraph_sdk.errors import ConflictError
from app.channels.message_bus import InboundMessage, InboundMessageType, MessageBus, OutboundMessage, ResolvedAttachment
from app.channels.store import ChannelStore
@ -27,6 +29,7 @@ DEFAULT_RUN_CONTEXT: dict[str, Any] = {
"subagent_enabled": False,
}
STREAM_UPDATE_MIN_INTERVAL_SECONDS = 0.35
THREAD_BUSY_MESSAGE = "This conversation is already processing another request. Please wait for it to finish and try again."
CHANNEL_CAPABILITIES = {
"feishu": {"supports_streaming": True},
@ -39,6 +42,14 @@ class InvalidChannelSessionConfigError(ValueError):
"""Raised when IM channel session overrides contain invalid agent config."""
def _is_thread_busy_error(exc: BaseException | None) -> bool:
if exc is None:
return False
if isinstance(exc, ConflictError):
return True
return "already running a task" in str(exc)
def _as_dict(value: Any) -> dict[str, Any]:
return dict(value) if isinstance(value, Mapping) else {}
@ -606,6 +617,7 @@ class ChannelManager:
config=run_config,
context=run_context,
stream_mode=["messages-tuple", "values"],
multitask_strategy="reject",
):
event = getattr(chunk, "event", "")
data = getattr(chunk, "data", None)
@ -641,7 +653,10 @@ class ChannelManager:
last_publish_at = now
except Exception as exc:
stream_error = exc
logger.exception("[Manager] streaming error: thread_id=%s", thread_id)
if _is_thread_busy_error(exc):
logger.warning("[Manager] thread busy (concurrent run rejected): thread_id=%s", thread_id)
else:
logger.exception("[Manager] streaming error: thread_id=%s", thread_id)
finally:
result = last_values if last_values is not None else {"messages": [{"type": "ai", "content": latest_text}]}
response_text = _extract_response_text(result)
@ -652,7 +667,10 @@ class ChannelManager:
if attachments:
response_text = _format_artifact_text([attachment.virtual_path for attachment in attachments])
elif stream_error:
response_text = "An error occurred while processing your request. Please try again."
if _is_thread_busy_error(stream_error):
response_text = THREAD_BUSY_MESSAGE
else:
response_text = "An error occurred while processing your request. Please try again."
else:
response_text = latest_text or "(No response from agent)"

View File

@ -727,6 +727,60 @@ class TestChannelManager:
_run(go())
def test_handle_feishu_stream_conflict_sends_busy_message(self, monkeypatch):
import httpx
from langgraph_sdk.errors import ConflictError
from app.channels.manager import THREAD_BUSY_MESSAGE, ChannelManager
monkeypatch.setattr("app.channels.manager.STREAM_UPDATE_MIN_INTERVAL_SECONDS", 0.0)
async def go():
bus = MessageBus()
store = ChannelStore(path=Path(tempfile.mkdtemp()) / "store.json")
manager = ChannelManager(bus=bus, store=store)
outbound_received = []
async def capture_outbound(msg):
outbound_received.append(msg)
bus.subscribe_outbound(capture_outbound)
async def _conflict_stream():
request = httpx.Request("POST", "http://127.0.0.1:2024/runs")
response = httpx.Response(409, request=request)
raise ConflictError(
"Thread is already running a task. Wait for it to finish or choose a different multitask strategy.",
response=response,
body={"message": "Thread is already running a task. Wait for it to finish or choose a different multitask strategy."},
)
yield # pragma: no cover
mock_client = _make_mock_langgraph_client()
mock_client.runs.stream = MagicMock(return_value=_conflict_stream())
manager._client = mock_client
await manager.start()
inbound = InboundMessage(
channel_name="feishu",
chat_id="chat1",
user_id="user1",
text="hi",
thread_ts="om-source-1",
)
await bus.publish_inbound(inbound)
await _wait_for(lambda: any(m.is_final for m in outbound_received))
await manager.stop()
final_msgs = [m for m in outbound_received if m.is_final]
assert len(final_msgs) == 1
assert final_msgs[0].text == THREAD_BUSY_MESSAGE
assert final_msgs[0].thread_ts == "om-source-1"
_run(go())
def test_handle_command_help(self):
from app.channels.manager import ChannelManager