feat(storage): enhance storage package with async provider and docs

Add to packages/storage/:
- README.md and README_zh.md - comprehensive documentation
- store/persistence/async_provider.py - async persistence provider

Update repositories:
- contracts/thread_meta.py - add new contract method
- db/thread_meta.py - implement new method
- factory.py - update factory logic

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
rayhpeng 2026-04-22 11:29:42 +08:00
parent e3e00af51d
commit b5e18f5b47
6 changed files with 1072 additions and 1 deletions

View File

@@ -0,0 +1,492 @@
# deerflow-storage Design Overview
This document explains the current responsibilities of `backend/packages/storage`, its overall design, how database integration works, how persistence models are defined, what database access interfaces it exposes, and how the `app` layer consumes it through `infra`.
## 1. Package Role
`deerflow-storage` is DeerFlow's unified persistence foundation package. Its purpose is to pull database integration and business data persistence out of the `app` layer and provide them as a reusable storage layer.
At the moment, it mainly provides two kinds of capabilities:
1. A checkpointer for the LangGraph runtime.
2. ORM models, repository contracts, and database implementations for DeerFlow application data.
This package does not expose HTTP endpoints, does not depend on FastAPI routes, and does not own business orchestration. It acts as a storage kernel.
## 2. Overall Layering
The current code is roughly split into the following layers:
```text
config
└─ Reads configuration, resolves environment variables, and determines database parameters
persistence
└─ Creates AsyncEngine / SessionFactory / LangGraph checkpointer
repositories/contracts
└─ Defines domain objects and repository protocols (Pydantic + Protocol)
repositories/models
└─ Defines SQLAlchemy ORM table models
repositories/db
└─ Implements database access on top of AsyncSession
app.infra.storage
└─ Adapts storage repositories into app-facing interfaces
gateway / runtime
└─ Uses infra through dependency injection, facades, observers, and event stores
```
The core idea is:
1. The `storage` package only decides how data is stored and what is stored.
2. `app.infra` translates low-level repositories into application-facing semantics.
3. `gateway` and `runtime` depend only on interfaces exposed by `infra`, not on ORM models or SQL directly.
## 3. How Database Integration Works
### 3.1 Configuration Entry
Database configuration is defined in [`store/config/storage_config.py`](store/config/storage_config.py), while the outer application configuration is loaded by [`store/config/app_config.py`](store/config/app_config.py).
The configuration flow has several notable traits:
1. It loads from `backend/config.yaml` or the repository root `config.yaml` by default.
2. It supports overriding the path via `DEER_FLOW_CONFIG_PATH`.
3. It supports `$ENV_VAR` syntax in config values.
4. Timezone configuration also affects how timestamp fields are handled in the storage layer.
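For illustration, the `$ENV_VAR` substitution described above can be sketched with a small helper (the helper name and exact semantics are hypothetical; the real resolver lives in the config loader):

```python
import os
import re

# Matches $UPPER_SNAKE_CASE references inside a config value.
_ENV_PATTERN = re.compile(r"\$([A-Z_][A-Z0-9_]*)")

def resolve_env_vars(value: str) -> str:
    """Replace $ENV_VAR references with entries from os.environ.

    Unset variables resolve to an empty string in this sketch.
    """
    return _ENV_PATTERN.sub(lambda m: os.environ.get(m.group(1), ""), value)

os.environ["DB_HOST"] = "db.internal"
print(resolve_env_vars("mysql://$DB_HOST:3306"))  # mysql://db.internal:3306
```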
### 3.2 Persistence Entry Point
The unified entry point is `create_persistence()` in [`store/persistence/factory.py`](store/persistence/factory.py).
It performs three main tasks:
1. Builds a SQLAlchemy URL from `StorageConfig`.
2. Selects the SQLite / MySQL / PostgreSQL builder based on the configured driver.
3. Returns `AppPersistence`, which contains:
- `checkpointer`
- `engine`
- `session_factory`
- `setup`
- `aclose`
So the application startup does not just get a bare database connection. It gets a full runtime persistence bundle.
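As a sketch, the bundle can be pictured as a dataclass (the field names come from the list above; everything else here is illustrative, not the actual implementation):

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Awaitable, Callable

@dataclass
class AppPersistence:
    """Assumed shape of the bundle returned by create_persistence()."""
    checkpointer: Any
    engine: Any
    session_factory: Callable[[], Any]
    setup: Callable[[], Awaitable[None]]   # init checkpointer tables, then create_all
    aclose: Callable[[], Awaitable[None]]  # close engine and checkpointer

async def demo() -> list[str]:
    events: list[str] = []

    async def setup() -> None:
        events.append("setup")

    async def aclose() -> None:
        events.append("aclose")

    bundle = AppPersistence(None, None, lambda: None, setup, aclose)
    await bundle.setup()   # called once at application startup
    await bundle.aclose()  # called once at shutdown
    return events

events = asyncio.run(demo())
```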
### 3.3 Driver Integration Pattern
Driver implementations live in:
1. [`store/persistence/drivers/sqlite.py`](store/persistence/drivers/sqlite.py)
2. [`store/persistence/drivers/mysql.py`](store/persistence/drivers/mysql.py)
3. [`store/persistence/drivers/postgres.py`](store/persistence/drivers/postgres.py)
All three follow the same pattern:
1. Create an `AsyncEngine`
2. Create an `async_sessionmaker`
3. Create the LangGraph async checkpointer for that backend
4. In `setup()`, initialize the checkpointer first, then run `MappedBase.metadata.create_all`
5. In `aclose()`, close the engine and checkpointer in order
This means the current initialization strategy is:
1. Checkpointer tables and business tables are initialized together at runtime startup.
2. Business tables currently rely on `SQLAlchemy create_all()`.
3. There is no separate migration orchestration path inside this package as the main workflow.
### 3.4 Current SQLite Behavior
SQLite uses [`StorageConfig.sqlite_storage_path`](store/config/storage_config.py) to generate the database file path, which defaults to `.deer-flow/data/deerflow.db`.
For SQLite, the model primary key type falls back to `Integer`, because SQLite only treats an `INTEGER PRIMARY KEY` column as the auto-incrementing rowid alias; a `BIGINT` primary key does not get that behavior.
## 4. How Persistence Models Are Defined
### 4.1 Base Model Conventions
Base definitions are in [`store/persistence/base_model.py`](store/persistence/base_model.py).
That file standardizes several things:
1. `MappedBase` as the declarative base for all ORM models.
2. `DataClassBase` to support native dataclass-style models.
3. `Base` to include `created_time` and `updated_time`.
4. `id_key` as the unified primary key definition.
5. `UniversalText` as a cross-dialect long-text type.
6. `TimeZone` as a timezone-aware datetime type wrapper.
As a result, new models in this package usually follow this pattern:
1. Inherit from `Base` if they need `created_time` and `updated_time`.
2. Inherit from `DataClassBase` if they only need dataclass-style mapping without `updated_time`.
3. Use `id: Mapped[id_key]` for the primary key.
### 4.2 Current Business Models
Models are under [`store/repositories/models`](store/repositories/models):
1. `Run`
- Table: `runs`
- Stores run metadata, status, token statistics, message summaries, and error details.
2. `ThreadMeta`
- Table: `thread_meta`
- Stores thread-level metadata, status, title, and ownership data.
3. `RunEvent`
- Table: `run_events`
- Stores events and messages emitted by a run.
- Uses a `(thread_id, seq)` unique constraint to maintain per-thread ordering.
4. `Feedback`
- Table: `feedback`
- Stores feedback records associated with runs.
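The `(thread_id, seq)` ordering rule for `run_events` can be illustrated with an in-memory sequence allocator (a sketch only; the real `seq` values are assigned by the repository layer):

```python
from collections import defaultdict

class SeqAllocator:
    """Per-thread monotonically increasing seq, mirroring the
    (thread_id, seq) unique constraint on run_events."""

    def __init__(self) -> None:
        self._next: defaultdict[str, int] = defaultdict(int)

    def next_seq(self, thread_id: str) -> int:
        seq = self._next[thread_id]
        self._next[thread_id] = seq + 1
        return seq

alloc = SeqAllocator()
# Events interleave across threads, but each thread keeps its own ordering.
pairs = [(t, alloc.next_seq(t)) for t in ["t1", "t1", "t2", "t1"]]
```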
### 4.3 Model Field Design Traits
There are a few common conventions in the current models:
1. Business identifiers are string fields such as `run_id`, `thread_id`, and `feedback_id`; the auto-increment `id` is only an internal primary key.
2. Structured extension data is usually stored in a JSON `metadata` column, while the ORM attribute name is often `meta`, since `metadata` is a reserved attribute on SQLAlchemy declarative classes.
3. Long text fields use `UniversalText`.
4. Timestamp fields go through `TimeZone` to keep timezone handling consistent.
`RunEvent.content` has one additional rule:
1. If `content` is a `dict`, it is serialized to JSON before being written.
2. A `content_is_dict=True` marker is added into `metadata`.
3. On reads, the value is deserialized again based on that marker.
This lets `run_events` support both plain text messages and structured event payloads.
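A minimal sketch of that marker-based rule (helper names are hypothetical):

```python
import json
from typing import Any

def encode_content(content: Any, metadata: dict) -> tuple[Any, dict]:
    """Dict content is serialized to JSON and flagged in metadata."""
    if isinstance(content, dict):
        return json.dumps(content), {**metadata, "content_is_dict": True}
    return content, metadata

def decode_content(stored: Any, metadata: dict) -> Any:
    """Reverse the serialization based on the content_is_dict marker."""
    if metadata.get("content_is_dict"):
        return json.loads(stored)
    return stored

stored, meta = encode_content({"type": "tool_call", "name": "search"}, {})
round_tripped = decode_content(stored, meta)
```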
## 5. How Database Access Interfaces Are Defined
### 5.1 contracts: Repository Contract Layer
Contracts are defined in [`store/repositories/contracts`](store/repositories/contracts).
This layer does two things:
1. Uses Pydantic models to define input and output objects such as `RunCreate`, `Run`, `ThreadMetaCreate`, and `ThreadMeta`.
2. Uses `Protocol` to define repository interfaces such as `RunRepositoryProtocol` and `ThreadMetaRepositoryProtocol`.
That means upper layers depend on contracts and protocols rather than on a specific SQLAlchemy implementation.
### 5.2 db: Database Implementation Layer
Implementations live in [`store/repositories/db`](store/repositories/db).
Each repository implementation follows the same pattern:
1. The constructor receives an `AsyncSession`
2. It uses `select / update / delete` for database operations
3. It converts ORM models into the Pydantic objects defined by the contracts layer
For example:
1. `DbRunRepository` handles CRUD and completion-stat updates for the `runs` table.
2. `DbThreadMetaRepository` handles thread metadata retrieval, updates, and search.
3. `DbRunEventRepository` handles batched event append, message pagination, and deletion by thread or run.
4. `DbFeedbackRepository` handles feedback creation and retrieval.
### 5.3 factory: Repository Construction Entry
[`store/repositories/factory.py`](store/repositories/factory.py) provides unified factory functions:
1. `build_run_repository(session)`
2. `build_thread_meta_repository(session)`
3. `build_feedback_repository(session)`
4. `build_run_event_repository(session)`
So upper layers only need an `AsyncSession` and do not need to depend directly on concrete repository class names.
## 6. What the Package Exposes
If you look only at the `storage` package itself, it exposes two categories of interfaces.
### 6.1 Runtime Persistence Entry
Exported from [`store/persistence/__init__.py`](store/persistence/__init__.py):
1. `create_persistence()`
2. `AppPersistence`
3. The ORM base classes and shared persistence types
This is the entry point used by the application to initialize database access and the checkpointer.
### 6.2 Repository Contracts and Builders
Exported from [`store/repositories/__init__.py`](store/repositories/__init__.py):
1. Contract-layer input and output models
2. Repository `Protocol`s
3. Repository builder factory functions
This is how the application integrates business persistence by repository contract.
In other words, the `storage` package does not provide an HTTP SDK to the `app` layer. It provides:
1. Initialization capabilities
2. A session factory
3. Repository protocols and repository builders
## 7. How the app Layer Uses It Through infra
The `app` layer does not operate on `store.repositories.db.*` directly. It goes through `app.infra.storage`.
Relevant code lives in:
1. [`backend/app/infra/storage/runs.py`](../../app/infra/storage/runs.py)
2. [`backend/app/infra/storage/thread_meta.py`](../../app/infra/storage/thread_meta.py)
3. [`backend/app/infra/storage/run_events.py`](../../app/infra/storage/run_events.py)
### 7.1 Why the infra Layer Exists
The `app` layer does not want raw repository interfaces. It needs persistence services aligned with application semantics:
1. Automatic session lifecycle management
2. Automatic commit / rollback behavior
3. Actor / user visibility checks
4. Conversion from lower-level Pydantic models into app-facing dict structures
5. Alignment with the expectations of facades, observers, and routers
### 7.2 Run Integration
`RunStoreAdapter` wraps `build_run_repository(session)` with a `session_factory` and exposes:
1. `get`
2. `list_by_thread`
3. `create`
4. `update_status`
5. `set_error`
6. `update_run_completion`
7. `delete`
Important details:
1. Each call creates its own `AsyncSession`.
2. Read and write flows manage transactions separately.
3. Visibility filtering is applied through `actor_context` and `user_id`.
4. Run creation serializes metadata and kwargs before persisting them.
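The per-call session pattern can be sketched with a stand-in session class (everything here is hypothetical; the real adapter wraps SQLAlchemy's `AsyncSession` obtained from the shared `session_factory`):

```python
import asyncio
from contextlib import asynccontextmanager

class FakeSession:
    """Stands in for AsyncSession; records commit/rollback for illustration."""
    def __init__(self) -> None:
        self.actions: list[str] = []

    async def commit(self) -> None:
        self.actions.append("commit")

    async def rollback(self) -> None:
        self.actions.append("rollback")

sessions: list[FakeSession] = []

@asynccontextmanager
async def session_scope():
    """Open a fresh session per call; commit on success, roll back on error."""
    session = FakeSession()
    sessions.append(session)
    try:
        yield session
        await session.commit()
    except Exception:
        await session.rollback()
        raise

async def create_run(run_id: str) -> None:
    async with session_scope() as session:
        session.actions.append(f"insert:{run_id}")

asyncio.run(create_run("run-1"))
asyncio.run(create_run("run-2"))
```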
### 7.3 Thread Integration
Thread metadata is split into two layers:
1. `ThreadMetaStoreAdapter`
- A session-managed wrapper around the repository.
2. `ThreadMetaStorage`
- A higher-level app-facing interface.
`ThreadMetaStorage` adds application-oriented methods such as:
1. `ensure_thread`
2. `ensure_thread_running`
3. `sync_thread_title`
4. `sync_thread_assistant_id`
5. `sync_thread_status`
6. `sync_thread_metadata`
7. `search_threads`
So the `app` layer typically depends on `ThreadMetaStorage`, not directly on the low-level repository protocol.
### 7.4 RunEvent Integration
`AppRunEventStore` is a runtime-oriented event storage adapter. It is not just a CRUD wrapper. It is shaped around the runtime event-store protocol:
1. `put_batch`
2. `list_messages`
3. `list_events`
4. `list_messages_by_run`
5. `count_messages`
6. `delete_by_thread`
7. `delete_by_run`
It also performs thread visibility checks. If the current actor has a `user_id`, it first loads the thread owner and then decides whether the actor can read or write events for that thread.
## 8. How storage Is Wired at app Startup
Application startup wiring happens in [`backend/app/gateway/registrar.py`](../../app/gateway/registrar.py).
The `init_persistence()` flow is:
1. Call `create_persistence()`
2. Run `app_persistence.setup()`
3. Use `session_factory` to build:
- `RunStoreAdapter`
- `ThreadMetaStoreAdapter`
- `FeedbackStoreAdapter`
- `AppRunEventStore`
4. Build `ThreadMetaStorage` on top of that
5. Inject all of them into `app.state`
So from the application's point of view, `storage` is not wired as a single global repository object. Instead:
1. Lower layers share a single `session_factory`
2. Upper layers create sessions per call through adapters
3. The final objects are attached to `FastAPI app.state` for routers and services
## 9. How gateway and service Layers Use These Capabilities
### 9.1 Dependency Injection
[`backend/app/gateway/dependencies/repositories.py`](../../app/gateway/dependencies/repositories.py) reads the following objects from `request.app.state`:
1. `run_store`
2. `thread_meta_repo`
3. `thread_meta_storage`
4. `feedback_repo`
These are then exposed as FastAPI dependencies to route handlers.
### 9.2 Usage in Thread Routes
In [`backend/app/gateway/routers/langgraph/threads.py`](../../app/gateway/routers/langgraph/threads.py):
1. Thread creation calls `ThreadMetaStorage.ensure_thread()`
2. Thread search calls `ThreadMetaStorage.search_threads()`
3. Thread deletion calls `ThreadMetaStorage.delete_thread()`
So the thread API does not touch ORM tables directly. It goes through the infra layer.
### 9.3 Usage in the Runs Facade
[`backend/app/gateway/services/runs/facade_factory.py`](../../app/gateway/services/runs/facade_factory.py) injects storage-related objects into `RunsFacade`:
1. `run_read_repo`
2. `run_write_repo`
3. `run_delete_repo`
4. `thread_meta_storage`
5. `run_event_store`
These are then consumed by app-layer components such as:
1. `AppRunCreateStore`
2. `AppRunQueryStore`
3. `AppRunDeleteStore`
4. `StorageRunObserver`
### 9.4 How Run Lifecycle State Is Written Back
`StorageRunObserver` is a key integration path.
It listens to runtime lifecycle events and writes their results back into persistence:
1. `RUN_STARTED` -> updates run status to `running`
2. `RUN_COMPLETED` -> updates completion stats and, when needed, syncs the thread title
3. `RUN_FAILED` -> updates error state and error details
4. `RUN_CANCELLED` -> updates run status to `interrupted`
5. `THREAD_STATUS_UPDATED` -> syncs thread status
This means the `storage` package itself does not listen to runtime events directly, but `app.infra.storage` already plugs it into the runtime observer system.
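The event-to-status mapping can be sketched like this (`running` and `interrupted` come from the list above; the terminal status names for completed and failed runs are assumptions for this sketch):

```python
class FakeRunStore:
    """Stand-in for RunStoreAdapter's status-update surface."""
    def __init__(self) -> None:
        self.status: dict[str, str] = {}

    def update_status(self, run_id: str, status: str) -> None:
        self.status[run_id] = status

STATUS_BY_EVENT = {
    "RUN_STARTED": "running",
    "RUN_COMPLETED": "completed",   # assumed terminal status name
    "RUN_FAILED": "error",          # assumed terminal status name
    "RUN_CANCELLED": "interrupted",
}

def observe(store: FakeRunStore, event: str, run_id: str) -> None:
    """Write a lifecycle event's resulting status back into persistence."""
    status = STATUS_BY_EVENT.get(event)
    if status is not None:
        store.update_status(run_id, status)

store = FakeRunStore()
observe(store, "RUN_STARTED", "r1")
observe(store, "RUN_CANCELLED", "r1")
```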
## 10. How It Communicates with External Systems
The phrase "external communication" splits into two cases here.
### 10.1 Communication with Databases
The `storage` package communicates with SQLite / MySQL / PostgreSQL through SQLAlchemy async engines.
The main entry points are:
1. `create_async_engine`
2. `AsyncSession`
3. Repository-level `select / update / delete`
The LangGraph checkpointer also communicates with the database through backend-specific async savers, but that logic is centralized in `persistence/drivers`.
### 10.2 Communication with External Application Interfaces
The `storage` package does not expose external APIs by itself. External communication is handled by the `app` layer.
The typical path is:
1. An HTTP request enters a FastAPI route
2. The route gets an `infra` adapter through dependency injection
3. `infra` calls a `storage` repository
4. The route converts the result into an API response
There is also a runtime-event path:
1. The runtime emits a run lifecycle event or a run event
2. An observer or event store calls `infra`
3. `infra` calls `storage`
4. The data is persisted into the database
So more precisely, `storage` does not communicate outward on its own. It acts as the database boundary inside the application and is consumed by both the HTTP layer and the runtime layer.
## 11. Current Design Philosophy
The current code reflects a fairly clear design philosophy:
1. Unify checkpointer storage and application data storage under one entry point.
2. Use repository contracts to isolate upper layers from ORM details.
3. Use an `infra` adapter layer to isolate app semantics from storage semantics.
4. Prefer async SQLAlchemy to fit the modern async application stack.
5. Keep database dialect differences contained in shared base types and driver builders.
6. Keep actor / user visibility rules in app infra rather than hard-coding them into ORM models.
That means this is not meant to be a full business data layer. It is a composable low-level persistence package.
## 12. Scope and Boundaries
What the current `storage` package is responsible for:
1. Database connection parameters and initialization
2. LangGraph checkpointer integration
3. ORM base model conventions
4. Core DeerFlow persistence models
5. Repository contracts and database implementations
What it is not responsible for:
1. FastAPI route protocols
2. Authentication and authorization
3. Business workflow orchestration
4. Actor context binding
5. SSE / stream-bridge network communication
6. Higher-level facade semantics
Those responsibilities live in `app.gateway`, `app.plugins.auth`, `deerflow.runtime`, and `app.infra`.
## 13. Example End-to-End Call Chains
For "create a run and then update its state when execution finishes", the chain looks like this:
```text
HTTP POST /api/threads/{thread_id}/runs
-> gateway router
-> RunsFacade
-> AppRunCreateStore
-> RunStoreAdapter
-> build_run_repository(session)
-> DbRunRepository
-> runs table
execution completes
-> runtime emits lifecycle event
-> StorageRunObserver
-> RunStoreAdapter / ThreadMetaStorage
-> DbRunRepository / DbThreadMetaRepository
-> runs / thread_meta tables
```
For "query the messages of a run", the chain looks like this:
```text
HTTP GET /api/threads/{thread_id}/runs/{run_id}/messages
-> gateway router
-> get run_event_store from app.state
-> AppRunEventStore
-> build_run_event_repository(session)
-> DbRunEventRepository
-> run_events table
```
## 14. Summary
In one sentence, the role of `backend/packages/storage` in the current system is:
It is DeerFlow's database and persistence foundation, unifying database integration, ORM models, repository contracts, database implementations, and LangGraph checkpointer integration; the `app` layer then turns those low-level capabilities into thread, run, event, and feedback semantics through `infra`, and exposes them through HTTP routes and the runtime event system.

View File

@@ -0,0 +1,494 @@
# deerflow-storage 设计说明
本文说明 `backend/packages/storage` 的当前职责、总体设计、数据库接入方式、模型定义方式、数据库访问接口,以及它在 `app` 层中的使用路径。
## 1. 包的定位
`deerflow-storage` 是 DeerFlow 的统一持久化基础包,目标是把“数据库接入”和“业务对象持久化”从 `app` 层拆出来,形成一个独立、可复用的存储层。
它当前主要承担两类能力:
1. 为 LangGraph 运行时提供 checkpointer。
2. 为 DeerFlow 应用数据提供 ORM 模型、仓储协议和数据库实现。
这个包本身不直接提供 HTTP 接口,不直接依赖 FastAPI 路由,也不承担业务编排。它更接近一个“存储内核”。
## 2. 总体分层
当前代码大致分成下面几层:
```text
config
└─ 读取配置、解析环境变量、确定数据库参数
persistence
└─ 创建 AsyncEngine / SessionFactory / LangGraph checkpointer
repositories/contracts
└─ 定义领域对象和仓储协议(Pydantic + Protocol)
repositories/models
└─ 定义 SQLAlchemy ORM 表模型
repositories/db
└─ 基于 AsyncSession 的数据库实现
app.infra.storage
└─ 把 storage 仓储适配成 app 层直接可用的接口
gateway / runtime
└─ 通过依赖注入、facade、observer、event store 使用 infra
```
核心思想是:
1. `storage` 包只负责“如何存”和“存什么”。
2. `app.infra` 负责把底层仓储转换为应用层语义。
3. `gateway` / `runtime` 只依赖 `infra` 暴露出来的接口,不直接碰 ORM 和 SQL。
## 3. 数据库如何接入
### 3.1 配置入口
数据库配置由 [`store/config/storage_config.py`](store/config/storage_config.py) 定义,外层应用配置由 [`store/config/app_config.py`](store/config/app_config.py) 负责读取。
配置来源有几个特点:
1. 默认从 `backend/config.yaml` 或仓库根 `config.yaml` 读取。
2. 支持 `DEER_FLOW_CONFIG_PATH` 指定配置文件。
3. 支持在配置中使用 `$ENV_VAR` 形式引用环境变量。
4. 时区配置也会影响存储层时间字段的处理。
### 3.2 persistence 入口
存储层统一入口是 [`store/persistence/factory.py`](store/persistence/factory.py) 里的 `create_persistence()`。
它会做三件事:
1. 根据 `StorageConfig` 生成 SQLAlchemy URL。
2. 根据 driver 选择 SQLite / MySQL / PostgreSQL 的构建函数。
3. 返回 `AppPersistence`,其中包含:
- `checkpointer`
- `engine`
- `session_factory`
- `setup`
- `aclose`
也就是说,应用启动时拿到的不是单一数据库连接,而是一整套“运行期持久化能力包”。
### 3.3 各数据库驱动的接入方式
驱动实现位于:
1. [`store/persistence/drivers/sqlite.py`](store/persistence/drivers/sqlite.py)
2. [`store/persistence/drivers/mysql.py`](store/persistence/drivers/mysql.py)
3. [`store/persistence/drivers/postgres.py`](store/persistence/drivers/postgres.py)
三者的共同模式一致:
1. 创建 `AsyncEngine`
2. 创建 `async_sessionmaker`
3. 创建 LangGraph 对应的异步 checkpointer
4. 在 `setup()` 中先执行 checkpointer 初始化,再执行 `MappedBase.metadata.create_all`
5. 在 `aclose()` 中按顺序关闭 engine 和 checkpointer
这说明当前包的初始化策略是:
1. checkpointer 表和业务表一起由运行时启动时初始化。
2. 业务表当前依赖 `SQLAlchemy create_all()` 自动建表。
3. 当前包内没有独立的 migration 编排入口作为主路径。
### 3.4 SQLite 的当前行为
SQLite 使用 [`StorageConfig.sqlite_storage_path`](store/config/storage_config.py) 生成数据库文件路径,默认落到 `.deer-flow/data/deerflow.db`。
对于 SQLite,当前模型主键会退化为 `Integer PRIMARY KEY`,这是因为 SQLite 只把 `INTEGER PRIMARY KEY` 列当作自增 rowid 别名,`BIGINT` 主键没有同样的行为。
## 4. 持久化模型如何定义
### 4.1 基础模型约定
基础定义位于 [`store/persistence/base_model.py`](store/persistence/base_model.py)。
这里统一了几件事:
1. `MappedBase` 作为所有 ORM 模型的声明基类。
2. `DataClassBase` 让模型天然支持 dataclass 风格。
3. `Base` 额外带上 `created_time` / `updated_time`
4. `id_key` 统一主键定义。
5. `UniversalText` 统一长文本类型,兼容 MySQL 和其他方言。
6. `TimeZone` 统一时区感知的时间字段转换。
因此,包内新模型通常遵循这样的模式:
1. 如果需要 `created_time` / `updated_time`,继承 `Base`
2. 如果只要 dataclass 风格、不要 `updated_time`,继承 `DataClassBase`
3. 主键统一使用 `id: Mapped[id_key]`
### 4.2 当前已定义的业务模型
模型位于 [`store/repositories/models`](store/repositories/models):
1. `Run`
- 表:`runs`
- 用于保存运行元数据、状态、token 统计、消息摘要、错误信息等。
2. `ThreadMeta`
- 表:`thread_meta`
- 用于保存线程级元数据、状态、标题、所属用户等。
3. `RunEvent`
- 表:`run_events`
- 用于保存 run 产生的事件流和消息流。
- 通过 `(thread_id, seq)` 唯一约束维护线程内事件顺序。
4. `Feedback`
- 表:`feedback`
- 用于保存对 run 的反馈记录。
### 4.3 模型字段设计特点
当前模型有几个统一约定:
1. 业务主标识使用字符串字段,如 `run_id`、`thread_id`、`feedback_id`,数据库自增 `id` 仅作为内部主键。
2. 结构化扩展信息一般放在 `metadata` JSON 字段中,ORM 内部属性名通常映射为 `meta`(`metadata` 在 SQLAlchemy 声明式模型上是保留属性)。
3. 长文本内容统一用 `UniversalText`
4. 时间字段统一走 `TimeZone`,避免不同时区下行为不一致。
`RunEvent.content` 还有一个额外约定:
1. 落库时如果 `content` 是 `dict`,会先序列化成 JSON 字符串。
2. 同时在 `metadata` 中写入 `content_is_dict=True`。
3. 读出时再按标记反序列化。
这让 `run_events` 同时兼容“纯文本消息”和“结构化事件内容”。
## 5. 数据库访问接口如何定义
### 5.1 contracts:仓储协议层
协议定义在 [`store/repositories/contracts`](store/repositories/contracts)。
这一层做了两件事:
1. 用 Pydantic 模型定义输入对象和输出对象,例如 `RunCreate`、`Run`、`ThreadMetaCreate`、`ThreadMeta`。
2. 用 `Protocol` 定义仓储接口,例如 `RunRepositoryProtocol` 和 `ThreadMetaRepositoryProtocol`。
这意味着上层依赖的是“协议”和“数据契约”,而不是某个具体 SQLAlchemy 实现。
### 5.2 db:数据库实现层
实现位于 [`store/repositories/db`](store/repositories/db)。
每个仓储实现都遵循同样的模式:
1. 构造函数接收 `AsyncSession`
2. 用 `select / update / delete` 执行数据库操作
3. 把 ORM 模型转换成 contracts 层的 Pydantic 对象返回
例如:
1. `DbRunRepository` 负责 `runs` 表的增删改查和完结统计更新。
2. `DbThreadMetaRepository` 负责线程元数据的查询、更新和搜索。
3. `DbRunEventRepository` 负责事件批量追加、消息分页、按线程或按 run 删除。
4. `DbFeedbackRepository` 负责反馈创建、查询、聚合前置数据读取。
### 5.3 factory:仓储构造入口
[`store/repositories/factory.py`](store/repositories/factory.py) 提供了统一工厂函数:
1. `build_run_repository(session)`
2. `build_thread_meta_repository(session)`
3. `build_feedback_repository(session)`
4. `build_run_event_repository(session)`
这样上层只需要拿到 `AsyncSession`,就可以构造对应仓储,而不需要直接依赖具体类名。
## 6. 对外接口是什么
如果只看 `storage` 包本身,它对外暴露的是两类接口。
### 6.1 持久化运行时入口
由 [`store/persistence/__init__.py`](store/persistence/__init__.py) 暴露:
1. `create_persistence()`
2. `AppPersistence`
3. ORM 基础模型相关基类与类型
这是应用初始化数据库和 checkpointer 的入口。
### 6.2 仓储接口与工厂
由 [`store/repositories/__init__.py`](store/repositories/__init__.py) 暴露:
1. contracts 层的输入输出模型
2. 各仓储 `Protocol`
3. 各仓储 builder 工厂函数
这是应用按“仓储协议”接入业务持久化的入口。
换句话说,`storage` 包不会直接给 `app` 层一个 HTTP SDK,而是给它:
1. 初始化能力
2. session factory
3. repository protocol + repository builder
## 7. app 层如何调用:通过 infra 接入
`app` 层没有直接操作 `store.repositories.db.*`,而是通过 `app.infra.storage` 做一层适配。
相关代码在:
1. [`backend/app/infra/storage/runs.py`](../../app/infra/storage/runs.py)
2. [`backend/app/infra/storage/thread_meta.py`](../../app/infra/storage/thread_meta.py)
3. [`backend/app/infra/storage/run_events.py`](../../app/infra/storage/run_events.py)
### 7.1 为什么要有 infra 这一层
因为 `app` 层需要的不是“裸仓储接口”,而是“符合应用语义的持久化服务”:
1. 自动开关 session
2. 自动 commit / rollback
3. 补充 actor / user 可见性控制
4. 把底层 Pydantic 模型转换成 app 需要的字典结构
5. 对齐运行时 facade、observer、router 的接口习惯
### 7.2 Run 的接入方式
`RunStoreAdapter` 用 `session_factory` 包装了 `build_run_repository(session)`,向上暴露:
1. `get`
2. `list_by_thread`
3. `create`
4. `update_status`
5. `set_error`
6. `update_run_completion`
7. `delete`
这里的关键点是:
1. 每次调用都会创建独立的 `AsyncSession`
2. 读操作和写操作分开管理事务。
3. 会结合 `actor_context` 和 `user_id` 维度的可见性过滤。
4. 创建 run 时会先把 metadata / kwargs 做序列化,确保可安全落库。
### 7.3 Thread 的接入方式
线程元数据分成两层:
1. `ThreadMetaStoreAdapter`
- 是 repository 级别的 session 包装器。
2. `ThreadMetaStorage`
- 是面向 app 的更高层接口。
`ThreadMetaStorage` 额外提供了应用语义方法:
1. `ensure_thread`
2. `ensure_thread_running`
3. `sync_thread_title`
4. `sync_thread_assistant_id`
5. `sync_thread_status`
6. `sync_thread_metadata`
7. `search_threads`
也就是说,`app` 层通常依赖 `ThreadMetaStorage`,而不是直接依赖底层仓储协议。
### 7.4 RunEvent 的接入方式
`AppRunEventStore` 是运行时事件存储适配器。它不是简单 CRUD 包装,而是面向运行时协议设计的:
1. `put_batch`
2. `list_messages`
3. `list_events`
4. `list_messages_by_run`
5. `count_messages`
6. `delete_by_thread`
7. `delete_by_run`
它额外做了线程可见性校验:如果 actor 有 `user_id`,则会先查询线程归属,再决定是否允许读写该线程事件。
## 8. app 启动时怎么装配 storage
应用启动装配发生在 [`backend/app/gateway/registrar.py`](../../app/gateway/registrar.py)。
`init_persistence()` 的流程是:
1. 调用 `create_persistence()`
2. 执行 `app_persistence.setup()`
3. 用 `session_factory` 构造:
- `RunStoreAdapter`
- `ThreadMetaStoreAdapter`
- `FeedbackStoreAdapter`
- `AppRunEventStore`
4. 再进一步构造 `ThreadMetaStorage`
5. 把这些对象注入到 `app.state`
因此对 `app` 而言,`storage` 并不是按“全局单例 repository”接入的,而是:
1. 底层共享一个 `session_factory`
2. 上层通过适配器按调用粒度创建 session
3. 最终挂在 `FastAPI app.state` 中给路由和服务使用
## 9. gateway / service 如何使用这些能力
### 9.1 依赖注入
[`backend/app/gateway/dependencies/repositories.py`](../../app/gateway/dependencies/repositories.py) 从 `request.app.state` 取出:
1. `run_store`
2. `thread_meta_repo`
3. `thread_meta_storage`
4. `feedback_repo`
然后作为 FastAPI 依赖注入给路由层。
### 9.2 在线程路由中的调用
在线程路由 [`backend/app/gateway/routers/langgraph/threads.py`](../../app/gateway/routers/langgraph/threads.py) 中:
1. 创建线程时调用 `ThreadMetaStorage.ensure_thread()`
2. 查询线程时调用 `ThreadMetaStorage.search_threads()`
3. 删除线程时调用 `ThreadMetaStorage.delete_thread()`
因此线程 API 并不直接碰 ORM 表,而是通过 infra 层完成。
### 9.3 在 runs facade 中的调用
[`backend/app/gateway/services/runs/facade_factory.py`](../../app/gateway/services/runs/facade_factory.py) 会把 storage 相关对象注入到 `RunsFacade`:
1. `run_read_repo`
2. `run_write_repo`
3. `run_delete_repo`
4. `thread_meta_storage`
5. `run_event_store`
然后再由:
1. `AppRunCreateStore`
2. `AppRunQueryStore`
3. `AppRunDeleteStore`
4. `StorageRunObserver`
这些 app 层组件去消费。
### 9.4 Run 生命周期如何回写数据库
`StorageRunObserver` 是一条很关键的链路。
它监听运行时生命周期事件,并把结果回写到持久层:
1. `RUN_STARTED` -> 更新 run 状态为 `running`
2. `RUN_COMPLETED` -> 更新 run 完结统计;必要时同步 thread title
3. `RUN_FAILED` -> 更新 run 错误状态和错误信息
4. `RUN_CANCELLED` -> 更新 run 为 `interrupted`
5. `THREAD_STATUS_UPDATED` -> 同步 thread status
这说明 `storage` 包不负责主动监听运行时事件,但 `app.infra.storage` 已经把它接到了运行时 observer 体系中。
## 10. 如何与外部通信
这里的“外部通信”可以分成两类。
### 10.1 与数据库通信
`storage` 包通过 SQLAlchemy async engine 与 SQLite / MySQL / PostgreSQL 通信。
通信入口是:
1. `create_async_engine`
2. `AsyncSession`
3. repository 中的 `select / update / delete`
LangGraph checkpointer 也通过各自的异步 saver 与数据库通信,但这部分被统一收敛在 `persistence/drivers` 中。
### 10.2 与应用外部接口通信
`storage` 包本身不直接暴露外部 API,对外通信由 `app` 层完成。
当前典型路径是:
1. HTTP 请求进入 FastAPI 路由
2. 路由从依赖注入中拿到 `infra` 适配器
3. `infra` 调用 `storage` 仓储
4. 数据结果再由路由转换为 API 响应
另外一条链路是运行时事件:
1. runtime 产生 run lifecycle event 或 run event
2. observer / event store 调用 `infra`
3. `infra` 调用 `storage`
4. 数据写入数据库
所以更准确地说,`storage` 的“对外通信方式”不是自己发请求,而是作为应用内部的数据库边界,被 HTTP 层和 runtime 层共同消费。
## 11. 当前包的设计理念
从现有代码看,这个包的设计理念比较明确:
1. 统一 checkpointer 和应用数据存储入口。
2. 以 repository contract 隔离上层业务与底层 ORM。
3. 用 `infra` 适配层隔离 app 语义与 storage 语义。
4. 优先采用异步 SQLAlchemy,适配现代 async 应用栈。
5. 保持数据库方言兼容性,把差异尽量收敛在基础类型和 driver 构造层。
6. 把 actor / user 可见性控制放在 app infra而不是硬编码进底层 ORM 模型。
这意味着它不是一个“全功能业务数据层”,而是一个“可装配的底层持久化能力包”。
## 12. 作用范围与边界
当前 `storage` 包负责的范围:
1. 数据库连接参数和初始化
2. LangGraph checkpointer 接入
3. ORM 基础模型约定
4. DeerFlow 核心持久化模型
5. 仓储协议与数据库实现
当前不负责的范围:
1. FastAPI 路由协议
2. 认证鉴权
3. 业务工作流编排
4. actor 上下文绑定
5. SSE / stream bridge 的网络层通信
6. 更高层的 facade 业务语义
这些职责分别落在 `app.gateway`、`app.plugins.auth`、`deerflow.runtime`、`app.infra` 中。
## 13. 一条完整调用链示例
以“创建 run 并在运行结束后更新状态”为例,链路如下:
```text
HTTP POST /api/threads/{thread_id}/runs
-> gateway router
-> RunsFacade
-> AppRunCreateStore
-> RunStoreAdapter
-> build_run_repository(session)
-> DbRunRepository
-> runs 表
运行完成
-> runtime 产生 lifecycle event
-> StorageRunObserver
-> RunStoreAdapter / ThreadMetaStorage
-> DbRunRepository / DbThreadMetaRepository
-> runs / thread_meta 表
```
以“查询 run 的消息”为例:
```text
HTTP GET /api/threads/{thread_id}/runs/{run_id}/messages
-> gateway router
-> 从 app.state 获取 run_event_store
-> AppRunEventStore
-> build_run_event_repository(session)
-> DbRunEventRepository
-> run_events 表
```
## 14. 总结
`backend/packages/storage` 当前在整个工程中的角色,可以概括为一句话:
它是 DeerFlow 的数据库和持久化能力底座,统一封装了数据库接入、ORM 模型、仓储协议、数据库实现以及 LangGraph checkpointer 接入;而 `app` 层通过 `infra` 适配器把这些底层能力转化成线程、运行、事件、反馈等上层语义,再通过 HTTP 路由和运行时事件系统对外提供服务。

View File

@@ -0,0 +1,76 @@
from __future__ import annotations

import asyncio
from contextlib import asynccontextmanager
from typing import AsyncIterator

from sqlalchemy import URL

from store.config.app_config import get_app_config


@asynccontextmanager
async def make_checkpointer() -> AsyncIterator[object]:
    """Create a LangGraph checkpointer from the unified storage config."""
    storage = get_app_config().storage
    if storage.driver == "sqlite":
        from langgraph.checkpoint.sqlite.aio import AsyncSqliteSaver

        conn_str = storage.sqlite_storage_path
        await asyncio.to_thread(_ensure_sqlite_parent_dir, conn_str)
        async with AsyncSqliteSaver.from_conn_string(conn_str) as saver:
            await saver.setup()
            yield saver
        return
    if storage.driver == "postgres":
        from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver

        async with AsyncPostgresSaver.from_conn_string(_create_postgres_url(storage)) as saver:
            await saver.setup()
            yield saver
        return
    if storage.driver == "mysql":
        from langgraph.checkpoint.mysql.aio import AIOMySQLSaver

        async with AIOMySQLSaver.from_conn_string(_create_mysql_url(storage)) as saver:
            await saver.setup()
            yield saver
        return
    raise ValueError(f"Unsupported storage driver for checkpointer: {storage.driver}")


def _ensure_sqlite_parent_dir(connection_string: str) -> None:
    from pathlib import Path

    if connection_string == ":memory:":
        return
    Path(connection_string).expanduser().parent.mkdir(parents=True, exist_ok=True)


def _create_postgres_url(storage) -> str:
    # The savers' from_conn_string() expects a plain DSN string, not a
    # SQLAlchemy URL object, and str(URL) masks the password, so render
    # explicitly with hide_password=False and no driver qualifier.
    return URL.create(
        drivername="postgresql",
        username=storage.username,
        password=storage.password,
        host=storage.host,
        port=storage.port,
        database=storage.db_name or "deerflow",
    ).render_as_string(hide_password=False)


def _create_mysql_url(storage) -> str:
    # AIOMySQLSaver likewise takes a plain mysql:// connection string.
    return URL.create(
        drivername="mysql",
        username=storage.username,
        password=storage.password,
        host=storage.host,
        port=storage.port,
        database=storage.db_name or "deerflow",
    ).render_as_string(hide_password=False)


__all__ = ["make_checkpointer"]

View File

@@ -39,6 +39,7 @@ class ThreadMetaRepositoryProtocol(Protocol):
         self,
         thread_id: str,
         *,
+        assistant_id: str | None = None,
         display_name: str | None = None,
         status: str | None = None,
         metadata: dict[str, Any] | None = None,

View File

@@ -49,11 +49,14 @@ class DbThreadMetaRepository(ThreadMetaRepositoryProtocol):
         self,
         thread_id: str,
         *,
+        assistant_id: str | None = None,
         display_name: str | None = None,
         status: str | None = None,
         metadata: dict[str, Any] | None = None,
     ) -> None:
         values: dict = {}
+        if assistant_id is not None:
+            values["assistant_id"] = assistant_id
         if display_name is not None:
             values["display_name"] = display_name
         if status is not None:
View File

@@ -6,7 +6,12 @@
 from store.repositories import (
     RunRepositoryProtocol,
     ThreadMetaRepositoryProtocol,
 )
-from store.repositories.db import DbFeedbackRepository, DbRunEventRepository, DbRunRepository, DbThreadMetaRepository
+from store.repositories.db import (
+    DbFeedbackRepository,
+    DbRunEventRepository,
+    DbRunRepository,
+    DbThreadMetaRepository,
+)
 
 def build_thread_meta_repository(session: AsyncSession) -> ThreadMetaRepositoryProtocol: