feat: Add loop_timer node for time-based loop control

- Add LoopTimerConfig with duration units support (seconds/minutes/hours)
- Implement LoopTimerNodeExecutor with standard and passthrough modes
- Register loop_timer node type in builtin_nodes.py
- Update documentation (execution_logic.md, YAML_FORMAT_QUICK_GUIDE.md)
- Add demo workflows for both modes

Closes: add-loop-timer change proposal
laansdole 2026-02-07 12:27:29 +07:00
parent 7ca6084484
commit 42cd389d59
7 changed files with 1304 additions and 6 deletions

YAML_FORMAT_QUICK_GUIDE.md (new file)

@@ -0,0 +1,915 @@
# Deep Research: YAML Scenario File Format for ChatDev 2.0 (DevAll)
This document details the YAML format used for defining multi-agent workflows in ChatDev 2.0. It is based on an analysis of the codebase, specifically the configuration schemas in `entity/configs/` and validation logic in `check/`.
## 1. File Structure Overview
A valid workflow file consists of three main top-level keys:
```yaml
version: "0.0.0" # Optional, defaults to "0.0.0"
vars: # Optional global variables
API_KEY: ${API_KEY}
BASE_URL: ${BASE_URL}
graph: # REQUIRED: The core workflow definition
id: "MyWorkflow"
description: "Description of what this workflow does"
nodes: [] # List of Node objects
edges: [] # List of Edge objects
```
## 2. Graph Definition (`graph`)
| Field | Type | Required | Description |
| :--- | :--- | :--- | :--- |
| `id` | `str` | Yes | Unique identifier (alphanumeric, underscores, hyphens). |
| `nodes` | `List[Node]` | Yes | List of nodes. Must contain at least one node. |
| `edges` | `List[Edge]` | Yes | List of directed edges connecting nodes. |
| `description` | `str` | No | Human-readable description. |
| `log_level` | `enum` | No | `DEBUG`, `INFO`, `WARNING`, `ERROR`, `CRITICAL`. Default: `DEBUG`. |
| `is_majority_voting` | `bool` | No | Default: `false`. |
| `memory` | `List` | No | List of `MemoryStoreConfig` definitions. |
| `start` | `List[str]` | No* | List of start node IDs. *Inferred if graph has unique source.* |
| `end` | `List[str]` | No* | List of end node IDs. *Inferred if graph has unique sink.* |
| `initial_instruction` | `str` | No | Initial instruction text for the user. |
| `organization` | `str` | No | Organization name. |
## 3. Node Configuration (`nodes`)
Each item in the `nodes` list represents a processing unit.
```yaml
- id: "NodeID" # Required: Unique ID
type: "agent" # Required: Node type (agent, human, loop_counter, etc.)
description: "..." # Optional
context_window: 0 # Optional: 0 (clear), -1 (unlimited), N (keep last N)
config: # Required: Configuration specific to the 'type'
...
```
### Common Node Types & Configurations
#### **`agent`**
Represents an LLM-based agent.
```yaml
type: agent
config:
name: "gpt-4o" # Required: Model name
provider: "openai" # Required: Provider (openai, etc.)
role: "System prompt..." # Optional: System message
base_url: ${BASE_URL} # Optional: Override URL
api_key: ${API_KEY} # Optional: API Key
params: # Optional: Model parameters
temperature: 0.7
tooling: # Optional: List of tools
- type: function
config:
tools:
- name: "read_file"
memories: [] # Optional: Memory attachments
retry: # Optional: Retry configuration
enabled: true
max_attempts: 5
```
#### **`human`**
Represents a human interaction point.
```yaml
type: human
config:
description: "Instruction for the human user"
memories: # Optional: Memory attachments for context retrieval and writing
- name: "memory_name" # Reference to a memory store defined in graph.memory
read: true # Enable memory retrieval
write: true # Enable memory writing (human feedback stored)
top_k: 10 # Number of relevant items to retrieve
similarity_threshold: 0.5 # Minimum similarity score (0.0-1.0)
retrieve_stage: # When to retrieve memory
- gen # During generation stage (default if not specified)
- pre_gen_thinking # Before thinking step
```
**Note**:
- Memory context retrieved from attached memory stores will be displayed to the human user alongside the description and input data.
- When `write: true`, human feedback will be automatically stored in the memory for future retrieval.
- `retrieve_stage` controls when memory is retrieved. If not specified, defaults to `gen` stage.
**Best Practice - Memory Flow Patterns**:
When designing workflows with shared memory between agent and human nodes, consider who should write to which memory:
1. **State Memory Pattern** (Agent writes, Human reads):
```yaml
# Environment/state generator agent
- id: environment
type: agent
config:
memories:
- name: environment_memory
read: true
write: true # Agent owns this memory
top_k: 10
similarity_threshold: 0.0 # IMPORTANT: Use 0.0 to always retrieve recent items
retrieve_stage:
- gen # Retrieve during generation (before model call)
# Human controller
- id: HumanControl
type: human
config:
memories:
- name: environment_memory
read: true # Human only reads to see history
write: false # Human does NOT write to state memory
top_k: 10
similarity_threshold: 0.0 # Always show recent history
```
**Use when**: Agent generates state/context that human needs to review but not modify directly. The human provides commands via edges, and the agent interprets them to update state.
**Why `similarity_threshold: 0.0`?** When the agent receives user input like "continue", the query text has low semantic similarity to past state descriptions (e.g., "COVID-19 outbreak" vs "continue"). Setting threshold to 0.0 ensures the agent ALWAYS retrieves its most recent states regardless of query similarity, maintaining continuity.
**Why `retrieve_stage: gen`?** The `reflection` thinking type only processes memory during the generation stage. Using `pre_gen_thinking` would cause memory to be retrieved but ignored. With `gen` stage, memory is injected into the conversation BEFORE the model generates output, ensuring continuity.
2. **Feedback Memory Pattern** (Human writes, Agent reads):
```yaml
# Agent processes feedback
- id: processor
type: agent
config:
memories:
- name: feedback_memory
read: true
write: false
# Human provides feedback
- id: reviewer
type: human
config:
memories:
- name: feedback_memory
read: true
write: true # Human owns feedback memory
```
**Use when**: Human provides annotations, corrections, or judgments that agents need to incorporate.
3. **Separate Memory Pattern** (Isolated stores):
```yaml
# Each node has its own memory store
- id: agent_a
memories:
- name: agent_a_memory
- id: human_b
memories:
- name: human_b_memory
```
**Use when**: No memory sharing needed; each node maintains independent history.
#### **`loop_counter`**
Controls loops by counting iterations.
```yaml
type: loop_counter
config:
max_iterations: 5 # Max allowed loops
reset_on_emit: true # Reset count when condition is met
message: "" # Optional message
```
#### **`loop_timer`**
Controls loops by tracking elapsed time.
```yaml
type: loop_timer
config:
max_duration: 5 # Max allowed time
duration_unit: seconds # Unit: 'seconds', 'minutes', or 'hours'
reset_on_emit: true # Reset timer when condition is met
message: "" # Optional message
passthrough: false # Passthrough mode (same as loop_counter)
```
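Internally, `max_duration` is interpreted in the configured `duration_unit`. A minimal Python sketch of the conversion and limit check (the helper names below are illustrative, not the actual `LoopTimerNodeExecutor` API):

```python
import time

# Multipliers for the supported duration units (mirrors the units
# accepted by LoopTimerConfig: seconds, minutes, hours).
UNIT_SECONDS = {"seconds": 1, "minutes": 60, "hours": 3600}

def to_seconds(max_duration: float, duration_unit: str) -> float:
    """Convert a configured duration into seconds."""
    if duration_unit not in UNIT_SECONDS:
        raise ValueError(f"duration_unit must be one of: {', '.join(UNIT_SECONDS)}")
    return max_duration * UNIT_SECONDS[duration_unit]

def limit_reached(start_time: float, max_duration: float, duration_unit: str) -> bool:
    """True once the elapsed wall-clock time exceeds the configured limit."""
    return (time.time() - start_time) >= to_seconds(max_duration, duration_unit)
```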
#### **`passthrough`**
A simple node that passes data through without modification.
```yaml
type: passthrough
config: {}
```
#### **`literal`**
Injects static content into the workflow.
```yaml
type: literal
config:
content: "Static content text"
role: "user" # Role of the message (user, assistant, system)
```
#### **`python_runner`** (implied from imports)
Executes Python code.
```yaml
type: python_runner
config:
timeout_seconds: 60
```
## 4. Edge Configuration (`edges`)
Defines the flow between nodes.
```yaml
- from: "SourceNodeID" # Required
to: "TargetNodeID" # Required
trigger: true # Default: true. Can trigger target execution?
condition: "true" # Condition to traverse (default "true")
carry_data: true # Pass output of source to target?
keep_message: false # Mark message as 'keep' in target context?
clear_context: false # Clear target's context before adding new data?
clear_kept_context: false # Clear 'kept' messages in target?
processor: # Optional: Transform data before passing to target
type: template
config:
template: "..."
dynamic: # Optional: Dynamic execution (map/tree patterns)
type: map
config:
split_type: regex
```
### 4.1 Context Management
Context management controls how messages accumulate in a node's execution context across workflow execution.
#### **Context vs Memory**
- **Context**: Message queue visible to a node during execution (temporary, workflow-scoped)
- **Memory**: Persistent storage across workflow runs (permanent, stored in vector database)
#### **Message Lifecycle**
1. Source node produces output
2. Edge delivers message to target node's context
3. Target node processes messages in its context
4. Context can be cleared or preserved based on edge configuration
#### **Edge Parameters for Context Control**
| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| `keep_message` | `bool` | `false` | Mark message as "kept" (survives soft reset) |
| `clear_context` | `bool` | `false` | Clear non-kept messages before adding new data |
| `clear_kept_context` | `bool` | `false` | Clear kept messages (requires `clear_context: true`) |
#### **Soft Reset vs Hard Reset**
**Soft Reset** (`clear_context: true`):
- Clears only messages with `keep=false`
- Preserves messages marked with `keep_message: true`
- Use case: Clear temporary data but keep system instructions
```yaml
- from: UserInput
to: Agent
clear_context: true # Soft reset
keep_message: false
```
**Hard Reset** (`clear_context: true` + `clear_kept_context: true`):
- Clears ALL messages (including `keep=true`)
- Complete context isolation
- Use case: Start fresh between workflow rounds
```yaml
- from: LoopControl
to: StartNode
clear_context: true # Hard reset
clear_kept_context: true
```
#### **Context Reset Demo Example**
See `yaml_instance/demo_context_reset.yaml` for a working demonstration:
**Workflow Structure**:
1. `entry_normal` → `collector` (`keep_message: false`) - Normal message
2. `entry_keep` → `collector` (`keep_message: true`) - Kept message
3. `soft_reset` → `collector` (`clear_context: true`) - Soft reset
4. `hard_reset` → `collector` (`clear_context: true`, `clear_kept_context: true`) - Hard reset
**Execution Flow**:
- After `entry_normal` + `entry_keep`: collector has 2 messages
- After `soft_reset`: collector has `entry_keep` + `soft_reset` (normal cleared)
- After `hard_reset`: collector has only `hard_reset` (all previous cleared)
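The soft/hard reset behavior can be sketched with a minimal Python model of a node context (the `deliver` helper and tuple representation are illustrative only, not the runtime's actual data structures):

```python
def deliver(context, content, keep=False, clear_context=False, clear_kept_context=False):
    """Apply the edge's context flags, then append the incoming message.

    context is a list of (content, keep) tuples; returns the new context.
    """
    if clear_context:
        if clear_kept_context:
            context = []                             # hard reset: drop everything
        else:
            context = [m for m in context if m[1]]   # soft reset: keep kept messages
    return context + [(content, keep)]

# Replay the demo workflow above:
ctx = []
ctx = deliver(ctx, "entry_normal")                   # normal message
ctx = deliver(ctx, "entry_keep", keep=True)          # kept message
ctx = deliver(ctx, "soft_reset", clear_context=True) # clears entry_normal only
ctx = deliver(ctx, "hard_reset", clear_context=True, clear_kept_context=True)
```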
#### **Common Use Cases**
**Preserve System Instructions**:
```yaml
- from: SystemSetup
to: Agent
keep_message: true # This survives soft resets
- from: UserInput
to: Agent
clear_context: true # Clears user messages but keeps system
```
**Recursive Workflows with Isolation**:
```yaml
- from: HumanControl
to: StartNode
clear_context: true # Fresh start each round
clear_kept_context: true # Complete isolation
condition:
type: keyword
config:
none: ["EXIT"] # Loop until user says "EXIT"
```
**Accumulate Important Context**:
```yaml
- from: CriticalNode
to: Aggregator
keep_message: true # Don't lose this data
- from: TemporaryNode
to: Aggregator
keep_message: false # Can be cleared later
```
### 4.2 Condition Configuration
Edges can have conditions that determine when they are traversed.
#### **Keyword Condition**
Match specific keywords in the source node output.
```yaml
condition:
type: keyword
config:
any: ["APPROVED", "SUCCESS"] # Match if ANY keyword present
none: ["ERROR", "FAILED"] # Match if NONE present
all: ["READY", "VERIFIED"] # Match if ALL present
```
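The `any`/`none`/`all` semantics can be sketched as follows (a hypothetical helper, assuming plain substring matching; the real matcher lives in the condition implementation):

```python
def keyword_match(output: str, any_kw=None, none_kw=None, all_kw=None) -> bool:
    """Return True when the output satisfies every configured keyword rule."""
    if any_kw and not any(k in output for k in any_kw):
        return False  # none of the 'any' keywords appeared
    if none_kw and any(k in output for k in none_kw):
        return False  # a forbidden keyword appeared
    if all_kw and not all(k in output for k in all_kw):
        return False  # at least one required keyword is missing
    return True
```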
#### **Regex Condition**
Match output against a regular expression.
```yaml
condition:
type: regex
config:
pattern: "^RESULT: .*"
flags: ["IGNORECASE"]
```
## 5. Memory Configuration
Memory stores enable persistent data across workflow runs using vector similarity search.
### 5.1 Graph-Level Memory Stores
Define memory stores in the `graph.memory` section:
```yaml
graph:
memory:
- name: patient_memory
type: simple
config:
embedding:
provider: openai
base_url: ${BASE_URL}
api_key: ${API_KEY}
model: text-embedding-bge-reranker-v2-m3
```
### 5.2 Node-Level Memory Attachments
Attach memory to nodes for read/write operations:
```yaml
nodes:
- id: PatientGenerator
type: agent
config:
memories:
- name: patient_memory # Must match graph.memory name
top_k: 50 # Retrieve top 50 similar items
similarity_threshold: 0.3 # Minimum similarity score (0-1)
retrieve_stage: # When to retrieve memory
- pre_gen_thinking # Before thinking step
read: true # Enable reading from memory
write: true # Enable writing to memory
```
#### **Memory Parameters**
| Parameter | Type | Description |
|-----------|------|-------------|
| `name` | `str` | Memory store name (must exist in `graph.memory`) |
| `top_k` | `int` | Number of similar items to retrieve (default: 10) |
| `similarity_threshold` | `float` | Minimum similarity score 0-1 (default: 0.5) |
| `retrieve_stage` | `List[str]` | When to retrieve: `gen`, `pre_gen_thinking`, `post_gen_thinking`, `finished` |
| `read` | `bool` | Enable reading from memory (default: false) |
| `write` | `bool` | Enable writing to memory (default: false) |
#### **Use Cases**
**Cross-Round Uniqueness** (High top_k, Low threshold):
```yaml
memories:
- name: patient_memory
top_k: 50 # Cast wide net
similarity_threshold: 0.3 # Catch similar items
retrieve_stage: [pre_gen_thinking]
read: true
write: true
```
**Recent Context Preservation** (Low top_k, Medium threshold):
```yaml
memories:
- name: environment_memory
top_k: 10 # Only recent context
similarity_threshold: 0.5 # Moderately similar
retrieve_stage: [pre_gen_thinking]
read: true
write: true
```
**Human Node Memory Support**:
Human nodes support full memory functionality including read, write, and retrieve_stage configuration:
```yaml
nodes:
- id: HumanFeedback
type: human
config:
description: "Review the article and provide feedback..."
memories:
- name: feedback_memory
read: true # Retrieve and display previous feedback
write: true # Store human feedback for future reference
top_k: 10
similarity_threshold: 0.5
retrieve_stage: # Control when memory is retrieved
- gen # Default stage for human nodes
```
**Features**:
- **Read**: Retrieved memory is displayed alongside the description to help users make informed decisions
- **Write**: Human feedback is automatically stored in memory after user input
- **retrieve_stage**: Controls when memory retrieval occurs. Available stages: `gen`, `pre_gen_thinking`, `post_gen_thinking`, `finished`
- If `retrieve_stage` is not specified, defaults to `gen` stage
**Use Cases**:
- **Feedback Consistency**: Store and retrieve past user feedback to maintain consistent review standards
- **Context Awareness**: Display historical decisions to inform current choices
- **Learning from History**: Build up knowledge base from human inputs over multiple workflow runs
## 6. Thinking Module
The thinking module adds reflection and planning capabilities to agent nodes.
### 6.1 Reflection Thinking
Enables post-generation reflection and refinement:
```yaml
nodes:
- id: environment
type: agent
config:
thinking:
type: reflection
config:
reflection_prompt: |
You have just generated output. Now reflect and refine:
QUALITY CHECK:
1. Does the output meet all requirements?
2. Is it consistent with retrieved memory?
3. Are there any contradictions or gaps?
Output ONLY the final refined result with no commentary.
```
### 6.2 Thinking with Memory Integration
Combine thinking with memory retrieval for intelligent context-aware processing:
```yaml
nodes:
- id: PatientGenerator
type: agent
config:
memories:
- name: patient_memory
retrieve_stage: [pre_gen_thinking] # Retrieve BEFORE thinking
read: true
write: true
thinking:
type: reflection
config:
reflection_prompt: |
Review retrieved patient memory and verify uniqueness:
- Are there duplicate names or symptom patterns?
- Generate fresh alternatives if conflicts exist.
Output the final unique patient list.
```
**Execution Flow**:
1. Node generates initial output
2. Memory is retrieved (if `retrieve_stage: [pre_gen_thinking]`)
3. Retrieved memory + initial output passed to thinking module
4. Thinking module refines and returns final output
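This flow can be sketched as a simple pipeline (all function names below are illustrative placeholders for the runtime's internals):

```python
def run_node_with_thinking(generate, retrieve_memory, reflect, inputs):
    """Sketch of the reflection flow: generate, retrieve memory, then refine."""
    draft = generate(inputs)           # 1. node produces initial output
    memory = retrieve_memory(inputs)   # 2. retrieve_stage: pre_gen_thinking
    return reflect(draft, memory)      # 3-4. thinking module refines final output
```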
## 7. Dynamic Execution
Dynamic execution enables parallel processing patterns like map-reduce.
### 7.1 Map Pattern (Parallel Fan-Out)
Split input into multiple parallel branches:
```yaml
edges:
- from: PatientGenerator
to: NurseIntake
dynamic:
type: map
config:
split_type: json_array # Split JSON array into items
item_name: patient # Variable name for each item
```
**How it works**:
1. `PatientGenerator` outputs JSON array: `[{patient1}, {patient2}, {patient3}]`
2. Edge splits into 3 parallel executions of `NurseIntake`
3. Each execution receives one patient object
### 7.2 Supported Split Types
| Split Type | Description | Example |
|------------|-------------|---------|
| `json_array` | Split JSON array | `[{...}, {...}]` → multiple items |
| `regex` | Split by regex pattern | Split by delimiter |
| `lines` | Split by newlines | Multi-line text → separate lines |
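A sketch of how the three split types might divide a source node's output into parallel branch inputs (hypothetical helper; the actual splitter lives in the dynamic-execution implementation):

```python
import json
import re

def split_output(output: str, split_type: str, pattern: str = ",") -> list:
    """Split a source node's output into one input per parallel branch."""
    if split_type == "json_array":
        return json.loads(output)         # [{...}, {...}] -> list of items
    if split_type == "regex":
        return re.split(pattern, output)  # split on a delimiter pattern
    if split_type == "lines":
        return output.splitlines()        # one branch per line
    raise ValueError(f"unknown split_type: {split_type}")
```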
### 7.3 Real-World Example (Hospital Simulation)
```yaml
# Patient generator creates array of patients
- id: PatientGenerator
type: agent
config:
role: "Generate array of patients in JSON format"
# Parallel fan-out to process each patient
edges:
- from: PatientGenerator
to: NurseIntake
dynamic:
type: map
config:
split_type: json_array
item_name: patient
# Each nurse intake processes one patient in parallel
- id: NurseIntake
type: agent
config:
role: "Process patient: {{patient}}"
```
**Result**: 5 patients → 5 parallel NurseIntake executions
## 8. Edge Processors
Edge processors transform data before passing to target nodes.
### 8.1 Template Processor
Use template strings to format data:
```yaml
edges:
- from: HumanControl
to: environment
processor:
type: template
config:
template: |
USER INPUT: {{content}}
INSTRUCTION: Update the environment based on user input.
- If input is minimal ("continue"), maintain scenario with time progression
- If input adds elements, integrate them while preserving context
```
**Variables available**:
- `{{content}}`: Message content from source node
- Custom variables from node outputs
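Assuming simple placeholder substitution, template application can be sketched as follows (the real processor may support richer templating):

```python
def apply_template(template: str, variables: dict) -> str:
    """Replace each {{name}} placeholder with its value."""
    result = template
    for name, value in variables.items():
        result = result.replace("{{" + name + "}}", str(value))
    return result
```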
### 8.2 Regex Extract Processor
Extract specific data using regex:
```yaml
edges:
- from: Analyzer
to: Processor
processor:
type: regex_extract
config:
pattern: "RESULT: (.*)"
group: 1
```
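The extraction behavior can be sketched as follows (hypothetical helper mirroring the `pattern`/`group` config fields):

```python
import re

def regex_extract(output: str, pattern: str, group: int = 0):
    """Return the captured group from the first match, or None if no match."""
    match = re.search(pattern, output)
    return match.group(group) if match else None
```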
## 9. Common Errors & Best Practices
1. **Unique IDs**: Ensure every `id` in `nodes` is unique. Duplicate IDs cause validation failure.
2. **Valid References**: `from` and `to` in `edges` must match exactly with a defined `id` in `nodes`.
3. **Root Structure**: The file **must** have the `graph:` key. `vars:` defines placeholders like `${API_KEY}`.
4. **Type Consistency**:
* `context_window` is an **integer**, not a string.
* `condition` is a string expression (e.g., `"true"`, `"false"`) or a config object.
5. **Agent Config**:
* `name` and `provider` are mandatory for `agent` nodes.
* `tooling` must be a list of tool configurations.
6. **Environment Variables**: Use `${VAR_NAME}` in YAML and define them in `.env` or the `vars` section. The validation logic checks schema but resolves variables at runtime.
7. **Memory Configuration**:
* Memory store names in `nodes.config.memories` must exist in `graph.memory`.
* `retrieve_stage` determines when memory is retrieved (before/after thinking).
* Balance `top_k` and `similarity_threshold` based on use case.
8. **Context Management**:
* Use `keep_message: true` for system instructions and critical context.
* Use soft reset (`clear_context: true`) to clear temporary data.
* Use hard reset (`clear_context + clear_kept_context`) for complete isolation.
9. **Dynamic Execution**:
* Ensure source node output format matches `split_type` (e.g., JSON array for `json_array`).
* Use descriptive `item_name` for clarity in target node templates.
10. **Thinking Module**:
* Keep reflection prompts focused and concise.
* Clearly specify expected output format.
* Use thinking with memory retrieval for context-aware refinement.
## 10. Complete Example: Recursive Hospital Simulation
This example demonstrates advanced features: recursive loops, memory, thinking, dynamic execution, and context management.
See `yaml_instance/simulation_hospital.yaml` for the full implementation.
**Key Features**:
1. **Environment Continuity** (`environment` node):
- Memory retrieval at `pre_gen_thinking` stage
- Reflection thinking detects "continue" vs "evolve" modes
- Maintains scenario context across rounds
2. **Unique Patient Generation** (`PatientGenerator` node):
- Memory tracks all previously generated patients
- Reflection thinking verifies uniqueness
- Dynamic execution fans out to parallel patient processing
3. **Recursive Loop** (edges):
- `HumanControl` → `environment` with hard reset
- Condition: `none: ["SIMULATION ENDED."]` (continues)
- Complete context isolation between rounds
4. **Parallel Processing** (dynamic execution):
- Patient array splits into parallel nurse/doctor workflows
- Each patient processed independently
- Results aggregate before next round
**Simplified Structure**:
```yaml
graph:
memory:
- name: patient_memory
type: simple
- name: environment_memory
type: simple
nodes:
- id: environment
type: agent
config:
memories:
- name: environment_memory
top_k: 10
similarity_threshold: 0.5
retrieve_stage: [pre_gen_thinking]
read: true
write: true
thinking:
type: reflection
config:
reflection_prompt: "Detect mode and maintain continuity..."
- id: PatientGenerator
type: agent
config:
memories:
- name: patient_memory
top_k: 50
similarity_threshold: 0.3
retrieve_stage: [pre_gen_thinking]
read: true
write: true
thinking:
type: reflection
config:
reflection_prompt: "Verify uniqueness against memory..."
- id: HumanControl
type: human
config:
description: "Enter next scenario or 'SIMULATION ENDED.'"
- id: SimulationEnd
type: literal
config:
content: "Simulation complete."
edges:
# Dynamic parallel processing
- from: PatientGenerator
to: NurseIntake
dynamic:
type: map
config:
split_type: json_array
item_name: patient
# Recursive loop with hard reset
- from: HumanControl
to: environment
clear_context: true # Hard reset
clear_kept_context: true # Complete isolation
condition:
type: keyword
config:
none: ["SIMULATION ENDED."]
# Exit condition
- from: HumanControl
to: SimulationEnd
condition:
type: keyword
config:
any: ["SIMULATION ENDED."]
```
## 11. Validation
Use the project's validation tool to check your YAML:
```bash
uv run python -m check.check --path yaml_instance/your_workflow.yaml
```
This tool performs:
* **Schema Validation**: Checks if fields match the defined dataclasses (`entity/configs`).
* **Structure Validation**: Checks for orphan nodes, invalid edges, and logical consistency.
* **Memory Validation**: Verifies memory store references are valid.
* **Condition Validation**: Ensures condition syntax is correct.
## 12. Quick Reference
### Context Management
| Goal | Configuration |
|------|---------------|
| Keep system instructions | `keep_message: true` on edge |
| Clear temporary messages | `clear_context: true` |
| Complete context reset | `clear_context: true` + `clear_kept_context: true` |
| Preserve between rounds | Use `keep_message: true` on critical edges |
### Memory Patterns
| Use Case | top_k | similarity_threshold |
|----------|-------|---------------------|
| Broad historical search | 50+ | 0.3 (low) |
| Recent context | 10 | 0.5 (medium) |
| Exact matches | 5 | 0.7+ (high) |
### Thinking Integration
| Retrieve Stage | Purpose |
|----------------|---------|
| `pre_gen_thinking` | Provide memory context before thinking |
| `post_gen_thinking` | Retrieve after generation (rare) |
### Dynamic Execution
| Split Type | Input Format |
|------------|--------------|
| `json_array` | `[{...}, {...}]` |
| `regex` | Text with delimiter |
| `lines` | Multi-line text |
### Condition Types
| Type | Use Case |
|------|----------|
| `keyword` | Match specific words (any/none/all) |
| `regex` | Pattern matching |
| `"true"` | Always traverse (default) |
## 13. Implementation Patterns
### Pattern 1: Recursive Workflow with Isolation
```yaml
edges:
- from: HumanControl
to: StartNode
clear_context: true
clear_kept_context: true
condition:
type: keyword
config:
none: ["EXIT"]
```
### Pattern 2: Memory-Based Uniqueness Check
```yaml
nodes:
- id: Generator
type: agent
config:
memories:
- name: tracking_memory
top_k: 50
similarity_threshold: 0.3
retrieve_stage: [pre_gen_thinking]
read: true
write: true
thinking:
type: reflection
config:
reflection_prompt: "Verify uniqueness against retrieved memory..."
```
### Pattern 3: Parallel Processing with Aggregation
```yaml
edges:
# Fan out
- from: Generator
to: Processor
dynamic:
type: map
config:
split_type: json_array
# Aggregate
- from: Processor
to: Aggregator
```
### Pattern 4: Conditional Branching
```yaml
edges:
- from: Decision
to: PathA
condition:
type: keyword
config:
any: ["APPROVE"]
- from: Decision
to: PathB
condition:
type: keyword
config:
any: ["REJECT"]
```
## 14. Reference Files
- **Demo Workflows**: `yaml_instance/demo_context_reset.yaml` - Context management demo
- **Complex Example**: `yaml_instance/simulation_hospital.yaml` - Full-featured recursive simulation
- **Configuration Schemas**: `entity/configs/` - Dataclass definitions
- **Validation Logic**: `check/check.py` - Schema and structure validation
- **User Guide**: `docs/user_guide/en/` - Official documentation
---
**Last Updated**: Based on codebase analysis and `demo_context_reset.yaml` demonstration workflow.


@@ -114,6 +114,7 @@ Execute according to the topological order:
After completing each round of in-cycle execution, the system checks these exit conditions:
- **Exit edge triggered**: If any in-cycle node triggers an edge to an out-of-cycle node, exit the loop
- **Maximum iterations reached**: If the configured maximum (default 100) is reached, force termination
- **Time limit reached**: If a `loop_timer` node within the cycle reaches its configured time limit, exit the loop
- **Initial node not re-triggered**: If the initial node isn't re-triggered by in-cycle predecessors, the loop naturally terminates
If none of the conditions are met, return to Step 2 for the next iteration.


@@ -0,0 +1,122 @@
"""Configuration for loop timer guard nodes."""
from dataclasses import dataclass
from typing import Mapping, Any, Optional
from entity.configs.base import (
BaseConfig,
ConfigError,
ConfigFieldSpec,
require_mapping,
extend_path,
optional_str,
)
@dataclass
class LoopTimerConfig(BaseConfig):
"""Configuration schema for the loop timer node type."""
max_duration: float = 60.0
duration_unit: str = "seconds"
reset_on_emit: bool = True
message: Optional[str] = None
passthrough: bool = False
@classmethod
def from_dict(
cls, data: Mapping[str, Any] | None, *, path: str
) -> "LoopTimerConfig":
mapping = require_mapping(data or {}, path)
max_duration_raw = mapping.get("max_duration", 60.0)
try:
max_duration = float(max_duration_raw)
except (TypeError, ValueError) as exc: # pragma: no cover - defensive
raise ConfigError(
"max_duration must be a number",
extend_path(path, "max_duration"),
) from exc
if max_duration <= 0:
raise ConfigError(
"max_duration must be > 0", extend_path(path, "max_duration")
)
duration_unit = str(mapping.get("duration_unit", "seconds"))
valid_units = ["seconds", "minutes", "hours"]
if duration_unit not in valid_units:
raise ConfigError(
f"duration_unit must be one of: {', '.join(valid_units)}",
extend_path(path, "duration_unit"),
)
reset_on_emit = bool(mapping.get("reset_on_emit", True))
message = optional_str(mapping, "message", path)
passthrough = bool(mapping.get("passthrough", False))
return cls(
max_duration=max_duration,
duration_unit=duration_unit,
reset_on_emit=reset_on_emit,
message=message,
passthrough=passthrough,
path=path,
)
def validate(self) -> None:
if self.max_duration <= 0:
raise ConfigError(
"max_duration must be > 0", extend_path(self.path, "max_duration")
)
valid_units = ["seconds", "minutes", "hours"]
if self.duration_unit not in valid_units:
raise ConfigError(
f"duration_unit must be one of: {', '.join(valid_units)}",
extend_path(self.path, "duration_unit"),
)
FIELD_SPECS = {
"max_duration": ConfigFieldSpec(
name="max_duration",
display_name="Maximum Duration",
type_hint="float",
required=True,
default=60.0,
description="How long the loop can run before this node emits an output.",
),
"duration_unit": ConfigFieldSpec(
name="duration_unit",
display_name="Duration Unit",
type_hint="str",
required=True,
default="seconds",
description="Unit of time for max_duration: 'seconds', 'minutes', or 'hours'.",
),
"reset_on_emit": ConfigFieldSpec(
name="reset_on_emit",
display_name="Reset After Emit",
type_hint="bool",
required=False,
default=True,
description="Whether to reset the internal timer after reaching the limit.",
advance=True,
),
"message": ConfigFieldSpec(
name="message",
display_name="Release Message",
type_hint="text",
required=False,
description="Optional text sent downstream once the time limit is reached.",
advance=True,
),
"passthrough": ConfigFieldSpec(
name="passthrough",
display_name="Passthrough Mode",
type_hint="bool",
required=False,
default=False,
description="If true, after emitting the limit message, all subsequent inputs pass through unchanged.",
advance=True,
),
}


@@ -12,6 +12,7 @@ from entity.configs.node.passthrough import PassthroughConfig
from entity.configs.node.literal import LiteralNodeConfig
from entity.configs.node.python_runner import PythonRunnerConfig
from entity.configs.node.loop_counter import LoopCounterConfig
from entity.configs.node.loop_timer import LoopTimerConfig
from runtime.node.executor.agent_executor import AgentNodeExecutor
from runtime.node.executor.human_executor import HumanNodeExecutor
from runtime.node.executor.passthrough_executor import PassthroughNodeExecutor
@@ -19,6 +20,7 @@ from runtime.node.executor.literal_executor import LiteralNodeExecutor
from runtime.node.executor.python_executor import PythonNodeExecutor
from runtime.node.executor.subgraph_executor import SubgraphNodeExecutor
from runtime.node.executor.loop_counter_executor import LoopCounterNodeExecutor
from runtime.node.executor.loop_timer_executor import LoopTimerNodeExecutor
from runtime.node.registry import NodeCapabilities, register_node_type
@@ -48,9 +50,10 @@ register_node_type(
"subgraph",
config_cls=SubgraphConfig,
executor_cls=SubgraphNodeExecutor,
    capabilities=NodeCapabilities(),
    executor_factory=lambda context, subgraphs=None: SubgraphNodeExecutor(context, subgraphs or {}),
summary="Embeds (through file path or inline config) and runs another named subgraph within the current workflow",
)
@@ -69,8 +72,7 @@ register_node_type(
"passthrough",
config_cls=PassthroughConfig,
executor_cls=PassthroughNodeExecutor,
    capabilities=NodeCapabilities(),
summary="Forwards prior node output downstream without modification",
)
@ -78,8 +80,7 @@ register_node_type(
"literal",
config_cls=LiteralNodeConfig,
executor_cls=LiteralNodeExecutor,
capabilities=NodeCapabilities(
),
capabilities=NodeCapabilities(),
summary="Emits the configured text message every time it is triggered",
)
@@ -91,6 +92,14 @@ register_node_type(
     summary="Blocks downstream edges until the configured iteration limit is reached, then emits a message to release the loop.",
 )
+register_node_type(
+    "loop_timer",
+    config_cls=LoopTimerConfig,
+    executor_cls=LoopTimerNodeExecutor,
+    capabilities=NodeCapabilities(),
+    summary="Blocks downstream edges until the configured time limit is reached, then emits a message to release the loop.",
+)
 # Register subgraph source types (file-based and inline config)
 register_subgraph_source(
     "config",

View File

@@ -0,0 +1,148 @@
"""Loop timer guard node executor."""

import time
from typing import List, Dict, Any

from entity.configs import Node
from entity.configs.node.loop_timer import LoopTimerConfig
from entity.messages import Message, MessageRole
from runtime.node.executor.base import NodeExecutor


class LoopTimerNodeExecutor(NodeExecutor):
    """Track loop duration and emit output only after hitting the time limit.

    Supports two modes:
    1. Standard Mode (passthrough=False): Suppresses input until time limit, then emits message
    2. Terminal Gate Mode (passthrough=True): Acts as a sequential switch
       - Before limit: Pass input through unchanged
       - At limit: Emit configured message, suppress original input
       - After limit: Transparent gate, pass all subsequent messages through
    """

    STATE_KEY = "loop_timer"

    def execute(self, node: Node, inputs: List[Message]) -> List[Message]:
        config = node.as_config(LoopTimerConfig)
        if config is None:
            raise ValueError(f"Node {node.id} missing loop_timer configuration")

        state = self._get_state()
        timer_state = state.setdefault(node.id, {})

        # Initialize timer on first execution
        current_time = time.time()
        if "start_time" not in timer_state:
            timer_state["start_time"] = current_time
            timer_state["emitted"] = False

        start_time = timer_state["start_time"]
        elapsed_time = current_time - start_time

        # Convert max_duration to seconds based on unit
        max_duration_seconds = self._convert_to_seconds(
            config.max_duration, config.duration_unit
        )

        # Check if time limit has been reached
        limit_reached = elapsed_time >= max_duration_seconds

        # Terminal Gate Mode (passthrough=True)
        if config.passthrough:
            if not limit_reached:
                # Before limit: pass input through unchanged
                self.log_manager.debug(
                    f"LoopTimer {node.id}: {elapsed_time:.1f}s / {max_duration_seconds:.1f}s "
                    f"(passthrough mode: forwarding input)"
                )
                return inputs
            elif not timer_state["emitted"]:
                # At limit: emit configured message, suppress original input
                timer_state["emitted"] = True
                if config.reset_on_emit:
                    timer_state["start_time"] = current_time
                content = (
                    config.message
                    or f"Time limit reached ({config.max_duration} {config.duration_unit})"
                )
                metadata = {
                    "loop_timer": {
                        "elapsed_time": elapsed_time,
                        "max_duration": config.max_duration,
                        "duration_unit": config.duration_unit,
                        "reset_on_emit": config.reset_on_emit,
                        "passthrough": True,
                    }
                }
                self.log_manager.debug(
                    f"LoopTimer {node.id}: {elapsed_time:.1f}s / {max_duration_seconds:.1f}s "
                    f"(passthrough mode: emitting limit message)"
                )
                return [
                    Message(
                        role=MessageRole.ASSISTANT,
                        content=content,
                        metadata=metadata,
                    )
                ]
            else:
                # After limit: transparent gate, pass all subsequent messages through
                self.log_manager.debug(
                    f"LoopTimer {node.id}: {elapsed_time:.1f}s (passthrough mode: transparent gate)"
                )
                return inputs

        # Standard Mode (passthrough=False)
        if not limit_reached:
            self.log_manager.debug(
                f"LoopTimer {node.id}: {elapsed_time:.1f}s / {max_duration_seconds:.1f}s "
                f"(suppress downstream)"
            )
            return []

        if config.reset_on_emit and not timer_state["emitted"]:
            timer_state["start_time"] = current_time
        timer_state["emitted"] = True
        content = (
            config.message
            or f"Time limit reached ({config.max_duration} {config.duration_unit})"
        )
        metadata = {
            "loop_timer": {
                "elapsed_time": elapsed_time,
                "max_duration": config.max_duration,
                "duration_unit": config.duration_unit,
                "reset_on_emit": config.reset_on_emit,
                "passthrough": False,
            }
        }
        self.log_manager.debug(
            f"LoopTimer {node.id}: {elapsed_time:.1f}s / {max_duration_seconds:.1f}s "
            f"reached limit, releasing output"
        )
        return [
            Message(
                role=MessageRole.ASSISTANT,
                content=content,
                metadata=metadata,
            )
        ]

    def _get_state(self) -> Dict[str, Dict[str, Any]]:
        return self.context.global_state.setdefault(self.STATE_KEY, {})

    def _convert_to_seconds(self, duration: float, unit: str) -> float:
        """Convert duration to seconds based on unit."""
        unit_multipliers = {
            "seconds": 1.0,
            "minutes": 60.0,
            "hours": 3600.0,
        }
        return duration * unit_multipliers.get(unit, 1.0)
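The unit handling above is worth sanity-checking in isolation. The sketch below is a standalone mirror of the conversion logic as a free function (the name `convert_to_seconds` is illustrative, not part of the commit); as in the executor, unknown units deliberately fall back to seconds via the `.get(unit, 1.0)` default:

```python
# Standalone mirror of LoopTimerNodeExecutor._convert_to_seconds for quick checks.
def convert_to_seconds(duration: float, unit: str) -> float:
    unit_multipliers = {
        "seconds": 1.0,
        "minutes": 60.0,
        "hours": 3600.0,
    }
    # Unknown units fall back to a multiplier of 1.0 (treated as seconds).
    return duration * unit_multipliers.get(unit, 1.0)

print(convert_to_seconds(5, "seconds"))   # 5.0
print(convert_to_seconds(2, "minutes"))   # 120.0
print(convert_to_seconds(1.5, "hours"))   # 5400.0
```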

View File

@@ -0,0 +1,52 @@
version: 0.4.0
graph:
  start:
    - Writer
  end:
    - Finalizer
  id: loop_timer_demo
  description: LoopTimer demo that releases output after 5 seconds of elapsed time.
  is_majority_voting: false
  log_level: INFO
  nodes:
    - id: Writer
      type: literal
      description: Responsible for outputting a fixed draft.
      config:
        content: Draft iteration from Writer
        role: assistant
    - id: Critic
      type: literal
      description: Simulates feedback, always requesting further revisions.
      config:
        content: Please revise again
        role: user
    - id: Timer Gate
      type: loop_timer
      description: Counts elapsed time, only granting passage after 5 seconds.
      config:
        max_duration: 5
        duration_unit: seconds
        reset_on_emit: true
        message: Loop finished after 5 seconds
    - id: Finalizer
      type: literal
      description: Receives the release signal from Timer Gate and outputs the final statement.
      config:
        content: Final summary released
        role: assistant
  edges:
    - from: Writer
      to: Critic
    - from: Critic
      to: Writer
    - from: Critic
      to: Timer Gate
    - from: Timer Gate
      to: Writer
      trigger: true
      condition: 'true'
      carry_data: true
      keep_message: false
    - from: Timer Gate
      to: Finalizer
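The standard-mode gate behaviour driving this demo can be simulated without the workflow runtime. The sketch below is a simplified stand-in, not the project's API (`TimerGateSim` and its `execute` signature are illustrative): it suppresses every input until `max_duration` seconds have elapsed since the first call, then releases the configured message.

```python
import time

# Minimal stand-in for the Timer Gate node's standard mode (passthrough=False).
class TimerGateSim:
    def __init__(self, max_duration: float, message: str):
        self.max_duration = max_duration
        self.message = message
        self.start = None

    def execute(self, inputs):
        now = time.time()
        if self.start is None:
            self.start = now          # initialize timer on first execution
        if now - self.start < self.max_duration:
            return []                 # before the limit: suppress downstream
        return [self.message]         # limit reached: release the loop

gate = TimerGateSim(0.1, "Loop finished")
assert gate.execute(["draft"]) == []            # timer just started, output suppressed
time.sleep(0.15)
assert gate.execute(["draft"]) == ["Loop finished"]
```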

View File

@@ -0,0 +1,51 @@
version: 0.4.0
graph:
  start:
    - Writer
  end:
    - Finalizer
  id: loop_timer_passthrough_demo
  description: LoopTimer passthrough mode demo - passes through messages before the limit, emits at the limit, then becomes transparent.
  is_majority_voting: false
  log_level: INFO
  nodes:
    - id: Writer
      type: literal
      description: Outputs a draft message.
      config:
        content: Draft iteration from Writer
        role: assistant
    - id: Critic
      type: literal
      description: Provides feedback.
      config:
        content: Please revise again
        role: user
    - id: Timer Gate
      type: loop_timer
      description: Passthrough mode - passes messages through before 5 seconds, emits limit message at 5 seconds, then transparent.
      config:
        max_duration: 5
        duration_unit: seconds
        reset_on_emit: false
        message: Time limit reached - switching to passthrough
        passthrough: true
    - id: Finalizer
      type: literal
      description: Receives messages.
      config:
        content: Final summary released
        role: assistant
  edges:
    - from: Writer
      to: Critic
    - from: Critic
      to: Timer Gate
    - from: Timer Gate
      to: Writer
      trigger: true
      condition: 'true'
      carry_data: true
      keep_message: false
    - from: Timer Gate
      to: Finalizer
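The three-phase passthrough behaviour this demo exercises can likewise be sketched outside the runtime. The class below is a hypothetical simplification (names and signature are illustrative): before the limit it forwards inputs unchanged, at the limit it emits the configured message exactly once, and afterwards it acts as a transparent gate.

```python
import time

# Sketch of the Timer Gate's terminal-gate mode (passthrough=True).
class PassthroughGateSim:
    def __init__(self, max_duration: float, message: str):
        self.max_duration = max_duration
        self.message = message
        self.start = None
        self.emitted = False

    def execute(self, inputs):
        now = time.time()
        if self.start is None:
            self.start = now
        if now - self.start < self.max_duration:
            return inputs             # phase 1: pass input through unchanged
        if not self.emitted:
            self.emitted = True
            return [self.message]     # phase 2: one-shot limit message
        return inputs                 # phase 3: transparent gate

gate = PassthroughGateSim(0.1, "Time limit reached")
assert gate.execute(["draft"]) == ["draft"]              # before the limit
time.sleep(0.15)
assert gate.execute(["draft"]) == ["Time limit reached"]  # at the limit
assert gate.execute(["draft"]) == ["draft"]              # after the limit
```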