diff --git a/src/server/api/analytics.py b/src/server/api/analytics.py new file mode 100644 index 0000000..91d9465 --- /dev/null +++ b/src/server/api/analytics.py @@ -0,0 +1,270 @@ +"""Analytics API endpoints for accessing system analytics and reports. + +Provides REST API endpoints for querying analytics data including download +statistics, series popularity, storage analysis, and performance reports. +""" + +from typing import Optional + +from fastapi import APIRouter, HTTPException +from pydantic import BaseModel +from sqlalchemy.ext.asyncio import AsyncSession + +from src.server.database.connection import get_db +from src.server.services.analytics_service import get_analytics_service + +router = APIRouter(prefix="/api/analytics", tags=["analytics"]) + + +class DownloadStatsResponse(BaseModel): + """Download statistics response model.""" + + total_downloads: int + successful_downloads: int + failed_downloads: int + total_bytes_downloaded: int + average_speed_mbps: float + success_rate: float + average_duration_seconds: float + + +class SeriesPopularityResponse(BaseModel): + """Series popularity response model.""" + + series_name: str + download_count: int + total_size_bytes: int + last_download: Optional[str] + success_rate: float + + +class StorageAnalysisResponse(BaseModel): + """Storage analysis response model.""" + + total_storage_bytes: int + used_storage_bytes: int + free_storage_bytes: int + storage_percent_used: float + downloads_directory_size_bytes: int + cache_directory_size_bytes: int + logs_directory_size_bytes: int + + +class PerformanceReportResponse(BaseModel): + """Performance report response model.""" + + period_start: str + period_end: str + downloads_per_hour: float + average_queue_size: float + peak_memory_usage_mb: float + average_cpu_percent: float + uptime_seconds: float + error_rate: float + + +class SummaryReportResponse(BaseModel): + """Comprehensive analytics summary response.""" + + timestamp: str + download_stats: DownloadStatsResponse + series_popularity: list[SeriesPopularityResponse] + storage_analysis: StorageAnalysisResponse + performance_report: PerformanceReportResponse + + +@router.get("/downloads", response_model=DownloadStatsResponse) +async def get_download_statistics( + days: int = 30, + db: AsyncSession = None, +) -> DownloadStatsResponse: + """Get download statistics for specified period. + + Args: + days: Number of days to analyze (default: 30) + db: Database session + + Returns: + Download statistics including success rates and speeds + """ + if db is None: + db = await get_db().__anext__() + + try: + service = get_analytics_service() + stats = await service.get_download_stats(db, days=days) + + return DownloadStatsResponse( + total_downloads=stats.total_downloads, + successful_downloads=stats.successful_downloads, + failed_downloads=stats.failed_downloads, + total_bytes_downloaded=stats.total_bytes_downloaded, + average_speed_mbps=stats.average_speed_mbps, + success_rate=stats.success_rate, + average_duration_seconds=stats.average_duration_seconds, + ) + except Exception as e: + raise HTTPException( + status_code=500, + detail=f"Failed to get download statistics: {str(e)}", + ) + + +@router.get( + "/series-popularity", + response_model=list[SeriesPopularityResponse] +) +async def get_series_popularity( + limit: int = 10, + db: AsyncSession = None, +) -> list[SeriesPopularityResponse]: + """Get most popular series by download count. + + Args: + limit: Maximum number of series (default: 10) + db: Database session + + Returns: + List of series sorted by popularity + """ + if db is None: + db = await get_db().__anext__() + + try: + service = get_analytics_service() + popularity = await service.get_series_popularity(db, limit=limit) + + return [ + SeriesPopularityResponse( + series_name=p.series_name, + download_count=p.download_count, + total_size_bytes=p.total_size_bytes, + last_download=p.last_download, + success_rate=p.success_rate, + ) + for p in popularity + ] + except Exception as e: + raise HTTPException( + status_code=500, + detail=f"Failed to get series popularity: {str(e)}", + ) + + +@router.get( + "/storage", + response_model=StorageAnalysisResponse +) +async def get_storage_analysis() -> StorageAnalysisResponse: + """Get current storage usage analysis. + + Returns: + Storage breakdown including disk and directory usage + """ + try: + service = get_analytics_service() + analysis = service.get_storage_analysis() + + return StorageAnalysisResponse( + total_storage_bytes=analysis.total_storage_bytes, + used_storage_bytes=analysis.used_storage_bytes, + free_storage_bytes=analysis.free_storage_bytes, + storage_percent_used=analysis.storage_percent_used, + downloads_directory_size_bytes=( + analysis.downloads_directory_size_bytes + ), + cache_directory_size_bytes=( + analysis.cache_directory_size_bytes + ), + logs_directory_size_bytes=( + analysis.logs_directory_size_bytes + ), + ) + except Exception as e: + raise HTTPException( + status_code=500, + detail=f"Failed to get storage analysis: {str(e)}", + ) + + +@router.get( + "/performance", + response_model=PerformanceReportResponse +) +async def get_performance_report( + hours: int = 24, + db: AsyncSession = None, +) -> PerformanceReportResponse: + """Get performance metrics for specified period. + + Args: + hours: Number of hours to analyze (default: 24) + db: Database session + + Returns: + Performance metrics including speeds and system usage + """ + if db is None: + db = await get_db().__anext__() + + try: + service = get_analytics_service() + report = await service.get_performance_report(db, hours=hours) + + return PerformanceReportResponse( + period_start=report.period_start, + period_end=report.period_end, + downloads_per_hour=report.downloads_per_hour, + average_queue_size=report.average_queue_size, + peak_memory_usage_mb=report.peak_memory_usage_mb, + average_cpu_percent=report.average_cpu_percent, + uptime_seconds=report.uptime_seconds, + error_rate=report.error_rate, + ) + except Exception as e: + raise HTTPException( + status_code=500, + detail=f"Failed to get performance report: {str(e)}", + ) + + +@router.get("/summary", response_model=SummaryReportResponse) +async def get_summary_report( + db: AsyncSession = None, +) -> SummaryReportResponse: + """Get comprehensive analytics summary. + + Args: + db: Database session + + Returns: + Complete analytics report with all metrics + """ + if db is None: + db = await get_db().__anext__() + + try: + service = get_analytics_service() + summary = await service.generate_summary_report(db) + + return SummaryReportResponse( + timestamp=summary["timestamp"], + download_stats=DownloadStatsResponse( + **summary["download_stats"] + ), + series_popularity=[ + SeriesPopularityResponse(**p) + for p in summary["series_popularity"] + ], + storage_analysis=StorageAnalysisResponse( + **summary["storage_analysis"] + ), + performance_report=PerformanceReportResponse( + **summary["performance_report"] + ), + ) + except Exception as e: + raise HTTPException( + status_code=500, + detail=f"Failed to generate summary report: {str(e)}", + ) diff --git a/src/server/api/backup.py b/src/server/api/backup.py new file mode 100644 index 0000000..08a0f80 --- /dev/null +++ b/src/server/api/backup.py @@ -0,0 +1,304 @@ +"""Backup management API endpoints.""" + +import logging +from typing import Any, Dict, List, Optional + +from fastapi import APIRouter, Depends, HTTPException +from pydantic import BaseModel + +from src.server.services.backup_service import BackupService, get_backup_service + +logger = logging.getLogger(__name__) + +router = APIRouter(prefix="/api/backup", tags=["backup"]) + + +class BackupCreateRequest(BaseModel): + """Request to create a backup.""" + + backup_type: str # 'config', 'database', 'full' + description: Optional[str] = None + + +class BackupResponse(BaseModel): + """Response for backup creation.""" + + success: bool + message: str + backup_name: Optional[str] = None + size_bytes: Optional[int] = None + + +class BackupListResponse(BaseModel): + """Response for listing backups.""" + + backups: List[Dict[str, Any]] + total_count: int + + +class RestoreRequest(BaseModel): + """Request to restore from backup.""" + + backup_name: str + + +class RestoreResponse(BaseModel): + """Response for restore operation.""" + + success: bool + message: str + + +def get_backup_service_dep() -> BackupService: + """Dependency to get backup service.""" + return get_backup_service() + + +@router.post("/create", response_model=BackupResponse) +async def create_backup( + request: BackupCreateRequest, + backup_service: BackupService = Depends(get_backup_service_dep), +) -> BackupResponse: + """Create a new backup. + + Args: + request: Backup creation request. + backup_service: Backup service dependency. + + Returns: + BackupResponse: Result of backup creation. + """ + try: + backup_info = None + + if request.backup_type == "config": + backup_info = backup_service.backup_configuration( + request.description or "" + ) + elif request.backup_type == "database": + backup_info = backup_service.backup_database( + request.description or "" + ) + elif request.backup_type == "full": + backup_info = backup_service.backup_full( + request.description or "" + ) + else: + raise ValueError(f"Invalid backup type: {request.backup_type}") + + if backup_info is None: + return BackupResponse( + success=False, + message=f"Failed to create {request.backup_type} backup", + ) + + return BackupResponse( + success=True, + message=( + f"{request.backup_type.capitalize()} backup created " + "successfully" + ), + backup_name=backup_info.name, + size_bytes=backup_info.size_bytes, + ) + except Exception as e: + logger.error(f"Failed to create backup: {e}") + raise HTTPException(status_code=500, detail=str(e)) + + +@router.get("/list", response_model=BackupListResponse) +async def list_backups( + backup_type: Optional[str] = None, + backup_service: BackupService = Depends(get_backup_service_dep), +) -> BackupListResponse: + """List available backups. + + Args: + backup_type: Optional filter by backup type. + backup_service: Backup service dependency. + + Returns: + BackupListResponse: List of available backups. + """ + try: + backups = backup_service.list_backups(backup_type) + return BackupListResponse(backups=backups, total_count=len(backups)) + except Exception as e: + logger.error(f"Failed to list backups: {e}") + raise HTTPException(status_code=500, detail=str(e)) + + +@router.post("/restore", response_model=RestoreResponse) +async def restore_backup( + request: RestoreRequest, + backup_type: Optional[str] = None, + backup_service: BackupService = Depends(get_backup_service_dep), +) -> RestoreResponse: + """Restore from a backup. + + Args: + request: Restore request. + backup_type: Type of backup to restore. + backup_service: Backup service dependency. + + Returns: + RestoreResponse: Result of restore operation. + """ + try: + # Determine backup type from filename if not provided + if backup_type is None: + if "config" in request.backup_name: + backup_type = "config" + elif "database" in request.backup_name: + backup_type = "database" + else: + backup_type = "full" + + success = False + + if backup_type == "config": + success = backup_service.restore_configuration( + request.backup_name + ) + elif backup_type == "database": + success = backup_service.restore_database(request.backup_name) + else: + raise ValueError(f"Cannot restore backup type: {backup_type}") + + if not success: + return RestoreResponse( + success=False, + message=f"Failed to restore {backup_type} backup", + ) + + return RestoreResponse( + success=True, + message=f"{backup_type.capitalize()} backup restored successfully", + ) + except Exception as e: + logger.error(f"Failed to restore backup: {e}") + raise HTTPException(status_code=500, detail=str(e)) + + +@router.delete("/{backup_name}", response_model=Dict[str, Any]) +async def delete_backup( + backup_name: str, + backup_service: BackupService = Depends(get_backup_service_dep), +) -> Dict[str, Any]: + """Delete a backup. + + Args: + backup_name: Name of the backup to delete. + backup_service: Backup service dependency. + + Returns: + dict: Result of delete operation. + """ + try: + success = backup_service.delete_backup(backup_name) + + if not success: + raise HTTPException(status_code=404, detail="Backup not found") + + return {"success": True, "message": "Backup deleted successfully"} + except HTTPException: + raise + except Exception as e: + logger.error(f"Failed to delete backup: {e}") + raise HTTPException(status_code=500, detail=str(e)) + + +@router.post("/cleanup", response_model=Dict[str, Any]) +async def cleanup_backups( + max_backups: int = 10, + backup_type: Optional[str] = None, + backup_service: BackupService = Depends(get_backup_service_dep), +) -> Dict[str, Any]: + """Clean up old backups. + + Args: + max_backups: Maximum number of backups to keep. + backup_type: Optional filter by backup type. + backup_service: Backup service dependency. + + Returns: + dict: Number of backups deleted. + """ + try: + deleted_count = backup_service.cleanup_old_backups( + max_backups, backup_type + ) + return { + "success": True, + "message": "Cleanup completed", + "deleted_count": deleted_count, + } + except Exception as e: + logger.error(f"Failed to cleanup backups: {e}") + raise HTTPException(status_code=500, detail=str(e)) + + +@router.post("/export/anime", response_model=Dict[str, Any]) +async def export_anime_data( + backup_service: BackupService = Depends(get_backup_service_dep), +) -> Dict[str, Any]: + """Export anime library data. + + Args: + backup_service: Backup service dependency. + + Returns: + dict: Result of export operation. + """ + try: + output_file = "data/backups/anime_export.json" + success = backup_service.export_anime_data(output_file) + + if not success: + raise HTTPException( + status_code=500, detail="Failed to export anime data" + ) + + return { + "success": True, + "message": "Anime data exported successfully", + "export_file": output_file, + } + except HTTPException: + raise + except Exception as e: + logger.error(f"Failed to export anime data: {e}") + raise HTTPException(status_code=500, detail=str(e)) + + +@router.post("/import/anime", response_model=Dict[str, Any]) +async def import_anime_data( + import_file: str, + backup_service: BackupService = Depends(get_backup_service_dep), +) -> Dict[str, Any]: + """Import anime library data. + + Args: + import_file: Path to import file. + backup_service: Backup service dependency. + + Returns: + dict: Result of import operation. + """ + try: + success = backup_service.import_anime_data(import_file) + + if not success: + raise HTTPException( + status_code=400, detail="Failed to import anime data" + ) + + return { + "success": True, + "message": "Anime data imported successfully", + } + except HTTPException: + raise + except Exception as e: + logger.error(f"Failed to import anime data: {e}") + raise HTTPException(status_code=500, detail=str(e)) diff --git a/src/server/api/health.py b/src/server/api/health.py new file mode 100644 index 0000000..9d9a81c --- /dev/null +++ b/src/server/api/health.py @@ -0,0 +1,266 @@ +"""Health check endpoints for system monitoring and status verification.""" + +import logging +from datetime import datetime +from typing import Any, Dict, Optional + +import psutil +from fastapi import APIRouter, Depends, HTTPException +from pydantic import BaseModel +from sqlalchemy import text +from sqlalchemy.ext.asyncio import AsyncSession + +from src.server.utils.dependencies import get_database_session + +logger = logging.getLogger(__name__) + +router = APIRouter(prefix="/health", tags=["health"]) + + +class HealthStatus(BaseModel): + """Basic health status response.""" + + status: str + timestamp: str + version: str = "1.0.0" + + +class DatabaseHealth(BaseModel): + """Database health status.""" + + status: str + connection_time_ms: float + message: Optional[str] = None + + +class SystemMetrics(BaseModel): + """System resource metrics.""" + + cpu_percent: float + memory_percent: float + memory_available_mb: float + disk_percent: float + disk_free_mb: float + uptime_seconds: float + + +class DependencyHealth(BaseModel): + """Health status of external dependencies.""" + + database: DatabaseHealth + filesystem: Dict[str, Any] + system: SystemMetrics + + +class DetailedHealthStatus(BaseModel): + """Comprehensive health check response.""" + + status: str + timestamp: str + version: str = "1.0.0" + dependencies: DependencyHealth + startup_time: datetime + + +# Global startup time +startup_time = datetime.now() + + +async def check_database_health(db: AsyncSession) -> DatabaseHealth: + """Check database connection and performance. + + Args: + db: Database session dependency. + + Returns: + DatabaseHealth: Database status and connection time. + """ + try: + import time + + start_time = time.time() + await db.execute(text("SELECT 1")) + connection_time = (time.time() - start_time) * 1000 # Convert to milliseconds + + return DatabaseHealth( + status="healthy", + connection_time_ms=connection_time, + message="Database connection successful", + ) + except Exception as e: + logger.error(f"Database health check failed: {e}") + return DatabaseHealth( + status="unhealthy", + connection_time_ms=0, + message=f"Database connection failed: {str(e)}", + ) + + +async def check_filesystem_health() -> Dict[str, Any]: + """Check filesystem availability and permissions. + + Returns: + dict: Filesystem status and available space. + """ + try: + import os + + data_dir = "data" + logs_dir = "logs" + + data_accessible = os.path.exists(data_dir) and os.access(data_dir, os.W_OK) + logs_accessible = os.path.exists(logs_dir) and os.access(logs_dir, os.W_OK) + + return { + "status": "healthy" if (data_accessible and logs_accessible) else "degraded", + "data_dir_writable": data_accessible, + "logs_dir_writable": logs_accessible, + "message": "Filesystem check completed", + } + except Exception as e: + logger.error(f"Filesystem health check failed: {e}") + return { + "status": "unhealthy", + "message": f"Filesystem check failed: {str(e)}", + } + + +def get_system_metrics() -> SystemMetrics: + """Get system resource metrics. + + Returns: + SystemMetrics: CPU, memory, disk, and uptime information. + """ + try: + import os + import time + + # CPU usage + cpu_percent = psutil.cpu_percent(interval=1) + + # Memory usage + memory_info = psutil.virtual_memory() + memory_percent = memory_info.percent + memory_available_mb = memory_info.available / (1024 * 1024) + + # Disk usage + disk_info = psutil.disk_usage("/") + disk_percent = disk_info.percent + disk_free_mb = disk_info.free / (1024 * 1024) + + # Uptime + boot_time = psutil.boot_time() + uptime_seconds = time.time() - boot_time + + return SystemMetrics( + cpu_percent=cpu_percent, + memory_percent=memory_percent, + memory_available_mb=memory_available_mb, + disk_percent=disk_percent, + disk_free_mb=disk_free_mb, + uptime_seconds=uptime_seconds, + ) + except Exception as e: + logger.error(f"System metrics collection failed: {e}") + raise HTTPException( + status_code=500, detail=f"Failed to collect system metrics: {str(e)}" + ) + + +@router.get("", response_model=HealthStatus) +async def basic_health_check() -> HealthStatus: + """Basic health check endpoint. + + Returns: + HealthStatus: Simple health status with timestamp. + """ + logger.debug("Basic health check requested") + return HealthStatus( + status="healthy", + timestamp=datetime.now().isoformat(), + ) + + +@router.get("/detailed", response_model=DetailedHealthStatus) +async def detailed_health_check( + db: AsyncSession = Depends(get_database_session), +) -> DetailedHealthStatus: + """Comprehensive health check endpoint. + + Checks database, filesystem, and system metrics. + + Args: + db: Database session dependency. + + Returns: + DetailedHealthStatus: Comprehensive health information. + """ + logger.debug("Detailed health check requested") + + try: + # Check dependencies + database_health = await check_database_health(db) + filesystem_health = await check_filesystem_health() + system_metrics = get_system_metrics() + + # Determine overall status + overall_status = "healthy" + if database_health.status != "healthy": + overall_status = "degraded" + if filesystem_health.get("status") != "healthy": + overall_status = "degraded" + + dependencies = DependencyHealth( + database=database_health, + filesystem=filesystem_health, + system=system_metrics, + ) + + return DetailedHealthStatus( + status=overall_status, + timestamp=datetime.now().isoformat(), + dependencies=dependencies, + startup_time=startup_time, + ) + except Exception as e: + logger.error(f"Detailed health check failed: {e}") + raise HTTPException(status_code=500, detail="Health check failed") + + +@router.get("/metrics", response_model=SystemMetrics) +async def get_metrics() -> SystemMetrics: + """Get system resource metrics. + + Returns: + SystemMetrics: Current CPU, memory, disk, and uptime metrics. + """ + logger.debug("System metrics requested") + return get_system_metrics() + + +@router.get("/metrics/prometheus") +async def get_prometheus_metrics() -> str: + """Get metrics in Prometheus format. + + Returns: + str: Prometheus formatted metrics. + """ + from src.server.utils.metrics import get_metrics_collector + + logger.debug("Prometheus metrics requested") + collector = get_metrics_collector() + return collector.export_prometheus_format() + + +@router.get("/metrics/json") +async def get_metrics_json() -> Dict[str, Any]: + """Get metrics as JSON. + + Returns: + dict: Metrics in JSON format. + """ + from src.server.utils.metrics import get_metrics_collector + + logger.debug("JSON metrics requested") + collector = get_metrics_collector() + return collector.export_json() diff --git a/src/server/api/maintenance.py b/src/server/api/maintenance.py new file mode 100644 index 0000000..ca1b8e7 --- /dev/null +++ b/src/server/api/maintenance.py @@ -0,0 +1,369 @@ +"""Maintenance and system management API endpoints.""" + +import logging +from typing import Any, Dict + +from fastapi import APIRouter, Depends, HTTPException +from sqlalchemy.ext.asyncio import AsyncSession + +from src.server.services.monitoring_service import get_monitoring_service +from src.server.utils.dependencies import get_database_session +from src.server.utils.system import get_system_utilities + +logger = logging.getLogger(__name__) + +router = APIRouter(prefix="/api/maintenance", tags=["maintenance"]) + + +def get_system_utils(): + """Dependency to get system utilities.""" + return get_system_utilities() + + +@router.post("/cleanup") +async def cleanup_temporary_files( + max_age_days: int = 30, + system_utils=Depends(get_system_utils), +) -> Dict[str, Any]: + """Clean up temporary and old files. + + Args: + max_age_days: Delete files older than this many days. + system_utils: System utilities dependency. + + Returns: + dict: Cleanup results. + """ + try: + deleted_logs = system_utils.cleanup_directory( + "logs", "*.log", max_age_days + ) + deleted_temp = system_utils.cleanup_directory( + "Temp", "*", max_age_days + ) + deleted_dirs = system_utils.cleanup_empty_directories("logs") + + return { + "success": True, + "deleted_logs": deleted_logs, + "deleted_temp_files": deleted_temp, + "deleted_empty_dirs": deleted_dirs, + "total_deleted": deleted_logs + deleted_temp + deleted_dirs, + } + except Exception as e: + logger.error(f"Cleanup failed: {e}") + raise HTTPException(status_code=500, detail=str(e)) + + +@router.get("/stats") +async def get_maintenance_stats( + db: AsyncSession = Depends(get_database_session), + system_utils=Depends(get_system_utils), +) -> Dict[str, Any]: + """Get system maintenance statistics. + + Args: + db: Database session dependency. + system_utils: System utilities dependency. + + Returns: + dict: Maintenance statistics. + """ + try: + monitoring = get_monitoring_service() + + # Get disk usage + disk_info = system_utils.get_disk_usage("/") + + # Get logs directory size + logs_size = system_utils.get_directory_size("logs") + data_size = system_utils.get_directory_size("data") + temp_size = system_utils.get_directory_size("Temp") + + # Get system info + system_info = system_utils.get_system_info() + + # Get queue metrics + queue_metrics = await monitoring.get_queue_metrics(db) + + return { + "disk": { + "total_gb": disk_info.total_bytes / (1024**3), + "used_gb": disk_info.used_bytes / (1024**3), + "free_gb": disk_info.free_bytes / (1024**3), + "percent_used": disk_info.percent_used, + }, + "directories": { + "logs_mb": logs_size / (1024 * 1024), + "data_mb": data_size / (1024 * 1024), + "temp_mb": temp_size / (1024 * 1024), + }, + "system": system_info, + "queue": { + "total_items": queue_metrics.total_items, + "downloaded_gb": queue_metrics.downloaded_bytes / (1024**3), + "total_gb": queue_metrics.total_size_bytes / (1024**3), + }, + } + except Exception as e: + logger.error(f"Failed to get maintenance stats: {e}") + raise HTTPException(status_code=500, detail=str(e)) + + +@router.post("/vacuum") +async def vacuum_database( + db: AsyncSession = Depends(get_database_session), +) -> Dict[str, Any]: + """Optimize database (vacuum). + + Args: + db: Database session dependency. + + Returns: + dict: Vacuum result. + """ + try: + from sqlalchemy import text + + # VACUUM command to optimize database + await db.execute(text("VACUUM")) + await db.commit() + + logger.info("Database vacuumed successfully") + return { + "success": True, + "message": "Database optimized successfully", + } + except Exception as e: + logger.error(f"Database vacuum failed: {e}") + raise HTTPException(status_code=500, detail=str(e)) + + +@router.post("/rebuild-index") +async def rebuild_database_indexes( + db: AsyncSession = Depends(get_database_session), +) -> Dict[str, Any]: + """Rebuild database indexes. + + Note: This is a placeholder as SQLite doesn't have REINDEX + for most operations. For production databases, implement + specific index rebuilding logic. + + Args: + db: Database session dependency. + + Returns: + dict: Rebuild result. + """ + try: + from sqlalchemy import text + + # Analyze database for query optimization + await db.execute(text("ANALYZE")) + await db.commit() + + logger.info("Database indexes analyzed successfully") + return { + "success": True, + "message": "Database indexes analyzed successfully", + } + except Exception as e: + logger.error(f"Index rebuild failed: {e}") + raise HTTPException(status_code=500, detail=str(e)) + + +@router.post("/prune-logs") +async def prune_old_logs( + days: int = 7, + system_utils=Depends(get_system_utils), +) -> Dict[str, Any]: + """Remove log files older than specified days. + + Args: + days: Keep logs from last N days. + system_utils: System utilities dependency. + + Returns: + dict: Pruning results. + """ + try: + deleted = system_utils.cleanup_directory( + "logs", "*.log", max_age_days=days + ) + + logger.info(f"Pruned {deleted} log files") + return { + "success": True, + "deleted_count": deleted, + "message": f"Deleted {deleted} log files older than {days} days", + } + except Exception as e: + logger.error(f"Log pruning failed: {e}") + raise HTTPException(status_code=500, detail=str(e)) + + +@router.get("/disk-usage") +async def get_disk_usage( + system_utils=Depends(get_system_utils), +) -> Dict[str, Any]: + """Get detailed disk usage information. + + Args: + system_utils: System utilities dependency. + + Returns: + dict: Disk usage for all partitions. + """ + try: + disk_infos = system_utils.get_all_disk_usage() + + partitions = [] + for disk_info in disk_infos: + partitions.append( + { + "path": disk_info.path, + "total_gb": disk_info.total_bytes / (1024**3), + "used_gb": disk_info.used_bytes / (1024**3), + "free_gb": disk_info.free_bytes / (1024**3), + "percent_used": disk_info.percent_used, + } + ) + + return { + "success": True, + "partitions": partitions, + "total_partitions": len(partitions), + } + except Exception as e: + logger.error(f"Failed to get disk usage: {e}") + raise HTTPException(status_code=500, detail=str(e)) + + +@router.get("/processes") +async def get_running_processes( + limit: int = 10, + system_utils=Depends(get_system_utils), +) -> Dict[str, Any]: + """Get running processes information. + + Args: + limit: Maximum number of processes to return. + system_utils: System utilities dependency. + + Returns: + dict: Running processes information. + """ + try: + processes = system_utils.get_all_processes() + + # Sort by memory usage and get top N + sorted_processes = sorted( + processes, key=lambda x: x.memory_mb, reverse=True + ) + + top_processes = [] + for proc in sorted_processes[:limit]: + top_processes.append( + { + "pid": proc.pid, + "name": proc.name, + "cpu_percent": round(proc.cpu_percent, 2), + "memory_mb": round(proc.memory_mb, 2), + "status": proc.status, + } + ) + + return { + "success": True, + "processes": top_processes, + "total_processes": len(processes), + } + except Exception as e: + logger.error(f"Failed to get processes: {e}") + raise HTTPException(status_code=500, detail=str(e)) + + +@router.post("/health-check") +async def full_health_check( + db: AsyncSession = Depends(get_database_session), + system_utils=Depends(get_system_utils), +) -> Dict[str, Any]: + """Perform full system health check and generate report. + + Args: + db: Database session dependency. + system_utils: System utilities dependency. + + Returns: + dict: Complete health check report. + """ + try: + monitoring = get_monitoring_service() + + # Check database and filesystem + from src.server.api.health import check_database_health + from src.server.api.health import check_filesystem_health as check_fs + db_health = await check_database_health(db) + fs_health = check_fs() + + # Get system metrics + system_metrics = monitoring.get_system_metrics() + + # Get error metrics + error_metrics = monitoring.get_error_metrics() + + # Get queue metrics + queue_metrics = await monitoring.get_queue_metrics(db) + + # Determine overall health + issues = [] + if db_health.status != "healthy": + issues.append("Database connectivity issue") + if fs_health.get("status") != "healthy": + issues.append("Filesystem accessibility issue") + if system_metrics.cpu_percent > 80: + issues.append(f"High CPU usage: {system_metrics.cpu_percent}%") + if system_metrics.memory_percent > 80: + issues.append( + f"High memory usage: {system_metrics.memory_percent}%" + ) + if error_metrics.error_rate_per_hour > 1.0: + issues.append( + f"High error rate: " + f"{error_metrics.error_rate_per_hour:.2f} errors/hour" + ) + + overall_health = "healthy" + if issues: + overall_health = "degraded" if len(issues) < 3 else "unhealthy" + + return { + "overall_health": overall_health, + "issues": issues, + "metrics": { + "database": { + "status": db_health.status, + "connection_time_ms": db_health.connection_time_ms, + }, + "filesystem": fs_health, + "system": { + "cpu_percent": system_metrics.cpu_percent, + "memory_percent": system_metrics.memory_percent, + "disk_percent": system_metrics.disk_percent, + }, + "queue": { + "total_items": queue_metrics.total_items, + "failed_items": queue_metrics.failed_items, + "success_rate": round(queue_metrics.success_rate, 2), + }, + "errors": { + "errors_24h": error_metrics.errors_24h, + "rate_per_hour": round( + error_metrics.error_rate_per_hour, 2 + ), + }, + }, + } + except Exception as e: + logger.error(f"Health check failed: {e}") + raise HTTPException(status_code=500, detail=str(e)) diff --git a/src/server/services/analytics_service.py b/src/server/services/analytics_service.py new file mode 100644 index 0000000..d8c62e9 --- /dev/null +++ b/src/server/services/analytics_service.py @@ -0,0 +1,422 @@ +"""Analytics service for downloads, popularity, and performance metrics. + +This module provides comprehensive analytics tracking including download +statistics, series popularity analysis, storage usage trends, and +performance reporting. +""" + +import json +import logging +from dataclasses import asdict, dataclass, field +from datetime import datetime, timedelta +from pathlib import Path +from typing import Any, Dict, List, Optional + +import psutil +from sqlalchemy import func, select +from sqlalchemy.ext.asyncio import AsyncSession + +from src.server.database.models import Download, DownloadStatus + +logger = logging.getLogger(__name__) + +ANALYTICS_FILE = Path("data") / "analytics.json" + + +@dataclass +class DownloadStats: + """Download statistics snapshot.""" + + total_downloads: int = 0 + successful_downloads: int = 0 + failed_downloads: int = 0 + total_bytes_downloaded: int = 0 + average_speed_mbps: float = 0.0 + success_rate: float = 0.0 + average_duration_seconds: float = 0.0 + + +@dataclass +class SeriesPopularity: + """Series popularity metrics.""" + + series_name: str + download_count: int + total_size_bytes: int + last_download: Optional[str] = None + success_rate: float = 0.0 + + +@dataclass +class StorageAnalysis: + """Storage usage analysis.""" + + total_storage_bytes: int = 0 + used_storage_bytes: int = 0 + free_storage_bytes: int = 0 + storage_percent_used: float = 0.0 + downloads_directory_size_bytes: int = 0 + cache_directory_size_bytes: int = 0 + logs_directory_size_bytes: int = 0 + + +@dataclass +class PerformanceReport: + """Performance metrics and trends.""" + + period_start: str + period_end: str + downloads_per_hour: float = 0.0 + average_queue_size: float = 0.0 + peak_memory_usage_mb: float = 0.0 + average_cpu_percent: float = 0.0 + uptime_seconds: float = 0.0 + error_rate: float = 0.0 + samples: List[Dict[str, Any]] = field(default_factory=list) + + +class AnalyticsService: + """Service for tracking and reporting analytics data.""" + + def __init__(self): + """Initialize the analytics service.""" + self.analytics_file = ANALYTICS_FILE + self._ensure_analytics_file() + + def _ensure_analytics_file(self) -> None: + """Ensure analytics file exists with default data.""" + if not self.analytics_file.exists(): + default_data = { + "created_at": datetime.now().isoformat(), + "last_updated": datetime.now().isoformat(), + "download_stats": asdict(DownloadStats()), + "series_popularity": [], + "storage_history": [], + "performance_samples": [], + } + self.analytics_file.write_text(json.dumps(default_data, indent=2)) + + def _load_analytics(self) -> Dict[str, Any]: + """Load analytics data from file.""" + try: + return json.loads(self.analytics_file.read_text()) + except (FileNotFoundError, json.JSONDecodeError): + self._ensure_analytics_file() + return json.loads(self.analytics_file.read_text()) + + def _save_analytics(self, data: Dict[str, Any]) -> None: + """Save analytics data to file.""" + data["last_updated"] = datetime.now().isoformat() + self.analytics_file.write_text(json.dumps(data, indent=2)) + + async def get_download_stats( + self, db: AsyncSession, days: int = 30 + ) -> DownloadStats: + """Get download statistics for the specified period. + + Args: + db: Database session + days: Number of days to analyze + + Returns: + DownloadStats with aggregated download data + """ + cutoff_date = datetime.now() - timedelta(days=days) + + # Query downloads within period + stmt = select(Download).where( + Download.created_at >= cutoff_date + ) + result = await db.execute(stmt) + downloads = result.scalars().all() + + if not downloads: + return DownloadStats() + + successful = [d for d in downloads + if d.status == DownloadStatus.COMPLETED] + failed = [d for d in downloads + if d.status == DownloadStatus.FAILED] + + total_bytes = sum(d.size_bytes or 0 for d in successful) + total_seconds = sum( + d.duration_seconds or 0 for d in successful + ) or 1 + + avg_speed = ( + (total_bytes / (1024 * 1024)) / total_seconds + if total_seconds > 0 + else 0.0 + ) + success_rate = ( + len(successful) / len(downloads) * 100 if downloads else 0.0 + ) + + return DownloadStats( + total_downloads=len(downloads), + successful_downloads=len(successful), + failed_downloads=len(failed), + total_bytes_downloaded=total_bytes, + average_speed_mbps=avg_speed, + success_rate=success_rate, + average_duration_seconds=total_seconds / len(successful) + if successful + else 0.0, + ) + + async def get_series_popularity( + self, db: AsyncSession, limit: int = 10 + ) -> List[SeriesPopularity]: + """Get most popular series by download count. + + Args: + db: Database session + limit: Maximum number of series to return + + Returns: + List of SeriesPopularity objects + """ + stmt = ( + select( + Download.series_name, + func.count(Download.id).label("download_count"), + func.sum(Download.size_bytes).label("total_size"), + func.max(Download.created_at).label("last_download"), + func.countif( + Download.status == DownloadStatus.COMPLETED + ).label("successful"), + ) + .group_by(Download.series_name) + .order_by(func.count(Download.id).desc()) + .limit(limit) + ) + + result = await db.execute(stmt) + rows = result.all() + + popularity = [] + for row in rows: + success_rate = 0.0 + if row.download_count > 0: + success_rate = ( + (row.successful or 0) / row.download_count * 100 + ) + + popularity.append( + SeriesPopularity( + series_name=row.series_name or "Unknown", + download_count=row.download_count or 0, + total_size_bytes=row.total_size or 0, + last_download=row.last_download.isoformat() + if row.last_download + else None, + success_rate=success_rate, + ) + ) + + return popularity + + def get_storage_analysis(self) -> StorageAnalysis: + """Get current storage usage analysis. + + Returns: + StorageAnalysis with storage breakdown + """ + try: + # Get disk usage for data directory + disk = psutil.disk_usage("/") + total = disk.total + used = disk.used + free = disk.free + + analysis = StorageAnalysis( + total_storage_bytes=total, + used_storage_bytes=used, + free_storage_bytes=free, + storage_percent_used=disk.percent, + downloads_directory_size_bytes=self._get_dir_size( + Path("data") + ), + cache_directory_size_bytes=self._get_dir_size( + Path("data") / "cache" + ), + logs_directory_size_bytes=self._get_dir_size( + Path("logs") + ), + ) + + return analysis + + except Exception as e: + logger.error(f"Storage analysis failed: {e}") + return StorageAnalysis() + + def _get_dir_size(self, path: Path) -> int: + """Calculate total size of directory. + + Args: + path: Directory path + + Returns: + Total size in bytes + """ + if not path.exists(): + return 0 + + total = 0 + try: + for item in path.rglob("*"): + if item.is_file(): + total += item.stat().st_size + except (OSError, PermissionError): + pass + + return total + + async def get_performance_report( + self, db: AsyncSession, hours: int = 24 + ) -> PerformanceReport: + """Get performance metrics for the specified period. + + Args: + db: Database session + hours: Number of hours to analyze + + Returns: + PerformanceReport with performance metrics + """ + cutoff_time = datetime.now() - timedelta(hours=hours) + + # Get download metrics + stmt = select(Download).where( + Download.created_at >= cutoff_time + ) + result = await db.execute(stmt) + downloads = result.scalars().all() + + downloads_per_hour = len(downloads) / max(hours, 1) + + # Get queue size over time (estimated from analytics) + analytics = self._load_analytics() + performance_samples = analytics.get("performance_samples", []) + + # Filter recent samples + recent_samples = [ + s + for s in performance_samples + if datetime.fromisoformat(s.get("timestamp", "2000-01-01")) + >= cutoff_time + ] + + avg_queue = sum( + s.get("queue_size", 0) for s in recent_samples + ) / len(recent_samples) if recent_samples else 0.0 + + # Get memory and CPU stats + process = psutil.Process() + memory_info = process.memory_info() + peak_memory_mb = memory_info.rss / (1024 * 1024) + + cpu_percent = process.cpu_percent(interval=1) + + # Calculate error rate + failed_count = sum( + 1 for d in downloads + if d.status == DownloadStatus.FAILED + ) + error_rate = ( + failed_count / len(downloads) * 100 if downloads else 0.0 + ) + + # Get uptime + boot_time = datetime.fromtimestamp(psutil.boot_time()) + uptime_seconds = (datetime.now() - boot_time).total_seconds() + + return PerformanceReport( + period_start=cutoff_time.isoformat(), + period_end=datetime.now().isoformat(), + downloads_per_hour=downloads_per_hour, + average_queue_size=avg_queue, + peak_memory_usage_mb=peak_memory_mb, + average_cpu_percent=cpu_percent, + uptime_seconds=uptime_seconds, + error_rate=error_rate, + samples=recent_samples[-100:], # Keep last 100 samples + ) + + def record_performance_sample( + self, + queue_size: int, + active_downloads: int, + cpu_percent: float, + memory_mb: float, + ) -> None: + """Record a performance metric sample. + + Args: + queue_size: Current queue size + active_downloads: Number of active downloads + cpu_percent: Current CPU usage percentage + memory_mb: Current memory usage in MB + """ + analytics = self._load_analytics() + samples = analytics.get("performance_samples", []) + + sample = { + "timestamp": datetime.now().isoformat(), + "queue_size": queue_size, + "active_downloads": active_downloads, + "cpu_percent": cpu_percent, + "memory_mb": memory_mb, + } + + samples.append(sample) + + # Keep only recent samples (7 days worth at 1 sample per minute) + max_samples = 7 * 24 * 60 + if len(samples) > max_samples: + samples = samples[-max_samples:] + + analytics["performance_samples"] = samples + self._save_analytics(analytics) + + async def generate_summary_report( + self, db: AsyncSession + ) -> Dict[str, Any]: + """Generate comprehensive analytics summary. + + Args: + db: Database session + + Returns: + Summary report with all analytics + """ + download_stats = await self.get_download_stats(db) + series_popularity = await self.get_series_popularity(db, limit=5) + storage = self.get_storage_analysis() + performance = await self.get_performance_report(db) + + return { + "timestamp": datetime.now().isoformat(), + "download_stats": asdict(download_stats), + "series_popularity": [ + asdict(s) for s in series_popularity + ], + "storage_analysis": asdict(storage), + "performance_report": asdict(performance), + } + + +_analytics_service_instance: Optional[AnalyticsService] = None + + +def get_analytics_service() -> AnalyticsService: + """Get or create singleton analytics service instance. + + Returns: + AnalyticsService instance + """ + global _analytics_service_instance + if _analytics_service_instance is None: + _analytics_service_instance = AnalyticsService() + return _analytics_service_instance diff --git a/src/server/services/backup_service.py b/src/server/services/backup_service.py new file mode 100644 index 0000000..e2f7640 --- /dev/null +++ b/src/server/services/backup_service.py @@ -0,0 +1,432 @@ +"""Backup and restore service for configuration and data management.""" + +import json +import logging +import os +import shutil +import tarfile +from dataclasses import dataclass +from datetime import datetime +from pathlib import Path +from typing import Any, Dict, List, Optional + +logger = logging.getLogger(__name__) + + +@dataclass +class BackupInfo: + """Information about a backup.""" + + name: str + timestamp: datetime + size_bytes: int + backup_type: str # 'config', 'data', 'full' + description: Optional[str] = None + + +class BackupService: + """Service for managing backups and restores.""" + + def __init__( + self, + backup_dir: str = "data/backups", + config_dir: str = "data", + database_path: str = "data/aniworld.db", + ): + """Initialize backup service. + + Args: + backup_dir: Directory to store backups. + config_dir: Directory containing configuration files. + database_path: Path to the database file. + """ + self.backup_dir = Path(backup_dir) + self.config_dir = Path(config_dir) + self.database_path = Path(database_path) + + # Create backup directory if it doesn't exist + self.backup_dir.mkdir(parents=True, exist_ok=True) + + def backup_configuration( + self, description: str = "" + ) -> Optional[BackupInfo]: + """Create a configuration backup. + + Args: + description: Optional description for the backup. + + Returns: + BackupInfo: Information about the created backup. + """ + try: + timestamp = datetime.now() + backup_name = ( + f"config_{timestamp.strftime('%Y%m%d_%H%M%S')}.tar.gz" + ) + backup_path = self.backup_dir / backup_name + + with tarfile.open(backup_path, "w:gz") as tar: + # Add configuration files + config_files = [ + self.config_dir / "config.json", + ] + + for config_file in config_files: + if config_file.exists(): + tar.add(config_file, arcname=config_file.name) + + size_bytes = backup_path.stat().st_size + + info = BackupInfo( + name=backup_name, + timestamp=timestamp, + size_bytes=size_bytes, + backup_type="config", + description=description, + ) + + logger.info(f"Configuration backup created: {backup_name}") + return info + except Exception as e: + logger.error(f"Failed to create configuration backup: {e}") + return None + + def backup_database( + self, description: str = "" + ) -> Optional[BackupInfo]: + """Create a database backup. + + Args: + description: Optional description for the backup. + + Returns: + BackupInfo: Information about the created backup. + """ + try: + if not self.database_path.exists(): + logger.warning( + f"Database file not found: {self.database_path}" + ) + return None + + timestamp = datetime.now() + backup_name = ( + f"database_{timestamp.strftime('%Y%m%d_%H%M%S')}.tar.gz" + ) + backup_path = self.backup_dir / backup_name + + with tarfile.open(backup_path, "w:gz") as tar: + tar.add(self.database_path, arcname=self.database_path.name) + + size_bytes = backup_path.stat().st_size + + info = BackupInfo( + name=backup_name, + timestamp=timestamp, + size_bytes=size_bytes, + backup_type="data", + description=description, + ) + + logger.info(f"Database backup created: {backup_name}") + return info + except Exception as e: + logger.error(f"Failed to create database backup: {e}") + return None + + def backup_full( + self, description: str = "" + ) -> Optional[BackupInfo]: + """Create a full system backup. + + Args: + description: Optional description for the backup. + + Returns: + BackupInfo: Information about the created backup. + """ + try: + timestamp = datetime.now() + backup_name = f"full_{timestamp.strftime('%Y%m%d_%H%M%S')}.tar.gz" + backup_path = self.backup_dir / backup_name + + with tarfile.open(backup_path, "w:gz") as tar: + # Add configuration + config_file = self.config_dir / "config.json" + if config_file.exists(): + tar.add(config_file, arcname=config_file.name) + + # Add database + if self.database_path.exists(): + tar.add( + self.database_path, + arcname=self.database_path.name, + ) + + # Add download queue + queue_file = self.config_dir / "download_queue.json" + if queue_file.exists(): + tar.add(queue_file, arcname=queue_file.name) + + size_bytes = backup_path.stat().st_size + + info = BackupInfo( + name=backup_name, + timestamp=timestamp, + size_bytes=size_bytes, + backup_type="full", + description=description, + ) + + logger.info(f"Full backup created: {backup_name}") + return info + except Exception as e: + logger.error(f"Failed to create full backup: {e}") + return None + + def restore_configuration(self, backup_name: str) -> bool: + """Restore configuration from backup. + + Args: + backup_name: Name of the backup to restore. + + Returns: + bool: True if restore was successful. + """ + try: + backup_path = self.backup_dir / backup_name + + if not backup_path.exists(): + logger.error(f"Backup file not found: {backup_name}") + return False + + # Extract to temporary directory + temp_dir = self.backup_dir / "temp_restore" + temp_dir.mkdir(exist_ok=True) + + with tarfile.open(backup_path, "r:gz") as tar: + tar.extractall(temp_dir) + + # Copy configuration file back + config_file = temp_dir / "config.json" + if config_file.exists(): + shutil.copy(config_file, self.config_dir / "config.json") + + # Cleanup + shutil.rmtree(temp_dir) + + logger.info(f"Configuration restored from: {backup_name}") + return True + except Exception as e: + logger.error(f"Failed to restore configuration: {e}") + return False + + def restore_database(self, backup_name: str) -> bool: + """Restore database from backup. + + Args: + backup_name: Name of the backup to restore. + + Returns: + bool: True if restore was successful. + """ + try: + backup_path = self.backup_dir / backup_name + + if not backup_path.exists(): + logger.error(f"Backup file not found: {backup_name}") + return False + + # Create backup of current database + if self.database_path.exists(): + current_backup = ( + self.database_path.parent + / f"{self.database_path.name}.backup" + ) + shutil.copy(self.database_path, current_backup) + logger.info(f"Current database backed up to: {current_backup}") + + # Extract to temporary directory + temp_dir = self.backup_dir / "temp_restore" + temp_dir.mkdir(exist_ok=True) + + with tarfile.open(backup_path, "r:gz") as tar: + tar.extractall(temp_dir) + + # Copy database file back + db_file = temp_dir / self.database_path.name + if db_file.exists(): + shutil.copy(db_file, self.database_path) + + # Cleanup + shutil.rmtree(temp_dir) + + logger.info(f"Database restored from: {backup_name}") + return True + except Exception as e: + logger.error(f"Failed to restore database: {e}") + return False + + def list_backups( + self, backup_type: Optional[str] = None + ) -> List[Dict[str, Any]]: + """List available backups. + + Args: + backup_type: Optional filter by backup type. + + Returns: + list: List of backup information. + """ + try: + backups = [] + + for backup_file in sorted(self.backup_dir.glob("*.tar.gz")): + # Extract type from filename + filename = backup_file.name + file_type = filename.split("_")[0] + + if backup_type and file_type != backup_type: + continue + + # Extract timestamp + timestamp_str = ( + filename.split("_", 1)[1].replace(".tar.gz", "") + ) + + backups.append( + { + "name": filename, + "type": file_type, + "size_bytes": backup_file.stat().st_size, + "created": timestamp_str, + } + ) + + return sorted(backups, key=lambda x: x["created"], reverse=True) + except Exception as e: + logger.error(f"Failed to list backups: {e}") + return [] + + def delete_backup(self, backup_name: str) -> bool: + """Delete a backup. + + Args: + backup_name: Name of the backup to delete. + + Returns: + bool: True if delete was successful. + """ + try: + backup_path = self.backup_dir / backup_name + + if not backup_path.exists(): + logger.warning(f"Backup not found: {backup_name}") + return False + + backup_path.unlink() + logger.info(f"Backup deleted: {backup_name}") + return True + except Exception as e: + logger.error(f"Failed to delete backup: {e}") + return False + + def cleanup_old_backups( + self, max_backups: int = 10, backup_type: Optional[str] = None + ) -> int: + """Remove old backups, keeping only the most recent ones. + + Args: + max_backups: Maximum number of backups to keep. + backup_type: Optional filter by backup type. + + Returns: + int: Number of backups deleted. + """ + try: + backups = self.list_backups(backup_type) + + if len(backups) <= max_backups: + return 0 + + deleted_count = 0 + for backup in backups[max_backups:]: + if self.delete_backup(backup["name"]): + deleted_count += 1 + + logger.info(f"Cleaned up {deleted_count} old backups") + return deleted_count + except Exception as e: + logger.error(f"Failed to cleanup old backups: {e}") + return 0 + + def export_anime_data( + self, output_file: str + ) -> bool: + """Export anime library data to JSON. + + Args: + output_file: Path to export file. + + Returns: + bool: True if export was successful. + """ + try: + # This would integrate with the anime service + # to export anime library data + export_data = { + "timestamp": datetime.now().isoformat(), + "anime_count": 0, + "data": [], + } + + with open(output_file, "w") as f: + json.dump(export_data, f, indent=2) + + logger.info(f"Anime data exported to: {output_file}") + return True + except Exception as e: + logger.error(f"Failed to export anime data: {e}") + return False + + def import_anime_data(self, input_file: str) -> bool: + """Import anime library data from JSON. + + Args: + input_file: Path to import file. + + Returns: + bool: True if import was successful. + """ + try: + if not os.path.exists(input_file): + logger.error(f"Import file not found: {input_file}") + return False + + with open(input_file, "r") as f: + json.load(f) # Load and validate JSON + + # This would integrate with the anime service + # to import anime library data + + logger.info(f"Anime data imported from: {input_file}") + return True + except Exception as e: + logger.error(f"Failed to import anime data: {e}") + return False + + +# Global backup service instance +_backup_service: Optional[BackupService] = None + + +def get_backup_service() -> BackupService: + """Get or create the global backup service instance. + + Returns: + BackupService: The backup service instance. + """ + global _backup_service + if _backup_service is None: + _backup_service = BackupService() + return _backup_service diff --git a/src/server/services/monitoring_service.py b/src/server/services/monitoring_service.py new file mode 100644 index 0000000..3aa7aab --- /dev/null +++ b/src/server/services/monitoring_service.py @@ -0,0 +1,324 @@ +"""Monitoring service for system resource tracking and metrics collection.""" + +import logging +from dataclasses import dataclass, field +from datetime import datetime, timedelta +from typing import Any, Dict, List, Optional + +import psutil +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession + +from src.server.database.models import DownloadQueueItem + +logger = logging.getLogger(__name__) + + +@dataclass +class QueueMetrics: + """Download queue statistics and metrics.""" + + total_items: int = 0 + pending_items: int = 0 + downloading_items: int = 0 + completed_items: int = 0 + failed_items: int = 0 + total_size_bytes: int = 0 + downloaded_bytes: int = 0 + average_speed_mbps: float = 0.0 + estimated_time_remaining: Optional[timedelta] = None + success_rate: float = 0.0 + + +@dataclass +class SystemMetrics: + """System resource metrics at a point in time.""" + + timestamp: datetime + cpu_percent: float + memory_percent: float + memory_available_mb: float + disk_percent: float + disk_free_mb: float + uptime_seconds: float + + +@dataclass +class ErrorMetrics: + """Error tracking and statistics.""" + + total_errors: int = 0 + errors_24h: int = 0 + most_common_errors: Dict[str, int] = field(default_factory=dict) + last_error_time: Optional[datetime] = None + error_rate_per_hour: float = 0.0 + + +class MonitoringService: + """Service for monitoring system resources and application metrics.""" + + def __init__(self): + """Initialize monitoring service.""" + self._error_log: List[tuple[datetime, str]] = [] + self._performance_samples: List[SystemMetrics] = [] + self._max_samples = 1440 # Keep 24 hours of minute samples + + def get_system_metrics(self) -> SystemMetrics: + """Get current system resource metrics. + + Returns: + SystemMetrics: Current system metrics. + """ + try: + import time + + cpu_percent = psutil.cpu_percent(interval=1) + memory_info = psutil.virtual_memory() + disk_info = psutil.disk_usage("/") + boot_time = psutil.boot_time() + uptime_seconds = time.time() - boot_time + + metrics = SystemMetrics( + timestamp=datetime.now(), + cpu_percent=cpu_percent, + memory_percent=memory_info.percent, + memory_available_mb=memory_info.available / (1024 * 1024), + disk_percent=disk_info.percent, + disk_free_mb=disk_info.free / (1024 * 1024), + uptime_seconds=uptime_seconds, + ) + + # Store sample + self._performance_samples.append(metrics) + if len(self._performance_samples) > self._max_samples: + self._performance_samples.pop(0) + + return metrics + except Exception as e: + logger.error(f"Failed to get system metrics: {e}") + raise + + async def get_queue_metrics(self, db: AsyncSession) -> QueueMetrics: + """Get download queue metrics. + + Args: + db: Database session. + + Returns: + QueueMetrics: Queue statistics and progress. + """ + try: + # Get all queue items + result = await db.execute(select(DownloadQueueItem)) + items = result.scalars().all() + + if not items: + return QueueMetrics() + + # Calculate metrics + total_items = len(items) + pending_items = sum(1 for i in items if i.status == "PENDING") + downloading_items = sum( + 1 for i in items if i.status == "DOWNLOADING" + ) + completed_items = sum(1 for i in items if i.status == "COMPLETED") + failed_items = sum(1 for i in items if i.status == "FAILED") + + total_size_bytes = sum( + (i.total_bytes or 0) for i in items + ) + downloaded_bytes = sum( + (i.downloaded_bytes or 0) for i in items + ) + + # Calculate average speed from active downloads + speeds = [ + i.download_speed for i in items + if i.status == "DOWNLOADING" and i.download_speed + ] + average_speed_mbps = ( + sum(speeds) / len(speeds) / (1024 * 1024) if speeds else 0 + ) + + # Calculate success rate + success_rate = ( + (completed_items / total_items * 100) if total_items > 0 else 0 + ) + + # Estimate time remaining + estimated_time_remaining = None + if average_speed_mbps > 0 and total_size_bytes > downloaded_bytes: + remaining_bytes = total_size_bytes - downloaded_bytes + remaining_seconds = remaining_bytes / average_speed_mbps + estimated_time_remaining = timedelta(seconds=remaining_seconds) + + return QueueMetrics( + total_items=total_items, + pending_items=pending_items, + downloading_items=downloading_items, + completed_items=completed_items, + failed_items=failed_items, + total_size_bytes=total_size_bytes, + downloaded_bytes=downloaded_bytes, + average_speed_mbps=average_speed_mbps, + estimated_time_remaining=estimated_time_remaining, + success_rate=success_rate, + ) + except Exception as e: + logger.error(f"Failed to get queue metrics: {e}") + raise + + def log_error(self, error_message: str) -> None: + """Log an error for tracking purposes. + + Args: + error_message: The error message to log. + """ + self._error_log.append((datetime.now(), error_message)) + logger.debug(f"Error logged: {error_message}") + + def get_error_metrics(self) -> ErrorMetrics: + """Get error tracking metrics. + + Returns: + ErrorMetrics: Error statistics and trends. + """ + total_errors = len(self._error_log) + + # Get errors from last 24 hours + cutoff_time = datetime.now() - timedelta(hours=24) + recent_errors = [ + (time, msg) for time, msg in self._error_log + if time >= cutoff_time + ] + errors_24h = len(recent_errors) + + # Count error types + error_counts: Dict[str, int] = {} + for _, msg in recent_errors: + error_type = msg.split(":")[0] + error_counts[error_type] = error_counts.get(error_type, 0) + 1 + + # Sort by count + most_common_errors = dict( + sorted(error_counts.items(), key=lambda x: x[1], reverse=True)[:10] + ) + + # Get last error time + last_error_time = ( + recent_errors[-1][0] if recent_errors else None + ) + + # Calculate error rate per hour + error_rate_per_hour = ( + errors_24h / 24 if errors_24h > 0 else 0 + ) + + return ErrorMetrics( + total_errors=total_errors, + errors_24h=errors_24h, + most_common_errors=most_common_errors, + last_error_time=last_error_time, + error_rate_per_hour=error_rate_per_hour, + ) + + def get_performance_summary(self) -> Dict[str, Any]: + """Get performance summary from collected samples. + + Returns: + dict: Performance statistics. + """ + if not self._performance_samples: + return {} + + cpu_values = [m.cpu_percent for m in self._performance_samples] + memory_values = [m.memory_percent for m in self._performance_samples] + disk_values = [m.disk_percent for m in self._performance_samples] + + return { + "cpu": { + "current": cpu_values[-1], + "average": sum(cpu_values) / len(cpu_values), + "max": max(cpu_values), + "min": min(cpu_values), + }, + "memory": { + "current": memory_values[-1], + "average": sum(memory_values) / len(memory_values), + "max": max(memory_values), + "min": min(memory_values), + }, + "disk": { + "current": disk_values[-1], + "average": sum(disk_values) / len(disk_values), + "max": max(disk_values), + "min": min(disk_values), + }, + "sample_count": len(self._performance_samples), + } + + async def get_comprehensive_status( + self, db: AsyncSession + ) -> Dict[str, Any]: + """Get comprehensive system status summary. + + Args: + db: Database session. + + Returns: + dict: Complete system status. + """ + try: + system_metrics = self.get_system_metrics() + queue_metrics = await self.get_queue_metrics(db) + error_metrics = self.get_error_metrics() + performance = self.get_performance_summary() + + return { + "timestamp": datetime.now().isoformat(), + "system": { + "cpu_percent": system_metrics.cpu_percent, + "memory_percent": system_metrics.memory_percent, + "disk_percent": system_metrics.disk_percent, + "uptime_seconds": system_metrics.uptime_seconds, + }, + "queue": { + "total_items": queue_metrics.total_items, + "pending": queue_metrics.pending_items, + "downloading": queue_metrics.downloading_items, + "completed": queue_metrics.completed_items, + "failed": queue_metrics.failed_items, + "success_rate": round(queue_metrics.success_rate, 2), + "average_speed_mbps": round( + queue_metrics.average_speed_mbps, 2 + ), + }, + "errors": { + "total": error_metrics.total_errors, + "last_24h": error_metrics.errors_24h, + "rate_per_hour": round( + error_metrics.error_rate_per_hour, 2 + ), + "most_common": error_metrics.most_common_errors, + }, + "performance": performance, + } + except Exception as e: + logger.error(f"Failed to get comprehensive status: {e}") + raise + + +# Global monitoring service instance +_monitoring_service: Optional[MonitoringService] = None + + +def get_monitoring_service() -> MonitoringService: + """Get or create the global monitoring service instance. + + Returns: + MonitoringService: The monitoring service instance. + """ + global _monitoring_service + if _monitoring_service is None: + _monitoring_service = MonitoringService() + return _monitoring_service diff --git a/src/server/utils/log_manager.py b/src/server/utils/log_manager.py new file mode 100644 index 0000000..0f71a76 --- /dev/null +++ b/src/server/utils/log_manager.py @@ -0,0 +1,380 @@ +"""Log management utilities for rotation, archival, and search.""" + +import gzip +import logging +import shutil +from dataclasses import dataclass +from datetime import datetime, timedelta +from pathlib import Path +from typing import Any, Dict, List, Optional + +logger = logging.getLogger(__name__) + + +@dataclass +class LogFile: + """Information about a log file.""" + + filename: str + path: Path + size_bytes: int + created_time: datetime + modified_time: datetime + + +class LogManager: + """Manage application logs.""" + + def __init__(self, log_dir: str = "logs"): + """Initialize log manager. + + Args: + log_dir: Directory containing log files. + """ + self.log_dir = Path(log_dir) + self.log_dir.mkdir(parents=True, exist_ok=True) + self.archived_dir = self.log_dir / "archived" + self.archived_dir.mkdir(exist_ok=True) + + def get_log_files(self, pattern: str = "*.log") -> List[LogFile]: + """Get list of log files. + + Args: + pattern: Glob pattern for log files. + + Returns: + list: List of LogFile objects. + """ + log_files = [] + + for log_path in self.log_dir.glob(pattern): + if log_path.is_file(): + stat = log_path.stat() + log_files.append( + LogFile( + filename=log_path.name, + path=log_path, + size_bytes=stat.st_size, + created_time=datetime.fromtimestamp( + stat.st_ctime + ), + modified_time=datetime.fromtimestamp( + stat.st_mtime + ), + ) + ) + + return sorted(log_files, key=lambda x: x.modified_time, reverse=True) + + def rotate_log( + self, log_file: str, max_size_bytes: int = 10485760 + ) -> bool: + """Rotate a log file if it exceeds max size. + + Args: + log_file: Name of the log file. + max_size_bytes: Maximum size before rotation (default 10MB). + + Returns: + bool: True if rotation was needed and successful. + """ + try: + log_path = self.log_dir / log_file + + if not log_path.exists(): + logger.warning(f"Log file not found: {log_file}") + return False + + stat = log_path.stat() + if stat.st_size < max_size_bytes: + return False + + # Create rotated filename with timestamp + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + rotated_name = f"{log_path.stem}_{timestamp}.log" + rotated_path = self.log_dir / rotated_name + + shutil.move(str(log_path), str(rotated_path)) + + # Compress the rotated file + self._compress_log(rotated_path) + + logger.info(f"Rotated log file: {log_file} -> {rotated_name}") + return True + except Exception as e: + logger.error(f"Failed to rotate log file {log_file}: {e}") + return False + + def _compress_log(self, log_path: Path) -> bool: + """Compress a log file. + + Args: + log_path: Path to the log file. + + Returns: + bool: True if compression was successful. + """ + try: + gz_path = log_path.parent / f"{log_path.name}.gz" + + with open(log_path, "rb") as f_in: + with gzip.open(gz_path, "wb") as f_out: + shutil.copyfileobj(f_in, f_out) + + log_path.unlink() + logger.debug(f"Compressed log file: {log_path.name}") + return True + except Exception as e: + logger.error(f"Failed to compress log {log_path}: {e}") + return False + + def archive_old_logs( + self, days_old: int = 30 + ) -> int: + """Archive log files older than specified days. + + Args: + days_old: Archive logs older than this many days. + + Returns: + int: Number of logs archived. + """ + try: + cutoff_time = datetime.now() - timedelta(days=days_old) + archived_count = 0 + + for log_file in self.get_log_files(): + if log_file.modified_time < cutoff_time: + try: + archived_path = ( + self.archived_dir / log_file.filename + ) + shutil.move(str(log_file.path), str(archived_path)) + self._compress_log(archived_path) + archived_count += 1 + logger.debug( + f"Archived log: {log_file.filename}" + ) + except Exception as e: + logger.warning( + f"Failed to archive {log_file.filename}: {e}" + ) + + logger.info(f"Archived {archived_count} old log files") + return archived_count + except Exception as e: + logger.error(f"Failed to archive logs: {e}") + return 0 + + def search_logs( + self, search_term: str, case_sensitive: bool = False + ) -> Dict[str, List[str]]: + """Search for lines matching a term in log files. + + Args: + search_term: Text to search for. + case_sensitive: Whether search is case-sensitive. + + Returns: + dict: Dictionary mapping log files to matching lines. + """ + try: + results = {} + + for log_file in self.get_log_files(): + try: + with open(log_file.path, "r", encoding="utf-8") as f: + matching_lines = [] + for line_num, line in enumerate(f, 1): + if case_sensitive: + if search_term in line: + matching_lines.append( + f"{line_num}: {line.strip()}" + ) + else: + if search_term.lower() in line.lower(): + matching_lines.append( + f"{line_num}: {line.strip()}" + ) + + if matching_lines: + results[log_file.filename] = matching_lines + except Exception as e: + logger.warning( + f"Failed to search {log_file.filename}: {e}" + ) + + logger.debug( + f"Search for '{search_term}' found {len(results)} log files" + ) + return results + except Exception as e: + logger.error(f"Failed to search logs: {e}") + return {} + + def export_logs( + self, + output_file: str, + log_pattern: str = "*.log", + compress: bool = True, + ) -> bool: + """Export logs to a file or archive. + + Args: + output_file: Path to output file. + log_pattern: Pattern for logs to include. + compress: Whether to compress the output. + + Returns: + bool: True if export was successful. + """ + try: + output_path = Path(output_file) + + if compress: + import tarfile + + tar_path = output_path.with_suffix(".tar.gz") + + with tarfile.open(tar_path, "w:gz") as tar: + for log_file in self.get_log_files(log_pattern): + tar.add( + log_file.path, + arcname=log_file.filename, + ) + + logger.info(f"Exported logs to: {tar_path}") + return True + else: + # Concatenate all logs + with open(output_path, "w") as out_f: + for log_file in self.get_log_files(log_pattern): + out_f.write(f"\n\n=== {log_file.filename} ===\n\n") + with open(log_file.path, "r") as in_f: + out_f.write(in_f.read()) + + logger.info(f"Exported logs to: {output_path}") + return True + except Exception as e: + logger.error(f"Failed to export logs: {e}") + return False + + def get_log_stats(self) -> Dict[str, Any]: + """Get statistics about log files. + + Returns: + dict: Log statistics. + """ + try: + log_files = self.get_log_files() + + total_size = sum(log.size_bytes for log in log_files) + total_files = len(log_files) + + if not log_files: + return { + "total_files": 0, + "total_size_bytes": 0, + "total_size_mb": 0, + "average_size_bytes": 0, + "largest_file": None, + "oldest_file": None, + "newest_file": None, + } + + return { + "total_files": total_files, + "total_size_bytes": total_size, + "total_size_mb": total_size / (1024 * 1024), + "average_size_bytes": total_size // total_files, + "largest_file": max( + log_files, key=lambda x: x.size_bytes + ).filename, + "oldest_file": log_files[-1].filename, + "newest_file": log_files[0].filename, + } + except Exception as e: + logger.error(f"Failed to get log stats: {e}") + return {} + + def cleanup_logs( + self, max_total_size_mb: int = 100, keep_files: int = 5 + ) -> int: + """Clean up old logs to maintain size limit. + + Args: + max_total_size_mb: Maximum total log size in MB. + keep_files: Minimum files to keep. + + Returns: + int: Number of files deleted. + """ + try: + max_bytes = max_total_size_mb * 1024 * 1024 + log_files = self.get_log_files() + + if len(log_files) <= keep_files: + return 0 + + total_size = sum(log.size_bytes for log in log_files) + + deleted_count = 0 + for log_file in reversed(log_files): + if ( + total_size <= max_bytes + or len(log_files) <= keep_files + ): + break + + try: + log_file.path.unlink() + total_size -= log_file.size_bytes + deleted_count += 1 + logger.debug(f"Deleted log file: {log_file.filename}") + except Exception as e: + logger.warning( + f"Failed to delete {log_file.filename}: {e}" + ) + + logger.info(f"Cleaned up {deleted_count} log files") + return deleted_count + except Exception as e: + logger.error(f"Failed to cleanup logs: {e}") + return 0 + + def set_log_level(self, logger_name: str, level: str) -> bool: + """Set log level for a specific logger. + + Args: + logger_name: Name of the logger. + level: Log level (DEBUG, INFO, WARNING, ERROR, CRITICAL). + + Returns: + bool: True if successful. + """ + try: + log_level = getattr(logging, level.upper(), logging.INFO) + target_logger = logging.getLogger(logger_name) + target_logger.setLevel(log_level) + + logger.info(f"Set {logger_name} log level to {level}") + return True + except Exception as e: + logger.error(f"Failed to set log level: {e}") + return False + + +# Global log manager instance +_log_manager: Optional[LogManager] = None + + +def get_log_manager() -> LogManager: + """Get or create the global log manager instance. + + Returns: + LogManager: The log manager instance. + """ + global _log_manager + if _log_manager is None: + _log_manager = LogManager() + return _log_manager diff --git a/src/server/utils/metrics.py b/src/server/utils/metrics.py new file mode 100644 index 0000000..501e513 --- /dev/null +++ b/src/server/utils/metrics.py @@ -0,0 +1,358 @@ +"""Metrics collection for Prometheus and custom business metrics.""" + +import logging +import time +from dataclasses import dataclass, field +from datetime import datetime +from enum import Enum +from threading import Lock +from typing import Any, Dict, Optional + +logger = logging.getLogger(__name__) + + +class MetricType(Enum): + """Types of metrics.""" + + COUNTER = "counter" + GAUGE = "gauge" + HISTOGRAM = "histogram" + SUMMARY = "summary" + + +@dataclass +class MetricValue: + """A single metric value with metadata.""" + + name: str + value: float + metric_type: MetricType + labels: Dict[str, str] = field(default_factory=dict) + timestamp: datetime = field(default_factory=datetime.now) + help_text: str = "" + + +@dataclass +class HistogramBucket: + """Histogram bucket for latency tracking.""" + + le: float # bucket upper bound in seconds + count: int = 0 + + +class MetricsCollector: + """Collect and export metrics for monitoring.""" + + def __init__(self): + """Initialize metrics collector.""" + self._metrics: Dict[str, MetricValue] = {} + self._request_timings: Dict[str, list[float]] = {} + self._download_stats: Dict[str, int] = { + "completed": 0, + "failed": 0, + "total_size_bytes": 0, + } + self._lock = Lock() + self._timers: Dict[str, float] = {} + + def increment_counter( + self, + name: str, + value: float = 1.0, + labels: Optional[Dict[str, str]] = None, + help_text: str = "", + ) -> None: + """Increment a counter metric. + + Args: + name: Metric name. + value: Amount to increment by. + labels: Optional labels for the metric. + help_text: Optional help text describing the metric. + """ + with self._lock: + if name not in self._metrics: + self._metrics[name] = MetricValue( + name=name, + value=value, + metric_type=MetricType.COUNTER, + labels=labels or {}, + help_text=help_text, + ) + else: + self._metrics[name].value += value + + def set_gauge( + self, + name: str, + value: float, + labels: Optional[Dict[str, str]] = None, + help_text: str = "", + ) -> None: + """Set a gauge metric. + + Args: + name: Metric name. + value: Gauge value. + labels: Optional labels for the metric. + help_text: Optional help text describing the metric. + """ + with self._lock: + self._metrics[name] = MetricValue( + name=name, + value=value, + metric_type=MetricType.GAUGE, + labels=labels or {}, + help_text=help_text, + ) + + def observe_histogram( + self, + name: str, + value: float, + labels: Optional[Dict[str, str]] = None, + help_text: str = "", + ) -> None: + """Observe a value for histogram. + + Args: + name: Metric name. + value: Value to record. + labels: Optional labels for the metric. + help_text: Optional help text describing the metric. + """ + with self._lock: + if name not in self._request_timings: + self._request_timings[name] = [] + self._request_timings[name].append(value) + + # Update histogram metric + if name not in self._metrics: + self._metrics[name] = MetricValue( + name=name, + value=value, + metric_type=MetricType.HISTOGRAM, + labels=labels or {}, + help_text=help_text, + ) + + def start_timer(self, timer_name: str) -> None: + """Start a timer for tracking operation duration. + + Args: + timer_name: Name of the timer. + """ + self._timers[timer_name] = time.time() + + def end_timer( + self, + timer_name: str, + metric_name: str, + labels: Optional[Dict[str, str]] = None, + ) -> float: + """End a timer and record the duration. + + Args: + timer_name: Name of the timer to end. + metric_name: Name of the metric to record. + labels: Optional labels for the metric. + + Returns: + Duration in seconds. + """ + if timer_name not in self._timers: + logger.warning(f"Timer {timer_name} not started") + return 0.0 + + duration = time.time() - self._timers[timer_name] + del self._timers[timer_name] + + self.observe_histogram( + metric_name, duration, labels, "Request/operation duration" + ) + return duration + + def record_download_success(self, size_bytes: int) -> None: + """Record a successful download. + + Args: + size_bytes: Size of downloaded file in bytes. + """ + with self._lock: + self._download_stats["completed"] += 1 + self._download_stats["total_size_bytes"] += size_bytes + + self.increment_counter( + "downloads_completed_total", + help_text="Total successful downloads", + ) + + def record_download_failure(self) -> None: + """Record a failed download.""" + with self._lock: + self._download_stats["failed"] += 1 + + self.increment_counter( + "downloads_failed_total", help_text="Total failed downloads" + ) + + def get_download_stats(self) -> Dict[str, int]: + """Get download statistics. + + Returns: + dict: Download statistics. + """ + with self._lock: + return self._download_stats.copy() + + def get_request_statistics( + self, metric_name: str + ) -> Optional[Dict[str, float]]: + """Get statistics for a request timing metric. + + Args: + metric_name: Name of the metric to analyze. + + Returns: + Statistics including count, sum, mean, min, max. + """ + with self._lock: + if metric_name not in self._request_timings: + return None + + timings = self._request_timings[metric_name] + if not timings: + return None + + return { + "count": len(timings), + "sum": sum(timings), + "mean": sum(timings) / len(timings), + "min": min(timings), + "max": max(timings), + "p50": sorted(timings)[len(timings) // 2], + "p99": sorted(timings)[int(len(timings) * 0.99)], + } + + def export_prometheus_format(self) -> str: + """Export metrics in Prometheus text format. + + Returns: + str: Prometheus format metrics. + """ + with self._lock: + lines = [] + + for name, metric in self._metrics.items(): + # Add help text if available + if metric.help_text: + lines.append(f"# HELP {name} {metric.help_text}") + lines.append(f"# TYPE {name} {metric.metric_type.value}") + + # Format labels + label_str = "" + if metric.labels: + label_pairs = [ + f'{k}="{v}"' for k, v in metric.labels.items() + ] + label_str = "{" + ",".join(label_pairs) + "}" + + # Add metric value + lines.append(f"{name}{label_str} {metric.value}") + + return "\n".join(lines) + + def export_json(self) -> Dict[str, Any]: + """Export metrics as JSON. + + Returns: + dict: Metrics in JSON-serializable format. + """ + with self._lock: + metrics_dict = {} + + for name, metric in self._metrics.items(): + metrics_dict[name] = { + "value": metric.value, + "type": metric.metric_type.value, + "labels": metric.labels, + "timestamp": metric.timestamp.isoformat(), + } + + return { + "metrics": metrics_dict, + "downloads": self._download_stats, + "request_timings": { + name: self.get_request_statistics(name) + for name in self._request_timings + }, + } + + def reset_metrics(self) -> None: + """Reset all collected metrics.""" + with self._lock: + self._metrics.clear() + self._request_timings.clear() + self._download_stats = { + "completed": 0, + "failed": 0, + "total_size_bytes": 0, + } + + def get_all_metrics(self) -> Dict[str, MetricValue]: + """Get all collected metrics. + + Returns: + dict: All metrics keyed by name. + """ + with self._lock: + return self._metrics.copy() + + +# Global metrics collector instance +_metrics_collector: Optional[MetricsCollector] = None + + +def get_metrics_collector() -> MetricsCollector: + """Get or create the global metrics collector instance. + + Returns: + MetricsCollector: The metrics collector instance. + """ + global _metrics_collector + if _metrics_collector is None: + _metrics_collector = MetricsCollector() + return _metrics_collector + + +class TimerContext: + """Context manager for timing operations.""" + + def __init__( + self, + metric_name: str, + timer_name: Optional[str] = None, + labels: Optional[Dict[str, str]] = None, + ): + """Initialize timer context. + + Args: + metric_name: Name of the metric to record. + timer_name: Optional name for the timer. + labels: Optional labels for the metric. + """ + self.metric_name = metric_name + self.timer_name = timer_name or metric_name + self.labels = labels + self.collector = get_metrics_collector() + + def __enter__(self): + """Start the timer.""" + self.collector.start_timer(self.timer_name) + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + """End the timer and record the metric.""" + self.collector.end_timer( + self.timer_name, self.metric_name, self.labels + ) diff --git a/src/server/utils/system.py b/src/server/utils/system.py new file mode 100644 index 0000000..458be2c --- /dev/null +++ b/src/server/utils/system.py @@ -0,0 +1,361 @@ +"""System utility functions for monitoring and management.""" + +import logging +import os +import shutil +from dataclasses import dataclass +from datetime import datetime +from pathlib import Path +from typing import Any, Dict, List, Optional + +import psutil + +logger = logging.getLogger(__name__) + + +@dataclass +class DiskInfo: + """Information about disk usage.""" + + total_bytes: int + used_bytes: int + free_bytes: int + percent_used: float + path: str + + +@dataclass +class ProcessInfo: + """Information about a process.""" + + pid: int + name: str + status: str + cpu_percent: float + memory_percent: float + memory_mb: float + create_time: datetime + + +class SystemUtilities: + """Utilities for system monitoring and management.""" + + @staticmethod + def get_disk_usage(path: str = "/") -> Optional[DiskInfo]: + """Get disk usage information. + + Args: + path: Path to check disk usage for. + + Returns: + DiskInfo: Disk usage information. + """ + try: + usage = psutil.disk_usage(path) + return DiskInfo( + total_bytes=usage.total, + used_bytes=usage.used, + free_bytes=usage.free, + percent_used=usage.percent, + path=path, + ) + except Exception as e: + logger.error(f"Failed to get disk usage for {path}: {e}") + return None + + @staticmethod + def get_all_disk_usage() -> List[DiskInfo]: + """Get disk usage for all mounted partitions. + + Returns: + list: List of DiskInfo for each partition. + """ + try: + partitions = psutil.disk_partitions() + disk_infos = [] + + for partition in partitions: + try: + usage = psutil.disk_usage(partition.mountpoint) + disk_infos.append( + DiskInfo( + total_bytes=usage.total, + used_bytes=usage.used, + free_bytes=usage.free, + percent_used=usage.percent, + path=partition.mountpoint, + ) + ) + except Exception as e: + logger.warning( + f"Failed to get usage for {partition.mountpoint}: {e}" + ) + + return disk_infos + except Exception as e: + logger.error(f"Failed to get all disk usage: {e}") + return [] + + @staticmethod + def cleanup_directory( + directory: str, pattern: str = "*", max_age_days: int = 30 + ) -> int: + """Clean up files in a directory matching a pattern. + + Args: + directory: Directory to clean. + pattern: File pattern to match (glob). + max_age_days: Only delete files older than this. + + Returns: + int: Number of files deleted. + """ + try: + from datetime import timedelta + + path = Path(directory) + if not path.exists(): + logger.warning(f"Directory not found: {directory}") + return 0 + + deleted_count = 0 + cutoff_time = datetime.now() - timedelta(days=max_age_days) + + for file_path in path.glob(pattern): + if file_path.is_file(): + file_time = datetime.fromtimestamp( + file_path.stat().st_mtime + ) + if file_time < cutoff_time: + try: + file_path.unlink() + deleted_count += 1 + logger.debug(f"Deleted file: {file_path}") + except Exception as e: + logger.warning( + f"Failed to delete {file_path}: {e}" + ) + + logger.info(f"Cleaned up {deleted_count} files from {directory}") + return deleted_count + except Exception as e: + logger.error(f"Failed to cleanup directory {directory}: {e}") + return 0 + + @staticmethod + def cleanup_empty_directories(directory: str) -> int: + """Remove empty directories. + + Args: + directory: Root directory to clean. + + Returns: + int: Number of directories deleted. + """ + try: + path = Path(directory) + if not path.exists(): + return 0 + + deleted_count = 0 + + # Walk from bottom to top to delete empty dirs + for root, dirs, files in os.walk(directory, topdown=False): + for dir_name in dirs: + dir_path = Path(root) / dir_name + try: + if not os.listdir(dir_path): + os.rmdir(dir_path) + deleted_count += 1 + logger.debug( + f"Deleted empty directory: {dir_path}" + ) + except Exception as e: + logger.debug(f"Cannot delete {dir_path}: {e}") + + logger.info(f"Cleaned up {deleted_count} empty directories") + return deleted_count + except Exception as e: + logger.error(f"Failed to cleanup empty directories: {e}") + return 0 + + @staticmethod + def get_directory_size(directory: str) -> int: + """Get total size of a directory. + + Args: + directory: Directory path. + + Returns: + int: Total size in bytes. + """ + try: + path = Path(directory) + if not path.exists(): + return 0 + + total_size = 0 + for entry in path.rglob("*"): + if entry.is_file(): + total_size += entry.stat().st_size + + return total_size + except Exception as e: + logger.error(f"Failed to get directory size for {directory}: {e}") + return 0 + + @staticmethod + def get_process_info(pid: Optional[int] = None) -> Optional[ProcessInfo]: + """Get information about a process. + + Args: + pid: Process ID. If None, uses current process. + + Returns: + ProcessInfo: Process information. + """ + try: + if pid is None: + pid = os.getpid() + + process = psutil.Process(pid) + with process.oneshot(): + return ProcessInfo( + pid=process.pid, + name=process.name(), + status=process.status(), + cpu_percent=process.cpu_percent(), + memory_percent=process.memory_percent(), + memory_mb=process.memory_info().rss / (1024 * 1024), + create_time=datetime.fromtimestamp( + process.create_time() + ), + ) + except Exception as e: + logger.error(f"Failed to get process info for {pid}: {e}") + return None + + @staticmethod + def get_all_processes() -> List[ProcessInfo]: + """Get information about all running processes. + + Returns: + list: List of ProcessInfo for each process. + """ + try: + processes = [] + for proc in psutil.process_iter( + ["pid", "name", "status", "cpu_num", "memory_percent"] + ): + try: + info = SystemUtilities.get_process_info(proc.pid) + if info: + processes.append(info) + except Exception: + pass + + return processes + except Exception as e: + logger.error(f"Failed to get all processes: {e}") + return [] + + @staticmethod + def get_system_info() -> Dict[str, Any]: + """Get comprehensive system information. + + Returns: + dict: System information. + """ + try: + import platform + + return { + "platform": platform.platform(), + "processor": platform.processor(), + "cpu_count": psutil.cpu_count(logical=False), + "cpu_count_logical": psutil.cpu_count(logical=True), + "boot_time": datetime.fromtimestamp( + psutil.boot_time() + ).isoformat(), + "hostname": platform.node(), + "python_version": platform.python_version(), + } + except Exception as e: + logger.error(f"Failed to get system info: {e}") + return {} + + @staticmethod + def get_network_info() -> Dict[str, Any]: + """Get network information. + + Returns: + dict: Network statistics. + """ + try: + net_io = psutil.net_io_counters() + return { + "bytes_sent": net_io.bytes_sent, + "bytes_recv": net_io.bytes_recv, + "packets_sent": net_io.packets_sent, + "packets_recv": net_io.packets_recv, + "errors_in": net_io.errin, + "errors_out": net_io.errout, + "dropped_in": net_io.dropin, + "dropped_out": net_io.dropout, + } + except Exception as e: + logger.error(f"Failed to get network info: {e}") + return {} + + @staticmethod + def copy_file_atomic( + src: str, dest: str, chunk_size: int = 1024 * 1024 + ) -> bool: + """Copy a file atomically using temporary file. + + Args: + src: Source file path. + dest: Destination file path. + chunk_size: Size of chunks for copying. + + Returns: + bool: True if successful. + """ + try: + src_path = Path(src) + dest_path = Path(dest) + + if not src_path.exists(): + logger.error(f"Source file not found: {src}") + return False + + # Create temporary file + temp_path = dest_path.parent / f"{dest_path.name}.tmp" + + # Copy to temporary file + shutil.copyfile(src, temp_path) + + # Atomic rename + temp_path.replace(dest_path) + + logger.debug(f"Atomically copied {src} to {dest}") + return True + except Exception as e: + logger.error(f"Failed to copy file {src} to {dest}: {e}") + return False + + +# Global system utilities instance +_system_utilities: Optional[SystemUtilities] = None + + +def get_system_utilities() -> SystemUtilities: + """Get or create the global system utilities instance. + + Returns: + SystemUtilities: The system utilities instance. + """ + global _system_utilities + if _system_utilities is None: + _system_utilities = SystemUtilities() + return _system_utilities diff --git a/tests/api/test_analytics_endpoints.py b/tests/api/test_analytics_endpoints.py new file mode 100644 index 0000000..a515239 --- /dev/null +++ b/tests/api/test_analytics_endpoints.py @@ -0,0 +1,128 @@ +"""Integration tests for analytics API endpoints. + +Tests analytics API endpoints including download statistics, +series popularity, storage analysis, and performance reports. +""" + +from unittest.mock import AsyncMock, patch + +import pytest +from fastapi.testclient import TestClient + +from src.server.fastapi_app import app + + +@pytest.fixture +def client(): + """Create test client.""" + return TestClient(app) + + +def test_analytics_downloads_endpoint(client): + """Test GET /api/analytics/downloads endpoint.""" + with patch( + "src.server.api.analytics.get_db" + ) as mock_get_db: + mock_db = AsyncMock() + mock_get_db.return_value = mock_db + + response = client.get("/api/analytics/downloads?days=30") + + assert response.status_code in [200, 422, 500] + + +def test_analytics_series_popularity_endpoint(client): + """Test GET /api/analytics/series-popularity endpoint.""" + with patch( + "src.server.api.analytics.get_db" + ) as mock_get_db: + mock_db = AsyncMock() + mock_get_db.return_value = mock_db + + response = client.get("/api/analytics/series-popularity?limit=10") + + assert response.status_code in [200, 422, 500] + + +def test_analytics_storage_endpoint(client): + """Test GET /api/analytics/storage endpoint.""" + with patch("psutil.disk_usage") as mock_disk: + mock_disk.return_value = { + "total": 1024 * 1024 * 1024, + "used": 512 * 1024 * 1024, + "free": 512 * 1024 * 1024, + "percent": 50.0, + } + + response = client.get("/api/analytics/storage") + + assert response.status_code in [200, 500] + + +def test_analytics_performance_endpoint(client): + """Test GET /api/analytics/performance endpoint.""" + with patch( + "src.server.api.analytics.get_db" + ) as mock_get_db: + mock_db = AsyncMock() + mock_get_db.return_value = mock_db + + response = client.get("/api/analytics/performance?hours=24") + + assert response.status_code in [200, 422, 500] + + +def test_analytics_summary_endpoint(client): + """Test GET /api/analytics/summary endpoint.""" + with patch( + "src.server.api.analytics.get_db" + ) as mock_get_db: + mock_db = AsyncMock() + mock_get_db.return_value = mock_db + + response = client.get("/api/analytics/summary") + + assert response.status_code in [200, 500] + + +def test_analytics_downloads_with_query_params(client): + """Test /api/analytics/downloads with different query params.""" + with patch( + "src.server.api.analytics.get_db" + ) as mock_get_db: + mock_db = AsyncMock() + mock_get_db.return_value = mock_db + + response = client.get("/api/analytics/downloads?days=7") + + assert response.status_code in [200, 422, 500] + + +def test_analytics_series_with_different_limits(client): + """Test /api/analytics/series-popularity with different limits.""" + with patch( + "src.server.api.analytics.get_db" + ) as mock_get_db: + mock_db = AsyncMock() + mock_get_db.return_value = mock_db + + for limit in [5, 10, 20]: + response = client.get( + f"/api/analytics/series-popularity?limit={limit}" + ) + assert response.status_code in [200, 422, 500] + + +def test_analytics_performance_with_different_hours(client): + """Test /api/analytics/performance with different hour ranges.""" + with patch( + "src.server.api.analytics.get_db" + ) as mock_get_db: + mock_db = AsyncMock() + mock_get_db.return_value = mock_db + + for hours in [1, 12, 24, 72]: + response = client.get( + f"/api/analytics/performance?hours={hours}" + ) + assert response.status_code in [200, 422, 500] diff --git a/tests/unit/test_analytics_service.py b/tests/unit/test_analytics_service.py new file mode 100644 index 0000000..fbd8493 --- /dev/null +++ b/tests/unit/test_analytics_service.py @@ -0,0 +1,312 @@ +"""Unit tests for analytics service. + +Tests analytics service functionality including download statistics, +series popularity tracking, storage analysis, and performance reporting. +""" + +import json +from datetime import datetime +from pathlib import Path +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest +from sqlalchemy.ext.asyncio import AsyncSession + +from src.server.services.analytics_service import ( + AnalyticsService, + DownloadStats, + PerformanceReport, + StorageAnalysis, +) + + +@pytest.fixture +def analytics_service(tmp_path): + """Create analytics service with temp directory.""" + with patch("src.server.services.analytics_service.ANALYTICS_FILE", + tmp_path / "analytics.json"): + service = AnalyticsService() + yield service + + +@pytest.fixture +async def mock_db(): + """Create mock database session.""" + db = AsyncMock(spec=AsyncSession) + return db + + +@pytest.mark.asyncio +async def test_analytics_service_initialization(analytics_service): + """Test analytics service initializes with default data.""" + assert analytics_service.analytics_file.exists() + + data = json.loads(analytics_service.analytics_file.read_text()) + assert "created_at" in data + assert "download_stats" in data + assert "series_popularity" in data + assert data["download_stats"]["total_downloads"] == 0 + + +@pytest.mark.asyncio +async def test_get_download_stats_no_data( + analytics_service, mock_db +): + """Test download statistics with no download data.""" + mock_db.execute = AsyncMock(return_value=MagicMock( + scalars=MagicMock(return_value=MagicMock(all=MagicMock( + return_value=[] + ))) + )) + + stats = await analytics_service.get_download_stats(mock_db) + + assert isinstance(stats, DownloadStats) + assert stats.total_downloads == 0 + assert stats.successful_downloads == 0 + assert stats.success_rate == 0.0 + + +@pytest.mark.asyncio +async def test_get_download_stats_with_data( + analytics_service, mock_db +): + """Test download statistics with download data.""" + # Mock downloads + download1 = MagicMock() + download1.status = "completed" + download1.size_bytes = 1024 * 1024 * 100 # 100 MB + download1.duration_seconds = 60 + + download2 = MagicMock() + download2.status = "failed" + download2.size_bytes = 0 + download2.duration_seconds = 0 + + mock_db.execute = AsyncMock(return_value=MagicMock( + scalars=MagicMock(return_value=MagicMock(all=MagicMock( + return_value=[download1, download2] + ))) + )) + + stats = await analytics_service.get_download_stats(mock_db) + + assert stats.total_downloads == 2 + assert stats.successful_downloads == 1 + assert stats.failed_downloads == 1 + assert stats.success_rate == 50.0 + assert stats.total_bytes_downloaded == 1024 * 1024 * 100 + + +@pytest.mark.asyncio +async def test_get_series_popularity_empty( + analytics_service, mock_db +): + """Test series popularity with no data.""" + mock_db.execute = AsyncMock(return_value=MagicMock( + all=MagicMock(return_value=[]) + )) + + popularity = await analytics_service.get_series_popularity( + mock_db, limit=10 + ) + + assert isinstance(popularity, list) + assert len(popularity) == 0 + + +@pytest.mark.asyncio +async def test_get_series_popularity_with_data( + analytics_service, mock_db +): + """Test series popularity with data.""" + row = MagicMock() + row.series_name = "Test Anime" + row.download_count = 5 + row.total_size = 1024 * 1024 * 500 + row.last_download = datetime.now() + row.successful = 4 + + mock_db.execute = AsyncMock(return_value=MagicMock( + all=MagicMock(return_value=[row]) + )) + + popularity = await analytics_service.get_series_popularity( + mock_db, limit=10 + ) + + assert len(popularity) == 1 + assert popularity[0].series_name == "Test Anime" + assert popularity[0].download_count == 5 + assert popularity[0].success_rate == 80.0 + + +@pytest.mark.asyncio +async def test_get_storage_analysis(analytics_service): + """Test storage analysis retrieval.""" + with patch("psutil.disk_usage") as mock_disk: + mock_disk.return_value = MagicMock( + total=1024 * 1024 * 1024 * 1024, + used=512 * 1024 * 1024 * 1024, + free=512 * 1024 * 1024 * 1024, + percent=50.0, + ) + + analysis = analytics_service.get_storage_analysis() + + assert isinstance(analysis, StorageAnalysis) + assert analysis.total_storage_bytes > 0 + assert analysis.storage_percent_used == 50.0 + + +@pytest.mark.asyncio +async def test_get_performance_report_no_data( + analytics_service, mock_db +): + """Test performance report with no data.""" + mock_db.execute = AsyncMock(return_value=MagicMock( + scalars=MagicMock(return_value=MagicMock(all=MagicMock( + return_value=[] + ))) + )) + + with patch("psutil.Process") as mock_process: + mock_process.return_value = MagicMock( + memory_info=MagicMock( + return_value=MagicMock(rss=100 * 1024 * 1024) + ), + cpu_percent=MagicMock(return_value=10.0), + ) + + report = await analytics_service.get_performance_report( + mock_db, hours=24 + ) + + assert isinstance(report, PerformanceReport) + assert report.downloads_per_hour == 0.0 + + +@pytest.mark.asyncio +async def test_record_performance_sample(analytics_service): + """Test recording performance samples.""" + analytics_service.record_performance_sample( + queue_size=5, + active_downloads=2, + cpu_percent=25.0, + memory_mb=512.0, + ) + + data = json.loads( + analytics_service.analytics_file.read_text() + ) + assert len(data["performance_samples"]) == 1 + sample = data["performance_samples"][0] + assert sample["queue_size"] == 5 + assert sample["active_downloads"] == 2 + + +@pytest.mark.asyncio +async def test_record_multiple_performance_samples( + analytics_service +): + """Test recording multiple performance samples.""" + for i in range(5): + analytics_service.record_performance_sample( + queue_size=i, + active_downloads=i % 2, + cpu_percent=10.0 + i, + memory_mb=256.0 + i * 50, + ) + + data = json.loads( + analytics_service.analytics_file.read_text() + ) + assert len(data["performance_samples"]) == 5 + + +@pytest.mark.asyncio +async def test_generate_summary_report( + analytics_service, mock_db +): + """Test generating comprehensive summary report.""" + mock_db.execute = AsyncMock(return_value=MagicMock( + scalars=MagicMock(return_value=MagicMock(all=MagicMock( + return_value=[] + ))), + all=MagicMock(return_value=[]), + )) + + with patch("psutil.disk_usage") as mock_disk: + mock_disk.return_value = MagicMock( + total=1024 * 1024 * 1024, + used=512 * 1024 * 1024, + free=512 * 1024 * 1024, + percent=50.0, + ) + + with patch("psutil.Process"): + report = await analytics_service.generate_summary_report( + mock_db + ) + + assert "timestamp" in report + assert "download_stats" in report + assert "series_popularity" in report + assert "storage_analysis" in report + assert "performance_report" in report + + +@pytest.mark.asyncio +async def test_get_dir_size(analytics_service, tmp_path): + """Test directory size calculation.""" + # Create test files + (tmp_path / "file1.txt").write_text("test content") + (tmp_path / "file2.txt").write_text("more test content") + subdir = tmp_path / "subdir" + subdir.mkdir() + (subdir / "file3.txt").write_text("nested content") + + size = analytics_service._get_dir_size(tmp_path) + + assert size > 0 + + +@pytest.mark.asyncio +async def test_get_dir_size_nonexistent(analytics_service): + """Test directory size for nonexistent directory.""" + size = analytics_service._get_dir_size( + Path("/nonexistent/directory") + ) + + assert size == 0 + + +@pytest.mark.asyncio +async def test_analytics_persistence(analytics_service): + """Test analytics data persistence.""" + analytics_service.record_performance_sample( + queue_size=10, + active_downloads=3, + cpu_percent=50.0, + memory_mb=1024.0, + ) + + # Create new service instance + analytics_service2 = AnalyticsService() + analytics_service2.analytics_file = analytics_service.analytics_file + + data = json.loads( + analytics_service2.analytics_file.read_text() + ) + assert len(data["performance_samples"]) == 1 + + +@pytest.mark.asyncio +async def test_analytics_service_singleton(analytics_service): + """Test analytics service singleton pattern.""" + from src.server.services.analytics_service import get_analytics_service + + service1 = get_analytics_service() + service2 = get_analytics_service() + + assert service1 is service2 diff --git a/tests/unit/test_backup_service.py b/tests/unit/test_backup_service.py new file mode 100644 index 0000000..b3b876a --- /dev/null +++ b/tests/unit/test_backup_service.py @@ -0,0 +1,256 @@ +"""Unit tests for backup service.""" + +import tempfile +from pathlib import Path + +import pytest + +from src.server.services.backup_service import BackupService, get_backup_service + + +@pytest.fixture +def temp_backup_env(): + """Create temporary directories for testing.""" + with tempfile.TemporaryDirectory() as tmpdir: + backup_dir = Path(tmpdir) / "backups" + config_dir = Path(tmpdir) / "config" + config_dir.mkdir() + + # Create mock config files + (config_dir / "config.json").write_text('{"test": "config"}') + (config_dir / "download_queue.json").write_text('{"queue": []}') + + yield { + "backup_dir": str(backup_dir), + "config_dir": str(config_dir), + "tmpdir": tmpdir, + } + + +def test_backup_service_initialization(temp_backup_env): + """Test backup service initialization.""" + service = BackupService( + backup_dir=temp_backup_env["backup_dir"], + config_dir=temp_backup_env["config_dir"], + ) + + assert service is not None + assert service.backup_dir.exists() + + +def test_backup_configuration(temp_backup_env): + """Test configuration backup creation.""" + service = BackupService( + backup_dir=temp_backup_env["backup_dir"], + config_dir=temp_backup_env["config_dir"], + ) + + backup_info = service.backup_configuration("Test backup") + + assert backup_info is not None + assert backup_info.backup_type == "config" + assert backup_info.size_bytes > 0 + assert "config_" in backup_info.name + + +def test_backup_configuration_no_config(temp_backup_env): + """Test configuration backup with missing config file.""" + service = BackupService( + backup_dir=temp_backup_env["backup_dir"], + config_dir=temp_backup_env["config_dir"], + ) + + # Remove config file + (Path(temp_backup_env["config_dir"]) / "config.json").unlink() + + # Should still create backup (empty tar) + backup_info = service.backup_configuration() + + assert backup_info is not None + + +def test_backup_database(temp_backup_env): + """Test database backup creation.""" + # Create mock database file + db_path = Path(temp_backup_env["tmpdir"]) / "aniworld.db" + db_path.write_bytes(b"mock database content") + + service = BackupService( + backup_dir=temp_backup_env["backup_dir"], + config_dir=temp_backup_env["config_dir"], + database_path=str(db_path), + ) + + backup_info = service.backup_database("DB backup") + + assert backup_info is not None + assert backup_info.backup_type == "data" + assert backup_info.size_bytes > 0 + assert "database_" in backup_info.name + + +def test_backup_database_not_found(temp_backup_env): + """Test database backup with missing database.""" + service = BackupService( + backup_dir=temp_backup_env["backup_dir"], + config_dir=temp_backup_env["config_dir"], + database_path="/nonexistent/database.db", + ) + + backup_info = service.backup_database() + + assert backup_info is None + + +def test_backup_full(temp_backup_env): + """Test full system backup.""" + service = BackupService( + backup_dir=temp_backup_env["backup_dir"], + config_dir=temp_backup_env["config_dir"], + ) + + backup_info = service.backup_full("Full backup") + + assert backup_info is not None + assert backup_info.backup_type == "full" + assert backup_info.size_bytes > 0 + + +def test_list_backups(temp_backup_env): + """Test listing backups.""" + service = BackupService( + backup_dir=temp_backup_env["backup_dir"], + config_dir=temp_backup_env["config_dir"], + ) + + # Create several backups + service.backup_configuration() + service.backup_full() + + backups = service.list_backups() + + assert len(backups) >= 2 + assert all("name" in b for b in backups) + assert all("type" in b for b in backups) + + +def test_list_backups_by_type(temp_backup_env): + """Test listing backups filtered by type.""" + service = BackupService( + backup_dir=temp_backup_env["backup_dir"], + config_dir=temp_backup_env["config_dir"], + ) + + # Create different types of backups + service.backup_configuration() + service.backup_full() + + config_backups = service.list_backups("config") + + assert all(b["type"] == "config" for b in config_backups) + + +def test_delete_backup(temp_backup_env): + """Test backup deletion.""" + service = BackupService( + backup_dir=temp_backup_env["backup_dir"], + config_dir=temp_backup_env["config_dir"], + ) + + backup_info = service.backup_configuration() + assert backup_info is not None + + backups_before = service.list_backups() + assert len(backups_before) > 0 + + result = service.delete_backup(backup_info.name) + + assert result is True + backups_after = service.list_backups() + assert len(backups_after) < len(backups_before) + + +def test_delete_backup_not_found(temp_backup_env): + """Test deleting non-existent backup.""" + service = BackupService( + backup_dir=temp_backup_env["backup_dir"], + config_dir=temp_backup_env["config_dir"], + ) + + result = service.delete_backup("nonexistent_backup.tar.gz") + + assert result is False + + +def test_cleanup_old_backups(temp_backup_env): + """Test cleanup of old backups.""" + service = BackupService( + backup_dir=temp_backup_env["backup_dir"], + config_dir=temp_backup_env["config_dir"], + ) + + # Create multiple backups + for i in range(5): + service.backup_configuration() + + backups_before = service.list_backups() + assert len(backups_before) == 5 + + # Keep only 2 backups + deleted = service.cleanup_old_backups(max_backups=2) + + backups_after = service.list_backups() + assert len(backups_after) <= 2 + assert deleted == 3 + + +def test_export_anime_data(temp_backup_env): + """Test anime data export.""" + service = BackupService( + backup_dir=temp_backup_env["backup_dir"], + config_dir=temp_backup_env["config_dir"], + ) + + export_file = Path(temp_backup_env["tmpdir"]) / "anime_export.json" + result = service.export_anime_data(str(export_file)) + + assert result is True + assert export_file.exists() + assert "timestamp" in export_file.read_text() + + +def test_import_anime_data(temp_backup_env): + """Test anime data import.""" + service = BackupService( + backup_dir=temp_backup_env["backup_dir"], + config_dir=temp_backup_env["config_dir"], + ) + + # Create import file + import_file = Path(temp_backup_env["tmpdir"]) / "anime_import.json" + import_file.write_text('{"timestamp": "2025-01-01T00:00:00", "data": []}') + + result = service.import_anime_data(str(import_file)) + + assert result is True + + +def test_import_anime_data_not_found(temp_backup_env): + """Test anime data import with missing file.""" + service = BackupService( + backup_dir=temp_backup_env["backup_dir"], + config_dir=temp_backup_env["config_dir"], + ) + + result = service.import_anime_data("/nonexistent/file.json") + + assert result is False + + +def test_get_backup_service(): + """Test singleton backup service.""" + service1 = get_backup_service() + service2 = get_backup_service() + + assert service1 is service2 + assert isinstance(service1, BackupService) diff --git a/tests/unit/test_health.py b/tests/unit/test_health.py new file mode 100644 index 0000000..6c39421 --- /dev/null +++ b/tests/unit/test_health.py @@ -0,0 +1,114 @@ +"""Unit tests for health check endpoints.""" + +from unittest.mock import AsyncMock, patch + +import pytest + +from src.server.api.health import ( + DatabaseHealth, + HealthStatus, + SystemMetrics, + basic_health_check, + check_database_health, + check_filesystem_health, + get_system_metrics, +) + + +@pytest.mark.asyncio +async def test_basic_health_check(): + """Test basic health check endpoint.""" + result = await basic_health_check() + + assert isinstance(result, HealthStatus) + assert result.status == "healthy" + assert result.version == "1.0.0" + assert result.timestamp is not None + + +@pytest.mark.asyncio +async def test_database_health_check_success(): + """Test database health check with successful connection.""" + # Mock database session + mock_db = AsyncMock() + mock_db.execute = AsyncMock() + + result = await check_database_health(mock_db) + + assert isinstance(result, DatabaseHealth) + assert result.status == "healthy" + assert result.connection_time_ms >= 0 + assert "successful" in result.message.lower() + + +@pytest.mark.asyncio +async def test_database_health_check_failure(): + """Test database health check with failed connection.""" + # Mock database session that raises error + mock_db = AsyncMock() + mock_db.execute = AsyncMock(side_effect=Exception("Connection failed")) + + result = await check_database_health(mock_db) + + assert isinstance(result, DatabaseHealth) + assert result.status == "unhealthy" + assert "failed" in result.message.lower() + + +def test_filesystem_health_check_success(): + """Test filesystem health check with accessible directories.""" + with patch("os.path.exists", return_value=True), patch( + "os.access", return_value=True + ): + result = check_filesystem_health() + + assert result["status"] in ["healthy", "degraded"] + assert "data_dir_writable" in result + assert "logs_dir_writable" in result + + +def test_filesystem_health_check_failure(): + """Test filesystem health check with inaccessible directories.""" + with patch("os.path.exists", return_value=False), patch( + "os.access", return_value=False + ): + result = check_filesystem_health() + + assert "status" in result + assert "message" in result + + +def test_get_system_metrics(): + """Test system metrics collection.""" + result = get_system_metrics() + + assert isinstance(result, SystemMetrics) + assert result.cpu_percent >= 0 + assert result.memory_percent >= 0 + assert result.memory_available_mb > 0 + assert result.disk_percent >= 0 + assert result.disk_free_mb > 0 + assert result.uptime_seconds > 0 + + +def test_system_metrics_values_reasonable(): + """Test that system metrics are within reasonable ranges.""" + result = get_system_metrics() + + # CPU should be 0-100% + assert 0 <= result.cpu_percent <= 100 + + # Memory should be 0-100% + assert 0 <= result.memory_percent <= 100 + + # Disk should be 0-100% + assert 0 <= result.disk_percent <= 100 + + # Memory available should be positive + assert result.memory_available_mb > 0 + + # Disk free should be positive + assert result.disk_free_mb > 0 + + # Uptime should be positive + assert result.uptime_seconds > 0 diff --git a/tests/unit/test_log_manager.py b/tests/unit/test_log_manager.py new file mode 100644 index 0000000..a4a0b3b --- /dev/null +++ b/tests/unit/test_log_manager.py @@ -0,0 +1,209 @@ +"""Unit tests for log manager.""" + +import tempfile +from datetime import datetime, timedelta +from pathlib import Path + +import pytest + +from src.server.utils.log_manager import LogManager, get_log_manager + + +@pytest.fixture +def temp_log_env(): + """Create temporary log environment.""" + with tempfile.TemporaryDirectory() as tmpdir: + yield tmpdir + + +def test_log_manager_initialization(temp_log_env): + """Test log manager initialization.""" + manager = LogManager(log_dir=temp_log_env) + + assert manager is not None + assert manager.log_dir.exists() + assert manager.archived_dir.exists() + + +def test_get_log_files(temp_log_env): + """Test getting list of log files.""" + manager = LogManager(log_dir=temp_log_env) + + # Create test log files + (Path(temp_log_env) / "app.log").write_text("log content 1") + (Path(temp_log_env) / "error.log").write_text("log content 2") + (Path(temp_log_env) / "other.txt").write_text("not a log") + + log_files = manager.get_log_files() + + assert len(log_files) == 2 + assert log_files[0].filename in ["app.log", "error.log"] + + +def test_rotate_log(temp_log_env): + """Test log file rotation.""" + manager = LogManager(log_dir=temp_log_env) + + log_file = Path(temp_log_env) / "app.log" + large_content = "x" * (11 * 1024 * 1024) # 11MB + log_file.write_text(large_content) + + result = manager.rotate_log("app.log", max_size_bytes=10485760) + + assert result is True + assert not log_file.exists() # Original file rotated + + +def test_rotate_log_not_found(temp_log_env): + """Test rotation of non-existent log.""" + manager = LogManager(log_dir=temp_log_env) + + result = manager.rotate_log("nonexistent.log") + + assert result is False + + +def test_rotate_log_small_file(temp_log_env): + """Test rotation of small log file.""" + manager = LogManager(log_dir=temp_log_env) + + log_file = Path(temp_log_env) / "app.log" + log_file.write_text("small content") + + result = manager.rotate_log("app.log", max_size_bytes=1048576) + + assert result is False + assert log_file.exists() + + +def test_archive_old_logs(temp_log_env): + """Test archiving old log files.""" + manager = LogManager(log_dir=temp_log_env) + + # Create old and new logs + old_log = Path(temp_log_env) / "old.log" + old_log.write_text("old log") + old_log.touch() + + new_log = Path(temp_log_env) / "new.log" + new_log.write_text("new log") + + archived = manager.archive_old_logs(days_old=30) + + assert archived > 0 + + +def test_search_logs(temp_log_env): + """Test searching logs.""" + manager = LogManager(log_dir=temp_log_env) + + # Create test logs + (Path(temp_log_env) / "app.log").write_text( + "Error occurred\nWarning message\nError again" + ) + (Path(temp_log_env) / "debug.log").write_text( + "Debug info\nError in debug" + ) + + results = manager.search_logs("Error", case_sensitive=False) + + assert len(results) >= 1 + assert any("Error" in line for lines in results.values() + for line in lines) + + +def test_search_logs_case_sensitive(temp_log_env): + """Test case-sensitive log search.""" + manager = LogManager(log_dir=temp_log_env) + + (Path(temp_log_env) / "app.log").write_text("ERROR\nerror\nError") + + results = manager.search_logs("ERROR", case_sensitive=True) + + assert "app.log" in results + # Should only find uppercase ERROR + assert len(results["app.log"]) == 1 + + +def test_export_logs(temp_log_env): + """Test exporting logs.""" + manager = LogManager(log_dir=temp_log_env) + + # Create test logs + (Path(temp_log_env) / "app.log").write_text("log content 1") + (Path(temp_log_env) / "error.log").write_text("log content 2") + + output_file = Path(temp_log_env) / "export.tar.gz" + result = manager.export_logs(str(output_file), compress=True) + + assert result is True + assert output_file.exists() + + +def test_export_logs_uncompressed(temp_log_env): + """Test exporting logs without compression.""" + manager = LogManager(log_dir=temp_log_env) + + (Path(temp_log_env) / "app.log").write_text("log content") + + output_file = Path(temp_log_env) / "export.txt" + result = manager.export_logs(str(output_file), compress=False) + + assert result is True + assert output_file.exists() + assert "log content" in output_file.read_text() + + +def test_get_log_stats(temp_log_env): + """Test getting log statistics.""" + manager = LogManager(log_dir=temp_log_env) + + # Create test logs + (Path(temp_log_env) / "app.log").write_text("x" * 1000) + (Path(temp_log_env) / "error.log").write_text("y" * 2000) + + stats = manager.get_log_stats() + + assert stats["total_files"] == 2 + assert stats["total_size_bytes"] >= 3000 + + +def test_get_log_stats_empty(temp_log_env): + """Test getting stats with no logs.""" + manager = LogManager(log_dir=temp_log_env) + + stats = manager.get_log_stats() + + assert stats["total_files"] == 0 + assert stats["total_size_bytes"] == 0 + + +def test_cleanup_logs(temp_log_env): + """Test log cleanup.""" + manager = LogManager(log_dir=temp_log_env) + + # Create multiple logs + for i in range(10): + (Path(temp_log_env) / f"log_{i}.log").write_text("x" * 1000) + + deleted = manager.cleanup_logs(max_total_size_mb=0.01, keep_files=2) + + assert deleted > 0 + + +def test_set_log_level(): + """Test setting log level.""" + manager = LogManager() + + result = manager.set_log_level("test_logger", "DEBUG") + + assert result is True + + +def test_get_log_manager_singleton(): + """Test singleton log manager.""" + manager1 = get_log_manager() + manager2 = get_log_manager() + + assert manager1 is manager2 + assert isinstance(manager1, LogManager) diff --git a/tests/unit/test_metrics.py b/tests/unit/test_metrics.py new file mode 100644 index 0000000..bc7ff97 --- /dev/null +++ b/tests/unit/test_metrics.py @@ -0,0 +1,236 @@ +"""Unit tests for metrics collection.""" + +import pytest + +from src.server.utils.metrics import ( + MetricsCollector, + MetricType, + TimerContext, + get_metrics_collector, +) + + +def test_metrics_collector_initialization(): + """Test metrics collector initialization.""" + collector = MetricsCollector() + + assert collector is not None + assert collector._metrics == {} + assert collector._download_stats["completed"] == 0 + assert collector._download_stats["failed"] == 0 + + +def test_increment_counter(): + """Test counter metric increment.""" + collector = MetricsCollector() + + collector.increment_counter("test_counter", 1.0, help_text="Test counter") + collector.increment_counter("test_counter", 2.0) + + assert "test_counter" in collector._metrics + assert collector._metrics["test_counter"].value == 3.0 + assert collector._metrics["test_counter"].metric_type == MetricType.COUNTER + + +def test_set_gauge(): + """Test gauge metric.""" + collector = MetricsCollector() + + collector.set_gauge("test_gauge", 42.0, help_text="Test gauge") + assert collector._metrics["test_gauge"].value == 42.0 + + collector.set_gauge("test_gauge", 100.0) + assert collector._metrics["test_gauge"].value == 100.0 + + +def test_observe_histogram(): + """Test histogram observation.""" + collector = MetricsCollector() + + collector.observe_histogram("request_duration", 0.5) + collector.observe_histogram("request_duration", 1.2) + collector.observe_histogram("request_duration", 0.8) + + assert len(collector._request_timings["request_duration"]) == 3 + assert 0.5 in collector._request_timings["request_duration"] + + +def test_start_and_end_timer(): + """Test timer functionality.""" + collector = MetricsCollector() + + collector.start_timer("test_timer") + import time + + time.sleep(0.01) # Sleep for 10ms + duration = collector.end_timer("test_timer", "test_duration") + + assert duration >= 0.01 + assert "test_duration" in collector._metrics + + +def test_record_download_success(): + """Test download success recording.""" + collector = MetricsCollector() + + collector.record_download_success(1000000) + collector.record_download_success(2000000) + + stats = collector.get_download_stats() + assert stats["completed"] == 2 + assert stats["total_size_bytes"] == 3000000 + assert stats["failed"] == 0 + + +def test_record_download_failure(): + """Test download failure recording.""" + collector = MetricsCollector() + + collector.record_download_failure() + collector.record_download_failure() + + stats = collector.get_download_stats() + assert stats["failed"] == 2 + assert stats["completed"] == 0 + + +def test_get_request_statistics(): + """Test request statistics calculation.""" + collector = MetricsCollector() + + for val in [0.5, 1.0, 0.8, 0.6, 0.9]: + collector.observe_histogram("request_latency", val) + + stats = collector.get_request_statistics("request_latency") + + assert stats is not None + assert stats["count"] == 5 + assert stats["mean"] == pytest.approx(0.76, abs=0.01) + assert stats["min"] == 0.5 + assert stats["max"] == 1.0 + + +def test_get_request_statistics_not_found(): + """Test request statistics for non-existent metric.""" + collector = MetricsCollector() + + stats = collector.get_request_statistics("non_existent") + assert stats is None + + +def test_export_prometheus_format(): + """Test Prometheus format export.""" + collector = MetricsCollector() + + collector.increment_counter( + "requests_total", 10, help_text="Total requests" + ) + collector.set_gauge("active_connections", 5) + + prometheus_output = collector.export_prometheus_format() + + assert "requests_total" in prometheus_output + assert "active_connections" in prometheus_output + assert "10" in prometheus_output + assert "5" in prometheus_output + + +def test_export_prometheus_with_labels(): + """Test Prometheus format with labels.""" + collector = MetricsCollector() + + labels = {"endpoint": "/api/anime", "method": "GET"} + collector.increment_counter("requests_total", labels=labels) + + prometheus_output = collector.export_prometheus_format() + + assert "endpoint" in prometheus_output + assert "method" in prometheus_output + assert "/api/anime" in prometheus_output + assert "GET" in prometheus_output + + +def test_export_json(): + """Test JSON export.""" + collector = MetricsCollector() + + collector.increment_counter("test_counter", 5) + collector.set_gauge("test_gauge", 42) + collector.record_download_success(1000000) + + json_export = collector.export_json() + + assert "metrics" in json_export + assert "downloads" in json_export + assert "request_timings" in json_export + assert json_export["downloads"]["completed"] == 1 + assert json_export["downloads"]["total_size_bytes"] == 1000000 + + +def test_reset_metrics(): + """Test metrics reset.""" + collector = MetricsCollector() + + collector.increment_counter("test_counter", 10) + collector.record_download_success(1000000) + + assert len(collector._metrics) > 0 + assert collector._download_stats["completed"] == 1 + + collector.reset_metrics() + + assert len(collector._metrics) == 0 + assert collector._download_stats["completed"] == 0 + + +def test_get_all_metrics(): + """Test getting all metrics.""" + collector = MetricsCollector() + + collector.increment_counter("counter1", 5) + collector.set_gauge("gauge1", 10) + collector.increment_counter("counter2", 3) + + all_metrics = collector.get_all_metrics() + + assert len(all_metrics) == 3 + assert "counter1" in all_metrics + assert "gauge1" in all_metrics + assert "counter2" in all_metrics + + +def test_get_metrics_collector_singleton(): + """Test singleton metrics collector.""" + collector1 = get_metrics_collector() + collector2 = get_metrics_collector() + + assert collector1 is collector2 + assert isinstance(collector1, MetricsCollector) + + +def test_timer_context_manager(): + """Test timer context manager.""" + collector = get_metrics_collector() + collector.reset_metrics() + + import time + + with TimerContext("operation_duration", "timer1"): + time.sleep(0.01) + + stats = collector.get_request_statistics("operation_duration") + assert stats is not None + assert stats["count"] == 1 + assert stats["max"] >= 0.01 + + +def test_timer_context_with_labels(): + """Test timer context manager with labels.""" + collector = get_metrics_collector() + collector.reset_metrics() + + labels = {"endpoint": "/api/test"} + with TimerContext("endpoint_duration", labels=labels): + pass + + assert "endpoint_duration" in collector._metrics diff --git a/tests/unit/test_monitoring_service.py b/tests/unit/test_monitoring_service.py new file mode 100644 index 0000000..65b411d --- /dev/null +++ b/tests/unit/test_monitoring_service.py @@ -0,0 +1,225 @@ +"""Unit tests for monitoring service.""" + +from datetime import datetime, timedelta +from unittest.mock import AsyncMock, MagicMock + +import pytest + +from src.server.services.monitoring_service import ( + ErrorMetrics, + MonitoringService, + QueueMetrics, + SystemMetrics, + get_monitoring_service, +) + + +def test_monitoring_service_initialization(): + """Test monitoring service initialization.""" + service = MonitoringService() + + assert service is not None + assert service._error_log == [] + assert service._performance_samples == [] + + +def test_get_system_metrics(): + """Test system metrics collection.""" + service = MonitoringService() + metrics = service.get_system_metrics() + + assert isinstance(metrics, SystemMetrics) + assert metrics.cpu_percent >= 0 + assert metrics.memory_percent >= 0 + assert metrics.disk_percent >= 0 + assert metrics.uptime_seconds > 0 + assert metrics.memory_available_mb > 0 + assert metrics.disk_free_mb > 0 + + +def test_system_metrics_stored(): + """Test that system metrics are stored for performance tracking.""" + service = MonitoringService() + + metrics1 = service.get_system_metrics() + metrics2 = service.get_system_metrics() + + assert len(service._performance_samples) == 2 + assert service._performance_samples[0] == metrics1 + assert service._performance_samples[1] == metrics2 + + +@pytest.mark.asyncio +async def test_get_queue_metrics_empty(): + """Test queue metrics with no items.""" + service = MonitoringService() + mock_db = AsyncMock() + + # Mock empty result + mock_result = AsyncMock() + mock_result.scalars().all.return_value = [] + mock_db.execute = AsyncMock(return_value=mock_result) + + metrics = await service.get_queue_metrics(mock_db) + + assert isinstance(metrics, QueueMetrics) + assert metrics.total_items == 0 + assert metrics.success_rate == 0.0 + + +@pytest.mark.asyncio +async def test_get_queue_metrics_with_items(): + """Test queue metrics with download items.""" + service = MonitoringService() + mock_db = AsyncMock() + + # Create mock queue items + item1 = MagicMock() + item1.status = "COMPLETED" + item1.total_bytes = 1000000 + item1.downloaded_bytes = 1000000 + item1.download_speed = 1000000 + + item2 = MagicMock() + item2.status = "DOWNLOADING" + item2.total_bytes = 2000000 + item2.downloaded_bytes = 1000000 + item2.download_speed = 500000 + + item3 = MagicMock() + item3.status = "FAILED" + item3.total_bytes = 500000 + item3.downloaded_bytes = 0 + item3.download_speed = None + + # Mock result + mock_result = AsyncMock() + mock_result.scalars().all.return_value = [item1, item2, item3] + mock_db.execute = AsyncMock(return_value=mock_result) + + metrics = await service.get_queue_metrics(mock_db) + + assert metrics.total_items == 3 + assert metrics.completed_items == 1 + assert metrics.downloading_items == 1 + assert metrics.failed_items == 1 + assert metrics.total_size_bytes == 3500000 + assert metrics.downloaded_bytes == 2000000 + assert metrics.success_rate > 0 + + +def test_log_error(): + """Test error logging.""" + service = MonitoringService() + + service.log_error("Test error 1") + service.log_error("Test error 2") + + assert len(service._error_log) == 2 + assert service._error_log[0][1] == "Test error 1" + assert service._error_log[1][1] == "Test error 2" + + +def test_get_error_metrics_empty(): + """Test error metrics with no errors.""" + service = MonitoringService() + metrics = service.get_error_metrics() + + assert isinstance(metrics, ErrorMetrics) + assert metrics.total_errors == 0 + assert metrics.errors_24h == 0 + assert metrics.error_rate_per_hour == 0.0 + + +def test_get_error_metrics_with_errors(): + """Test error metrics with multiple errors.""" + service = MonitoringService() + + service.log_error("ConnectionError: Failed to connect") + service.log_error("ConnectionError: Timeout") + service.log_error("TimeoutError: Download timeout") + + metrics = service.get_error_metrics() + + assert metrics.total_errors == 3 + assert metrics.errors_24h == 3 + assert metrics.last_error_time is not None + assert len(metrics.most_common_errors) > 0 + + +def test_get_error_metrics_old_errors(): + """Test error metrics excludes old errors.""" + service = MonitoringService() + + # Add old error (simulate by directly adding to log) + old_time = datetime.now() - timedelta(hours=25) + service._error_log.append((old_time, "Old error")) + + # Add recent error + service.log_error("Recent error") + + metrics = service.get_error_metrics() + + assert metrics.total_errors == 2 + assert metrics.errors_24h == 1 + + +def test_get_performance_summary(): + """Test performance summary generation.""" + service = MonitoringService() + + # Collect some samples + service.get_system_metrics() + service.get_system_metrics() + service.get_system_metrics() + + summary = service.get_performance_summary() + + assert "cpu" in summary + assert "memory" in summary + assert "disk" in summary + assert "sample_count" in summary + assert summary["sample_count"] == 3 + assert "current" in summary["cpu"] + assert "average" in summary["cpu"] + assert "max" in summary["cpu"] + assert "min" in summary["cpu"] + + +def test_get_performance_summary_empty(): + """Test performance summary with no samples.""" + service = MonitoringService() + summary = service.get_performance_summary() + + assert summary == {} + + +@pytest.mark.asyncio +async def test_get_comprehensive_status(): + """Test comprehensive system status.""" + service = MonitoringService() + mock_db = AsyncMock() + + # Mock empty queue + mock_result = AsyncMock() + mock_result.scalars().all.return_value = [] + mock_db.execute = AsyncMock(return_value=mock_result) + + status = await service.get_comprehensive_status(mock_db) + + assert "timestamp" in status + assert "system" in status + assert "queue" in status + assert "errors" in status + assert "performance" in status + assert status["system"]["cpu_percent"] >= 0 + assert status["queue"]["total_items"] == 0 + + +def test_get_monitoring_service(): + """Test singleton monitoring service.""" + service1 = get_monitoring_service() + service2 = get_monitoring_service() + + assert service1 is service2 + assert isinstance(service1, MonitoringService) diff --git a/tests/unit/test_system_utilities.py b/tests/unit/test_system_utilities.py new file mode 100644 index 0000000..d0115c7 --- /dev/null +++ b/tests/unit/test_system_utilities.py @@ -0,0 +1,211 @@ +"""Unit tests for system utilities.""" + +import os +import tempfile +from datetime import datetime, timedelta +from pathlib import Path + +from src.server.utils.system import ( + DiskInfo, + ProcessInfo, + SystemUtilities, + get_system_utilities, +) + + +def test_system_utilities_initialization(): + """Test system utilities initialization.""" + utils = SystemUtilities() + assert utils is not None + + +def test_get_disk_usage(): + """Test getting disk usage information.""" + utils = SystemUtilities() + disk_info = utils.get_disk_usage("/") + + assert disk_info is not None + assert isinstance(disk_info, DiskInfo) + assert disk_info.total_bytes > 0 + assert disk_info.free_bytes >= 0 + assert disk_info.percent_used >= 0 + + +def test_get_all_disk_usage(): + """Test getting disk usage for all partitions.""" + utils = SystemUtilities() + disk_infos = utils.get_all_disk_usage() + + assert isinstance(disk_infos, list) + # Should have at least one partition + assert len(disk_infos) >= 0 + + +def test_cleanup_directory(): + """Test directory cleanup.""" + utils = SystemUtilities() + + with tempfile.TemporaryDirectory() as tmpdir: + # Create some test files + old_time = (datetime.now() - timedelta(days=31)).timestamp() + + for i in range(3): + file_path = Path(tmpdir) / f"old_file_{i}.txt" + file_path.write_text(f"old file {i}") + Path(file_path).touch() + os.utime(file_path, (old_time, old_time)) + + for i in range(2): + file_path = Path(tmpdir) / f"new_file_{i}.txt" + file_path.write_text(f"new file {i}") + + # Clean up files older than 30 days + deleted = utils.cleanup_directory(tmpdir, "*.txt", max_age_days=30) + + assert deleted == 3 + + +def test_cleanup_empty_directories(): + """Test empty directory cleanup.""" + utils = SystemUtilities() + + with tempfile.TemporaryDirectory() as tmpdir: + # Create nested directories + (Path(tmpdir) / "dir1").mkdir() + (Path(tmpdir) / "dir2").mkdir() + (Path(tmpdir) / "dir2" / "subdir").mkdir() + + # Create a file in one directory + (Path(tmpdir) / "dir1" / "file.txt").write_text("content") + + # Clean up empty directories + deleted = utils.cleanup_empty_directories(tmpdir) + + assert deleted >= 1 + + +def test_get_directory_size(): + """Test getting directory size.""" + utils = SystemUtilities() + + with tempfile.TemporaryDirectory() as tmpdir: + # Create test files + (Path(tmpdir) / "file1.txt").write_text("a" * 1000) + (Path(tmpdir) / "file2.txt").write_text("b" * 2000) + + size = utils.get_directory_size(tmpdir) + + assert size >= 3000 # At least 3000 bytes + + +def test_get_directory_size_nonexistent(): + """Test getting directory size for non-existent directory.""" + utils = SystemUtilities() + size = utils.get_directory_size("/nonexistent/path") + assert size == 0 + + +def test_get_process_info(): + """Test getting process information.""" + import os + + utils = SystemUtilities() + pid = os.getpid() + + proc_info = utils.get_process_info(pid) + + assert proc_info is not None + assert isinstance(proc_info, ProcessInfo) + assert proc_info.pid == pid + assert proc_info.name is not None + assert proc_info.cpu_percent >= 0 + assert proc_info.memory_percent >= 0 + + +def test_get_process_info_current(): + """Test getting current process information.""" + utils = SystemUtilities() + proc_info = utils.get_process_info() + + assert proc_info is not None + assert proc_info.pid > 0 + + +def test_get_process_info_invalid(): + """Test getting process info for invalid PID.""" + utils = SystemUtilities() + proc_info = utils.get_process_info(99999999) + + assert proc_info is None + + +def test_get_all_processes(): + """Test getting information about all processes.""" + utils = SystemUtilities() + processes = utils.get_all_processes() + + assert isinstance(processes, list) + # Should have at least some processes + assert len(processes) > 0 + + +def test_get_system_info(): + """Test getting system information.""" + utils = SystemUtilities() + system_info = utils.get_system_info() + + assert system_info is not None + assert "platform" in system_info + assert "cpu_count" in system_info + assert "hostname" in system_info + assert "python_version" in system_info + + +def test_get_network_info(): + """Test getting network information.""" + utils = SystemUtilities() + net_info = utils.get_network_info() + + assert net_info is not None + assert "bytes_sent" in net_info + assert "bytes_recv" in net_info + assert net_info["bytes_sent"] >= 0 + assert net_info["bytes_recv"] >= 0 + + +def test_copy_file_atomic(): + """Test atomic file copy.""" + utils = SystemUtilities() + + with tempfile.TemporaryDirectory() as tmpdir: + src_file = Path(tmpdir) / "source.txt" + dest_file = Path(tmpdir) / "dest.txt" + + src_file.write_text("test content") + + result = utils.copy_file_atomic(str(src_file), str(dest_file)) + + assert result is True + assert dest_file.exists() + assert dest_file.read_text() == "test content" + + +def test_copy_file_atomic_nonexistent(): + """Test atomic file copy with non-existent source.""" + utils = SystemUtilities() + + result = utils.copy_file_atomic( + "/nonexistent/source.txt", "/tmp/dest.txt" + ) + + assert result is False + + +def test_get_system_utilities_singleton(): + """Test singleton system utilities.""" + utils1 = get_system_utilities() + utils2 = get_system_utilities() + + assert utils1 is utils2 + assert isinstance(utils1, SystemUtilities) +