mirror of
https://github.com/bytedance/deer-flow.git
synced 2026-04-25 11:18:22 +00:00
Fix(subagent): Event loop conflict in SubagentExecutor.execute() (#1965)
* Fix event loop conflict in SubagentExecutor.execute() When SubagentExecutor.execute() is called from within an already-running event loop (e.g., when the parent agent uses async/await), calling asyncio.run() creates a new event loop that conflicts with asyncio primitives (like httpx.AsyncClient) that were created in and bound to the parent loop. This fix detects if we're already in a running event loop, and if so, runs the subagent in a separate thread with its own isolated event loop to avoid conflicts. Fixes: sub-task cards not appearing in Ultra mode when using async parent agents Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * fix(subagent): harden isolated event loop execution --------- Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
parent
85b7ed3cec
commit
e5b149068c
@ -76,6 +76,9 @@ _scheduler_pool = ThreadPoolExecutor(max_workers=3, thread_name_prefix="subagent
|
||||
# Larger pool to avoid blocking when scheduler submits execution tasks
|
||||
_execution_pool = ThreadPoolExecutor(max_workers=3, thread_name_prefix="subagent-exec-")
|
||||
|
||||
# Dedicated pool for sync execute() calls made from an already-running event loop.
|
||||
_isolated_loop_pool = ThreadPoolExecutor(max_workers=3, thread_name_prefix="subagent-isolated-")
|
||||
|
||||
|
||||
def _filter_tools(
|
||||
all_tools: list[BaseTool],
|
||||
@ -374,12 +377,55 @@ class SubagentExecutor:
|
||||
|
||||
return result
|
||||
|
||||
def _execute_in_isolated_loop(self, task: str, result_holder: SubagentResult | None = None) -> SubagentResult:
|
||||
"""Execute the subagent in a completely fresh event loop.
|
||||
|
||||
This method is designed to run in a separate thread to ensure complete
|
||||
isolation from any parent event loop, preventing conflicts with asyncio
|
||||
primitives that may be bound to the parent loop (e.g., httpx clients).
|
||||
"""
|
||||
try:
|
||||
previous_loop = asyncio.get_event_loop()
|
||||
except RuntimeError:
|
||||
previous_loop = None
|
||||
|
||||
# Create and set a new event loop for this thread
|
||||
loop = asyncio.new_event_loop()
|
||||
try:
|
||||
asyncio.set_event_loop(loop)
|
||||
return loop.run_until_complete(self._aexecute(task, result_holder))
|
||||
finally:
|
||||
try:
|
||||
pending = asyncio.all_tasks(loop)
|
||||
if pending:
|
||||
for task_obj in pending:
|
||||
task_obj.cancel()
|
||||
loop.run_until_complete(asyncio.gather(*pending, return_exceptions=True))
|
||||
|
||||
loop.run_until_complete(loop.shutdown_asyncgens())
|
||||
loop.run_until_complete(loop.shutdown_default_executor())
|
||||
except Exception:
|
||||
logger.debug(
|
||||
f"[trace={self.trace_id}] Failed while cleaning up isolated event loop for subagent {self.config.name}",
|
||||
exc_info=True,
|
||||
)
|
||||
finally:
|
||||
try:
|
||||
loop.close()
|
||||
finally:
|
||||
asyncio.set_event_loop(previous_loop)
|
||||
|
||||
def execute(self, task: str, result_holder: SubagentResult | None = None) -> SubagentResult:
|
||||
"""Execute a task synchronously (wrapper around async execution).
|
||||
|
||||
This method runs the async execution in a new event loop, allowing
|
||||
asynchronous tools (like MCP tools) to be used within the thread pool.
|
||||
|
||||
When called from within an already-running event loop (e.g., when the
|
||||
parent agent is async), this method isolates the subagent execution in
|
||||
a separate thread to avoid event loop conflicts with shared async
|
||||
primitives like httpx clients.
|
||||
|
||||
Args:
|
||||
task: The task description for the subagent.
|
||||
result_holder: Optional pre-created result object to update during execution.
|
||||
@ -387,16 +433,18 @@ class SubagentExecutor:
|
||||
Returns:
|
||||
SubagentResult with the execution result.
|
||||
"""
|
||||
# Run the async execution in a new event loop
|
||||
# This is necessary because:
|
||||
# 1. We may have async-only tools (like MCP tools)
|
||||
# 2. We're running inside a ThreadPoolExecutor which doesn't have an event loop
|
||||
#
|
||||
# Note: _aexecute() catches all exceptions internally, so this outer
|
||||
# try-except only handles asyncio.run() failures (e.g., if called from
|
||||
# an async context where an event loop already exists). Subagent execution
|
||||
# errors are handled within _aexecute() and returned as FAILED status.
|
||||
try:
|
||||
try:
|
||||
loop = asyncio.get_running_loop()
|
||||
except RuntimeError:
|
||||
loop = None
|
||||
|
||||
if loop is not None and loop.is_running():
|
||||
logger.debug(f"[trace={self.trace_id}] Subagent {self.config.name} detected running event loop, using isolated thread")
|
||||
future = _isolated_loop_pool.submit(self._execute_in_isolated_loop, task, result_holder)
|
||||
return future.result()
|
||||
|
||||
# Standard path: no running event loop, use asyncio.run
|
||||
return asyncio.run(self._aexecute(task, result_holder))
|
||||
except Exception as e:
|
||||
logger.exception(f"[trace={self.trace_id}] Subagent {self.config.name} execution failed")
|
||||
|
||||
@ -433,6 +433,42 @@ class TestSyncExecutionPath:
|
||||
assert result.status == SubagentStatus.COMPLETED
|
||||
assert result.result == "Thread pool result"
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_execute_in_running_event_loop_uses_isolated_thread(self, classes, base_config, mock_agent, msg):
|
||||
"""Test that execute() uses the isolated-thread path inside a running loop."""
|
||||
SubagentExecutor = classes["SubagentExecutor"]
|
||||
SubagentStatus = classes["SubagentStatus"]
|
||||
|
||||
execution_threads = []
|
||||
final_state = {
|
||||
"messages": [
|
||||
msg.human("Task"),
|
||||
msg.ai("Async loop result", "msg-1"),
|
||||
]
|
||||
}
|
||||
|
||||
async def mock_astream(*args, **kwargs):
|
||||
execution_threads.append(threading.current_thread().name)
|
||||
yield final_state
|
||||
|
||||
mock_agent.astream = mock_astream
|
||||
|
||||
executor = SubagentExecutor(
|
||||
config=base_config,
|
||||
tools=[],
|
||||
thread_id="test-thread",
|
||||
)
|
||||
|
||||
with patch.object(executor, "_create_agent", return_value=mock_agent):
|
||||
with patch.object(executor, "_execute_in_isolated_loop", wraps=executor._execute_in_isolated_loop) as isolated:
|
||||
result = executor.execute("Task")
|
||||
|
||||
assert isolated.call_count == 1
|
||||
assert execution_threads
|
||||
assert all(name.startswith("subagent-isolated-") for name in execution_threads)
|
||||
assert result.status == SubagentStatus.COMPLETED
|
||||
assert result.result == "Async loop result"
|
||||
|
||||
def test_execute_handles_asyncio_run_failure(self, classes, base_config):
|
||||
"""Test handling when asyncio.run() itself fails."""
|
||||
SubagentExecutor = classes["SubagentExecutor"]
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user