mirror of
https://github.com/OpenBMB/ChatDev.git
synced 2026-04-26 03:38:12 +00:00
615 lines
23 KiB
Python
Executable File
615 lines
23 KiB
Python
Executable File
"""Cycle executor that runs workflow graphs containing loops."""
|
|
|
|
import copy
|
|
import threading
|
|
from typing import Dict, List, Callable, Any, Set, Optional
|
|
|
|
from entity.configs import Node, EdgeLink
|
|
from utils.log_manager import LogManager
|
|
from workflow.cycle_manager import CycleManager
|
|
from workflow.executor.parallel_executor import ParallelExecutor
|
|
from workflow.topology_builder import GraphTopologyBuilder
|
|
|
|
|
|
class CycleExecutor:
    """Execute workflow graphs that contain cycles.

    Features:
    - Scheduling is based on "super nodes"
    - Parallel execution inside cycles
    - Automatic detection of exit conditions
    """

    # Iteration cap used for nested cycles discovered inside a cycle scope.
    # NOTE(review): unlike top-level cycles (which use
    # cycle_manager.cycles[cycle_id].get_max_iterations()), nested cycles use
    # this fixed limit; confirm whether they should read a configured value.
    _NESTED_CYCLE_MAX_ITERATIONS = 100

    def __init__(
        self,
        log_manager: LogManager,
        nodes: Dict[str, Node],
        cycle_execution_order: List[Dict[str, Any]],
        cycle_manager: CycleManager,
        execute_node_func: Callable[[Node], None],
    ):
        """Initialize the cycle executor.

        Args:
            log_manager: Logger instance
            nodes: Mapping of node ids to nodes
            cycle_execution_order: Super-node execution order with cycles
            cycle_manager: Cycle manager coordinating iterations
            execute_node_func: Callable that executes a single node
        """
        self.log_manager = log_manager
        self.nodes = nodes
        self.cycle_execution_order = cycle_execution_order
        self.cycle_manager = cycle_manager
        self.execute_node_func = execute_node_func
        self.parallel_executor = ParallelExecutor(log_manager, nodes)

    def execute(self) -> None:
        """Run the workflow that contains cycles.

        Layers of ``cycle_execution_order`` are processed one after another;
        the items inside a layer are handed to the parallel executor.
        """
        self.log_manager.debug("Executing graph with cycles using super-node scheduler")

        for layer_idx, layer_items in enumerate(self.cycle_execution_order):
            self.log_manager.debug(f"Executing super-node layer {layer_idx} with {len(layer_items)} items")
            self._execute_super_layer(layer_items)

    def _execute_super_layer(self, layer_items: List[Dict[str, Any]]) -> None:
        """Execute a single super-node layer."""
        self._execute_super_layer_parallel(layer_items)

    def _execute_super_layer_parallel(self, layer_items: List[Dict[str, Any]]) -> None:
        """Execute a super-node layer in parallel."""

        def item_desc_func(item: Dict[str, Any]) -> str:
            item_type = item["type"]
            if item_type == "cycle":
                return f"cycle {item['cycle_id']}"
            if item_type == "node":
                # New format
                return f"node {item['node_id']}"
            if item_type == "layer":
                # Old format: {"type": "layer", "nodes": [node_id]}
                return f"node {item['nodes'][0]}"
            # Unknown items are skipped by _execute_super_item; describe them
            # without assuming any particular keys exist.
            return f"unknown item {item_type!r}"

        self.parallel_executor.execute_items_parallel(
            layer_items,
            self._execute_super_item,
            item_desc_func
        )

    def _execute_super_item(self, item: Dict[str, Any]) -> None:
        """Execute a single super-node item (node or cycle).

        Accepted item shapes:
        - ``{"type": "layer", "nodes": [node_id]}`` (old format)
        - ``{"type": "node", "node_id": ...}`` (GraphTopologyBuilder format)
        - ``{"type": "cycle", "cycle_id": ..., "nodes": [...]}``

        Any other item type is logged as a warning and skipped.
        """
        item_type = item["type"]
        if item_type == "layer":
            self._execute_single_node(item["nodes"][0])
        elif item_type == "node":
            self._execute_single_node(item["node_id"])
        elif item_type == "cycle":
            self._execute_cycle(item)
        else:
            # Previously dropped silently, which hides scheduler bugs.
            self.log_manager.warning(f"Unknown super-node item type {item_type!r}; skipping")

    def _execute_single_node(self, node_id: str) -> None:
        """Execute a non-cycle node if it is triggered; otherwise log and skip."""
        self.log_manager.debug(f"Executing non-cycle node: {node_id}")

        node = self.nodes[node_id]
        if node.is_triggered():
            self.execute_node_func(node)
        else:
            self.log_manager.warning(f"Node {node_id} is not triggered, skipping execution")

    def _execute_cycle(self, cycle_info: Dict[str, Any]) -> None:
        """Execute a cycle using the multi-iteration logic.

        Resolves the entry node, records it on the cycle manager, then runs
        the iterations between ``activate_cycle`` and ``deactivate_cycle``.
        Deactivation happens even if an iteration raises.

        Args:
            cycle_info: Super-node item with ``cycle_id`` and ``nodes`` keys

        Raises:
            ValueError: If the cycle entry is ambiguous or mismatched
                (see ``_validate_cycle_entry``).
        """
        cycle_id = cycle_info["cycle_id"]
        nodes = cycle_info["nodes"]

        self.log_manager.debug(f"Executing cycle {cycle_id} with nodes: {nodes}")

        # Validate cycle entry uniqueness
        try:
            initial_node_id = self._validate_cycle_entry(cycle_id, nodes)
        except ValueError as e:
            self.log_manager.error(str(e))
            raise

        if initial_node_id is None:
            self.log_manager.debug(
                f"Cycle {cycle_id} has no triggered entry node in this pass; skipping execution"
            )
            return

        # Store initial node in cycle_manager
        self.cycle_manager.cycles[cycle_id].initial_node = initial_node_id
        self.log_manager.debug(f"Cycle {cycle_id} initial node: {initial_node_id}")

        self.cycle_manager.activate_cycle(cycle_id)
        try:
            self._execute_cycle_with_iterations(
                cycle_id,
                nodes,
                initial_node_id,
                max_iterations=self.cycle_manager.cycles[cycle_id].get_max_iterations()
            )
        finally:
            # Keep activate/deactivate paired even when an iteration fails.
            self.cycle_manager.deactivate_cycle(cycle_id)

        self.log_manager.debug(f"Cycle {cycle_id} completed")

    # ==================== New Methods for Refactored Cycle Execution ====================

    def _validate_cycle_entry(self, cycle_id: str, nodes: List[str]) -> Optional[str]:
        """
        Determine the entry node of a cycle from the edges entering it.

        A cycle node counts as externally triggered when a predecessor outside
        the cycle has a trigger edge to it whose ``triggered`` flag is set.

        Args:
            cycle_id: The cycle ID
            nodes: List of node IDs in the cycle

        Returns:
            The single externally triggered node. If no node is triggered,
            the cycle's configured entry node (when one is configured),
            otherwise None.

        Raises:
            ValueError: If more than one node is externally triggered, or if
                the triggered node differs from the configured entry node.
        """
        cycle_node_set = set(nodes)
        triggered_nodes: List[str] = []

        for node_id in nodes:
            node = self.nodes[node_id]
            for predecessor in node.predecessors:
                if predecessor.id in cycle_node_set:
                    continue  # Internal edge: does not make this node an entry
                edge = predecessor.find_outgoing_edge(node_id)
                if edge and edge.trigger and edge.triggered:
                    triggered_nodes.append(node_id)
                    break

        cycle_info = self.cycle_manager.cycles.get(cycle_id)
        configured_entry = cycle_info.configured_entry_node if cycle_info else None

        if not triggered_nodes:
            if configured_entry:
                return configured_entry
            return None

        if len(triggered_nodes) > 1:
            raise ValueError(
                f"Cycle {cycle_id} has multiple triggered entry nodes: {triggered_nodes}. "
                "Only one entry node must be triggered when entering a cycle."
            )

        entry_node = triggered_nodes[0]
        if configured_entry and entry_node != configured_entry:
            raise ValueError(
                f"Cycle {cycle_id} entry mismatch: configured '{configured_entry}' "
                f"but triggered '{entry_node}'",
            )

        return entry_node

    def _execute_cycle_with_iterations(
        self,
        cycle_id: str,
        cycle_nodes: List[str],
        initial_node_id: str,
        max_iterations: int,
    ) -> Set[str]:
        """
        Execute a cycle with multiple iterations.

        Each iteration rebuilds the scoped execution order and runs it. The
        loop stops when a node outside ``cycle_nodes`` is triggered, when the
        initial node is not retriggered from inside the cycle, or after
        ``max_iterations`` iterations (a warning is logged in that case).

        Args:
            cycle_id: Cycle ID
            cycle_nodes: List of all nodes in the cycle
            initial_node_id: Initial node ID
            max_iterations: Maximum number of iterations

        Returns:
            The set of node IDs outside ``cycle_nodes`` that were triggered
            when the cycle exited; an empty set if the cycle ended because the
            initial node was not retriggered or the iteration limit was hit.
        """
        iteration = 0

        while iteration < max_iterations:
            self.log_manager.debug(
                f"Cycle {cycle_id} iteration {iteration + 1}/{max_iterations}"
            )
            is_first_iteration = iteration == 0

            # Detect nested cycles in the scoped subgraph
            inner_cycles = self._detect_cycles_in_scope(cycle_nodes, initial_node_id)

            # Build topological layers (whether there are nested cycles or not)
            execution_layers = self._build_topological_layers_in_scope(
                cycle_nodes, initial_node_id, inner_cycles,
                is_first_iteration=is_first_iteration
            )

            # Execute the topological layers
            external_nodes = self._execute_scope_layers(
                execution_layers,
                cycle_id,
                cycle_nodes,
                initial_node_id=initial_node_id,
                is_first_iteration=is_first_iteration
            )

            if external_nodes:
                self.log_manager.debug(
                    f"Cycle {cycle_id} exited - external nodes triggered: {sorted(external_nodes)}"
                )
                return external_nodes

            # Another iteration only happens if the cycle fed back into its entry node
            if not self._is_initial_node_retriggered(initial_node_id, cycle_nodes):
                self.log_manager.debug(
                    f"Cycle {cycle_id} completed - initial node not retriggered"
                )
                break

            iteration += 1

        if iteration >= max_iterations:
            self.log_manager.warning(
                f"Cycle {cycle_id} reached max iterations ({max_iterations})"
            )

        return set()

    def _detect_cycles_in_scope(
        self,
        scope_nodes: List[str],
        initial_node_id: str
    ) -> List[Set[str]]:
        """
        Detect nested cycles within the scoped subgraph.

        Constructs a subgraph containing only:
        1. Nodes in scope_nodes
        2. Trigger edges where both source and target are in scope_nodes
        3. Initial node's incoming edges are REMOVED (to break the outer cycle)

        Args:
            scope_nodes: List of node IDs in the current scope
            initial_node_id: Initial node ID (whose incoming edges are removed)

        Returns:
            List of detected nested cycles with more than one node
        """
        # Build scoped nodes with initial node's incoming edges removed
        scoped_nodes = self._build_scoped_nodes(scope_nodes, clear_entry_node=initial_node_id)

        # Use GraphTopologyBuilder to detect cycles
        all_cycles = GraphTopologyBuilder.detect_cycles(scoped_nodes)

        # Keep only multi-node cycles; single-node results (including
        # self-loops) are not treated as nested cycles.
        return [cycle for cycle in all_cycles if len(cycle) > 1]

    def _build_scoped_nodes(
        self,
        scope_nodes: List[str],
        clear_entry_node: Optional[str] = None
    ) -> Dict[str, Node]:
        """
        Build a scoped subgraph containing only nodes and edges within the scope.

        Each returned node is a shallow copy of the original. Its outgoing
        edges, predecessors and successors are replaced by filtered lists,
        keeping only in-scope links whose edge has ``trigger`` set. The lists
        are rebound on the copy rather than mutated, so the original nodes'
        lists are left unchanged.

        Args:
            scope_nodes: List of node IDs in the scope
            clear_entry_node: If specified, this node's incoming edges will be removed
                (used to break the outer cycle when detecting nested cycles)

        Returns:
            Dictionary of scoped nodes keyed by node ID
        """
        scoped_nodes: Dict[str, Node] = {}
        scope_nodes_set = set(scope_nodes)

        for node_id in scope_nodes:
            original_node = self.nodes[node_id]
            scoped_node = copy.copy(original_node)

            # Outgoing edges: target in scope, trigger=true, and not pointing
            # at clear_entry_node.
            scoped_edges = [
                edge_link for edge_link in original_node.iter_outgoing_edges()
                if edge_link.target.id in scope_nodes_set
                and edge_link.trigger
                and edge_link.target.id != clear_entry_node
            ]
            scoped_node._outgoing_edges = scoped_edges
            # Exactly the successor IDs reachable through a kept edge.
            kept_targets = {edge_link.target.id for edge_link in scoped_edges}

            # Predecessors: in scope with a trigger=true edge into this node;
            # clear_entry_node loses all of them.
            if node_id == clear_entry_node:
                scoped_node.predecessors = []
            else:
                scoped_predecessors = []
                for pred in original_node.predecessors:
                    if pred.id not in scope_nodes_set:
                        continue
                    edge = pred.find_outgoing_edge(node_id)
                    if edge and edge.trigger:
                        scoped_predecessors.append(pred)
                scoped_node.predecessors = scoped_predecessors

            # Successors: same filter as the outgoing edges above.
            scoped_node.successors = [
                succ for succ in original_node.successors
                if succ.id in kept_targets
            ]

            scoped_nodes[node_id] = scoped_node

        return scoped_nodes

    def _build_topological_layers_in_scope(
        self,
        scope_nodes: List[str],
        initial_node_id: str,
        inner_cycles: List[Set[str]],
        is_first_iteration: bool = False
    ) -> List[Dict[str, Any]]:
        """
        Build topological execution order for the scoped subgraph.

        The entry points of this pass are the initial node on the first
        iteration, and every currently triggered scope node on later
        iterations. Their scoped predecessors are cleared and edges into them
        are excluded, so the enclosing cycle's back edges do not block the
        topological sort.

        Args:
            scope_nodes: List of node IDs in the scope
            initial_node_id: Initial node ID
            inner_cycles: List of nested cycles detected in the scope
            is_first_iteration: Whether this is the first iteration (affects initial node handling)

        Returns:
            List of execution layers, each containing execution items
        """
        # Build scoped nodes WITHOUT clearing entry node: all in-scope trigger
        # edges stay intact for execution.
        scoped_nodes = self._build_scoped_nodes(scope_nodes, clear_entry_node=None)

        if is_first_iteration:
            entry_points = {initial_node_id}
        else:
            entry_points = {
                node_id for node_id in scope_nodes
                if self.nodes[node_id].is_triggered()
            }

        # Only the scoped copies are modified (for in-degree calculation).
        for node_id in entry_points:
            if node_id in scoped_nodes:
                scoped_nodes[node_id].predecessors = []

        if not inner_cycles:
            # No nested cycles, use DAG layers
            return GraphTopologyBuilder.build_dag_layers(scoped_nodes)

        # Nested cycles: build the edge list from the filtered scoped graph,
        # dropping edges into entry points to break the outer cycle.
        scoped_edges: List[Dict[str, Any]] = []
        for node_id in scope_nodes:
            scoped_node = scoped_nodes.get(node_id)
            if not scoped_node:
                continue
            for edge_link in scoped_node.iter_outgoing_edges():
                if edge_link.target.id in entry_points:
                    continue
                scoped_edges.append({
                    "from": node_id,
                    "to": edge_link.target.id,
                    "trigger": edge_link.trigger,
                    "condition": edge_link.condition
                })

        super_graph = GraphTopologyBuilder.create_super_node_graph(
            scoped_nodes, scoped_edges, inner_cycles
        )
        return GraphTopologyBuilder.topological_sort_super_nodes(
            super_graph, inner_cycles
        )

    def _execute_scope_layers(
        self,
        execution_layers: List[List[Dict[str, Any]]],
        parent_cycle_id: str,
        parent_cycle_nodes: List[str],
        initial_node_id: Optional[str] = None,
        is_first_iteration: bool = False
    ) -> Set[str]:
        """
        Execute scoped layers with parallelism, supporting nested cycles.

        Layers run in order; items within a layer run through the parallel
        executor. As soon as any item triggers a node outside
        ``parent_cycle_nodes``, remaining items skip execution and no further
        layers start. In that case ``reset_triggers()`` is called on every
        node in the scope before returning.

        Args:
            execution_layers: List of execution layers
            parent_cycle_id: Parent cycle ID
            parent_cycle_nodes: List of nodes in the parent cycle
            initial_node_id: Initial node ID (for first iteration special handling)
            is_first_iteration: Whether this is the first iteration

        Returns:
            Set of triggered node IDs outside ``parent_cycle_nodes``; empty if
            the scope finished without exiting.
        """
        scope_node_set = set(parent_cycle_nodes)
        external_nodes: Set[str] = set()
        stop_event = threading.Event()
        result_lock = threading.Lock()

        def record_external(nodes: Set[str]) -> None:
            # Items run concurrently: serialize updates and signal every other
            # worker and the layer loop to stop.
            if not nodes:
                return
            with result_lock:
                external_nodes.update(nodes)
                stop_event.set()

        def item_desc(item: Dict[str, Any]) -> str:
            if item["type"] == "node":
                return f"node {item['node_id']}"
            if item["type"] == "cycle":
                return f"cycle {item['cycle_id']}"
            return "layer_item"

        def run_node(node_id: str) -> None:
            # The initial node runs unconditionally on the first iteration.
            force_execute = is_first_iteration and (node_id == initial_node_id)
            targets = self._execute_single_cycle_node_in_scope(
                node_id,
                scope_node_set,
                force_execute=force_execute
            )
            record_external(targets)

        def run_nested_cycle(item: Dict[str, Any]) -> None:
            inner_cycle_nodes = item["nodes"]
            inner_cycle_id = item["cycle_id"]

            self.log_manager.debug(
                f"Executing nested cycle {inner_cycle_id} within cycle {parent_cycle_id}"
            )

            try:
                inner_initial_node = self._validate_cycle_entry(
                    inner_cycle_id, inner_cycle_nodes
                )
            except ValueError as e:
                self.log_manager.error(str(e))
                raise

            if inner_initial_node is None:
                self.log_manager.debug(
                    f"Nested cycle {inner_cycle_id} has no triggered entry; skipping"
                )
                return

            inner_external_nodes = self._execute_cycle_with_iterations(
                inner_cycle_id,
                inner_cycle_nodes,
                inner_initial_node,
                max_iterations=self._NESTED_CYCLE_MAX_ITERATIONS,
            )

            # Exits from the nested cycle that stay inside this scope are not
            # exits of this scope.
            record_external({
                node for node in inner_external_nodes
                if node not in scope_node_set
            })

        def executor_func(item: Dict[str, Any]) -> None:
            if stop_event.is_set():
                return
            if item["type"] == "node":
                run_node(item["node_id"])
            elif item["type"] == "cycle":
                run_nested_cycle(item)

        for layer in execution_layers:
            self.parallel_executor.execute_items_parallel(
                layer,
                executor_func,
                item_desc
            )
            if stop_event.is_set():
                break

        if external_nodes:
            for node_id in scope_node_set:
                self.nodes[node_id].reset_triggers()

        return external_nodes

    def _execute_single_cycle_node_in_scope(
        self,
        node_id: str,
        scope_node_set: Set[str],
        force_execute: bool = False
    ) -> Set[str]:
        """
        Execute a single node within a cycle scope.

        Args:
            node_id: Node ID to execute
            scope_node_set: Nodes that belong to the current scoped cycle
            force_execute: If True, execute even if not triggered (for initial node in first iteration)

        Returns:
            Set of node IDs triggered outside the current scoped cycle
            (empty if the node was not executed)
        """
        node = self.nodes[node_id]

        if not force_execute and not node.is_triggered():
            return set()

        # Clear stale flags so the check below only sees edges fired by this run.
        for edge_link in node.iter_outgoing_edges():
            edge_link.triggered = False

        self.execute_node_func(node)

        external_targets: Set[str] = set()
        for edge_link in node.iter_outgoing_edges():
            if edge_link.target.id not in scope_node_set and edge_link.triggered:
                self.log_manager.debug(
                    f"Node {node_id} triggered external node {edge_link.target.id}"
                )
                external_targets.add(edge_link.target.id)

        return external_targets

    def _is_initial_node_retriggered(
        self,
        initial_node_id: str,
        cycle_nodes: List[str]
    ) -> bool:
        """
        Check if the initial node is retriggered by any internal edge (from within the cycle).

        Args:
            initial_node_id: Initial node ID
            cycle_nodes: List of nodes in the cycle

        Returns:
            True if a predecessor inside the cycle has a trigger edge to the
            initial node whose ``triggered`` flag is set
        """
        cycle_node_set = set(cycle_nodes)
        initial_node = self.nodes[initial_node_id]

        for predecessor in initial_node.predecessors:
            if predecessor.id not in cycle_node_set:
                continue  # Only predecessors within the cycle count
            edge = predecessor.find_outgoing_edge(initial_node_id)
            if edge and edge.trigger and edge.triggered:
                return True

        return False