lulusiyuyu f0dd8cb0d2
fix(subagents): add cooperative cancellation for subagent threads (#1873)
* fix(subagents): add cooperative cancellation for subagent threads

Subagent tasks run inside ThreadPoolExecutor threads with their own
event loop (asyncio.run). When a user clicks stop, RunManager cancels
the parent asyncio.Task, but Future.cancel() cannot terminate a running
thread and asyncio.Event does not propagate across event loops. This
causes subagent threads to keep executing (writing files, calling LLMs)
even after the user explicitly stops the run.
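
The limitation described above can be reproduced with the standard library alone. The following is a minimal sketch, not code from this change; `demo_future_cancel_is_ignored` and `worker` are hypothetical names used only for illustration. It shows that `Future.cancel()` returns `False` once the callable is already running, and that the thread still runs to completion.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def demo_future_cancel_is_ignored() -> tuple[bool, str]:
    """Return (cancel() result, worker return value) for a running future."""
    started = threading.Event()
    release = threading.Event()

    def worker() -> str:
        started.set()             # tell the caller the thread is running
        release.wait(timeout=5)   # stand-in for long-running subagent work
        return "finished"

    with ThreadPoolExecutor(max_workers=1) as pool:
        future = pool.submit(worker)
        started.wait(timeout=5)
        cancelled = future.cancel()   # has no effect on a running call
        release.set()
        return cancelled, future.result(timeout=5)


print(demo_future_cancel_is_ignored())
```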

Fix: add a threading.Event (cancel_event) to SubagentResult and check
it cooperatively in _aexecute()'s astream iteration loop. On cancel,
request_cancel_background_task() sets the event, and the thread exits
at the next iteration boundary.
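
As a rough illustration of the approach (a sketch under stated assumptions, not the code in executor.py), the pattern looks like this. `endless_stream`, `consume`, and `run_demo` are hypothetical stand-ins for `agent.astream()`, `_aexecute()`, and the caller; only `threading.Event`, `asyncio.run`, and `ThreadPoolExecutor` are real APIs.

```python
import asyncio
import threading
from concurrent.futures import ThreadPoolExecutor


async def endless_stream():
    """Hypothetical stand-in for agent.astream(): yields chunks forever."""
    i = 0
    while True:
        await asyncio.sleep(0.001)
        yield i
        i += 1


def run_demo() -> tuple[str, bool]:
    cancel_event = threading.Event()   # shared between threads and event loops
    progress = threading.Event()
    seen: list[int] = []

    async def consume() -> str:
        if cancel_event.is_set():          # pre-check before streaming
            return "cancelled"
        async for chunk in endless_stream():
            if cancel_event.is_set():      # checked once per yielded chunk
                return "cancelled"
            seen.append(chunk)
            progress.set()
        return "completed"

    with ThreadPoolExecutor(max_workers=1) as pool:
        # The worker thread owns its own event loop via asyncio.run().
        future = pool.submit(lambda: asyncio.run(consume()))
        progress.wait(timeout=5)           # wait until at least one chunk arrived
        cancel_event.set()                 # what a cancel request would do
        return future.result(timeout=5), len(seen) >= 1


print(run_demo())
```

A `threading.Event` is used rather than an `asyncio.Event` because it can be set from any thread without touching the worker's event loop.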

Changes:
- executor.py: Add cancel_event field to SubagentResult, check it in
  _aexecute loop, set it on timeout, add request_cancel_background_task
- task_tool.py: Call request_cancel_background_task on CancelledError

* fix(subagents): guard cancel status and add pre-check before astream

- Only overwrite status to FAILED when still RUNNING, preserving
  TIMED_OUT set by the scheduler thread.
- Add cancel_event pre-check before entering the astream loop so
  cancellation is detected immediately when already signalled.

* fix(subagents): guard status updates with lock to prevent race condition

Wrap the check-and-set on result.status in _aexecute with
_background_tasks_lock so the timeout handler in execute_async
cannot interleave between the read and write.
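
A minimal sketch of the guarded check-and-set, assuming a simplified status enum; `Status`, `Task`, `_lock`, and `transition` are hypothetical names for illustration and do not appear in executor.py. Whichever thread takes the lock first while the task is still RUNNING decides the terminal status, and later writers leave it alone.

```python
import threading
from dataclasses import dataclass
from enum import Enum


class Status(Enum):
    RUNNING = "running"
    CANCELLED = "cancelled"
    TIMED_OUT = "timed_out"


@dataclass
class Task:
    status: Status = Status.RUNNING


_lock = threading.Lock()


def transition(task: Task, new_status: Status) -> bool:
    """Set a terminal status only if the task is still RUNNING."""
    with _lock:
        # Read and write happen under the same lock, so no other thread
        # can change the status between the check and the assignment.
        if task.status is not Status.RUNNING:
            return False
        task.status = new_status
        return True


task = Task()
print(transition(task, Status.TIMED_OUT))   # first writer wins
print(transition(task, Status.CANCELLED))   # later writer is rejected
print(task.status)
```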

* fix(subagents): add dedicated CANCELLED status for user cancellation

Introduce SubagentStatus.CANCELLED to distinguish user-initiated
cancellation from actual execution failures. Update _aexecute,
task_tool polling, cleanup terminal-status sets, and test fixtures.

* test(subagents): add cancellation tests and fix timeout regression test

- Add dedicated TestCooperativeCancellation test class with 6 tests:
  - Pre-set cancel_event prevents astream from starting
  - Mid-stream cancel_event returns CANCELLED immediately
  - request_cancel_background_task() sets cancel_event correctly
  - request_cancel on nonexistent task is a no-op
  - Real execute_async timeout does not overwrite CANCELLED (deterministic
    threading.Event sync, no wall-clock sleeps)
  - cleanup_background_task removes CANCELLED tasks

- Add task_tool cancellation coverage:
  - test_cancellation_calls_request_cancel: assert CancelledError path
    calls request_cancel_background_task(task_id)
  - test_task_tool_returns_cancelled_message: assert CANCELLED polling
    branch emits task_cancelled event and returns expected message

- Fix pre-existing test infrastructure issue: add deerflow.sandbox.security
  to _MOCKED_MODULE_NAMES (fixes ModuleNotFoundError for all executor tests)

- Add RUNNING guard to timeout handler in executor.py to prevent
  TIMED_OUT from overwriting CANCELLED status

- Add cooperative cancellation granularity comment documenting that
  cancellation is only detected at astream iteration boundaries
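
The granularity caveat can be shown with a small sketch. This is a hypothetical model, not the real agent loop: `stream` stands in for `agent.astream()` and the body of each step stands in for a tool call. The cancel request arrives in the middle of step 0, yet step 0 still finishes before the consumer observes the event.

```python
import asyncio
import threading


def run_granularity_demo() -> list[str]:
    cancel_event = threading.Event()
    log: list[str] = []

    async def stream():
        for step in range(3):
            if step == 0:
                cancel_event.set()       # cancel requested mid-step
            await asyncio.sleep(0)       # stand-in for a long tool call
            log.append(f"step {step} finished")
            yield step

    async def consume() -> None:
        async for _ in stream():
            # The event is only inspected here, after a chunk is yielded.
            if cancel_event.is_set():
                log.append("cancel observed")
                return

    asyncio.run(consume())
    return log


print(run_granularity_demo())
```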

---------

Co-authored-by: lulusiyuyu <lulusiyuyu@users.noreply.github.com>
2026-04-07 11:12:25 +08:00


"""Subagent execution engine."""

import asyncio
import logging
import threading
import uuid
from concurrent.futures import Future, ThreadPoolExecutor
from concurrent.futures import TimeoutError as FuturesTimeoutError
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import Any

from langchain.agents import create_agent
from langchain.tools import BaseTool
from langchain_core.messages import AIMessage, HumanMessage
from langchain_core.runnables import RunnableConfig

from deerflow.agents.thread_state import SandboxState, ThreadDataState, ThreadState
from deerflow.models import create_chat_model
from deerflow.subagents.config import SubagentConfig

logger = logging.getLogger(__name__)


class SubagentStatus(Enum):
    """Status of a subagent execution."""

    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"
    TIMED_OUT = "timed_out"


@dataclass
class SubagentResult:
    """Result of a subagent execution.

    Attributes:
        task_id: Unique identifier for this execution.
        trace_id: Trace ID for distributed tracing (links parent and subagent logs).
        status: Current status of the execution.
        result: The final result message (if completed).
        error: Error message (if failed).
        started_at: When execution started.
        completed_at: When execution completed.
        ai_messages: List of complete AI messages (as dicts) generated during execution.
        cancel_event: Event set to request cooperative cancellation of this execution.
    """

    task_id: str
    trace_id: str
    status: SubagentStatus
    result: str | None = None
    error: str | None = None
    started_at: datetime | None = None
    completed_at: datetime | None = None
    ai_messages: list[dict[str, Any]] | None = None
    cancel_event: threading.Event = field(default_factory=threading.Event, repr=False)

    def __post_init__(self):
        """Initialize mutable defaults."""
        if self.ai_messages is None:
            self.ai_messages = []


# Global storage for background task results
_background_tasks: dict[str, SubagentResult] = {}
_background_tasks_lock = threading.Lock()

# Thread pool for background task scheduling and orchestration
_scheduler_pool = ThreadPoolExecutor(max_workers=3, thread_name_prefix="subagent-scheduler-")

# Thread pool for actual subagent execution (with timeout support).
# Same size as the scheduler pool, but kept separate so that scheduler threads
# blocked waiting on a result do not occupy the workers that run the subagents.
_execution_pool = ThreadPoolExecutor(max_workers=3, thread_name_prefix="subagent-exec-")


def _filter_tools(
    all_tools: list[BaseTool],
    allowed: list[str] | None,
    disallowed: list[str] | None,
) -> list[BaseTool]:
    """Filter tools based on subagent configuration.

    Args:
        all_tools: List of all available tools.
        allowed: Optional allowlist of tool names. If provided, only these tools are included.
        disallowed: Optional denylist of tool names. These tools are always excluded.

    Returns:
        Filtered list of tools.
    """
    filtered = all_tools

    # Apply allowlist if specified
    if allowed is not None:
        allowed_set = set(allowed)
        filtered = [t for t in filtered if t.name in allowed_set]

    # Apply denylist
    if disallowed is not None:
        disallowed_set = set(disallowed)
        filtered = [t for t in filtered if t.name not in disallowed_set]

    return filtered


def _get_model_name(config: SubagentConfig, parent_model: str | None) -> str | None:
    """Resolve the model name for a subagent.

    Args:
        config: Subagent configuration.
        parent_model: The parent agent's model name.

    Returns:
        Model name to use, or None to use default.
    """
    if config.model == "inherit":
        return parent_model
    return config.model


class SubagentExecutor:
    """Executor for running subagents."""

    def __init__(
        self,
        config: SubagentConfig,
        tools: list[BaseTool],
        parent_model: str | None = None,
        sandbox_state: SandboxState | None = None,
        thread_data: ThreadDataState | None = None,
        thread_id: str | None = None,
        trace_id: str | None = None,
    ):
        """Initialize the executor.

        Args:
            config: Subagent configuration.
            tools: List of all available tools (will be filtered).
            parent_model: The parent agent's model name for inheritance.
            sandbox_state: Sandbox state from parent agent.
            thread_data: Thread data from parent agent.
            thread_id: Thread ID for sandbox operations.
            trace_id: Trace ID from parent for distributed tracing.
        """
        self.config = config
        self.parent_model = parent_model
        self.sandbox_state = sandbox_state
        self.thread_data = thread_data
        self.thread_id = thread_id
        # Generate trace_id if not provided (for top-level calls)
        self.trace_id = trace_id or str(uuid.uuid4())[:8]

        # Filter tools based on config
        self.tools = _filter_tools(
            tools,
            config.tools,
            config.disallowed_tools,
        )

        logger.info(f"[trace={self.trace_id}] SubagentExecutor initialized: {config.name} with {len(self.tools)} tools")

    def _create_agent(self):
        """Create the agent instance."""
        model_name = _get_model_name(self.config, self.parent_model)
        model = create_chat_model(name=model_name, thinking_enabled=False)

        from deerflow.agents.middlewares.tool_error_handling_middleware import build_subagent_runtime_middlewares

        # Reuse shared middleware composition with lead agent.
        middlewares = build_subagent_runtime_middlewares(lazy_init=True)

        return create_agent(
            model=model,
            tools=self.tools,
            middleware=middlewares,
            system_prompt=self.config.system_prompt,
            state_schema=ThreadState,
        )

    def _build_initial_state(self, task: str) -> dict[str, Any]:
        """Build the initial state for agent execution.

        Args:
            task: The task description.

        Returns:
            Initial state dictionary.
        """
        state: dict[str, Any] = {
            "messages": [HumanMessage(content=task)],
        }

        # Pass through sandbox and thread data from parent
        if self.sandbox_state is not None:
            state["sandbox"] = self.sandbox_state
        if self.thread_data is not None:
            state["thread_data"] = self.thread_data

        return state

    async def _aexecute(self, task: str, result_holder: SubagentResult | None = None) -> SubagentResult:
        """Execute a task asynchronously.

        Args:
            task: The task description for the subagent.
            result_holder: Optional pre-created result object to update during execution.

        Returns:
            SubagentResult with the execution result.
        """
        if result_holder is not None:
            # Use the provided result holder (for async execution with real-time updates)
            result = result_holder
        else:
            # Create a new result for synchronous execution
            task_id = str(uuid.uuid4())[:8]
            result = SubagentResult(
                task_id=task_id,
                trace_id=self.trace_id,
                status=SubagentStatus.RUNNING,
                started_at=datetime.now(),
            )

        try:
            agent = self._create_agent()
            state = self._build_initial_state(task)

            # Build config with thread_id for sandbox access and recursion limit
            run_config: RunnableConfig = {
                "recursion_limit": self.config.max_turns,
            }
            context = {}
            if self.thread_id:
                run_config["configurable"] = {"thread_id": self.thread_id}
                context["thread_id"] = self.thread_id

            logger.info(f"[trace={self.trace_id}] Subagent {self.config.name} starting async execution with max_turns={self.config.max_turns}")

            # Use stream instead of invoke to get real-time updates
            # This allows us to collect AI messages as they are generated
            final_state = None

            # Pre-check: bail out immediately if already cancelled before streaming starts
            if result.cancel_event.is_set():
                logger.info(f"[trace={self.trace_id}] Subagent {self.config.name} cancelled before streaming")
                with _background_tasks_lock:
                    if result.status == SubagentStatus.RUNNING:
                        result.status = SubagentStatus.CANCELLED
                        result.error = "Cancelled by user"
                        result.completed_at = datetime.now()
                return result

            async for chunk in agent.astream(state, config=run_config, context=context, stream_mode="values"):  # type: ignore[arg-type]
                # Cooperative cancellation: check if parent requested stop.
                # Note: cancellation is only detected at astream iteration boundaries,
                # so long-running tool calls within a single iteration will not be
                # interrupted until the next chunk is yielded.
                if result.cancel_event.is_set():
                    logger.info(f"[trace={self.trace_id}] Subagent {self.config.name} cancelled by parent")
                    with _background_tasks_lock:
                        if result.status == SubagentStatus.RUNNING:
                            result.status = SubagentStatus.CANCELLED
                            result.error = "Cancelled by user"
                            result.completed_at = datetime.now()
                    return result

                final_state = chunk

                # Extract AI messages from the current state
                messages = chunk.get("messages", [])
                if messages:
                    last_message = messages[-1]
                    # Check if this is a new AI message
                    if isinstance(last_message, AIMessage):
                        # Convert message to dict for serialization
                        message_dict = last_message.model_dump()
                        # Only add if it's not already in the list (avoid duplicates)
                        # Check by comparing message IDs if available, otherwise compare full dict
                        message_id = message_dict.get("id")
                        is_duplicate = False
                        if message_id:
                            is_duplicate = any(msg.get("id") == message_id for msg in result.ai_messages)
                        else:
                            is_duplicate = message_dict in result.ai_messages
                        if not is_duplicate:
                            result.ai_messages.append(message_dict)
                            logger.info(f"[trace={self.trace_id}] Subagent {self.config.name} captured AI message #{len(result.ai_messages)}")

            logger.info(f"[trace={self.trace_id}] Subagent {self.config.name} completed async execution")

            if final_state is None:
                logger.warning(f"[trace={self.trace_id}] Subagent {self.config.name} no final state")
                result.result = "No response generated"
            else:
                # Extract the final message - find the last AIMessage
                messages = final_state.get("messages", [])
                logger.info(f"[trace={self.trace_id}] Subagent {self.config.name} final messages count: {len(messages)}")

                # Find the last AIMessage in the conversation
                last_ai_message = None
                for msg in reversed(messages):
                    if isinstance(msg, AIMessage):
                        last_ai_message = msg
                        break

                if last_ai_message is not None:
                    content = last_ai_message.content
                    # Handle both str and list content types for the final result
                    if isinstance(content, str):
                        result.result = content
                    elif isinstance(content, list):
                        # Extract text from list of content blocks for final result only.
                        # Concatenate raw string chunks directly, but preserve separation
                        # between full text blocks for readability.
                        text_parts = []
                        pending_str_parts = []
                        for block in content:
                            if isinstance(block, str):
                                pending_str_parts.append(block)
                            elif isinstance(block, dict):
                                if pending_str_parts:
                                    text_parts.append("".join(pending_str_parts))
                                    pending_str_parts.clear()
                                text_val = block.get("text")
                                if isinstance(text_val, str):
                                    text_parts.append(text_val)
                        if pending_str_parts:
                            text_parts.append("".join(pending_str_parts))
                        result.result = "\n".join(text_parts) if text_parts else "No text content in response"
                    else:
                        result.result = str(content)
                elif messages:
                    # Fallback: use the last message if no AIMessage found
                    last_message = messages[-1]
                    logger.warning(f"[trace={self.trace_id}] Subagent {self.config.name} no AIMessage found, using last message: {type(last_message)}")
                    raw_content = last_message.content if hasattr(last_message, "content") else str(last_message)
                    if isinstance(raw_content, str):
                        result.result = raw_content
                    elif isinstance(raw_content, list):
                        parts = []
                        pending_str_parts = []
                        for block in raw_content:
                            if isinstance(block, str):
                                pending_str_parts.append(block)
                            elif isinstance(block, dict):
                                if pending_str_parts:
                                    parts.append("".join(pending_str_parts))
                                    pending_str_parts.clear()
                                text_val = block.get("text")
                                if isinstance(text_val, str):
                                    parts.append(text_val)
                        if pending_str_parts:
                            parts.append("".join(pending_str_parts))
                        result.result = "\n".join(parts) if parts else "No text content in response"
                    else:
                        result.result = str(raw_content)
                else:
                    logger.warning(f"[trace={self.trace_id}] Subagent {self.config.name} no messages in final state")
                    result.result = "No response generated"

            result.status = SubagentStatus.COMPLETED
            result.completed_at = datetime.now()

        except Exception as e:
            logger.exception(f"[trace={self.trace_id}] Subagent {self.config.name} async execution failed")
            result.status = SubagentStatus.FAILED
            result.error = str(e)
            result.completed_at = datetime.now()

        return result

    def execute(self, task: str, result_holder: SubagentResult | None = None) -> SubagentResult:
        """Execute a task synchronously (wrapper around async execution).

        This method runs the async execution in a new event loop, allowing
        asynchronous tools (like MCP tools) to be used within the thread pool.

        Args:
            task: The task description for the subagent.
            result_holder: Optional pre-created result object to update during execution.

        Returns:
            SubagentResult with the execution result.
        """
        # Run the async execution in a new event loop
        # This is necessary because:
        # 1. We may have async-only tools (like MCP tools)
        # 2. We're running inside a ThreadPoolExecutor which doesn't have an event loop
        #
        # Note: _aexecute() catches all exceptions internally, so this outer
        # try-except only handles asyncio.run() failures (e.g., if called from
        # an async context where an event loop already exists). Subagent execution
        # errors are handled within _aexecute() and returned as FAILED status.
        try:
            return asyncio.run(self._aexecute(task, result_holder))
        except Exception as e:
            logger.exception(f"[trace={self.trace_id}] Subagent {self.config.name} execution failed")
            # Create a result with error if we don't have one
            if result_holder is not None:
                result = result_holder
            else:
                result = SubagentResult(
                    task_id=str(uuid.uuid4())[:8],
                    trace_id=self.trace_id,
                    status=SubagentStatus.FAILED,
                )
            result.status = SubagentStatus.FAILED
            result.error = str(e)
            result.completed_at = datetime.now()
            return result

    def execute_async(self, task: str, task_id: str | None = None) -> str:
        """Start a task execution in the background.

        Args:
            task: The task description for the subagent.
            task_id: Optional task ID to use. If not provided, a short random ID
                (the first 8 characters of a UUID4) is generated.

        Returns:
            Task ID that can be used to check status later.
        """
        # Use provided task_id or generate a new one
        if task_id is None:
            task_id = str(uuid.uuid4())[:8]

        # Create initial pending result
        result = SubagentResult(
            task_id=task_id,
            trace_id=self.trace_id,
            status=SubagentStatus.PENDING,
        )

        logger.info(f"[trace={self.trace_id}] Subagent {self.config.name} starting async execution, task_id={task_id}, timeout={self.config.timeout_seconds}s")

        with _background_tasks_lock:
            _background_tasks[task_id] = result

        # Submit to scheduler pool
        def run_task():
            with _background_tasks_lock:
                _background_tasks[task_id].status = SubagentStatus.RUNNING
                _background_tasks[task_id].started_at = datetime.now()
                result_holder = _background_tasks[task_id]

            try:
                # Submit execution to execution pool with timeout
                # Pass result_holder so execute() can update it in real-time
                execution_future: Future = _execution_pool.submit(self.execute, task, result_holder)
                try:
                    # Wait for execution with timeout
                    exec_result = execution_future.result(timeout=self.config.timeout_seconds)
                    with _background_tasks_lock:
                        _background_tasks[task_id].status = exec_result.status
                        _background_tasks[task_id].result = exec_result.result
                        _background_tasks[task_id].error = exec_result.error
                        _background_tasks[task_id].completed_at = datetime.now()
                        _background_tasks[task_id].ai_messages = exec_result.ai_messages
                except FuturesTimeoutError:
                    logger.error(f"[trace={self.trace_id}] Subagent {self.config.name} execution timed out after {self.config.timeout_seconds}s")
                    with _background_tasks_lock:
                        # Only mark TIMED_OUT while still RUNNING, so CANCELLED is not overwritten
                        if _background_tasks[task_id].status == SubagentStatus.RUNNING:
                            _background_tasks[task_id].status = SubagentStatus.TIMED_OUT
                            _background_tasks[task_id].error = f"Execution timed out after {self.config.timeout_seconds} seconds"
                            _background_tasks[task_id].completed_at = datetime.now()
                    # Signal cooperative cancellation and cancel the future
                    result_holder.cancel_event.set()
                    execution_future.cancel()
            except Exception as e:
                logger.exception(f"[trace={self.trace_id}] Subagent {self.config.name} async execution failed")
                with _background_tasks_lock:
                    _background_tasks[task_id].status = SubagentStatus.FAILED
                    _background_tasks[task_id].error = str(e)
                    _background_tasks[task_id].completed_at = datetime.now()

        _scheduler_pool.submit(run_task)
        return task_id


MAX_CONCURRENT_SUBAGENTS = 3


def request_cancel_background_task(task_id: str) -> None:
    """Signal a running background task to stop.

    Sets the cancel_event on the task, which is checked cooperatively
    by ``_aexecute`` during ``agent.astream()`` iteration. This allows
    subagent threads, which cannot be force-killed via ``Future.cancel()``,
    to stop at the next iteration boundary. Unknown task IDs are ignored.

    Args:
        task_id: The task ID to cancel.
    """
    with _background_tasks_lock:
        result = _background_tasks.get(task_id)
        if result is not None:
            result.cancel_event.set()
            logger.info("Requested cancellation for background task %s", task_id)


def get_background_task_result(task_id: str) -> SubagentResult | None:
    """Get the result of a background task.

    Args:
        task_id: The task ID returned by execute_async.

    Returns:
        SubagentResult if found, None otherwise.
    """
    with _background_tasks_lock:
        return _background_tasks.get(task_id)


def list_background_tasks() -> list[SubagentResult]:
    """List all background tasks.

    Returns:
        List of all SubagentResult instances.
    """
    with _background_tasks_lock:
        return list(_background_tasks.values())


def cleanup_background_task(task_id: str) -> None:
    """Remove a completed task from background tasks.

    Should be called by task_tool after it finishes polling and returns the result.
    This prevents memory leaks from accumulated completed tasks.

    Only removes tasks that are in a terminal state (COMPLETED/FAILED/CANCELLED/TIMED_OUT)
    to avoid race conditions with the background executor still updating the task entry.

    Args:
        task_id: The task ID to remove.
    """
    with _background_tasks_lock:
        result = _background_tasks.get(task_id)
        if result is None:
            # Nothing to clean up; may have been removed already.
            logger.debug("Requested cleanup for unknown background task %s", task_id)
            return

        # Only clean up tasks that are in a terminal state to avoid races with
        # the background executor still updating the task entry.
        is_terminal_status = result.status in {
            SubagentStatus.COMPLETED,
            SubagentStatus.FAILED,
            SubagentStatus.CANCELLED,
            SubagentStatus.TIMED_OUT,
        }
        if is_terminal_status or result.completed_at is not None:
            del _background_tasks[task_id]
            logger.debug("Cleaned up background task: %s", task_id)
        else:
            logger.debug(
                "Skipping cleanup for non-terminal background task %s (status=%s)",
                task_id,
                result.status.value if hasattr(result.status, "value") else result.status,
            )