diff --git a/backend/packages/harness/deerflow/subagents/executor.py b/backend/packages/harness/deerflow/subagents/executor.py index add25de0e..5529bec2c 100644 --- a/backend/packages/harness/deerflow/subagents/executor.py +++ b/backend/packages/harness/deerflow/subagents/executor.py @@ -76,6 +76,9 @@ _scheduler_pool = ThreadPoolExecutor(max_workers=3, thread_name_prefix="subagent # Larger pool to avoid blocking when scheduler submits execution tasks _execution_pool = ThreadPoolExecutor(max_workers=3, thread_name_prefix="subagent-exec-") +# Dedicated pool for sync execute() calls made from an already-running event loop: execute() submits _execute_in_isolated_loop here and blocks on future.result() until it finishes. +_isolated_loop_pool = ThreadPoolExecutor(max_workers=3, thread_name_prefix="subagent-isolated-") + def _filter_tools( all_tools: list[BaseTool], @@ -374,12 +377,55 @@ class SubagentExecutor: return result + def _execute_in_isolated_loop(self, task: str, result_holder: SubagentResult | None = None) -> SubagentResult: + """Execute the subagent in a completely fresh event loop. + + This method is designed to run in a separate thread to ensure complete + isolation from any parent event loop, preventing conflicts with asyncio + primitives that may be bound to the parent loop (e.g., httpx clients). The loop is created, run to completion, cleaned up and closed within this call, after which the thread's previously set loop (or None) is restored. 
+ """ + try: + previous_loop = asyncio.get_event_loop() + except RuntimeError: + previous_loop = None + + # Create and set a new event loop for this thread; the finally block below tears it down even if _aexecute raises + loop = asyncio.new_event_loop() + try: + asyncio.set_event_loop(loop) + return loop.run_until_complete(self._aexecute(task, result_holder)) + finally: + try: + pending = asyncio.all_tasks(loop) + if pending: + for task_obj in pending: + task_obj.cancel() + loop.run_until_complete(asyncio.gather(*pending, return_exceptions=True)) + + loop.run_until_complete(loop.shutdown_asyncgens()) + loop.run_until_complete(loop.shutdown_default_executor()) + except Exception:  # cleanup is best-effort: a failure here is only logged at debug level and never replaces the result + logger.debug( + f"[trace={self.trace_id}] Failed while cleaning up isolated event loop for subagent {self.config.name}", + exc_info=True, + ) + finally: + try: + loop.close() + finally: + asyncio.set_event_loop(previous_loop) + def execute(self, task: str, result_holder: SubagentResult | None = None) -> SubagentResult: """Execute a task synchronously (wrapper around async execution). This method runs the async execution in a new event loop, allowing asynchronous tools (like MCP tools) to be used within the thread pool. + When called from within an already-running event loop (e.g., when the + parent agent is async), this method isolates the subagent execution in + a separate thread to avoid event loop conflicts with shared async + primitives like httpx clients. + + Args: task: The task description for the subagent. result_holder: Optional pre-created result object to update during execution. @@ -387,16 +433,18 @@ class SubagentExecutor: Returns: SubagentResult with the execution result. """ - # Run the async execution in a new event loop - # This is necessary because: - # 1. We may have async-only tools (like MCP tools) - # 2. 
We're running inside a ThreadPoolExecutor which doesn't have an event loop - # - # Note: _aexecute() catches all exceptions internally, so this outer - # try-except only handles asyncio.run() failures (e.g., if called from - # an async context where an event loop already exists). Subagent execution - # errors are handled within _aexecute() and returned as FAILED status. try: + try: + loop = asyncio.get_running_loop() + except RuntimeError: + loop = None + + if loop is not None and loop.is_running(): + logger.debug(f"[trace={self.trace_id}] Subagent {self.config.name} detected running event loop, using isolated thread") + future = _isolated_loop_pool.submit(self._execute_in_isolated_loop, task, result_holder) + return future.result()  # blocks this thread, and therefore the running loop it hosts, until the subagent finishes + + # Standard path: no running event loop, use asyncio.run (it creates and closes its own loop) return asyncio.run(self._aexecute(task, result_holder)) except Exception as e: logger.exception(f"[trace={self.trace_id}] Subagent {self.config.name} execution failed") diff --git a/backend/tests/test_subagent_executor.py b/backend/tests/test_subagent_executor.py index 9c8082068..a6a62c2b6 100644 --- a/backend/tests/test_subagent_executor.py +++ b/backend/tests/test_subagent_executor.py @@ -433,6 +433,42 @@ class TestSyncExecutionPath: assert result.status == SubagentStatus.COMPLETED assert result.result == "Thread pool result" + @pytest.mark.anyio + async def test_execute_in_running_event_loop_uses_isolated_thread(self, classes, base_config, mock_agent, msg): + """Test that execute() uses the isolated-thread path inside a running loop.""" + SubagentExecutor = classes["SubagentExecutor"] + SubagentStatus = classes["SubagentStatus"] + + execution_threads = [] + final_state = { + "messages": [ + msg.human("Task"), + msg.ai("Async loop result", "msg-1"), + ] + } + + async def mock_astream(*args, **kwargs): + execution_threads.append(threading.current_thread().name) + yield final_state + + mock_agent.astream = mock_astream + + executor = SubagentExecutor( + config=base_config, 
+ tools=[], + thread_id="test-thread", + ) + + with patch.object(executor, "_create_agent", return_value=mock_agent): + with patch.object(executor, "_execute_in_isolated_loop", wraps=executor._execute_in_isolated_loop) as isolated: + result = executor.execute("Task") + + assert isolated.call_count == 1 + assert execution_threads + assert all(name.startswith("subagent-isolated-") for name in execution_threads)  # prefix is the thread_name_prefix of _isolated_loop_pool + assert result.status == SubagentStatus.COMPLETED + assert result.result == "Async loop result" + def test_execute_handles_asyncio_run_failure(self, classes, base_config): """Test handling when asyncio.run() itself fails.""" SubagentExecutor = classes["SubagentExecutor"]