"""Integration tests for authentication flow. This module tests the complete authentication flow including: - Initial setup and master password configuration - Login with valid/invalid credentials - JWT token generation and validation - Protected endpoint access control - Token refresh and expiration - Logout functionality - Rate limiting and lockout mechanisms - Session management """ import time from typing import Dict, Optional import pytest from httpx import ASGITransport, AsyncClient from src.server.fastapi_app import app from src.server.services.auth_service import auth_service @pytest.fixture(autouse=True) def reset_auth(): """Reset authentication state before each test.""" original_hash = auth_service._hash auth_service._hash = None auth_service._failed.clear() yield auth_service._hash = original_hash auth_service._failed.clear() @pytest.fixture async def client(): """Create an async test client.""" transport = ASGITransport(app=app) async with AsyncClient(transport=transport, base_url="http://test") as ac: yield ac class TestInitialSetup: """Test initial authentication setup flow.""" async def test_setup_with_strong_password(self, client): """Test setting up master password with strong password.""" response = await client.post( "/api/auth/setup", json={"master_password": "StrongP@ssw0rd123"} ) assert response.status_code == 201 data = response.json() assert data["status"] == "ok" async def test_setup_with_weak_password_fails(self, client): """Test that setup fails with weak password.""" response = await client.post( "/api/auth/setup", json={"master_password": "weak"} ) # Should fail validation assert response.status_code in [400, 422] async def test_setup_cannot_be_called_twice(self, client): """Test that setup can only be called once.""" # First setup succeeds await client.post( "/api/auth/setup", json={"master_password": "FirstPassword123!"} ) # Second setup should fail response = await client.post( "/api/auth/setup", json={"master_password": "SecondPassword123!"} ) assert response.status_code == 400 data = response.json() assert "already configured" in data["detail"].lower() async def test_auth_status_before_setup(self, client): """Test authentication status before setup.""" response = await client.get("/api/auth/status") assert response.status_code == 200 data = response.json() assert data["configured"] is False assert data["authenticated"] is False async def test_auth_status_after_setup(self, client): """Test authentication status after setup.""" # Setup await client.post( "/api/auth/setup", json={"master_password": "SetupPassword123!"} ) # Check status response = await client.get("/api/auth/status") assert response.status_code == 200 data = response.json() assert data["configured"] is True assert data["authenticated"] is False class TestLoginFlow: """Test login flow with valid and invalid credentials.""" async def test_login_with_valid_credentials(self, client): """Test successful login with correct password.""" # Setup password = "ValidPassword123!" await client.post( "/api/auth/setup", json={"master_password": password} ) # Login response = await client.post( "/api/auth/login", json={"password": password} ) assert response.status_code == 200 data = response.json() # Verify token structure assert "access_token" in data assert "token_type" in data assert data["token_type"] == "bearer" assert isinstance(data["access_token"], str) assert len(data["access_token"]) > 0 async def test_login_with_invalid_password(self, client): """Test login failure with incorrect password.""" # Setup await client.post( "/api/auth/setup", json={"master_password": "CorrectPassword123!"} ) # Login with wrong password response = await client.post( "/api/auth/login", json={"password": "WrongPassword123!"} ) assert response.status_code == 401 data = response.json() assert "detail" in data assert "invalid" in data["detail"].lower() async def test_login_before_setup_fails(self, client): """Test that login fails before setup is complete.""" response = await client.post( "/api/auth/login", json={"password": "AnyPassword123!"} ) assert response.status_code in [400, 401] async def test_login_with_remember_me(self, client): """Test login with remember me option.""" # Setup password = "RememberPassword123!" await client.post( "/api/auth/setup", json={"master_password": password} ) # Login with remember=true response = await client.post( "/api/auth/login", json={"password": password, "remember": True} ) assert response.status_code == 200 data = response.json() assert "access_token" in data # Token should be issued (expiration time may be extended) async def test_login_without_remember_me(self, client): """Test login without remember me option.""" # Setup password = "NoRememberPassword123!" await client.post( "/api/auth/setup", json={"master_password": password} ) # Login without remember response = await client.post( "/api/auth/login", json={"password": password, "remember": False} ) assert response.status_code == 200 data = response.json() assert "access_token" in data class TestTokenValidation: """Test JWT token validation and usage.""" async def get_valid_token(self, client) -> str: """Helper to get a valid authentication token.""" password = "TokenTestPassword123!" await client.post( "/api/auth/setup", json={"master_password": password} ) response = await client.post( "/api/auth/login", json={"password": password} ) return response.json()["access_token"] async def test_access_protected_endpoint_with_valid_token(self, client): """Test accessing protected endpoint with valid token.""" token = await self.get_valid_token(client) # Access protected endpoint response = await client.get( "/api/queue/status", headers={"Authorization": f"Bearer {token}"} ) # Should succeed (or return 503 if service not configured) assert response.status_code in [200, 503] async def test_access_protected_endpoint_without_token(self, client): """Test accessing protected endpoint without token.""" response = await client.get("/api/queue/status") assert response.status_code == 401 async def test_access_protected_endpoint_with_invalid_token(self, client): """Test accessing protected endpoint with invalid token.""" response = await client.get( "/api/queue/status", headers={"Authorization": "Bearer invalid_token_12345"} ) assert response.status_code == 401 async def test_access_protected_endpoint_with_malformed_header( self, client ): """Test accessing protected endpoint with malformed auth header.""" token = await self.get_valid_token(client) # Missing "Bearer" prefix response = await client.get( "/api/queue/status", headers={"Authorization": token} ) assert response.status_code == 401 async def test_token_works_for_multiple_requests(self, client): """Test that token can be reused for multiple requests.""" token = await self.get_valid_token(client) headers = {"Authorization": f"Bearer {token}"} # Make multiple requests with same token for _ in range(5): response = await client.get("/api/queue/status", headers=headers) assert response.status_code in [200, 503] async def test_auth_status_with_valid_token(self, client): """Test auth status endpoint with valid token.""" token = await self.get_valid_token(client) response = await client.get( "/api/auth/status", headers={"Authorization": f"Bearer {token}"} ) assert response.status_code == 200 data = response.json() assert data["configured"] is True assert data["authenticated"] is True @pytest.mark.requires_clean_auth class TestProtectedEndpoints: """Test that all protected endpoints enforce authentication.""" async def get_valid_token(self, client) -> str: """Helper to get a valid authentication token.""" password = "ProtectedTestPassword123!" await client.post( "/api/auth/setup", json={"master_password": password} ) response = await client.post( "/api/auth/login", json={"password": password} ) return response.json()["access_token"] async def test_anime_endpoints_require_auth(self, client): """Test that anime endpoints require authentication.""" # Without token response = await client.get("/api/anime/") assert response.status_code == 401 # With valid token token = await self.get_valid_token(client) response = await client.get( "/api/anime/", headers={"Authorization": f"Bearer {token}"} ) assert response.status_code in [200, 503] async def test_queue_endpoints_require_auth(self, client): """Test that queue endpoints require authentication.""" endpoints = [ ("/api/queue/status", "GET"), ("/api/queue/add", "POST"), ("/api/queue/start", "POST"), ("/api/queue/pause", "POST"), ] token = await self.get_valid_token(client) for endpoint, method in endpoints: # Without token if method == "GET": response = await client.get(endpoint) else: response = await client.post(endpoint, json={}) assert response.status_code in [400, 401, 422] # With token (should pass auth, may fail validation) headers = {"Authorization": f"Bearer {token}"} if method == "GET": response = await client.get(endpoint, headers=headers) else: response = await client.post(endpoint, json={}, headers=headers) assert response.status_code not in [401] async def test_config_endpoints_require_auth(self, client): """Test that config endpoints require authentication.""" # Setup auth first so middleware doesn't redirect token = await self.get_valid_token(client) # Without token - should require auth response = await client.get("/api/config") assert response.status_code == 401 # With token - should work response = await client.get( "/api/config", headers={"Authorization": f"Bearer {token}"} ) assert response.status_code in [200, 503] async def test_download_endpoints_require_auth(self, client): """Test that download endpoints require authentication.""" token = await self.get_valid_token(client) # Test queue operations require auth response = await client.get("/api/queue/status") assert response.status_code == 401 response = await client.get( "/api/queue/status", headers={"Authorization": f"Bearer {token}"} ) assert response.status_code in [200, 503] class TestLogoutFlow: """Test logout functionality.""" async def get_valid_token(self, client) -> str: """Helper to get a valid authentication token.""" password = "LogoutTestPassword123!" await client.post( "/api/auth/setup", json={"master_password": password} ) response = await client.post( "/api/auth/login", json={"password": password} ) return response.json()["access_token"] async def test_logout_with_valid_token(self, client): """Test logout with valid token.""" token = await self.get_valid_token(client) response = await client.post( "/api/auth/logout", headers={"Authorization": f"Bearer {token}"} ) assert response.status_code == 200 data = response.json() assert data["status"] == "ok" async def test_logout_without_token(self, client): """Test logout without token.""" response = await client.post("/api/auth/logout") # May succeed as logout is sometimes allowed without auth assert response.status_code in [200, 401] async def test_token_after_logout(self, client): """Test that token still works after logout (stateless JWT).""" token = await self.get_valid_token(client) # Logout await client.post( "/api/auth/logout", headers={"Authorization": f"Bearer {token}"} ) # Try to use token (may still work if JWT is stateless) response = await client.get( "/api/queue/status", headers={"Authorization": f"Bearer {token}"} ) # Stateless JWT: token may still work # Stateful: should return 401 assert response.status_code in [200, 401, 503] class TestRateLimitingAndLockout: """Test rate limiting and lockout mechanisms.""" async def test_failed_login_attempts_tracked(self, client): """Test that failed login attempts are tracked.""" # Setup await client.post( "/api/auth/setup", json={"master_password": "CorrectPassword123!"} ) # Multiple failed attempts for _ in range(3): response = await client.post( "/api/auth/login", json={"password": "WrongPassword123!"} ) assert response.status_code == 401 async def test_lockout_after_max_failed_attempts(self, client): """Test account lockout after maximum failed attempts.""" # Setup (counts as 1 request towards rate limit) await client.post( "/api/auth/setup", json={"master_password": "CorrectPassword123!"} ) # Make multiple failed attempts to trigger lockout # Note: setup used 1 request, so we can make 4 more before rate limit for i in range(6): # More than max allowed response = await client.post( "/api/auth/login", json={"password": "WrongPassword123!"} ) if i < 4: # First 4 login attempts get 401 (setup + 4 = 5 total) assert response.status_code == 401 else: # 5th and 6th attempts should be rate limited or rejected assert response.status_code in [401, 429] async def test_successful_login_resets_failed_attempts(self, client): """Test that successful login resets failed attempt counter.""" # Setup password = "ResetCounterPassword123!" await client.post( "/api/auth/setup", json={"master_password": password} ) # Failed attempts for _ in range(2): await client.post( "/api/auth/login", json={"password": "WrongPassword123!"} ) # Successful login response = await client.post( "/api/auth/login", json={"password": password} ) assert response.status_code == 200 # Should be able to make more attempts (counter reset) await client.post( "/api/auth/login", json={"password": "WrongPassword123!"} ) class TestSessionManagement: """Test session management and concurrent sessions.""" async def get_valid_token(self, client) -> str: """Helper to get a valid authentication token.""" password = "SessionTestPassword123!" await client.post( "/api/auth/setup", json={"master_password": password} ) response = await client.post( "/api/auth/login", json={"password": password} ) return response.json()["access_token"] async def test_multiple_concurrent_sessions(self, client): """Test that multiple sessions can exist simultaneously.""" password = "MultiSessionPassword123!" await client.post( "/api/auth/setup", json={"master_password": password} ) # Create multiple sessions tokens = [] for _ in range(3): response = await client.post( "/api/auth/login", json={"password": password} ) assert response.status_code == 200 tokens.append(response.json()["access_token"]) # All tokens should work for token in tokens: response = await client.get( "/api/queue/status", headers={"Authorization": f"Bearer {token}"} ) assert response.status_code in [200, 503] async def test_independent_token_lifetimes(self, client): """Test that tokens have independent lifetimes.""" token1 = await self.get_valid_token(client) # Small delay time.sleep(0.1) token2 = await self.get_valid_token(client) # Both tokens should work for token in [token1, token2]: response = await client.get( "/api/queue/status", headers={"Authorization": f"Bearer {token}"} ) assert response.status_code in [200, 503] class TestAuthenticationEdgeCases: """Test edge cases and error scenarios.""" async def test_empty_password_in_setup(self, client): """Test setup with empty password.""" response = await client.post( "/api/auth/setup", json={"master_password": ""} ) assert response.status_code in [400, 422] async def test_empty_password_in_login(self, client): """Test login with empty password.""" # Setup first await client.post( "/api/auth/setup", json={"master_password": "ValidPassword123!"} ) response = await client.post( "/api/auth/login", json={"password": ""} ) assert response.status_code in [400, 401, 422] async def test_missing_password_field(self, client): """Test requests with missing password field.""" response = await client.post( "/api/auth/setup", json={} ) assert response.status_code == 422 # Validation error async def test_malformed_json_in_auth_requests(self, client): """Test authentication with malformed JSON.""" response = await client.post( "/api/auth/setup", content="not valid json", headers={"Content-Type": "application/json"} ) assert response.status_code in [400, 422] async def test_extremely_long_password(self, client): """Test setup with extremely long password.""" long_password = "P@ssw0rd" + "x" * 10000 response = await client.post( "/api/auth/setup", json={"master_password": long_password} ) # Should handle gracefully (accept or reject) assert response.status_code in [201, 400, 413, 422] async def test_special_characters_in_password(self, client): """Test password with various special characters.""" special_password = "P@$$w0rd!#%^&*()_+-=[]{}|;:',.<>?/~`" response = await client.post( "/api/auth/setup", json={"master_password": special_password} ) # Should accept special characters assert response.status_code in [201, 400] async def test_unicode_characters_in_password(self, client): """Test password with unicode characters.""" unicode_password = "Pässwörd123!日本語" response = await client.post( "/api/auth/setup", json={"master_password": unicode_password} ) # Should handle unicode gracefully assert response.status_code in [201, 400, 422] class TestCompleteAuthenticationWorkflow: """Test complete authentication workflows.""" async def test_full_authentication_cycle(self, client): """Test complete authentication cycle from setup to logout.""" password = "CompleteWorkflowPassword123!" # 1. Check initial status (not configured) status = await client.get("/api/auth/status") assert status.json()["configured"] is False # 2. Setup master password setup = await client.post( "/api/auth/setup", json={"master_password": password} ) assert setup.status_code == 201 # 3. Check status (configured, not authenticated) status = await client.get("/api/auth/status") data = status.json() assert data["configured"] is True assert data["authenticated"] is False # 4. Login login = await client.post( "/api/auth/login", json={"password": password} ) assert login.status_code == 200 token = login.json()["access_token"] # 5. Access protected endpoint protected = await client.get( "/api/queue/status", headers={"Authorization": f"Bearer {token}"} ) assert protected.status_code in [200, 503] # 6. Check authenticated status status = await client.get( "/api/auth/status", headers={"Authorization": f"Bearer {token}"} ) data = status.json() assert data["configured"] is True assert data["authenticated"] is True # 7. Logout logout = await client.post( "/api/auth/logout", headers={"Authorization": f"Bearer {token}"} ) assert logout.status_code == 200 async def test_workflow_with_failed_and_successful_attempts(self, client): """Test workflow with mixed failed and successful attempts.""" password = "MixedAttemptsPassword123!" # Setup await client.post( "/api/auth/setup", json={"master_password": password} ) # Failed attempt response = await client.post( "/api/auth/login", json={"password": "WrongPassword123!"} ) assert response.status_code == 401 # Successful attempt response = await client.post( "/api/auth/login", json={"password": password} ) assert response.status_code == 200 # Another failed attempt response = await client.post( "/api/auth/login", json={"password": "WrongAgain123!"} ) assert response.status_code == 401 # Another successful attempt response = await client.post( "/api/auth/login", json={"password": password} ) assert response.status_code == 200