diff --git a/backend/app/gateway/app.py b/backend/app/gateway/app.py index faec6290d..d4b57bf80 100644 --- a/backend/app/gateway/app.py +++ b/backend/app/gateway/app.py @@ -43,12 +43,19 @@ logger = logging.getLogger(__name__) async def _ensure_admin_user(app: FastAPI) -> None: """Auto-create the admin user on first boot if no users exist. - Prints the generated password to stdout so the operator can log in. - On subsequent boots, warns if any user still needs setup. + After admin creation (or on every boot), run a three-step orphan + migration pipeline: - Multi-worker safe: relies on SQLite UNIQUE constraint to resolve races. - Only the worker that successfully creates/updates the admin prints the - password; losers silently skip. + 1. Fatal: admin creation (can't proceed without an admin user) + 2. Non-fatal: LangGraph store orphan threads (cursor-paginated) + 3. Non-fatal: SQL persistence tables (threads_meta, runs, run_events, + feedback) — every row with owner_id IS NULL gets bound to admin + + Multi-worker safe: relies on SQLite UNIQUE constraint to resolve + races during admin creation. Only the worker that successfully + creates/updates the admin prints the password; losers silently skip. + The orphan migration steps are idempotent — a second call finds + nothing to migrate and returns 0. """ import secrets @@ -57,73 +64,145 @@ async def _ensure_admin_user(app: FastAPI) -> None: provider = get_local_provider() user_count = await provider.count_users() + admin = None + fresh_admin_created = False + if user_count == 0: password = secrets.token_urlsafe(16) try: admin = await provider.create_user(email="admin@deerflow.dev", password=password, system_role="admin", needs_setup=True) + fresh_admin_created = True except ValueError: return # Another worker already created the admin. + else: + # Admin exists but setup never completed — reset password so operator + # can always find it in the console without needing the CLI. + # Multi-worker guard: if admin was created less than 30s ago, another + # worker just created it and will print the password — skip reset. + admin = await provider.get_user_by_email("admin@deerflow.dev") + if admin and admin.needs_setup: + import time - # Migrate orphaned threads (no owner_id) to this admin - store = getattr(app.state, "store", None) - if store is not None: - await _migrate_orphaned_threads(store, str(admin.id)) + age = time.time() - admin.created_at.replace(tzinfo=UTC).timestamp() + if age >= 30: + from app.gateway.auth.password import hash_password_async + password = secrets.token_urlsafe(16) + admin.password_hash = await hash_password_async(password) + admin.token_version += 1 + await provider.update_user(admin) + + logger.info("=" * 60) + logger.info(" Admin account setup incomplete — password reset") + logger.info(" Email: %s", admin.email) + logger.info(" Password: %s", password) + logger.info(" Change it after login: Settings -> Account") + logger.info("=" * 60) + + if admin is None: + return # Nothing to bind orphans to. + + admin_id = str(admin.id) + + # Step 2: LangGraph store orphan migration — non-fatal + store = getattr(app.state, "store", None) + if store is not None: + try: + migrated = await _migrate_orphaned_threads(store, admin_id) + if migrated: + logger.info("Migrated %d orphan LangGraph thread(s) to admin", migrated) + except Exception: + logger.exception("LangGraph thread migration failed (non-fatal)") + + # Step 3: SQL persistence tables — non-fatal + try: + await _migrate_orphan_sql_tables(admin_id) + except Exception: + logger.exception("SQL persistence migration failed (non-fatal)") + + if fresh_admin_created: logger.info("=" * 60) logger.info(" Admin account created on first boot") logger.info(" Email: %s", admin.email) - logger.info(" Password: %s", password) + logger.info(" Password: %s", password) # noqa: F821 — defined in the fresh_admin branch logger.info(" Change it after login: Settings -> Account") logger.info("=" * 60) + + +async def _iter_store_items(store, namespace, *, page_size: int = 500): + """Paginated async iterator over a LangGraph store namespace. + + Replaces the old hardcoded ``limit=1000`` call with a cursor-style + loop so that environments with more than one page of orphans do + not silently lose data. Terminates when a page is empty OR when a + short page arrives (indicating the last page). + """ + offset = 0 + while True: + batch = await store.asearch(namespace, limit=page_size, offset=offset) + if not batch: + return + for item in batch: + yield item + if len(batch) < page_size: + return + offset += page_size + + +async def _migrate_orphaned_threads(store, admin_user_id: str) -> int: + """Migrate LangGraph store threads with no owner_id to the given admin. + + Uses cursor pagination so all orphans are migrated regardless of + count. Returns the number of rows migrated. + """ + migrated = 0 + async for item in _iter_store_items(store, ("threads",)): + metadata = item.value.get("metadata", {}) + if not metadata.get("owner_id"): + metadata["owner_id"] = admin_user_id + item.value["metadata"] = metadata + await store.aput(("threads",), item.key, item.value) + migrated += 1 + return migrated + + +async def _migrate_orphan_sql_tables(admin_user_id: str) -> None: + """Bind NULL owner_id rows in the 4 SQL persistence tables to admin. + + Runs in a single transaction per table via the shared async session + factory. Each UPDATE is idempotent — a second call finds nothing to + migrate (rowcount=0). + """ + from sqlalchemy import update + + from deerflow.persistence.engine import get_session_factory + from deerflow.persistence.feedback.model import FeedbackRow + from deerflow.persistence.models.run_event import RunEventRow + from deerflow.persistence.run.model import RunRow + from deerflow.persistence.thread_meta.model import ThreadMetaRow + + sf = get_session_factory() + if sf is None: + # In-memory / no persistence backend — nothing to migrate. return - # Admin exists but setup never completed — reset password so operator - # can always find it in the console without needing the CLI. - # Multi-worker guard: if admin was created less than 30s ago, another - # worker just created it and will print the password — skip reset. - admin = await provider.get_user_by_email("admin@deerflow.dev") - if admin and admin.needs_setup: - import time + tables = [ + (ThreadMetaRow, "threads_meta"), + (RunRow, "runs"), + (RunEventRow, "run_events"), + (FeedbackRow, "feedback"), + ] - age = time.time() - admin.created_at.replace(tzinfo=UTC).timestamp() - if age < 30: - return # Just created by another worker in this startup; its password is still valid. - - from app.gateway.auth.password import hash_password_async - - password = secrets.token_urlsafe(16) - admin.password_hash = await hash_password_async(password) - admin.token_version += 1 - await provider.update_user(admin) - - logger.info("=" * 60) - logger.info(" Admin account setup incomplete — password reset") - logger.info(" Email: %s", admin.email) - logger.info(" Password: %s", password) - logger.info(" Change it after login: Settings -> Account") - logger.info("=" * 60) - - -async def _migrate_orphaned_threads(store, admin_user_id: str) -> None: - """Migrate threads with no owner_id to the given admin. - - NOTE: This is the initial port. Commit 5 will replace the hardcoded - limit=1000 with cursor pagination and extend to SQL persistence tables. - """ - try: - migrated = 0 - results = await store.asearch(("threads",), limit=1000) - for item in results: - metadata = item.value.get("metadata", {}) - if not metadata.get("owner_id"): - metadata["owner_id"] = admin_user_id - item.value["metadata"] = metadata - await store.aput(("threads",), item.key, item.value) - migrated += 1 - if migrated: - logger.info("Migrated %d orphaned thread(s) to admin", migrated) - except Exception: - logger.exception("Thread migration failed (non-fatal)") + async with sf() as session: + for model, label in tables: + stmt = update(model).where(model.owner_id.is_(None)).values(owner_id=admin_user_id) + result = await session.execute(stmt) + count = result.rowcount or 0 + if count > 0: + logger.info("Migrated %d orphan %s row(s) to admin", count, label) + else: + logger.debug("No orphan %s rows to migrate", label) + await session.commit() @asynccontextmanager