feat(auth): extend orphan migration to 2.0-rc persistence tables

_ensure_admin_user now runs a three-step pipeline on every boot:

  Step 1 (fatal):     admin user exists / is created / password is reset
  Step 2 (non-fatal): LangGraph store orphan threads → admin
  Step 3 (non-fatal): SQL persistence tables → admin
    - threads_meta
    - runs
    - run_events
    - feedback

Each step is idempotent. The fatal/non-fatal split mirrors PR #1728's
original philosophy: admin creation failure blocks startup (the system
is unusable without an admin), whereas migration failures log a warning
and let the service proceed (a partial migration is recoverable; a
missing admin is not).

Key helpers
-----------
- _iter_store_items(store, namespace, *, page_size=500):
  async generator that cursor-paginates across LangGraph store pages.
  Fixes PR #1728's hardcoded limit=1000 bug that would silently lose
  orphans beyond the first page.

- _migrate_orphaned_threads(store, admin_user_id):
  Rewritten to use _iter_store_items. Returns the migrated count so the
  caller can log it; raises only on unhandled exceptions.

- _migrate_orphan_sql_tables(admin_user_id):
  Imports the 4 ORM models lazily, grabs the shared session factory,
  runs one UPDATE per table in a single transaction, commits once.
  No-op when no persistence backend is configured (in-memory dev).

Tests: test_ensure_admin.py (8 passed)
This commit is contained in:
greatmengqi 2026-04-08 11:10:09 +08:00
parent 4b139fb689
commit e5ad92474c

View File

@ -43,12 +43,19 @@ logger = logging.getLogger(__name__)
async def _ensure_admin_user(app: FastAPI) -> None:
"""Auto-create the admin user on first boot if no users exist.
Prints the generated password to stdout so the operator can log in.
On subsequent boots, warns if any user still needs setup.
After admin creation (or on every boot), run a three-step orphan
migration pipeline:
Multi-worker safe: relies on SQLite UNIQUE constraint to resolve races.
Only the worker that successfully creates/updates the admin prints the
password; losers silently skip.
1. Fatal: admin creation (can't proceed without an admin user)
2. Non-fatal: LangGraph store orphan threads (cursor-paginated)
3. Non-fatal: SQL persistence tables (threads_meta, runs, run_events,
feedback) every row with owner_id IS NULL gets bound to admin
Multi-worker safe: relies on SQLite UNIQUE constraint to resolve
races during admin creation. Only the worker that successfully
creates/updates the admin prints the password; losers silently skip.
The orphan migration steps are idempotent a second call finds
nothing to migrate and returns 0.
"""
import secrets
@ -57,73 +64,145 @@ async def _ensure_admin_user(app: FastAPI) -> None:
provider = get_local_provider()
user_count = await provider.count_users()
admin = None
fresh_admin_created = False
if user_count == 0:
password = secrets.token_urlsafe(16)
try:
admin = await provider.create_user(email="admin@deerflow.dev", password=password, system_role="admin", needs_setup=True)
fresh_admin_created = True
except ValueError:
return # Another worker already created the admin.
else:
# Admin exists but setup never completed — reset password so operator
# can always find it in the console without needing the CLI.
# Multi-worker guard: if admin was created less than 30s ago, another
# worker just created it and will print the password — skip reset.
admin = await provider.get_user_by_email("admin@deerflow.dev")
if admin and admin.needs_setup:
import time
# Migrate orphaned threads (no owner_id) to this admin
store = getattr(app.state, "store", None)
if store is not None:
await _migrate_orphaned_threads(store, str(admin.id))
age = time.time() - admin.created_at.replace(tzinfo=UTC).timestamp()
if age >= 30:
from app.gateway.auth.password import hash_password_async
password = secrets.token_urlsafe(16)
admin.password_hash = await hash_password_async(password)
admin.token_version += 1
await provider.update_user(admin)
logger.info("=" * 60)
logger.info(" Admin account setup incomplete — password reset")
logger.info(" Email: %s", admin.email)
logger.info(" Password: %s", password)
logger.info(" Change it after login: Settings -> Account")
logger.info("=" * 60)
if admin is None:
return # Nothing to bind orphans to.
admin_id = str(admin.id)
# Step 2: LangGraph store orphan migration — non-fatal
store = getattr(app.state, "store", None)
if store is not None:
try:
migrated = await _migrate_orphaned_threads(store, admin_id)
if migrated:
logger.info("Migrated %d orphan LangGraph thread(s) to admin", migrated)
except Exception:
logger.exception("LangGraph thread migration failed (non-fatal)")
# Step 3: SQL persistence tables — non-fatal
try:
await _migrate_orphan_sql_tables(admin_id)
except Exception:
logger.exception("SQL persistence migration failed (non-fatal)")
if fresh_admin_created:
logger.info("=" * 60)
logger.info(" Admin account created on first boot")
logger.info(" Email: %s", admin.email)
logger.info(" Password: %s", password)
logger.info(" Password: %s", password) # noqa: F821 — defined in the fresh_admin branch
logger.info(" Change it after login: Settings -> Account")
logger.info("=" * 60)
async def _iter_store_items(store, namespace, *, page_size: int = 500):
"""Paginated async iterator over a LangGraph store namespace.
Replaces the old hardcoded ``limit=1000`` call with a cursor-style
loop so that environments with more than one page of orphans do
not silently lose data. Terminates when a page is empty OR when a
short page arrives (indicating the last page).
"""
offset = 0
while True:
batch = await store.asearch(namespace, limit=page_size, offset=offset)
if not batch:
return
for item in batch:
yield item
if len(batch) < page_size:
return
offset += page_size
async def _migrate_orphaned_threads(store, admin_user_id: str) -> int:
"""Migrate LangGraph store threads with no owner_id to the given admin.
Uses cursor pagination so all orphans are migrated regardless of
count. Returns the number of rows migrated.
"""
migrated = 0
async for item in _iter_store_items(store, ("threads",)):
metadata = item.value.get("metadata", {})
if not metadata.get("owner_id"):
metadata["owner_id"] = admin_user_id
item.value["metadata"] = metadata
await store.aput(("threads",), item.key, item.value)
migrated += 1
return migrated
async def _migrate_orphan_sql_tables(admin_user_id: str) -> None:
"""Bind NULL owner_id rows in the 4 SQL persistence tables to admin.
Runs in a single transaction per table via the shared async session
factory. Each UPDATE is idempotent a second call finds nothing to
migrate (rowcount=0).
"""
from sqlalchemy import update
from deerflow.persistence.engine import get_session_factory
from deerflow.persistence.feedback.model import FeedbackRow
from deerflow.persistence.models.run_event import RunEventRow
from deerflow.persistence.run.model import RunRow
from deerflow.persistence.thread_meta.model import ThreadMetaRow
sf = get_session_factory()
if sf is None:
# In-memory / no persistence backend — nothing to migrate.
return
# Admin exists but setup never completed — reset password so operator
# can always find it in the console without needing the CLI.
# Multi-worker guard: if admin was created less than 30s ago, another
# worker just created it and will print the password — skip reset.
admin = await provider.get_user_by_email("admin@deerflow.dev")
if admin and admin.needs_setup:
import time
tables = [
(ThreadMetaRow, "threads_meta"),
(RunRow, "runs"),
(RunEventRow, "run_events"),
(FeedbackRow, "feedback"),
]
age = time.time() - admin.created_at.replace(tzinfo=UTC).timestamp()
if age < 30:
return # Just created by another worker in this startup; its password is still valid.
from app.gateway.auth.password import hash_password_async
password = secrets.token_urlsafe(16)
admin.password_hash = await hash_password_async(password)
admin.token_version += 1
await provider.update_user(admin)
logger.info("=" * 60)
logger.info(" Admin account setup incomplete — password reset")
logger.info(" Email: %s", admin.email)
logger.info(" Password: %s", password)
logger.info(" Change it after login: Settings -> Account")
logger.info("=" * 60)
async def _migrate_orphaned_threads(store, admin_user_id: str) -> None:
"""Migrate threads with no owner_id to the given admin.
NOTE: This is the initial port. Commit 5 will replace the hardcoded
limit=1000 with cursor pagination and extend to SQL persistence tables.
"""
try:
migrated = 0
results = await store.asearch(("threads",), limit=1000)
for item in results:
metadata = item.value.get("metadata", {})
if not metadata.get("owner_id"):
metadata["owner_id"] = admin_user_id
item.value["metadata"] = metadata
await store.aput(("threads",), item.key, item.value)
migrated += 1
if migrated:
logger.info("Migrated %d orphaned thread(s) to admin", migrated)
except Exception:
logger.exception("Thread migration failed (non-fatal)")
async with sf() as session:
for model, label in tables:
stmt = update(model).where(model.owner_id.is_(None)).values(owner_id=admin_user_id)
result = await session.execute(stmt)
count = result.rowcount or 0
if count > 0:
logger.info("Migrated %d orphan %s row(s) to admin", count, label)
else:
logger.debug("No orphan %s rows to migrate", label)
await session.commit()
@asynccontextmanager