Aniworld/tests/integration/test_auth_flow.py
Lukas 731fd56768 feat: implement setup redirect middleware and fix test suite
- Created SetupRedirectMiddleware to redirect unconfigured apps to /setup
- Enhanced /api/auth/setup endpoint to save anime_directory to config
- Updated SetupRequest model to accept optional anime_directory parameter
- Modified setup.html to send anime_directory in setup API call
- Added @pytest.mark.requires_clean_auth marker for tests needing unconfigured state
- Modified conftest.py to conditionally setup auth based on test marker
- Fixed all test failures (846/846 tests now passing)
- Updated instructions.md to mark setup tasks as complete

This implementation ensures users are guided through initial setup
before accessing the application, while maintaining test isolation
and preventing auth state leakage between tests.
2025-10-24 19:55:26 +02:00

745 lines
24 KiB
Python

"""Integration tests for authentication flow.
This module tests the complete authentication flow including:
- Initial setup and master password configuration
- Login with valid/invalid credentials
- JWT token generation and validation
- Protected endpoint access control
- Token refresh and expiration
- Logout functionality
- Rate limiting and lockout mechanisms
- Session management
"""
import time
from typing import Dict, Optional
import pytest
from httpx import ASGITransport, AsyncClient
from src.server.fastapi_app import app
from src.server.services.auth_service import auth_service
@pytest.fixture(autouse=True)
def reset_auth():
"""Reset authentication state before each test."""
original_hash = auth_service._hash
auth_service._hash = None
auth_service._failed.clear()
yield
auth_service._hash = original_hash
auth_service._failed.clear()
@pytest.fixture
async def client():
"""Create an async test client."""
transport = ASGITransport(app=app)
async with AsyncClient(transport=transport, base_url="http://test") as ac:
yield ac
class TestInitialSetup:
"""Test initial authentication setup flow."""
async def test_setup_with_strong_password(self, client):
"""Test setting up master password with strong password."""
response = await client.post(
"/api/auth/setup",
json={"master_password": "StrongP@ssw0rd123"}
)
assert response.status_code == 201
data = response.json()
assert data["status"] == "ok"
async def test_setup_with_weak_password_fails(self, client):
"""Test that setup fails with weak password."""
response = await client.post(
"/api/auth/setup",
json={"master_password": "weak"}
)
# Should fail validation
assert response.status_code in [400, 422]
async def test_setup_cannot_be_called_twice(self, client):
"""Test that setup can only be called once."""
# First setup succeeds
await client.post(
"/api/auth/setup",
json={"master_password": "FirstPassword123!"}
)
# Second setup should fail
response = await client.post(
"/api/auth/setup",
json={"master_password": "SecondPassword123!"}
)
assert response.status_code == 400
data = response.json()
assert "already configured" in data["detail"].lower()
async def test_auth_status_before_setup(self, client):
"""Test authentication status before setup."""
response = await client.get("/api/auth/status")
assert response.status_code == 200
data = response.json()
assert data["configured"] is False
assert data["authenticated"] is False
async def test_auth_status_after_setup(self, client):
"""Test authentication status after setup."""
# Setup
await client.post(
"/api/auth/setup",
json={"master_password": "SetupPassword123!"}
)
# Check status
response = await client.get("/api/auth/status")
assert response.status_code == 200
data = response.json()
assert data["configured"] is True
assert data["authenticated"] is False
class TestLoginFlow:
"""Test login flow with valid and invalid credentials."""
async def test_login_with_valid_credentials(self, client):
"""Test successful login with correct password."""
# Setup
password = "ValidPassword123!"
await client.post(
"/api/auth/setup",
json={"master_password": password}
)
# Login
response = await client.post(
"/api/auth/login",
json={"password": password}
)
assert response.status_code == 200
data = response.json()
# Verify token structure
assert "access_token" in data
assert "token_type" in data
assert data["token_type"] == "bearer"
assert isinstance(data["access_token"], str)
assert len(data["access_token"]) > 0
async def test_login_with_invalid_password(self, client):
"""Test login failure with incorrect password."""
# Setup
await client.post(
"/api/auth/setup",
json={"master_password": "CorrectPassword123!"}
)
# Login with wrong password
response = await client.post(
"/api/auth/login",
json={"password": "WrongPassword123!"}
)
assert response.status_code == 401
data = response.json()
assert "detail" in data
assert "invalid" in data["detail"].lower()
async def test_login_before_setup_fails(self, client):
"""Test that login fails before setup is complete."""
response = await client.post(
"/api/auth/login",
json={"password": "AnyPassword123!"}
)
assert response.status_code in [400, 401]
async def test_login_with_remember_me(self, client):
"""Test login with remember me option."""
# Setup
password = "RememberPassword123!"
await client.post(
"/api/auth/setup",
json={"master_password": password}
)
# Login with remember=true
response = await client.post(
"/api/auth/login",
json={"password": password, "remember": True}
)
assert response.status_code == 200
data = response.json()
assert "access_token" in data
# Token should be issued (expiration time may be extended)
async def test_login_without_remember_me(self, client):
"""Test login without remember me option."""
# Setup
password = "NoRememberPassword123!"
await client.post(
"/api/auth/setup",
json={"master_password": password}
)
# Login without remember
response = await client.post(
"/api/auth/login",
json={"password": password, "remember": False}
)
assert response.status_code == 200
data = response.json()
assert "access_token" in data
class TestTokenValidation:
"""Test JWT token validation and usage."""
async def get_valid_token(self, client) -> str:
"""Helper to get a valid authentication token."""
password = "TokenTestPassword123!"
await client.post(
"/api/auth/setup",
json={"master_password": password}
)
response = await client.post(
"/api/auth/login",
json={"password": password}
)
return response.json()["access_token"]
async def test_access_protected_endpoint_with_valid_token(self, client):
"""Test accessing protected endpoint with valid token."""
token = await self.get_valid_token(client)
# Access protected endpoint
response = await client.get(
"/api/queue/status",
headers={"Authorization": f"Bearer {token}"}
)
# Should succeed (or return 503 if service not configured)
assert response.status_code in [200, 503]
async def test_access_protected_endpoint_without_token(self, client):
"""Test accessing protected endpoint without token."""
response = await client.get("/api/queue/status")
assert response.status_code == 401
async def test_access_protected_endpoint_with_invalid_token(self, client):
"""Test accessing protected endpoint with invalid token."""
response = await client.get(
"/api/queue/status",
headers={"Authorization": "Bearer invalid_token_12345"}
)
assert response.status_code == 401
async def test_access_protected_endpoint_with_malformed_header(
self, client
):
"""Test accessing protected endpoint with malformed auth header."""
token = await self.get_valid_token(client)
# Missing "Bearer" prefix
response = await client.get(
"/api/queue/status",
headers={"Authorization": token}
)
assert response.status_code == 401
async def test_token_works_for_multiple_requests(self, client):
"""Test that token can be reused for multiple requests."""
token = await self.get_valid_token(client)
headers = {"Authorization": f"Bearer {token}"}
# Make multiple requests with same token
for _ in range(5):
response = await client.get("/api/queue/status", headers=headers)
assert response.status_code in [200, 503]
async def test_auth_status_with_valid_token(self, client):
"""Test auth status endpoint with valid token."""
token = await self.get_valid_token(client)
response = await client.get(
"/api/auth/status",
headers={"Authorization": f"Bearer {token}"}
)
assert response.status_code == 200
data = response.json()
assert data["configured"] is True
assert data["authenticated"] is True
@pytest.mark.requires_clean_auth
class TestProtectedEndpoints:
"""Test that all protected endpoints enforce authentication."""
async def get_valid_token(self, client) -> str:
"""Helper to get a valid authentication token."""
password = "ProtectedTestPassword123!"
await client.post(
"/api/auth/setup",
json={"master_password": password}
)
response = await client.post(
"/api/auth/login",
json={"password": password}
)
return response.json()["access_token"]
async def test_anime_endpoints_require_auth(self, client):
"""Test that anime endpoints require authentication."""
# Without token
response = await client.get("/api/anime/")
assert response.status_code == 401
# With valid token
token = await self.get_valid_token(client)
response = await client.get(
"/api/anime/",
headers={"Authorization": f"Bearer {token}"}
)
assert response.status_code in [200, 503]
async def test_queue_endpoints_require_auth(self, client):
"""Test that queue endpoints require authentication."""
endpoints = [
("/api/queue/status", "GET"),
("/api/queue/add", "POST"),
("/api/queue/control/start", "POST"),
("/api/queue/control/pause", "POST"),
]
token = await self.get_valid_token(client)
for endpoint, method in endpoints:
# Without token
if method == "GET":
response = await client.get(endpoint)
else:
response = await client.post(endpoint, json={})
assert response.status_code in [400, 401, 422]
# With token (should pass auth, may fail validation)
headers = {"Authorization": f"Bearer {token}"}
if method == "GET":
response = await client.get(endpoint, headers=headers)
else:
response = await client.post(endpoint, json={}, headers=headers)
assert response.status_code not in [401]
async def test_config_endpoints_require_auth(self, client):
"""Test that config endpoints require authentication."""
# Setup auth first so middleware doesn't redirect
token = await self.get_valid_token(client)
# Without token - should require auth
response = await client.get("/api/config")
assert response.status_code == 401
# With token - should work
response = await client.get(
"/api/config",
headers={"Authorization": f"Bearer {token}"}
)
assert response.status_code in [200, 503]
async def test_download_endpoints_require_auth(self, client):
"""Test that download endpoints require authentication."""
token = await self.get_valid_token(client)
# Test queue operations require auth
response = await client.get("/api/queue/status")
assert response.status_code == 401
response = await client.get(
"/api/queue/status",
headers={"Authorization": f"Bearer {token}"}
)
assert response.status_code in [200, 503]
class TestLogoutFlow:
"""Test logout functionality."""
async def get_valid_token(self, client) -> str:
"""Helper to get a valid authentication token."""
password = "LogoutTestPassword123!"
await client.post(
"/api/auth/setup",
json={"master_password": password}
)
response = await client.post(
"/api/auth/login",
json={"password": password}
)
return response.json()["access_token"]
async def test_logout_with_valid_token(self, client):
"""Test logout with valid token."""
token = await self.get_valid_token(client)
response = await client.post(
"/api/auth/logout",
headers={"Authorization": f"Bearer {token}"}
)
assert response.status_code == 200
data = response.json()
assert data["status"] == "ok"
async def test_logout_without_token(self, client):
"""Test logout without token."""
response = await client.post("/api/auth/logout")
# May succeed as logout is sometimes allowed without auth
assert response.status_code in [200, 401]
async def test_token_after_logout(self, client):
"""Test that token still works after logout (stateless JWT)."""
token = await self.get_valid_token(client)
# Logout
await client.post(
"/api/auth/logout",
headers={"Authorization": f"Bearer {token}"}
)
# Try to use token (may still work if JWT is stateless)
response = await client.get(
"/api/queue/status",
headers={"Authorization": f"Bearer {token}"}
)
# Stateless JWT: token may still work
# Stateful: should return 401
assert response.status_code in [200, 401, 503]
class TestRateLimitingAndLockout:
"""Test rate limiting and lockout mechanisms."""
async def test_failed_login_attempts_tracked(self, client):
"""Test that failed login attempts are tracked."""
# Setup
await client.post(
"/api/auth/setup",
json={"master_password": "CorrectPassword123!"}
)
# Multiple failed attempts
for _ in range(3):
response = await client.post(
"/api/auth/login",
json={"password": "WrongPassword123!"}
)
assert response.status_code == 401
async def test_lockout_after_max_failed_attempts(self, client):
"""Test account lockout after maximum failed attempts."""
# Setup (counts as 1 request towards rate limit)
await client.post(
"/api/auth/setup",
json={"master_password": "CorrectPassword123!"}
)
# Make multiple failed attempts to trigger lockout
# Note: setup used 1 request, so we can make 4 more before rate limit
for i in range(6): # More than max allowed
response = await client.post(
"/api/auth/login",
json={"password": "WrongPassword123!"}
)
if i < 4:
# First 4 login attempts get 401 (setup + 4 = 5 total)
assert response.status_code == 401
else:
# 5th and 6th attempts should be rate limited or rejected
assert response.status_code in [401, 429]
async def test_successful_login_resets_failed_attempts(self, client):
"""Test that successful login resets failed attempt counter."""
# Setup
password = "ResetCounterPassword123!"
await client.post(
"/api/auth/setup",
json={"master_password": password}
)
# Failed attempts
for _ in range(2):
await client.post(
"/api/auth/login",
json={"password": "WrongPassword123!"}
)
# Successful login
response = await client.post(
"/api/auth/login",
json={"password": password}
)
assert response.status_code == 200
# Should be able to make more attempts (counter reset)
await client.post(
"/api/auth/login",
json={"password": "WrongPassword123!"}
)
class TestSessionManagement:
"""Test session management and concurrent sessions."""
async def get_valid_token(self, client) -> str:
"""Helper to get a valid authentication token."""
password = "SessionTestPassword123!"
await client.post(
"/api/auth/setup",
json={"master_password": password}
)
response = await client.post(
"/api/auth/login",
json={"password": password}
)
return response.json()["access_token"]
async def test_multiple_concurrent_sessions(self, client):
"""Test that multiple sessions can exist simultaneously."""
password = "MultiSessionPassword123!"
await client.post(
"/api/auth/setup",
json={"master_password": password}
)
# Create multiple sessions
tokens = []
for _ in range(3):
response = await client.post(
"/api/auth/login",
json={"password": password}
)
assert response.status_code == 200
tokens.append(response.json()["access_token"])
# All tokens should work
for token in tokens:
response = await client.get(
"/api/queue/status",
headers={"Authorization": f"Bearer {token}"}
)
assert response.status_code in [200, 503]
async def test_independent_token_lifetimes(self, client):
"""Test that tokens have independent lifetimes."""
token1 = await self.get_valid_token(client)
# Small delay
time.sleep(0.1)
token2 = await self.get_valid_token(client)
# Both tokens should work
for token in [token1, token2]:
response = await client.get(
"/api/queue/status",
headers={"Authorization": f"Bearer {token}"}
)
assert response.status_code in [200, 503]
class TestAuthenticationEdgeCases:
"""Test edge cases and error scenarios."""
async def test_empty_password_in_setup(self, client):
"""Test setup with empty password."""
response = await client.post(
"/api/auth/setup",
json={"master_password": ""}
)
assert response.status_code in [400, 422]
async def test_empty_password_in_login(self, client):
"""Test login with empty password."""
# Setup first
await client.post(
"/api/auth/setup",
json={"master_password": "ValidPassword123!"}
)
response = await client.post(
"/api/auth/login",
json={"password": ""}
)
assert response.status_code in [400, 401, 422]
async def test_missing_password_field(self, client):
"""Test requests with missing password field."""
response = await client.post(
"/api/auth/setup",
json={}
)
assert response.status_code == 422 # Validation error
async def test_malformed_json_in_auth_requests(self, client):
"""Test authentication with malformed JSON."""
response = await client.post(
"/api/auth/setup",
content="not valid json",
headers={"Content-Type": "application/json"}
)
assert response.status_code in [400, 422]
async def test_extremely_long_password(self, client):
"""Test setup with extremely long password."""
long_password = "P@ssw0rd" + "x" * 10000
response = await client.post(
"/api/auth/setup",
json={"master_password": long_password}
)
# Should handle gracefully (accept or reject)
assert response.status_code in [201, 400, 413, 422]
async def test_special_characters_in_password(self, client):
"""Test password with various special characters."""
special_password = "P@$$w0rd!#%^&*()_+-=[]{}|;:',.<>?/~`"
response = await client.post(
"/api/auth/setup",
json={"master_password": special_password}
)
# Should accept special characters
assert response.status_code in [201, 400]
async def test_unicode_characters_in_password(self, client):
"""Test password with unicode characters."""
unicode_password = "Pässwörd123!日本語"
response = await client.post(
"/api/auth/setup",
json={"master_password": unicode_password}
)
# Should handle unicode gracefully
assert response.status_code in [201, 400, 422]
class TestCompleteAuthenticationWorkflow:
"""Test complete authentication workflows."""
async def test_full_authentication_cycle(self, client):
"""Test complete authentication cycle from setup to logout."""
password = "CompleteWorkflowPassword123!"
# 1. Check initial status (not configured)
status = await client.get("/api/auth/status")
assert status.json()["configured"] is False
# 2. Setup master password
setup = await client.post(
"/api/auth/setup",
json={"master_password": password}
)
assert setup.status_code == 201
# 3. Check status (configured, not authenticated)
status = await client.get("/api/auth/status")
data = status.json()
assert data["configured"] is True
assert data["authenticated"] is False
# 4. Login
login = await client.post(
"/api/auth/login",
json={"password": password}
)
assert login.status_code == 200
token = login.json()["access_token"]
# 5. Access protected endpoint
protected = await client.get(
"/api/queue/status",
headers={"Authorization": f"Bearer {token}"}
)
assert protected.status_code in [200, 503]
# 6. Check authenticated status
status = await client.get(
"/api/auth/status",
headers={"Authorization": f"Bearer {token}"}
)
data = status.json()
assert data["configured"] is True
assert data["authenticated"] is True
# 7. Logout
logout = await client.post(
"/api/auth/logout",
headers={"Authorization": f"Bearer {token}"}
)
assert logout.status_code == 200
async def test_workflow_with_failed_and_successful_attempts(self, client):
"""Test workflow with mixed failed and successful attempts."""
password = "MixedAttemptsPassword123!"
# Setup
await client.post(
"/api/auth/setup",
json={"master_password": password}
)
# Failed attempt
response = await client.post(
"/api/auth/login",
json={"password": "WrongPassword123!"}
)
assert response.status_code == 401
# Successful attempt
response = await client.post(
"/api/auth/login",
json={"password": password}
)
assert response.status_code == 200
# Another failed attempt
response = await client.post(
"/api/auth/login",
json={"password": "WrongAgain123!"}
)
assert response.status_code == 401
# Another successful attempt
response = await client.post(
"/api/auth/login",
json={"password": password}
)
assert response.status_code == 200