greatmengqi 84dccef230 refactor(config): Phase 2 — eliminate AppConfig.current() ambient lookup
Finish Phase 2 of the config refactor: production code no longer calls
AppConfig.current() anywhere. AppConfig now flows as an explicit parameter
down every consumer lane.

Call-site migrations
--------------------
- Memory subsystem (queue/updater/storage): MemoryConfig captured at
  enqueue time so the Timer closure survives the ContextVar boundary.
- Sandbox layer: tools.py, security.py, sandbox_provider.py, local_sandbox_provider,
  aio_sandbox_provider all take app_config explicitly. Module-level
  caching in tools.py's path helpers is removed — pure parameter flow.
- Skills layer: manager.py + loader.py + lead_agent.prompt cache refresh
  all thread app_config; cache worker closes over it.
- Community tools (tavily, jina, firecrawl, exa, ddg, image_search,
  infoquest, aio_sandbox): read runtime.context.app_config.
- Subagents registry: get_subagent_config / list_subagents /
  get_available_subagent_names require app_config.
- Runtime worker: requires RunContext.app_config; no fallback.
- Gateway routers (uploads, skills): add Depends(get_config).
- Channels feishu: uses AppConfig.from_file() (pure) at its sync boundary.
- LangGraph Server bootstrap (make_lead_agent): falls back to
  AppConfig.from_file() — pure load, not ambient lookup.

Context resolution
------------------
- resolve_context(runtime) now raises on non-DeerFlowContext runtime.context.
  Every entry point attaches typed context; dict/None shapes are rejected
  loudly instead of being papered over with an ambient AppConfig lookup.

AppConfig lifecycle
-------------------
- AppConfig.current() kept as a deprecated slot that raises RuntimeError,
  purely so legacy tests that still run `patch.object(AppConfig, "current")`
  don't trip AttributeError at teardown. Production never calls it.
- conftest autouse fixture no longer monkey-patches `current` — it only
  stubs `from_file()` so tests don't need a real config.yaml.

Design refs
-----------
- docs/plans/2026-04-12-config-refactor-plan.md (Phase 2: P2-6..P2-10)
- docs/plans/2026-04-12-config-refactor-design.md §8

All 2338 non-e2e tests pass. Zero AppConfig.current() call sites remain
in backend/packages or backend/app (docstrings in deps.py excepted).
2026-04-17 11:14:13 +08:00

232 lines
9.0 KiB
Python

"""Memory storage providers."""
import abc
import json
import logging
import threading
from datetime import UTC, datetime
from pathlib import Path
from typing import Any
from deerflow.config.agents_config import AGENT_NAME_PATTERN
from deerflow.config.memory_config import MemoryConfig
from deerflow.config.paths import get_paths
logger = logging.getLogger(__name__)
def utc_now_iso_z() -> str:
"""Current UTC time as ISO-8601 with ``Z`` suffix (matches prior naive-UTC output)."""
return datetime.now(UTC).isoformat().removesuffix("+00:00") + "Z"
def create_empty_memory() -> dict[str, Any]:
    """Build a fresh, empty memory document.

    Returns:
        A new dict with a ``version``, a ``lastUpdated`` timestamp taken at
        call time, blank ``user`` and ``history`` sections, and an empty
        ``facts`` list.
    """

    def _blank_sections(*names: str) -> dict[str, dict[str, str]]:
        # Every section gets its own dict object, so mutating one section
        # never affects another.
        return {name: {"summary": "", "updatedAt": ""} for name in names}

    return {
        "version": "1.0",
        "lastUpdated": utc_now_iso_z(),
        "user": _blank_sections("workContext", "personalContext", "topOfMind"),
        "history": _blank_sections("recentMonths", "earlierContext", "longTermBackground"),
        "facts": [],
    }
class MemoryStorage(abc.ABC):
"""Abstract base class for memory storage providers."""
@abc.abstractmethod
def load(self, agent_name: str | None = None, *, user_id: str | None = None) -> dict[str, Any]:
"""Load memory data for the given agent."""
pass
@abc.abstractmethod
def reload(self, agent_name: str | None = None, *, user_id: str | None = None) -> dict[str, Any]:
"""Force reload memory data for the given agent."""
pass
@abc.abstractmethod
def save(self, memory_data: dict[str, Any], agent_name: str | None = None, *, user_id: str | None = None) -> bool:
"""Save memory data for the given agent."""
pass
class FileMemoryStorage(MemoryStorage):
    """File-based memory storage provider.

    Each ``(user_id, agent_name)`` pair resolves to one JSON file. Documents
    are cached on the instance and re-read whenever the file's modification
    time differs from the one recorded with the cached copy.
    """

    def __init__(self, memory_config: MemoryConfig):
        """Initialize the file memory storage.

        Args:
            memory_config: Memory configuration (storage_path etc.). Stored on
                the instance so per-request lookups don't need to reach for
                ambient state.
        """
        self._memory_config = memory_config
        # Per-user/agent memory cache: keyed by (user_id, agent_name) tuple (None = global)
        # Value: (memory_data, file_mtime)
        self._memory_cache: dict[tuple[str | None, str | None], tuple[dict[str, Any], float | None]] = {}

    def _validate_agent_name(self, agent_name: str) -> None:
        """Validate that the agent name is safe to use in filesystem paths.

        Uses the repository's established AGENT_NAME_PATTERN to ensure consistency
        across the codebase and prevent path traversal or other problematic characters.

        Args:
            agent_name: Candidate agent name.

        Raises:
            ValueError: If the name is empty or does not match the pattern.
        """
        if not agent_name:
            raise ValueError("Agent name must be a non-empty string.")
        # NOTE(review): ``match`` only anchors at the start of the string; whether
        # trailing characters (e.g. a final newline) are rejected depends on how
        # AGENT_NAME_PATTERN itself is anchored -- confirm against its definition.
        if not AGENT_NAME_PATTERN.match(agent_name):
            raise ValueError(f"Invalid agent name {agent_name!r}: names must match {AGENT_NAME_PATTERN.pattern}")

    def _get_memory_file_path(self, agent_name: str | None = None, *, user_id: str | None = None) -> Path:
        """Get the path to the memory file.

        Resolution order:

        * ``user_id`` and ``agent_name`` given: the per-user, per-agent file.
        * ``user_id`` only: an absolute ``storage_path`` from the config wins
          (and is then the same file for every user); otherwise the per-user file.
        * ``agent_name`` only (legacy): the per-agent file.
        * neither (legacy): the configured ``storage_path`` (relative paths are
          resolved against the base dir), falling back to the default memory file.

        Raises:
            ValueError: If ``agent_name`` is given but fails validation.
        """
        config = self._memory_config
        if user_id is not None:
            if agent_name is not None:
                self._validate_agent_name(agent_name)
                return get_paths().user_agent_memory_file(user_id, agent_name)
            if config.storage_path and Path(config.storage_path).is_absolute():
                return Path(config.storage_path)
            return get_paths().user_memory_file(user_id)
        # Legacy: no user_id
        if agent_name is not None:
            self._validate_agent_name(agent_name)
            return get_paths().agent_memory_file(agent_name)
        if config.storage_path:
            p = Path(config.storage_path)
            return p if p.is_absolute() else get_paths().base_dir / p
        return get_paths().memory_file

    @staticmethod
    def _stat_mtime(file_path: Path) -> float | None:
        """Return the file's modification time, or None if it is missing or unreadable."""
        try:
            return file_path.stat().st_mtime if file_path.exists() else None
        except OSError:
            return None

    def _load_memory_from_file(self, agent_name: str | None = None, *, user_id: str | None = None) -> dict[str, Any]:
        """Load memory data from file.

        Returns:
            The parsed JSON object, or a fresh empty memory structure when the
            file is missing, unreadable, not valid UTF-8/JSON, or does not
            contain a JSON object at the top level.
        """
        file_path = self._get_memory_file_path(agent_name, user_id=user_id)
        if not file_path.exists():
            return create_empty_memory()
        try:
            with open(file_path, encoding="utf-8") as f:
                data = json.load(f)
        except (OSError, ValueError) as e:
            # ValueError covers both json.JSONDecodeError and UnicodeDecodeError,
            # so a file with invalid UTF-8 degrades to empty memory instead of crashing.
            logger.warning("Failed to load memory file: %s", e)
            return create_empty_memory()
        if not isinstance(data, dict):
            # Valid JSON but not an object (e.g. ``null`` or a list): honour the
            # declared return type rather than handing callers a non-dict.
            logger.warning("Memory file %s does not contain a JSON object; ignoring it", file_path)
            return create_empty_memory()
        return data

    def load(self, agent_name: str | None = None, *, user_id: str | None = None) -> dict[str, Any]:
        """Load memory data (cached with file modification time check).

        The cached document is returned as-is (not copied) while the file's
        mtime matches the one recorded at caching time; otherwise the file is
        re-read and the cache entry replaced.
        """
        file_path = self._get_memory_file_path(agent_name, user_id=user_id)
        current_mtime = self._stat_mtime(file_path)
        cache_key = (user_id, agent_name)
        cached = self._memory_cache.get(cache_key)
        if cached is not None and cached[1] == current_mtime:
            return cached[0]
        memory_data = self._load_memory_from_file(agent_name, user_id=user_id)
        self._memory_cache[cache_key] = (memory_data, current_mtime)
        return memory_data

    def reload(self, agent_name: str | None = None, *, user_id: str | None = None) -> dict[str, Any]:
        """Reload memory data from file, forcing cache invalidation."""
        file_path = self._get_memory_file_path(agent_name, user_id=user_id)
        memory_data = self._load_memory_from_file(agent_name, user_id=user_id)
        # mtime is sampled after the read, as before.
        self._memory_cache[(user_id, agent_name)] = (memory_data, self._stat_mtime(file_path))
        return memory_data

    def save(self, memory_data: dict[str, Any], agent_name: str | None = None, *, user_id: str | None = None) -> bool:
        """Save memory data to file and update cache.

        The document is written to a sibling ``.tmp`` file and then moved over
        the target, so a failed write never truncates the existing file.
        ``memory_data["lastUpdated"]`` is overwritten in place with the
        current time before writing.

        Returns:
            True on success. False if the file could not be written or the
            data could not be serialized/encoded; the failure is logged and
            the temp file is removed.
        """
        file_path = self._get_memory_file_path(agent_name, user_id=user_id)
        temp_path = file_path.with_suffix(".tmp")
        try:
            file_path.parent.mkdir(parents=True, exist_ok=True)
            memory_data["lastUpdated"] = utc_now_iso_z()
            with open(temp_path, "w", encoding="utf-8") as f:
                json.dump(memory_data, f, indent=2, ensure_ascii=False)
            temp_path.replace(file_path)
        except (OSError, TypeError, ValueError) as e:
            # TypeError: non-serializable value; ValueError: e.g. circular
            # reference or text that cannot be encoded as UTF-8. Either way
            # the write is abandoned, so don't leave a partial temp file behind.
            try:
                temp_path.unlink(missing_ok=True)
            except OSError:
                pass
            logger.error("Failed to save memory file: %s", e)
            return False
        self._memory_cache[(user_id, agent_name)] = (memory_data, self._stat_mtime(file_path))
        logger.info("Memory saved to %s", file_path)
        return True
# Instances keyed by (storage_class_path, id(memory_config)) so tests can
# construct isolated storages and multi-client setups with different configs
# don't collide on a single process-wide singleton.
_storage_instances: dict[tuple[str, int], MemoryStorage] = {}
# Strong references to the config behind every key inserted by
# get_memory_storage(). id() values can be recycled once an object is garbage
# collected, so without this pin a storage class that does not retain its
# config could later be returned for an unrelated config that happens to
# receive the same id.
_storage_configs: dict[tuple[str, int], MemoryConfig] = {}
_storage_lock = threading.Lock()


def _instantiate_memory_storage(memory_config: MemoryConfig) -> MemoryStorage:
    """Import and construct the storage class named by ``memory_config.storage_class``.

    Any failure (bad dotted path, import error, wrong type, constructor error)
    is logged and answered with a ``FileMemoryStorage`` for the same config.
    """
    storage_class_path = memory_config.storage_class
    try:
        module_path, class_name = storage_class_path.rsplit(".", 1)
        import importlib

        module = importlib.import_module(module_path)
        storage_class = getattr(module, class_name)
        # Validate that the configured storage is a MemoryStorage implementation
        if not isinstance(storage_class, type):
            raise TypeError(f"Configured memory storage '{storage_class_path}' is not a class: {storage_class!r}")
        if not issubclass(storage_class, MemoryStorage):
            raise TypeError(f"Configured memory storage '{storage_class_path}' is not a subclass of MemoryStorage")
        return storage_class(memory_config)
    except Exception as e:
        logger.error(
            "Failed to load memory storage %s, falling back to FileMemoryStorage: %s",
            storage_class_path,
            e,
        )
        return FileMemoryStorage(memory_config)


def get_memory_storage(memory_config: MemoryConfig) -> MemoryStorage:
    """Get the configured memory storage instance.

    Caches one instance per ``(storage_class, memory_config)`` pair, where the
    config is identified by object identity. In single-config deployments this
    collapses to one instance; in multi-client or test scenarios each config
    object gets its own storage.

    Args:
        memory_config: Configuration naming the storage class to use.

    Returns:
        The cached storage for this config object, created on first use. If
        the configured class cannot be loaded, a ``FileMemoryStorage`` is
        created (and cached) instead.
    """
    key = (memory_config.storage_class, id(memory_config))
    # Cheap unlocked lookup first; the lock is only taken on a miss.
    existing = _storage_instances.get(key)
    if existing is not None:
        return existing
    with _storage_lock:
        # Another thread may have filled the slot while we waited for the lock.
        existing = _storage_instances.get(key)
        if existing is not None:
            return existing
        instance = _instantiate_memory_storage(memory_config)
        # Pin the config before publishing the instance so the id in ``key``
        # cannot be reused for a different object while the entry exists.
        _storage_configs[key] = memory_config
        _storage_instances[key] = instance
        return instance