fix: avoid temporary event loops in async subagent execution (#2414)

* fix: avoid temporary event loops in async subagent execution

* Rename isolated subagent loop globals

* Harden isolated subagent loop shutdown and logging

* Sort subagent executor imports

* Format subagent executor

* Remove isolated loop pool from subagent executor

* Format subagent executor cleanup

---------

Co-authored-by: Willem Jiang <willem.jiang@gmail.com>
This commit is contained in:
KiteEater 2026-04-30 15:29:17 +08:00 committed by GitHub
parent 88d47f677f
commit 7dea1666ce
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 236 additions and 75 deletions

View File

@ -1,6 +1,7 @@
"""Subagent execution engine."""
import asyncio
import atexit
import logging
import threading
import uuid
@ -24,6 +25,12 @@ from deerflow.subagents.config import SubagentConfig
logger = logging.getLogger(__name__)

# Module-reload guard: if a previous incarnation of this module already
# registered an atexit hook for the isolated loop, unregister that stale hook
# and shut the old loop down now, so reloads do not leak loop threads or fire
# atexit callbacks bound to objects from the dead module instance.
_previous_shutdown_isolated_subagent_loop = globals().get("_shutdown_isolated_subagent_loop")
if callable(_previous_shutdown_isolated_subagent_loop):
    atexit.unregister(_previous_shutdown_isolated_subagent_loop)
    _previous_shutdown_isolated_subagent_loop()
class SubagentStatus(Enum):
"""Status of a subagent execution."""
@ -73,12 +80,92 @@ _background_tasks_lock = threading.Lock()
# Thread pool for background task scheduling and orchestration
_scheduler_pool = ThreadPoolExecutor(max_workers=3, thread_name_prefix="subagent-scheduler-")
# Thread pool for actual subagent execution (with timeout support)
# Larger pool to avoid blocking when scheduler submits execution tasks
_execution_pool = ThreadPoolExecutor(max_workers=3, thread_name_prefix="subagent-exec-")
# Persistent event loop for isolated subagent executions triggered from an
# already-running parent loop. Reusing one long-lived loop avoids creating a
# fresh loop per execution and then closing async resources bound to it.
_isolated_subagent_loop: asyncio.AbstractEventLoop | None = None
_isolated_subagent_loop_thread: threading.Thread | None = None
_isolated_subagent_loop_started: threading.Event | None = None
_isolated_subagent_loop_lock = threading.Lock()
# Dedicated pool for sync execute() calls made from an already-running event loop.
_isolated_loop_pool = ThreadPoolExecutor(max_workers=3, thread_name_prefix="subagent-isolated-")
def _run_isolated_subagent_loop(
    loop: asyncio.AbstractEventLoop,
    started_event: threading.Event,
) -> None:
    """Drive *loop* forever on the current (daemon) thread.

    The very first callback the loop processes sets *started_event*, which
    tells the spawning thread that ``run_forever()`` is genuinely live — not
    merely that the thread has been created. Whenever the loop stops, for any
    reason, the flag is cleared again so liveness checks start failing.
    """
    # Install the loop as this thread's current loop so code inside the
    # thread that calls asyncio.get_event_loop() sees the right one.
    asyncio.set_event_loop(loop)
    # Signal readiness from *inside* the loop: the callback can only run once
    # the loop is actually dispatching events.
    loop.call_soon(started_event.set)
    try:
        loop.run_forever()
    finally:
        # Loop has stopped (stop() was scheduled or run_forever failed);
        # drop the liveness flag either way.
        started_event.clear()
def _shutdown_isolated_subagent_loop() -> None:
    """Stop and close the persistent isolated subagent loop.

    Safe to call when no loop exists (no-op) and safe to call from the loop's
    own thread (the self-join is skipped). Registered with atexit below.
    """
    global _isolated_subagent_loop, _isolated_subagent_loop_thread, _isolated_subagent_loop_started
    # Detach the module-level references under the lock first, so a concurrent
    # _get_isolated_subagent_loop() call sees a clean slate and builds a fresh
    # loop instead of handing out the one being torn down.
    with _isolated_subagent_loop_lock:
        loop = _isolated_subagent_loop
        thread = _isolated_subagent_loop_thread
        _isolated_subagent_loop = None
        _isolated_subagent_loop_thread = None
        _isolated_subagent_loop_started = None
    if loop is None:
        return
    if loop.is_running():
        # stop() must run on the loop's own thread; schedule it threadsafely.
        loop.call_soon_threadsafe(loop.stop)
    # Never join the current thread — atexit handlers may run on it.
    if thread is not None and thread.is_alive() and thread is not threading.current_thread():
        thread.join(timeout=1)
    thread_stopped = thread is None or not thread.is_alive()
    loop_stopped = not loop.is_running()
    if not loop.is_closed():
        if thread_stopped and loop_stopped:
            loop.close()
        else:
            # close() on a still-running loop raises RuntimeError; leaking the
            # daemon loop at shutdown is the lesser evil, so just log it.
            logger.warning(
                "Skipping close of isolated subagent loop because shutdown did not complete within timeout (thread_alive=%s, loop_running=%s)",
                thread is not None and thread.is_alive(),
                loop.is_running(),
            )
atexit.register(_shutdown_isolated_subagent_loop)
def _get_isolated_subagent_loop() -> asyncio.AbstractEventLoop:
    """Return the persistent event loop used by isolated subagent executions.

    Lazily creates the loop and its daemon thread on first use, and replaces
    them transparently if the previous loop was closed, stopped, or its thread
    died. All checks and (re)creation happen under
    ``_isolated_subagent_loop_lock``, so concurrent callers share one loop.

    Returns:
        The long-lived, running event loop dedicated to subagent execution.

    Raises:
        RuntimeError: If a freshly created loop does not start within 5s.
    """
    global _isolated_subagent_loop, _isolated_subagent_loop_thread, _isolated_subagent_loop_started
    with _isolated_subagent_loop_lock:
        thread_is_alive = _isolated_subagent_loop_thread is not None and _isolated_subagent_loop_thread.is_alive()
        loop_is_usable = (
            _isolated_subagent_loop is not None
            and not _isolated_subagent_loop.is_closed()
            and _isolated_subagent_loop.is_running()
            and thread_is_alive
        )
        if not loop_is_usable:
            loop = asyncio.new_event_loop()
            started_event = threading.Event()
            thread = threading.Thread(
                target=_run_isolated_subagent_loop,
                args=(loop, started_event),
                name="subagent-persistent-loop",
                daemon=True,
            )
            thread.start()
            if not started_event.wait(timeout=5):
                # Best-effort cleanup. The thread may have started run_forever
                # just after the wait timed out, so stop it threadsafely and
                # give it a moment to wind down.
                loop.call_soon_threadsafe(loop.stop)
                thread.join(timeout=1)
                # BUGFIX: only close a loop that is no longer running —
                # close() on a running loop raises RuntimeError and would
                # mask the startup-timeout error we actually want to raise.
                if not loop.is_running():
                    loop.close()
                raise RuntimeError("Timed out starting isolated subagent event loop")
            _isolated_subagent_loop = loop
            _isolated_subagent_loop_thread = thread
            _isolated_subagent_loop_started = started_event
        return _isolated_subagent_loop
def _filter_tools(
@ -453,42 +540,39 @@ class SubagentExecutor:
return result
def _execute_in_isolated_loop(self, task: str, result_holder: SubagentResult | None = None) -> SubagentResult:
"""Execute the subagent in a completely fresh event loop.
"""Execute the subagent on the persistent isolated event loop.
This method is designed to run in a separate thread to ensure complete
isolation from any parent event loop, preventing conflicts with asyncio
primitives that may be bound to the parent loop (e.g., httpx clients).
This method is used by the sync ``execute()`` path when the caller is
already running inside an event loop. Because ``execute()`` is a sync
API, this path blocks the caller while the actual coroutine runs on the
long-lived isolated loop. Reusing that loop keeps shared async clients
from being tied to a short-lived loop that gets closed per execution.
"""
future: Future[SubagentResult] | None = None
try:
previous_loop = asyncio.get_event_loop()
except RuntimeError:
previous_loop = None
# Create and set a new event loop for this thread
loop = asyncio.new_event_loop()
try:
asyncio.set_event_loop(loop)
return loop.run_until_complete(self._aexecute(task, result_holder))
finally:
try:
pending = asyncio.all_tasks(loop)
if pending:
for task_obj in pending:
task_obj.cancel()
loop.run_until_complete(asyncio.gather(*pending, return_exceptions=True))
loop.run_until_complete(loop.shutdown_asyncgens())
loop.run_until_complete(loop.shutdown_default_executor())
except Exception:
future = asyncio.run_coroutine_threadsafe(
self._aexecute(task, result_holder),
_get_isolated_subagent_loop(),
)
return future.result(timeout=self.config.timeout_seconds)
except FuturesTimeoutError:
if result_holder is not None:
result_holder.cancel_event.set()
if future is not None:
future.cancel()
raise
except Exception:
if future is None:
logger.debug(
f"[trace={self.trace_id}] Failed while cleaning up isolated event loop for subagent {self.config.name}",
f"[trace={self.trace_id}] Failed to submit subagent {self.config.name} to the isolated event loop",
exc_info=True,
)
finally:
try:
loop.close()
finally:
asyncio.set_event_loop(previous_loop)
else:
logger.debug(
f"[trace={self.trace_id}] Subagent {self.config.name} failed while executing on the isolated event loop",
exc_info=True,
)
raise
def execute(self, task: str, result_holder: SubagentResult | None = None) -> SubagentResult:
"""Execute a task synchronously (wrapper around async execution).
@ -497,9 +581,9 @@ class SubagentExecutor:
asynchronous tools (like MCP tools) to be used within the thread pool.
When called from within an already-running event loop (e.g., when the
parent agent is async), this method isolates the subagent execution in
a separate thread to avoid event loop conflicts with shared async
primitives like httpx clients.
parent agent is async), this method synchronously waits on the
persistent isolated loop to avoid event loop conflicts with shared
async primitives like httpx clients.
Args:
task: The task description for the subagent.
@ -515,9 +599,8 @@ class SubagentExecutor:
loop = None
if loop is not None and loop.is_running():
logger.debug(f"[trace={self.trace_id}] Subagent {self.config.name} detected running event loop, using isolated thread")
future = _isolated_loop_pool.submit(self._execute_in_isolated_loop, task, result_holder)
return future.result()
logger.debug(f"[trace={self.trace_id}] Subagent {self.config.name} detected running event loop, using isolated loop")
return self._execute_in_isolated_loop(task, result_holder)
# Standard path: no running event loop, use asyncio.run
return asyncio.run(self._aexecute(task, result_holder))
@ -571,9 +654,12 @@ class SubagentExecutor:
result_holder = _background_tasks[task_id]
try:
# Submit execution to execution pool with timeout
# Pass result_holder so execute() can update it in real-time
execution_future: Future = _execution_pool.submit(self.execute, task, result_holder)
# Submit execution directly to the persistent isolated loop so the
# background path does not create a temporary loop via execute().
execution_future = asyncio.run_coroutine_threadsafe(
self._aexecute(task, result_holder),
_get_isolated_subagent_loop(),
)
try:
# Wait for execution with timeout
exec_result = execution_future.result(timeout=self.config.timeout_seconds)

View File

@ -3,7 +3,7 @@
Covers:
- SubagentExecutor.execute() synchronous execution path
- SubagentExecutor._aexecute() asynchronous execution path
- asyncio.run() properly executes async workflow within thread pool context
- execute_async() routes background work without bouncing through execute()
- Error handling in both sync and async paths
- Async tool support (MCP tools)
- Cooperative cancellation via cancel_event
@ -487,7 +487,7 @@ class TestSyncExecutionPath:
"""Test that execute() works correctly when called from a thread pool.
This simulates the real-world usage where execute() is called from
_execution_pool in execute_async().
a worker thread outside the main event loop.
"""
from concurrent.futures import ThreadPoolExecutor
@ -515,7 +515,7 @@ class TestSyncExecutionPath:
with patch.object(executor, "_create_agent", return_value=mock_agent):
return executor.execute("Task")
# Execute in thread pool (simulating _execution_pool usage)
# Execute in thread pool to simulate sync execution outside the main loop.
with ThreadPoolExecutor(max_workers=1) as pool:
future = pool.submit(run_in_thread)
result = future.result(timeout=5)
@ -524,11 +524,13 @@ class TestSyncExecutionPath:
assert result.result == "Thread pool result"
@pytest.mark.anyio
async def test_execute_in_running_event_loop_uses_isolated_thread(self, classes, base_config, mock_agent, msg):
"""Test that execute() uses the isolated-thread path inside a running loop."""
async def test_execute_in_running_event_loop_calls_isolated_loop_directly(self, classes, base_config, mock_agent, msg):
"""Test that execute() calls the isolated-loop helper directly in a running loop."""
SubagentExecutor = classes["SubagentExecutor"]
SubagentStatus = classes["SubagentStatus"]
caller_thread = threading.current_thread().name
isolated_helper_threads = []
execution_threads = []
final_state = {
"messages": [
@ -549,16 +551,59 @@ class TestSyncExecutionPath:
thread_id="test-thread",
)
original_isolated_execute = executor._execute_in_isolated_loop
def tracked_isolated_execute(task, result_holder=None):
isolated_helper_threads.append(threading.current_thread().name)
return original_isolated_execute(task, result_holder)
with patch.object(executor, "_create_agent", return_value=mock_agent):
with patch.object(executor, "_execute_in_isolated_loop", wraps=executor._execute_in_isolated_loop) as isolated:
with patch.object(executor, "_execute_in_isolated_loop", side_effect=tracked_isolated_execute) as isolated:
result = executor.execute("Task")
assert isolated.call_count == 1
assert isolated_helper_threads == [caller_thread]
assert execution_threads
assert all(name.startswith("subagent-isolated-") for name in execution_threads)
assert execution_threads == ["subagent-persistent-loop"]
assert result.status == SubagentStatus.COMPLETED
assert result.result == "Async loop result"
@pytest.mark.anyio
async def test_execute_in_running_event_loop_reuses_persistent_isolated_loop(self, classes, base_config, mock_agent, msg):
    """Regression: repeated isolated executions should reuse one long-lived loop."""
    SubagentExecutor = classes["SubagentExecutor"]
    SubagentStatus = classes["SubagentStatus"]
    # Records the running loop observed by each subagent execution, so we can
    # prove both executions landed on the same persistent loop.
    execution_loops = []
    final_state = {
        "messages": [
            msg.human("Task"),
            msg.ai("Async loop result", "msg-1"),
        ]
    }
    async def mock_astream(*args, **kwargs):
        # Capture the loop actually driving the subagent coroutine.
        execution_loops.append(asyncio.get_running_loop())
        yield final_state
    mock_agent.astream = mock_astream
    executor = SubagentExecutor(
        config=base_config,
        tools=[],
        thread_id="test-thread",
    )
    with patch.object(executor, "_create_agent", return_value=mock_agent):
        # Two back-to-back sync executes from inside this (running) test loop.
        first = executor.execute("Task 1")
        second = executor.execute("Task 2")
    assert first.status == SubagentStatus.COMPLETED
    assert second.status == SubagentStatus.COMPLETED
    assert len(execution_loops) == 2
    # Same loop object both times, and it must still be alive afterwards —
    # i.e. no per-execution temporary loop was created and torn down.
    assert execution_loops[0] is execution_loops[1]
    assert execution_loops[0].is_running()
def test_execute_handles_asyncio_run_failure(self, classes, base_config):
"""Test handling when asyncio.run() itself fails."""
SubagentExecutor = classes["SubagentExecutor"]
@ -1022,6 +1067,53 @@ class TestCooperativeCancellation:
"""Test that requesting cancellation on a nonexistent task does not raise."""
executor_module.request_cancel_background_task("nonexistent-task")
def test_execute_async_runs_without_calling_execute(self, executor_module, classes, base_config):
    """Regression: execute_async should not route through execute()/asyncio.run()."""
    import concurrent.futures

    SubagentExecutor = classes["SubagentExecutor"]
    SubagentResult = classes["SubagentResult"]
    SubagentStatus = classes["SubagentStatus"]

    # Executes scheduler submissions synchronously on the caller's thread so
    # the whole background path runs inline and deterministically.
    def run_inline(fn, *args, **kwargs):
        future = concurrent.futures.Future()
        try:
            future.set_result(fn(*args, **kwargs))
        except Exception as exc:
            future.set_exception(exc)
        return future

    # Stand-in for _aexecute that completes the result holder immediately.
    async def fake_aexecute(task, result_holder=None):
        result = result_holder or SubagentResult(
            task_id="inline-task",
            trace_id="test-trace",
            status=SubagentStatus.RUNNING,
        )
        result.status = SubagentStatus.COMPLETED
        result.result = f"done: {task}"
        result.completed_at = datetime.now()
        return result

    executor = SubagentExecutor(
        config=base_config,
        tools=[],
        thread_id="test-thread",
        trace_id="test-trace",
    )
    with (
        patch.object(executor_module._scheduler_pool, "submit", side_effect=run_inline),
        patch.object(executor, "_aexecute", side_effect=fake_aexecute),
        # If the background path ever bounces through execute(), fail loudly.
        patch.object(executor, "execute", side_effect=AssertionError("execute() should not be called by execute_async")),
    ):
        task_id = executor.execute_async("Task")

    result = executor_module._background_tasks.get(task_id)
    assert result is not None
    assert result.status == SubagentStatus.COMPLETED
    assert result.result == "done: Task"
    assert result.error is None
def test_timeout_does_not_overwrite_cancelled(self, executor_module, classes, base_config, msg):
"""Test that the real timeout handler does not overwrite CANCELLED status.
@ -1043,25 +1135,13 @@ class TestCooperativeCancellation:
)
# Synchronisation primitives
execute_entered = threading.Event() # signals that execute() has started
execute_release = threading.Event() # lets execute() return
execute_entered = threading.Event() # signals that _aexecute() has started
run_task_done = threading.Event() # signals that run_task() has finished
# A blocking execute() replacement so we control the timing exactly
def blocking_execute(task, result_holder=None):
# Cooperative cancellation: honour cancel_event like real _aexecute
if result_holder and result_holder.cancel_event.is_set():
result_holder.status = SubagentStatus.CANCELLED
result_holder.error = "Cancelled by user"
result_holder.completed_at = datetime.now()
execute_entered.set()
return result_holder
# A blocking _aexecute() replacement so we control the timing exactly.
async def blocking_aexecute(task, result_holder=None):
execute_entered.set()
execute_release.wait(timeout=5)
# Return a minimal completed result (will be ignored because timeout fires first)
from deerflow.subagents.executor import SubagentResult as _R
return _R(task_id="x", trace_id="t", status=SubagentStatus.COMPLETED, result="late")
await asyncio.Event().wait()
executor = SubagentExecutor(
config=short_config,
@ -1082,11 +1162,11 @@ class TestCooperativeCancellation:
return original_scheduler_submit(wrapper)
with patch.object(executor, "execute", blocking_execute), patch.object(executor_module._scheduler_pool, "submit", tracked_submit):
with patch.object(executor, "_aexecute", side_effect=blocking_aexecute), patch.object(executor_module._scheduler_pool, "submit", tracked_submit):
task_id = executor.execute_async("Task")
# Wait until execute() is entered (i.e. it's running in _execution_pool)
assert execute_entered.wait(timeout=3), "execute() was never called"
# Wait until _aexecute() is entered on the persistent loop.
assert execute_entered.wait(timeout=3), "_aexecute() was never called"
# Set CANCELLED on the result before the timeout handler runs.
# The 50ms timeout will fire while execute() is blocked.
@ -1099,11 +1179,6 @@ class TestCooperativeCancellation:
# now executed and (should have) left CANCELLED intact.
assert run_task_done.wait(timeout=5), "run_task() did not finish"
# Only NOW release the blocked execute() so the thread pool worker
# can be reclaimed. This MUST come after run_task_done to avoid a
# race where execute() returns before the timeout fires.
execute_release.set()
result = executor_module._background_tasks.get(task_id)
assert result is not None
# The RUNNING guard in the FuturesTimeoutError handler must have