mirror of
https://github.com/OpenBMB/ChatDev.git
synced 2026-04-25 11:18:06 +00:00
291 lines
8.9 KiB
Python
Executable File
291 lines
8.9 KiB
Python
Executable File
"""Schema registries for entity-layer configuration classes."""
|
|
|
|
from dataclasses import dataclass, field
|
|
from typing import Any, Dict, Mapping, MutableMapping, Type
|
|
from entity.configs.base import BaseConfig
|
|
|
|
|
|
class SchemaLookupError(RuntimeError):
|
|
"""Raised when a requested schema spec is not registered."""
|
|
|
|
|
|
class SchemaRegistrationError(RuntimeError):
|
|
"""Raised when schema registration is inconsistent."""
|
|
|
|
|
|
@dataclass
|
|
class NodeSchemaSpec:
|
|
name: str
|
|
config_cls: Type["BaseConfig"]
|
|
summary: str | None = None
|
|
metadata: Dict[str, Any] = field(default_factory=dict)
|
|
|
|
|
|
@dataclass
|
|
class EdgeConditionSchemaSpec:
|
|
name: str
|
|
config_cls: Type["BaseConfig"]
|
|
summary: str | None = None
|
|
metadata: Dict[str, Any] = field(default_factory=dict)
|
|
|
|
|
|
@dataclass
|
|
class EdgeProcessorSchemaSpec:
|
|
name: str
|
|
config_cls: Type["BaseConfig"]
|
|
summary: str | None = None
|
|
metadata: Dict[str, Any] = field(default_factory=dict)
|
|
|
|
|
|
@dataclass
|
|
class MemoryStoreSchemaSpec:
|
|
name: str
|
|
config_cls: Type["BaseConfig"]
|
|
summary: str | None = None
|
|
metadata: Dict[str, Any] = field(default_factory=dict)
|
|
|
|
|
|
@dataclass
|
|
class ThinkingSchemaSpec:
|
|
name: str
|
|
config_cls: Type["BaseConfig"]
|
|
summary: str | None = None
|
|
metadata: Dict[str, Any] = field(default_factory=dict)
|
|
|
|
|
|
@dataclass
|
|
class ModelProviderSchemaSpec:
|
|
name: str
|
|
label: str | None = None
|
|
summary: str | None = None
|
|
metadata: Dict[str, Any] = field(default_factory=dict)
|
|
|
|
|
|
_node_schemas: Dict[str, NodeSchemaSpec] = {}
|
|
_edge_condition_schemas: Dict[str, EdgeConditionSchemaSpec] = {}
|
|
_edge_processor_schemas: Dict[str, EdgeProcessorSchemaSpec] = {}
|
|
_edge_processor_builtins_loaded = False
|
|
_memory_store_schemas: Dict[str, MemoryStoreSchemaSpec] = {}
|
|
_thinking_schemas: Dict[str, ThinkingSchemaSpec] = {}
|
|
_model_provider_schemas: Dict[str, ModelProviderSchemaSpec] = {}
|
|
|
|
|
|
def _update_metadata(target: MutableMapping[str, Any], new_items: Mapping[str, Any] | None) -> None:
|
|
if not new_items:
|
|
return
|
|
target.update({key: value for key, value in new_items.items() if value is not None})
|
|
|
|
|
|
def register_node_schema(
|
|
name: str,
|
|
*,
|
|
config_cls: Type["BaseConfig"],
|
|
summary: str | None = None,
|
|
metadata: Mapping[str, Any] | None = None,
|
|
) -> NodeSchemaSpec:
|
|
spec = _node_schemas.get(name)
|
|
if spec:
|
|
if spec.config_cls is not config_cls:
|
|
raise SchemaRegistrationError(
|
|
f"Node schema '{name}' already registered with a different config class"
|
|
)
|
|
if summary:
|
|
spec.summary = summary
|
|
_update_metadata(spec.metadata, metadata)
|
|
return spec
|
|
spec = NodeSchemaSpec(name=name, config_cls=config_cls, summary=summary)
|
|
_update_metadata(spec.metadata, metadata)
|
|
_node_schemas[name] = spec
|
|
return spec
|
|
|
|
|
|
def iter_node_schemas() -> Dict[str, NodeSchemaSpec]:
|
|
return dict(_node_schemas)
|
|
|
|
|
|
def get_node_schema(name: str) -> NodeSchemaSpec:
|
|
try:
|
|
return _node_schemas[name]
|
|
except KeyError as exc: # pragma: no cover - defensive guard
|
|
raise SchemaLookupError(f"Node schema '{name}' is not registered") from exc
|
|
|
|
|
|
def register_edge_condition_schema(
|
|
name: str,
|
|
*,
|
|
config_cls: Type["BaseConfig"],
|
|
summary: str | None = None,
|
|
metadata: Mapping[str, Any] | None = None,
|
|
) -> EdgeConditionSchemaSpec:
|
|
spec = _edge_condition_schemas.get(name)
|
|
if spec:
|
|
if spec.config_cls is not config_cls:
|
|
raise SchemaRegistrationError(
|
|
f"Edge condition schema '{name}' already registered with a different config class"
|
|
)
|
|
if summary:
|
|
spec.summary = summary
|
|
_update_metadata(spec.metadata, metadata)
|
|
return spec
|
|
spec = EdgeConditionSchemaSpec(name=name, config_cls=config_cls, summary=summary)
|
|
_update_metadata(spec.metadata, metadata)
|
|
_edge_condition_schemas[name] = spec
|
|
return spec
|
|
|
|
|
|
def iter_edge_condition_schemas() -> Dict[str, EdgeConditionSchemaSpec]:
|
|
return dict(_edge_condition_schemas)
|
|
|
|
|
|
def get_edge_condition_schema(name: str) -> EdgeConditionSchemaSpec:
|
|
try:
|
|
return _edge_condition_schemas[name]
|
|
except KeyError as exc: # pragma: no cover
|
|
raise SchemaLookupError(f"Edge condition schema '{name}' is not registered") from exc
|
|
|
|
|
|
def register_edge_processor_schema(
|
|
name: str,
|
|
*,
|
|
config_cls: Type["BaseConfig"],
|
|
summary: str | None = None,
|
|
metadata: Mapping[str, Any] | None = None,
|
|
) -> EdgeProcessorSchemaSpec:
|
|
spec = _edge_processor_schemas.get(name)
|
|
if spec:
|
|
if spec.config_cls is not config_cls:
|
|
raise SchemaRegistrationError(
|
|
f"Edge processor schema '{name}' already registered with a different config class"
|
|
)
|
|
if summary:
|
|
spec.summary = summary
|
|
_update_metadata(spec.metadata, metadata)
|
|
return spec
|
|
spec = EdgeProcessorSchemaSpec(name=name, config_cls=config_cls, summary=summary)
|
|
_update_metadata(spec.metadata, metadata)
|
|
_edge_processor_schemas[name] = spec
|
|
return spec
|
|
|
|
|
|
def iter_edge_processor_schemas() -> Dict[str, EdgeProcessorSchemaSpec]:
|
|
_ensure_edge_processor_builtins_loaded()
|
|
return dict(_edge_processor_schemas)
|
|
|
|
|
|
def get_edge_processor_schema(name: str) -> EdgeProcessorSchemaSpec:
|
|
_ensure_edge_processor_builtins_loaded()
|
|
try:
|
|
return _edge_processor_schemas[name]
|
|
except KeyError as exc: # pragma: no cover
|
|
raise SchemaLookupError(f"Edge processor schema '{name}' is not registered") from exc
|
|
|
|
|
|
def register_memory_store_schema(
|
|
name: str,
|
|
*,
|
|
config_cls: Type["BaseConfig"],
|
|
summary: str | None = None,
|
|
metadata: Mapping[str, Any] | None = None,
|
|
) -> MemoryStoreSchemaSpec:
|
|
spec = _memory_store_schemas.get(name)
|
|
if spec:
|
|
if spec.config_cls is not config_cls:
|
|
raise SchemaRegistrationError(
|
|
f"Memory store schema '{name}' already registered with a different config class"
|
|
)
|
|
if summary:
|
|
spec.summary = summary
|
|
_update_metadata(spec.metadata, metadata)
|
|
return spec
|
|
spec = MemoryStoreSchemaSpec(name=name, config_cls=config_cls, summary=summary)
|
|
_update_metadata(spec.metadata, metadata)
|
|
_memory_store_schemas[name] = spec
|
|
return spec
|
|
|
|
|
|
def iter_memory_store_schemas() -> Dict[str, MemoryStoreSchemaSpec]:
|
|
return dict(_memory_store_schemas)
|
|
|
|
|
|
def get_memory_store_schema(name: str) -> MemoryStoreSchemaSpec:
|
|
try:
|
|
return _memory_store_schemas[name]
|
|
except KeyError as exc: # pragma: no cover
|
|
raise SchemaLookupError(f"Memory store schema '{name}' is not registered") from exc
|
|
|
|
|
|
def register_thinking_schema(
|
|
name: str,
|
|
*,
|
|
config_cls: Type["BaseConfig"],
|
|
summary: str | None = None,
|
|
metadata: Mapping[str, Any] | None = None,
|
|
) -> ThinkingSchemaSpec:
|
|
spec = _thinking_schemas.get(name)
|
|
if spec:
|
|
if spec.config_cls is not config_cls:
|
|
raise SchemaRegistrationError(
|
|
f"Thinking schema '{name}' already registered with a different config class"
|
|
)
|
|
if summary:
|
|
spec.summary = summary
|
|
_update_metadata(spec.metadata, metadata)
|
|
return spec
|
|
spec = ThinkingSchemaSpec(name=name, config_cls=config_cls, summary=summary)
|
|
_update_metadata(spec.metadata, metadata)
|
|
_thinking_schemas[name] = spec
|
|
return spec
|
|
|
|
|
|
def iter_thinking_schemas() -> Dict[str, ThinkingSchemaSpec]:
|
|
return dict(_thinking_schemas)
|
|
|
|
|
|
def get_thinking_schema(name: str) -> ThinkingSchemaSpec:
|
|
try:
|
|
return _thinking_schemas[name]
|
|
except KeyError as exc: # pragma: no cover
|
|
raise SchemaLookupError(f"Thinking schema '{name}' is not registered") from exc
|
|
|
|
|
|
def register_model_provider_schema(
|
|
name: str,
|
|
*,
|
|
label: str | None = None,
|
|
summary: str | None = None,
|
|
metadata: Mapping[str, Any] | None = None,
|
|
) -> ModelProviderSchemaSpec:
|
|
spec = _model_provider_schemas.get(name)
|
|
if spec:
|
|
if label:
|
|
spec.label = label
|
|
if summary:
|
|
spec.summary = summary
|
|
_update_metadata(spec.metadata, metadata)
|
|
return spec
|
|
spec = ModelProviderSchemaSpec(name=name, label=label, summary=summary)
|
|
_update_metadata(spec.metadata, metadata)
|
|
_model_provider_schemas[name] = spec
|
|
return spec
|
|
|
|
|
|
def iter_model_provider_schemas() -> Dict[str, ModelProviderSchemaSpec]:
|
|
return dict(_model_provider_schemas)
|
|
|
|
|
|
def get_model_provider_schema(name: str) -> ModelProviderSchemaSpec:
|
|
try:
|
|
return _model_provider_schemas[name]
|
|
except KeyError as exc: # pragma: no cover
|
|
raise SchemaLookupError(f"Model provider schema '{name}' is not registered") from exc
|
|
|
|
def _ensure_edge_processor_builtins_loaded() -> None:
|
|
global _edge_processor_builtins_loaded
|
|
if _edge_processor_builtins_loaded:
|
|
return
|
|
try:
|
|
import runtime.edge.processors.builtin_types # noqa: F401
|
|
except Exception:
|
|
pass
|
|
_edge_processor_builtins_loaded = True
|