From 03952eca53d8b2840a9f92b1cf949b78f6b43769 Mon Sep 17 00:00:00 2001 From: rayhpeng Date: Fri, 10 Apr 2026 10:08:23 +0800 Subject: [PATCH] refactor(persistence): unify SQLite to single deerflow.db and move checkpointer to runtime Merge checkpoints.db and app.db into a single deerflow.db file (WAL mode handles concurrent access safely). Move checkpointer module from agents/checkpointer to runtime/checkpointer to better reflect its role as a runtime infrastructure concern. Co-Authored-By: Claude Opus 4.6 (1M context) --- backend/app/gateway/deps.py | 2 +- .../docs/TITLE_GENERATION_IMPLEMENTATION.md | 2 +- .../harness/deerflow/agents/__init__.py | 4 ---- backend/packages/harness/deerflow/client.py | 2 +- .../deerflow/config/database_config.py | 24 ++++++++++++------- .../persistence/migrations/alembic.ini | 2 +- .../harness/deerflow/runtime/__init__.py | 6 +++++ .../checkpointer/__init__.py | 0 .../checkpointer/async_provider.py | 6 ++--- .../checkpointer/provider.py | 2 +- .../deerflow/runtime/store/async_provider.py | 2 +- .../runtime/stream_bridge/async_provider.py | 2 +- backend/tests/test_checkpointer.py | 4 ++-- backend/tests/test_checkpointer_none_fix.py | 8 +++---- backend/tests/test_client.py | 14 +++++------ backend/tests/test_persistence_scaffold.py | 13 +++++----- config.example.yaml | 4 ++-- 17 files changed, 54 insertions(+), 43 deletions(-) rename backend/packages/harness/deerflow/{agents => runtime}/checkpointer/__init__.py (100%) rename backend/packages/harness/deerflow/{agents => runtime}/checkpointer/async_provider.py (96%) rename backend/packages/harness/deerflow/{agents => runtime}/checkpointer/provider.py (98%) diff --git a/backend/app/gateway/deps.py b/backend/app/gateway/deps.py index b36bfa4ce..271aa2e6e 100644 --- a/backend/app/gateway/deps.py +++ b/backend/app/gateway/deps.py @@ -31,7 +31,7 @@ async def langgraph_runtime(app: FastAPI) -> AsyncGenerator[None, None]: async with langgraph_runtime(app): yield """ - from 
deerflow.agents.checkpointer.async_provider import make_checkpointer + from deerflow.runtime.checkpointer.async_provider import make_checkpointer from deerflow.config import get_app_config from deerflow.persistence.engine import close_engine, get_session_factory, init_engine_from_config from deerflow.runtime import make_store, make_stream_bridge diff --git a/backend/docs/TITLE_GENERATION_IMPLEMENTATION.md b/backend/docs/TITLE_GENERATION_IMPLEMENTATION.md index 07a026e79..87e8aa61a 100644 --- a/backend/docs/TITLE_GENERATION_IMPLEMENTATION.md +++ b/backend/docs/TITLE_GENERATION_IMPLEMENTATION.md @@ -124,7 +124,7 @@ title: # checkpointer.py from langgraph.checkpoint.sqlite import SqliteSaver -checkpointer = SqliteSaver.from_conn_string("checkpoints.db") +checkpointer = SqliteSaver.from_conn_string("deerflow.db") ``` ```json diff --git a/backend/packages/harness/deerflow/agents/__init__.py b/backend/packages/harness/deerflow/agents/__init__.py index 2c31a514a..397f67f8e 100644 --- a/backend/packages/harness/deerflow/agents/__init__.py +++ b/backend/packages/harness/deerflow/agents/__init__.py @@ -1,4 +1,3 @@ -from .checkpointer import get_checkpointer, make_checkpointer, reset_checkpointer from .factory import create_deerflow_agent from .features import Next, Prev, RuntimeFeatures from .lead_agent import make_lead_agent @@ -18,7 +17,4 @@ __all__ = [ "make_lead_agent", "SandboxState", "ThreadState", - "get_checkpointer", - "reset_checkpointer", - "make_checkpointer", ] diff --git a/backend/packages/harness/deerflow/client.py b/backend/packages/harness/deerflow/client.py index 1c64ba52a..996625a22 100644 --- a/backend/packages/harness/deerflow/client.py +++ b/backend/packages/harness/deerflow/client.py @@ -240,7 +240,7 @@ class DeerFlowClient: } checkpointer = self._checkpointer if checkpointer is None: - from deerflow.agents.checkpointer import get_checkpointer + from deerflow.runtime.checkpointer import get_checkpointer checkpointer = get_checkpointer() if checkpointer 
is not None: diff --git a/backend/packages/harness/deerflow/config/database_config.py b/backend/packages/harness/deerflow/config/database_config.py index 207f8ef77..a4160c79f 100644 --- a/backend/packages/harness/deerflow/config/database_config.py +++ b/backend/packages/harness/deerflow/config/database_config.py @@ -4,8 +4,10 @@ Controls BOTH the LangGraph checkpointer and the DeerFlow application persistence layer (runs, threads metadata, users, etc.). The user configures one backend; the system handles physical separation details. -SQLite mode: checkpointer and app use different .db files in the same -directory to avoid write-lock contention. This is automatic. +SQLite mode: checkpointer and app share a single .db file +({sqlite_dir}/deerflow.db) with WAL journal mode enabled on every +connection. WAL allows concurrent readers and a single writer without +blocking, making a unified file safe for both workloads. Postgres mode: both use the same database URL but maintain independent connection pools with different lifecycles. @@ -40,7 +42,7 @@ class DatabaseConfig(BaseModel): ) sqlite_dir: str = Field( default=".deer-flow/data", - description=("Directory for SQLite database files. Checkpointer uses {sqlite_dir}/checkpoints.db, application data uses {sqlite_dir}/app.db."), + description=("Directory for the SQLite database file. 
Both checkpointer and application data share {sqlite_dir}/deerflow.db."), ) postgres_url: str = Field( default="", @@ -69,21 +71,27 @@ class DatabaseConfig(BaseModel): return str(Path(self.sqlite_dir).resolve()) + @property + def sqlite_path(self) -> str: + """Unified SQLite file path shared by checkpointer and app.""" + return os.path.join(self._resolved_sqlite_dir, "deerflow.db") + + # Backward-compatible aliases @property def checkpointer_sqlite_path(self) -> str: - """SQLite file path for the LangGraph checkpointer.""" - return os.path.join(self._resolved_sqlite_dir, "checkpoints.db") + """SQLite file path for the LangGraph checkpointer (alias for sqlite_path).""" + return self.sqlite_path @property def app_sqlite_path(self) -> str: - """SQLite file path for application ORM data.""" - return os.path.join(self._resolved_sqlite_dir, "app.db") + """SQLite file path for application ORM data (alias for sqlite_path).""" + return self.sqlite_path @property def app_sqlalchemy_url(self) -> str: """SQLAlchemy async URL for the application ORM engine.""" if self.backend == "sqlite": - return f"sqlite+aiosqlite:///{self.app_sqlite_path}" + return f"sqlite+aiosqlite:///{self.sqlite_path}" if self.backend == "postgres": url = self.postgres_url if url.startswith("postgresql://"): diff --git a/backend/packages/harness/deerflow/persistence/migrations/alembic.ini b/backend/packages/harness/deerflow/persistence/migrations/alembic.ini index adeccef32..71b4b1dc0 100644 --- a/backend/packages/harness/deerflow/persistence/migrations/alembic.ini +++ b/backend/packages/harness/deerflow/persistence/migrations/alembic.ini @@ -2,7 +2,7 @@ script_location = %(here)s # Default URL for offline mode / autogenerate. # Runtime uses engine from DeerFlow config. 
-sqlalchemy.url = sqlite+aiosqlite:///./data/app.db +sqlalchemy.url = sqlite+aiosqlite:///./data/deerflow.db [loggers] keys = root,sqlalchemy,alembic diff --git a/backend/packages/harness/deerflow/runtime/__init__.py b/backend/packages/harness/deerflow/runtime/__init__.py index d5faa9018..5a3df2eb6 100644 --- a/backend/packages/harness/deerflow/runtime/__init__.py +++ b/backend/packages/harness/deerflow/runtime/__init__.py @@ -5,12 +5,18 @@ Re-exports the public API of :mod:`~deerflow.runtime.runs` and directly from ``deerflow.runtime``. """ +from .checkpointer import checkpointer_context, get_checkpointer, make_checkpointer, reset_checkpointer from .runs import ConflictError, DisconnectMode, RunContext, RunManager, RunRecord, RunStatus, UnsupportedStrategyError, run_agent from .serialization import serialize, serialize_channel_values, serialize_lc_object, serialize_messages_tuple from .store import get_store, make_store, reset_store, store_context from .stream_bridge import END_SENTINEL, HEARTBEAT_SENTINEL, MemoryStreamBridge, StreamBridge, StreamEvent, make_stream_bridge __all__ = [ + # checkpointer + "checkpointer_context", + "get_checkpointer", + "make_checkpointer", + "reset_checkpointer", # runs "ConflictError", "DisconnectMode", diff --git a/backend/packages/harness/deerflow/agents/checkpointer/__init__.py b/backend/packages/harness/deerflow/runtime/checkpointer/__init__.py similarity index 100% rename from backend/packages/harness/deerflow/agents/checkpointer/__init__.py rename to backend/packages/harness/deerflow/runtime/checkpointer/__init__.py diff --git a/backend/packages/harness/deerflow/agents/checkpointer/async_provider.py b/backend/packages/harness/deerflow/runtime/checkpointer/async_provider.py similarity index 96% rename from backend/packages/harness/deerflow/agents/checkpointer/async_provider.py rename to backend/packages/harness/deerflow/runtime/checkpointer/async_provider.py index 623333705..84d5fa442 100644 --- 
a/backend/packages/harness/deerflow/agents/checkpointer/async_provider.py +++ b/backend/packages/harness/deerflow/runtime/checkpointer/async_provider.py @@ -7,12 +7,12 @@ Supported backends: memory, sqlite, postgres. Usage (e.g. FastAPI lifespan):: - from deerflow.agents.checkpointer.async_provider import make_checkpointer + from deerflow.runtime.checkpointer.async_provider import make_checkpointer async with make_checkpointer() as checkpointer: app.state.checkpointer = checkpointer # InMemorySaver if not configured -For sync usage see :mod:`deerflow.agents.checkpointer.provider`. +For sync usage see :mod:`deerflow.runtime.checkpointer.provider`. """ from __future__ import annotations @@ -24,7 +24,7 @@ from collections.abc import AsyncIterator from langgraph.types import Checkpointer -from deerflow.agents.checkpointer.provider import ( +from deerflow.runtime.checkpointer.provider import ( POSTGRES_CONN_REQUIRED, POSTGRES_INSTALL, SQLITE_INSTALL, diff --git a/backend/packages/harness/deerflow/agents/checkpointer/provider.py b/backend/packages/harness/deerflow/runtime/checkpointer/provider.py similarity index 98% rename from backend/packages/harness/deerflow/agents/checkpointer/provider.py rename to backend/packages/harness/deerflow/runtime/checkpointer/provider.py index 6f09aac94..59f8b1ab2 100644 --- a/backend/packages/harness/deerflow/agents/checkpointer/provider.py +++ b/backend/packages/harness/deerflow/runtime/checkpointer/provider.py @@ -7,7 +7,7 @@ Supported backends: memory, sqlite, postgres. 
Usage:: - from deerflow.agents.checkpointer.provider import get_checkpointer, checkpointer_context + from deerflow.runtime.checkpointer.provider import get_checkpointer, checkpointer_context # Singleton — reused across calls, closed on process exit cp = get_checkpointer() diff --git a/backend/packages/harness/deerflow/runtime/store/async_provider.py b/backend/packages/harness/deerflow/runtime/store/async_provider.py index bc7a60559..68cd107c8 100644 --- a/backend/packages/harness/deerflow/runtime/store/async_provider.py +++ b/backend/packages/harness/deerflow/runtime/store/async_provider.py @@ -91,7 +91,7 @@ async def make_store() -> AsyncIterator[BaseStore]: configured checkpointer. Reads from the same ``checkpointer`` section of *config.yaml* used by - :func:`deerflow.agents.checkpointer.async_provider.make_checkpointer` so + :func:`deerflow.runtime.checkpointer.async_provider.make_checkpointer` so that both singletons always use the same persistence technology:: async with make_store() as store: diff --git a/backend/packages/harness/deerflow/runtime/stream_bridge/async_provider.py b/backend/packages/harness/deerflow/runtime/stream_bridge/async_provider.py index 891f79fa0..f35b7d639 100644 --- a/backend/packages/harness/deerflow/runtime/stream_bridge/async_provider.py +++ b/backend/packages/harness/deerflow/runtime/stream_bridge/async_provider.py @@ -1,7 +1,7 @@ """Async stream bridge factory. Provides an **async context manager** aligned with -:func:`deerflow.agents.checkpointer.async_provider.make_checkpointer`. +:func:`deerflow.runtime.checkpointer.async_provider.make_checkpointer`. Usage (e.g. 
FastAPI lifespan):: diff --git a/backend/tests/test_checkpointer.py b/backend/tests/test_checkpointer.py index 79a4912d9..5eeb9265d 100644 --- a/backend/tests/test_checkpointer.py +++ b/backend/tests/test_checkpointer.py @@ -6,7 +6,7 @@ from unittest.mock import AsyncMock, MagicMock, patch import pytest import deerflow.config.app_config as app_config_module -from deerflow.agents.checkpointer import get_checkpointer, reset_checkpointer +from deerflow.runtime.checkpointer import get_checkpointer, reset_checkpointer from deerflow.config.checkpointer_config import ( CheckpointerConfig, get_checkpointer_config, @@ -78,7 +78,7 @@ class TestGetCheckpointer: """get_checkpointer should return InMemorySaver when not configured.""" from langgraph.checkpoint.memory import InMemorySaver - with patch("deerflow.agents.checkpointer.provider.get_app_config", side_effect=FileNotFoundError): + with patch("deerflow.runtime.checkpointer.provider.get_app_config", side_effect=FileNotFoundError): cp = get_checkpointer() assert cp is not None assert isinstance(cp, InMemorySaver) diff --git a/backend/tests/test_checkpointer_none_fix.py b/backend/tests/test_checkpointer_none_fix.py index 1da435c85..3c7a25fa1 100644 --- a/backend/tests/test_checkpointer_none_fix.py +++ b/backend/tests/test_checkpointer_none_fix.py @@ -12,14 +12,14 @@ class TestCheckpointerNoneFix: @pytest.mark.anyio async def test_async_make_checkpointer_returns_in_memory_saver_when_not_configured(self): """make_checkpointer should return InMemorySaver when config.checkpointer is None.""" - from deerflow.agents.checkpointer.async_provider import make_checkpointer + from deerflow.runtime.checkpointer.async_provider import make_checkpointer # Mock get_app_config to return a config with checkpointer=None and database=None mock_config = MagicMock() mock_config.checkpointer = None mock_config.database = None - with patch("deerflow.agents.checkpointer.async_provider.get_app_config", return_value=mock_config): + with 
patch("deerflow.runtime.checkpointer.async_provider.get_app_config", return_value=mock_config): async with make_checkpointer() as checkpointer: # Should return InMemorySaver, not None assert checkpointer is not None @@ -36,13 +36,13 @@ class TestCheckpointerNoneFix: def test_sync_checkpointer_context_returns_in_memory_saver_when_not_configured(self): """checkpointer_context should return InMemorySaver when config.checkpointer is None.""" - from deerflow.agents.checkpointer.provider import checkpointer_context + from deerflow.runtime.checkpointer.provider import checkpointer_context # Mock get_app_config to return a config with checkpointer=None mock_config = MagicMock() mock_config.checkpointer = None - with patch("deerflow.agents.checkpointer.provider.get_app_config", return_value=mock_config): + with patch("deerflow.runtime.checkpointer.provider.get_app_config", return_value=mock_config): with checkpointer_context() as checkpointer: # Should return InMemorySaver, not None assert checkpointer is not None diff --git a/backend/tests/test_client.py b/backend/tests/test_client.py index a6d2ebfb3..579b13a47 100644 --- a/backend/tests/test_client.py +++ b/backend/tests/test_client.py @@ -817,7 +817,7 @@ class TestEnsureAgent: patch("deerflow.client._build_middlewares", return_value=[]) as mock_build_middlewares, patch("deerflow.client.apply_prompt_template", return_value="prompt") as mock_apply_prompt, patch.object(client, "_get_tools", return_value=[]), - patch("deerflow.agents.checkpointer.get_checkpointer", return_value=MagicMock()), + patch("deerflow.runtime.checkpointer.get_checkpointer", return_value=MagicMock()), ): client._agent_name = "custom-agent" client._available_skills = {"test_skill"} @@ -842,7 +842,7 @@ class TestEnsureAgent: patch("deerflow.client._build_middlewares", return_value=[]), patch("deerflow.client.apply_prompt_template", return_value="prompt"), patch.object(client, "_get_tools", return_value=[]), - 
patch("deerflow.agents.checkpointer.get_checkpointer", return_value=mock_checkpointer), + patch("deerflow.runtime.checkpointer.get_checkpointer", return_value=mock_checkpointer), ): client._ensure_agent(config) @@ -867,7 +867,7 @@ class TestEnsureAgent: patch("deerflow.client._build_middlewares", side_effect=fake_build_middlewares), patch("deerflow.client.apply_prompt_template", return_value="prompt"), patch.object(client, "_get_tools", return_value=[]), - patch("deerflow.agents.checkpointer.get_checkpointer", return_value=MagicMock()), + patch("deerflow.runtime.checkpointer.get_checkpointer", return_value=MagicMock()), ): client._ensure_agent(config) @@ -886,7 +886,7 @@ class TestEnsureAgent: patch("deerflow.client._build_middlewares", return_value=[]), patch("deerflow.client.apply_prompt_template", return_value="prompt"), patch.object(client, "_get_tools", return_value=[]), - patch("deerflow.agents.checkpointer.get_checkpointer", return_value=None), + patch("deerflow.runtime.checkpointer.get_checkpointer", return_value=None), ): client._ensure_agent(config) @@ -1844,7 +1844,7 @@ class TestScenarioAgentRecreation: patch("deerflow.client._build_middlewares", return_value=[]), patch("deerflow.client.apply_prompt_template", return_value="prompt"), patch.object(client, "_get_tools", return_value=[]), - patch("deerflow.agents.checkpointer.get_checkpointer", return_value=MagicMock()), + patch("deerflow.runtime.checkpointer.get_checkpointer", return_value=MagicMock()), ): client._ensure_agent(config_a) first_agent = client._agent @@ -1872,7 +1872,7 @@ class TestScenarioAgentRecreation: patch("deerflow.client._build_middlewares", return_value=[]), patch("deerflow.client.apply_prompt_template", return_value="prompt"), patch.object(client, "_get_tools", return_value=[]), - patch("deerflow.agents.checkpointer.get_checkpointer", return_value=MagicMock()), + patch("deerflow.runtime.checkpointer.get_checkpointer", return_value=MagicMock()), ): client._ensure_agent(config) 
client._ensure_agent(config) @@ -1897,7 +1897,7 @@ class TestScenarioAgentRecreation: patch("deerflow.client._build_middlewares", return_value=[]), patch("deerflow.client.apply_prompt_template", return_value="prompt"), patch.object(client, "_get_tools", return_value=[]), - patch("deerflow.agents.checkpointer.get_checkpointer", return_value=MagicMock()), + patch("deerflow.runtime.checkpointer.get_checkpointer", return_value=MagicMock()), ): client._ensure_agent(config) client.reset_agent() diff --git a/backend/tests/test_persistence_scaffold.py b/backend/tests/test_persistence_scaffold.py index bd098c707..dd9b66b07 100644 --- a/backend/tests/test_persistence_scaffold.py +++ b/backend/tests/test_persistence_scaffold.py @@ -24,18 +24,19 @@ class TestDatabaseConfig: assert c.backend == "memory" assert c.pool_size == 5 - def test_sqlite_paths_are_different(self): + def test_sqlite_paths_unified(self): c = DatabaseConfig(backend="sqlite", sqlite_dir="./mydata") - assert c.checkpointer_sqlite_path.endswith("checkpoints.db") - assert c.app_sqlite_path.endswith("app.db") - assert "mydata" in c.checkpointer_sqlite_path - assert c.checkpointer_sqlite_path != c.app_sqlite_path + assert c.sqlite_path.endswith("deerflow.db") + assert "mydata" in c.sqlite_path + # Backward-compatible aliases point to the same file + assert c.checkpointer_sqlite_path == c.sqlite_path + assert c.app_sqlite_path == c.sqlite_path def test_app_sqlalchemy_url_sqlite(self): c = DatabaseConfig(backend="sqlite", sqlite_dir="./data") url = c.app_sqlalchemy_url assert url.startswith("sqlite+aiosqlite:///") - assert "app.db" in url + assert "deerflow.db" in url def test_app_sqlalchemy_url_postgres(self): c = DatabaseConfig( diff --git a/config.example.yaml b/config.example.yaml index aa78cc67c..07ef54bb3 100644 --- a/config.example.yaml +++ b/config.example.yaml @@ -740,8 +740,8 @@ skill_evolution: # backend: sqlite -- Single-node deployment, files in sqlite_dir # backend: postgres -- Production multi-node 
deployment # -# SQLite mode automatically uses separate .db files for checkpointer -# and application data to avoid write-lock contention. +# SQLite mode stores checkpointer and application data together in a +# single {sqlite_dir}/deerflow.db file, using WAL journal mode. # # Postgres mode: put your connection URL in .env as DATABASE_URL, # then reference it here with $DATABASE_URL.