From f0dd8cb0d22bd49cdb6c7efa35ca765403d143da Mon Sep 17 00:00:00 2001 From: lulusiyuyu <72681838+lulusiyuyu@users.noreply.github.com> Date: Tue, 7 Apr 2026 11:12:25 +0800 Subject: [PATCH] fix(subagents): add cooperative cancellation for subagent threads (#1873) * fix(subagents): add cooperative cancellation for subagent threads Subagent tasks run inside ThreadPoolExecutor threads with their own event loop (asyncio.run). When a user clicks stop, RunManager cancels the parent asyncio.Task, but Future.cancel() cannot terminate a running thread and asyncio.Event does not propagate across event loops. This causes subagent threads to keep executing (writing files, calling LLMs) even after the user explicitly stops the run. Fix: add a threading.Event (cancel_event) to SubagentResult and check it cooperatively in _aexecute()'s astream iteration loop. On cancel, request_cancel_background_task() sets the event, and the thread exits at the next iteration boundary. Changes: - executor.py: Add cancel_event field to SubagentResult, check it in _aexecute loop, set it on timeout, add request_cancel_background_task - task_tool.py: Call request_cancel_background_task on CancelledError * fix(subagents): guard cancel status and add pre-check before astream - Only overwrite status to FAILED when still RUNNING, preserving TIMED_OUT set by the scheduler thread. - Add cancel_event pre-check before entering the astream loop so cancellation is detected immediately when already signalled. * fix(subagents): guard status updates with lock to prevent race condition Wrap the check-and-set on result.status in _aexecute with _background_tasks_lock so the timeout handler in execute_async cannot interleave between the read and write. * fix(subagents): add dedicated CANCELLED status for user cancellation Introduce SubagentStatus.CANCELLED to distinguish user-initiated cancellation from actual execution failures. Update _aexecute, task_tool polling, cleanup terminal-status sets, and test fixtures. * test(subagents): add cancellation tests and fix timeout regression test - Add dedicated TestCooperativeCancellation test class with 6 tests: - Pre-set cancel_event prevents astream from starting - Mid-stream cancel_event returns CANCELLED immediately - request_cancel_background_task() sets cancel_event correctly - request_cancel on nonexistent task is a no-op - Real execute_async timeout does not overwrite CANCELLED (deterministic threading.Event sync, no wall-clock sleeps) - cleanup_background_task removes CANCELLED tasks - Add task_tool cancellation coverage: - test_cancellation_calls_request_cancel: assert CancelledError path calls request_cancel_background_task(task_id) - test_task_tool_returns_cancelled_message: assert CANCELLED polling branch emits task_cancelled event and returns expected message - Fix pre-existing test infrastructure issue: add deerflow.sandbox.security to _MOCKED_MODULE_NAMES (fixes ModuleNotFoundError for all executor tests) - Add RUNNING guard to timeout handler in executor.py to prevent TIMED_OUT from overwriting CANCELLED status - Add cooperative cancellation granularity comment documenting that cancellation is only detected at astream iteration boundaries --------- Co-authored-by: lulusiyuyu --- .../harness/deerflow/subagents/executor.py | 57 ++++- .../deerflow/tools/builtins/task_tool.py | 14 +- backend/tests/test_subagent_executor.py | 233 ++++++++++++++++++ backend/tests/test_task_tool_core_logic.py | 100 ++++++++ 4 files changed, 397 insertions(+), 7 deletions(-) diff --git a/backend/packages/harness/deerflow/subagents/executor.py b/backend/packages/harness/deerflow/subagents/executor.py index 8e1b15138..add25de0e 100644 --- a/backend/packages/harness/deerflow/subagents/executor.py +++ b/backend/packages/harness/deerflow/subagents/executor.py @@ -6,7 +6,7 @@ import threading import uuid from concurrent.futures import Future, ThreadPoolExecutor from concurrent.futures import TimeoutError as FuturesTimeoutError -from dataclasses import dataclass +from dataclasses import dataclass, field from datetime import datetime from enum import Enum from typing import Any @@ -30,6 +30,7 @@ class SubagentStatus(Enum): RUNNING = "running" COMPLETED = "completed" FAILED = "failed" + CANCELLED = "cancelled" TIMED_OUT = "timed_out" @@ -56,6 +57,7 @@ class SubagentResult: started_at: datetime | None = None completed_at: datetime | None = None ai_messages: list[dict[str, Any]] | None = None + cancel_event: threading.Event = field(default_factory=threading.Event, repr=False) def __post_init__(self): """Initialize mutable defaults.""" @@ -241,7 +243,31 @@ class SubagentExecutor: # Use stream instead of invoke to get real-time updates # This allows us to collect AI messages as they are generated final_state = None + + # Pre-check: bail out immediately if already cancelled before streaming starts + if result.cancel_event.is_set(): + logger.info(f"[trace={self.trace_id}] Subagent {self.config.name} cancelled before streaming") + with _background_tasks_lock: + if result.status == SubagentStatus.RUNNING: + result.status = SubagentStatus.CANCELLED + result.error = "Cancelled by user" + result.completed_at = datetime.now() + return result + async for chunk in agent.astream(state, config=run_config, context=context, stream_mode="values"): # type: ignore[arg-type] + # Cooperative cancellation: check if parent requested stop. + # Note: cancellation is only detected at astream iteration boundaries, + # so long-running tool calls within a single iteration will not be + # interrupted until the next chunk is yielded. + if result.cancel_event.is_set(): + logger.info(f"[trace={self.trace_id}] Subagent {self.config.name} cancelled by parent") + with _background_tasks_lock: + if result.status == SubagentStatus.RUNNING: + result.status = SubagentStatus.CANCELLED + result.error = "Cancelled by user" + result.completed_at = datetime.now() + return result + final_state = chunk # Extract AI messages from the current state @@ -437,10 +463,12 @@ class SubagentExecutor: except FuturesTimeoutError: logger.error(f"[trace={self.trace_id}] Subagent {self.config.name} execution timed out after {self.config.timeout_seconds}s") with _background_tasks_lock: - _background_tasks[task_id].status = SubagentStatus.TIMED_OUT - _background_tasks[task_id].error = f"Execution timed out after {self.config.timeout_seconds} seconds" - _background_tasks[task_id].completed_at = datetime.now() - # Cancel the future (best effort - may not stop the actual execution) + if _background_tasks[task_id].status == SubagentStatus.RUNNING: + _background_tasks[task_id].status = SubagentStatus.TIMED_OUT + _background_tasks[task_id].error = f"Execution timed out after {self.config.timeout_seconds} seconds" + _background_tasks[task_id].completed_at = datetime.now() + # Signal cooperative cancellation and cancel the future + result_holder.cancel_event.set() execution_future.cancel() except Exception as e: logger.exception(f"[trace={self.trace_id}] Subagent {self.config.name} async execution failed") @@ -456,6 +484,24 @@ class SubagentExecutor: MAX_CONCURRENT_SUBAGENTS = 3 +def request_cancel_background_task(task_id: str) -> None: + """Signal a running background task to stop. + + Sets the cancel_event on the task, which is checked cooperatively + by ``_aexecute`` during ``agent.astream()`` iteration. This allows + subagent threads — which cannot be force-killed via ``Future.cancel()`` + — to stop at the next iteration boundary. + + Args: + task_id: The task ID to cancel. + """ + with _background_tasks_lock: + result = _background_tasks.get(task_id) + if result is not None: + result.cancel_event.set() + logger.info("Requested cancellation for background task %s", task_id) + + def get_background_task_result(task_id: str) -> SubagentResult | None: """Get the result of a background task. @@ -503,6 +549,7 @@ def cleanup_background_task(task_id: str) -> None: is_terminal_status = result.status in { SubagentStatus.COMPLETED, SubagentStatus.FAILED, + SubagentStatus.CANCELLED, SubagentStatus.TIMED_OUT, } if is_terminal_status or result.completed_at is not None: diff --git a/backend/packages/harness/deerflow/tools/builtins/task_tool.py b/backend/packages/harness/deerflow/tools/builtins/task_tool.py index 963590a29..6004999dc 100644 --- a/backend/packages/harness/deerflow/tools/builtins/task_tool.py +++ b/backend/packages/harness/deerflow/tools/builtins/task_tool.py @@ -14,7 +14,7 @@ from deerflow.agents.lead_agent.prompt import get_skills_prompt_section from deerflow.agents.thread_state import ThreadState from deerflow.sandbox.security import LOCAL_BASH_SUBAGENT_DISABLED_MESSAGE, is_host_bash_allowed from deerflow.subagents import SubagentExecutor, get_available_subagent_names, get_subagent_config -from deerflow.subagents.executor import SubagentStatus, cleanup_background_task, get_background_task_result +from deerflow.subagents.executor import SubagentStatus, cleanup_background_task, get_background_task_result, request_cancel_background_task logger = logging.getLogger(__name__) @@ -182,6 +182,11 @@ async def task_tool( logger.error(f"[trace={trace_id}] Task {task_id} failed: {result.error}") cleanup_background_task(task_id) return f"Task failed. Error: {result.error}" + elif result.status == SubagentStatus.CANCELLED: + writer({"type": "task_cancelled", "task_id": task_id, "error": result.error}) + logger.info(f"[trace={trace_id}] Task {task_id} cancelled: {result.error}") + cleanup_background_task(task_id) + return "Task cancelled by user." elif result.status == SubagentStatus.TIMED_OUT: writer({"type": "task_timed_out", "task_id": task_id, "error": result.error}) logger.warning(f"[trace={trace_id}] Task {task_id} timed out: {result.error}") @@ -204,6 +209,11 @@ async def task_tool( writer({"type": "task_timed_out", "task_id": task_id}) return f"Task polling timed out after {timeout_minutes} minutes. This may indicate the background task is stuck. Status: {result.status.value}" except asyncio.CancelledError: + # Signal the background subagent thread to stop cooperatively. + # Without this, the thread (running in ThreadPoolExecutor with its + # own event loop via asyncio.run) would continue executing even + # after the parent task is cancelled. + request_cancel_background_task(task_id) async def cleanup_when_done() -> None: max_cleanup_polls = max_poll_count @@ -214,7 +224,7 @@ async def task_tool( if result is None: return - if result.status in {SubagentStatus.COMPLETED, SubagentStatus.FAILED, SubagentStatus.TIMED_OUT} or getattr(result, "completed_at", None) is not None: + if result.status in {SubagentStatus.COMPLETED, SubagentStatus.FAILED, SubagentStatus.CANCELLED, SubagentStatus.TIMED_OUT} or getattr(result, "completed_at", None) is not None: cleanup_background_task(task_id) return diff --git a/backend/tests/test_subagent_executor.py b/backend/tests/test_subagent_executor.py index cbec7acb5..9c8082068 100644 --- a/backend/tests/test_subagent_executor.py +++ b/backend/tests/test_subagent_executor.py @@ -6,6 +6,7 @@ Covers: - asyncio.run() properly executes async workflow within thread pool context - Error handling in both sync and async paths - Async tool support (MCP tools) +- Cooperative cancellation via cancel_event Note: Due to circular import issues in the main codebase, conftest.py mocks deerflow.subagents.executor. This test file uses delayed import via fixture to test @@ -14,6 +15,7 @@ the real implementation in isolation. import asyncio import sys +import threading from datetime import datetime from unittest.mock import MagicMock, patch @@ -27,6 +29,7 @@ _MOCKED_MODULE_NAMES = [ "deerflow.agents.middlewares.thread_data_middleware", "deerflow.sandbox", "deerflow.sandbox.middleware", + "deerflow.sandbox.security", "deerflow.models", ] @@ -771,3 +774,233 @@ class TestCleanupBackgroundTask: # Should be removed because completed_at is set assert task_id not in executor_module._background_tasks + + +# ----------------------------------------------------------------------------- +# Cooperative Cancellation Tests +# ----------------------------------------------------------------------------- + + +class TestCooperativeCancellation: + """Test cooperative cancellation via cancel_event.""" + + @pytest.fixture + def executor_module(self, _setup_executor_classes): + """Import the executor module with real classes.""" + import importlib + + from deerflow.subagents import executor + + return importlib.reload(executor) + + @pytest.mark.anyio + async def test_aexecute_cancelled_before_streaming(self, classes, base_config, mock_agent, msg): + """Test that _aexecute returns CANCELLED when cancel_event is set before streaming.""" + SubagentExecutor = classes["SubagentExecutor"] + SubagentResult = classes["SubagentResult"] + SubagentStatus = classes["SubagentStatus"] + + # The agent should never be called + call_count = 0 + + async def mock_astream(*args, **kwargs): + nonlocal call_count + call_count += 1 + yield {"messages": [msg.human("Task"), msg.ai("Done", "msg-1")]} + + mock_agent.astream = mock_astream + + # Pre-create result holder with cancel_event already set + result_holder = SubagentResult( + task_id="cancel-before", + trace_id="test-trace", + status=SubagentStatus.RUNNING, + started_at=datetime.now(), + ) + result_holder.cancel_event.set() + + executor = SubagentExecutor( + config=base_config, + tools=[], + thread_id="test-thread", + ) + + with patch.object(executor, "_create_agent", return_value=mock_agent): + result = await executor._aexecute("Task", result_holder=result_holder) + + assert result.status == SubagentStatus.CANCELLED + assert result.error == "Cancelled by user" + assert result.completed_at is not None + assert call_count == 0 # astream was never entered + + @pytest.mark.anyio + async def test_aexecute_cancelled_mid_stream(self, classes, base_config, msg): + """Test that _aexecute returns CANCELLED when cancel_event is set during streaming.""" + SubagentExecutor = classes["SubagentExecutor"] + SubagentResult = classes["SubagentResult"] + SubagentStatus = classes["SubagentStatus"] + + cancel_event = threading.Event() + + async def mock_astream(*args, **kwargs): + yield {"messages": [msg.human("Task"), msg.ai("Partial", "msg-1")]} + # Simulate cancellation during streaming + cancel_event.set() + yield {"messages": [msg.human("Task"), msg.ai("Should not appear", "msg-2")]} + + mock_agent = MagicMock() + mock_agent.astream = mock_astream + + result_holder = SubagentResult( + task_id="cancel-mid", + trace_id="test-trace", + status=SubagentStatus.RUNNING, + started_at=datetime.now(), + ) + result_holder.cancel_event = cancel_event + + executor = SubagentExecutor( + config=base_config, + tools=[], + thread_id="test-thread", + ) + + with patch.object(executor, "_create_agent", return_value=mock_agent): + result = await executor._aexecute("Task", result_holder=result_holder) + + assert result.status == SubagentStatus.CANCELLED + assert result.error == "Cancelled by user" + assert result.completed_at is not None + + def test_request_cancel_sets_event(self, executor_module, classes): + """Test that request_cancel_background_task sets the cancel_event.""" + SubagentResult = classes["SubagentResult"] + SubagentStatus = classes["SubagentStatus"] + + task_id = "test-cancel-event" + result = SubagentResult( + task_id=task_id, + trace_id="test-trace", + status=SubagentStatus.RUNNING, + started_at=datetime.now(), + ) + executor_module._background_tasks[task_id] = result + + assert not result.cancel_event.is_set() + + executor_module.request_cancel_background_task(task_id) + + assert result.cancel_event.is_set() + + def test_request_cancel_nonexistent_task_is_noop(self, executor_module): + """Test that requesting cancellation on a nonexistent task does not raise.""" + executor_module.request_cancel_background_task("nonexistent-task") + + def test_timeout_does_not_overwrite_cancelled(self, executor_module, classes, base_config, msg): + """Test that the real timeout handler does not overwrite CANCELLED status. + + This exercises the actual execute_async → run_task → FuturesTimeoutError + code path in executor.py. We make execute() block so the timeout fires + deterministically, pre-set the task to CANCELLED, and verify the RUNNING + guard preserves it. Uses threading.Event for synchronisation instead of + wall-clock sleeps. + """ + SubagentExecutor = classes["SubagentExecutor"] + SubagentStatus = classes["SubagentStatus"] + + short_config = classes["SubagentConfig"]( + name="test-agent", + description="Test agent", + system_prompt="You are a test agent.", + max_turns=10, + timeout_seconds=0.05, # 50ms – just enough for the future to time out + ) + + # Synchronisation primitives + execute_entered = threading.Event() # signals that execute() has started + execute_release = threading.Event() # lets execute() return + run_task_done = threading.Event() # signals that run_task() has finished + + # A blocking execute() replacement so we control the timing exactly + def blocking_execute(task, result_holder=None): + # Cooperative cancellation: honour cancel_event like real _aexecute + if result_holder and result_holder.cancel_event.is_set(): + result_holder.status = SubagentStatus.CANCELLED + result_holder.error = "Cancelled by user" + result_holder.completed_at = datetime.now() + execute_entered.set() + return result_holder + execute_entered.set() + execute_release.wait(timeout=5) + # Return a minimal completed result (will be ignored because timeout fires first) + from deerflow.subagents.executor import SubagentResult as _R + + return _R(task_id="x", trace_id="t", status=SubagentStatus.COMPLETED, result="late") + + executor = SubagentExecutor( + config=short_config, + tools=[], + thread_id="test-thread", + trace_id="test-trace", + ) + + # Wrap _scheduler_pool.submit so we know when run_task finishes + original_scheduler_submit = executor_module._scheduler_pool.submit + + def tracked_submit(fn, *args, **kwargs): + def wrapper(): + try: + fn(*args, **kwargs) + finally: + run_task_done.set() + + return original_scheduler_submit(wrapper) + + with patch.object(executor, "execute", blocking_execute), patch.object(executor_module._scheduler_pool, "submit", tracked_submit): + task_id = executor.execute_async("Task") + + # Wait until execute() is entered (i.e. it's running in _execution_pool) + assert execute_entered.wait(timeout=3), "execute() was never called" + + # Set CANCELLED on the result before the timeout handler runs. + # The 50ms timeout will fire while execute() is blocked. + with executor_module._background_tasks_lock: + executor_module._background_tasks[task_id].status = SubagentStatus.CANCELLED + executor_module._background_tasks[task_id].error = "Cancelled by user" + executor_module._background_tasks[task_id].completed_at = datetime.now() + + # Wait for run_task to finish — the FuturesTimeoutError handler has + # now executed and (should have) left CANCELLED intact. + assert run_task_done.wait(timeout=5), "run_task() did not finish" + + # Only NOW release the blocked execute() so the thread pool worker + # can be reclaimed. This MUST come after run_task_done to avoid a + # race where execute() returns before the timeout fires. + execute_release.set() + + result = executor_module._background_tasks.get(task_id) + assert result is not None + # The RUNNING guard in the FuturesTimeoutError handler must have + # preserved CANCELLED instead of overwriting with TIMED_OUT. + assert result.status.value == SubagentStatus.CANCELLED.value + assert result.error == "Cancelled by user" + assert result.completed_at is not None + + def test_cleanup_removes_cancelled_task(self, executor_module, classes): + """Test that cleanup removes a CANCELLED task (terminal state).""" + SubagentResult = classes["SubagentResult"] + SubagentStatus = classes["SubagentStatus"] + + task_id = "test-cancelled-cleanup" + result = SubagentResult( + task_id=task_id, + trace_id="test-trace", + status=SubagentStatus.CANCELLED, + error="Cancelled by user", + completed_at=datetime.now(), + ) + executor_module._background_tasks[task_id] = result + + executor_module.cleanup_background_task(task_id) + + assert task_id not in executor_module._background_tasks diff --git a/backend/tests/test_task_tool_core_logic.py b/backend/tests/test_task_tool_core_logic.py index c9ae1f849..067187281 100644 --- a/backend/tests/test_task_tool_core_logic.py +++ b/backend/tests/test_task_tool_core_logic.py @@ -20,6 +20,7 @@ class FakeSubagentStatus(Enum): RUNNING = "running" COMPLETED = "completed" FAILED = "failed" + CANCELLED = "cancelled" TIMED_OUT = "timed_out" @@ -557,3 +558,102 @@ def test_cancelled_cleanup_stops_after_timeout(monkeypatch): asyncio.run(scheduled_cleanup_coros.pop()) assert cleanup_calls == [] + + +def test_cancellation_calls_request_cancel(monkeypatch): + """Verify CancelledError path calls request_cancel_background_task(task_id).""" + config = _make_subagent_config() + events = [] + cancel_requests = [] + scheduled_cleanup_coros = [] + + async def cancel_on_first_sleep(_: float) -> None: + raise asyncio.CancelledError + + monkeypatch.setattr(task_tool_module, "SubagentStatus", FakeSubagentStatus) + monkeypatch.setattr( + task_tool_module, + "SubagentExecutor", + type("DummyExecutor", (), {"__init__": lambda self, **kwargs: None, "execute_async": lambda self, prompt, task_id=None: task_id}), + ) + monkeypatch.setattr(task_tool_module, "get_subagent_config", lambda _: config) + monkeypatch.setattr(task_tool_module, "get_skills_prompt_section", lambda: "") + monkeypatch.setattr( + task_tool_module, + "get_background_task_result", + lambda _: _make_result(FakeSubagentStatus.RUNNING, ai_messages=[]), + ) + monkeypatch.setattr(task_tool_module, "get_stream_writer", lambda: events.append) + monkeypatch.setattr(task_tool_module.asyncio, "sleep", cancel_on_first_sleep) + monkeypatch.setattr( + task_tool_module.asyncio, + "create_task", + lambda coro: (coro.close(), scheduled_cleanup_coros.append(None))[-1] or _DummyScheduledTask(), + ) + monkeypatch.setattr("deerflow.tools.get_available_tools", lambda **kwargs: []) + monkeypatch.setattr( + task_tool_module, + "request_cancel_background_task", + lambda task_id: cancel_requests.append(task_id), + ) + monkeypatch.setattr( + task_tool_module, + "cleanup_background_task", + lambda task_id: None, + ) + + with pytest.raises(asyncio.CancelledError): + _run_task_tool( + runtime=_make_runtime(), + description="执行任务", + prompt="cancel me", + subagent_type="general-purpose", + tool_call_id="tc-cancel-request", + ) + + assert cancel_requests == ["tc-cancel-request"] + + +def test_task_tool_returns_cancelled_message(monkeypatch): + """Verify polling a CANCELLED result emits task_cancelled event and returns message.""" + config = _make_subagent_config() + events = [] + cleanup_calls = [] + + # First poll: RUNNING, second poll: CANCELLED + responses = iter( + [ + _make_result(FakeSubagentStatus.RUNNING, ai_messages=[]), + _make_result(FakeSubagentStatus.CANCELLED, error="Cancelled by user"), + ] + ) + + monkeypatch.setattr(task_tool_module, "SubagentStatus", FakeSubagentStatus) + monkeypatch.setattr( + task_tool_module, + "SubagentExecutor", + type("DummyExecutor", (), {"__init__": lambda self, **kwargs: None, "execute_async": lambda self, prompt, task_id=None: task_id}), + ) + monkeypatch.setattr(task_tool_module, "get_subagent_config", lambda _: config) + monkeypatch.setattr(task_tool_module, "get_skills_prompt_section", lambda: "") + monkeypatch.setattr(task_tool_module, "get_background_task_result", lambda _: next(responses)) + monkeypatch.setattr(task_tool_module, "get_stream_writer", lambda: events.append) + monkeypatch.setattr(task_tool_module.asyncio, "sleep", _no_sleep) + monkeypatch.setattr("deerflow.tools.get_available_tools", lambda **kwargs: []) + monkeypatch.setattr( + task_tool_module, + "cleanup_background_task", + lambda task_id: cleanup_calls.append(task_id), + ) + + output = _run_task_tool( + runtime=_make_runtime(), + description="执行任务", + prompt="some task", + subagent_type="general-purpose", + tool_call_id="tc-poll-cancelled", + ) + + assert output == "Task cancelled by user." + assert any(e.get("type") == "task_cancelled" for e in events) + assert cleanup_calls == ["tc-poll-cancelled"]