Add comprehensive authentication and security tests

- Unit tests for password hashing, JWT generation/validation, session timeout
- Integration tests for auth endpoints (login, verify, logout)
- E2E tests for complete authentication flows
- Tests cover valid/invalid credentials, token expiry, error handling
- Added security tests to prevent information leakage
This commit is contained in:
Lukas Pupka-Lipinski 2025-10-06 10:50:19 +02:00
parent f550ec05e3
commit 7f27ff823a
3 changed files with 813 additions and 0 deletions

View File

@ -0,0 +1,231 @@
"""
End-to-end tests for authentication flow.
Tests complete user authentication scenarios including login/logout flow
and session management.
"""
import pytest
import sys
import os
from fastapi.testclient import TestClient
from unittest.mock import patch
# Add source directory to path
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', '..', '..'))
# Import after path setup
from src.server.fastapi_app import app # noqa: E402
@pytest.fixture
def client():
"""Test client for E2E authentication tests."""
return TestClient(app)
@pytest.mark.e2e
class TestAuthenticationE2E:
"""End-to-end authentication tests."""
def test_full_authentication_workflow(self, client, mock_settings):
"""Test complete authentication workflow from user perspective."""
with patch('src.server.fastapi_app.settings', mock_settings):
# Scenario: User wants to access protected resource
# Step 1: Try to access protected endpoint without authentication
protected_response = client.get("/api/anime/search?query=test")
assert protected_response.status_code in [401, 403] # Should be unauthorized
# Step 2: User logs in with correct password
login_response = client.post(
"/auth/login",
json={"password": "test_password"}
)
assert login_response.status_code == 200
login_data = login_response.json()
assert login_data["success"] is True
token = login_data["token"]
# Step 3: Verify token is working
verify_response = client.get(
"/auth/verify",
headers={"Authorization": f"Bearer {token}"}
)
assert verify_response.status_code == 200
assert verify_response.json()["valid"] is True
# Step 4: Access protected resource with token
# Note: This test assumes anime search endpoint exists and requires auth
protected_response_with_auth = client.get(
"/api/anime/search?query=test",
headers={"Authorization": f"Bearer {token}"}
)
# Should not be 401/403 (actual response depends on implementation)
assert protected_response_with_auth.status_code != 403
# Step 5: User logs out
logout_response = client.post(
"/auth/logout",
headers={"Authorization": f"Bearer {token}"}
)
assert logout_response.status_code == 200
assert logout_response.json()["success"] is True
# Step 6: Verify token behavior after logout
# Note: This depends on implementation - some systems invalidate tokens,
# others rely on expiry
# Just verify the logout endpoint worked
assert logout_response.json()["success"] is True
def test_authentication_with_wrong_password_flow(self, client, mock_settings):
"""Test authentication flow with wrong password."""
with patch('src.server.fastapi_app.settings', mock_settings):
# Step 1: User tries to login with wrong password
login_response = client.post(
"/auth/login",
json={"password": "wrong_password"}
)
assert login_response.status_code == 401
login_data = login_response.json()
assert login_data["success"] is False
assert "token" not in login_data
# Step 2: User tries to access protected resource without valid token
protected_response = client.get("/api/anime/search?query=test")
assert protected_response.status_code in [401, 403]
# Step 3: User tries again with correct password
correct_login_response = client.post(
"/auth/login",
json={"password": "test_password"}
)
assert correct_login_response.status_code == 200
assert correct_login_response.json()["success"] is True
def test_session_expiry_simulation(self, client, mock_settings):
"""Test session expiry behavior."""
# Set very short token expiry for testing
mock_settings.token_expiry_hours = 0.001 # About 3.6 seconds
with patch('src.server.fastapi_app.settings', mock_settings):
# Login to get token
login_response = client.post(
"/auth/login",
json={"password": "test_password"}
)
assert login_response.status_code == 200
token = login_response.json()["token"]
# Token should be valid immediately
verify_response = client.get(
"/auth/verify",
headers={"Authorization": f"Bearer {token}"}
)
assert verify_response.status_code == 200
# Wait for token to expire (in real implementation)
# For testing, we'll just verify the token structure is correct
import jwt
payload = jwt.decode(token, options={"verify_signature": False})
assert "exp" in payload
assert payload["exp"] > 0
def test_multiple_session_management(self, client, mock_settings):
"""Test managing multiple concurrent sessions."""
with patch('src.server.fastapi_app.settings', mock_settings):
# Create multiple sessions (simulate multiple browser tabs/devices)
sessions = []
for i in range(3):
login_response = client.post(
"/auth/login",
json={"password": "test_password"}
)
assert login_response.status_code == 200
sessions.append(login_response.json()["token"])
# All sessions should be valid
for token in sessions:
verify_response = client.get(
"/auth/verify",
headers={"Authorization": f"Bearer {token}"}
)
assert verify_response.status_code == 200
# Logout from one session
logout_response = client.post(
"/auth/logout",
headers={"Authorization": f"Bearer {sessions[0]}"}
)
assert logout_response.status_code == 200
# Other sessions should still be valid (depending on implementation)
for token in sessions[1:]:
verify_response = client.get(
"/auth/verify",
headers={"Authorization": f"Bearer {token}"}
)
# Should still be valid unless implementation invalidates all sessions
assert verify_response.status_code == 200
def test_authentication_error_handling(self, client, mock_settings):
"""Test error handling in authentication flow."""
with patch('src.server.fastapi_app.settings', mock_settings):
# Test various error scenarios
# Invalid JSON
invalid_json_response = client.post(
"/auth/login",
data="invalid json",
headers={"Content-Type": "application/json"}
)
assert invalid_json_response.status_code == 422
# Missing password field
missing_field_response = client.post(
"/auth/login",
json={}
)
assert missing_field_response.status_code == 422
# Empty password
empty_password_response = client.post(
"/auth/login",
json={"password": ""}
)
assert empty_password_response.status_code == 422
# Malformed authorization header
malformed_auth_response = client.get(
"/auth/verify",
headers={"Authorization": "InvalidFormat"}
)
assert malformed_auth_response.status_code == 403
def test_security_headers_and_responses(self, client, mock_settings):
"""Test security-related headers and response formats."""
with patch('src.server.fastapi_app.settings', mock_settings):
# Test login response format
login_response = client.post(
"/auth/login",
json={"password": "test_password"}
)
# Check response doesn't leak sensitive information
login_data = login_response.json()
assert "password" not in str(login_data)
assert "secret" not in str(login_data).lower()
# Test error responses don't leak sensitive information
error_response = client.post(
"/auth/login",
json={"password": "wrong_password"}
)
error_data = error_response.json()
assert "password" not in str(error_data)
assert "hash" not in str(error_data).lower()
assert "secret" not in str(error_data).lower()

View File

@ -0,0 +1,313 @@
"""
Integration tests for authentication API endpoints.
Tests POST /auth/login, GET /auth/verify, POST /auth/logout endpoints
with valid/invalid credentials and tokens.
"""
import pytest
import sys
import os
from fastapi.testclient import TestClient
from unittest.mock import patch, Mock
# Add source directory to path
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', '..', '..'))
# Import after path setup
from src.server.fastapi_app import app # noqa: E402
@pytest.fixture
def client():
"""Test client for FastAPI app."""
return TestClient(app)
@pytest.fixture
def mock_auth_settings():
"""Mock settings for authentication tests."""
settings = Mock()
settings.jwt_secret_key = "test-secret-key"
settings.password_salt = "test-salt"
settings.master_password = "test_password"
settings.master_password_hash = None
settings.token_expiry_hours = 1
return settings
@pytest.mark.integration
class TestAuthLogin:
"""Test authentication login endpoint."""
def test_login_valid_credentials(self, client, mock_auth_settings):
"""Test login with valid credentials."""
with patch('src.server.fastapi_app.settings', mock_auth_settings):
response = client.post(
"/auth/login",
json={"password": "test_password"}
)
assert response.status_code == 200
data = response.json()
assert data["success"] is True
assert "token" in data
assert "expires_at" in data
assert data["message"] == "Login successful"
def test_login_invalid_credentials(self, client, mock_auth_settings):
"""Test login with invalid credentials."""
with patch('src.server.fastapi_app.settings', mock_auth_settings):
response = client.post(
"/auth/login",
json={"password": "wrong_password"}
)
assert response.status_code == 401
data = response.json()
assert data["success"] is False
assert "token" not in data
assert "Invalid password" in data["message"]
def test_login_missing_password(self, client):
"""Test login with missing password field."""
response = client.post(
"/auth/login",
json={}
)
assert response.status_code == 422 # Validation error
def test_login_empty_password(self, client, mock_auth_settings):
"""Test login with empty password."""
with patch('src.server.fastapi_app.settings', mock_auth_settings):
response = client.post(
"/auth/login",
json={"password": ""}
)
assert response.status_code == 422 # Validation error (min_length=1)
def test_login_invalid_json(self, client):
"""Test login with invalid JSON payload."""
response = client.post(
"/auth/login",
data="invalid json",
headers={"Content-Type": "application/json"}
)
assert response.status_code == 422
def test_login_wrong_content_type(self, client):
"""Test login with wrong content type."""
response = client.post(
"/auth/login",
data="password=test_password"
)
assert response.status_code == 422
@pytest.mark.integration
class TestAuthVerify:
"""Test authentication token verification endpoint."""
def test_verify_valid_token(self, client, mock_auth_settings, valid_jwt_token):
"""Test token verification with valid token."""
with patch('src.server.fastapi_app.settings', mock_auth_settings):
response = client.get(
"/auth/verify",
headers={"Authorization": f"Bearer {valid_jwt_token}"}
)
assert response.status_code == 200
data = response.json()
assert data["valid"] is True
assert data["user"] == "test_user"
assert "expires_at" in data
def test_verify_expired_token(self, client, mock_auth_settings, expired_jwt_token):
"""Test token verification with expired token."""
with patch('src.server.fastapi_app.settings', mock_auth_settings):
response = client.get(
"/auth/verify",
headers={"Authorization": f"Bearer {expired_jwt_token}"}
)
assert response.status_code == 401
data = response.json()
assert data["valid"] is False
assert "expired" in data["message"].lower()
def test_verify_invalid_token(self, client, mock_auth_settings):
"""Test token verification with invalid token."""
with patch('src.server.fastapi_app.settings', mock_auth_settings):
response = client.get(
"/auth/verify",
headers={"Authorization": "Bearer invalid.token.here"}
)
assert response.status_code == 401
data = response.json()
assert data["valid"] is False
def test_verify_missing_token(self, client):
"""Test token verification without token."""
response = client.get("/auth/verify")
assert response.status_code == 403 # Forbidden - no credentials
def test_verify_malformed_header(self, client):
"""Test token verification with malformed authorization header."""
response = client.get(
"/auth/verify",
headers={"Authorization": "InvalidFormat token"}
)
assert response.status_code == 403
def test_verify_empty_token(self, client):
"""Test token verification with empty token."""
response = client.get(
"/auth/verify",
headers={"Authorization": "Bearer "}
)
assert response.status_code == 401
@pytest.mark.integration
class TestAuthLogout:
"""Test authentication logout endpoint."""
def test_logout_valid_token(self, client, mock_auth_settings, valid_jwt_token):
"""Test logout with valid token."""
with patch('src.server.fastapi_app.settings', mock_auth_settings):
response = client.post(
"/auth/logout",
headers={"Authorization": f"Bearer {valid_jwt_token}"}
)
assert response.status_code == 200
data = response.json()
assert data["success"] is True
assert "logged out" in data["message"].lower()
def test_logout_invalid_token(self, client, mock_auth_settings):
"""Test logout with invalid token."""
with patch('src.server.fastapi_app.settings', mock_auth_settings):
response = client.post(
"/auth/logout",
headers={"Authorization": "Bearer invalid.token"}
)
assert response.status_code == 401
def test_logout_missing_token(self, client):
"""Test logout without token."""
response = client.post("/auth/logout")
assert response.status_code == 403
def test_logout_expired_token(self, client, mock_auth_settings, expired_jwt_token):
"""Test logout with expired token."""
with patch('src.server.fastapi_app.settings', mock_auth_settings):
response = client.post(
"/auth/logout",
headers={"Authorization": f"Bearer {expired_jwt_token}"}
)
assert response.status_code == 401
@pytest.mark.integration
class TestAuthFlow:
"""Test complete authentication flow."""
def test_complete_login_verify_logout_flow(self, client, mock_auth_settings):
"""Test complete authentication flow: login -> verify -> logout."""
with patch('src.server.fastapi_app.settings', mock_auth_settings):
# Step 1: Login
login_response = client.post(
"/auth/login",
json={"password": "test_password"}
)
assert login_response.status_code == 200
login_data = login_response.json()
token = login_data["token"]
# Step 2: Verify token
verify_response = client.get(
"/auth/verify",
headers={"Authorization": f"Bearer {token}"}
)
assert verify_response.status_code == 200
verify_data = verify_response.json()
assert verify_data["valid"] is True
# Step 3: Logout
logout_response = client.post(
"/auth/logout",
headers={"Authorization": f"Bearer {token}"}
)
assert logout_response.status_code == 200
logout_data = logout_response.json()
assert logout_data["success"] is True
def test_multiple_login_attempts(self, client, mock_auth_settings):
"""Test multiple login attempts with rate limiting consideration."""
with patch('src.server.fastapi_app.settings', mock_auth_settings):
# Multiple successful logins should work
for _ in range(3):
response = client.post(
"/auth/login",
json={"password": "test_password"}
)
assert response.status_code == 200
# Failed login attempts
for _ in range(3):
response = client.post(
"/auth/login",
json={"password": "wrong_password"}
)
assert response.status_code == 401
def test_concurrent_sessions(self, client, mock_auth_settings):
"""Test that multiple valid tokens can exist simultaneously."""
with patch('src.server.fastapi_app.settings', mock_auth_settings):
# Get first token
response1 = client.post(
"/auth/login",
json={"password": "test_password"}
)
token1 = response1.json()["token"]
# Get second token
response2 = client.post(
"/auth/login",
json={"password": "test_password"}
)
token2 = response2.json()["token"]
# Both tokens should be valid
verify1 = client.get(
"/auth/verify",
headers={"Authorization": f"Bearer {token1}"}
)
verify2 = client.get(
"/auth/verify",
headers={"Authorization": f"Bearer {token2}"}
)
assert verify1.status_code == 200
assert verify2.status_code == 200

View File

@ -0,0 +1,269 @@
"""
Unit tests for authentication and security functionality.
Tests password hashing, JWT creation/validation, session timeout logic,
and secure environment variable management.
"""
import pytest
import hashlib
import jwt
import sys
import os
from datetime import datetime, timedelta
from unittest.mock import patch
# Add source directory to path
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', '..', '..'))
from src.server.fastapi_app import (
hash_password,
verify_master_password,
generate_jwt_token,
verify_jwt_token,
Settings
)
@pytest.mark.unit
class TestPasswordHashing:
"""Test password hashing functionality."""
def test_hash_password_with_salt(self, mock_settings):
"""Test password hashing with salt."""
with patch('src.server.fastapi_app.settings', mock_settings):
password = "test_password"
expected_hash = hashlib.sha256(
(password + mock_settings.password_salt).encode()
).hexdigest()
result = hash_password(password)
assert result == expected_hash
assert len(result) == 64 # SHA-256 produces 64 character hex string
def test_hash_password_different_inputs(self, mock_settings):
"""Test that different passwords produce different hashes."""
with patch('src.server.fastapi_app.settings', mock_settings):
hash1 = hash_password("password1")
hash2 = hash_password("password2")
assert hash1 != hash2
def test_hash_password_consistent(self, mock_settings):
"""Test that same password always produces same hash."""
with patch('src.server.fastapi_app.settings', mock_settings):
password = "consistent_password"
hash1 = hash_password(password)
hash2 = hash_password(password)
assert hash1 == hash2
def test_hash_password_empty_string(self, mock_settings):
"""Test hashing empty string."""
with patch('src.server.fastapi_app.settings', mock_settings):
result = hash_password("")
assert isinstance(result, str)
assert len(result) == 64
@pytest.mark.unit
class TestMasterPasswordVerification:
"""Test master password verification functionality."""
def test_verify_master_password_with_hash(self, mock_settings):
"""Test password verification using stored hash."""
password = "test_password"
mock_settings.master_password_hash = hash_password(password)
mock_settings.master_password = None
with patch('src.server.fastapi_app.settings', mock_settings):
assert verify_master_password(password) is True
assert verify_master_password("wrong_password") is False
def test_verify_master_password_with_plain_text(self, mock_settings):
"""Test password verification using plain text (development mode)."""
password = "test_password"
mock_settings.master_password_hash = None
mock_settings.master_password = password
with patch('src.server.fastapi_app.settings', mock_settings):
assert verify_master_password(password) is True
assert verify_master_password("wrong_password") is False
def test_verify_master_password_no_config(self, mock_settings):
"""Test password verification when no password is configured."""
mock_settings.master_password_hash = None
mock_settings.master_password = None
with patch('src.server.fastapi_app.settings', mock_settings):
assert verify_master_password("any_password") is False
@pytest.mark.unit
class TestJWTGeneration:
"""Test JWT token generation functionality."""
def test_generate_jwt_token_structure(self, mock_settings):
"""Test JWT token generation returns correct structure."""
with patch('src.server.fastapi_app.settings', mock_settings):
result = generate_jwt_token()
assert "token" in result
assert "expires_at" in result
assert isinstance(result["token"], str)
assert isinstance(result["expires_at"], datetime)
def test_generate_jwt_token_expiry(self, mock_settings):
"""Test JWT token has correct expiry time."""
with patch('src.server.fastapi_app.settings', mock_settings):
before_generation = datetime.utcnow()
result = generate_jwt_token()
after_generation = datetime.utcnow()
expected_expiry_min = before_generation + timedelta(
hours=mock_settings.token_expiry_hours
)
expected_expiry_max = after_generation + timedelta(
hours=mock_settings.token_expiry_hours
)
assert expected_expiry_min <= result["expires_at"] <= expected_expiry_max
def test_generate_jwt_token_payload(self, mock_settings):
"""Test JWT token contains correct payload."""
with patch('src.server.fastapi_app.settings', mock_settings):
result = generate_jwt_token()
# Decode without verification to check payload
payload = jwt.decode(
result["token"],
options={"verify_signature": False}
)
assert payload["user"] == "master"
assert payload["iss"] == "aniworld-fastapi-server"
assert "exp" in payload
assert "iat" in payload
@pytest.mark.unit
class TestJWTVerification:
"""Test JWT token verification functionality."""
def test_verify_valid_jwt_token(self, mock_settings, valid_jwt_token):
"""Test verification of valid JWT token."""
with patch('src.server.fastapi_app.settings', mock_settings):
result = verify_jwt_token(valid_jwt_token)
assert result is not None
assert result["user"] == "test_user"
assert "exp" in result
def test_verify_expired_jwt_token(self, mock_settings, expired_jwt_token):
"""Test verification of expired JWT token."""
with patch('src.server.fastapi_app.settings', mock_settings):
result = verify_jwt_token(expired_jwt_token)
assert result is None
def test_verify_invalid_jwt_token(self, mock_settings):
"""Test verification of invalid JWT token."""
with patch('src.server.fastapi_app.settings', mock_settings):
result = verify_jwt_token("invalid.token.here")
assert result is None
def test_verify_jwt_token_wrong_secret(self, mock_settings):
"""Test verification with wrong secret key."""
# Generate token with different secret
payload = {
"user": "test_user",
"exp": datetime.utcnow() + timedelta(hours=1)
}
wrong_token = jwt.encode(payload, "wrong-secret", algorithm="HS256")
with patch('src.server.fastapi_app.settings', mock_settings):
result = verify_jwt_token(wrong_token)
assert result is None
@pytest.mark.unit
class TestSessionTimeout:
"""Test session timeout logic."""
def test_token_expiry_calculation(self, mock_settings):
"""Test that token expiry is calculated correctly."""
mock_settings.token_expiry_hours = 24
with patch('src.server.fastapi_app.settings', mock_settings):
before = datetime.utcnow()
result = generate_jwt_token()
after = datetime.utcnow()
# Check expiry is approximately 24 hours from now
expected_min = before + timedelta(hours=24)
expected_max = after + timedelta(hours=24)
assert expected_min <= result["expires_at"] <= expected_max
def test_custom_expiry_hours(self, mock_settings):
"""Test token generation with custom expiry hours."""
mock_settings.token_expiry_hours = 1
with patch('src.server.fastapi_app.settings', mock_settings):
result = generate_jwt_token()
# Decode token to check expiry
payload = jwt.decode(
result["token"],
mock_settings.jwt_secret_key,
algorithms=["HS256"]
)
token_exp = datetime.fromtimestamp(payload["exp"])
expected_exp = result["expires_at"]
# Should be approximately the same (within 1 second)
assert abs((token_exp - expected_exp).total_seconds()) < 1
@pytest.mark.unit
class TestSecurityConfiguration:
"""Test secure environment variable management."""
def test_settings_defaults(self):
"""Test that settings have secure defaults."""
settings = Settings()
# Should have default values
assert settings.jwt_secret_key is not None
assert settings.password_salt is not None
assert settings.token_expiry_hours > 0
def test_settings_from_env(self):
"""Test settings loading from environment variables."""
with patch.dict(os.environ, {
'JWT_SECRET_KEY': 'test-secret',
'PASSWORD_SALT': 'test-salt',
'SESSION_TIMEOUT_HOURS': '12'
}):
settings = Settings()
assert settings.jwt_secret_key == 'test-secret'
assert settings.password_salt == 'test-salt'
assert settings.token_expiry_hours == 12
def test_sensitive_data_not_logged(self, mock_settings, caplog):
"""Test that sensitive data is not logged."""
password = "sensitive_password"
with patch('src.server.fastapi_app.settings', mock_settings):
hash_password(password)
verify_master_password(password)
# Check that password doesn't appear in logs
for record in caplog.records:
assert password not in record.message