diff --git a/src/tests/integration/test_health_endpoints.py b/src/tests/integration/test_health_endpoints.py new file mode 100644 index 0000000..0320746 --- /dev/null +++ b/src/tests/integration/test_health_endpoints.py @@ -0,0 +1,285 @@ +""" +Integration tests for health and system monitoring API endpoints. + +Tests /health, /api/health/* endpoints including system metrics, +database health, dependencies, performance, and monitoring. +""" + +import pytest +import sys +import os +from fastapi.testclient import TestClient +from unittest.mock import patch +from datetime import datetime + +# Add source directory to path +sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', '..', '..')) + +# Import after path setup +from src.server.fastapi_app import app # noqa: E402 + + +@pytest.fixture +def client(): + """Test client for health API tests.""" + return TestClient(app) + + +@pytest.mark.integration +class TestBasicHealthEndpoints: + """Test basic health check endpoints.""" + + def test_health_endpoint_structure(self, client): + """Test basic health endpoint returns correct structure.""" + response = client.get("/health") + + assert response.status_code == 200 + data = response.json() + + assert "status" in data + assert "timestamp" in data + assert "version" in data + assert "services" in data + + assert data["status"] == "healthy" + assert data["version"] == "1.0.0" + assert isinstance(data["services"], dict) + + def test_health_endpoint_services(self, client): + """Test health endpoint returns service status.""" + response = client.get("/health") + + assert response.status_code == 200 + data = response.json() + + services = data["services"] + expected_services = ["authentication", "anime_service", "episode_service"] + + for service in expected_services: + assert service in services + assert services[service] == "online" + + def test_health_endpoint_timestamp_format(self, client): + """Test health endpoint timestamp is valid.""" + response = client.get("/health") + + assert response.status_code == 200 + data = response.json() + + # Should be able to parse timestamp + timestamp_str = data["timestamp"] + parsed_timestamp = datetime.fromisoformat(timestamp_str.replace('Z', '+00:00')) + assert isinstance(parsed_timestamp, datetime) + + def test_database_health_requires_auth(self, client): + """Test database health endpoint requires authentication.""" + response = client.get("/api/system/database/health") + + assert response.status_code == 403 # Should require authentication + + def test_database_health_with_auth(self, client, mock_settings, valid_jwt_token): + """Test database health endpoint with authentication.""" + with patch('src.server.fastapi_app.settings', mock_settings): + response = client.get( + "/api/system/database/health", + headers={"Authorization": f"Bearer {valid_jwt_token}"} + ) + + assert response.status_code == 200 + data = response.json() + + assert "status" in data + assert "connection_pool" in data + assert "response_time_ms" in data + assert "last_check" in data + + assert data["status"] == "healthy" + + +@pytest.mark.integration +class TestSystemHealthEndpoints: + """Test system health monitoring endpoints (to be implemented).""" + + def test_api_health_endpoint(self, client, mock_settings, valid_jwt_token): + """Test /api/health endpoint.""" + with patch('src.server.fastapi_app.settings', mock_settings): + # This endpoint might not exist yet, so we test expected behavior + response = client.get( + "/api/health", + headers={"Authorization": f"Bearer {valid_jwt_token}"} + ) + + # If not implemented, should return 404 + # If implemented, should return 200 with health data + assert response.status_code in [200, 404] + + if response.status_code == 200: + data = response.json() + assert "status" in data + + def test_system_health_endpoint(self, client, mock_settings, valid_jwt_token): + """Test /api/health/system endpoint for CPU, memory, disk metrics.""" + with patch('src.server.fastapi_app.settings', mock_settings): + response = client.get( + "/api/health/system", + headers={"Authorization": f"Bearer {valid_jwt_token}"} + ) + + # Endpoint may not be implemented yet + assert response.status_code in [200, 404] + + if response.status_code == 200: + data = response.json() + expected_metrics = ["cpu_usage", "memory_usage", "disk_usage"] + for metric in expected_metrics: + assert metric in data + + def test_dependencies_health_endpoint(self, client, mock_settings, valid_jwt_token): + """Test /api/health/dependencies endpoint.""" + with patch('src.server.fastapi_app.settings', mock_settings): + response = client.get( + "/api/health/dependencies", + headers={"Authorization": f"Bearer {valid_jwt_token}"} + ) + + assert response.status_code in [200, 404] + + if response.status_code == 200: + data = response.json() + assert isinstance(data, dict) + + def test_performance_health_endpoint(self, client, mock_settings, valid_jwt_token): + """Test /api/health/performance endpoint.""" + with patch('src.server.fastapi_app.settings', mock_settings): + response = client.get( + "/api/health/performance", + headers={"Authorization": f"Bearer {valid_jwt_token}"} + ) + + assert response.status_code in [200, 404] + + if response.status_code == 200: + data = response.json() + performance_metrics = ["response_time", "throughput", "error_rate"] + # At least some performance metrics should be present + assert any(metric in data for metric in performance_metrics) + + def test_metrics_health_endpoint(self, client, mock_settings, valid_jwt_token): + """Test /api/health/metrics endpoint.""" + with patch('src.server.fastapi_app.settings', mock_settings): + response = client.get( + "/api/health/metrics", + headers={"Authorization": f"Bearer {valid_jwt_token}"} + ) + + assert response.status_code in [200, 404] + + if response.status_code == 200: + data = response.json() + assert isinstance(data, (dict, list)) + + def test_ready_health_endpoint(self, client, mock_settings, valid_jwt_token): + """Test /api/health/ready endpoint for readiness probe.""" + with patch('src.server.fastapi_app.settings', mock_settings): + response = client.get( + "/api/health/ready", + headers={"Authorization": f"Bearer {valid_jwt_token}"} + ) + + assert response.status_code in [200, 404, 503] + + if response.status_code in [200, 503]: + data = response.json() + assert "ready" in data or "status" in data + + +@pytest.mark.integration +class TestHealthEndpointAuthentication: + """Test authentication requirements for health endpoints.""" + + def test_health_endpoints_without_auth(self, client): + """Test which health endpoints require authentication.""" + # Basic health should be public + response = client.get("/health") + assert response.status_code == 200 + + # System endpoints should require auth + protected_endpoints = [ + "/api/health", + "/api/health/system", + "/api/health/database", + "/api/health/dependencies", + "/api/health/performance", + "/api/health/metrics", + "/api/health/ready" + ] + + for endpoint in protected_endpoints: + response = client.get(endpoint) + # Should either be not found (404) or require auth (403) + assert response.status_code in [403, 404] + + def test_health_endpoints_with_invalid_auth(self, client): + """Test health endpoints with invalid authentication.""" + invalid_token = "invalid.token.here" + + protected_endpoints = [ + "/api/health", + "/api/health/system", + "/api/health/database", + "/api/health/dependencies", + "/api/health/performance", + "/api/health/metrics", + "/api/health/ready" + ] + + for endpoint in protected_endpoints: + response = client.get( + endpoint, + headers={"Authorization": f"Bearer {invalid_token}"} + ) + # Should either be not found (404) or unauthorized (401) + assert response.status_code in [401, 404] + + +@pytest.mark.integration +class TestHealthEndpointErrorHandling: + """Test error handling in health endpoints.""" + + def test_health_endpoint_resilience(self, client): + """Test health endpoint handles errors gracefully.""" + # Test with various malformed requests + malformed_requests = [ + ("/health", {"Content-Type": "application/xml"}), + ("/health", {"Accept": "text/plain"}), + ] + + for endpoint, headers in malformed_requests: + response = client.get(endpoint, headers=headers) + # Should still return 200 for basic health + assert response.status_code == 200 + + def test_database_health_error_handling(self, client, mock_settings): + """Test database health endpoint error handling.""" + with patch('src.server.fastapi_app.settings', mock_settings): + # Test with expired token + expired_token = "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJ1c2VyIjoidGVzdCIsImV4cCI6MH0" + + response = client.get( + "/api/system/database/health", + headers={"Authorization": f"Bearer {expired_token}"} + ) + + assert response.status_code == 401 + + def test_health_endpoint_malformed_auth_header(self, client): + """Test health endpoints with malformed authorization headers.""" + malformed_headers = [ + {"Authorization": "Bearer"}, # Missing token + {"Authorization": "Basic token"}, # Wrong type + {"Authorization": "token"}, # Missing Bearer + ] + + for headers in malformed_headers: + response = client.get("/api/system/database/health", headers=headers) + assert response.status_code in [401, 403] \ No newline at end of file diff --git a/src/tests/unit/test_system_metrics.py b/src/tests/unit/test_system_metrics.py new file mode 100644 index 0000000..ef989fb --- /dev/null +++ b/src/tests/unit/test_system_metrics.py @@ -0,0 +1,374 @@ +""" +Unit tests for system metrics gathering functionality. + +Tests CPU, memory, disk usage collection, performance monitoring, +and system health assessment logic. +""" + +import pytest +import sys +import os +from unittest.mock import patch, Mock +from datetime import datetime + +# Add source directory to path +sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', '..', '..')) + + +@pytest.mark.unit +class TestSystemMetricsCollection: + """Test system metrics collection functionality.""" + + def test_cpu_usage_collection(self): + """Test CPU usage metrics collection.""" + with patch('psutil.cpu_percent') as mock_cpu: + mock_cpu.return_value = 45.5 + + # Mock function that would collect CPU metrics + def get_cpu_usage(): + import psutil + return psutil.cpu_percent(interval=1) + + cpu_usage = get_cpu_usage() + + assert isinstance(cpu_usage, (int, float)) + assert 0 <= cpu_usage <= 100 + mock_cpu.assert_called_once_with(interval=1) + + def test_memory_usage_collection(self): + """Test memory usage metrics collection.""" + with patch('psutil.virtual_memory') as mock_memory: + mock_memory_obj = Mock() + mock_memory_obj.total = 8 * 1024**3 # 8GB + mock_memory_obj.available = 4 * 1024**3 # 4GB available + mock_memory_obj.percent = 50.0 + mock_memory.return_value = mock_memory_obj + + def get_memory_usage(): + import psutil + mem = psutil.virtual_memory() + return { + 'total': mem.total, + 'available': mem.available, + 'percent': mem.percent + } + + memory_info = get_memory_usage() + + assert 'total' in memory_info + assert 'available' in memory_info + assert 'percent' in memory_info + assert memory_info['total'] > 0 + assert memory_info['available'] >= 0 + assert 0 <= memory_info['percent'] <= 100 + + def test_disk_usage_collection(self): + """Test disk usage metrics collection.""" + with patch('psutil.disk_usage') as mock_disk: + mock_disk_obj = Mock() + mock_disk_obj.total = 500 * 1024**3 # 500GB + mock_disk_obj.used = 300 * 1024**3 # 300GB used + mock_disk_obj.free = 200 * 1024**3 # 200GB free + mock_disk.return_value = mock_disk_obj + + def get_disk_usage(path="/"): + import psutil + disk = psutil.disk_usage(path) + return { + 'total': disk.total, + 'used': disk.used, + 'free': disk.free, + 'percent': (disk.used / disk.total) * 100 + } + + disk_info = get_disk_usage() + + assert 'total' in disk_info + assert 'used' in disk_info + assert 'free' in disk_info + assert 'percent' in disk_info + assert disk_info['total'] > 0 + assert disk_info['used'] >= 0 + assert disk_info['free'] >= 0 + assert 0 <= disk_info['percent'] <= 100 + + def test_network_metrics_collection(self): + """Test network metrics collection.""" + with patch('psutil.net_io_counters') as mock_net: + mock_net_obj = Mock() + mock_net_obj.bytes_sent = 1024 * 1024 * 100 # 100MB sent + mock_net_obj.bytes_recv = 1024 * 1024 * 200 # 200MB received + mock_net_obj.packets_sent = 1000 + mock_net_obj.packets_recv = 2000 + mock_net.return_value = mock_net_obj + + def get_network_stats(): + import psutil + net = psutil.net_io_counters() + return { + 'bytes_sent': net.bytes_sent, + 'bytes_recv': net.bytes_recv, + 'packets_sent': net.packets_sent, + 'packets_recv': net.packets_recv + } + + net_stats = get_network_stats() + + assert 'bytes_sent' in net_stats + assert 'bytes_recv' in net_stats + assert 'packets_sent' in net_stats + assert 'packets_recv' in net_stats + assert all(val >= 0 for val in net_stats.values()) + + +@pytest.mark.unit +class TestPerformanceMonitoring: + """Test performance monitoring functionality.""" + + def test_response_time_tracking(self): + """Test response time measurement.""" + def measure_response_time(): + import time + start_time = time.time() + # Simulate some work + time.sleep(0.001) # 1ms + end_time = time.time() + return (end_time - start_time) * 1000 # Convert to milliseconds + + response_time = measure_response_time() + + assert isinstance(response_time, float) + assert response_time > 0 + assert response_time < 1000 # Should be less than 1 second + + def test_throughput_calculation(self): + """Test throughput calculation.""" + def calculate_throughput(requests_count, time_period): + if time_period <= 0: + return 0 + return requests_count / time_period + + # Test normal case + throughput = calculate_throughput(100, 10) # 100 requests in 10 seconds + assert throughput == 10.0 + + # Test edge cases + assert calculate_throughput(0, 10) == 0 + assert calculate_throughput(100, 0) == 0 + assert calculate_throughput(100, -1) == 0 + + def test_error_rate_calculation(self): + """Test error rate calculation.""" + def calculate_error_rate(error_count, total_requests): + if total_requests <= 0: + return 0 + return (error_count / total_requests) * 100 + + # Test normal cases + error_rate = calculate_error_rate(5, 100) # 5 errors out of 100 requests + assert error_rate == 5.0 + + error_rate = calculate_error_rate(0, 100) # No errors + assert error_rate == 0.0 + + # Test edge cases + assert calculate_error_rate(5, 0) == 0 # No requests + assert calculate_error_rate(0, 0) == 0 # No requests, no errors + + def test_health_status_determination(self): + """Test health status determination logic.""" + def determine_health_status(cpu_usage, memory_usage, disk_usage, error_rate): + if error_rate > 10: + return "unhealthy" + elif cpu_usage > 90 or memory_usage > 90 or disk_usage > 95: + return "degraded" + elif cpu_usage > 70 or memory_usage > 70 or disk_usage > 80: + return "warning" + else: + return "healthy" + + # Test healthy status + status = determine_health_status(50, 60, 70, 1) + assert status == "healthy" + + # Test warning status + status = determine_health_status(75, 60, 70, 1) + assert status == "warning" + + # Test degraded status + status = determine_health_status(95, 60, 70, 1) + assert status == "degraded" + + # Test unhealthy status + status = determine_health_status(50, 60, 70, 15) + assert status == "unhealthy" + + +@pytest.mark.unit +class TestSystemHealthAssessment: + """Test system health assessment logic.""" + + def test_service_dependency_check(self): + """Test service dependency health checking.""" + def check_service_health(service_name, check_function): + try: + result = check_function() + return { + "service": service_name, + "status": "healthy" if result else "unhealthy", + "last_check": datetime.utcnow().isoformat() + } + except ConnectionError as e: + return { + "service": service_name, + "status": "error", + "error": str(e), + "last_check": datetime.utcnow().isoformat() + } + + # Test healthy service + def healthy_service(): + return True + + result = check_service_health("test_service", healthy_service) + assert result["status"] == "healthy" + assert result["service"] == "test_service" + assert "last_check" in result + + # Test unhealthy service + def unhealthy_service(): + return False + + result = check_service_health("test_service", unhealthy_service) + assert result["status"] == "unhealthy" + + # Test service with error + def error_service(): + raise ConnectionError("Service connection failed") + + result = check_service_health("test_service", error_service) + assert result["status"] == "error" + assert "error" in result + + def test_database_connection_health(self): + """Test database connection health check.""" + def check_database_connection(connection_string): + # Mock database connection check + if "invalid" in connection_string: + raise ConnectionError("Connection failed") + return { + "connected": True, + "response_time_ms": 15, + "connection_pool_size": 10, + "active_connections": 3 + } + + # Test successful connection + result = check_database_connection("sqlite:///test.db") + assert result["connected"] is True + assert result["response_time_ms"] > 0 + assert result["connection_pool_size"] > 0 + + # Test failed connection + with pytest.raises(ConnectionError): + check_database_connection("invalid://connection") + + def test_metrics_aggregation(self): + """Test metrics aggregation functionality.""" + def aggregate_metrics(metrics_list): + if not metrics_list: + return {} + + # Simple aggregation - calculate averages + aggregated = {} + for key in metrics_list[0].keys(): + if isinstance(metrics_list[0][key], (int, float)): + values = [m[key] for m in metrics_list if key in m] + aggregated[key] = { + "avg": sum(values) / len(values), + "min": min(values), + "max": max(values), + "count": len(values) + } + + return aggregated + + # Test metrics aggregation + metrics = [ + {"cpu": 50, "memory": 60}, + {"cpu": 60, "memory": 70}, + {"cpu": 40, "memory": 50} + ] + + result = aggregate_metrics(metrics) + + assert "cpu" in result + assert "memory" in result + assert result["cpu"]["avg"] == 50.0 + assert result["cpu"]["min"] == 40 + assert result["cpu"]["max"] == 60 + assert result["memory"]["avg"] == 60.0 + + # Test empty metrics + empty_result = aggregate_metrics([]) + assert empty_result == {} + + +@pytest.mark.unit +class TestHealthCheckUtilities: + """Test health check utility functions.""" + + def test_format_bytes(self): + """Test byte formatting utility.""" + def format_bytes(bytes_value): + for unit in ['B', 'KB', 'MB', 'GB', 'TB']: + if bytes_value < 1024.0: + return f"{bytes_value:.1f} {unit}" + bytes_value /= 1024.0 + return f"{bytes_value:.1f} PB" + + assert format_bytes(512) == "512.0 B" + assert format_bytes(1024) == "1.0 KB" + assert format_bytes(1024 * 1024) == "1.0 MB" + assert format_bytes(1024 * 1024 * 1024) == "1.0 GB" + + def test_timestamp_formatting(self): + """Test timestamp formatting for health checks.""" + def format_timestamp(dt=None): + if dt is None: + dt = datetime.utcnow() + return dt.isoformat() + "Z" + + # Test with current time + timestamp = format_timestamp() + assert timestamp.endswith("Z") + assert "T" in timestamp + + # Test with specific time + specific_time = datetime(2023, 1, 1, 12, 0, 0) + timestamp = format_timestamp(specific_time) + assert timestamp.startswith("2023-01-01T12:00:00") + + def test_health_status_priority(self): + """Test health status priority determination.""" + def get_highest_priority_status(statuses): + priority = { + "healthy": 0, + "warning": 1, + "degraded": 2, + "unhealthy": 3, + "error": 4 + } + + if not statuses: + return "unknown" + + highest = max(statuses, key=lambda s: priority.get(s, -1)) + return highest + + # Test various combinations + assert get_highest_priority_status(["healthy", "healthy"]) == "healthy" + assert get_highest_priority_status(["healthy", "warning"]) == "warning" + assert get_highest_priority_status(["warning", "degraded"]) == "degraded" + assert get_highest_priority_status(["healthy", "error"]) == "error" + assert get_highest_priority_status([]) == "unknown" \ No newline at end of file