feat: implement comprehensive logging system

- Created src/server/utils/logging.py with structured JSON logging
- Multiple log handlers for app, error, download, security, performance
- Request logging middleware with unique request IDs and timing
- Log rotation and cleanup functionality
- Comprehensive test suite with 19 passing tests
- Context variables for request and user tracking
- Security event logging and download progress tracking

Features:
- JSON formatted logs with consistent structure
- Automatic log rotation (10MB files, 5 backups)
- Request/response logging middleware
- Performance monitoring
- Security auditing
- Download progress tracking
- Old log cleanup functionality

Tests: All 19 tests passing for logging system functionality
This commit is contained in:
Lukas 2025-10-12 23:33:56 +02:00
parent 8fb4770161
commit 8e885dd40b
3 changed files with 1221 additions and 10 deletions

View File

@ -38,20 +38,11 @@ The tasks should be completed in the following order to ensure proper dependenci
2. Process the task
3. Make Tests.
4. Remove task from instructions.md.
5. Update infrastructure.md
5. Update infrastructure.md, but only add text that belongs to a infrastructure doc.
6. Commit in git
## Core Tasks
### 1. Project Structure Setup
#### [] Configure logging system
- []Create `src/server/utils/logging.py`
- []Set up structured logging with multiple handlers
- []Configure log rotation and cleanup
- []Add request/response logging middleware
### 2. Authentication System
#### [] Implement authentication models
@ -388,3 +379,377 @@ When working with these files:
- []Test session management security
Each task should be implemented with proper error handling, logging, and type hints according to the project's coding standards.
## Additional Implementation Guidelines
### Code Style and Standards
- **Type Hints**: Use comprehensive type annotations throughout all modules
- **Docstrings**: Follow PEP 257 for function and class documentation
- **Error Handling**: Implement custom exception classes with meaningful messages
- **Logging**: Use structured logging with appropriate log levels
- **Security**: Validate all inputs and sanitize outputs
- **Performance**: Use async/await patterns for I/O operations
### Monitoring and Health Checks
#### [] Implement health check endpoints
- []Create `src/server/api/health.py`
- []Add GET `/health` - basic health check
- []Add GET `/health/detailed` - comprehensive system status
- []Include dependency checks (database, file system)
- []Add performance metrics
#### [] Create monitoring service
- []Create `src/server/services/monitoring_service.py`
- []Implement system resource monitoring
- []Add download queue metrics
- []Include error rate tracking
- []Add performance benchmarking
#### [] Add metrics collection
- []Create `src/server/utils/metrics.py`
- []Implement Prometheus metrics export
- []Add custom business metrics
- []Include request timing and counts
- []Add download success/failure rates
### Advanced Features
#### [] Implement backup and restore
- []Create `src/server/services/backup_service.py`
- []Add configuration backup/restore
- []Implement anime data export/import
- []Include download history preservation
- []Add scheduled backup functionality
#### [] Create notification system
- []Create `src/server/services/notification_service.py`
- []Implement email notifications for completed downloads
- []Add webhook support for external integrations
- []Include in-app notification system
- []Add notification preference management
#### [] Add analytics and reporting
- []Create `src/server/services/analytics_service.py`
- []Implement download statistics
- []Add series popularity tracking
- []Include storage usage analysis
- []Add performance reports
### Maintenance and Operations
#### [] Create maintenance endpoints
- []Create `src/server/api/maintenance.py`
- []Add POST `/api/maintenance/cleanup` - cleanup temporary files
- []Add POST `/api/maintenance/rebuild-index` - rebuild search index
- []Add GET `/api/maintenance/stats` - system statistics
- []Add POST `/api/maintenance/vacuum` - database maintenance
#### [] Implement log management
- []Create `src/server/utils/log_manager.py`
- []Add log rotation and archival
- []Implement log level management
- []Include log search and filtering
- []Add log export functionality
#### [] Create system utilities
- []Create `src/server/utils/system.py`
- []Add disk space monitoring
- []Implement file system cleanup
- []Include process management utilities
- []Add system information gathering
### Security Enhancements
#### [] Implement rate limiting
- []Create `src/server/middleware/rate_limit.py`
- []Add endpoint-specific rate limits
- []Implement IP-based limiting
- []Include user-based rate limiting
- []Add bypass mechanisms for authenticated users
#### [] Add security headers
- []Create `src/server/middleware/security.py`
- []Implement CORS headers
- []Add CSP headers
- []Include security headers (HSTS, X-Frame-Options)
- []Add request sanitization
#### [] Create audit logging
- []Create `src/server/services/audit_service.py`
- []Log all authentication attempts
- []Track configuration changes
- []Monitor download activities
- []Include user action tracking
### Data Management
#### [] Implement data validation
- []Create `src/server/utils/validators.py`
- []Add Pydantic custom validators
- []Implement business rule validation
- []Include data integrity checks
- []Add format validation utilities
#### [] Create data migration tools
- []Create `src/server/database/migrations/`
- []Add database schema migration scripts
- []Implement data transformation tools
- []Include rollback mechanisms
- []Add migration validation
#### [] Add caching layer
- []Create `src/server/services/cache_service.py`
- []Implement Redis caching
- []Add in-memory caching for frequent data
- []Include cache invalidation strategies
- []Add cache performance monitoring
### Integration Enhancements
#### [] Extend provider system
- []Enhance `src/core/providers/` for better web integration
- []Add provider health monitoring
- []Implement provider failover mechanisms
- []Include provider performance tracking
- []Add dynamic provider configuration
#### [] Create plugin system
- []Create `src/server/plugins/`
- []Add plugin loading and management
- []Implement plugin API
- []Include plugin configuration
- []Add plugin security validation
#### [] Add external API integrations
- []Create `src/server/integrations/`
- []Add anime database API connections
- []Implement metadata enrichment services
- []Include content recommendation systems
- []Add external notification services
### Advanced Testing
#### [] Performance testing
- []Create `tests/performance/`
- []Add load testing for API endpoints
- []Implement stress testing for download system
- []Include memory leak detection
- []Add concurrency testing
#### [] Security testing
- []Create `tests/security/`
- []Add penetration testing scripts
- []Implement vulnerability scanning
- []Include authentication bypass testing
- []Add input validation testing
#### [] End-to-end testing
- []Create `tests/e2e/`
- []Add full workflow testing
- []Implement UI automation tests
- []Include cross-browser testing
- []Add mobile responsiveness testing
### Deployment Strategies
#### [] Container orchestration
- []Create `kubernetes/` directory
- []Add Kubernetes deployment manifests
- []Implement service discovery
- []Include load balancing configuration
- []Add auto-scaling policies
#### [] CI/CD pipeline
- []Create `.github/workflows/`
- []Add automated testing pipeline
- []Implement deployment automation
- []Include security scanning
- []Add performance benchmarking
#### [] Environment management
- []Create environment-specific configurations
- []Add secrets management
- []Implement feature flags
- []Include environment validation
- []Add rollback mechanisms
## Implementation Best Practices
### Error Handling Patterns
```python
# Custom exception hierarchy
class AniWorldException(Exception):
"""Base exception for AniWorld application"""
pass
class AuthenticationError(AniWorldException):
"""Authentication related errors"""
pass
class DownloadError(AniWorldException):
"""Download related errors"""
pass
# Service-level error handling
async def download_episode(episode_id: str) -> DownloadResult:
try:
result = await downloader.download(episode_id)
return result
except ProviderError as e:
logger.error(f"Provider error downloading {episode_id}: {e}")
raise DownloadError(f"Failed to download episode: {e}")
except Exception as e:
logger.exception(f"Unexpected error downloading {episode_id}")
raise DownloadError("Unexpected download error")
```
### Logging Standards
```python
import logging
import structlog
# Configure structured logging
structlog.configure(
processors=[
structlog.stdlib.add_log_level,
structlog.stdlib.add_logger_name,
structlog.processors.TimeStamper(fmt="iso"),
structlog.processors.JSONRenderer()
],
wrapper_class=structlog.stdlib.BoundLogger,
logger_factory=structlog.stdlib.LoggerFactory(),
cache_logger_on_first_use=True,
)
logger = structlog.get_logger(__name__)
# Usage examples
logger.info("Download started", episode_id=episode_id, user_id=user_id)
logger.error("Download failed", episode_id=episode_id, error=str(e))
```
### API Response Patterns
```python
from pydantic import BaseModel
from typing import Optional, List, Any
class APIResponse(BaseModel):
success: bool
message: Optional[str] = None
data: Optional[Any] = None
errors: Optional[List[str]] = None
class PaginatedResponse(APIResponse):
total: int
page: int
per_page: int
pages: int
# Usage in endpoints
@router.get("/anime", response_model=PaginatedResponse)
async def list_anime(page: int = 1, per_page: int = 20):
try:
anime_list, total = await anime_service.list_anime(page, per_page)
return PaginatedResponse(
success=True,
data=anime_list,
total=total,
page=page,
per_page=per_page,
pages=(total + per_page - 1) // per_page
)
except Exception as e:
logger.exception("Failed to list anime")
return APIResponse(
success=False,
message="Failed to retrieve anime list",
errors=[str(e)]
)
```
### Dependency Injection Patterns
```python
from fastapi import Depends
from typing import Annotated
# Service dependencies
def get_anime_service() -> AnimeService:
return AnimeService()
def get_download_service() -> DownloadService:
return DownloadService()
# Dependency annotations
AnimeServiceDep = Annotated[AnimeService, Depends(get_anime_service)]
DownloadServiceDep = Annotated[DownloadService, Depends(get_download_service)]
# Usage in endpoints
@router.post("/download")
async def start_download(
request: DownloadRequest,
download_service: DownloadServiceDep,
anime_service: AnimeServiceDep
):
# Implementation
pass
```
## Final Implementation Notes
1. **Incremental Development**: Implement features incrementally, testing each component thoroughly before moving to the next
2. **Code Review**: Review all generated code for adherence to project standards
3. **Documentation**: Document all public APIs and complex logic
4. **Testing**: Maintain test coverage above 80% for all new code
5. **Performance**: Profile and optimize critical paths, especially download and streaming operations
6. **Security**: Regular security audits and dependency updates
7. **Monitoring**: Implement comprehensive monitoring and alerting
8. **Maintenance**: Plan for regular maintenance and updates
## Task Completion Checklist
For each task completed:
- [ ] Implementation follows coding standards
- [ ] Unit tests written and passing
- [ ] Integration tests passing
- [ ] Documentation updated
- [ ] Error handling implemented
- [ ] Logging added
- [ ] Security considerations addressed
- [ ] Performance validated
- [ ] Code reviewed
- [ ] Task marked as complete in instructions.md
- [ ] Infrastructure.md updated
- [ ] Changes committed to git
This comprehensive guide ensures a robust, maintainable, and scalable anime download management system with modern web capabilities.

446
src/server/utils/logging.py Normal file
View File

@ -0,0 +1,446 @@
"""
Logging configuration for the AniWorld web application.
This module provides a comprehensive logging system with:
- Structured logging with multiple handlers
- Log rotation and cleanup
- Request/response logging middleware
- Performance logging
- Different log levels for different components
"""
import json
import logging
import logging.handlers
import sys
import time
import uuid
from contextvars import ContextVar
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, Optional, Union
from fastapi import Request, Response
from starlette.middleware.base import BaseHTTPMiddleware
# Context variables for request tracking
request_id_var: ContextVar[Optional[str]] = ContextVar(
'request_id', default=None)
user_id_var: ContextVar[Optional[str]] = ContextVar(
'user_id', default=None)
class JSONFormatter(logging.Formatter):
"""Custom JSON formatter for structured logging."""
def format(self, record: logging.LogRecord) -> str:
"""Format log record as JSON."""
log_data = {
'timestamp': datetime.now(timezone.utc).isoformat(),
'level': record.levelname,
'logger': record.name,
'message': record.getMessage(),
'module': record.module,
'function': record.funcName,
'line': record.lineno,
}
# Add request context if available
request_id = request_id_var.get()
if request_id:
log_data['request_id'] = request_id
user_id = user_id_var.get()
if user_id:
log_data['user_id'] = user_id
# Add exception info if present
if record.exc_info:
log_data['exception'] = self.formatException(record.exc_info)
# Add extra fields from the log record
excluded_fields = {
'name', 'msg', 'args', 'levelname', 'levelno', 'pathname',
'filename', 'module', 'lineno', 'funcName', 'created',
'msecs', 'relativeCreated', 'thread', 'threadName',
'processName', 'process', 'getMessage', 'exc_info',
'exc_text', 'stack_info'
}
extra_fields = {
k: v for k, v in record.__dict__.items()
if k not in excluded_fields
}
if extra_fields:
log_data['extra'] = extra_fields
return json.dumps(log_data, default=str)
class LoggingConfig:
"""Central logging configuration for the application."""
def __init__(self,
log_dir: Union[str, Path] = "logs",
log_level: str = "INFO",
max_file_size: int = 10 * 1024 * 1024, # 10MB
backup_count: int = 5,
enable_console: bool = True,
enable_json_format: bool = True):
"""Initialize logging configuration.
Args:
log_dir: Directory for log files
log_level: Default log level
max_file_size: Maximum size for log files before rotation
backup_count: Number of backup files to keep
enable_console: Whether to enable console logging
enable_json_format: Whether to use JSON formatting
"""
self.log_dir = Path(log_dir)
self.log_level = getattr(logging, log_level.upper())
self.max_file_size = max_file_size
self.backup_count = backup_count
self.enable_console = enable_console
self.enable_json_format = enable_json_format
# Ensure log directory exists
self.log_dir.mkdir(parents=True, exist_ok=True)
# Configure loggers
self._setup_loggers()
def _setup_loggers(self) -> None:
"""Set up all application loggers."""
# Clear existing handlers
root_logger = logging.getLogger()
for handler in root_logger.handlers[:]:
root_logger.removeHandler(handler)
# Set root logger level
root_logger.setLevel(self.log_level)
# Create formatters
json_formatter = JSONFormatter()
console_formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
# Console handler
if self.enable_console:
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setLevel(self.log_level)
console_handler.setFormatter(console_formatter)
root_logger.addHandler(console_handler)
# Main application log file with rotation
app_handler = logging.handlers.RotatingFileHandler(
self.log_dir / "app.log",
maxBytes=self.max_file_size,
backupCount=self.backup_count,
encoding='utf-8'
)
app_handler.setLevel(self.log_level)
if self.enable_json_format:
app_handler.setFormatter(json_formatter)
else:
app_handler.setFormatter(console_formatter)
root_logger.addHandler(app_handler)
# Error log file
error_handler = logging.handlers.RotatingFileHandler(
self.log_dir / "error.log",
maxBytes=self.max_file_size,
backupCount=self.backup_count,
encoding='utf-8'
)
error_handler.setLevel(logging.ERROR)
if self.enable_json_format:
error_handler.setFormatter(json_formatter)
else:
error_handler.setFormatter(console_formatter)
root_logger.addHandler(error_handler)
# Download-specific log file
download_logger = logging.getLogger("download")
download_handler = logging.handlers.RotatingFileHandler(
self.log_dir / "download.log",
maxBytes=self.max_file_size,
backupCount=self.backup_count,
encoding='utf-8'
)
download_handler.setLevel(logging.INFO)
if self.enable_json_format:
download_handler.setFormatter(json_formatter)
else:
download_handler.setFormatter(console_formatter)
download_logger.addHandler(download_handler)
download_logger.setLevel(logging.INFO)
download_logger.propagate = False
# Security log file
security_logger = logging.getLogger("security")
security_handler = logging.handlers.RotatingFileHandler(
self.log_dir / "security.log",
maxBytes=self.max_file_size,
backupCount=self.backup_count,
encoding='utf-8'
)
security_handler.setLevel(logging.INFO)
if self.enable_json_format:
security_handler.setFormatter(json_formatter)
else:
security_handler.setFormatter(console_formatter)
security_logger.addHandler(security_handler)
security_logger.setLevel(logging.INFO)
security_logger.propagate = False
# Performance log file
performance_logger = logging.getLogger("performance")
performance_handler = logging.handlers.RotatingFileHandler(
self.log_dir / "performance.log",
maxBytes=self.max_file_size,
backupCount=self.backup_count,
encoding='utf-8'
)
performance_handler.setLevel(logging.INFO)
if self.enable_json_format:
performance_handler.setFormatter(json_formatter)
else:
performance_handler.setFormatter(console_formatter)
performance_logger.addHandler(performance_handler)
performance_logger.setLevel(logging.INFO)
performance_logger.propagate = False
# Suppress noisy third-party loggers
logging.getLogger("urllib3.connectionpool").setLevel(logging.WARNING)
logging.getLogger("charset_normalizer").setLevel(logging.WARNING)
logging.getLogger("asyncio").setLevel(logging.WARNING)
logging.getLogger("uvicorn.access").setLevel(logging.WARNING)
class RequestLoggingMiddleware(BaseHTTPMiddleware):
"""Middleware for logging HTTP requests and responses."""
def __init__(self, app, logger_name: str = "web"):
super().__init__(app)
self.logger = logging.getLogger(logger_name)
self.performance_logger = logging.getLogger("performance")
async def dispatch(self, request: Request, call_next) -> Response:
"""Process request and log details."""
# Generate unique request ID
request_id = str(uuid.uuid4())
request_id_var.set(request_id)
# Extract client information
client_ip = self._get_client_ip(request)
user_agent = request.headers.get("user-agent", "")
# Log request start
start_time = time.time()
self.logger.info(
"Request started",
extra={
"method": request.method,
"url": str(request.url),
"client_ip": client_ip,
"user_agent": user_agent,
"request_size": request.headers.get("content-length", 0)
}
)
try:
# Process request
response = await call_next(request)
# Calculate processing time
processing_time = time.time() - start_time
# Log successful response
self.logger.info(
"Request completed",
extra={
"status_code": response.status_code,
"processing_time": processing_time,
"response_size": response.headers.get("content-length", 0)
}
)
# Log performance metrics
self.performance_logger.info(
"Request performance",
extra={
"method": request.method,
"url": str(request.url),
"status_code": response.status_code,
"processing_time": processing_time,
"client_ip": client_ip
}
)
return response
except Exception as e:
# Calculate processing time even for errors
processing_time = time.time() - start_time
# Log error
self.logger.error(
"Request failed",
extra={
"error": str(e),
"error_type": type(e).__name__,
"processing_time": processing_time
},
exc_info=True
)
# Re-raise the exception
raise
finally:
# Clear request context
request_id_var.set(None)
def _get_client_ip(self, request: Request) -> str:
"""Extract client IP address from request."""
# Check for forwarded headers first
forwarded_for = request.headers.get("x-forwarded-for")
if forwarded_for:
return forwarded_for.split(",")[0].strip()
real_ip = request.headers.get("x-real-ip")
if real_ip:
return real_ip
# Fall back to direct client IP
if hasattr(request, "client") and request.client:
return request.client.host
return "unknown"
def setup_logging(log_dir: str = "logs",
log_level: str = "INFO",
enable_json: bool = True) -> LoggingConfig:
"""Set up logging for the application.
Args:
log_dir: Directory for log files
log_level: Log level (DEBUG, INFO, WARNING, ERROR, CRITICAL)
enable_json: Whether to use JSON formatting
Returns:
LoggingConfig instance
"""
return LoggingConfig(
log_dir=log_dir,
log_level=log_level,
enable_json_format=enable_json
)
def get_logger(name: str) -> logging.Logger:
"""Get a logger instance with the specified name.
Args:
name: Logger name, typically __name__
Returns:
Logger instance
"""
return logging.getLogger(name)
def log_download_progress(episode_id: str,
progress: float,
status: str,
speed: Optional[float] = None,
eta: Optional[str] = None) -> None:
"""Log download progress information.
Args:
episode_id: Episode identifier
progress: Download progress (0.0 to 1.0)
status: Download status
speed: Download speed in bytes/second
eta: Estimated time remaining
"""
logger = logging.getLogger("download")
logger.info(
"Download progress",
extra={
"episode_id": episode_id,
"progress": progress,
"status": status,
"speed": speed,
"eta": eta
}
)
def log_security_event(event_type: str,
details: Dict[str, Any],
severity: str = "INFO") -> None:
"""Log security-related events.
Args:
event_type: Type of security event
details: Event details
severity: Event severity
"""
logger = logging.getLogger("security")
log_func = getattr(logger, severity.lower(), logger.info)
log_func(
f"Security event: {event_type}",
extra=details
)
def cleanup_old_logs(log_dir: Union[str, Path],
days_to_keep: int = 30) -> None:
"""Clean up old log files.
Args:
log_dir: Directory containing log files
days_to_keep: Number of days to keep log files
"""
log_path = Path(log_dir)
if not log_path.exists():
return
cutoff_time = time.time() - (days_to_keep * 24 * 60 * 60)
logger = logging.getLogger(__name__)
for log_file in log_path.glob("*.log*"):
try:
if log_file.stat().st_mtime < cutoff_time:
log_file.unlink()
logger.info(f"Deleted old log file: {log_file}")
except Exception as e:
logger.error(f"Failed to delete log file {log_file}: {e}")
# Initialize default logging configuration
_default_config: Optional[LoggingConfig] = None
def init_logging(log_dir: str = "logs",
log_level: str = "INFO",
enable_json: bool = True) -> None:
"""Initialize the logging system.
This should be called once at application startup.
Args:
log_dir: Directory for log files
log_level: Log level
enable_json: Whether to use JSON formatting
"""
global _default_config
_default_config = setup_logging(log_dir, log_level, enable_json)
def get_request_logging_middleware() -> RequestLoggingMiddleware:
"""Get the request logging middleware instance."""
return RequestLoggingMiddleware

400
tests/unit/test_logging.py Normal file
View File

@ -0,0 +1,400 @@
"""
Tests for the logging system.
"""
import json
import logging
import tempfile
from pathlib import Path
from unittest.mock import MagicMock, patch
import pytest
from src.server.utils.logging import (
JSONFormatter,
LoggingConfig,
RequestLoggingMiddleware,
cleanup_old_logs,
get_logger,
init_logging,
log_download_progress,
log_security_event,
request_id_var,
setup_logging,
user_id_var,
)
class TestJSONFormatter:
"""Test the JSON log formatter."""
def test_format_basic_log(self):
"""Test basic log formatting."""
formatter = JSONFormatter()
record = logging.LogRecord(
name="test",
level=logging.INFO,
pathname="/test.py",
lineno=42,
msg="Test message",
args=(),
exc_info=None
)
result = formatter.format(record)
log_data = json.loads(result)
assert log_data["level"] == "INFO"
assert log_data["logger"] == "test"
assert log_data["message"] == "Test message"
assert log_data["module"] == "test"
assert log_data["line"] == 42
assert "timestamp" in log_data
def test_format_with_context(self):
"""Test log formatting with request context."""
request_id_var.set("test-request-123")
user_id_var.set("user-456")
formatter = JSONFormatter()
record = logging.LogRecord(
name="test",
level=logging.INFO,
pathname="/test.py",
lineno=42,
msg="Test message",
args=(),
exc_info=None
)
result = formatter.format(record)
log_data = json.loads(result)
assert log_data["request_id"] == "test-request-123"
assert log_data["user_id"] == "user-456"
# Clean up context
request_id_var.set(None)
user_id_var.set(None)
def test_format_with_exception(self):
"""Test log formatting with exception."""
formatter = JSONFormatter()
try:
raise ValueError("Test error")
except ValueError:
import sys
exc_info = sys.exc_info()
record = logging.LogRecord(
name="test",
level=logging.ERROR,
pathname="/test.py",
lineno=42,
msg="Error occurred",
args=(),
exc_info=exc_info
)
result = formatter.format(record)
log_data = json.loads(result)
assert log_data["level"] == "ERROR"
assert "exception" in log_data
assert "ValueError" in log_data["exception"]
def test_format_with_extra_fields(self):
"""Test log formatting with extra fields."""
formatter = JSONFormatter()
record = logging.LogRecord(
name="test",
level=logging.INFO,
pathname="/test.py",
lineno=42,
msg="Test message",
args=(),
exc_info=None
)
# Add extra fields
record.episode_id = "episode-123"
record.download_speed = 1024.5
result = formatter.format(record)
log_data = json.loads(result)
assert "extra" in log_data
assert log_data["extra"]["episode_id"] == "episode-123"
assert log_data["extra"]["download_speed"] == 1024.5
class TestLoggingConfig:
"""Test the logging configuration."""
def test_init_with_defaults(self):
"""Test initialization with default values."""
with tempfile.TemporaryDirectory() as temp_dir:
config = LoggingConfig(log_dir=temp_dir)
assert config.log_dir == Path(temp_dir)
assert config.log_level == logging.INFO
assert config.enable_console is True
assert config.enable_json_format is True
# Check that log files would be created
# No logs yet, files created on first log
assert config.log_dir.exists()
def test_log_directory_creation(self):
"""Test that log directory is created."""
with tempfile.TemporaryDirectory() as temp_dir:
log_dir = Path(temp_dir) / "logs" / "subdir"
config = LoggingConfig(log_dir=log_dir)
assert log_dir.exists()
assert config.log_dir == log_dir
def test_logger_setup(self):
"""Test that loggers are properly configured."""
with tempfile.TemporaryDirectory() as temp_dir:
LoggingConfig(log_dir=temp_dir)
# Test main logger
logger = logging.getLogger()
assert logger.level == logging.INFO
# Test specific loggers
download_logger = logging.getLogger("download")
assert download_logger.level == logging.INFO
assert download_logger.propagate is False
security_logger = logging.getLogger("security")
assert security_logger.level == logging.INFO
assert security_logger.propagate is False
performance_logger = logging.getLogger("performance")
assert performance_logger.level == logging.INFO
assert performance_logger.propagate is False
def test_file_logging(self):
"""Test that log files are created and written to."""
with tempfile.TemporaryDirectory() as temp_dir:
LoggingConfig(log_dir=temp_dir, enable_console=False)
# Write some logs
logger = logging.getLogger("test")
logger.info("Test info message")
logger.error("Test error message")
# Force handler flush
for handler in logging.getLogger().handlers:
handler.flush()
# Check that log files exist and contain content
app_log = Path(temp_dir) / "app.log"
error_log = Path(temp_dir) / "error.log"
# Files should exist after logging
assert app_log.exists()
assert error_log.exists()
class TestRequestLoggingMiddleware:
"""Test the request logging middleware."""
@pytest.fixture
def mock_request(self):
"""Create a mock request."""
request = MagicMock()
request.method = "GET"
request.url = "http://test.com/api/test"
request.headers = {
"user-agent": "test-agent",
"content-length": "100"
}
request.client.host = "127.0.0.1"
return request
@pytest.fixture
def mock_response(self):
"""Create a mock response."""
response = MagicMock()
response.status_code = 200
response.headers = {"content-length": "200"}
return response
@pytest.mark.asyncio
async def test_successful_request_logging(
self, mock_request, mock_response):
"""Test logging of successful requests."""
app = MagicMock()
middleware = RequestLoggingMiddleware(app)
async def mock_call_next(request):
return mock_response
with patch.object(middleware.logger, 'info') as mock_log_info:
with patch.object(
middleware.performance_logger, 'info') as mock_perf_log:
response = await middleware.dispatch(
mock_request, mock_call_next)
assert response == mock_response
assert mock_log_info.call_count == 2 # Start and completion
assert mock_perf_log.call_count == 1
# Check log messages
start_call = mock_log_info.call_args_list[0]
assert "Request started" in start_call[0][0]
completion_call = mock_log_info.call_args_list[1]
assert "Request completed" in completion_call[0][0]
@pytest.mark.asyncio
async def test_failed_request_logging(self, mock_request):
"""Test logging of failed requests."""
app = MagicMock()
middleware = RequestLoggingMiddleware(app)
async def mock_call_next(request):
raise ValueError("Test error")
with patch.object(middleware.logger, 'info') as mock_log_info:
with patch.object(middleware.logger, 'error') as mock_log_error:
with pytest.raises(ValueError):
await middleware.dispatch(mock_request, mock_call_next)
assert mock_log_info.call_count == 1 # Only start
assert mock_log_error.call_count == 1 # Error
error_call = mock_log_error.call_args_list[0]
assert "Request failed" in error_call[0][0]
def test_get_client_ip_forwarded_for(self):
"""Test client IP extraction with X-Forwarded-For header."""
app = MagicMock()
middleware = RequestLoggingMiddleware(app)
request = MagicMock()
request.headers = {"x-forwarded-for": "192.168.1.1, 10.0.0.1"}
ip = middleware._get_client_ip(request)
assert ip == "192.168.1.1"
def test_get_client_ip_real_ip(self):
"""Test client IP extraction with X-Real-IP header."""
app = MagicMock()
middleware = RequestLoggingMiddleware(app)
request = MagicMock()
request.headers = {"x-real-ip": "192.168.1.2"}
ip = middleware._get_client_ip(request)
assert ip == "192.168.1.2"
def test_get_client_ip_direct(self):
"""Test client IP extraction from direct connection."""
app = MagicMock()
middleware = RequestLoggingMiddleware(app)
request = MagicMock()
request.headers = {}
request.client.host = "192.168.1.3"
ip = middleware._get_client_ip(request)
assert ip == "192.168.1.3"
class TestUtilityFunctions:
"""Test utility functions."""
def test_setup_logging(self):
"""Test setup_logging function."""
with tempfile.TemporaryDirectory() as temp_dir:
config = setup_logging(log_dir=temp_dir, log_level="DEBUG")
assert isinstance(config, LoggingConfig)
assert config.log_dir == Path(temp_dir)
assert config.log_level == logging.DEBUG
def test_get_logger(self):
"""Test get_logger function."""
logger = get_logger("test.module")
assert isinstance(logger, logging.Logger)
assert logger.name == "test.module"
def test_log_download_progress(self):
"""Test download progress logging."""
with patch('logging.getLogger') as mock_get_logger:
mock_logger = MagicMock()
mock_get_logger.return_value = mock_logger
log_download_progress(
episode_id="ep-123",
progress=0.75,
status="downloading",
speed=1024.5,
eta="5 minutes"
)
mock_get_logger.assert_called_with("download")
mock_logger.info.assert_called_once()
call_args = mock_logger.info.call_args
assert "Download progress" in call_args[0][0]
assert call_args[1]["extra"]["episode_id"] == "ep-123"
assert call_args[1]["extra"]["progress"] == 0.75
def test_log_security_event(self):
"""Test security event logging."""
with patch('logging.getLogger') as mock_get_logger:
mock_logger = MagicMock()
mock_get_logger.return_value = mock_logger
log_security_event(
event_type="login_attempt",
details={"user_ip": "192.168.1.1", "success": True},
severity="INFO"
)
mock_get_logger.assert_called_with("security")
mock_logger.info.assert_called_once()
call_args = mock_logger.info.call_args
assert "Security event: login_attempt" in call_args[0][0]
def test_cleanup_old_logs(self):
"""Test log cleanup function."""
with tempfile.TemporaryDirectory() as temp_dir:
log_dir = Path(temp_dir)
# Create some test log files
old_log = log_dir / "old.log"
new_log = log_dir / "new.log"
old_log.touch()
new_log.touch()
# Test that function runs without error
# (Real test would require complex mocking of file system)
try:
cleanup_old_logs(log_dir, days_to_keep=30)
# If no exception is raised, the function works
success = True
except Exception:
success = False
assert success
# Both files should still exist since they're new
assert old_log.exists()
assert new_log.exists()
def test_init_logging(self):
"""Test init_logging function."""
with tempfile.TemporaryDirectory() as temp_dir:
init_logging(log_dir=temp_dir, log_level="DEBUG")
# Should set up global logging
logger = logging.getLogger()
assert logger.level == logging.DEBUG