mirror of
https://github.com/bytedance/deer-flow.git
synced 2026-04-25 19:28:23 +00:00
Introduce an always-on auth layer with auto-created admin on first boot, multi-tenant isolation for threads/stores, and a full setup/login flow. Backend - JWT access tokens with `ver` field for stale-token rejection; bump on password/email change - Password hashing, HttpOnly+Secure cookies (Secure derived from request scheme at runtime) - CSRF middleware covering both REST and LangGraph routes - IP-based login rate limiting (5 attempts / 5-min lockout) with bounded dict growth and X-Forwarded-For bypass fix - Multi-worker-safe admin auto-creation (single DB write, WAL once) - needs_setup + token_version on User model; SQLite schema migration - Thread/store isolation by owner; orphan thread migration on first admin registration - thread_id validated as UUID to prevent log injection - CLI tool to reset admin password - Decorator-based authz module extracted from auth core Frontend - Login and setup pages with SSR guard for needs_setup flow - Account settings page (change password / email) - AuthProvider + route guards; skips redirect when no users registered - i18n (en-US / zh-CN) for auth surfaces - Typed auth API client; parseAuthError unwraps FastAPI detail envelope Infra & tooling - Unified `serve.sh` with gateway mode + auto dep install - Public PyPI uv.toml pin for CI compatibility - Regenerated uv.lock with public index Tests - HTTP vs HTTPS cookie security tests - Auth middleware, rate limiter, CSRF, setup flow coverage
76 lines
2.5 KiB
Python
76 lines
2.5 KiB
Python
"""Tests for auth error types and typed decode_token."""
|
|
|
|
from datetime import UTC, datetime, timedelta
|
|
|
|
import jwt as pyjwt
|
|
|
|
from app.gateway.auth.config import AuthConfig, set_auth_config
|
|
from app.gateway.auth.errors import AuthErrorCode, AuthErrorResponse, TokenError
|
|
from app.gateway.auth.jwt import create_access_token, decode_token
|
|
|
|
|
|
def test_auth_error_code_values():
|
|
assert AuthErrorCode.INVALID_CREDENTIALS == "invalid_credentials"
|
|
assert AuthErrorCode.TOKEN_EXPIRED == "token_expired"
|
|
assert AuthErrorCode.NOT_AUTHENTICATED == "not_authenticated"
|
|
|
|
|
|
def test_token_error_values():
|
|
assert TokenError.EXPIRED == "expired"
|
|
assert TokenError.INVALID_SIGNATURE == "invalid_signature"
|
|
assert TokenError.MALFORMED == "malformed"
|
|
|
|
|
|
def test_auth_error_response_serialization():
|
|
err = AuthErrorResponse(
|
|
code=AuthErrorCode.TOKEN_EXPIRED,
|
|
message="Token has expired",
|
|
)
|
|
d = err.model_dump()
|
|
assert d == {"code": "token_expired", "message": "Token has expired"}
|
|
|
|
|
|
def test_auth_error_response_from_dict():
|
|
d = {"code": "invalid_credentials", "message": "Wrong password"}
|
|
err = AuthErrorResponse(**d)
|
|
assert err.code == AuthErrorCode.INVALID_CREDENTIALS
|
|
|
|
|
|
# ── decode_token typed failure tests ──────────────────────────────
|
|
|
|
_TEST_SECRET = "test-secret-for-jwt-decode-token-tests"
|
|
|
|
|
|
def _setup_config():
|
|
set_auth_config(AuthConfig(jwt_secret=_TEST_SECRET))
|
|
|
|
|
|
def test_decode_token_returns_token_error_on_expired():
|
|
_setup_config()
|
|
expired_payload = {"sub": "user-1", "exp": datetime.now(UTC) - timedelta(hours=1), "iat": datetime.now(UTC)}
|
|
token = pyjwt.encode(expired_payload, _TEST_SECRET, algorithm="HS256")
|
|
result = decode_token(token)
|
|
assert result == TokenError.EXPIRED
|
|
|
|
|
|
def test_decode_token_returns_token_error_on_bad_signature():
|
|
_setup_config()
|
|
payload = {"sub": "user-1", "exp": datetime.now(UTC) + timedelta(hours=1), "iat": datetime.now(UTC)}
|
|
token = pyjwt.encode(payload, "wrong-secret", algorithm="HS256")
|
|
result = decode_token(token)
|
|
assert result == TokenError.INVALID_SIGNATURE
|
|
|
|
|
|
def test_decode_token_returns_token_error_on_malformed():
|
|
_setup_config()
|
|
result = decode_token("not-a-jwt")
|
|
assert result == TokenError.MALFORMED
|
|
|
|
|
|
def test_decode_token_returns_payload_on_valid():
|
|
_setup_config()
|
|
token = create_access_token("user-123")
|
|
result = decode_token(token)
|
|
assert not isinstance(result, TokenError)
|
|
assert result.sub == "user-123"
|