From 3877aabcfd6f8c4a0817c3222e61c52ef5ff4d32 Mon Sep 17 00:00:00 2001 From: rayhpeng Date: Sun, 12 Apr 2026 12:37:25 +0800 Subject: [PATCH] feat(memory): add user_id to MemoryStorage interface for per-user isolation Thread user_id through MemoryStorage.load/reload/save abstract methods and FileMemoryStorage, re-keying the in-memory cache from bare agent_name to a (user_id, agent_name) tuple to prevent cross-user cache collisions. Co-Authored-By: Claude Sonnet 4.6 --- .../harness/deerflow/agents/memory/storage.py | 53 ++++--- .../test_memory_storage_user_isolation.py | 150 ++++++++++++++++++ 2 files changed, 182 insertions(+), 21 deletions(-) create mode 100644 backend/tests/test_memory_storage_user_isolation.py diff --git a/backend/packages/harness/deerflow/agents/memory/storage.py b/backend/packages/harness/deerflow/agents/memory/storage.py index 3d57d059b..d35fefa9d 100644 --- a/backend/packages/harness/deerflow/agents/memory/storage.py +++ b/backend/packages/harness/deerflow/agents/memory/storage.py @@ -43,17 +43,17 @@ class MemoryStorage(abc.ABC): """Abstract base class for memory storage providers.""" @abc.abstractmethod - def load(self, agent_name: str | None = None) -> dict[str, Any]: + def load(self, agent_name: str | None = None, *, user_id: str | None = None) -> dict[str, Any]: """Load memory data for the given agent.""" pass @abc.abstractmethod - def reload(self, agent_name: str | None = None) -> dict[str, Any]: + def reload(self, agent_name: str | None = None, *, user_id: str | None = None) -> dict[str, Any]: """Force reload memory data for the given agent.""" pass @abc.abstractmethod - def save(self, memory_data: dict[str, Any], agent_name: str | None = None) -> bool: + def save(self, memory_data: dict[str, Any], agent_name: str | None = None, *, user_id: str | None = None) -> bool: """Save memory data for the given agent.""" pass @@ -63,9 +63,9 @@ class FileMemoryStorage(MemoryStorage): def __init__(self): """Initialize the file memory storage.""" - # Per-agent memory cache: keyed by agent_name (None = global) + # Per-user/agent memory cache: keyed by (user_id, agent_name) tuple (None = global) # Value: (memory_data, file_mtime) - self._memory_cache: dict[str | None, tuple[dict[str, Any], float | None]] = {} + self._memory_cache: dict[tuple[str | None, str | None], tuple[dict[str, Any], float | None]] = {} def _validate_agent_name(self, agent_name: str) -> None: """Validate that the agent name is safe to use in filesystem paths. @@ -78,21 +78,29 @@ class FileMemoryStorage(MemoryStorage): if not AGENT_NAME_PATTERN.match(agent_name): raise ValueError(f"Invalid agent name {agent_name!r}: names must match {AGENT_NAME_PATTERN.pattern}") - def _get_memory_file_path(self, agent_name: str | None = None) -> Path: + def _get_memory_file_path(self, agent_name: str | None = None, *, user_id: str | None = None) -> Path: """Get the path to the memory file.""" + if user_id is not None: + if agent_name is not None: + self._validate_agent_name(agent_name) + return get_paths().user_agent_memory_file(user_id, agent_name) + config = get_memory_config() + if config.storage_path and Path(config.storage_path).is_absolute(): + return Path(config.storage_path) + return get_paths().user_memory_file(user_id) + # Legacy: no user_id if agent_name is not None: self._validate_agent_name(agent_name) return get_paths().agent_memory_file(agent_name) - config = get_memory_config() if config.storage_path: p = Path(config.storage_path) return p if p.is_absolute() else get_paths().base_dir / p return get_paths().memory_file - def _load_memory_from_file(self, agent_name: str | None = None) -> dict[str, Any]: + def _load_memory_from_file(self, agent_name: str | None = None, *, user_id: str | None = None) -> dict[str, Any]: """Load memory data from file.""" - file_path = self._get_memory_file_path(agent_name) + file_path = self._get_memory_file_path(agent_name, user_id=user_id) if not file_path.exists(): return create_empty_memory() @@ -105,40 +113,42 @@ class FileMemoryStorage(MemoryStorage): logger.warning("Failed to load memory file: %s", e) return create_empty_memory() - def load(self, agent_name: str | None = None) -> dict[str, Any]: + def load(self, agent_name: str | None = None, *, user_id: str | None = None) -> dict[str, Any]: """Load memory data (cached with file modification time check).""" - file_path = self._get_memory_file_path(agent_name) + file_path = self._get_memory_file_path(agent_name, user_id=user_id) try: current_mtime = file_path.stat().st_mtime if file_path.exists() else None except OSError: current_mtime = None - cached = self._memory_cache.get(agent_name) + cache_key = (user_id, agent_name) + cached = self._memory_cache.get(cache_key) if cached is None or cached[1] != current_mtime: - memory_data = self._load_memory_from_file(agent_name) - self._memory_cache[agent_name] = (memory_data, current_mtime) + memory_data = self._load_memory_from_file(agent_name, user_id=user_id) + self._memory_cache[cache_key] = (memory_data, current_mtime) return memory_data return cached[0] - def reload(self, agent_name: str | None = None) -> dict[str, Any]: + def reload(self, agent_name: str | None = None, *, user_id: str | None = None) -> dict[str, Any]: """Reload memory data from file, forcing cache invalidation.""" - file_path = self._get_memory_file_path(agent_name) - memory_data = self._load_memory_from_file(agent_name) + file_path = self._get_memory_file_path(agent_name, user_id=user_id) + memory_data = self._load_memory_from_file(agent_name, user_id=user_id) try: mtime = file_path.stat().st_mtime if file_path.exists() else None except OSError: mtime = None - self._memory_cache[agent_name] = (memory_data, mtime) + cache_key = (user_id, agent_name) + self._memory_cache[cache_key] = (memory_data, mtime) return memory_data - def save(self, memory_data: dict[str, Any], agent_name: str | None = None) -> bool: + def save(self, memory_data: dict[str, Any], agent_name: str | None = None, *, user_id: str | None = None) -> bool: """Save memory data to file and update cache.""" - file_path = self._get_memory_file_path(agent_name) + file_path = self._get_memory_file_path(agent_name, user_id=user_id) try: file_path.parent.mkdir(parents=True, exist_ok=True) @@ -155,7 +165,8 @@ class FileMemoryStorage(MemoryStorage): except OSError: mtime = None - self._memory_cache[agent_name] = (memory_data, mtime) + cache_key = (user_id, agent_name) + self._memory_cache[cache_key] = (memory_data, mtime) logger.info("Memory saved to %s", file_path) return True except OSError as e: diff --git a/backend/tests/test_memory_storage_user_isolation.py b/backend/tests/test_memory_storage_user_isolation.py new file mode 100644 index 000000000..a82fffa50 --- /dev/null +++ b/backend/tests/test_memory_storage_user_isolation.py @@ -0,0 +1,150 @@ +"""Tests for per-user memory storage isolation.""" +import pytest +from pathlib import Path +from unittest.mock import patch + +from deerflow.agents.memory.storage import FileMemoryStorage, create_empty_memory + + +@pytest.fixture +def base_dir(tmp_path: Path) -> Path: + return tmp_path + + +@pytest.fixture +def storage() -> FileMemoryStorage: + return FileMemoryStorage() + + +class TestUserIsolatedStorage: + def test_save_and_load_per_user(self, storage: FileMemoryStorage, base_dir: Path): + from deerflow.config.paths import Paths + + paths = Paths(base_dir) + with patch("deerflow.agents.memory.storage.get_paths", return_value=paths): + memory_a = create_empty_memory() + memory_a["user"]["workContext"]["summary"] = "User A context" + storage.save(memory_a, user_id="alice") + + memory_b = create_empty_memory() + memory_b["user"]["workContext"]["summary"] = "User B context" + storage.save(memory_b, user_id="bob") + + loaded_a = storage.load(user_id="alice") + loaded_b = storage.load(user_id="bob") + + assert loaded_a["user"]["workContext"]["summary"] == "User A context" + assert loaded_b["user"]["workContext"]["summary"] == "User B context" + + def test_user_memory_file_location(self, base_dir: Path): + from deerflow.config.paths import Paths + + paths = Paths(base_dir) + with patch("deerflow.agents.memory.storage.get_paths", return_value=paths): + s = FileMemoryStorage() + memory = create_empty_memory() + s.save(memory, user_id="alice") + expected_path = base_dir / "users" / "alice" / "memory.json" + assert expected_path.exists() + + def test_cache_isolated_per_user(self, base_dir: Path): + from deerflow.config.paths import Paths + + paths = Paths(base_dir) + with patch("deerflow.agents.memory.storage.get_paths", return_value=paths): + s = FileMemoryStorage() + memory_a = create_empty_memory() + memory_a["user"]["workContext"]["summary"] = "A" + s.save(memory_a, user_id="alice") + + memory_b = create_empty_memory() + memory_b["user"]["workContext"]["summary"] = "B" + s.save(memory_b, user_id="bob") + + loaded_a = s.load(user_id="alice") + assert loaded_a["user"]["workContext"]["summary"] == "A" + + def test_no_user_id_uses_legacy_path(self, base_dir: Path): + from deerflow.config.paths import Paths + from deerflow.config.memory_config import MemoryConfig + + paths = Paths(base_dir) + with patch("deerflow.agents.memory.storage.get_paths", return_value=paths): + with patch("deerflow.agents.memory.storage.get_memory_config", return_value=MemoryConfig(storage_path="")): + s = FileMemoryStorage() + memory = create_empty_memory() + s.save(memory, user_id=None) + expected_path = base_dir / "memory.json" + assert expected_path.exists() + + def test_user_and_legacy_do_not_interfere(self, base_dir: Path): + """user_id=None (legacy) and user_id='alice' must use different files and caches.""" + from deerflow.config.paths import Paths + from deerflow.config.memory_config import MemoryConfig + + paths = Paths(base_dir) + with patch("deerflow.agents.memory.storage.get_paths", return_value=paths): + with patch("deerflow.agents.memory.storage.get_memory_config", return_value=MemoryConfig(storage_path="")): + s = FileMemoryStorage() + + legacy_mem = create_empty_memory() + legacy_mem["user"]["workContext"]["summary"] = "legacy" + s.save(legacy_mem, user_id=None) + + user_mem = create_empty_memory() + user_mem["user"]["workContext"]["summary"] = "alice" + s.save(user_mem, user_id="alice") + + assert s.load(user_id=None)["user"]["workContext"]["summary"] == "legacy" + assert s.load(user_id="alice")["user"]["workContext"]["summary"] == "alice" + + def test_user_agent_memory_file_location(self, base_dir: Path): + """Per-user per-agent memory uses the user_agent_memory_file path.""" + from deerflow.config.paths import Paths + + paths = Paths(base_dir) + with patch("deerflow.agents.memory.storage.get_paths", return_value=paths): + s = FileMemoryStorage() + memory = create_empty_memory() + memory["user"]["workContext"]["summary"] = "agent scoped" + s.save(memory, "test-agent", user_id="alice") + expected_path = base_dir / "users" / "alice" / "agents" / "test-agent" / "memory.json" + assert expected_path.exists() + + def test_cache_key_is_user_agent_tuple(self, base_dir: Path): + """Cache keys must be (user_id, agent_name) tuples, not bare agent names.""" + from deerflow.config.paths import Paths + + paths = Paths(base_dir) + with patch("deerflow.agents.memory.storage.get_paths", return_value=paths): + s = FileMemoryStorage() + memory = create_empty_memory() + s.save(memory, user_id="alice") + # After save, cache should have tuple key + assert ("alice", None) in s._memory_cache + + def test_reload_with_user_id(self, base_dir: Path): + """reload() with user_id should force re-read from the user-scoped file.""" + from deerflow.config.paths import Paths + + paths = Paths(base_dir) + with patch("deerflow.agents.memory.storage.get_paths", return_value=paths): + s = FileMemoryStorage() + memory = create_empty_memory() + memory["user"]["workContext"]["summary"] = "initial" + s.save(memory, user_id="alice") + + # Load once to prime cache + s.load(user_id="alice") + + # Write updated content directly to file + user_file = base_dir / "users" / "alice" / "memory.json" + import json + + updated = create_empty_memory() + updated["user"]["workContext"]["summary"] = "updated" + user_file.write_text(json.dumps(updated)) + + # reload should pick up the new content + reloaded = s.reload(user_id="alice") + assert reloaded["user"]["workContext"]["summary"] == "updated"