From 7dea1666ce665905e684af4dac2b300f6d6884f4 Mon Sep 17 00:00:00 2001 From: KiteEater <145987840+Kiteeater@users.noreply.github.com> Date: Thu, 30 Apr 2026 15:29:17 +0800 Subject: [PATCH] fix: avoid temporary event loops in async subagent execution (#2414) * fix: avoid temporary event loops in async subagent execution * Rename isolated subagent loop globals * Harden isolated subagent loop shutdown and logging * Sort subagent executor imports * Format subagent executor * Remove isolated loop pool from subagent executor * Format subagent executor cleanup --------- Co-authored-by: Willem Jiang --- .../harness/deerflow/subagents/executor.py | 174 +++++++++++++----- backend/tests/test_subagent_executor.py | 137 ++++++++++---- 2 files changed, 236 insertions(+), 75 deletions(-) diff --git a/backend/packages/harness/deerflow/subagents/executor.py b/backend/packages/harness/deerflow/subagents/executor.py index fea865699..c8da0789b 100644 --- a/backend/packages/harness/deerflow/subagents/executor.py +++ b/backend/packages/harness/deerflow/subagents/executor.py @@ -1,6 +1,7 @@ """Subagent execution engine.""" import asyncio +import atexit import logging import threading import uuid @@ -24,6 +25,12 @@ from deerflow.subagents.config import SubagentConfig logger = logging.getLogger(__name__) +_previous_shutdown_isolated_subagent_loop = globals().get("_shutdown_isolated_subagent_loop") +if callable(_previous_shutdown_isolated_subagent_loop): + atexit.unregister(_previous_shutdown_isolated_subagent_loop) + _previous_shutdown_isolated_subagent_loop() + + class SubagentStatus(Enum): """Status of a subagent execution.""" @@ -73,12 +80,92 @@ _background_tasks_lock = threading.Lock() # Thread pool for background task scheduling and orchestration _scheduler_pool = ThreadPoolExecutor(max_workers=3, thread_name_prefix="subagent-scheduler-") -# Thread pool for actual subagent execution (with timeout support) -# Larger pool to avoid blocking when scheduler submits execution tasks -_execution_pool = ThreadPoolExecutor(max_workers=3, thread_name_prefix="subagent-exec-") +# Persistent event loop for isolated subagent executions triggered from an +# already-running parent loop. Reusing one long-lived loop avoids creating a +# fresh loop per execution and then closing async resources bound to it. +_isolated_subagent_loop: asyncio.AbstractEventLoop | None = None +_isolated_subagent_loop_thread: threading.Thread | None = None +_isolated_subagent_loop_started: threading.Event | None = None +_isolated_subagent_loop_lock = threading.Lock() -# Dedicated pool for sync execute() calls made from an already-running event loop. -_isolated_loop_pool = ThreadPoolExecutor(max_workers=3, thread_name_prefix="subagent-isolated-") + +def _run_isolated_subagent_loop( + loop: asyncio.AbstractEventLoop, + started_event: threading.Event, +) -> None: + """Run the persistent isolated subagent loop in a dedicated daemon thread.""" + asyncio.set_event_loop(loop) + loop.call_soon(started_event.set) + try: + loop.run_forever() + finally: + started_event.clear() + + +def _shutdown_isolated_subagent_loop() -> None: + """Stop and close the persistent isolated subagent loop.""" + global _isolated_subagent_loop, _isolated_subagent_loop_thread, _isolated_subagent_loop_started + + with _isolated_subagent_loop_lock: + loop = _isolated_subagent_loop + thread = _isolated_subagent_loop_thread + _isolated_subagent_loop = None + _isolated_subagent_loop_thread = None + _isolated_subagent_loop_started = None + + if loop is None: + return + + if loop.is_running(): + loop.call_soon_threadsafe(loop.stop) + + if thread is not None and thread.is_alive() and thread is not threading.current_thread(): + thread.join(timeout=1) + + thread_stopped = thread is None or not thread.is_alive() + loop_stopped = not loop.is_running() + + if not loop.is_closed(): + if thread_stopped and loop_stopped: + loop.close() + else: + logger.warning( + "Skipping close of isolated subagent loop because shutdown did not complete within timeout (thread_alive=%s, loop_running=%s)", + thread is not None and thread.is_alive(), + loop.is_running(), + ) + + +atexit.register(_shutdown_isolated_subagent_loop) + + +def _get_isolated_subagent_loop() -> asyncio.AbstractEventLoop: + """Return the persistent event loop used by isolated subagent executions.""" + global _isolated_subagent_loop, _isolated_subagent_loop_thread, _isolated_subagent_loop_started + with _isolated_subagent_loop_lock: + thread_is_alive = _isolated_subagent_loop_thread is not None and _isolated_subagent_loop_thread.is_alive() + loop_is_usable = _isolated_subagent_loop is not None and not _isolated_subagent_loop.is_closed() and _isolated_subagent_loop.is_running() and thread_is_alive + + if not loop_is_usable: + loop = asyncio.new_event_loop() + started_event = threading.Event() + thread = threading.Thread( + target=_run_isolated_subagent_loop, + args=(loop, started_event), + name="subagent-persistent-loop", + daemon=True, + ) + thread.start() + if not started_event.wait(timeout=5): + loop.call_soon_threadsafe(loop.stop) + thread.join(timeout=1) + loop.close() + raise RuntimeError("Timed out starting isolated subagent event loop") + _isolated_subagent_loop = loop + _isolated_subagent_loop_thread = thread + _isolated_subagent_loop_started = started_event + + return _isolated_subagent_loop def _filter_tools( @@ -453,42 +540,39 @@ class SubagentExecutor: return result def _execute_in_isolated_loop(self, task: str, result_holder: SubagentResult | None = None) -> SubagentResult: - """Execute the subagent in a completely fresh event loop. + """Execute the subagent on the persistent isolated event loop. - This method is designed to run in a separate thread to ensure complete - isolation from any parent event loop, preventing conflicts with asyncio - primitives that may be bound to the parent loop (e.g., httpx clients). + This method is used by the sync ``execute()`` path when the caller is + already running inside an event loop. Because ``execute()`` is a sync + API, this path blocks the caller while the actual coroutine runs on the + long-lived isolated loop. Reusing that loop keeps shared async clients + from being tied to a short-lived loop that gets closed per execution. """ + future: Future[SubagentResult] | None = None try: - previous_loop = asyncio.get_event_loop() - except RuntimeError: - previous_loop = None - - # Create and set a new event loop for this thread - loop = asyncio.new_event_loop() - try: - asyncio.set_event_loop(loop) - return loop.run_until_complete(self._aexecute(task, result_holder)) - finally: - try: - pending = asyncio.all_tasks(loop) - if pending: - for task_obj in pending: - task_obj.cancel() - loop.run_until_complete(asyncio.gather(*pending, return_exceptions=True)) - - loop.run_until_complete(loop.shutdown_asyncgens()) - loop.run_until_complete(loop.shutdown_default_executor()) - except Exception: + future = asyncio.run_coroutine_threadsafe( + self._aexecute(task, result_holder), + _get_isolated_subagent_loop(), + ) + return future.result(timeout=self.config.timeout_seconds) + except FuturesTimeoutError: + if result_holder is not None: + result_holder.cancel_event.set() + if future is not None: + future.cancel() + raise + except Exception: + if future is None: logger.debug( - f"[trace={self.trace_id}] Failed while cleaning up isolated event loop for subagent {self.config.name}", + f"[trace={self.trace_id}] Failed to submit subagent {self.config.name} to the isolated event loop", exc_info=True, ) - finally: - try: - loop.close() - finally: - asyncio.set_event_loop(previous_loop) + else: + logger.debug( + f"[trace={self.trace_id}] Subagent {self.config.name} failed while executing on the isolated event loop", + exc_info=True, + ) + raise def execute(self, task: str, result_holder: SubagentResult | None = None) -> SubagentResult: """Execute a task synchronously (wrapper around async execution). @@ -497,9 +581,9 @@ class SubagentExecutor: asynchronous tools (like MCP tools) to be used within the thread pool. When called from within an already-running event loop (e.g., when the - parent agent is async), this method isolates the subagent execution in - a separate thread to avoid event loop conflicts with shared async - primitives like httpx clients. + parent agent is async), this method synchronously waits on the + persistent isolated loop to avoid event loop conflicts with shared + async primitives like httpx clients. Args: task: The task description for the subagent. @@ -515,9 +599,8 @@ class SubagentExecutor: loop = None if loop is not None and loop.is_running(): - logger.debug(f"[trace={self.trace_id}] Subagent {self.config.name} detected running event loop, using isolated thread") - future = _isolated_loop_pool.submit(self._execute_in_isolated_loop, task, result_holder) - return future.result() + logger.debug(f"[trace={self.trace_id}] Subagent {self.config.name} detected running event loop, using isolated loop") + return self._execute_in_isolated_loop(task, result_holder) # Standard path: no running event loop, use asyncio.run return asyncio.run(self._aexecute(task, result_holder)) @@ -571,9 +654,12 @@ class SubagentExecutor: result_holder = _background_tasks[task_id] try: - # Submit execution to execution pool with timeout - # Pass result_holder so execute() can update it in real-time - execution_future: Future = _execution_pool.submit(self.execute, task, result_holder) + # Submit execution directly to the persistent isolated loop so the + # background path does not create a temporary loop via execute(). + execution_future = asyncio.run_coroutine_threadsafe( + self._aexecute(task, result_holder), + _get_isolated_subagent_loop(), + ) try: # Wait for execution with timeout exec_result = execution_future.result(timeout=self.config.timeout_seconds) diff --git a/backend/tests/test_subagent_executor.py b/backend/tests/test_subagent_executor.py index a5ecc918b..428c82d17 100644 --- a/backend/tests/test_subagent_executor.py +++ b/backend/tests/test_subagent_executor.py @@ -3,7 +3,7 @@ Covers: - SubagentExecutor.execute() synchronous execution path - SubagentExecutor._aexecute() asynchronous execution path -- asyncio.run() properly executes async workflow within thread pool context +- execute_async() routes background work without bouncing through execute() - Error handling in both sync and async paths - Async tool support (MCP tools) - Cooperative cancellation via cancel_event @@ -487,7 +487,7 @@ class TestSyncExecutionPath: """Test that execute() works correctly when called from a thread pool. This simulates the real-world usage where execute() is called from - _execution_pool in execute_async(). + a worker thread outside the main event loop. """ from concurrent.futures import ThreadPoolExecutor @@ -515,7 +515,7 @@ class TestSyncExecutionPath: with patch.object(executor, "_create_agent", return_value=mock_agent): return executor.execute("Task") - # Execute in thread pool (simulating _execution_pool usage) + # Execute in thread pool to simulate sync execution outside the main loop. with ThreadPoolExecutor(max_workers=1) as pool: future = pool.submit(run_in_thread) result = future.result(timeout=5) @@ -524,11 +524,13 @@ class TestSyncExecutionPath: assert result.result == "Thread pool result" @pytest.mark.anyio - async def test_execute_in_running_event_loop_uses_isolated_thread(self, classes, base_config, mock_agent, msg): - """Test that execute() uses the isolated-thread path inside a running loop.""" + async def test_execute_in_running_event_loop_calls_isolated_loop_directly(self, classes, base_config, mock_agent, msg): + """Test that execute() calls the isolated-loop helper directly in a running loop.""" SubagentExecutor = classes["SubagentExecutor"] SubagentStatus = classes["SubagentStatus"] + caller_thread = threading.current_thread().name + isolated_helper_threads = [] execution_threads = [] final_state = { "messages": [ @@ -549,16 +551,59 @@ class TestSyncExecutionPath: thread_id="test-thread", ) + original_isolated_execute = executor._execute_in_isolated_loop + + def tracked_isolated_execute(task, result_holder=None): + isolated_helper_threads.append(threading.current_thread().name) + return original_isolated_execute(task, result_holder) + with patch.object(executor, "_create_agent", return_value=mock_agent): - with patch.object(executor, "_execute_in_isolated_loop", wraps=executor._execute_in_isolated_loop) as isolated: + with patch.object(executor, "_execute_in_isolated_loop", side_effect=tracked_isolated_execute) as isolated: result = executor.execute("Task") assert isolated.call_count == 1 + assert isolated_helper_threads == [caller_thread] assert execution_threads - assert all(name.startswith("subagent-isolated-") for name in execution_threads) + assert execution_threads == ["subagent-persistent-loop"] assert result.status == SubagentStatus.COMPLETED assert result.result == "Async loop result" + @pytest.mark.anyio + async def test_execute_in_running_event_loop_reuses_persistent_isolated_loop(self, classes, base_config, mock_agent, msg): + """Regression: repeated isolated executions should reuse one long-lived loop.""" + SubagentExecutor = classes["SubagentExecutor"] + SubagentStatus = classes["SubagentStatus"] + execution_loops = [] + + final_state = { + "messages": [ + msg.human("Task"), + msg.ai("Async loop result", "msg-1"), + ] + } + + async def mock_astream(*args, **kwargs): + execution_loops.append(asyncio.get_running_loop()) + yield final_state + + mock_agent.astream = mock_astream + + executor = SubagentExecutor( + config=base_config, + tools=[], + thread_id="test-thread", + ) + + with patch.object(executor, "_create_agent", return_value=mock_agent): + first = executor.execute("Task 1") + second = executor.execute("Task 2") + + assert first.status == SubagentStatus.COMPLETED + assert second.status == SubagentStatus.COMPLETED + assert len(execution_loops) == 2 + assert execution_loops[0] is execution_loops[1] + assert execution_loops[0].is_running() + def test_execute_handles_asyncio_run_failure(self, classes, base_config): """Test handling when asyncio.run() itself fails.""" SubagentExecutor = classes["SubagentExecutor"] @@ -1022,6 +1067,53 @@ class TestCooperativeCancellation: """Test that requesting cancellation on a nonexistent task does not raise.""" executor_module.request_cancel_background_task("nonexistent-task") + def test_execute_async_runs_without_calling_execute(self, executor_module, classes, base_config): + """Regression: execute_async should not route through execute()/asyncio.run().""" + import concurrent.futures + + SubagentExecutor = classes["SubagentExecutor"] + SubagentResult = classes["SubagentResult"] + SubagentStatus = classes["SubagentStatus"] + + def run_inline(fn, *args, **kwargs): + future = concurrent.futures.Future() + try: + future.set_result(fn(*args, **kwargs)) + except Exception as exc: + future.set_exception(exc) + return future + + async def fake_aexecute(task, result_holder=None): + result = result_holder or SubagentResult( + task_id="inline-task", + trace_id="test-trace", + status=SubagentStatus.RUNNING, + ) + result.status = SubagentStatus.COMPLETED + result.result = f"done: {task}" + result.completed_at = datetime.now() + return result + + executor = SubagentExecutor( + config=base_config, + tools=[], + thread_id="test-thread", + trace_id="test-trace", + ) + + with ( + patch.object(executor_module._scheduler_pool, "submit", side_effect=run_inline), + patch.object(executor, "_aexecute", side_effect=fake_aexecute), + patch.object(executor, "execute", side_effect=AssertionError("execute() should not be called by execute_async")), + ): + task_id = executor.execute_async("Task") + + result = executor_module._background_tasks.get(task_id) + assert result is not None + assert result.status == SubagentStatus.COMPLETED + assert result.result == "done: Task" + assert result.error is None + def test_timeout_does_not_overwrite_cancelled(self, executor_module, classes, base_config, msg): """Test that the real timeout handler does not overwrite CANCELLED status. @@ -1043,25 +1135,13 @@ class TestCooperativeCancellation: ) # Synchronisation primitives - execute_entered = threading.Event() # signals that execute() has started - execute_release = threading.Event() # lets execute() return + execute_entered = threading.Event() # signals that _aexecute() has started run_task_done = threading.Event() # signals that run_task() has finished - # A blocking execute() replacement so we control the timing exactly - def blocking_execute(task, result_holder=None): - # Cooperative cancellation: honour cancel_event like real _aexecute - if result_holder and result_holder.cancel_event.is_set(): - result_holder.status = SubagentStatus.CANCELLED - result_holder.error = "Cancelled by user" - result_holder.completed_at = datetime.now() - execute_entered.set() - return result_holder + # A blocking _aexecute() replacement so we control the timing exactly. + async def blocking_aexecute(task, result_holder=None): execute_entered.set() - execute_release.wait(timeout=5) - # Return a minimal completed result (will be ignored because timeout fires first) - from deerflow.subagents.executor import SubagentResult as _R - - return _R(task_id="x", trace_id="t", status=SubagentStatus.COMPLETED, result="late") + await asyncio.Event().wait() executor = SubagentExecutor( config=short_config, @@ -1082,11 +1162,11 @@ class TestCooperativeCancellation: return original_scheduler_submit(wrapper) - with patch.object(executor, "execute", blocking_execute), patch.object(executor_module._scheduler_pool, "submit", tracked_submit): + with patch.object(executor, "_aexecute", side_effect=blocking_aexecute), patch.object(executor_module._scheduler_pool, "submit", tracked_submit): task_id = executor.execute_async("Task") - # Wait until execute() is entered (i.e. it's running in _execution_pool) - assert execute_entered.wait(timeout=3), "execute() was never called" + # Wait until _aexecute() is entered on the persistent loop. + assert execute_entered.wait(timeout=3), "_aexecute() was never called" # Set CANCELLED on the result before the timeout handler runs. # The 50ms timeout will fire while execute() is blocked. @@ -1099,11 +1179,6 @@ class TestCooperativeCancellation: # now executed and (should have) left CANCELLED intact. assert run_task_done.wait(timeout=5), "run_task() did not finish" - # Only NOW release the blocked execute() so the thread pool worker - # can be reclaimed. This MUST come after run_task_done to avoid a - # race where execute() returns before the timeout fires. - execute_release.set() - result = executor_module._background_tasks.get(task_id) assert result is not None # The RUNNING guard in the FuturesTimeoutError handler must have