ChatDev/workflow/executor/cycle_executor.py
2026-01-07 16:24:01 +08:00

615 lines
23 KiB
Python
Executable File

"""Cycle executor that runs workflow graphs containing loops."""
import copy
import threading
from typing import Dict, List, Callable, Any, Set, Optional
from entity.configs import Node, EdgeLink
from utils.log_manager import LogManager
from workflow.cycle_manager import CycleManager
from workflow.executor.parallel_executor import ParallelExecutor
from workflow.topology_builder import GraphTopologyBuilder
class CycleExecutor:
"""Execute workflow graphs that contain cycles.
Features:
- Scheduling is based on "super nodes"
- Parallel execution inside cycles
- Automatic detection of exit conditions
"""
def __init__(
self,
log_manager: LogManager,
nodes: Dict[str, Node],
cycle_execution_order: List[Dict[str, Any]],
cycle_manager: CycleManager,
execute_node_func: Callable[[Node], None],
):
"""Initialize the cycle executor.
Args:
log_manager: Logger instance
nodes: Mapping of node ids to nodes
cycle_execution_order: Super-node execution order with cycles
cycle_manager: Cycle manager coordinating iterations
execute_node_func: Callable that executes a single node
"""
self.log_manager = log_manager
self.nodes = nodes
self.cycle_execution_order = cycle_execution_order
self.cycle_manager = cycle_manager
self.execute_node_func = execute_node_func
self.parallel_executor = ParallelExecutor(log_manager, nodes)
def execute(self) -> None:
"""Run the workflow that contains cycles."""
self.log_manager.debug("Executing graph with cycles using super-node scheduler")
for layer_idx, layer_items in enumerate(self.cycle_execution_order):
self.log_manager.debug(f"Executing super-node layer {layer_idx} with {len(layer_items)} items")
self._execute_super_layer(layer_items)
def _execute_super_layer(self, layer_items: List[Dict[str, Any]]) -> None:
"""Execute a single super-node layer."""
self._execute_super_layer_parallel(layer_items)
def _execute_super_layer_parallel(self, layer_items: List[Dict[str, Any]]) -> None:
"""Execute a super-node layer in parallel."""
def item_desc_func(item: Dict[str, Any]) -> str:
if item["type"] == "cycle":
return f"cycle {item['cycle_id']}"
elif item["type"] == "node":
# New format
return f"node {item['node_id']}"
else:
# Old format: "layer"
return f"node {item['nodes'][0]}"
self.parallel_executor.execute_items_parallel(
layer_items,
self._execute_super_item,
item_desc_func
)
def _execute_super_item(self, item: Dict[str, Any]) -> None:
"""Execute a single super-node item (node or cycle)."""
if item["type"] == "layer":
# Old format: {"type": "layer", "nodes": [node_id]}
self._execute_single_node(item["nodes"][0])
elif item["type"] == "node":
# New format from GraphTopologyBuilder: {"type": "node", "node_id": "..."}
self._execute_single_node(item["node_id"])
elif item["type"] == "cycle":
self._execute_cycle(item)
def _execute_single_node(self, node_id: str) -> None:
"""Execute a non-cycle node."""
self.log_manager.debug(f"Executing non-cycle node: {node_id}")
node = self.nodes[node_id]
if node.is_triggered():
self.execute_node_func(node)
else:
self.log_manager.warning(f"Node {node_id} is not triggered, skipping execution")
def _execute_cycle(self, cycle_info: Dict[str, Any]) -> None:
"""Execute a cycle using the multi-iteration logic."""
cycle_id = cycle_info["cycle_id"]
nodes = cycle_info["nodes"]
self.log_manager.debug(f"Executing cycle {cycle_id} with nodes: {nodes}")
# Step 2: Validate cycle entry uniqueness
try:
initial_node_id = self._validate_cycle_entry(cycle_id, nodes)
except ValueError as e:
self.log_manager.error(str(e))
raise
if initial_node_id is None:
self.log_manager.debug(
f"Cycle {cycle_id} has no triggered entry node in this pass; skipping execution"
)
return
# Store initial node in cycle_manager
self.cycle_manager.cycles[cycle_id].initial_node = initial_node_id
self.log_manager.debug(f"Cycle {cycle_id} initial node: {initial_node_id}")
# Activate cycle
self.cycle_manager.activate_cycle(cycle_id)
# Step 4: Execute cycle with iterations
self._execute_cycle_with_iterations(
cycle_id,
nodes,
initial_node_id,
max_iterations=self.cycle_manager.cycles[cycle_id].get_max_iterations()
)
# Cleanup
self.cycle_manager.deactivate_cycle(cycle_id)
self.log_manager.debug(f"Cycle {cycle_id} completed")
# ==================== New Methods for Refactored Cycle Execution ====================
def _validate_cycle_entry(self, cycle_id: str, nodes: List[str]) -> str | None:
"""
Validate that exactly one node in the cycle is triggered by external edges.
Args:
cycle_id: The cycle ID
nodes: List of node IDs in the cycle
Returns:
The ID of the unique initial node
Raises:
ValueError: If no node or multiple nodes are triggered
"""
triggered_nodes: List[str] = []
for node_id in nodes:
node = self.nodes[node_id]
# Check if any external predecessor (node outside the cycle) triggers this node
for predecessor in node.predecessors:
if predecessor.id not in nodes: # External node
edge = predecessor.find_outgoing_edge(node_id)
if edge and edge.trigger and edge.triggered:
triggered_nodes.append(node_id)
break
cycle_info = self.cycle_manager.cycles.get(cycle_id)
configured_entry = cycle_info.configured_entry_node if cycle_info else None
if len(triggered_nodes) == 0:
if configured_entry:
return configured_entry
return None
elif len(triggered_nodes) > 1:
raise ValueError(
f"Cycle {cycle_id} has multiple triggered entry nodes: {triggered_nodes}. "
"Only one entry node must be triggered when entering a cycle."
)
entry_node = triggered_nodes[0]
if configured_entry and entry_node != configured_entry:
raise ValueError(
f"Cycle {cycle_id} entry mismatch: configured '{configured_entry}' "
f"but triggered '{entry_node}'",
)
return entry_node
def _execute_cycle_with_iterations(
self,
cycle_id: str,
cycle_nodes: List[str],
initial_node_id: str,
max_iterations: int,
) -> Set[str]:
"""
Execute a cycle with multiple iterations.
Args:
cycle_id: Cycle ID
cycle_nodes: List of all nodes in the cycle
initial_node_id: Initial node ID
max_iterations: Maximum number of iterations
Returns:
A tuple of two sets:
- exit_nodes: nodes triggered outside the *current* cycle scope
- external_nodes: subset of exit_nodes that are also outside the
provided parent_cycle_nodes scope
"""
iteration = 0
while iteration < max_iterations:
self.log_manager.debug(
f"Cycle {cycle_id} iteration {iteration + 1}/{max_iterations}"
)
# Step 1: Detect nested cycles in the scoped subgraph
inner_cycles = self._detect_cycles_in_scope(cycle_nodes, initial_node_id)
# Build topological layers (whether there are nested cycles or not)
execution_layers = self._build_topological_layers_in_scope(
cycle_nodes, initial_node_id, inner_cycles,
is_first_iteration=(iteration == 0)
)
# Execute the topological layers
external_nodes = self._execute_scope_layers(
execution_layers,
cycle_id,
cycle_nodes,
initial_node_id=initial_node_id,
is_first_iteration=(iteration == 0)
)
if external_nodes:
self.log_manager.debug(
f"Cycle {cycle_id} exited - external nodes triggered: {sorted(external_nodes)}"
)
return external_nodes
# Step 4: Check if initial node is retriggered
if not self._is_initial_node_retriggered(initial_node_id, cycle_nodes):
self.log_manager.debug(
f"Cycle {cycle_id} completed - initial node not retriggered"
)
break
iteration += 1
if iteration >= max_iterations:
self.log_manager.warning(
f"Cycle {cycle_id} reached max iterations ({max_iterations})"
)
return set()
def _detect_cycles_in_scope(
self,
scope_nodes: List[str],
initial_node_id: str
) -> List[Set[str]]:
"""
Detect nested cycles within the scoped subgraph.
Constructs a subgraph containing only:
1. Nodes in scope_nodes
2. Edges where both source and target are in scope_nodes
3. Initial node's incoming edges are REMOVED (to break the outer cycle)
Args:
scope_nodes: List of node IDs in the current scope
initial_node_id: Initial node ID (whose incoming edges are removed)
Returns:
List of detected nested cycles (excluding the current cycle itself)
"""
# Build scoped nodes with initial node's incoming edges removed
scoped_nodes = self._build_scoped_nodes(scope_nodes, clear_entry_node=initial_node_id)
# Use GraphTopologyBuilder to detect cycles
all_cycles = GraphTopologyBuilder.detect_cycles(scoped_nodes)
# Filter out single-node "cycles" (unless they have self-loops)
nested_cycles = [
cycle for cycle in all_cycles
if len(cycle) > 1
]
return nested_cycles
def _build_scoped_nodes(
self,
scope_nodes: List[str],
clear_entry_node: Optional[str] = None
) -> Dict[str, Node]:
"""
Build a scoped subgraph containing only nodes and edges within the scope.
Args:
scope_nodes: List of node IDs in the scope
clear_entry_node: If specified, this node's incoming edges will be removed
(used to break the outer cycle when detecting nested cycles)
Returns:
Dictionary of scoped nodes
"""
scoped_nodes = {}
scope_nodes_set = set(scope_nodes)
for node_id in scope_nodes:
original_node = self.nodes[node_id]
# Shallow copy the node
scoped_node = copy.copy(original_node)
# Filter outgoing edges: only keep edges where target is in scope AND trigger=true
# Special case: if target is clear_entry_node, remove this edge
scoped_edges = [
edge_link for edge_link in original_node.iter_outgoing_edges()
if edge_link.target.id in scope_nodes_set
and edge_link.trigger
and edge_link.target.id != clear_entry_node # Remove edges to entry node
]
scoped_node._outgoing_edges = scoped_edges
# Filter predecessors: only keep predecessors in scope AND with trigger=true edge
# Special case: if this node is clear_entry_node, clear all predecessors
if node_id == clear_entry_node:
scoped_node.predecessors = []
else:
scoped_predecessors = []
for pred in original_node.predecessors:
if pred.id in scope_nodes_set:
# Check if the edge from pred to node has trigger=true
edge = pred.find_outgoing_edge(node_id)
if edge and edge.trigger:
scoped_predecessors.append(pred)
scoped_node.predecessors = scoped_predecessors
# Filter successors: only keep successors in scope AND with trigger=true edge
# Special case: remove clear_entry_node from successors
scoped_successors = [
succ for succ in original_node.successors
if succ.id in scope_nodes_set
and succ.id != clear_entry_node # Remove entry node from successors
and any(
edge_link.target.id == succ.id and edge_link.trigger
for edge_link in original_node.iter_outgoing_edges()
)
]
scoped_node.successors = scoped_successors
scoped_nodes[node_id] = scoped_node
return scoped_nodes
def _build_topological_layers_in_scope(
self,
scope_nodes: List[str],
initial_node_id: str,
inner_cycles: List[Set[str]],
is_first_iteration: bool = False
) -> List[Dict[str, Any]]:
"""
Build topological execution order for the scoped subgraph.
Args:
scope_nodes: List of node IDs in the scope
initial_node_id: Initial node ID
inner_cycles: List of nested cycles detected in the scope
is_first_iteration: Whether this is the first iteration (affects initial node handling)
Returns:
List of execution layers, each containing execution items
"""
# Build scoped nodes WITHOUT clearing entry node
# We want to keep all edges intact for execution
scoped_nodes = self._build_scoped_nodes(scope_nodes, clear_entry_node=None)
# Handle entry points based on iteration:
# - First iteration: manually clear initial node's predecessors (for in_degree calculation only)
# - Subsequent iterations: clear predecessors for all triggered nodes
if is_first_iteration:
# Clear initial node's predecessors to make it an entry point
if initial_node_id in scoped_nodes:
scoped_nodes[initial_node_id].predecessors = []
else:
# Subsequent iterations: clear predecessors for all triggered nodes
for node_id in scope_nodes:
if self.nodes[node_id].is_triggered():
scoped_nodes[node_id].predecessors = []
# Extract scoped edges from scoped_nodes (not original nodes)
# This ensures consistency with the filtered graph structure
scoped_edges = []
# Collect nodes whose incoming edges should be excluded
# (to break cycles in topological sorting)
exclude_targets = set()
if is_first_iteration:
# First iteration: exclude edges to initial_node
exclude_targets.add(initial_node_id)
else:
# Subsequent iterations: exclude edges to all triggered nodes
for node_id in scope_nodes:
if self.nodes[node_id].is_triggered():
exclude_targets.add(node_id)
for node_id in scope_nodes:
# Use scoped_node to get filtered edges
scoped_node = scoped_nodes.get(node_id)
if scoped_node:
for edge_link in scoped_node.iter_outgoing_edges():
# Exclude edges pointing to nodes in exclude_targets
if edge_link.target.id in exclude_targets:
continue
scoped_edges.append({
"from": node_id,
"to": edge_link.target.id,
"trigger": edge_link.trigger,
"condition": edge_link.condition
})
# Use GraphTopologyBuilder to build execution order
if not inner_cycles:
# No nested cycles, use DAG layers
layers = GraphTopologyBuilder.build_dag_layers(scoped_nodes)
return layers
else:
# Has nested cycles, use super-node approach
super_graph = GraphTopologyBuilder.create_super_node_graph(
scoped_nodes, scoped_edges, inner_cycles
)
layers = GraphTopologyBuilder.topological_sort_super_nodes(
super_graph, inner_cycles
)
return layers
def _execute_scope_layers(
self,
execution_layers: List[List[Dict[str, Any]]],
parent_cycle_id: str,
parent_cycle_nodes: List[str],
initial_node_id: Optional[str] = None,
is_first_iteration: bool = False
) -> Set[str]:
"""
Execute scoped layers with parallelism, supporting nested cycles.
Args:
execution_layers: List of execution layers
parent_cycle_id: Parent cycle ID
parent_cycle_nodes: List of nodes in the parent cycle
initial_node_id: Initial node ID (for first iteration special handling)
is_first_iteration: Whether this is the first iteration
Returns:
external_nodes: subset of exit_nodes outside parent_cycle_nodes_set
"""
scope_node_set = set(parent_cycle_nodes)
external_nodes: Set[str] = set()
stop_event = threading.Event()
result_lock = threading.Lock()
def record_external(nodes: Set[str]) -> None:
nonlocal external_nodes
if not nodes:
return
with result_lock:
if nodes:
external_nodes.update(nodes)
stop_event.set()
def item_desc(item: Dict[str, Any]) -> str:
if item["type"] == "node":
return f"node {item['node_id']}"
if item["type"] == "cycle":
return f"cycle {item['cycle_id']}"
return "layer_item"
for layer in execution_layers:
if stop_event.is_set():
break
def executor_func(item: Dict[str, Any]) -> None:
if stop_event.is_set():
return
if item["type"] == "node":
_node_id = item["node_id"]
force_execute = is_first_iteration and (_node_id == initial_node_id)
targets = self._execute_single_cycle_node_in_scope(
_node_id,
scope_node_set,
force_execute=force_execute
)
if targets:
record_external(targets)
elif item["type"] == "cycle":
inner_cycle_nodes = item["nodes"]
inner_cycle_id = item["cycle_id"]
self.log_manager.debug(
f"Executing nested cycle {inner_cycle_id} within cycle {parent_cycle_id}"
)
try:
inner_initial_node = self._validate_cycle_entry(
inner_cycle_id, inner_cycle_nodes
)
except ValueError as e:
self.log_manager.error(str(e))
raise
if inner_initial_node is None:
self.log_manager.debug(
f"Nested cycle {inner_cycle_id} has no triggered entry; skipping"
)
return
inner_external_nodes = self._execute_cycle_with_iterations(
inner_cycle_id,
inner_cycle_nodes,
inner_initial_node,
max_iterations=100,
)
if inner_external_nodes:
filtered = {
node
for node in inner_external_nodes
if node not in scope_node_set
}
if filtered:
record_external(filtered)
self.parallel_executor.execute_items_parallel(
layer,
executor_func,
item_desc
)
if stop_event.is_set():
break
if external_nodes:
for node_id in scope_node_set:
self.nodes[node_id].reset_triggers()
return external_nodes
def _execute_single_cycle_node_in_scope(
self,
node_id: str,
scope_node_set: Set[str],
force_execute: bool = False
) -> Set[str]:
"""
Execute a single node within a cycle scope.
Args:
node_id: Node ID to execute
scope_node_set: Nodes that belong to the current scoped cycle
force_execute: If True, execute even if not triggered (for initial node in first iteration)
Returns:
Set of node IDs triggered outside the current scoped cycle
"""
node = self.nodes[node_id]
# Check if node is triggered (unless force_execute is True)
if not force_execute:
if not node.is_triggered():
return set()
# Reset edge triggers
for edge_link in node.iter_outgoing_edges():
edge_link.triggered = False
# Execute the node
self.execute_node_func(node)
# Check if any external node was triggered
external_targets: Set[str] = set()
for edge_link in node.iter_outgoing_edges():
if edge_link.target.id not in scope_node_set and edge_link.triggered:
self.log_manager.debug(
f"Node {node_id} triggered external node {edge_link.target.id}"
)
external_targets.add(edge_link.target.id)
return external_targets
def _is_initial_node_retriggered(
self,
initial_node_id: str,
cycle_nodes: List[str]
) -> bool:
"""
Check if the initial node is retriggered by any internal edge (from within the cycle).
Args:
initial_node_id: Initial node ID
cycle_nodes: List of nodes in the cycle
Returns:
True if the initial node is retriggered by an internal edge
"""
initial_node = self.nodes[initial_node_id]
for predecessor in initial_node.predecessors:
# Only check predecessors within the cycle
if predecessor.id in cycle_nodes:
edge = predecessor.find_outgoing_edge(initial_node_id)
if edge and edge.trigger and edge.triggered:
return True
return False