remove part 3

This commit is contained in:
2025-10-30 20:20:52 +01:00
parent fd76be02fd
commit 55781a8448
15 changed files with 173 additions and 3379 deletions

View File

@@ -1,315 +0,0 @@
"""Unit tests for analytics service.
Tests analytics service functionality including download statistics,
series popularity tracking, storage analysis, and performance reporting.
"""
import json
from datetime import datetime
from pathlib import Path
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from sqlalchemy.ext.asyncio import AsyncSession
from src.server.services.analytics_service import (
AnalyticsService,
DownloadStats,
PerformanceReport,
StorageAnalysis,
)
@pytest.fixture
def analytics_service(tmp_path):
    """Create an AnalyticsService whose data file lives in a temp dir.

    Patches the module-level ANALYTICS_FILE constant so the service
    reads/writes ``tmp_path / "analytics.json"`` instead of the real
    file.  Yielding inside the ``with`` keeps the patch active for the
    whole test.
    """
    with patch("src.server.services.analytics_service.ANALYTICS_FILE",
               tmp_path / "analytics.json"):
        service = AnalyticsService()
        yield service
@pytest.fixture
async def mock_db():
    """Provide an AsyncMock standing in for a SQLAlchemy AsyncSession."""
    return AsyncMock(spec=AsyncSession)
@pytest.mark.asyncio
async def test_analytics_service_initialization(analytics_service):
    """A fresh service creates its JSON file with the default schema."""
    analytics_file = analytics_service.analytics_file
    assert analytics_file.exists()

    data = json.loads(analytics_file.read_text())
    for key in ("created_at", "download_stats", "series_popularity"):
        assert key in data
    assert data["download_stats"]["total_downloads"] == 0
@pytest.mark.asyncio
async def test_get_download_stats_no_data(
    analytics_service, mock_db
):
    """Test download statistics with no download data."""
    # Wire db.execute(...) -> result.scalars().all() == [] (no rows).
    mock_db.execute = AsyncMock(return_value=MagicMock(
        scalars=MagicMock(return_value=MagicMock(all=MagicMock(
            return_value=[]
        )))
    ))
    stats = await analytics_service.get_download_stats(mock_db)
    # With no rows every counter is zero and the rate defaults to 0.0.
    assert isinstance(stats, DownloadStats)
    assert stats.total_downloads == 0
    assert stats.successful_downloads == 0
    assert stats.success_rate == 0.0
@pytest.mark.asyncio
async def test_get_download_stats_with_data(
    analytics_service, mock_db
):
    """Test download statistics with download data."""
    # Mock downloads - updated to use actual model fields.
    # One successful 100 MB download at 10 MB/s...
    download1 = MagicMock()
    download1.status = "completed"
    download1.total_bytes = 1024 * 1024 * 100  # 100 MB
    download1.download_speed = 1024 * 1024 * 10  # 10 MB/s
    # ...and one failure with no bytes and no speed sample.
    download2 = MagicMock()
    download2.status = "failed"
    download2.total_bytes = 0
    download2.download_speed = None
    # db.execute(...) -> result.scalars().all() == [download1, download2]
    mock_db.execute = AsyncMock(return_value=MagicMock(
        scalars=MagicMock(return_value=MagicMock(all=MagicMock(
            return_value=[download1, download2]
        )))
    ))
    stats = await analytics_service.get_download_stats(mock_db)
    # 1 of 2 succeeded -> 50%; the failed row contributed 0 bytes.
    assert stats.total_downloads == 2
    assert stats.successful_downloads == 1
    assert stats.failed_downloads == 1
    assert stats.success_rate == 50.0
    assert stats.total_bytes_downloaded == 1024 * 1024 * 100
@pytest.mark.asyncio
async def test_get_series_popularity_empty(
    analytics_service, mock_db
):
    """Test series popularity with no data."""
    # This query path reads result.all() directly (no .scalars()).
    mock_db.execute = AsyncMock(return_value=MagicMock(
        all=MagicMock(return_value=[])
    ))
    popularity = await analytics_service.get_series_popularity(
        mock_db, limit=10
    )
    assert isinstance(popularity, list)
    assert len(popularity) == 0
@pytest.mark.asyncio
async def test_get_series_popularity_with_data(
    analytics_service, mock_db
):
    """Test series popularity with data."""
    # Mock returns tuples:
    # (series_name, download_count, total_size, last_download, successful)
    row = (
        "Test Anime",
        5,
        1024 * 1024 * 500,  # 500 MB total
        datetime.now(),
        4  # 4 of the 5 downloads succeeded
    )
    mock_db.execute = AsyncMock(return_value=MagicMock(
        all=MagicMock(return_value=[row])
    ))
    popularity = await analytics_service.get_series_popularity(
        mock_db, limit=10
    )
    assert len(popularity) == 1
    assert popularity[0].series_name == "Test Anime"
    assert popularity[0].download_count == 5
    # 4 successes out of 5 downloads -> 80%.
    assert popularity[0].success_rate == 80.0
@pytest.mark.asyncio
async def test_get_storage_analysis(analytics_service):
    """Test storage analysis retrieval."""
    # Fake a 1 TiB disk that is exactly half full.
    with patch("psutil.disk_usage") as mock_disk:
        mock_disk.return_value = MagicMock(
            total=1024 * 1024 * 1024 * 1024,
            used=512 * 1024 * 1024 * 1024,
            free=512 * 1024 * 1024 * 1024,
            percent=50.0,
        )
        analysis = analytics_service.get_storage_analysis()
        assert isinstance(analysis, StorageAnalysis)
        assert analysis.total_storage_bytes > 0
        # The mocked percent is surfaced unchanged.
        assert analysis.storage_percent_used == 50.0
@pytest.mark.asyncio
async def test_get_performance_report_no_data(
    analytics_service, mock_db
):
    """Test performance report with no data."""
    # No download rows: result.scalars().all() == [].
    mock_db.execute = AsyncMock(return_value=MagicMock(
        scalars=MagicMock(return_value=MagicMock(all=MagicMock(
            return_value=[]
        )))
    ))
    # Stub the process handle: 100 MB RSS, 10% CPU.
    with patch("psutil.Process") as mock_process:
        mock_process.return_value = MagicMock(
            memory_info=MagicMock(
                return_value=MagicMock(rss=100 * 1024 * 1024)
            ),
            cpu_percent=MagicMock(return_value=10.0),
        )
        report = await analytics_service.get_performance_report(
            mock_db, hours=24
        )
        assert isinstance(report, PerformanceReport)
        # No downloads in the window -> zero throughput.
        assert report.downloads_per_hour == 0.0
@pytest.mark.asyncio
async def test_record_performance_sample(analytics_service):
    """Test recording performance samples."""
    analytics_service.record_performance_sample(
        queue_size=5,
        active_downloads=2,
        cpu_percent=25.0,
        memory_mb=512.0,
    )
    # The sample must be persisted to the JSON analytics file on disk.
    data = json.loads(
        analytics_service.analytics_file.read_text()
    )
    assert len(data["performance_samples"]) == 1
    sample = data["performance_samples"][0]
    assert sample["queue_size"] == 5
    assert sample["active_downloads"] == 2
@pytest.mark.asyncio
async def test_record_multiple_performance_samples(
    analytics_service
):
    """Every recorded sample ends up persisted in the JSON file."""
    sample_count = 5
    for idx in range(sample_count):
        analytics_service.record_performance_sample(
            queue_size=idx,
            active_downloads=idx % 2,
            cpu_percent=10.0 + idx,
            memory_mb=256.0 + idx * 50,
        )

    stored = json.loads(analytics_service.analytics_file.read_text())
    assert len(stored["performance_samples"]) == sample_count
@pytest.mark.asyncio
async def test_generate_summary_report(
    analytics_service, mock_db
):
    """Test generating comprehensive summary report."""
    # One mock result serves both query styles the report uses:
    # .scalars().all() (download rows) and .all() (popularity rows).
    mock_db.execute = AsyncMock(return_value=MagicMock(
        scalars=MagicMock(return_value=MagicMock(all=MagicMock(
            return_value=[]
        ))),
        all=MagicMock(return_value=[]),
    ))
    # Stub disk usage (1 GiB, half used) and the process handle.
    with patch("psutil.disk_usage") as mock_disk:
        mock_disk.return_value = MagicMock(
            total=1024 * 1024 * 1024,
            used=512 * 1024 * 1024,
            free=512 * 1024 * 1024,
            percent=50.0,
        )
        with patch("psutil.Process"):
            report = await analytics_service.generate_summary_report(
                mock_db
            )
    # The report aggregates every analytics section under one dict.
    assert "timestamp" in report
    assert "download_stats" in report
    assert "series_popularity" in report
    assert "storage_analysis" in report
    assert "performance_report" in report
@pytest.mark.asyncio
async def test_get_dir_size(analytics_service, tmp_path):
    """Directory size sums files at the top level and nested levels."""
    # Populate a small tree: two files at the top, one nested.
    (tmp_path / "file1.txt").write_text("test content")
    (tmp_path / "file2.txt").write_text("more test content")
    nested = tmp_path / "subdir"
    nested.mkdir()
    (nested / "file3.txt").write_text("nested content")

    assert analytics_service._get_dir_size(tmp_path) > 0
@pytest.mark.asyncio
async def test_get_dir_size_nonexistent(analytics_service):
    """A missing directory is reported as zero bytes."""
    missing = Path("/nonexistent/directory")
    assert analytics_service._get_dir_size(missing) == 0
@pytest.mark.asyncio
async def test_analytics_persistence(analytics_service):
    """Test analytics data persistence across service instances.

    Records a sample with one service instance, then builds a second
    instance against the same analytics file and verifies the sample
    survived on disk.
    """
    analytics_service.record_performance_sample(
        queue_size=10,
        active_downloads=3,
        cpu_percent=50.0,
        memory_mb=1024.0,
    )
    # Build the second instance under a patch pointing at the same temp
    # file.  The original test constructed AnalyticsService() unpatched,
    # which initialized (and could create/overwrite) the real default
    # analytics file outside the test sandbox before the path attribute
    # was reassigned.
    with patch(
        "src.server.services.analytics_service.ANALYTICS_FILE",
        analytics_service.analytics_file,
    ):
        analytics_service2 = AnalyticsService()
    data = json.loads(
        analytics_service2.analytics_file.read_text()
    )
    assert len(data["performance_samples"]) == 1
@pytest.mark.asyncio
async def test_analytics_service_singleton(analytics_service):
    """get_analytics_service always hands back the same instance."""
    from src.server.services.analytics_service import get_analytics_service

    first = get_analytics_service()
    second = get_analytics_service()
    assert first is second

View File

@@ -1,259 +0,0 @@
"""Unit tests for backup service."""
import tempfile
from pathlib import Path
import pytest
from src.server.services.backup_service import BackupService, get_backup_service
@pytest.fixture
def temp_backup_env():
    """Create temporary directories for testing.

    Yields a dict of string paths:
        backup_dir -- backup destination (not pre-created; the service
                      is expected to create it — see the init test)
        config_dir -- pre-created dir seeded with config.json and
                      download_queue.json
        tmpdir     -- the enclosing temp dir, for ad-hoc files
    """
    with tempfile.TemporaryDirectory() as tmpdir:
        backup_dir = Path(tmpdir) / "backups"
        config_dir = Path(tmpdir) / "config"
        config_dir.mkdir()
        # Create mock config files
        (config_dir / "config.json").write_text('{"test": "config"}')
        (config_dir / "download_queue.json").write_text('{"queue": []}')
        yield {
            "backup_dir": str(backup_dir),
            "config_dir": str(config_dir),
            "tmpdir": tmpdir,
        }
def test_backup_service_initialization(temp_backup_env):
    """Constructing the service creates its backup directory."""
    env = temp_backup_env
    svc = BackupService(
        backup_dir=env["backup_dir"],
        config_dir=env["config_dir"],
    )
    assert svc is not None
    assert svc.backup_dir.exists()
def test_backup_configuration(temp_backup_env):
    """Test configuration backup creation."""
    service = BackupService(
        backup_dir=temp_backup_env["backup_dir"],
        config_dir=temp_backup_env["config_dir"],
    )
    backup_info = service.backup_configuration("Test backup")
    # A config backup is typed "config" and its archive name carries
    # the "config_" prefix.
    assert backup_info is not None
    assert backup_info.backup_type == "config"
    assert backup_info.size_bytes > 0
    assert "config_" in backup_info.name
def test_backup_configuration_no_config(temp_backup_env):
    """Test configuration backup with missing config file."""
    service = BackupService(
        backup_dir=temp_backup_env["backup_dir"],
        config_dir=temp_backup_env["config_dir"],
    )
    # Remove config file to simulate a fresh/partial install.
    (Path(temp_backup_env["config_dir"]) / "config.json").unlink()
    # Should still create backup (empty tar) rather than fail.
    backup_info = service.backup_configuration()
    assert backup_info is not None
def test_backup_database(temp_backup_env):
    """Test database backup creation."""
    # Create mock database file; content is irrelevant, only presence.
    db_path = Path(temp_backup_env["tmpdir"]) / "aniworld.db"
    db_path.write_bytes(b"mock database content")
    service = BackupService(
        backup_dir=temp_backup_env["backup_dir"],
        config_dir=temp_backup_env["config_dir"],
        database_path=str(db_path),
    )
    backup_info = service.backup_database("DB backup")
    # A database backup is typed "data" and named "database_*".
    assert backup_info is not None
    assert backup_info.backup_type == "data"
    assert backup_info.size_bytes > 0
    assert "database_" in backup_info.name
def test_backup_database_not_found(temp_backup_env):
    """Backing up a missing database yields None, not an error."""
    env = temp_backup_env
    svc = BackupService(
        backup_dir=env["backup_dir"],
        config_dir=env["config_dir"],
        database_path="/nonexistent/database.db",
    )
    assert svc.backup_database() is None
def test_backup_full(temp_backup_env):
    """A full backup is created, typed 'full', with nonzero size."""
    env = temp_backup_env
    svc = BackupService(
        backup_dir=env["backup_dir"],
        config_dir=env["config_dir"],
    )
    info = svc.backup_full("Full backup")
    assert info is not None
    assert info.backup_type == "full"
    assert info.size_bytes > 0
def test_list_backups(temp_backup_env):
    """Test listing backups."""
    service = BackupService(
        backup_dir=temp_backup_env["backup_dir"],
        config_dir=temp_backup_env["config_dir"],
    )
    # Create several backups of different kinds.
    service.backup_configuration()
    service.backup_full()
    backups = service.list_backups()
    # Both show up; every entry exposes "name" and "type" keys.
    assert len(backups) >= 2
    assert all("name" in b for b in backups)
    assert all("type" in b for b in backups)
def test_list_backups_by_type(temp_backup_env):
    """Listing with a type filter returns only backups of that type."""
    env = temp_backup_env
    svc = BackupService(
        backup_dir=env["backup_dir"],
        config_dir=env["config_dir"],
    )
    # One backup of each flavor so the filter has something to exclude.
    svc.backup_configuration()
    svc.backup_full()

    for entry in svc.list_backups("config"):
        assert entry["type"] == "config"
def test_delete_backup(temp_backup_env):
    """Test backup deletion."""
    service = BackupService(
        backup_dir=temp_backup_env["backup_dir"],
        config_dir=temp_backup_env["config_dir"],
    )
    backup_info = service.backup_configuration()
    assert backup_info is not None
    backups_before = service.list_backups()
    assert len(backups_before) > 0
    # Deleting by name succeeds and shrinks the listing.
    result = service.delete_backup(backup_info.name)
    assert result is True
    backups_after = service.list_backups()
    assert len(backups_after) < len(backups_before)
def test_delete_backup_not_found(temp_backup_env):
    """Deleting a backup that does not exist reports failure."""
    env = temp_backup_env
    svc = BackupService(
        backup_dir=env["backup_dir"],
        config_dir=env["config_dir"],
    )
    assert svc.delete_backup("nonexistent_backup.tar.gz") is False
def test_cleanup_old_backups(temp_backup_env):
    """Test cleanup of old backups.

    Creates five config backups with distinct timestamps, prunes down
    to two, and checks that exactly three were deleted.
    """
    import time
    service = BackupService(
        backup_dir=temp_backup_env["backup_dir"],
        config_dir=temp_backup_env["config_dir"],
    )
    # Backup names need unique timestamps, so sleep *between* backups
    # only.  The original also slept after the final backup, wasting a
    # full second on every run.
    for i in range(5):
        if i:
            time.sleep(1)  # ensure a different timestamp than the previous
        service.backup_configuration()
    backups_before = service.list_backups()
    assert len(backups_before) == 5
    # Keep only the 2 most recent backups.
    deleted = service.cleanup_old_backups(max_backups=2)
    backups_after = service.list_backups()
    assert len(backups_after) <= 2
    assert deleted == 3
def test_export_anime_data(temp_backup_env):
    """Test anime data export."""
    service = BackupService(
        backup_dir=temp_backup_env["backup_dir"],
        config_dir=temp_backup_env["config_dir"],
    )
    export_file = Path(temp_backup_env["tmpdir"]) / "anime_export.json"
    result = service.export_anime_data(str(export_file))
    # Export succeeds, writes the file, and stamps it with a timestamp.
    assert result is True
    assert export_file.exists()
    assert "timestamp" in export_file.read_text()
def test_import_anime_data(temp_backup_env):
    """Test anime data import."""
    service = BackupService(
        backup_dir=temp_backup_env["backup_dir"],
        config_dir=temp_backup_env["config_dir"],
    )
    # Create an import file with the minimal expected shape
    # (timestamp + data list), mirroring what export produces.
    import_file = Path(temp_backup_env["tmpdir"]) / "anime_import.json"
    import_file.write_text('{"timestamp": "2025-01-01T00:00:00", "data": []}')
    result = service.import_anime_data(str(import_file))
    assert result is True
def test_import_anime_data_not_found(temp_backup_env):
    """Importing from a missing file reports failure."""
    env = temp_backup_env
    svc = BackupService(
        backup_dir=env["backup_dir"],
        config_dir=env["config_dir"],
    )
    assert svc.import_anime_data("/nonexistent/file.json") is False
def test_get_backup_service():
    """get_backup_service returns one shared BackupService instance."""
    first = get_backup_service()
    second = get_backup_service()
    assert first is second
    assert isinstance(first, BackupService)

View File

@@ -1,237 +0,0 @@
"""Unit tests for monitoring service."""
from datetime import datetime, timedelta
from unittest.mock import AsyncMock, MagicMock
import pytest
from src.server.services.monitoring_service import (
ErrorMetrics,
MonitoringService,
QueueMetrics,
SystemMetrics,
get_monitoring_service,
)
def test_monitoring_service_initialization():
    """A fresh service starts with empty error and performance logs."""
    svc = MonitoringService()
    assert svc is not None
    assert svc._error_log == []
    assert svc._performance_samples == []
def test_get_system_metrics():
    """Test system metrics collection."""
    service = MonitoringService()
    metrics = service.get_system_metrics()
    assert isinstance(metrics, SystemMetrics)
    # Percentages are non-negative; uptime and free resources are
    # strictly positive on any live host (real psutil, no mocking).
    assert metrics.cpu_percent >= 0
    assert metrics.memory_percent >= 0
    assert metrics.disk_percent >= 0
    assert metrics.uptime_seconds > 0
    assert metrics.memory_available_mb > 0
    assert metrics.disk_free_mb > 0
def test_system_metrics_stored():
    """Test that system metrics are stored for performance tracking."""
    service = MonitoringService()
    metrics1 = service.get_system_metrics()
    metrics2 = service.get_system_metrics()
    # Each call appends its snapshot, in call order, to the buffer.
    assert len(service._performance_samples) == 2
    assert service._performance_samples[0] == metrics1
    assert service._performance_samples[1] == metrics2
@pytest.mark.asyncio
async def test_get_queue_metrics_empty():
    """Test queue metrics with no items."""
    service = MonitoringService()
    mock_db = AsyncMock()
    # Mock empty result: db.execute(...) -> result.scalars().all() == []
    mock_scalars = AsyncMock()
    mock_scalars.all = MagicMock(return_value=[])
    mock_result = AsyncMock()
    mock_result.scalars = MagicMock(return_value=mock_scalars)
    mock_db.execute = AsyncMock(return_value=mock_result)
    metrics = await service.get_queue_metrics(mock_db)
    # Empty queue -> zero items and a 0.0 success rate (no divide).
    assert isinstance(metrics, QueueMetrics)
    assert metrics.total_items == 0
    assert metrics.success_rate == 0.0
@pytest.mark.asyncio
async def test_get_queue_metrics_with_items():
    """Test queue metrics with download items."""
    service = MonitoringService()
    mock_db = AsyncMock()
    # Create mock queue items: one finished, one in flight, one failed.
    item1 = MagicMock()
    item1.status = "COMPLETED"
    item1.total_bytes = 1000000
    item1.downloaded_bytes = 1000000
    item1.download_speed = 1000000
    item2 = MagicMock()
    item2.status = "DOWNLOADING"
    item2.total_bytes = 2000000
    item2.downloaded_bytes = 1000000
    item2.download_speed = 500000
    item3 = MagicMock()
    item3.status = "FAILED"
    item3.total_bytes = 500000
    item3.downloaded_bytes = 0
    item3.download_speed = None  # no speed sample for a failed item
    # Mock result: db.execute(...) -> result.scalars().all() == items
    mock_scalars = AsyncMock()
    mock_scalars.all = MagicMock(return_value=[item1, item2, item3])
    mock_result = AsyncMock()
    mock_result.scalars = MagicMock(return_value=mock_scalars)
    mock_db.execute = AsyncMock(return_value=mock_result)
    metrics = await service.get_queue_metrics(mock_db)
    # Counters are per-status; byte totals sum across all items
    # (3.5 MB expected, 2 MB transferred so far).
    assert metrics.total_items == 3
    assert metrics.completed_items == 1
    assert metrics.downloading_items == 1
    assert metrics.failed_items == 1
    assert metrics.total_size_bytes == 3500000
    assert metrics.downloaded_bytes == 2000000
    assert metrics.success_rate > 0
def test_log_error():
    """Logged errors are appended in order as (timestamp, message)."""
    svc = MonitoringService()
    messages = ["Test error 1", "Test error 2"]
    for msg in messages:
        svc.log_error(msg)
    assert len(svc._error_log) == 2
    assert [entry[1] for entry in svc._error_log] == messages
def test_get_error_metrics_empty():
    """With no errors logged, all error metrics read zero."""
    metrics = MonitoringService().get_error_metrics()
    assert isinstance(metrics, ErrorMetrics)
    assert metrics.total_errors == 0
    assert metrics.errors_24h == 0
    assert metrics.error_rate_per_hour == 0.0
def test_get_error_metrics_with_errors():
    """Test error metrics with multiple errors."""
    service = MonitoringService()
    service.log_error("ConnectionError: Failed to connect")
    service.log_error("ConnectionError: Timeout")
    service.log_error("TimeoutError: Download timeout")
    metrics = service.get_error_metrics()
    # All three were just logged, so they fall in the 24h window too.
    assert metrics.total_errors == 3
    assert metrics.errors_24h == 3
    assert metrics.last_error_time is not None
    assert len(metrics.most_common_errors) > 0
def test_get_error_metrics_old_errors():
    """Test error metrics excludes old errors."""
    service = MonitoringService()
    # Add old error (simulate by directly appending to the log);
    # 25h ago is just outside the 24-hour window.
    old_time = datetime.now() - timedelta(hours=25)
    service._error_log.append((old_time, "Old error"))
    # Add recent error through the public API.
    service.log_error("Recent error")
    metrics = service.get_error_metrics()
    # Both count toward the lifetime total, only one toward 24h.
    assert metrics.total_errors == 2
    assert metrics.errors_24h == 1
def test_get_performance_summary():
    """Test performance summary generation."""
    service = MonitoringService()
    # Collect some samples (each call stores one snapshot).
    service.get_system_metrics()
    service.get_system_metrics()
    service.get_system_metrics()
    summary = service.get_performance_summary()
    # Summary exposes per-resource sections plus the sample count...
    assert "cpu" in summary
    assert "memory" in summary
    assert "disk" in summary
    assert "sample_count" in summary
    assert summary["sample_count"] == 3
    # ...and each section carries current/average/max/min statistics.
    assert "current" in summary["cpu"]
    assert "average" in summary["cpu"]
    assert "max" in summary["cpu"]
    assert "min" in summary["cpu"]
def test_get_performance_summary_empty():
    """With no samples collected the summary is an empty dict."""
    assert MonitoringService().get_performance_summary() == {}
@pytest.mark.asyncio
async def test_get_comprehensive_status():
    """Test comprehensive system status."""
    service = MonitoringService()
    mock_db = AsyncMock()
    # Mock empty queue: db.execute(...) -> result.scalars().all() == []
    mock_scalars = AsyncMock()
    mock_scalars.all = MagicMock(return_value=[])
    mock_result = AsyncMock()
    mock_result.scalars = MagicMock(return_value=mock_scalars)
    mock_db.execute = AsyncMock(return_value=mock_result)
    status = await service.get_comprehensive_status(mock_db)
    # The status dict combines system, queue, error and perf sections.
    assert "timestamp" in status
    assert "system" in status
    assert "queue" in status
    assert "errors" in status
    assert "performance" in status
    assert status["system"]["cpu_percent"] >= 0
    assert status["queue"]["total_items"] == 0
def test_get_monitoring_service():
    """get_monitoring_service returns one shared MonitoringService."""
    first = get_monitoring_service()
    second = get_monitoring_service()
    assert first is second
    assert isinstance(first, MonitoringService)

View File

@@ -1,269 +0,0 @@
"""Tests for rate limiting middleware."""
from typing import Optional
import httpx
from fastapi import FastAPI, Request
from fastapi.testclient import TestClient
from src.server.middleware.rate_limit import (
RateLimitConfig,
RateLimitMiddleware,
RateLimitStore,
)
# Shim for environments where httpx.Client.__init__ doesn't accept an
# 'app' kwarg (some httpx versions have a different signature). The
# TestClient in Starlette passes `app=` through; to keep tests portable
# we pop it before calling the real initializer.
_orig_httpx_init = httpx.Client.__init__


def _httpx_init_shim(self, *args, **kwargs):
    # Drop the 'app' kwarg if present; everything else is forwarded
    # unchanged to the original httpx.Client initializer.
    kwargs.pop("app", None)
    return _orig_httpx_init(self, *args, **kwargs)


# NOTE(review): module-level monkeypatch — this affects every
# httpx.Client created after this module is imported, not only the
# clients in this file.
httpx.Client.__init__ = _httpx_init_shim
class TestRateLimitStore:
    """Tests for RateLimitStore class."""

    def test_check_limit_allows_within_limits(self):
        """Test that requests within limits are allowed."""
        store = RateLimitStore()
        # First request should be allowed (limits: 10/min, 100/hour).
        allowed, retry_after = store.check_limit("test_id", 10, 100)
        assert allowed is True
        assert retry_after is None
        # Record the request
        store.record_request("test_id")
        # Next request should still be allowed
        allowed, retry_after = store.check_limit("test_id", 10, 100)
        assert allowed is True
        assert retry_after is None

    def test_check_limit_blocks_over_minute_limit(self):
        """Test that requests over minute limit are blocked."""
        store = RateLimitStore()
        # Fill up to the minute limit (5 recorded, limit of 5/min).
        for _ in range(5):
            store.record_request("test_id")
        # Next request should be blocked, with a positive retry hint.
        allowed, retry_after = store.check_limit("test_id", 5, 100)
        assert allowed is False
        assert retry_after is not None
        assert retry_after > 0

    def test_check_limit_blocks_over_hour_limit(self):
        """Test that requests over hour limit are blocked."""
        store = RateLimitStore()
        # Fill up to hour limit (10 recorded; minute limit of 100 is
        # generous so only the 10/hour limit can trigger).
        for _ in range(10):
            store.record_request("test_id")
        # Next request should be blocked
        allowed, retry_after = store.check_limit("test_id", 100, 10)
        assert allowed is False
        assert retry_after is not None
        assert retry_after > 0

    def test_get_remaining_requests(self):
        """Test getting remaining requests."""
        store = RateLimitStore()
        # Initially, all requests are remaining
        minute_rem, hour_rem = store.get_remaining_requests(
            "test_id", 10, 100
        )
        assert minute_rem == 10
        assert hour_rem == 100
        # After one request both windows decrement by one.
        store.record_request("test_id")
        minute_rem, hour_rem = store.get_remaining_requests(
            "test_id", 10, 100
        )
        assert minute_rem == 9
        assert hour_rem == 99
class TestRateLimitConfig:
    """Tests for the RateLimitConfig value object."""

    def test_default_config(self):
        """Defaults: 60/minute, 1000/hour, 2x for authenticated users."""
        cfg = RateLimitConfig()
        assert cfg.requests_per_minute == 60
        assert cfg.requests_per_hour == 1000
        assert cfg.authenticated_multiplier == 2.0

    def test_custom_config(self):
        """Explicit constructor arguments override the defaults."""
        cfg = RateLimitConfig(
            requests_per_minute=10,
            requests_per_hour=100,
            authenticated_multiplier=3.0,
        )
        assert cfg.requests_per_minute == 10
        assert cfg.requests_per_hour == 100
        assert cfg.authenticated_multiplier == 3.0
class TestRateLimitMiddleware:
    """Tests for RateLimitMiddleware class.

    Each test builds a fresh app (fresh middleware state); requests
    within a test share the same limiter state, so ordering matters.
    """

    def create_app(
        self, default_config: Optional[RateLimitConfig] = None
    ) -> FastAPI:
        """Create a test FastAPI app with rate limiting.

        Args:
            default_config: Optional default configuration

        Returns:
            Configured FastAPI app
        """
        app = FastAPI()
        # Add rate limiting middleware
        app.add_middleware(
            RateLimitMiddleware,
            default_config=default_config,
        )

        @app.get("/api/test")
        async def test_endpoint():
            return {"message": "success"}

        @app.get("/health")
        async def health_endpoint():
            return {"status": "ok"}

        @app.get("/api/auth/login")
        async def login_endpoint():
            return {"message": "login"}

        return app

    def test_allows_requests_within_limit(self):
        """Test that requests within limit are allowed."""
        app = self.create_app()
        client = TestClient(app)
        # Make several requests within limit (default 60/min).
        for _ in range(5):
            response = client.get("/api/test")
            assert response.status_code == 200

    def test_blocks_requests_over_limit(self):
        """Test that requests over limit are blocked."""
        config = RateLimitConfig(
            requests_per_minute=3,
            requests_per_hour=100,
        )
        app = self.create_app(config)
        client = TestClient(app, raise_server_exceptions=False)
        # Make requests up to limit
        for _ in range(3):
            response = client.get("/api/test")
            assert response.status_code == 200
        # Next request should be rate limited with a Retry-After hint.
        response = client.get("/api/test")
        assert response.status_code == 429
        assert "Retry-After" in response.headers

    def test_bypass_health_endpoint(self):
        """Test that health endpoint bypasses rate limiting."""
        # Limit of 1/min so any counted request beyond the first would
        # trip the limiter — health must never be counted.
        config = RateLimitConfig(
            requests_per_minute=1,
            requests_per_hour=1,
        )
        app = self.create_app(config)
        client = TestClient(app)
        # Make many requests to health endpoint
        for _ in range(10):
            response = client.get("/health")
            assert response.status_code == 200

    def test_endpoint_specific_limits(self):
        """Test that endpoint-specific limits are applied."""
        app = self.create_app()
        client = TestClient(app, raise_server_exceptions=False)
        # Login endpoint has strict limit (5 per minute), overriding
        # the default config.
        for _ in range(5):
            response = client.get("/api/auth/login")
            assert response.status_code == 200
        # Next login request should be rate limited
        response = client.get("/api/auth/login")
        assert response.status_code == 429

    def test_rate_limit_headers(self):
        """Test that rate limit headers are added to response."""
        app = self.create_app()
        client = TestClient(app)
        response = client.get("/api/test")
        assert response.status_code == 200
        assert "X-RateLimit-Limit-Minute" in response.headers
        assert "X-RateLimit-Limit-Hour" in response.headers
        assert "X-RateLimit-Remaining-Minute" in response.headers
        assert "X-RateLimit-Remaining-Hour" in response.headers

    def test_authenticated_user_multiplier(self):
        """Test that authenticated users get higher limits."""
        config = RateLimitConfig(
            requests_per_minute=5,
            requests_per_hour=100,
            authenticated_multiplier=2.0,
        )
        app = self.create_app(config)

        # Add middleware to simulate authentication by populating
        # request.state.user_id before the rate limiter sees it.
        @app.middleware("http")
        async def add_user_to_state(request: Request, call_next):
            request.state.user_id = "user123"
            response = await call_next(request)
            return response

        client = TestClient(app, raise_server_exceptions=False)
        # Should be able to make 10 requests (5 * 2.0 multiplier).
        for _ in range(10):
            response = client.get("/api/test")
            assert response.status_code == 200
        # Next request should be rate limited
        response = client.get("/api/test")
        assert response.status_code == 429

    def test_different_ips_tracked_separately(self):
        """Test that different IPs are tracked separately."""
        config = RateLimitConfig(
            requests_per_minute=2,
            requests_per_hour=100,
        )
        app = self.create_app(config)
        client = TestClient(app, raise_server_exceptions=False)
        # Make requests from "different" IPs
        # Note: TestClient uses same IP, but we can test the logic
        for _ in range(2):
            response = client.get("/api/test")
            assert response.status_code == 200
        # Third request should be rate limited
        response = client.get("/api/test")
        assert response.status_code == 429