"""Integration tests for authentication flow.

This module tests the complete authentication flow including:
- Initial setup and master password configuration
- Login with valid/invalid credentials
- JWT token generation and validation
- Protected endpoint access control
- Token refresh and expiration
- Logout functionality
- Rate limiting and lockout mechanisms
- Session management
"""

import asyncio
import time
from typing import Dict, Optional

import pytest
from httpx import ASGITransport, AsyncClient

from src.server.fastapi_app import app
from src.server.services.auth_service import auth_service


@pytest.fixture(autouse=True)
def reset_auth():
    """Run every test against a blank auth service, then restore it.

    Before the test body, ``auth_service._hash`` is saved and replaced
    with ``None`` and ``auth_service._failed`` is emptied.  After the
    test, the saved ``_hash`` value is put back and ``_failed`` is
    emptied again so nothing recorded by the test leaks into the next.
    """
    saved_hash = auth_service._hash

    # Enter the test with no stored hash and no recorded failures.
    auth_service._hash = None
    auth_service._failed.clear()

    yield

    # Teardown: undo whatever the test configured or recorded.
    auth_service._hash = saved_hash
    auth_service._failed.clear()


@pytest.fixture
async def client():
    """Yield an ``AsyncClient`` that talks to ``app`` in-process.

    Requests are routed through an ``ASGITransport`` built around the
    imported ``app`` and use ``http://test`` as the base URL.  The
    client lives inside an ``async with`` block, so it is closed once
    the consuming test has finished.
    """
    asgi_transport = ASGITransport(app=app)
    session = AsyncClient(transport=asgi_transport, base_url="http://test")
    async with session as http:
        yield http


class TestInitialSetup:
    """Checks for first-time master password configuration."""

    async def test_setup_with_strong_password(self, client):
        """Setup with a strong password returns 201 and status ``ok``."""
        payload = {"master_password": "StrongP@ssw0rd123"}

        resp = await client.post("/api/auth/setup", json=payload)

        assert resp.status_code == 201
        body = resp.json()
        assert body["status"] == "ok"

    async def test_setup_with_weak_password_fails(self, client):
        """Setup with the password ``weak`` is rejected."""
        payload = {"master_password": "weak"}

        resp = await client.post("/api/auth/setup", json=payload)

        # Either status code counts as a validation failure here.
        assert resp.status_code in (400, 422)

    async def test_setup_cannot_be_called_twice(self, client):
        """A second setup call returns 400 with an explanatory detail."""
        first_payload = {"master_password": "FirstPassword123!"}
        second_payload = {"master_password": "SecondPassword123!"}

        # The first call is expected to succeed; its response is not
        # inspected by this test.
        await client.post("/api/auth/setup", json=first_payload)

        # The repeat call must be refused.
        repeat = await client.post("/api/auth/setup", json=second_payload)

        assert repeat.status_code == 400
        body = repeat.json()
        assert "already configured" in body["detail"].lower()

    async def test_auth_status_before_setup(self, client):
        """Status is unconfigured and unauthenticated before any setup."""
        resp = await client.get("/api/auth/status")

        assert resp.status_code == 200
        status = resp.json()
        assert status["configured"] is False
        assert status["authenticated"] is False

    async def test_auth_status_after_setup(self, client):
        """Status is configured but unauthenticated right after setup."""
        payload = {"master_password": "SetupPassword123!"}
        await client.post("/api/auth/setup", json=payload)

        # No Authorization header is sent with the status request.
        resp = await client.get("/api/auth/status")

        assert resp.status_code == 200
        status = resp.json()
        assert status["configured"] is True
        assert status["authenticated"] is False


class TestLoginFlow:
    """Checks for the login endpoint with good and bad credentials."""

    async def test_login_with_valid_credentials(self, client):
        """The configured password yields a non-empty bearer token."""
        secret = "ValidPassword123!"
        await client.post("/api/auth/setup", json={"master_password": secret})

        resp = await client.post("/api/auth/login", json={"password": secret})

        assert resp.status_code == 200
        body = resp.json()

        # Shape of the token payload.
        assert "access_token" in body
        assert "token_type" in body
        assert body["token_type"] == "bearer"
        issued = body["access_token"]
        assert isinstance(issued, str)
        assert len(issued) > 0

    async def test_login_with_invalid_password(self, client):
        """A wrong password returns 401 with ``invalid`` in the detail."""
        setup_payload = {"master_password": "CorrectPassword123!"}
        await client.post("/api/auth/setup", json=setup_payload)

        wrong = {"password": "WrongPassword123!"}
        resp = await client.post("/api/auth/login", json=wrong)

        assert resp.status_code == 401
        body = resp.json()
        assert "detail" in body
        assert "invalid" in body["detail"].lower()

    async def test_login_before_setup_fails(self, client):
        """Login is refused when setup has not been performed."""
        attempt = {"password": "AnyPassword123!"}

        resp = await client.post("/api/auth/login", json=attempt)

        assert resp.status_code in (400, 401)

    async def test_login_with_remember_me(self, client):
        """Login with ``remember`` set to true returns an access token."""
        secret = "RememberPassword123!"
        await client.post("/api/auth/setup", json={"master_password": secret})

        credentials = {"password": secret, "remember": True}
        resp = await client.post("/api/auth/login", json=credentials)

        assert resp.status_code == 200
        body = resp.json()
        # Only issuance is verified; the token's expiry is not inspected.
        assert "access_token" in body

    async def test_login_without_remember_me(self, client):
        """Login with ``remember`` set to false returns an access token."""
        secret = "NoRememberPassword123!"
        await client.post("/api/auth/setup", json={"master_password": secret})

        credentials = {"password": secret, "remember": False}
        resp = await client.post("/api/auth/login", json=credentials)

        assert resp.status_code == 200
        body = resp.json()
        assert "access_token" in body


class TestTokenValidation:
    """Checks for how bearer tokens are accepted or rejected."""

    async def get_valid_token(self, client) -> str:
        """Run setup and login, then return the issued access token.

        Neither response status is checked; a failed login surfaces as a
        ``KeyError`` when ``access_token`` is read from the JSON body.
        """
        secret = "TokenTestPassword123!"
        await client.post("/api/auth/setup", json={"master_password": secret})
        login = await client.post("/api/auth/login", json={"password": secret})
        return login.json()["access_token"]

    async def test_access_protected_endpoint_with_valid_token(self, client):
        """A valid bearer token is let through to the queue status route."""
        token = await self.get_valid_token(client)
        auth_headers = {"Authorization": f"Bearer {token}"}

        resp = await client.get("/api/queue/status", headers=auth_headers)

        # 503 is tolerated as well (service not configured).
        assert resp.status_code in (200, 503)

    async def test_access_protected_endpoint_without_token(self, client):
        """A request with no Authorization header gets 401."""
        resp = await client.get("/api/queue/status")

        assert resp.status_code == 401

    async def test_access_protected_endpoint_with_invalid_token(self, client):
        """A made-up bearer token gets 401."""
        bogus_headers = {"Authorization": "Bearer invalid_token_12345"}

        resp = await client.get("/api/queue/status", headers=bogus_headers)

        assert resp.status_code == 401

    async def test_access_protected_endpoint_with_malformed_header(
        self, client
    ):
        """A real token sent without the ``Bearer`` prefix gets 401."""
        token = await self.get_valid_token(client)

        # The raw token is used as the whole header value.
        bare_headers = {"Authorization": token}
        resp = await client.get("/api/queue/status", headers=bare_headers)

        assert resp.status_code == 401

    async def test_token_works_for_multiple_requests(self, client):
        """The same token is accepted on five consecutive requests."""
        token = await self.get_valid_token(client)
        auth_headers = {"Authorization": f"Bearer {token}"}

        for _ in range(5):
            resp = await client.get("/api/queue/status", headers=auth_headers)
            assert resp.status_code in (200, 503)

    async def test_auth_status_with_valid_token(self, client):
        """Status reports configured and authenticated with a valid token."""
        token = await self.get_valid_token(client)
        auth_headers = {"Authorization": f"Bearer {token}"}

        resp = await client.get("/api/auth/status", headers=auth_headers)

        assert resp.status_code == 200
        status = resp.json()
        assert status["configured"] is True
        assert status["authenticated"] is True


class TestProtectedEndpoints:
    """Checks that protected routes refuse anonymous requests."""

    async def get_valid_token(self, client) -> str:
        """Run setup and login, then return the issued access token.

        Neither response status is checked; a failed login surfaces as a
        ``KeyError`` when ``access_token`` is read from the JSON body.
        """
        secret = "ProtectedTestPassword123!"
        await client.post("/api/auth/setup", json={"master_password": secret})
        login = await client.post("/api/auth/login", json={"password": secret})
        return login.json()["access_token"]

    async def test_anime_endpoints_require_auth(self, client):
        """The anime listing is 401 anonymously and reachable with a token."""
        # The anonymous request goes first, before any setup or login.
        anonymous = await client.get("/api/v1/anime/")
        assert anonymous.status_code == 401

        token = await self.get_valid_token(client)
        auth_headers = {"Authorization": f"Bearer {token}"}
        authed = await client.get("/api/v1/anime/", headers=auth_headers)
        assert authed.status_code in (200, 503)

    async def test_queue_endpoints_require_auth(self, client):
        """Queue routes reject anonymous calls and never 401 with a token."""
        routes = [
            ("/api/queue/status", "GET"),
            ("/api/queue/add", "POST"),
            ("/api/queue/control/start", "POST"),
            ("/api/queue/control/pause", "POST"),
        ]

        token = await self.get_valid_token(client)
        auth_headers = {"Authorization": f"Bearer {token}"}

        for path, verb in routes:
            # GET requests carry no body; every other verb sends empty JSON.
            body_kwargs = {} if verb == "GET" else {"json": {}}

            # Without a token.
            anonymous = await client.request(verb, path, **body_kwargs)
            assert anonymous.status_code in (400, 401, 422)

            # With a token: auth must pass, validation may still fail.
            authed = await client.request(
                verb, path, headers=auth_headers, **body_kwargs
            )
            assert authed.status_code != 401

    async def test_config_endpoints_require_auth(self, client):
        """The config route is 401 anonymously and reachable with a token."""
        # The anonymous request goes first, before any setup or login.
        anonymous = await client.get("/api/config")
        assert anonymous.status_code == 401

        token = await self.get_valid_token(client)
        auth_headers = {"Authorization": f"Bearer {token}"}
        authed = await client.get("/api/config", headers=auth_headers)
        assert authed.status_code in (200, 503)

    async def test_download_endpoints_require_auth(self, client):
        """Queue status is 401 anonymously even after a token was issued."""
        token = await self.get_valid_token(client)

        # Holding a token must not open the route to header-less requests.
        anonymous = await client.get("/api/queue/status")
        assert anonymous.status_code == 401

        auth_headers = {"Authorization": f"Bearer {token}"}
        authed = await client.get("/api/queue/status", headers=auth_headers)
        assert authed.status_code in (200, 503)


class TestLogoutFlow:
    """Checks for the logout endpoint."""

    async def get_valid_token(self, client) -> str:
        """Run setup and login, then return the issued access token.

        Neither response status is checked; a failed login surfaces as a
        ``KeyError`` when ``access_token`` is read from the JSON body.
        """
        secret = "LogoutTestPassword123!"
        await client.post("/api/auth/setup", json={"master_password": secret})
        login = await client.post("/api/auth/login", json={"password": secret})
        return login.json()["access_token"]

    async def test_logout_with_valid_token(self, client):
        """Logout with a valid token returns 200 and status ``ok``."""
        token = await self.get_valid_token(client)
        auth_headers = {"Authorization": f"Bearer {token}"}

        resp = await client.post("/api/auth/logout", headers=auth_headers)

        assert resp.status_code == 200
        body = resp.json()
        assert body["status"] == "ok"

    async def test_logout_without_token(self, client):
        """Logout without a token either succeeds or is rejected with 401."""
        resp = await client.post("/api/auth/logout")

        # Both outcomes are accepted: logout may be allowed without auth.
        assert resp.status_code in (200, 401)

    async def test_token_after_logout(self, client):
        """Reusing a token after logout yields one of the allowed statuses.

        The test deliberately accepts both designs: a stateless JWT that
        keeps working (200 or 503) and a stateful one that is revoked (401).
        """
        token = await self.get_valid_token(client)
        auth_headers = {"Authorization": f"Bearer {token}"}

        # The logout response itself is not inspected.
        await client.post("/api/auth/logout", headers=auth_headers)

        reused = await client.get("/api/queue/status", headers=auth_headers)

        assert reused.status_code in (200, 401, 503)


class TestRateLimitingAndLockout:
    """Test rate limiting and lockout mechanisms."""

    async def test_failed_login_attempts_tracked(self, client):
        """Three consecutive wrong passwords are each answered with 401."""
        await client.post(
            "/api/auth/setup",
            json={"master_password": "CorrectPassword123!"},
        )

        for _ in range(3):
            response = await client.post(
                "/api/auth/login",
                json={"password": "WrongPassword123!"},
            )
            assert response.status_code == 401

    async def test_lockout_after_max_failed_attempts(self, client):
        """Six wrong passwords: the first four get 401, the rest 401 or 429.

        The setup call is counted as one request towards the rate limit,
        so the test expects four further requests to be answered normally
        before limiting or lockout may start.
        """
        await client.post(
            "/api/auth/setup",
            json={"master_password": "CorrectPassword123!"},
        )

        for attempt in range(6):  # More than the allowed maximum.
            response = await client.post(
                "/api/auth/login",
                json={"password": "WrongPassword123!"},
            )

            if attempt < 4:
                # Setup + these four logins = five requests in total.
                assert response.status_code == 401
            else:
                # Fifth and sixth logins may be rate limited or rejected.
                assert response.status_code in (401, 429)

    async def test_successful_login_resets_failed_attempts(self, client):
        """A successful login clears the failures recorded before it.

        Sequence: setup, two wrong passwords, one correct login (200), then
        one more wrong password.  That last request must get a plain 401
        (wrong credentials) rather than a lockout or rate-limit response.
        """
        password = "ResetCounterPassword123!"
        await client.post(
            "/api/auth/setup",
            json={"master_password": password},
        )

        # Two failures; their responses are not inspected.
        for _ in range(2):
            await client.post(
                "/api/auth/login",
                json={"password": "WrongPassword123!"},
            )

        # Successful login.
        response = await client.post(
            "/api/auth/login",
            json={"password": password},
        )
        assert response.status_code == 200

        # Previously this response was discarded, so the test never checked
        # the reset it is named after.  This is the fifth request of the
        # test, the same position the lockout test pins to 401.
        response = await client.post(
            "/api/auth/login",
            json={"password": "WrongPassword123!"},
        )
        assert response.status_code == 401


class TestSessionManagement:
    """Test session management and concurrent sessions."""

    async def get_valid_token(self, client) -> str:
        """Run setup and login, then return the issued access token.

        Setup is posted on every call and its response is ignored, so the
        helper can be called more than once per test.  The login status is
        not checked either; a failed login surfaces as a ``KeyError`` when
        ``access_token`` is read from the JSON body.
        """
        password = "SessionTestPassword123!"
        await client.post(
            "/api/auth/setup",
            json={"master_password": password},
        )
        response = await client.post(
            "/api/auth/login",
            json={"password": password},
        )
        return response.json()["access_token"]

    async def test_multiple_concurrent_sessions(self, client):
        """Three tokens from three logins are all accepted afterwards."""
        password = "MultiSessionPassword123!"
        await client.post(
            "/api/auth/setup",
            json={"master_password": password},
        )

        # Log in three times, keeping every issued token.
        tokens = []
        for _ in range(3):
            response = await client.post(
                "/api/auth/login",
                json={"password": password},
            )
            assert response.status_code == 200
            tokens.append(response.json()["access_token"])

        # Each token must still be accepted.
        for token in tokens:
            response = await client.get(
                "/api/queue/status",
                headers={"Authorization": f"Bearer {token}"},
            )
            assert response.status_code in (200, 503)

    async def test_independent_token_lifetimes(self, client):
        """Two tokens issued 0.1 s apart are both accepted afterwards."""
        token1 = await self.get_valid_token(client)

        # Small delay between the two logins.  ``asyncio.sleep`` is awaited
        # instead of calling ``time.sleep`` so the event loop is not blocked
        # inside this coroutine.
        await asyncio.sleep(0.1)

        token2 = await self.get_valid_token(client)

        for token in (token1, token2):
            response = await client.get(
                "/api/queue/status",
                headers={"Authorization": f"Bearer {token}"},
            )
            assert response.status_code in (200, 503)


class TestAuthenticationEdgeCases:
    """Checks for unusual or invalid inputs to the auth endpoints."""

    async def test_empty_password_in_setup(self, client):
        """Setup with an empty master password is rejected."""
        payload = {"master_password": ""}

        resp = await client.post("/api/auth/setup", json=payload)

        assert resp.status_code in (400, 422)

    async def test_empty_password_in_login(self, client):
        """Login with an empty password is rejected after a valid setup."""
        setup_payload = {"master_password": "ValidPassword123!"}
        await client.post("/api/auth/setup", json=setup_payload)

        resp = await client.post("/api/auth/login", json={"password": ""})

        assert resp.status_code in (400, 401, 422)

    async def test_missing_password_field(self, client):
        """Setup with an empty JSON object returns 422."""
        resp = await client.post("/api/auth/setup", json={})

        # 422 is the validation-error status.
        assert resp.status_code == 422

    async def test_malformed_json_in_auth_requests(self, client):
        """Setup with a non-JSON body labelled as JSON is rejected."""
        json_headers = {"Content-Type": "application/json"}

        resp = await client.post(
            "/api/auth/setup",
            content="not valid json",
            headers=json_headers,
        )

        assert resp.status_code in (400, 422)

    async def test_extremely_long_password(self, client):
        """Setup with a 10008-character password is accepted or rejected."""
        oversized = "P@ssw0rd" + "x" * 10000
        payload = {"master_password": oversized}

        resp = await client.post("/api/auth/setup", json=payload)

        # Accepting or refusing is fine; any other status fails the test.
        assert resp.status_code in (201, 400, 413, 422)

    async def test_special_characters_in_password(self, client):
        """Setup with a punctuation-heavy password returns 201 or 400."""
        punctuated = "P@$$w0rd!#%^&*()_+-=[]{}|;:',.<>?/~`"
        payload = {"master_password": punctuated}

        resp = await client.post("/api/auth/setup", json=payload)

        # Acceptance is the intent, but a 400 also passes this check.
        assert resp.status_code in (201, 400)

    async def test_unicode_characters_in_password(self, client):
        """Setup with a non-ASCII password is accepted or rejected."""
        non_ascii = "Pässwörd123!日本語"
        payload = {"master_password": non_ascii}

        resp = await client.post("/api/auth/setup", json=payload)

        assert resp.status_code in (201, 400, 422)


class TestCompleteAuthenticationWorkflow:
    """End-to-end checks that chain several auth steps in one test."""

    async def test_full_authentication_cycle(self, client):
        """Walk through status, setup, login, protected access and logout."""
        secret = "CompleteWorkflowPassword123!"

        # 1. Nothing is configured yet.
        before = await client.get("/api/auth/status")
        assert before.json()["configured"] is False

        # 2. Configure the master password.
        created = await client.post(
            "/api/auth/setup",
            json={"master_password": secret},
        )
        assert created.status_code == 201

        # 3. Configured now, but this client is not authenticated.
        after_setup = await client.get("/api/auth/status")
        snapshot = after_setup.json()
        assert snapshot["configured"] is True
        assert snapshot["authenticated"] is False

        # 4. Log in and keep the token.
        signed_in = await client.post(
            "/api/auth/login",
            json={"password": secret},
        )
        assert signed_in.status_code == 200
        token = signed_in.json()["access_token"]
        auth_headers = {"Authorization": f"Bearer {token}"}

        # 5. The token opens a protected route.
        guarded = await client.get("/api/queue/status", headers=auth_headers)
        assert guarded.status_code in (200, 503)

        # 6. Status now reports the client as authenticated.
        with_token = await client.get("/api/auth/status", headers=auth_headers)
        snapshot = with_token.json()
        assert snapshot["configured"] is True
        assert snapshot["authenticated"] is True

        # 7. Log out.
        signed_out = await client.post(
            "/api/auth/logout",
            headers=auth_headers,
        )
        assert signed_out.status_code == 200

    async def test_workflow_with_failed_and_successful_attempts(self, client):
        """Alternate wrong and correct passwords; each gets 401 or 200."""
        secret = "MixedAttemptsPassword123!"

        await client.post("/api/auth/setup", json={"master_password": secret})

        # Logins are issued in exactly this order.
        attempts = [
            ("WrongPassword123!", 401),
            (secret, 200),
            ("WrongAgain123!", 401),
            (secret, 200),
        ]
        for candidate, expected_status in attempts:
            resp = await client.post(
                "/api/auth/login",
                json={"password": candidate},
            )
            assert resp.status_code == expected_status