This commit is contained in:
2025-10-22 09:20:35 +02:00
parent 1c8c18c1ea
commit 9e686017a6
18 changed files with 5177 additions and 0 deletions

View File

@@ -0,0 +1,312 @@
"""Unit tests for analytics service.
Tests analytics service functionality including download statistics,
series popularity tracking, storage analysis, and performance reporting.
"""
import json
from datetime import datetime
from pathlib import Path
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from sqlalchemy.ext.asyncio import AsyncSession
from src.server.services.analytics_service import (
AnalyticsService,
DownloadStats,
PerformanceReport,
StorageAnalysis,
)
@pytest.fixture
def analytics_service(tmp_path):
"""Create analytics service with temp directory."""
with patch("src.server.services.analytics_service.ANALYTICS_FILE",
tmp_path / "analytics.json"):
service = AnalyticsService()
yield service
@pytest.fixture
async def mock_db():
"""Create mock database session."""
db = AsyncMock(spec=AsyncSession)
return db
@pytest.mark.asyncio
async def test_analytics_service_initialization(analytics_service):
"""Test analytics service initializes with default data."""
assert analytics_service.analytics_file.exists()
data = json.loads(analytics_service.analytics_file.read_text())
assert "created_at" in data
assert "download_stats" in data
assert "series_popularity" in data
assert data["download_stats"]["total_downloads"] == 0
@pytest.mark.asyncio
async def test_get_download_stats_no_data(
analytics_service, mock_db
):
"""Test download statistics with no download data."""
mock_db.execute = AsyncMock(return_value=MagicMock(
scalars=MagicMock(return_value=MagicMock(all=MagicMock(
return_value=[]
)))
))
stats = await analytics_service.get_download_stats(mock_db)
assert isinstance(stats, DownloadStats)
assert stats.total_downloads == 0
assert stats.successful_downloads == 0
assert stats.success_rate == 0.0
@pytest.mark.asyncio
async def test_get_download_stats_with_data(
analytics_service, mock_db
):
"""Test download statistics with download data."""
# Mock downloads
download1 = MagicMock()
download1.status = "completed"
download1.size_bytes = 1024 * 1024 * 100 # 100 MB
download1.duration_seconds = 60
download2 = MagicMock()
download2.status = "failed"
download2.size_bytes = 0
download2.duration_seconds = 0
mock_db.execute = AsyncMock(return_value=MagicMock(
scalars=MagicMock(return_value=MagicMock(all=MagicMock(
return_value=[download1, download2]
)))
))
stats = await analytics_service.get_download_stats(mock_db)
assert stats.total_downloads == 2
assert stats.successful_downloads == 1
assert stats.failed_downloads == 1
assert stats.success_rate == 50.0
assert stats.total_bytes_downloaded == 1024 * 1024 * 100
@pytest.mark.asyncio
async def test_get_series_popularity_empty(
analytics_service, mock_db
):
"""Test series popularity with no data."""
mock_db.execute = AsyncMock(return_value=MagicMock(
all=MagicMock(return_value=[])
))
popularity = await analytics_service.get_series_popularity(
mock_db, limit=10
)
assert isinstance(popularity, list)
assert len(popularity) == 0
@pytest.mark.asyncio
async def test_get_series_popularity_with_data(
analytics_service, mock_db
):
"""Test series popularity with data."""
row = MagicMock()
row.series_name = "Test Anime"
row.download_count = 5
row.total_size = 1024 * 1024 * 500
row.last_download = datetime.now()
row.successful = 4
mock_db.execute = AsyncMock(return_value=MagicMock(
all=MagicMock(return_value=[row])
))
popularity = await analytics_service.get_series_popularity(
mock_db, limit=10
)
assert len(popularity) == 1
assert popularity[0].series_name == "Test Anime"
assert popularity[0].download_count == 5
assert popularity[0].success_rate == 80.0
@pytest.mark.asyncio
async def test_get_storage_analysis(analytics_service):
"""Test storage analysis retrieval."""
with patch("psutil.disk_usage") as mock_disk:
mock_disk.return_value = MagicMock(
total=1024 * 1024 * 1024 * 1024,
used=512 * 1024 * 1024 * 1024,
free=512 * 1024 * 1024 * 1024,
percent=50.0,
)
analysis = analytics_service.get_storage_analysis()
assert isinstance(analysis, StorageAnalysis)
assert analysis.total_storage_bytes > 0
assert analysis.storage_percent_used == 50.0
@pytest.mark.asyncio
async def test_get_performance_report_no_data(
analytics_service, mock_db
):
"""Test performance report with no data."""
mock_db.execute = AsyncMock(return_value=MagicMock(
scalars=MagicMock(return_value=MagicMock(all=MagicMock(
return_value=[]
)))
))
with patch("psutil.Process") as mock_process:
mock_process.return_value = MagicMock(
memory_info=MagicMock(
return_value=MagicMock(rss=100 * 1024 * 1024)
),
cpu_percent=MagicMock(return_value=10.0),
)
report = await analytics_service.get_performance_report(
mock_db, hours=24
)
assert isinstance(report, PerformanceReport)
assert report.downloads_per_hour == 0.0
@pytest.mark.asyncio
async def test_record_performance_sample(analytics_service):
"""Test recording performance samples."""
analytics_service.record_performance_sample(
queue_size=5,
active_downloads=2,
cpu_percent=25.0,
memory_mb=512.0,
)
data = json.loads(
analytics_service.analytics_file.read_text()
)
assert len(data["performance_samples"]) == 1
sample = data["performance_samples"][0]
assert sample["queue_size"] == 5
assert sample["active_downloads"] == 2
@pytest.mark.asyncio
async def test_record_multiple_performance_samples(
analytics_service
):
"""Test recording multiple performance samples."""
for i in range(5):
analytics_service.record_performance_sample(
queue_size=i,
active_downloads=i % 2,
cpu_percent=10.0 + i,
memory_mb=256.0 + i * 50,
)
data = json.loads(
analytics_service.analytics_file.read_text()
)
assert len(data["performance_samples"]) == 5
@pytest.mark.asyncio
async def test_generate_summary_report(
analytics_service, mock_db
):
"""Test generating comprehensive summary report."""
mock_db.execute = AsyncMock(return_value=MagicMock(
scalars=MagicMock(return_value=MagicMock(all=MagicMock(
return_value=[]
))),
all=MagicMock(return_value=[]),
))
with patch("psutil.disk_usage") as mock_disk:
mock_disk.return_value = MagicMock(
total=1024 * 1024 * 1024,
used=512 * 1024 * 1024,
free=512 * 1024 * 1024,
percent=50.0,
)
with patch("psutil.Process"):
report = await analytics_service.generate_summary_report(
mock_db
)
assert "timestamp" in report
assert "download_stats" in report
assert "series_popularity" in report
assert "storage_analysis" in report
assert "performance_report" in report
@pytest.mark.asyncio
async def test_get_dir_size(analytics_service, tmp_path):
"""Test directory size calculation."""
# Create test files
(tmp_path / "file1.txt").write_text("test content")
(tmp_path / "file2.txt").write_text("more test content")
subdir = tmp_path / "subdir"
subdir.mkdir()
(subdir / "file3.txt").write_text("nested content")
size = analytics_service._get_dir_size(tmp_path)
assert size > 0
@pytest.mark.asyncio
async def test_get_dir_size_nonexistent(analytics_service):
"""Test directory size for nonexistent directory."""
size = analytics_service._get_dir_size(
Path("/nonexistent/directory")
)
assert size == 0
@pytest.mark.asyncio
async def test_analytics_persistence(analytics_service):
"""Test analytics data persistence."""
analytics_service.record_performance_sample(
queue_size=10,
active_downloads=3,
cpu_percent=50.0,
memory_mb=1024.0,
)
# Create new service instance
analytics_service2 = AnalyticsService()
analytics_service2.analytics_file = analytics_service.analytics_file
data = json.loads(
analytics_service2.analytics_file.read_text()
)
assert len(data["performance_samples"]) == 1
@pytest.mark.asyncio
async def test_analytics_service_singleton(analytics_service):
"""Test analytics service singleton pattern."""
from src.server.services.analytics_service import get_analytics_service
service1 = get_analytics_service()
service2 = get_analytics_service()
assert service1 is service2

View File

@@ -0,0 +1,256 @@
"""Unit tests for backup service."""
import tempfile
from pathlib import Path
import pytest
from src.server.services.backup_service import BackupService, get_backup_service
@pytest.fixture
def temp_backup_env():
"""Create temporary directories for testing."""
with tempfile.TemporaryDirectory() as tmpdir:
backup_dir = Path(tmpdir) / "backups"
config_dir = Path(tmpdir) / "config"
config_dir.mkdir()
# Create mock config files
(config_dir / "config.json").write_text('{"test": "config"}')
(config_dir / "download_queue.json").write_text('{"queue": []}')
yield {
"backup_dir": str(backup_dir),
"config_dir": str(config_dir),
"tmpdir": tmpdir,
}
def test_backup_service_initialization(temp_backup_env):
"""Test backup service initialization."""
service = BackupService(
backup_dir=temp_backup_env["backup_dir"],
config_dir=temp_backup_env["config_dir"],
)
assert service is not None
assert service.backup_dir.exists()
def test_backup_configuration(temp_backup_env):
"""Test configuration backup creation."""
service = BackupService(
backup_dir=temp_backup_env["backup_dir"],
config_dir=temp_backup_env["config_dir"],
)
backup_info = service.backup_configuration("Test backup")
assert backup_info is not None
assert backup_info.backup_type == "config"
assert backup_info.size_bytes > 0
assert "config_" in backup_info.name
def test_backup_configuration_no_config(temp_backup_env):
"""Test configuration backup with missing config file."""
service = BackupService(
backup_dir=temp_backup_env["backup_dir"],
config_dir=temp_backup_env["config_dir"],
)
# Remove config file
(Path(temp_backup_env["config_dir"]) / "config.json").unlink()
# Should still create backup (empty tar)
backup_info = service.backup_configuration()
assert backup_info is not None
def test_backup_database(temp_backup_env):
"""Test database backup creation."""
# Create mock database file
db_path = Path(temp_backup_env["tmpdir"]) / "aniworld.db"
db_path.write_bytes(b"mock database content")
service = BackupService(
backup_dir=temp_backup_env["backup_dir"],
config_dir=temp_backup_env["config_dir"],
database_path=str(db_path),
)
backup_info = service.backup_database("DB backup")
assert backup_info is not None
assert backup_info.backup_type == "data"
assert backup_info.size_bytes > 0
assert "database_" in backup_info.name
def test_backup_database_not_found(temp_backup_env):
"""Test database backup with missing database."""
service = BackupService(
backup_dir=temp_backup_env["backup_dir"],
config_dir=temp_backup_env["config_dir"],
database_path="/nonexistent/database.db",
)
backup_info = service.backup_database()
assert backup_info is None
def test_backup_full(temp_backup_env):
"""Test full system backup."""
service = BackupService(
backup_dir=temp_backup_env["backup_dir"],
config_dir=temp_backup_env["config_dir"],
)
backup_info = service.backup_full("Full backup")
assert backup_info is not None
assert backup_info.backup_type == "full"
assert backup_info.size_bytes > 0
def test_list_backups(temp_backup_env):
"""Test listing backups."""
service = BackupService(
backup_dir=temp_backup_env["backup_dir"],
config_dir=temp_backup_env["config_dir"],
)
# Create several backups
service.backup_configuration()
service.backup_full()
backups = service.list_backups()
assert len(backups) >= 2
assert all("name" in b for b in backups)
assert all("type" in b for b in backups)
def test_list_backups_by_type(temp_backup_env):
"""Test listing backups filtered by type."""
service = BackupService(
backup_dir=temp_backup_env["backup_dir"],
config_dir=temp_backup_env["config_dir"],
)
# Create different types of backups
service.backup_configuration()
service.backup_full()
config_backups = service.list_backups("config")
assert all(b["type"] == "config" for b in config_backups)
def test_delete_backup(temp_backup_env):
"""Test backup deletion."""
service = BackupService(
backup_dir=temp_backup_env["backup_dir"],
config_dir=temp_backup_env["config_dir"],
)
backup_info = service.backup_configuration()
assert backup_info is not None
backups_before = service.list_backups()
assert len(backups_before) > 0
result = service.delete_backup(backup_info.name)
assert result is True
backups_after = service.list_backups()
assert len(backups_after) < len(backups_before)
def test_delete_backup_not_found(temp_backup_env):
"""Test deleting non-existent backup."""
service = BackupService(
backup_dir=temp_backup_env["backup_dir"],
config_dir=temp_backup_env["config_dir"],
)
result = service.delete_backup("nonexistent_backup.tar.gz")
assert result is False
def test_cleanup_old_backups(temp_backup_env):
"""Test cleanup of old backups."""
service = BackupService(
backup_dir=temp_backup_env["backup_dir"],
config_dir=temp_backup_env["config_dir"],
)
# Create multiple backups
for i in range(5):
service.backup_configuration()
backups_before = service.list_backups()
assert len(backups_before) == 5
# Keep only 2 backups
deleted = service.cleanup_old_backups(max_backups=2)
backups_after = service.list_backups()
assert len(backups_after) <= 2
assert deleted == 3
def test_export_anime_data(temp_backup_env):
"""Test anime data export."""
service = BackupService(
backup_dir=temp_backup_env["backup_dir"],
config_dir=temp_backup_env["config_dir"],
)
export_file = Path(temp_backup_env["tmpdir"]) / "anime_export.json"
result = service.export_anime_data(str(export_file))
assert result is True
assert export_file.exists()
assert "timestamp" in export_file.read_text()
def test_import_anime_data(temp_backup_env):
"""Test anime data import."""
service = BackupService(
backup_dir=temp_backup_env["backup_dir"],
config_dir=temp_backup_env["config_dir"],
)
# Create import file
import_file = Path(temp_backup_env["tmpdir"]) / "anime_import.json"
import_file.write_text('{"timestamp": "2025-01-01T00:00:00", "data": []}')
result = service.import_anime_data(str(import_file))
assert result is True
def test_import_anime_data_not_found(temp_backup_env):
"""Test anime data import with missing file."""
service = BackupService(
backup_dir=temp_backup_env["backup_dir"],
config_dir=temp_backup_env["config_dir"],
)
result = service.import_anime_data("/nonexistent/file.json")
assert result is False
def test_get_backup_service():
"""Test singleton backup service."""
service1 = get_backup_service()
service2 = get_backup_service()
assert service1 is service2
assert isinstance(service1, BackupService)

114
tests/unit/test_health.py Normal file
View File

@@ -0,0 +1,114 @@
"""Unit tests for health check endpoints."""
from unittest.mock import AsyncMock, patch
import pytest
from src.server.api.health import (
DatabaseHealth,
HealthStatus,
SystemMetrics,
basic_health_check,
check_database_health,
check_filesystem_health,
get_system_metrics,
)
@pytest.mark.asyncio
async def test_basic_health_check():
"""Test basic health check endpoint."""
result = await basic_health_check()
assert isinstance(result, HealthStatus)
assert result.status == "healthy"
assert result.version == "1.0.0"
assert result.timestamp is not None
@pytest.mark.asyncio
async def test_database_health_check_success():
"""Test database health check with successful connection."""
# Mock database session
mock_db = AsyncMock()
mock_db.execute = AsyncMock()
result = await check_database_health(mock_db)
assert isinstance(result, DatabaseHealth)
assert result.status == "healthy"
assert result.connection_time_ms >= 0
assert "successful" in result.message.lower()
@pytest.mark.asyncio
async def test_database_health_check_failure():
"""Test database health check with failed connection."""
# Mock database session that raises error
mock_db = AsyncMock()
mock_db.execute = AsyncMock(side_effect=Exception("Connection failed"))
result = await check_database_health(mock_db)
assert isinstance(result, DatabaseHealth)
assert result.status == "unhealthy"
assert "failed" in result.message.lower()
def test_filesystem_health_check_success():
"""Test filesystem health check with accessible directories."""
with patch("os.path.exists", return_value=True), patch(
"os.access", return_value=True
):
result = check_filesystem_health()
assert result["status"] in ["healthy", "degraded"]
assert "data_dir_writable" in result
assert "logs_dir_writable" in result
def test_filesystem_health_check_failure():
"""Test filesystem health check with inaccessible directories."""
with patch("os.path.exists", return_value=False), patch(
"os.access", return_value=False
):
result = check_filesystem_health()
assert "status" in result
assert "message" in result
def test_get_system_metrics():
"""Test system metrics collection."""
result = get_system_metrics()
assert isinstance(result, SystemMetrics)
assert result.cpu_percent >= 0
assert result.memory_percent >= 0
assert result.memory_available_mb > 0
assert result.disk_percent >= 0
assert result.disk_free_mb > 0
assert result.uptime_seconds > 0
def test_system_metrics_values_reasonable():
"""Test that system metrics are within reasonable ranges."""
result = get_system_metrics()
# CPU should be 0-100%
assert 0 <= result.cpu_percent <= 100
# Memory should be 0-100%
assert 0 <= result.memory_percent <= 100
# Disk should be 0-100%
assert 0 <= result.disk_percent <= 100
# Memory available should be positive
assert result.memory_available_mb > 0
# Disk free should be positive
assert result.disk_free_mb > 0
# Uptime should be positive
assert result.uptime_seconds > 0

View File

@@ -0,0 +1,209 @@
"""Unit tests for log manager."""
import tempfile
from datetime import datetime, timedelta
from pathlib import Path
import pytest
from src.server.utils.log_manager import LogManager, get_log_manager
@pytest.fixture
def temp_log_env():
"""Create temporary log environment."""
with tempfile.TemporaryDirectory() as tmpdir:
yield tmpdir
def test_log_manager_initialization(temp_log_env):
"""Test log manager initialization."""
manager = LogManager(log_dir=temp_log_env)
assert manager is not None
assert manager.log_dir.exists()
assert manager.archived_dir.exists()
def test_get_log_files(temp_log_env):
"""Test getting list of log files."""
manager = LogManager(log_dir=temp_log_env)
# Create test log files
(Path(temp_log_env) / "app.log").write_text("log content 1")
(Path(temp_log_env) / "error.log").write_text("log content 2")
(Path(temp_log_env) / "other.txt").write_text("not a log")
log_files = manager.get_log_files()
assert len(log_files) == 2
assert log_files[0].filename in ["app.log", "error.log"]
def test_rotate_log(temp_log_env):
"""Test log file rotation."""
manager = LogManager(log_dir=temp_log_env)
log_file = Path(temp_log_env) / "app.log"
large_content = "x" * (11 * 1024 * 1024) # 11MB
log_file.write_text(large_content)
result = manager.rotate_log("app.log", max_size_bytes=10485760)
assert result is True
assert not log_file.exists() # Original file rotated
def test_rotate_log_not_found(temp_log_env):
"""Test rotation of non-existent log."""
manager = LogManager(log_dir=temp_log_env)
result = manager.rotate_log("nonexistent.log")
assert result is False
def test_rotate_log_small_file(temp_log_env):
"""Test rotation of small log file."""
manager = LogManager(log_dir=temp_log_env)
log_file = Path(temp_log_env) / "app.log"
log_file.write_text("small content")
result = manager.rotate_log("app.log", max_size_bytes=1048576)
assert result is False
assert log_file.exists()
def test_archive_old_logs(temp_log_env):
"""Test archiving old log files."""
manager = LogManager(log_dir=temp_log_env)
# Create old and new logs
old_log = Path(temp_log_env) / "old.log"
old_log.write_text("old log")
old_log.touch()
new_log = Path(temp_log_env) / "new.log"
new_log.write_text("new log")
archived = manager.archive_old_logs(days_old=30)
assert archived > 0
def test_search_logs(temp_log_env):
"""Test searching logs."""
manager = LogManager(log_dir=temp_log_env)
# Create test logs
(Path(temp_log_env) / "app.log").write_text(
"Error occurred\nWarning message\nError again"
)
(Path(temp_log_env) / "debug.log").write_text(
"Debug info\nError in debug"
)
results = manager.search_logs("Error", case_sensitive=False)
assert len(results) >= 1
assert any("Error" in line for lines in results.values()
for line in lines)
def test_search_logs_case_sensitive(temp_log_env):
"""Test case-sensitive log search."""
manager = LogManager(log_dir=temp_log_env)
(Path(temp_log_env) / "app.log").write_text("ERROR\nerror\nError")
results = manager.search_logs("ERROR", case_sensitive=True)
assert "app.log" in results
# Should only find uppercase ERROR
assert len(results["app.log"]) == 1
def test_export_logs(temp_log_env):
"""Test exporting logs."""
manager = LogManager(log_dir=temp_log_env)
# Create test logs
(Path(temp_log_env) / "app.log").write_text("log content 1")
(Path(temp_log_env) / "error.log").write_text("log content 2")
output_file = Path(temp_log_env) / "export.tar.gz"
result = manager.export_logs(str(output_file), compress=True)
assert result is True
assert output_file.exists()
def test_export_logs_uncompressed(temp_log_env):
"""Test exporting logs without compression."""
manager = LogManager(log_dir=temp_log_env)
(Path(temp_log_env) / "app.log").write_text("log content")
output_file = Path(temp_log_env) / "export.txt"
result = manager.export_logs(str(output_file), compress=False)
assert result is True
assert output_file.exists()
assert "log content" in output_file.read_text()
def test_get_log_stats(temp_log_env):
"""Test getting log statistics."""
manager = LogManager(log_dir=temp_log_env)
# Create test logs
(Path(temp_log_env) / "app.log").write_text("x" * 1000)
(Path(temp_log_env) / "error.log").write_text("y" * 2000)
stats = manager.get_log_stats()
assert stats["total_files"] == 2
assert stats["total_size_bytes"] >= 3000
def test_get_log_stats_empty(temp_log_env):
"""Test getting stats with no logs."""
manager = LogManager(log_dir=temp_log_env)
stats = manager.get_log_stats()
assert stats["total_files"] == 0
assert stats["total_size_bytes"] == 0
def test_cleanup_logs(temp_log_env):
"""Test log cleanup."""
manager = LogManager(log_dir=temp_log_env)
# Create multiple logs
for i in range(10):
(Path(temp_log_env) / f"log_{i}.log").write_text("x" * 1000)
deleted = manager.cleanup_logs(max_total_size_mb=0.01, keep_files=2)
assert deleted > 0
def test_set_log_level():
"""Test setting log level."""
manager = LogManager()
result = manager.set_log_level("test_logger", "DEBUG")
assert result is True
def test_get_log_manager_singleton():
"""Test singleton log manager."""
manager1 = get_log_manager()
manager2 = get_log_manager()
assert manager1 is manager2
assert isinstance(manager1, LogManager)

236
tests/unit/test_metrics.py Normal file
View File

@@ -0,0 +1,236 @@
"""Unit tests for metrics collection."""
import pytest
from src.server.utils.metrics import (
MetricsCollector,
MetricType,
TimerContext,
get_metrics_collector,
)
def test_metrics_collector_initialization():
"""Test metrics collector initialization."""
collector = MetricsCollector()
assert collector is not None
assert collector._metrics == {}
assert collector._download_stats["completed"] == 0
assert collector._download_stats["failed"] == 0
def test_increment_counter():
"""Test counter metric increment."""
collector = MetricsCollector()
collector.increment_counter("test_counter", 1.0, help_text="Test counter")
collector.increment_counter("test_counter", 2.0)
assert "test_counter" in collector._metrics
assert collector._metrics["test_counter"].value == 3.0
assert collector._metrics["test_counter"].metric_type == MetricType.COUNTER
def test_set_gauge():
"""Test gauge metric."""
collector = MetricsCollector()
collector.set_gauge("test_gauge", 42.0, help_text="Test gauge")
assert collector._metrics["test_gauge"].value == 42.0
collector.set_gauge("test_gauge", 100.0)
assert collector._metrics["test_gauge"].value == 100.0
def test_observe_histogram():
"""Test histogram observation."""
collector = MetricsCollector()
collector.observe_histogram("request_duration", 0.5)
collector.observe_histogram("request_duration", 1.2)
collector.observe_histogram("request_duration", 0.8)
assert len(collector._request_timings["request_duration"]) == 3
assert 0.5 in collector._request_timings["request_duration"]
def test_start_and_end_timer():
"""Test timer functionality."""
collector = MetricsCollector()
collector.start_timer("test_timer")
import time
time.sleep(0.01) # Sleep for 10ms
duration = collector.end_timer("test_timer", "test_duration")
assert duration >= 0.01
assert "test_duration" in collector._metrics
def test_record_download_success():
"""Test download success recording."""
collector = MetricsCollector()
collector.record_download_success(1000000)
collector.record_download_success(2000000)
stats = collector.get_download_stats()
assert stats["completed"] == 2
assert stats["total_size_bytes"] == 3000000
assert stats["failed"] == 0
def test_record_download_failure():
"""Test download failure recording."""
collector = MetricsCollector()
collector.record_download_failure()
collector.record_download_failure()
stats = collector.get_download_stats()
assert stats["failed"] == 2
assert stats["completed"] == 0
def test_get_request_statistics():
"""Test request statistics calculation."""
collector = MetricsCollector()
for val in [0.5, 1.0, 0.8, 0.6, 0.9]:
collector.observe_histogram("request_latency", val)
stats = collector.get_request_statistics("request_latency")
assert stats is not None
assert stats["count"] == 5
assert stats["mean"] == pytest.approx(0.76, abs=0.01)
assert stats["min"] == 0.5
assert stats["max"] == 1.0
def test_get_request_statistics_not_found():
"""Test request statistics for non-existent metric."""
collector = MetricsCollector()
stats = collector.get_request_statistics("non_existent")
assert stats is None
def test_export_prometheus_format():
"""Test Prometheus format export."""
collector = MetricsCollector()
collector.increment_counter(
"requests_total", 10, help_text="Total requests"
)
collector.set_gauge("active_connections", 5)
prometheus_output = collector.export_prometheus_format()
assert "requests_total" in prometheus_output
assert "active_connections" in prometheus_output
assert "10" in prometheus_output
assert "5" in prometheus_output
def test_export_prometheus_with_labels():
"""Test Prometheus format with labels."""
collector = MetricsCollector()
labels = {"endpoint": "/api/anime", "method": "GET"}
collector.increment_counter("requests_total", labels=labels)
prometheus_output = collector.export_prometheus_format()
assert "endpoint" in prometheus_output
assert "method" in prometheus_output
assert "/api/anime" in prometheus_output
assert "GET" in prometheus_output
def test_export_json():
"""Test JSON export."""
collector = MetricsCollector()
collector.increment_counter("test_counter", 5)
collector.set_gauge("test_gauge", 42)
collector.record_download_success(1000000)
json_export = collector.export_json()
assert "metrics" in json_export
assert "downloads" in json_export
assert "request_timings" in json_export
assert json_export["downloads"]["completed"] == 1
assert json_export["downloads"]["total_size_bytes"] == 1000000
def test_reset_metrics():
"""Test metrics reset."""
collector = MetricsCollector()
collector.increment_counter("test_counter", 10)
collector.record_download_success(1000000)
assert len(collector._metrics) > 0
assert collector._download_stats["completed"] == 1
collector.reset_metrics()
assert len(collector._metrics) == 0
assert collector._download_stats["completed"] == 0
def test_get_all_metrics():
"""Test getting all metrics."""
collector = MetricsCollector()
collector.increment_counter("counter1", 5)
collector.set_gauge("gauge1", 10)
collector.increment_counter("counter2", 3)
all_metrics = collector.get_all_metrics()
assert len(all_metrics) == 3
assert "counter1" in all_metrics
assert "gauge1" in all_metrics
assert "counter2" in all_metrics
def test_get_metrics_collector_singleton():
"""Test singleton metrics collector."""
collector1 = get_metrics_collector()
collector2 = get_metrics_collector()
assert collector1 is collector2
assert isinstance(collector1, MetricsCollector)
def test_timer_context_manager():
"""Test timer context manager."""
collector = get_metrics_collector()
collector.reset_metrics()
import time
with TimerContext("operation_duration", "timer1"):
time.sleep(0.01)
stats = collector.get_request_statistics("operation_duration")
assert stats is not None
assert stats["count"] == 1
assert stats["max"] >= 0.01
def test_timer_context_with_labels():
"""Test timer context manager with labels."""
collector = get_metrics_collector()
collector.reset_metrics()
labels = {"endpoint": "/api/test"}
with TimerContext("endpoint_duration", labels=labels):
pass
assert "endpoint_duration" in collector._metrics

View File

@@ -0,0 +1,225 @@
"""Unit tests for monitoring service."""
from datetime import datetime, timedelta
from unittest.mock import AsyncMock, MagicMock
import pytest
from src.server.services.monitoring_service import (
ErrorMetrics,
MonitoringService,
QueueMetrics,
SystemMetrics,
get_monitoring_service,
)
def test_monitoring_service_initialization():
"""Test monitoring service initialization."""
service = MonitoringService()
assert service is not None
assert service._error_log == []
assert service._performance_samples == []
def test_get_system_metrics():
"""Test system metrics collection."""
service = MonitoringService()
metrics = service.get_system_metrics()
assert isinstance(metrics, SystemMetrics)
assert metrics.cpu_percent >= 0
assert metrics.memory_percent >= 0
assert metrics.disk_percent >= 0
assert metrics.uptime_seconds > 0
assert metrics.memory_available_mb > 0
assert metrics.disk_free_mb > 0
def test_system_metrics_stored():
"""Test that system metrics are stored for performance tracking."""
service = MonitoringService()
metrics1 = service.get_system_metrics()
metrics2 = service.get_system_metrics()
assert len(service._performance_samples) == 2
assert service._performance_samples[0] == metrics1
assert service._performance_samples[1] == metrics2
@pytest.mark.asyncio
async def test_get_queue_metrics_empty():
"""Test queue metrics with no items."""
service = MonitoringService()
mock_db = AsyncMock()
# Mock empty result
mock_result = AsyncMock()
mock_result.scalars().all.return_value = []
mock_db.execute = AsyncMock(return_value=mock_result)
metrics = await service.get_queue_metrics(mock_db)
assert isinstance(metrics, QueueMetrics)
assert metrics.total_items == 0
assert metrics.success_rate == 0.0
@pytest.mark.asyncio
async def test_get_queue_metrics_with_items():
"""Test queue metrics with download items."""
service = MonitoringService()
mock_db = AsyncMock()
# Create mock queue items
item1 = MagicMock()
item1.status = "COMPLETED"
item1.total_bytes = 1000000
item1.downloaded_bytes = 1000000
item1.download_speed = 1000000
item2 = MagicMock()
item2.status = "DOWNLOADING"
item2.total_bytes = 2000000
item2.downloaded_bytes = 1000000
item2.download_speed = 500000
item3 = MagicMock()
item3.status = "FAILED"
item3.total_bytes = 500000
item3.downloaded_bytes = 0
item3.download_speed = None
# Mock result
mock_result = AsyncMock()
mock_result.scalars().all.return_value = [item1, item2, item3]
mock_db.execute = AsyncMock(return_value=mock_result)
metrics = await service.get_queue_metrics(mock_db)
assert metrics.total_items == 3
assert metrics.completed_items == 1
assert metrics.downloading_items == 1
assert metrics.failed_items == 1
assert metrics.total_size_bytes == 3500000
assert metrics.downloaded_bytes == 2000000
assert metrics.success_rate > 0
def test_log_error():
"""Test error logging."""
service = MonitoringService()
service.log_error("Test error 1")
service.log_error("Test error 2")
assert len(service._error_log) == 2
assert service._error_log[0][1] == "Test error 1"
assert service._error_log[1][1] == "Test error 2"
def test_get_error_metrics_empty():
"""Test error metrics with no errors."""
service = MonitoringService()
metrics = service.get_error_metrics()
assert isinstance(metrics, ErrorMetrics)
assert metrics.total_errors == 0
assert metrics.errors_24h == 0
assert metrics.error_rate_per_hour == 0.0
def test_get_error_metrics_with_errors():
"""Test error metrics with multiple errors."""
service = MonitoringService()
service.log_error("ConnectionError: Failed to connect")
service.log_error("ConnectionError: Timeout")
service.log_error("TimeoutError: Download timeout")
metrics = service.get_error_metrics()
assert metrics.total_errors == 3
assert metrics.errors_24h == 3
assert metrics.last_error_time is not None
assert len(metrics.most_common_errors) > 0
def test_get_error_metrics_old_errors():
"""Test error metrics excludes old errors."""
service = MonitoringService()
# Add old error (simulate by directly adding to log)
old_time = datetime.now() - timedelta(hours=25)
service._error_log.append((old_time, "Old error"))
# Add recent error
service.log_error("Recent error")
metrics = service.get_error_metrics()
assert metrics.total_errors == 2
assert metrics.errors_24h == 1
def test_get_performance_summary():
"""Test performance summary generation."""
service = MonitoringService()
# Collect some samples
service.get_system_metrics()
service.get_system_metrics()
service.get_system_metrics()
summary = service.get_performance_summary()
assert "cpu" in summary
assert "memory" in summary
assert "disk" in summary
assert "sample_count" in summary
assert summary["sample_count"] == 3
assert "current" in summary["cpu"]
assert "average" in summary["cpu"]
assert "max" in summary["cpu"]
assert "min" in summary["cpu"]
def test_get_performance_summary_empty():
"""Test performance summary with no samples."""
service = MonitoringService()
summary = service.get_performance_summary()
assert summary == {}
@pytest.mark.asyncio
async def test_get_comprehensive_status():
"""Test comprehensive system status."""
service = MonitoringService()
mock_db = AsyncMock()
# Mock empty queue
mock_result = AsyncMock()
mock_result.scalars().all.return_value = []
mock_db.execute = AsyncMock(return_value=mock_result)
status = await service.get_comprehensive_status(mock_db)
assert "timestamp" in status
assert "system" in status
assert "queue" in status
assert "errors" in status
assert "performance" in status
assert status["system"]["cpu_percent"] >= 0
assert status["queue"]["total_items"] == 0
def test_get_monitoring_service():
"""Test singleton monitoring service."""
service1 = get_monitoring_service()
service2 = get_monitoring_service()
assert service1 is service2
assert isinstance(service1, MonitoringService)

View File

@@ -0,0 +1,211 @@
"""Unit tests for system utilities."""
import os
import tempfile
from datetime import datetime, timedelta
from pathlib import Path
from src.server.utils.system import (
DiskInfo,
ProcessInfo,
SystemUtilities,
get_system_utilities,
)
def test_system_utilities_initialization():
"""Test system utilities initialization."""
utils = SystemUtilities()
assert utils is not None
def test_get_disk_usage():
"""Test getting disk usage information."""
utils = SystemUtilities()
disk_info = utils.get_disk_usage("/")
assert disk_info is not None
assert isinstance(disk_info, DiskInfo)
assert disk_info.total_bytes > 0
assert disk_info.free_bytes >= 0
assert disk_info.percent_used >= 0
def test_get_all_disk_usage():
"""Test getting disk usage for all partitions."""
utils = SystemUtilities()
disk_infos = utils.get_all_disk_usage()
assert isinstance(disk_infos, list)
# Should have at least one partition
assert len(disk_infos) >= 0
def test_cleanup_directory():
"""Test directory cleanup."""
utils = SystemUtilities()
with tempfile.TemporaryDirectory() as tmpdir:
# Create some test files
old_time = (datetime.now() - timedelta(days=31)).timestamp()
for i in range(3):
file_path = Path(tmpdir) / f"old_file_{i}.txt"
file_path.write_text(f"old file {i}")
Path(file_path).touch()
os.utime(file_path, (old_time, old_time))
for i in range(2):
file_path = Path(tmpdir) / f"new_file_{i}.txt"
file_path.write_text(f"new file {i}")
# Clean up files older than 30 days
deleted = utils.cleanup_directory(tmpdir, "*.txt", max_age_days=30)
assert deleted == 3
def test_cleanup_empty_directories():
"""Test empty directory cleanup."""
utils = SystemUtilities()
with tempfile.TemporaryDirectory() as tmpdir:
# Create nested directories
(Path(tmpdir) / "dir1").mkdir()
(Path(tmpdir) / "dir2").mkdir()
(Path(tmpdir) / "dir2" / "subdir").mkdir()
# Create a file in one directory
(Path(tmpdir) / "dir1" / "file.txt").write_text("content")
# Clean up empty directories
deleted = utils.cleanup_empty_directories(tmpdir)
assert deleted >= 1
def test_get_directory_size():
"""Test getting directory size."""
utils = SystemUtilities()
with tempfile.TemporaryDirectory() as tmpdir:
# Create test files
(Path(tmpdir) / "file1.txt").write_text("a" * 1000)
(Path(tmpdir) / "file2.txt").write_text("b" * 2000)
size = utils.get_directory_size(tmpdir)
assert size >= 3000 # At least 3000 bytes
def test_get_directory_size_nonexistent():
"""Test getting directory size for non-existent directory."""
utils = SystemUtilities()
size = utils.get_directory_size("/nonexistent/path")
assert size == 0
def test_get_process_info():
"""Test getting process information."""
import os
utils = SystemUtilities()
pid = os.getpid()
proc_info = utils.get_process_info(pid)
assert proc_info is not None
assert isinstance(proc_info, ProcessInfo)
assert proc_info.pid == pid
assert proc_info.name is not None
assert proc_info.cpu_percent >= 0
assert proc_info.memory_percent >= 0
def test_get_process_info_current():
"""Test getting current process information."""
utils = SystemUtilities()
proc_info = utils.get_process_info()
assert proc_info is not None
assert proc_info.pid > 0
def test_get_process_info_invalid():
"""Test getting process info for invalid PID."""
utils = SystemUtilities()
proc_info = utils.get_process_info(99999999)
assert proc_info is None
def test_get_all_processes():
"""Test getting information about all processes."""
utils = SystemUtilities()
processes = utils.get_all_processes()
assert isinstance(processes, list)
# Should have at least some processes
assert len(processes) > 0
def test_get_system_info():
"""Test getting system information."""
utils = SystemUtilities()
system_info = utils.get_system_info()
assert system_info is not None
assert "platform" in system_info
assert "cpu_count" in system_info
assert "hostname" in system_info
assert "python_version" in system_info
def test_get_network_info():
"""Test getting network information."""
utils = SystemUtilities()
net_info = utils.get_network_info()
assert net_info is not None
assert "bytes_sent" in net_info
assert "bytes_recv" in net_info
assert net_info["bytes_sent"] >= 0
assert net_info["bytes_recv"] >= 0
def test_copy_file_atomic():
"""Test atomic file copy."""
utils = SystemUtilities()
with tempfile.TemporaryDirectory() as tmpdir:
src_file = Path(tmpdir) / "source.txt"
dest_file = Path(tmpdir) / "dest.txt"
src_file.write_text("test content")
result = utils.copy_file_atomic(str(src_file), str(dest_file))
assert result is True
assert dest_file.exists()
assert dest_file.read_text() == "test content"
def test_copy_file_atomic_nonexistent():
"""Test atomic file copy with non-existent source."""
utils = SystemUtilities()
result = utils.copy_file_atomic(
"/nonexistent/source.txt", "/tmp/dest.txt"
)
assert result is False
def test_get_system_utilities_singleton():
"""Test singleton system utilities."""
utils1 = get_system_utilities()
utils2 = get_system_utilities()
assert utils1 is utils2
assert isinstance(utils1, SystemUtilities)