mirror of
https://github.com/OpenBMB/ChatDev.git
synced 2026-04-25 19:28:09 +00:00
93 lines
3.1 KiB
Python
Executable File
93 lines
3.1 KiB
Python
Executable File
"""Registry helpers for pluggable edge condition managers."""
|
|
|
|
from dataclasses import dataclass
|
|
from importlib import import_module
|
|
from typing import Any, Dict, Type
|
|
|
|
from schema_registry import register_edge_condition_schema
|
|
from utils.registry import Registry, RegistryEntry, RegistryError
|
|
from entity.configs.edge.edge_condition import EdgeConditionConfig, EdgeConditionTypeConfig
|
|
from .base import EdgeConditionManager, ConditionFactoryContext
|
|
from ...node.executor import ExecutionContext
|
|
|
|
|
|
@dataclass(slots=True)
|
|
class EdgeConditionRegistration:
|
|
"""Registry entry describing a pluggable condition type."""
|
|
|
|
name: str
|
|
config_cls: Type["EdgeConditionTypeConfig"]
|
|
manager_cls: Type["EdgeConditionManager[Any]"]
|
|
summary: str | None = None
|
|
|
|
|
|
edge_condition_registry = Registry("edge_condition_type")
|
|
_BUILTINS_LOADED = False
|
|
|
|
|
|
def _ensure_builtins_loaded() -> None:
|
|
global _BUILTINS_LOADED
|
|
if not _BUILTINS_LOADED:
|
|
import_module("runtime.edge.conditions.builtin_types")
|
|
_BUILTINS_LOADED = True
|
|
|
|
|
|
def register_edge_condition(
|
|
name: str,
|
|
*,
|
|
config_cls: Type["EdgeConditionTypeConfig"],
|
|
manager_cls: Type["EdgeConditionManager[Any]"],
|
|
summary: str | None = None,
|
|
) -> None:
|
|
"""Register a manager class that encapsulates edge processing logic."""
|
|
if name in edge_condition_registry.names():
|
|
raise RegistryError(f"Edge condition type '{name}' already registered")
|
|
entry = EdgeConditionRegistration(
|
|
name=name,
|
|
config_cls=config_cls,
|
|
manager_cls=manager_cls,
|
|
summary=summary,
|
|
)
|
|
edge_condition_registry.register(name, target=entry)
|
|
register_edge_condition_schema(name, config_cls=config_cls, summary=summary)
|
|
|
|
|
|
def get_edge_condition_registration(name: str) -> EdgeConditionRegistration:
|
|
"""Retrieve a registered condition type."""
|
|
_ensure_builtins_loaded()
|
|
entry: RegistryEntry = edge_condition_registry.get(name)
|
|
registration = entry.load()
|
|
if not isinstance(registration, EdgeConditionRegistration):
|
|
raise RegistryError(f"Entry '{name}' is not an EdgeConditionRegistration")
|
|
return registration
|
|
|
|
|
|
def iter_edge_condition_registrations() -> Dict[str, EdgeConditionRegistration]:
|
|
"""Iterate over registered condition types."""
|
|
_ensure_builtins_loaded()
|
|
return {name: entry.load() for name, entry in edge_condition_registry.items()}
|
|
|
|
|
|
def build_edge_condition_manager(
|
|
condition: "EdgeConditionConfig",
|
|
context: ConditionFactoryContext,
|
|
execution_context: ExecutionContext
|
|
) -> "EdgeConditionManager[Any]":
|
|
"""Instantiate the manager responsible for a specific edge."""
|
|
registration = get_edge_condition_registration(condition.type)
|
|
manager_cls = registration.manager_cls
|
|
if not manager_cls:
|
|
raise RegistryError(
|
|
f"Edge condition type '{condition.type}' does not provide a manager implementation"
|
|
)
|
|
return manager_cls(condition.config, context, execution_context)
|
|
|
|
|
|
__all__ = [
|
|
"EdgeConditionRegistration",
|
|
"register_edge_condition",
|
|
"get_edge_condition_registration",
|
|
"iter_edge_condition_registrations",
|
|
"build_edge_condition_manager",
|
|
]
|