deer-flow/backend/app/gateway/auth/reset_admin.py
greatmengqi 27b66d6753 feat(auth): authentication module with multi-tenant isolation (RFC-001)
Introduce an always-on auth layer with auto-created admin on first boot,
multi-tenant isolation for threads/stores, and a full setup/login flow.

Backend
- JWT access tokens with `ver` field for stale-token rejection; bump on
  password/email change
- Password hashing, HttpOnly+Secure cookies (Secure derived from request
  scheme at runtime)
- CSRF middleware covering both REST and LangGraph routes
- IP-based login rate limiting (5 attempts / 5-min lockout) with bounded
  dict growth and X-Forwarded-For bypass fix
- Multi-worker-safe admin auto-creation (single DB write, WAL once)
- needs_setup + token_version on User model; SQLite schema migration
- Thread/store isolation by owner; orphan thread migration on first admin
  registration
- thread_id validated as UUID to prevent log injection
- CLI tool to reset admin password
- Decorator-based authz module extracted from auth core
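
The login rate-limiting bullet above (5 attempts / 5-min lockout, bounded dict growth) could look roughly like the sketch below. It is an illustration under stated assumptions, not the module's actual code: the class name `LoginRateLimiter`, the `MAX_TRACKED_IPS` cap, and the oldest-first eviction policy are hypothetical, and the X-Forwarded-For handling is not covered.

```python
import time
from collections import OrderedDict

MAX_ATTEMPTS = 5          # failed attempts before lockout (from the commit message)
LOCKOUT_SECONDS = 5 * 60  # lockout duration (from the commit message)
MAX_TRACKED_IPS = 10_000  # assumed cap; the real bound is not stated in the commit


class LoginRateLimiter:
    """Hypothetical in-memory limiter keyed by client IP."""

    def __init__(self, clock=time.monotonic):
        self._clock = clock
        # Maps ip -> (failed_attempts, locked_until); order is used for eviction.
        self._entries = OrderedDict()

    def is_locked(self, ip: str) -> bool:
        entry = self._entries.get(ip)
        if entry is None:
            return False
        attempts, locked_until = entry
        if attempts < MAX_ATTEMPTS:
            return False
        if self._clock() >= locked_until:
            # Lockout expired: forget the IP so it starts from zero again.
            del self._entries[ip]
            return False
        return True

    def record_failure(self, ip: str) -> None:
        attempts, _ = self._entries.pop(ip, (0, 0.0))
        attempts += 1
        locked_until = 0.0
        if attempts >= MAX_ATTEMPTS:
            locked_until = self._clock() + LOCKOUT_SECONDS
        # Re-inserting moves the IP to the "most recent" end.
        self._entries[ip] = (attempts, locked_until)
        # Evict the oldest entries so the dict cannot grow without bound.
        while len(self._entries) > MAX_TRACKED_IPS:
            self._entries.popitem(last=False)

    def record_success(self, ip: str) -> None:
        self._entries.pop(ip, None)


# Usage with an injected clock so the lockout can be exercised without sleeping.
now = [0.0]
limiter = LoginRateLimiter(clock=lambda: now[0])
for _ in range(MAX_ATTEMPTS):
    limiter.record_failure("203.0.113.7")
locked_during_window = limiter.is_locked("203.0.113.7")
now[0] += LOCKOUT_SECONDS + 1
locked_after_window = limiter.is_locked("203.0.113.7")
```

Injecting the clock keeps the lockout window testable; a multi-worker deployment would need shared state, which this sketch does not attempt.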

Frontend
- Login and setup pages with SSR guard for needs_setup flow
- Account settings page (change password / email)
- AuthProvider + route guards; skips redirect when no users registered
- i18n (en-US / zh-CN) for auth surfaces
- Typed auth API client; parseAuthError unwraps FastAPI detail envelope

Infra & tooling
- Unified `serve.sh` with gateway mode + auto dep install
- Public PyPI uv.toml pin for CI compatibility
- Regenerated uv.lock with public index

Tests
- HTTP vs HTTPS cookie security tests
- Auth middleware, rate limiter, CSRF, setup flow coverage
2026-04-08 00:31:43 +08:00

67 lines
1.9 KiB
Python

"""CLI tool to reset admin password.

Usage:
    python -m app.gateway.auth.reset_admin
    python -m app.gateway.auth.reset_admin --email admin@example.com
"""

import argparse
import asyncio
import secrets
import sys

from app.gateway.auth.password import hash_password
from app.gateway.auth.repositories.sqlite import SQLiteUserRepository


def main() -> None:
    parser = argparse.ArgumentParser(description="Reset admin password")
    parser.add_argument("--email", help="Admin email (default: first admin found)")
    args = parser.parse_args()

    repo = SQLiteUserRepository()

    # Find admin user synchronously (CLI context, no event loop)
    user = asyncio.run(_find_admin(repo, args.email))
    if user is None:
        if args.email:
            print(f"Error: user '{args.email}' not found.", file=sys.stderr)
        else:
            print("Error: no admin user found.", file=sys.stderr)
        sys.exit(1)

    new_password = secrets.token_urlsafe(16)
    user.password_hash = hash_password(new_password)
    user.token_version += 1  # invalidates previously issued tokens
    user.needs_setup = True

    asyncio.run(repo.update_user(user))

    print(f"Password reset for: {user.email}")
    print(f"New password: {new_password}")
    print("Next login will require setup (new email + password).")


async def _find_admin(repo: SQLiteUserRepository, email: str | None):
    if email:
        return await repo.get_user_by_email(email)

    # Find first admin
    from app.gateway.auth.repositories.sqlite import _get_users_conn

    def _find_sync():
        with _get_users_conn() as conn:
            cursor = conn.execute("SELECT id FROM users WHERE system_role = 'admin' LIMIT 1")
            row = cursor.fetchone()
            return dict(row)["id"] if row else None

    admin_id = await asyncio.to_thread(_find_sync)
    if admin_id:
        return await repo.get_user_by_id(admin_id)
    return None


if __name__ == "__main__":
    main()
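
The reset flow above (look up the first admin, generate a random password, bump `token_version`, set `needs_setup`) can be tried in isolation with the sketch below. It is a self-contained approximation, not the project's code: the column layout of `users` beyond `id` and `system_role` is assumed, and `hash_password_demo` is a hypothetical PBKDF2 stand-in for the real `hash_password`, whose algorithm is not shown in this file.

```python
import hashlib
import secrets
import sqlite3
from typing import Optional


def hash_password_demo(password: str) -> str:
    """Hypothetical stand-in for hash_password; PBKDF2 is an assumption."""
    salt = secrets.token_bytes(16)
    digest = hashlib.pbkdf2_hmac("sha256", password.encode(), salt, 100_000)
    return f"{salt.hex()}${digest.hex()}"


def reset_first_admin(conn: sqlite3.Connection) -> Optional[str]:
    """Reset the first admin's password; return the new plaintext or None."""
    row = conn.execute(
        "SELECT id FROM users WHERE system_role = 'admin' LIMIT 1"
    ).fetchone()
    if row is None:
        return None
    new_password = secrets.token_urlsafe(16)
    # Bumping token_version is what lets the server reject older tokens;
    # needs_setup forces the setup flow on the next login.
    conn.execute(
        "UPDATE users SET password_hash = ?, "
        "token_version = token_version + 1, needs_setup = 1 WHERE id = ?",
        (hash_password_demo(new_password), row["id"]),
    )
    conn.commit()
    return new_password


# In-memory database with an assumed schema, for demonstration only.
conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row
conn.execute(
    "CREATE TABLE users (id TEXT PRIMARY KEY, email TEXT, password_hash TEXT, "
    "system_role TEXT, token_version INTEGER DEFAULT 0, "
    "needs_setup INTEGER DEFAULT 0)"
)
conn.execute(
    "INSERT INTO users (id, email, password_hash, system_role) "
    "VALUES ('u1', 'admin@example.com', 'old-hash', 'admin')"
)
new_password = reset_first_admin(conn)
updated = conn.execute("SELECT * FROM users WHERE id = 'u1'").fetchone()
```

Doing the update in a single statement avoids a read-modify-write race on `token_version`; the original goes through the repository's `update_user` instead.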