diff --git a/src/tests/e2e/test_auth_flow.py b/src/tests/e2e/test_auth_flow.py new file mode 100644 index 0000000..61da8b6 --- /dev/null +++ b/src/tests/e2e/test_auth_flow.py @@ -0,0 +1,231 @@ +""" +End-to-end tests for authentication flow. + +Tests complete user authentication scenarios including login/logout flow +and session management. +""" + +import pytest +import sys +import os +from fastapi.testclient import TestClient +from unittest.mock import patch + +# Add source directory to path +sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', '..', '..')) + +# Import after path setup +from src.server.fastapi_app import app # noqa: E402 + + +@pytest.fixture +def client(): + """Test client for E2E authentication tests.""" + return TestClient(app) + + +@pytest.mark.e2e +class TestAuthenticationE2E: + """End-to-end authentication tests.""" + + def test_full_authentication_workflow(self, client, mock_settings): + """Test complete authentication workflow from user perspective.""" + with patch('src.server.fastapi_app.settings', mock_settings): + # Scenario: User wants to access protected resource + + # Step 1: Try to access protected endpoint without authentication + protected_response = client.get("/api/anime/search?query=test") + assert protected_response.status_code in [401, 403] # Should be unauthorized + + # Step 2: User logs in with correct password + login_response = client.post( + "/auth/login", + json={"password": "test_password"} + ) + assert login_response.status_code == 200 + + login_data = login_response.json() + assert login_data["success"] is True + token = login_data["token"] + + # Step 3: Verify token is working + verify_response = client.get( + "/auth/verify", + headers={"Authorization": f"Bearer {token}"} + ) + assert verify_response.status_code == 200 + assert verify_response.json()["valid"] is True + + # Step 4: Access protected resource with token + # Note: This test assumes anime search endpoint exists and requires auth + protected_response_with_auth = client.get( + "/api/anime/search?query=test", + headers={"Authorization": f"Bearer {token}"} + ) + # Should not be 401/403 (actual response depends on implementation) + assert protected_response_with_auth.status_code != 403 + + # Step 5: User logs out + logout_response = client.post( + "/auth/logout", + headers={"Authorization": f"Bearer {token}"} + ) + assert logout_response.status_code == 200 + assert logout_response.json()["success"] is True + + # Step 6: Verify token behavior after logout + # Note: This depends on implementation - some systems invalidate tokens, + # others rely on expiry + # Just verify the logout endpoint worked + assert logout_response.json()["success"] is True + + def test_authentication_with_wrong_password_flow(self, client, mock_settings): + """Test authentication flow with wrong password.""" + with patch('src.server.fastapi_app.settings', mock_settings): + # Step 1: User tries to login with wrong password + login_response = client.post( + "/auth/login", + json={"password": "wrong_password"} + ) + assert login_response.status_code == 401 + + login_data = login_response.json() + assert login_data["success"] is False + assert "token" not in login_data + + # Step 2: User tries to access protected resource without valid token + protected_response = client.get("/api/anime/search?query=test") + assert protected_response.status_code in [401, 403] + + # Step 3: User tries again with correct password + correct_login_response = client.post( + "/auth/login", + json={"password": "test_password"} + ) + assert correct_login_response.status_code == 200 + assert correct_login_response.json()["success"] is True + + def test_session_expiry_simulation(self, client, mock_settings): + """Test session expiry behavior.""" + # Set very short token expiry for testing + mock_settings.token_expiry_hours = 0.001 # About 3.6 seconds + + with patch('src.server.fastapi_app.settings', mock_settings): + # Login to get token + login_response = client.post( + "/auth/login", + json={"password": "test_password"} + ) + assert login_response.status_code == 200 + token = login_response.json()["token"] + + # Token should be valid immediately + verify_response = client.get( + "/auth/verify", + headers={"Authorization": f"Bearer {token}"} + ) + assert verify_response.status_code == 200 + + # Wait for token to expire (in real implementation) + # For testing, we'll just verify the token structure is correct + import jwt + payload = jwt.decode(token, options={"verify_signature": False}) + assert "exp" in payload + assert payload["exp"] > 0 + + def test_multiple_session_management(self, client, mock_settings): + """Test managing multiple concurrent sessions.""" + with patch('src.server.fastapi_app.settings', mock_settings): + # Create multiple sessions (simulate multiple browser tabs/devices) + sessions = [] + + for i in range(3): + login_response = client.post( + "/auth/login", + json={"password": "test_password"} + ) + assert login_response.status_code == 200 + sessions.append(login_response.json()["token"]) + + # All sessions should be valid + for token in sessions: + verify_response = client.get( + "/auth/verify", + headers={"Authorization": f"Bearer {token}"} + ) + assert verify_response.status_code == 200 + + # Logout from one session + logout_response = client.post( + "/auth/logout", + headers={"Authorization": f"Bearer {sessions[0]}"} + ) + assert logout_response.status_code == 200 + + # Other sessions should still be valid (depending on implementation) + for token in sessions[1:]: + verify_response = client.get( + "/auth/verify", + headers={"Authorization": f"Bearer {token}"} + ) + # Should still be valid unless implementation invalidates all sessions + assert verify_response.status_code == 200 + + def test_authentication_error_handling(self, client, mock_settings): + """Test error handling in authentication flow.""" + with patch('src.server.fastapi_app.settings', mock_settings): + # Test various error scenarios + + # Invalid JSON + invalid_json_response = client.post( + "/auth/login", + data="invalid json", + headers={"Content-Type": "application/json"} + ) + assert invalid_json_response.status_code == 422 + + # Missing password field + missing_field_response = client.post( + "/auth/login", + json={} + ) + assert missing_field_response.status_code == 422 + + # Empty password + empty_password_response = client.post( + "/auth/login", + json={"password": ""} + ) + assert empty_password_response.status_code == 422 + + # Malformed authorization header + malformed_auth_response = client.get( + "/auth/verify", + headers={"Authorization": "InvalidFormat"} + ) + assert malformed_auth_response.status_code == 403 + + def test_security_headers_and_responses(self, client, mock_settings): + """Test security-related headers and response formats.""" + with patch('src.server.fastapi_app.settings', mock_settings): + # Test login response format + login_response = client.post( + "/auth/login", + json={"password": "test_password"} + ) + + # Check response doesn't leak sensitive information + login_data = login_response.json() + assert "password" not in str(login_data) + assert "secret" not in str(login_data).lower() + + # Test error responses don't leak sensitive information + error_response = client.post( + "/auth/login", + json={"password": "wrong_password"} + ) + + error_data = error_response.json() + assert "password" not in str(error_data) + assert "hash" not in str(error_data).lower() + assert "secret" not in str(error_data).lower() \ No newline at end of file diff --git a/src/tests/integration/test_auth_endpoints.py b/src/tests/integration/test_auth_endpoints.py new file mode 100644 index 0000000..04415a8 --- /dev/null +++ b/src/tests/integration/test_auth_endpoints.py @@ -0,0 +1,313 @@ +""" +Integration tests for authentication API endpoints. + +Tests POST /auth/login, GET /auth/verify, POST /auth/logout endpoints +with valid/invalid credentials and tokens. +""" + +import pytest +import sys +import os +from fastapi.testclient import TestClient +from unittest.mock import patch, Mock + +# Add source directory to path +sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', '..', '..')) + +# Import after path setup +from src.server.fastapi_app import app # noqa: E402 + + +@pytest.fixture +def client(): + """Test client for FastAPI app.""" + return TestClient(app) + + +@pytest.fixture +def mock_auth_settings(): + """Mock settings for authentication tests.""" + settings = Mock() + settings.jwt_secret_key = "test-secret-key" + settings.password_salt = "test-salt" + settings.master_password = "test_password" + settings.master_password_hash = None + settings.token_expiry_hours = 1 + return settings + + +@pytest.mark.integration +class TestAuthLogin: + """Test authentication login endpoint.""" + + def test_login_valid_credentials(self, client, mock_auth_settings): + """Test login with valid credentials.""" + with patch('src.server.fastapi_app.settings', mock_auth_settings): + response = client.post( + "/auth/login", + json={"password": "test_password"} + ) + + assert response.status_code == 200 + data = response.json() + + assert data["success"] is True + assert "token" in data + assert "expires_at" in data + assert data["message"] == "Login successful" + + def test_login_invalid_credentials(self, client, mock_auth_settings): + """Test login with invalid credentials.""" + with patch('src.server.fastapi_app.settings', mock_auth_settings): + response = client.post( + "/auth/login", + json={"password": "wrong_password"} + ) + + assert response.status_code == 401 + data = response.json() + + assert data["success"] is False + assert "token" not in data + assert "Invalid password" in data["message"] + + def test_login_missing_password(self, client): + """Test login with missing password field.""" + response = client.post( + "/auth/login", + json={} + ) + + assert response.status_code == 422 # Validation error + + def test_login_empty_password(self, client, mock_auth_settings): + """Test login with empty password.""" + with patch('src.server.fastapi_app.settings', mock_auth_settings): + response = client.post( + "/auth/login", + json={"password": ""} + ) + + assert response.status_code == 422 # Validation error (min_length=1) + + def test_login_invalid_json(self, client): + """Test login with invalid JSON payload.""" + response = client.post( + "/auth/login", + data="invalid json", + headers={"Content-Type": "application/json"} + ) + + assert response.status_code == 422 + + def test_login_wrong_content_type(self, client): + """Test login with wrong content type.""" + response = client.post( + "/auth/login", + data="password=test_password" + ) + + assert response.status_code == 422 + + +@pytest.mark.integration +class TestAuthVerify: + """Test authentication token verification endpoint.""" + + def test_verify_valid_token(self, client, mock_auth_settings, valid_jwt_token): + """Test token verification with valid token.""" + with patch('src.server.fastapi_app.settings', mock_auth_settings): + response = client.get( + "/auth/verify", + headers={"Authorization": f"Bearer {valid_jwt_token}"} + ) + + assert response.status_code == 200 + data = response.json() + + assert data["valid"] is True + assert data["user"] == "test_user" + assert "expires_at" in data + + def test_verify_expired_token(self, client, mock_auth_settings, expired_jwt_token): + """Test token verification with expired token.""" + with patch('src.server.fastapi_app.settings', mock_auth_settings): + response = client.get( + "/auth/verify", + headers={"Authorization": f"Bearer {expired_jwt_token}"} + ) + + assert response.status_code == 401 + data = response.json() + + assert data["valid"] is False + assert "expired" in data["message"].lower() + + def test_verify_invalid_token(self, client, mock_auth_settings): + """Test token verification with invalid token.""" + with patch('src.server.fastapi_app.settings', mock_auth_settings): + response = client.get( + "/auth/verify", + headers={"Authorization": "Bearer invalid.token.here"} + ) + + assert response.status_code == 401 + data = response.json() + + assert data["valid"] is False + + def test_verify_missing_token(self, client): + """Test token verification without token.""" + response = client.get("/auth/verify") + + assert response.status_code == 403 # Forbidden - no credentials + + def test_verify_malformed_header(self, client): + """Test token verification with malformed authorization header.""" + response = client.get( + "/auth/verify", + headers={"Authorization": "InvalidFormat token"} + ) + + assert response.status_code == 403 + + def test_verify_empty_token(self, client): + """Test token verification with empty token.""" + response = client.get( + "/auth/verify", + headers={"Authorization": "Bearer "} + ) + + assert response.status_code == 401 + + +@pytest.mark.integration +class TestAuthLogout: + """Test authentication logout endpoint.""" + + def test_logout_valid_token(self, client, mock_auth_settings, valid_jwt_token): + """Test logout with valid token.""" + with patch('src.server.fastapi_app.settings', mock_auth_settings): + response = client.post( + "/auth/logout", + headers={"Authorization": f"Bearer {valid_jwt_token}"} + ) + + assert response.status_code == 200 + data = response.json() + + assert data["success"] is True + assert "logged out" in data["message"].lower() + + def test_logout_invalid_token(self, client, mock_auth_settings): + """Test logout with invalid token.""" + with patch('src.server.fastapi_app.settings', mock_auth_settings): + response = client.post( + "/auth/logout", + headers={"Authorization": "Bearer invalid.token"} + ) + + assert response.status_code == 401 + + def test_logout_missing_token(self, client): + """Test logout without token.""" + response = client.post("/auth/logout") + + assert response.status_code == 403 + + def test_logout_expired_token(self, client, mock_auth_settings, expired_jwt_token): + """Test logout with expired token.""" + with patch('src.server.fastapi_app.settings', mock_auth_settings): + response = client.post( + "/auth/logout", + headers={"Authorization": f"Bearer {expired_jwt_token}"} + ) + + assert response.status_code == 401 + + +@pytest.mark.integration +class TestAuthFlow: + """Test complete authentication flow.""" + + def test_complete_login_verify_logout_flow(self, client, mock_auth_settings): + """Test complete authentication flow: login -> verify -> logout.""" + with patch('src.server.fastapi_app.settings', mock_auth_settings): + # Step 1: Login + login_response = client.post( + "/auth/login", + json={"password": "test_password"} + ) + + assert login_response.status_code == 200 + login_data = login_response.json() + token = login_data["token"] + + # Step 2: Verify token + verify_response = client.get( + "/auth/verify", + headers={"Authorization": f"Bearer {token}"} + ) + + assert verify_response.status_code == 200 + verify_data = verify_response.json() + assert verify_data["valid"] is True + + # Step 3: Logout + logout_response = client.post( + "/auth/logout", + headers={"Authorization": f"Bearer {token}"} + ) + + assert logout_response.status_code == 200 + logout_data = logout_response.json() + assert logout_data["success"] is True + + def test_multiple_login_attempts(self, client, mock_auth_settings): + """Test multiple login attempts with rate limiting consideration.""" + with patch('src.server.fastapi_app.settings', mock_auth_settings): + # Multiple successful logins should work + for _ in range(3): + response = client.post( + "/auth/login", + json={"password": "test_password"} + ) + assert response.status_code == 200 + + # Failed login attempts + for _ in range(3): + response = client.post( + "/auth/login", + json={"password": "wrong_password"} + ) + assert response.status_code == 401 + + def test_concurrent_sessions(self, client, mock_auth_settings): + """Test that multiple valid tokens can exist simultaneously.""" + with patch('src.server.fastapi_app.settings', mock_auth_settings): + # Get first token + response1 = client.post( + "/auth/login", + json={"password": "test_password"} + ) + token1 = response1.json()["token"] + + # Get second token + response2 = client.post( + "/auth/login", + json={"password": "test_password"} + ) + token2 = response2.json()["token"] + + # Both tokens should be valid + verify1 = client.get( + "/auth/verify", + headers={"Authorization": f"Bearer {token1}"} + ) + verify2 = client.get( + "/auth/verify", + headers={"Authorization": f"Bearer {token2}"} + ) + + assert verify1.status_code == 200 + assert verify2.status_code == 200 \ No newline at end of file diff --git a/src/tests/unit/test_auth_security.py b/src/tests/unit/test_auth_security.py new file mode 100644 index 0000000..1ed3a7e --- /dev/null +++ b/src/tests/unit/test_auth_security.py @@ -0,0 +1,269 @@ +""" +Unit tests for authentication and security functionality. + +Tests password hashing, JWT creation/validation, session timeout logic, +and secure environment variable management. +""" + +import pytest +import hashlib +import jwt +import sys +import os +from datetime import datetime, timedelta +from unittest.mock import patch + +# Add source directory to path +sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', '..', '..')) + +from src.server.fastapi_app import ( + hash_password, + verify_master_password, + generate_jwt_token, + verify_jwt_token, + Settings +) + + +@pytest.mark.unit +class TestPasswordHashing: + """Test password hashing functionality.""" + + def test_hash_password_with_salt(self, mock_settings): + """Test password hashing with salt.""" + with patch('src.server.fastapi_app.settings', mock_settings): + password = "test_password" + expected_hash = hashlib.sha256( + (password + mock_settings.password_salt).encode() + ).hexdigest() + + result = hash_password(password) + + assert result == expected_hash + assert len(result) == 64 # SHA-256 produces 64 character hex string + + def test_hash_password_different_inputs(self, mock_settings): + """Test that different passwords produce different hashes.""" + with patch('src.server.fastapi_app.settings', mock_settings): + hash1 = hash_password("password1") + hash2 = hash_password("password2") + + assert hash1 != hash2 + + def test_hash_password_consistent(self, mock_settings): + """Test that same password always produces same hash.""" + with patch('src.server.fastapi_app.settings', mock_settings): + password = "consistent_password" + hash1 = hash_password(password) + hash2 = hash_password(password) + + assert hash1 == hash2 + + def test_hash_password_empty_string(self, mock_settings): + """Test hashing empty string.""" + with patch('src.server.fastapi_app.settings', mock_settings): + result = hash_password("") + + assert isinstance(result, str) + assert len(result) == 64 + + +@pytest.mark.unit +class TestMasterPasswordVerification: + """Test master password verification functionality.""" + + def test_verify_master_password_with_hash(self, mock_settings): + """Test password verification using stored hash.""" + password = "test_password" + mock_settings.master_password_hash = hash_password(password) + mock_settings.master_password = None + + with patch('src.server.fastapi_app.settings', mock_settings): + assert verify_master_password(password) is True + assert verify_master_password("wrong_password") is False + + def test_verify_master_password_with_plain_text(self, mock_settings): + """Test password verification using plain text (development mode).""" + password = "test_password" + mock_settings.master_password_hash = None + mock_settings.master_password = password + + with patch('src.server.fastapi_app.settings', mock_settings): + assert verify_master_password(password) is True + assert verify_master_password("wrong_password") is False + + def test_verify_master_password_no_config(self, mock_settings): + """Test password verification when no password is configured.""" + mock_settings.master_password_hash = None + mock_settings.master_password = None + + with patch('src.server.fastapi_app.settings', mock_settings): + assert verify_master_password("any_password") is False + + +@pytest.mark.unit +class TestJWTGeneration: + """Test JWT token generation functionality.""" + + def test_generate_jwt_token_structure(self, mock_settings): + """Test JWT token generation returns correct structure.""" + with patch('src.server.fastapi_app.settings', mock_settings): + result = generate_jwt_token() + + assert "token" in result + assert "expires_at" in result + assert isinstance(result["token"], str) + assert isinstance(result["expires_at"], datetime) + + def test_generate_jwt_token_expiry(self, mock_settings): + """Test JWT token has correct expiry time.""" + with patch('src.server.fastapi_app.settings', mock_settings): + before_generation = datetime.utcnow() + result = generate_jwt_token() + after_generation = datetime.utcnow() + + expected_expiry_min = before_generation + timedelta( + hours=mock_settings.token_expiry_hours + ) + expected_expiry_max = after_generation + timedelta( + hours=mock_settings.token_expiry_hours + ) + + assert expected_expiry_min <= result["expires_at"] <= expected_expiry_max + + def test_generate_jwt_token_payload(self, mock_settings): + """Test JWT token contains correct payload.""" + with patch('src.server.fastapi_app.settings', mock_settings): + result = generate_jwt_token() + + # Decode without verification to check payload + payload = jwt.decode( + result["token"], + options={"verify_signature": False} + ) + + assert payload["user"] == "master" + assert payload["iss"] == "aniworld-fastapi-server" + assert "exp" in payload + assert "iat" in payload + + +@pytest.mark.unit +class TestJWTVerification: + """Test JWT token verification functionality.""" + + def test_verify_valid_jwt_token(self, mock_settings, valid_jwt_token): + """Test verification of valid JWT token.""" + with patch('src.server.fastapi_app.settings', mock_settings): + result = verify_jwt_token(valid_jwt_token) + + assert result is not None + assert result["user"] == "test_user" + assert "exp" in result + + def test_verify_expired_jwt_token(self, mock_settings, expired_jwt_token): + """Test verification of expired JWT token.""" + with patch('src.server.fastapi_app.settings', mock_settings): + result = verify_jwt_token(expired_jwt_token) + + assert result is None + + def test_verify_invalid_jwt_token(self, mock_settings): + """Test verification of invalid JWT token.""" + with patch('src.server.fastapi_app.settings', mock_settings): + result = verify_jwt_token("invalid.token.here") + + assert result is None + + def test_verify_jwt_token_wrong_secret(self, mock_settings): + """Test verification with wrong secret key.""" + # Generate token with different secret + payload = { + "user": "test_user", + "exp": datetime.utcnow() + timedelta(hours=1) + } + wrong_token = jwt.encode(payload, "wrong-secret", algorithm="HS256") + + with patch('src.server.fastapi_app.settings', mock_settings): + result = verify_jwt_token(wrong_token) + + assert result is None + + +@pytest.mark.unit +class TestSessionTimeout: + """Test session timeout logic.""" + + def test_token_expiry_calculation(self, mock_settings): + """Test that token expiry is calculated correctly.""" + mock_settings.token_expiry_hours = 24 + + with patch('src.server.fastapi_app.settings', mock_settings): + before = datetime.utcnow() + result = generate_jwt_token() + after = datetime.utcnow() + + # Check expiry is approximately 24 hours from now + expected_min = before + timedelta(hours=24) + expected_max = after + timedelta(hours=24) + + assert expected_min <= result["expires_at"] <= expected_max + + def test_custom_expiry_hours(self, mock_settings): + """Test token generation with custom expiry hours.""" + mock_settings.token_expiry_hours = 1 + + with patch('src.server.fastapi_app.settings', mock_settings): + result = generate_jwt_token() + + # Decode token to check expiry + payload = jwt.decode( + result["token"], + mock_settings.jwt_secret_key, + algorithms=["HS256"] + ) + + token_exp = datetime.fromtimestamp(payload["exp"]) + expected_exp = result["expires_at"] + + # Should be approximately the same (within 1 second) + assert abs((token_exp - expected_exp).total_seconds()) < 1 + + +@pytest.mark.unit +class TestSecurityConfiguration: + """Test secure environment variable management.""" + + def test_settings_defaults(self): + """Test that settings have secure defaults.""" + settings = Settings() + + # Should have default values + assert settings.jwt_secret_key is not None + assert settings.password_salt is not None + assert settings.token_expiry_hours > 0 + + def test_settings_from_env(self): + """Test settings loading from environment variables.""" + with patch.dict(os.environ, { + 'JWT_SECRET_KEY': 'test-secret', + 'PASSWORD_SALT': 'test-salt', + 'SESSION_TIMEOUT_HOURS': '12' + }): + settings = Settings() + + assert settings.jwt_secret_key == 'test-secret' + assert settings.password_salt == 'test-salt' + assert settings.token_expiry_hours == 12 + + def test_sensitive_data_not_logged(self, mock_settings, caplog): + """Test that sensitive data is not logged.""" + password = "sensitive_password" + + with patch('src.server.fastapi_app.settings', mock_settings): + hash_password(password) + verify_master_password(password) + + # Check that password doesn't appear in logs + for record in caplog.records: + assert password not in record.message \ No newline at end of file