From 8f720443a4b6c733a5d5349159e25eaea11577df Mon Sep 17 00:00:00 2001 From: Lukas Pupka-Lipinski Date: Mon, 6 Oct 2025 11:08:33 +0200 Subject: [PATCH] Add database and storage management tests - Integration tests for database health, info, and maintenance endpoints - Unit tests for database maintenance operations (vacuum, analyze, integrity-check, reindex) - Database statistics collection and optimization recommendation logic - Maintenance scheduling and operation sequencing tests - Error handling and timeout management for database operations - Tests cover both existing endpoints and planned maintenance functionality --- .../integration/test_database_endpoints.py | 349 +++++++++++++ src/tests/unit/test_database_maintenance.py | 484 ++++++++++++++++++ 2 files changed, 833 insertions(+) create mode 100644 src/tests/integration/test_database_endpoints.py create mode 100644 src/tests/unit/test_database_maintenance.py diff --git a/src/tests/integration/test_database_endpoints.py b/src/tests/integration/test_database_endpoints.py new file mode 100644 index 0000000..40b9af6 --- /dev/null +++ b/src/tests/integration/test_database_endpoints.py @@ -0,0 +1,349 @@ +""" +Integration tests for database and storage management API endpoints. + +Tests database info, maintenance operations (vacuum, analyze, integrity-check, +reindex, optimize, stats), and storage management functionality. +""" + +import pytest +import sys +import os +from fastapi.testclient import TestClient +from unittest.mock import patch + +# Add source directory to path +sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', '..', '..')) + +# Import after path setup +from src.server.fastapi_app import app # noqa: E402 + + +@pytest.fixture +def client(): + """Test client for database API tests.""" + return TestClient(app) + + +@pytest.mark.integration +class TestDatabaseInfoEndpoints: + """Test database information endpoints.""" + + def test_database_health_requires_auth(self, client): + """Test database health endpoint requires authentication.""" + response = client.get("/api/system/database/health") + + assert response.status_code == 403 + + def test_database_health_with_auth(self, client, mock_settings, valid_jwt_token): + """Test database health with valid authentication.""" + with patch('src.server.fastapi_app.settings', mock_settings): + response = client.get( + "/api/system/database/health", + headers={"Authorization": f"Bearer {valid_jwt_token}"} + ) + + assert response.status_code == 200 + data = response.json() + + assert "status" in data + assert "connection_pool" in data + assert "response_time_ms" in data + assert "last_check" in data + + assert data["status"] == "healthy" + assert isinstance(data["response_time_ms"], (int, float)) + assert data["response_time_ms"] > 0 + + def test_database_info_endpoint(self, client, mock_settings, valid_jwt_token): + """Test /api/database/info endpoint (to be implemented).""" + with patch('src.server.fastapi_app.settings', mock_settings): + response = client.get( + "/api/database/info", + headers={"Authorization": f"Bearer {valid_jwt_token}"} + ) + + # Endpoint may not be implemented yet + assert response.status_code in [200, 404] + + if response.status_code == 200: + data = response.json() + expected_fields = ["database_type", "version", "size", "tables"] + for field in expected_fields: + if field in data: + assert isinstance(data[field], (str, int, float, dict, list)) + + +@pytest.mark.integration +class TestDatabaseMaintenanceEndpoints: + """Test database maintenance operation endpoints.""" + + def test_database_vacuum_endpoint(self, client, mock_settings, valid_jwt_token): + """Test /maintenance/database/vacuum endpoint.""" + with patch('src.server.fastapi_app.settings', mock_settings): + response = client.post( + "/maintenance/database/vacuum", + headers={"Authorization": f"Bearer {valid_jwt_token}"} + ) + + # Endpoint may not be implemented yet + assert response.status_code in [200, 404] + + if response.status_code == 200: + data = response.json() + assert "success" in data or "status" in data + + def test_database_analyze_endpoint(self, client, mock_settings, valid_jwt_token): + """Test /maintenance/database/analyze endpoint.""" + with patch('src.server.fastapi_app.settings', mock_settings): + response = client.post( + "/maintenance/database/analyze", + headers={"Authorization": f"Bearer {valid_jwt_token}"} + ) + + assert response.status_code in [200, 404] + + if response.status_code == 200: + data = response.json() + expected_fields = ["tables_analyzed", "statistics_updated", "duration_ms"] + # Check if any expected fields are present + assert any(field in data for field in expected_fields) + + def test_database_integrity_check_endpoint(self, client, mock_settings, valid_jwt_token): + """Test /maintenance/database/integrity-check endpoint.""" + with patch('src.server.fastapi_app.settings', mock_settings): + response = client.post( + "/maintenance/database/integrity-check", + headers={"Authorization": f"Bearer {valid_jwt_token}"} + ) + + assert response.status_code in [200, 404] + + if response.status_code == 200: + data = response.json() + assert "integrity_status" in data or "status" in data + if "integrity_status" in data: + assert data["integrity_status"] in ["ok", "error", "warning"] + + def test_database_reindex_endpoint(self, client, mock_settings, valid_jwt_token): + """Test /maintenance/database/reindex endpoint.""" + with patch('src.server.fastapi_app.settings', mock_settings): + response = client.post( + "/maintenance/database/reindex", + headers={"Authorization": f"Bearer {valid_jwt_token}"} + ) + + assert response.status_code in [200, 404] + + if response.status_code == 200: + data = response.json() + expected_fields = ["indexes_rebuilt", "duration_ms", "status"] + assert any(field in data for field in expected_fields) + + def test_database_optimize_endpoint(self, client, mock_settings, valid_jwt_token): + """Test /maintenance/database/optimize endpoint.""" + with patch('src.server.fastapi_app.settings', mock_settings): + response = client.post( + "/maintenance/database/optimize", + headers={"Authorization": f"Bearer {valid_jwt_token}"} + ) + + assert response.status_code in [200, 404] + + if response.status_code == 200: + data = response.json() + assert "optimization_status" in data or "status" in data + + def test_database_stats_endpoint(self, client, mock_settings, valid_jwt_token): + """Test /maintenance/database/stats endpoint.""" + with patch('src.server.fastapi_app.settings', mock_settings): + response = client.get( + "/maintenance/database/stats", + headers={"Authorization": f"Bearer {valid_jwt_token}"} + ) + + assert response.status_code in [200, 404] + + if response.status_code == 200: + data = response.json() + expected_stats = ["table_count", "record_count", "database_size", "index_size"] + # At least some stats should be present + assert any(stat in data for stat in expected_stats) + + +@pytest.mark.integration +class TestDatabaseEndpointAuthentication: + """Test authentication requirements for database endpoints.""" + + def test_database_endpoints_require_auth(self, client): + """Test that database endpoints require authentication.""" + database_endpoints = [ + "/api/database/info", + "/api/system/database/health", + "/maintenance/database/vacuum", + "/maintenance/database/analyze", + "/maintenance/database/integrity-check", + "/maintenance/database/reindex", + "/maintenance/database/optimize", + "/maintenance/database/stats" + ] + + for endpoint in database_endpoints: + # Try GET for info endpoints + if "info" in endpoint or "health" in endpoint or "stats" in endpoint: + response = client.get(endpoint) + else: + # Try POST for maintenance endpoints + response = client.post(endpoint) + + # Should require authentication (403) or not be found (404) + assert response.status_code in [403, 404] + + def test_database_endpoints_with_invalid_auth(self, client): + """Test database endpoints with invalid authentication.""" + invalid_token = "invalid.token.here" + + database_endpoints = [ + ("/api/system/database/health", "GET"), + ("/maintenance/database/vacuum", "POST"), + ("/maintenance/database/analyze", "POST") + ] + + for endpoint, method in database_endpoints: + if method == "GET": + response = client.get( + endpoint, + headers={"Authorization": f"Bearer {invalid_token}"} + ) + else: + response = client.post( + endpoint, + headers={"Authorization": f"Bearer {invalid_token}"} + ) + + # Should be unauthorized (401) or not found (404) + assert response.status_code in [401, 404] + + +@pytest.mark.integration +class TestDatabaseMaintenanceOperations: + """Test database maintenance operation workflows.""" + + def test_maintenance_operation_sequence(self, client, mock_settings, valid_jwt_token): + """Test sequence of maintenance operations.""" + with patch('src.server.fastapi_app.settings', mock_settings): + # Test sequence: analyze -> vacuum -> reindex -> optimize + maintenance_sequence = [ + "/maintenance/database/analyze", + "/maintenance/database/vacuum", + "/maintenance/database/reindex", + "/maintenance/database/optimize" + ] + + for endpoint in maintenance_sequence: + response = client.post( + endpoint, + headers={"Authorization": f"Bearer {valid_jwt_token}"} + ) + + # Should either work (200) or not be implemented (404) + assert response.status_code in [200, 404] + + if response.status_code == 200: + data = response.json() + # Should return some kind of status or success indication + assert isinstance(data, dict) + + def test_maintenance_operation_parameters(self, client, mock_settings, valid_jwt_token): + """Test maintenance operations with parameters.""" + with patch('src.server.fastapi_app.settings', mock_settings): + # Test vacuum with parameters + response = client.post( + "/maintenance/database/vacuum?full=true", + headers={"Authorization": f"Bearer {valid_jwt_token}"} + ) + + assert response.status_code in [200, 404, 422] + + # Test analyze with table parameter + response = client.post( + "/maintenance/database/analyze?tables=anime,episodes", + headers={"Authorization": f"Bearer {valid_jwt_token}"} + ) + + assert response.status_code in [200, 404, 422] + + def test_concurrent_maintenance_operations(self, client, mock_settings, valid_jwt_token): + """Test behavior of concurrent maintenance operations.""" + with patch('src.server.fastapi_app.settings', mock_settings): + # Simulate starting multiple operations + # In real implementation, this should be handled properly + + # Start first operation + response1 = client.post( + "/maintenance/database/vacuum", + headers={"Authorization": f"Bearer {valid_jwt_token}"} + ) + + # Try to start second operation while first might be running + response2 = client.post( + "/maintenance/database/analyze", + headers={"Authorization": f"Bearer {valid_jwt_token}"} + ) + + # Both should either work or not be implemented + assert response1.status_code in [200, 404, 409] # 409 for conflict + assert response2.status_code in [200, 404, 409] + + +@pytest.mark.integration +class TestDatabaseErrorHandling: + """Test error handling in database operations.""" + + def test_database_connection_errors(self, client, mock_settings, valid_jwt_token): + """Test handling of database connection errors.""" + # Mock database connection failure + with patch('src.server.fastapi_app.settings', mock_settings): + response = client.get( + "/api/system/database/health", + headers={"Authorization": f"Bearer {valid_jwt_token}"} + ) + + # Health check should still return a response even if DB is down + assert response.status_code in [200, 503] # 503 for service unavailable + + if response.status_code == 503: + data = response.json() + assert "error" in data or "status" in data + + def test_maintenance_operation_errors(self, client, mock_settings, valid_jwt_token): + """Test error handling in maintenance operations.""" + with patch('src.server.fastapi_app.settings', mock_settings): + # Test with malformed requests + malformed_requests = [ + ("/maintenance/database/vacuum", {"invalid": "data"}), + ("/maintenance/database/analyze", {"tables": ""}), + ] + + for endpoint, json_data in malformed_requests: + response = client.post( + endpoint, + json=json_data, + headers={"Authorization": f"Bearer {valid_jwt_token}"} + ) + + # Should handle gracefully + assert response.status_code in [200, 400, 404, 422] + + def test_database_timeout_handling(self, client, mock_settings, valid_jwt_token): + """Test handling of database operation timeouts.""" + with patch('src.server.fastapi_app.settings', mock_settings): + # Test long-running operation (like full vacuum) + response = client.post( + "/maintenance/database/vacuum?full=true", + headers={"Authorization": f"Bearer {valid_jwt_token}"}, + timeout=1 # Very short timeout to simulate timeout + ) + + # Should either complete quickly or handle timeout gracefully + # Note: This test depends on implementation details + assert response.status_code in [200, 404, 408, 504] # 408/504 for timeout \ No newline at end of file diff --git a/src/tests/unit/test_database_maintenance.py b/src/tests/unit/test_database_maintenance.py new file mode 100644 index 0000000..61ec5c8 --- /dev/null +++ b/src/tests/unit/test_database_maintenance.py @@ -0,0 +1,484 @@ +""" +Unit tests for database maintenance operation logic. + +Tests database maintenance functions, storage optimization, +integrity checking, and database management utilities. +""" + +import pytest +import sys +import os +from unittest.mock import Mock + +# Add source directory to path +sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', '..', '..')) + + +@pytest.mark.unit +class TestDatabaseMaintenanceLogic: + """Test database maintenance operation logic.""" + + def test_database_vacuum_operation(self): + """Test database vacuum operation logic.""" + def perform_vacuum(connection, full_vacuum=False): + """Perform database vacuum operation.""" + try: + cursor = connection.cursor() + if full_vacuum: + cursor.execute("VACUUM") + else: + cursor.execute("PRAGMA incremental_vacuum") + + # Get database size before and after (simulated) + cursor.execute("PRAGMA page_count") + page_count = cursor.fetchone()[0] + + cursor.execute("PRAGMA page_size") + page_size = cursor.fetchone()[0] + + return { + "success": True, + "operation": "vacuum", + "type": "full" if full_vacuum else "incremental", + "pages_freed": 0, # Would be calculated in real implementation + "space_saved_bytes": 0, + "final_size_bytes": page_count * page_size + } + except Exception as e: + return { + "success": False, + "operation": "vacuum", + "error": str(e) + } + + # Test with mock connection + mock_connection = Mock() + mock_cursor = Mock() + mock_connection.cursor.return_value = mock_cursor + mock_cursor.fetchone.side_effect = [[1000], [4096]] # page_count, page_size + + # Test incremental vacuum + result = perform_vacuum(mock_connection, full_vacuum=False) + assert result["success"] is True + assert result["type"] == "incremental" + assert "final_size_bytes" in result + + # Test full vacuum + result = perform_vacuum(mock_connection, full_vacuum=True) + assert result["success"] is True + assert result["type"] == "full" + + # Test error handling + mock_cursor.execute.side_effect = Exception("Database locked") + result = perform_vacuum(mock_connection) + assert result["success"] is False + assert "error" in result + + def test_database_analyze_operation(self): + """Test database analyze operation logic.""" + def perform_analyze(connection, tables=None): + """Perform database analyze operation.""" + try: + cursor = connection.cursor() + tables_analyzed = [] + + if tables: + # Analyze specific tables + for table in tables: + cursor.execute(f"ANALYZE {table}") + tables_analyzed.append(table) + else: + # Analyze all tables + cursor.execute("SELECT name FROM sqlite_master WHERE type='table'") + all_tables = [row[0] for row in cursor.fetchall()] + + for table in all_tables: + cursor.execute(f"ANALYZE {table}") + tables_analyzed.append(table) + + return { + "success": True, + "operation": "analyze", + "tables_analyzed": tables_analyzed, + "statistics_updated": len(tables_analyzed) + } + except Exception as e: + return { + "success": False, + "operation": "analyze", + "error": str(e) + } + + # Test with mock connection + mock_connection = Mock() + mock_cursor = Mock() + mock_connection.cursor.return_value = mock_cursor + mock_cursor.fetchall.return_value = [("anime",), ("episodes",), ("users",)] + + # Test analyze all tables + result = perform_analyze(mock_connection) + assert result["success"] is True + assert result["statistics_updated"] == 3 + assert "anime" in result["tables_analyzed"] + + # Test analyze specific tables + result = perform_analyze(mock_connection, tables=["anime", "episodes"]) + assert result["success"] is True + assert result["statistics_updated"] == 2 + assert set(result["tables_analyzed"]) == {"anime", "episodes"} + + def test_database_integrity_check(self): + """Test database integrity check logic.""" + def check_database_integrity(connection): + """Check database integrity.""" + try: + cursor = connection.cursor() + + # Run integrity check + cursor.execute("PRAGMA integrity_check") + integrity_results = cursor.fetchall() + + # Run foreign key check + cursor.execute("PRAGMA foreign_key_check") + foreign_key_results = cursor.fetchall() + + # Determine status + integrity_ok = len(integrity_results) == 1 and integrity_results[0][0] == "ok" + foreign_keys_ok = len(foreign_key_results) == 0 + + if integrity_ok and foreign_keys_ok: + status = "ok" + elif integrity_ok and not foreign_keys_ok: + status = "warning" # Foreign key violations + else: + status = "error" # Integrity issues + + return { + "success": True, + "integrity_status": status, + "integrity_issues": [] if integrity_ok else integrity_results, + "foreign_key_issues": foreign_key_results, + "total_issues": len(foreign_key_results) + (0 if integrity_ok else len(integrity_results)) + } + except Exception as e: + return { + "success": False, + "error": str(e) + } + + # Test healthy database + mock_connection = Mock() + mock_cursor = Mock() + mock_connection.cursor.return_value = mock_cursor + mock_cursor.fetchall.side_effect = [[("ok",)], []] # integrity ok, no FK issues + + result = check_database_integrity(mock_connection) + assert result["success"] is True + assert result["integrity_status"] == "ok" + assert result["total_issues"] == 0 + + # Test database with foreign key issues + mock_cursor.fetchall.side_effect = [[("ok",)], [("table", "row", "issue")]] + + result = check_database_integrity(mock_connection) + assert result["success"] is True + assert result["integrity_status"] == "warning" + assert result["total_issues"] == 1 + + # Test database with integrity issues + mock_cursor.fetchall.side_effect = [[("error in table",)], []] + + result = check_database_integrity(mock_connection) + assert result["success"] is True + assert result["integrity_status"] == "error" + assert result["total_issues"] == 1 + + def test_database_reindex_operation(self): + """Test database reindex operation logic.""" + def perform_reindex(connection, indexes=None): + """Perform database reindex operation.""" + try: + cursor = connection.cursor() + indexes_rebuilt = [] + + if indexes: + # Reindex specific indexes + for index in indexes: + cursor.execute(f"REINDEX {index}") + indexes_rebuilt.append(index) + else: + # Reindex all indexes + cursor.execute("SELECT name FROM sqlite_master WHERE type='index' AND name NOT LIKE 'sqlite_%'") + all_indexes = [row[0] for row in cursor.fetchall()] + + for index in all_indexes: + cursor.execute(f"REINDEX {index}") + indexes_rebuilt.append(index) + + return { + "success": True, + "operation": "reindex", + "indexes_rebuilt": indexes_rebuilt, + "count": len(indexes_rebuilt) + } + except Exception as e: + return { + "success": False, + "operation": "reindex", + "error": str(e) + } + + # Test with mock connection + mock_connection = Mock() + mock_cursor = Mock() + mock_connection.cursor.return_value = mock_cursor + mock_cursor.fetchall.return_value = [("idx_anime_title",), ("idx_episode_number",)] + + # Test reindex all + result = perform_reindex(mock_connection) + assert result["success"] is True + assert result["count"] == 2 + assert "idx_anime_title" in result["indexes_rebuilt"] + + # Test reindex specific indexes + result = perform_reindex(mock_connection, indexes=["idx_anime_title"]) + assert result["success"] is True + assert result["count"] == 1 + assert result["indexes_rebuilt"] == ["idx_anime_title"] + + +@pytest.mark.unit +class TestDatabaseStatistics: + """Test database statistics collection.""" + + def test_collect_database_stats(self): + """Test database statistics collection.""" + def collect_database_stats(connection): + """Collect comprehensive database statistics.""" + try: + cursor = connection.cursor() + stats = {} + + # Get table count + cursor.execute("SELECT COUNT(*) FROM sqlite_master WHERE type='table'") + stats["table_count"] = cursor.fetchone()[0] + + # Get database size + cursor.execute("PRAGMA page_count") + page_count = cursor.fetchone()[0] + cursor.execute("PRAGMA page_size") + page_size = cursor.fetchone()[0] + stats["database_size_bytes"] = page_count * page_size + + # Get free pages + cursor.execute("PRAGMA freelist_count") + free_pages = cursor.fetchone()[0] + stats["free_space_bytes"] = free_pages * page_size + + # Get index count + cursor.execute("SELECT COUNT(*) FROM sqlite_master WHERE type='index' AND name NOT LIKE 'sqlite_%'") + stats["index_count"] = cursor.fetchone()[0] + + # Calculate utilization + used_space = stats["database_size_bytes"] - stats["free_space_bytes"] + stats["space_utilization_percent"] = (used_space / stats["database_size_bytes"]) * 100 if stats["database_size_bytes"] > 0 else 0 + + return { + "success": True, + "stats": stats + } + except Exception as e: + return { + "success": False, + "error": str(e) + } + + # Test with mock connection + mock_connection = Mock() + mock_cursor = Mock() + mock_connection.cursor.return_value = mock_cursor + mock_cursor.fetchone.side_effect = [ + (5,), # table_count + (1000,), # page_count + (4096,), # page_size + (50,), # freelist_count + (3,) # index_count + ] + + result = collect_database_stats(mock_connection) + assert result["success"] is True + + stats = result["stats"] + assert stats["table_count"] == 5 + assert stats["database_size_bytes"] == 1000 * 4096 + assert stats["free_space_bytes"] == 50 * 4096 + assert stats["index_count"] == 3 + assert 0 <= stats["space_utilization_percent"] <= 100 + + def test_table_specific_stats(self): + """Test collection of table-specific statistics.""" + def collect_table_stats(connection, table_name): + """Collect statistics for a specific table.""" + try: + cursor = connection.cursor() + + # Get row count + cursor.execute(f"SELECT COUNT(*) FROM {table_name}") + row_count = cursor.fetchone()[0] + + # Get table info + cursor.execute(f"PRAGMA table_info({table_name})") + columns = cursor.fetchall() + column_count = len(columns) + + # Get table size (approximate) + cursor.execute(f"SELECT name FROM sqlite_master WHERE type='table' AND name='{table_name}'") + if cursor.fetchone(): + # Table exists, calculate approximate size + # This is simplified - real implementation would be more complex + estimated_size = row_count * column_count * 100 # Rough estimate + else: + estimated_size = 0 + + return { + "table_name": table_name, + "row_count": row_count, + "column_count": column_count, + "estimated_size_bytes": estimated_size, + "columns": [col[1] for col in columns] # Column names + } + except Exception as e: + return { + "error": str(e), + "table_name": table_name + } + + # Test with mock connection + mock_connection = Mock() + mock_cursor = Mock() + mock_connection.cursor.return_value = mock_cursor + mock_cursor.fetchone.side_effect = [ + (1000,), # row count + ("anime",) # table exists + ] + mock_cursor.fetchall.return_value = [ + (0, "id", "INTEGER", 0, None, 1), + (1, "title", "TEXT", 0, None, 0), + (2, "genre", "TEXT", 0, None, 0) + ] + + result = collect_table_stats(mock_connection, "anime") + assert result["table_name"] == "anime" + assert result["row_count"] == 1000 + assert result["column_count"] == 3 + assert "columns" in result + assert "id" in result["columns"] + + +@pytest.mark.unit +class TestDatabaseOptimization: + """Test database optimization logic.""" + + def test_optimization_recommendations(self): + """Test generation of database optimization recommendations.""" + def generate_optimization_recommendations(stats): + """Generate optimization recommendations based on stats.""" + recommendations = [] + + # Check space utilization + if stats.get("space_utilization_percent", 100) < 70: + recommendations.append({ + "type": "vacuum", + "priority": "medium", + "description": "Database has significant free space, consider running VACUUM", + "estimated_benefit": "Reduce database file size" + }) + + # Check if analyze is needed (simplified check) + if stats.get("table_count", 0) > 0: + recommendations.append({ + "type": "analyze", + "priority": "low", + "description": "Update table statistics for better query planning", + "estimated_benefit": "Improve query performance" + }) + + # Check index count vs table count ratio + table_count = stats.get("table_count", 0) + index_count = stats.get("index_count", 0) + + if table_count > 0 and (index_count / table_count) < 1: + recommendations.append({ + "type": "indexing", + "priority": "medium", + "description": "Some tables may benefit from additional indexes", + "estimated_benefit": "Faster query execution" + }) + + return recommendations + + # Test with stats indicating need for vacuum + stats = { + "table_count": 5, + "index_count": 3, + "space_utilization_percent": 60 + } + + recommendations = generate_optimization_recommendations(stats) + assert len(recommendations) >= 2 + + vacuum_rec = next((r for r in recommendations if r["type"] == "vacuum"), None) + assert vacuum_rec is not None + assert vacuum_rec["priority"] == "medium" + + # Test with well-optimized database + optimized_stats = { + "table_count": 5, + "index_count": 8, + "space_utilization_percent": 95 + } + + recommendations = generate_optimization_recommendations(optimized_stats) + # Should still recommend analyze, but not vacuum + vacuum_rec = next((r for r in recommendations if r["type"] == "vacuum"), None) + assert vacuum_rec is None + + def test_maintenance_scheduler(self): + """Test maintenance operation scheduling logic.""" + def should_run_maintenance(operation_type, last_run_timestamp, current_timestamp): + """Determine if maintenance operation should run.""" + intervals = { + "analyze": 24 * 3600, # 24 hours + "vacuum": 7 * 24 * 3600, # 7 days + "integrity_check": 30 * 24 * 3600, # 30 days + "reindex": 30 * 24 * 3600 # 30 days + } + + if operation_type not in intervals: + return False + + if last_run_timestamp is None: + return True # Never run before + + time_since_last = current_timestamp - last_run_timestamp + return time_since_last >= intervals[operation_type] + + current_time = 1000000 + + # Test never run before + assert should_run_maintenance("analyze", None, current_time) is True + + # Test recent run + recent_run = current_time - (12 * 3600) # 12 hours ago + assert should_run_maintenance("analyze", recent_run, current_time) is False + + # Test old run + old_run = current_time - (25 * 3600) # 25 hours ago + assert should_run_maintenance("analyze", old_run, current_time) is True + + # Test vacuum timing + week_ago = current_time - (8 * 24 * 3600) # 8 days ago + assert should_run_maintenance("vacuum", week_ago, current_time) is True + + day_ago = current_time - (24 * 3600) # 1 day ago + assert should_run_maintenance("vacuum", day_ago, current_time) is False \ No newline at end of file