ChatDev/schema_registry/registry.py
2026-01-07 16:24:01 +08:00

291 lines
8.9 KiB
Python
Executable File

"""Schema registries for entity-layer configuration classes."""
from dataclasses import dataclass, field
from typing import Any, Dict, Mapping, MutableMapping, Type
from entity.configs.base import BaseConfig
class SchemaLookupError(RuntimeError):
"""Raised when a requested schema spec is not registered."""
class SchemaRegistrationError(RuntimeError):
"""Raised when schema registration is inconsistent."""
@dataclass
class NodeSchemaSpec:
name: str
config_cls: Type["BaseConfig"]
summary: str | None = None
metadata: Dict[str, Any] = field(default_factory=dict)
@dataclass
class EdgeConditionSchemaSpec:
name: str
config_cls: Type["BaseConfig"]
summary: str | None = None
metadata: Dict[str, Any] = field(default_factory=dict)
@dataclass
class EdgeProcessorSchemaSpec:
name: str
config_cls: Type["BaseConfig"]
summary: str | None = None
metadata: Dict[str, Any] = field(default_factory=dict)
@dataclass
class MemoryStoreSchemaSpec:
name: str
config_cls: Type["BaseConfig"]
summary: str | None = None
metadata: Dict[str, Any] = field(default_factory=dict)
@dataclass
class ThinkingSchemaSpec:
name: str
config_cls: Type["BaseConfig"]
summary: str | None = None
metadata: Dict[str, Any] = field(default_factory=dict)
@dataclass
class ModelProviderSchemaSpec:
name: str
label: str | None = None
summary: str | None = None
metadata: Dict[str, Any] = field(default_factory=dict)
_node_schemas: Dict[str, NodeSchemaSpec] = {}
_edge_condition_schemas: Dict[str, EdgeConditionSchemaSpec] = {}
_edge_processor_schemas: Dict[str, EdgeProcessorSchemaSpec] = {}
_edge_processor_builtins_loaded = False
_memory_store_schemas: Dict[str, MemoryStoreSchemaSpec] = {}
_thinking_schemas: Dict[str, ThinkingSchemaSpec] = {}
_model_provider_schemas: Dict[str, ModelProviderSchemaSpec] = {}
def _update_metadata(target: MutableMapping[str, Any], new_items: Mapping[str, Any] | None) -> None:
if not new_items:
return
target.update({key: value for key, value in new_items.items() if value is not None})
def register_node_schema(
name: str,
*,
config_cls: Type["BaseConfig"],
summary: str | None = None,
metadata: Mapping[str, Any] | None = None,
) -> NodeSchemaSpec:
spec = _node_schemas.get(name)
if spec:
if spec.config_cls is not config_cls:
raise SchemaRegistrationError(
f"Node schema '{name}' already registered with a different config class"
)
if summary:
spec.summary = summary
_update_metadata(spec.metadata, metadata)
return spec
spec = NodeSchemaSpec(name=name, config_cls=config_cls, summary=summary)
_update_metadata(spec.metadata, metadata)
_node_schemas[name] = spec
return spec
def iter_node_schemas() -> Dict[str, NodeSchemaSpec]:
return dict(_node_schemas)
def get_node_schema(name: str) -> NodeSchemaSpec:
try:
return _node_schemas[name]
except KeyError as exc: # pragma: no cover - defensive guard
raise SchemaLookupError(f"Node schema '{name}' is not registered") from exc
def register_edge_condition_schema(
name: str,
*,
config_cls: Type["BaseConfig"],
summary: str | None = None,
metadata: Mapping[str, Any] | None = None,
) -> EdgeConditionSchemaSpec:
spec = _edge_condition_schemas.get(name)
if spec:
if spec.config_cls is not config_cls:
raise SchemaRegistrationError(
f"Edge condition schema '{name}' already registered with a different config class"
)
if summary:
spec.summary = summary
_update_metadata(spec.metadata, metadata)
return spec
spec = EdgeConditionSchemaSpec(name=name, config_cls=config_cls, summary=summary)
_update_metadata(spec.metadata, metadata)
_edge_condition_schemas[name] = spec
return spec
def iter_edge_condition_schemas() -> Dict[str, EdgeConditionSchemaSpec]:
return dict(_edge_condition_schemas)
def get_edge_condition_schema(name: str) -> EdgeConditionSchemaSpec:
try:
return _edge_condition_schemas[name]
except KeyError as exc: # pragma: no cover
raise SchemaLookupError(f"Edge condition schema '{name}' is not registered") from exc
def register_edge_processor_schema(
name: str,
*,
config_cls: Type["BaseConfig"],
summary: str | None = None,
metadata: Mapping[str, Any] | None = None,
) -> EdgeProcessorSchemaSpec:
spec = _edge_processor_schemas.get(name)
if spec:
if spec.config_cls is not config_cls:
raise SchemaRegistrationError(
f"Edge processor schema '{name}' already registered with a different config class"
)
if summary:
spec.summary = summary
_update_metadata(spec.metadata, metadata)
return spec
spec = EdgeProcessorSchemaSpec(name=name, config_cls=config_cls, summary=summary)
_update_metadata(spec.metadata, metadata)
_edge_processor_schemas[name] = spec
return spec
def iter_edge_processor_schemas() -> Dict[str, EdgeProcessorSchemaSpec]:
_ensure_edge_processor_builtins_loaded()
return dict(_edge_processor_schemas)
def get_edge_processor_schema(name: str) -> EdgeProcessorSchemaSpec:
_ensure_edge_processor_builtins_loaded()
try:
return _edge_processor_schemas[name]
except KeyError as exc: # pragma: no cover
raise SchemaLookupError(f"Edge processor schema '{name}' is not registered") from exc
def register_memory_store_schema(
name: str,
*,
config_cls: Type["BaseConfig"],
summary: str | None = None,
metadata: Mapping[str, Any] | None = None,
) -> MemoryStoreSchemaSpec:
spec = _memory_store_schemas.get(name)
if spec:
if spec.config_cls is not config_cls:
raise SchemaRegistrationError(
f"Memory store schema '{name}' already registered with a different config class"
)
if summary:
spec.summary = summary
_update_metadata(spec.metadata, metadata)
return spec
spec = MemoryStoreSchemaSpec(name=name, config_cls=config_cls, summary=summary)
_update_metadata(spec.metadata, metadata)
_memory_store_schemas[name] = spec
return spec
def iter_memory_store_schemas() -> Dict[str, MemoryStoreSchemaSpec]:
return dict(_memory_store_schemas)
def get_memory_store_schema(name: str) -> MemoryStoreSchemaSpec:
try:
return _memory_store_schemas[name]
except KeyError as exc: # pragma: no cover
raise SchemaLookupError(f"Memory store schema '{name}' is not registered") from exc
def register_thinking_schema(
name: str,
*,
config_cls: Type["BaseConfig"],
summary: str | None = None,
metadata: Mapping[str, Any] | None = None,
) -> ThinkingSchemaSpec:
spec = _thinking_schemas.get(name)
if spec:
if spec.config_cls is not config_cls:
raise SchemaRegistrationError(
f"Thinking schema '{name}' already registered with a different config class"
)
if summary:
spec.summary = summary
_update_metadata(spec.metadata, metadata)
return spec
spec = ThinkingSchemaSpec(name=name, config_cls=config_cls, summary=summary)
_update_metadata(spec.metadata, metadata)
_thinking_schemas[name] = spec
return spec
def iter_thinking_schemas() -> Dict[str, ThinkingSchemaSpec]:
return dict(_thinking_schemas)
def get_thinking_schema(name: str) -> ThinkingSchemaSpec:
try:
return _thinking_schemas[name]
except KeyError as exc: # pragma: no cover
raise SchemaLookupError(f"Thinking schema '{name}' is not registered") from exc
def register_model_provider_schema(
name: str,
*,
label: str | None = None,
summary: str | None = None,
metadata: Mapping[str, Any] | None = None,
) -> ModelProviderSchemaSpec:
spec = _model_provider_schemas.get(name)
if spec:
if label:
spec.label = label
if summary:
spec.summary = summary
_update_metadata(spec.metadata, metadata)
return spec
spec = ModelProviderSchemaSpec(name=name, label=label, summary=summary)
_update_metadata(spec.metadata, metadata)
_model_provider_schemas[name] = spec
return spec
def iter_model_provider_schemas() -> Dict[str, ModelProviderSchemaSpec]:
return dict(_model_provider_schemas)
def get_model_provider_schema(name: str) -> ModelProviderSchemaSpec:
try:
return _model_provider_schemas[name]
except KeyError as exc: # pragma: no cover
raise SchemaLookupError(f"Model provider schema '{name}' is not registered") from exc
def _ensure_edge_processor_builtins_loaded() -> None:
global _edge_processor_builtins_loaded
if _edge_processor_builtins_loaded:
return
try:
import runtime.edge.processors.builtin_types # noqa: F401
except Exception:
pass
_edge_processor_builtins_loaded = True