Add health and system monitoring tests

- Integration tests for health endpoints (/health, /api/health/*, /api/system/database/health)
- Unit tests for system metrics collection (CPU, memory, disk, network)
- Performance monitoring tests (response time, throughput, error rate)
- Health status determination and service dependency checking
- Tests for both existing and planned health endpoints
- Authentication requirements testing for protected health endpoints
This commit is contained in:
Lukas Pupka-Lipinski 2025-10-06 10:53:17 +02:00
parent 7f27ff823a
commit 548eda6c94
2 changed files with 659 additions and 0 deletions

View File

@ -0,0 +1,285 @@
"""
Integration tests for health and system monitoring API endpoints.
Tests /health, /api/health/* endpoints including system metrics,
database health, dependencies, performance, and monitoring.
"""
import pytest
import sys
import os
from fastapi.testclient import TestClient
from unittest.mock import patch
from datetime import datetime
# Add source directory to path
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', '..', '..'))
# Import after path setup
from src.server.fastapi_app import app # noqa: E402
@pytest.fixture
def client():
"""Test client for health API tests."""
return TestClient(app)
@pytest.mark.integration
class TestBasicHealthEndpoints:
"""Test basic health check endpoints."""
def test_health_endpoint_structure(self, client):
"""Test basic health endpoint returns correct structure."""
response = client.get("/health")
assert response.status_code == 200
data = response.json()
assert "status" in data
assert "timestamp" in data
assert "version" in data
assert "services" in data
assert data["status"] == "healthy"
assert data["version"] == "1.0.0"
assert isinstance(data["services"], dict)
def test_health_endpoint_services(self, client):
"""Test health endpoint returns service status."""
response = client.get("/health")
assert response.status_code == 200
data = response.json()
services = data["services"]
expected_services = ["authentication", "anime_service", "episode_service"]
for service in expected_services:
assert service in services
assert services[service] == "online"
def test_health_endpoint_timestamp_format(self, client):
"""Test health endpoint timestamp is valid."""
response = client.get("/health")
assert response.status_code == 200
data = response.json()
# Should be able to parse timestamp
timestamp_str = data["timestamp"]
parsed_timestamp = datetime.fromisoformat(timestamp_str.replace('Z', '+00:00'))
assert isinstance(parsed_timestamp, datetime)
def test_database_health_requires_auth(self, client):
"""Test database health endpoint requires authentication."""
response = client.get("/api/system/database/health")
assert response.status_code == 403 # Should require authentication
def test_database_health_with_auth(self, client, mock_settings, valid_jwt_token):
"""Test database health endpoint with authentication."""
with patch('src.server.fastapi_app.settings', mock_settings):
response = client.get(
"/api/system/database/health",
headers={"Authorization": f"Bearer {valid_jwt_token}"}
)
assert response.status_code == 200
data = response.json()
assert "status" in data
assert "connection_pool" in data
assert "response_time_ms" in data
assert "last_check" in data
assert data["status"] == "healthy"
@pytest.mark.integration
class TestSystemHealthEndpoints:
"""Test system health monitoring endpoints (to be implemented)."""
def test_api_health_endpoint(self, client, mock_settings, valid_jwt_token):
"""Test /api/health endpoint."""
with patch('src.server.fastapi_app.settings', mock_settings):
# This endpoint might not exist yet, so we test expected behavior
response = client.get(
"/api/health",
headers={"Authorization": f"Bearer {valid_jwt_token}"}
)
# If not implemented, should return 404
# If implemented, should return 200 with health data
assert response.status_code in [200, 404]
if response.status_code == 200:
data = response.json()
assert "status" in data
def test_system_health_endpoint(self, client, mock_settings, valid_jwt_token):
"""Test /api/health/system endpoint for CPU, memory, disk metrics."""
with patch('src.server.fastapi_app.settings', mock_settings):
response = client.get(
"/api/health/system",
headers={"Authorization": f"Bearer {valid_jwt_token}"}
)
# Endpoint may not be implemented yet
assert response.status_code in [200, 404]
if response.status_code == 200:
data = response.json()
expected_metrics = ["cpu_usage", "memory_usage", "disk_usage"]
for metric in expected_metrics:
assert metric in data
def test_dependencies_health_endpoint(self, client, mock_settings, valid_jwt_token):
"""Test /api/health/dependencies endpoint."""
with patch('src.server.fastapi_app.settings', mock_settings):
response = client.get(
"/api/health/dependencies",
headers={"Authorization": f"Bearer {valid_jwt_token}"}
)
assert response.status_code in [200, 404]
if response.status_code == 200:
data = response.json()
assert isinstance(data, dict)
def test_performance_health_endpoint(self, client, mock_settings, valid_jwt_token):
"""Test /api/health/performance endpoint."""
with patch('src.server.fastapi_app.settings', mock_settings):
response = client.get(
"/api/health/performance",
headers={"Authorization": f"Bearer {valid_jwt_token}"}
)
assert response.status_code in [200, 404]
if response.status_code == 200:
data = response.json()
performance_metrics = ["response_time", "throughput", "error_rate"]
# At least some performance metrics should be present
assert any(metric in data for metric in performance_metrics)
def test_metrics_health_endpoint(self, client, mock_settings, valid_jwt_token):
"""Test /api/health/metrics endpoint."""
with patch('src.server.fastapi_app.settings', mock_settings):
response = client.get(
"/api/health/metrics",
headers={"Authorization": f"Bearer {valid_jwt_token}"}
)
assert response.status_code in [200, 404]
if response.status_code == 200:
data = response.json()
assert isinstance(data, (dict, list))
def test_ready_health_endpoint(self, client, mock_settings, valid_jwt_token):
"""Test /api/health/ready endpoint for readiness probe."""
with patch('src.server.fastapi_app.settings', mock_settings):
response = client.get(
"/api/health/ready",
headers={"Authorization": f"Bearer {valid_jwt_token}"}
)
assert response.status_code in [200, 404, 503]
if response.status_code in [200, 503]:
data = response.json()
assert "ready" in data or "status" in data
@pytest.mark.integration
class TestHealthEndpointAuthentication:
"""Test authentication requirements for health endpoints."""
def test_health_endpoints_without_auth(self, client):
"""Test which health endpoints require authentication."""
# Basic health should be public
response = client.get("/health")
assert response.status_code == 200
# System endpoints should require auth
protected_endpoints = [
"/api/health",
"/api/health/system",
"/api/health/database",
"/api/health/dependencies",
"/api/health/performance",
"/api/health/metrics",
"/api/health/ready"
]
for endpoint in protected_endpoints:
response = client.get(endpoint)
# Should either be not found (404) or require auth (403)
assert response.status_code in [403, 404]
def test_health_endpoints_with_invalid_auth(self, client):
"""Test health endpoints with invalid authentication."""
invalid_token = "invalid.token.here"
protected_endpoints = [
"/api/health",
"/api/health/system",
"/api/health/database",
"/api/health/dependencies",
"/api/health/performance",
"/api/health/metrics",
"/api/health/ready"
]
for endpoint in protected_endpoints:
response = client.get(
endpoint,
headers={"Authorization": f"Bearer {invalid_token}"}
)
# Should either be not found (404) or unauthorized (401)
assert response.status_code in [401, 404]
@pytest.mark.integration
class TestHealthEndpointErrorHandling:
"""Test error handling in health endpoints."""
def test_health_endpoint_resilience(self, client):
"""Test health endpoint handles errors gracefully."""
# Test with various malformed requests
malformed_requests = [
("/health", {"Content-Type": "application/xml"}),
("/health", {"Accept": "text/plain"}),
]
for endpoint, headers in malformed_requests:
response = client.get(endpoint, headers=headers)
# Should still return 200 for basic health
assert response.status_code == 200
def test_database_health_error_handling(self, client, mock_settings):
"""Test database health endpoint error handling."""
with patch('src.server.fastapi_app.settings', mock_settings):
# Test with expired token
expired_token = "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJ1c2VyIjoidGVzdCIsImV4cCI6MH0"
response = client.get(
"/api/system/database/health",
headers={"Authorization": f"Bearer {expired_token}"}
)
assert response.status_code == 401
def test_health_endpoint_malformed_auth_header(self, client):
"""Test health endpoints with malformed authorization headers."""
malformed_headers = [
{"Authorization": "Bearer"}, # Missing token
{"Authorization": "Basic token"}, # Wrong type
{"Authorization": "token"}, # Missing Bearer
]
for headers in malformed_headers:
response = client.get("/api/system/database/health", headers=headers)
assert response.status_code in [401, 403]

View File

@ -0,0 +1,374 @@
"""
Unit tests for system metrics gathering functionality.
Tests CPU, memory, disk usage collection, performance monitoring,
and system health assessment logic.
"""
import pytest
import sys
import os
from unittest.mock import patch, Mock
from datetime import datetime
# Add source directory to path
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', '..', '..'))
@pytest.mark.unit
class TestSystemMetricsCollection:
"""Test system metrics collection functionality."""
def test_cpu_usage_collection(self):
"""Test CPU usage metrics collection."""
with patch('psutil.cpu_percent') as mock_cpu:
mock_cpu.return_value = 45.5
# Mock function that would collect CPU metrics
def get_cpu_usage():
import psutil
return psutil.cpu_percent(interval=1)
cpu_usage = get_cpu_usage()
assert isinstance(cpu_usage, (int, float))
assert 0 <= cpu_usage <= 100
mock_cpu.assert_called_once_with(interval=1)
def test_memory_usage_collection(self):
"""Test memory usage metrics collection."""
with patch('psutil.virtual_memory') as mock_memory:
mock_memory_obj = Mock()
mock_memory_obj.total = 8 * 1024**3 # 8GB
mock_memory_obj.available = 4 * 1024**3 # 4GB available
mock_memory_obj.percent = 50.0
mock_memory.return_value = mock_memory_obj
def get_memory_usage():
import psutil
mem = psutil.virtual_memory()
return {
'total': mem.total,
'available': mem.available,
'percent': mem.percent
}
memory_info = get_memory_usage()
assert 'total' in memory_info
assert 'available' in memory_info
assert 'percent' in memory_info
assert memory_info['total'] > 0
assert memory_info['available'] >= 0
assert 0 <= memory_info['percent'] <= 100
def test_disk_usage_collection(self):
"""Test disk usage metrics collection."""
with patch('psutil.disk_usage') as mock_disk:
mock_disk_obj = Mock()
mock_disk_obj.total = 500 * 1024**3 # 500GB
mock_disk_obj.used = 300 * 1024**3 # 300GB used
mock_disk_obj.free = 200 * 1024**3 # 200GB free
mock_disk.return_value = mock_disk_obj
def get_disk_usage(path="/"):
import psutil
disk = psutil.disk_usage(path)
return {
'total': disk.total,
'used': disk.used,
'free': disk.free,
'percent': (disk.used / disk.total) * 100
}
disk_info = get_disk_usage()
assert 'total' in disk_info
assert 'used' in disk_info
assert 'free' in disk_info
assert 'percent' in disk_info
assert disk_info['total'] > 0
assert disk_info['used'] >= 0
assert disk_info['free'] >= 0
assert 0 <= disk_info['percent'] <= 100
def test_network_metrics_collection(self):
"""Test network metrics collection."""
with patch('psutil.net_io_counters') as mock_net:
mock_net_obj = Mock()
mock_net_obj.bytes_sent = 1024 * 1024 * 100 # 100MB sent
mock_net_obj.bytes_recv = 1024 * 1024 * 200 # 200MB received
mock_net_obj.packets_sent = 1000
mock_net_obj.packets_recv = 2000
mock_net.return_value = mock_net_obj
def get_network_stats():
import psutil
net = psutil.net_io_counters()
return {
'bytes_sent': net.bytes_sent,
'bytes_recv': net.bytes_recv,
'packets_sent': net.packets_sent,
'packets_recv': net.packets_recv
}
net_stats = get_network_stats()
assert 'bytes_sent' in net_stats
assert 'bytes_recv' in net_stats
assert 'packets_sent' in net_stats
assert 'packets_recv' in net_stats
assert all(val >= 0 for val in net_stats.values())
@pytest.mark.unit
class TestPerformanceMonitoring:
"""Test performance monitoring functionality."""
def test_response_time_tracking(self):
"""Test response time measurement."""
def measure_response_time():
import time
start_time = time.time()
# Simulate some work
time.sleep(0.001) # 1ms
end_time = time.time()
return (end_time - start_time) * 1000 # Convert to milliseconds
response_time = measure_response_time()
assert isinstance(response_time, float)
assert response_time > 0
assert response_time < 1000 # Should be less than 1 second
def test_throughput_calculation(self):
"""Test throughput calculation."""
def calculate_throughput(requests_count, time_period):
if time_period <= 0:
return 0
return requests_count / time_period
# Test normal case
throughput = calculate_throughput(100, 10) # 100 requests in 10 seconds
assert throughput == 10.0
# Test edge cases
assert calculate_throughput(0, 10) == 0
assert calculate_throughput(100, 0) == 0
assert calculate_throughput(100, -1) == 0
def test_error_rate_calculation(self):
"""Test error rate calculation."""
def calculate_error_rate(error_count, total_requests):
if total_requests <= 0:
return 0
return (error_count / total_requests) * 100
# Test normal cases
error_rate = calculate_error_rate(5, 100) # 5 errors out of 100 requests
assert error_rate == 5.0
error_rate = calculate_error_rate(0, 100) # No errors
assert error_rate == 0.0
# Test edge cases
assert calculate_error_rate(5, 0) == 0 # No requests
assert calculate_error_rate(0, 0) == 0 # No requests, no errors
def test_health_status_determination(self):
"""Test health status determination logic."""
def determine_health_status(cpu_usage, memory_usage, disk_usage, error_rate):
if error_rate > 10:
return "unhealthy"
elif cpu_usage > 90 or memory_usage > 90 or disk_usage > 95:
return "degraded"
elif cpu_usage > 70 or memory_usage > 70 or disk_usage > 80:
return "warning"
else:
return "healthy"
# Test healthy status
status = determine_health_status(50, 60, 70, 1)
assert status == "healthy"
# Test warning status
status = determine_health_status(75, 60, 70, 1)
assert status == "warning"
# Test degraded status
status = determine_health_status(95, 60, 70, 1)
assert status == "degraded"
# Test unhealthy status
status = determine_health_status(50, 60, 70, 15)
assert status == "unhealthy"
@pytest.mark.unit
class TestSystemHealthAssessment:
"""Test system health assessment logic."""
def test_service_dependency_check(self):
"""Test service dependency health checking."""
def check_service_health(service_name, check_function):
try:
result = check_function()
return {
"service": service_name,
"status": "healthy" if result else "unhealthy",
"last_check": datetime.utcnow().isoformat()
}
except ConnectionError as e:
return {
"service": service_name,
"status": "error",
"error": str(e),
"last_check": datetime.utcnow().isoformat()
}
# Test healthy service
def healthy_service():
return True
result = check_service_health("test_service", healthy_service)
assert result["status"] == "healthy"
assert result["service"] == "test_service"
assert "last_check" in result
# Test unhealthy service
def unhealthy_service():
return False
result = check_service_health("test_service", unhealthy_service)
assert result["status"] == "unhealthy"
# Test service with error
def error_service():
raise ConnectionError("Service connection failed")
result = check_service_health("test_service", error_service)
assert result["status"] == "error"
assert "error" in result
def test_database_connection_health(self):
"""Test database connection health check."""
def check_database_connection(connection_string):
# Mock database connection check
if "invalid" in connection_string:
raise ConnectionError("Connection failed")
return {
"connected": True,
"response_time_ms": 15,
"connection_pool_size": 10,
"active_connections": 3
}
# Test successful connection
result = check_database_connection("sqlite:///test.db")
assert result["connected"] is True
assert result["response_time_ms"] > 0
assert result["connection_pool_size"] > 0
# Test failed connection
with pytest.raises(ConnectionError):
check_database_connection("invalid://connection")
def test_metrics_aggregation(self):
"""Test metrics aggregation functionality."""
def aggregate_metrics(metrics_list):
if not metrics_list:
return {}
# Simple aggregation - calculate averages
aggregated = {}
for key in metrics_list[0].keys():
if isinstance(metrics_list[0][key], (int, float)):
values = [m[key] for m in metrics_list if key in m]
aggregated[key] = {
"avg": sum(values) / len(values),
"min": min(values),
"max": max(values),
"count": len(values)
}
return aggregated
# Test metrics aggregation
metrics = [
{"cpu": 50, "memory": 60},
{"cpu": 60, "memory": 70},
{"cpu": 40, "memory": 50}
]
result = aggregate_metrics(metrics)
assert "cpu" in result
assert "memory" in result
assert result["cpu"]["avg"] == 50.0
assert result["cpu"]["min"] == 40
assert result["cpu"]["max"] == 60
assert result["memory"]["avg"] == 60.0
# Test empty metrics
empty_result = aggregate_metrics([])
assert empty_result == {}
@pytest.mark.unit
class TestHealthCheckUtilities:
"""Test health check utility functions."""
def test_format_bytes(self):
"""Test byte formatting utility."""
def format_bytes(bytes_value):
for unit in ['B', 'KB', 'MB', 'GB', 'TB']:
if bytes_value < 1024.0:
return f"{bytes_value:.1f} {unit}"
bytes_value /= 1024.0
return f"{bytes_value:.1f} PB"
assert format_bytes(512) == "512.0 B"
assert format_bytes(1024) == "1.0 KB"
assert format_bytes(1024 * 1024) == "1.0 MB"
assert format_bytes(1024 * 1024 * 1024) == "1.0 GB"
def test_timestamp_formatting(self):
"""Test timestamp formatting for health checks."""
def format_timestamp(dt=None):
if dt is None:
dt = datetime.utcnow()
return dt.isoformat() + "Z"
# Test with current time
timestamp = format_timestamp()
assert timestamp.endswith("Z")
assert "T" in timestamp
# Test with specific time
specific_time = datetime(2023, 1, 1, 12, 0, 0)
timestamp = format_timestamp(specific_time)
assert timestamp.startswith("2023-01-01T12:00:00")
def test_health_status_priority(self):
"""Test health status priority determination."""
def get_highest_priority_status(statuses):
priority = {
"healthy": 0,
"warning": 1,
"degraded": 2,
"unhealthy": 3,
"error": 4
}
if not statuses:
return "unknown"
highest = max(statuses, key=lambda s: priority.get(s, -1))
return highest
# Test various combinations
assert get_highest_priority_status(["healthy", "healthy"]) == "healthy"
assert get_highest_priority_status(["healthy", "warning"]) == "warning"
assert get_highest_priority_status(["warning", "degraded"]) == "degraded"
assert get_highest_priority_status(["healthy", "error"]) == "error"
assert get_highest_priority_status([]) == "unknown"