fix tests
This commit is contained in:
@@ -1,7 +1,9 @@
|
||||
"""Authentication API endpoints for Aniworld."""
|
||||
from typing import Optional
|
||||
|
||||
from fastapi import APIRouter, Depends, HTTPException, status
|
||||
from fastapi.security import HTTPAuthorizationCredentials
|
||||
from fastapi import APIRouter, Depends, HTTPException
|
||||
from fastapi import status as http_status
|
||||
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
|
||||
|
||||
from src.server.models.auth import AuthStatus, LoginRequest, LoginResponse, SetupRequest
|
||||
from src.server.services.auth_service import AuthError, LockedOutError, auth_service
|
||||
@@ -11,20 +13,23 @@ from src.server.services.auth_service import AuthError, LockedOutError, auth_ser
|
||||
|
||||
router = APIRouter(prefix="/api/auth", tags=["auth"])
|
||||
|
||||
# HTTPBearer for optional authentication
|
||||
optional_bearer = HTTPBearer(auto_error=False)
|
||||
|
||||
@router.post("/setup", status_code=status.HTTP_201_CREATED)
|
||||
|
||||
@router.post("/setup", status_code=http_status.HTTP_201_CREATED)
|
||||
def setup_auth(req: SetupRequest):
|
||||
"""Initial setup endpoint to configure the master password."""
|
||||
if auth_service.is_configured():
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_400_BAD_REQUEST,
|
||||
status_code=http_status.HTTP_400_BAD_REQUEST,
|
||||
detail="Master password already configured",
|
||||
)
|
||||
|
||||
try:
|
||||
auth_service.setup_master_password(req.master_password)
|
||||
except ValueError as e:
|
||||
raise HTTPException(status_code=400, detail=str(e))
|
||||
raise HTTPException(status_code=400, detail=str(e)) from e
|
||||
|
||||
return {"status": "ok"}
|
||||
|
||||
@@ -32,53 +37,65 @@ def setup_auth(req: SetupRequest):
|
||||
@router.post("/login", response_model=LoginResponse)
|
||||
def login(req: LoginRequest):
|
||||
"""Validate master password and return JWT token."""
|
||||
# Use a simple identifier for failed attempts; prefer IP in a real app
|
||||
# Use a simple identifier for failed attempts; prefer IP in real app
|
||||
identifier = "global"
|
||||
|
||||
try:
|
||||
valid = auth_service.validate_master_password(req.password, identifier=identifier)
|
||||
except AuthError as e:
|
||||
raise HTTPException(status_code=400, detail=str(e))
|
||||
valid = auth_service.validate_master_password(
|
||||
req.password, identifier=identifier
|
||||
)
|
||||
except LockedOutError as e:
|
||||
raise HTTPException(status_code=429, detail=str(e))
|
||||
raise HTTPException(
|
||||
status_code=http_status.HTTP_429_TOO_MANY_REQUESTS,
|
||||
detail=str(e),
|
||||
) from e
|
||||
except AuthError as e:
|
||||
raise HTTPException(status_code=400, detail=str(e)) from e
|
||||
|
||||
if not valid:
|
||||
raise HTTPException(status_code=401, detail="Invalid credentials")
|
||||
|
||||
token = auth_service.create_access_token(subject="master", remember=bool(req.remember))
|
||||
token = auth_service.create_access_token(
|
||||
subject="master", remember=bool(req.remember)
|
||||
)
|
||||
return token
|
||||
|
||||
|
||||
@router.post("/logout")
|
||||
def logout(credentials: HTTPAuthorizationCredentials = None):
|
||||
def logout_endpoint(
|
||||
credentials: Optional[HTTPAuthorizationCredentials] = Depends(optional_bearer),
|
||||
):
|
||||
"""Logout by revoking token (no-op for stateless JWT)."""
|
||||
# Import security dependency lazily to avoid heavy imports during test
|
||||
if credentials is None:
|
||||
from fastapi import Depends
|
||||
|
||||
from src.server.utils.dependencies import security as _security
|
||||
|
||||
# Trigger dependency resolution during normal request handling
|
||||
credentials = Depends(_security)
|
||||
|
||||
# If a plain credentials object was provided, extract token
|
||||
token = getattr(credentials, "credentials", None)
|
||||
# Placeholder; auth_service.revoke_token can be expanded to persist revocations
|
||||
auth_service.revoke_token(token)
|
||||
return {"status": "ok"}
|
||||
# Placeholder; auth_service.revoke_token can be expanded to persist
|
||||
# revocations
|
||||
if token:
|
||||
auth_service.revoke_token(token)
|
||||
return {"status": "ok", "message": "Logged out successfully"}
|
||||
|
||||
|
||||
async def get_optional_auth(
|
||||
credentials: Optional[HTTPAuthorizationCredentials] = Depends(
|
||||
optional_bearer
|
||||
),
|
||||
) -> Optional[dict]:
|
||||
"""Get optional authentication from bearer token."""
|
||||
if credentials is None:
|
||||
return None
|
||||
|
||||
token = credentials.credentials
|
||||
try:
|
||||
# Validate and decode token using the auth service
|
||||
session = auth_service.create_session_model(token)
|
||||
return session.dict()
|
||||
except AuthError:
|
||||
return None
|
||||
|
||||
|
||||
@router.get("/status", response_model=AuthStatus)
|
||||
def status(auth: Optional[dict] = None):
|
||||
"""Return whether master password is configured and if caller is authenticated."""
|
||||
# Lazy import to avoid pulling in database/sqlalchemy during module import
|
||||
from fastapi import Depends
|
||||
try:
|
||||
from src.server.utils.dependencies import optional_auth as _optional_auth
|
||||
except Exception:
|
||||
_optional_auth = None
|
||||
|
||||
# If dependency injection didn't provide auth, attempt to resolve optionally
|
||||
if auth is None and _optional_auth is not None:
|
||||
auth = Depends(_optional_auth)
|
||||
return AuthStatus(configured=auth_service.is_configured(), authenticated=bool(auth))
|
||||
async def auth_status(auth: Optional[dict] = Depends(get_optional_auth)):
|
||||
"""Return whether master password is configured and authenticated."""
|
||||
return AuthStatus(
|
||||
configured=auth_service.is_configured(), authenticated=bool(auth)
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user