- Created src/server/utils/logging.py with structured JSON logging - Multiple log handlers for app, error, download, security, performance - Request logging middleware with unique request IDs and timing - Log rotation and cleanup functionality - Comprehensive test suite with 19 passing tests - Context variables for request and user tracking - Security event logging and download progress tracking Features: - JSON formatted logs with consistent structure - Automatic log rotation (10MB files, 5 backups) - Request/response logging middleware - Performance monitoring - Security auditing - Download progress tracking - Old log cleanup functionality Tests: All 19 tests passing for logging system functionality
446 lines
15 KiB
Python
446 lines
15 KiB
Python
"""
|
|
Logging configuration for the AniWorld web application.
|
|
|
|
This module provides a comprehensive logging system with:
|
|
- Structured logging with multiple handlers
|
|
- Log rotation and cleanup
|
|
- Request/response logging middleware
|
|
- Performance logging
|
|
- Different log levels for different components
|
|
"""
|
|
|
|
import json
|
|
import logging
|
|
import logging.handlers
|
|
import sys
|
|
import time
|
|
import uuid
|
|
from contextvars import ContextVar
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
from typing import Any, Dict, Optional, Union
|
|
|
|
from fastapi import Request, Response
|
|
from starlette.middleware.base import BaseHTTPMiddleware
|
|
|
|
# Context variables for request tracking.  ContextVars are async-safe:
# each task/request sees its own value, so concurrent requests never
# clobber each other's IDs.
request_id_var: ContextVar[Optional[str]] = ContextVar(
    'request_id', default=None)
# User identifier; presumably set by auth code — it is never written
# in this module, only read by JSONFormatter.
user_id_var: ContextVar[Optional[str]] = ContextVar(
    'user_id', default=None)
|
|
|
|
|
|
class JSONFormatter(logging.Formatter):
    """Custom JSON formatter for structured logging.

    Emits one JSON object per record with a fixed core schema
    (timestamp, level, logger, message, module, function, line), the
    current request/user context when available, formatted exception
    info, and any caller-supplied ``extra=`` fields under ``extra``.
    """

    # Standard LogRecord attributes that must not be echoed into the
    # 'extra' payload.  'message'/'asctime' are set by Formatter.format
    # on some paths, and 'taskName' exists on Python 3.12+ records;
    # without them here they would leak into every 'extra' object.
    _EXCLUDED_FIELDS = frozenset({
        'name', 'msg', 'args', 'levelname', 'levelno', 'pathname',
        'filename', 'module', 'lineno', 'funcName', 'created',
        'msecs', 'relativeCreated', 'thread', 'threadName',
        'processName', 'process', 'getMessage', 'exc_info',
        'exc_text', 'stack_info', 'message', 'asctime', 'taskName',
    })

    def format(self, record: logging.LogRecord) -> str:
        """Format *record* as a single-line JSON string."""
        log_data: Dict[str, Any] = {
            'timestamp': datetime.now(timezone.utc).isoformat(),
            'level': record.levelname,
            'logger': record.name,
            'message': record.getMessage(),
            'module': record.module,
            'function': record.funcName,
            'line': record.lineno,
        }

        # Attach request context when set (by RequestLoggingMiddleware).
        request_id = request_id_var.get()
        if request_id:
            log_data['request_id'] = request_id

        user_id = user_id_var.get()
        if user_id:
            log_data['user_id'] = user_id

        # Render exception info as a traceback string.
        if record.exc_info:
            log_data['exception'] = self.formatException(record.exc_info)

        # Anything passed via logging's ``extra=`` lands directly on the
        # record's __dict__; collect those caller-supplied fields.
        extra_fields = {
            k: v for k, v in record.__dict__.items()
            if k not in self._EXCLUDED_FIELDS
        }
        if extra_fields:
            log_data['extra'] = extra_fields

        # default=str keeps non-serializable values (Path, UUID, ...)
        # from crashing the logging pipeline.
        return json.dumps(log_data, default=str)
|
|
|
|
|
|
class LoggingConfig:
    """Central logging configuration for the application.

    On construction this creates the log directory, resets the root
    logger, and installs rotating file handlers for the app, error,
    download, security and performance logs.
    """

    def __init__(self,
                 log_dir: Union[str, Path] = "logs",
                 log_level: str = "INFO",
                 max_file_size: int = 10 * 1024 * 1024,  # 10MB
                 backup_count: int = 5,
                 enable_console: bool = True,
                 enable_json_format: bool = True):
        """Initialize logging configuration.

        Args:
            log_dir: Directory for log files
            log_level: Default log level name (e.g. "INFO", "DEBUG")
            max_file_size: Maximum size for log files before rotation
            backup_count: Number of backup files to keep
            enable_console: Whether to enable console logging
            enable_json_format: Whether to use JSON formatting for files
        """
        self.log_dir = Path(log_dir)
        self.log_level = getattr(logging, log_level.upper())
        self.max_file_size = max_file_size
        self.backup_count = backup_count
        self.enable_console = enable_console
        self.enable_json_format = enable_json_format

        # Ensure log directory exists before any handler opens a file.
        self.log_dir.mkdir(parents=True, exist_ok=True)

        self._setup_loggers()

    def _make_file_handler(self, filename: str, level: int) -> logging.Handler:
        """Create a size-rotating file handler in the log directory."""
        handler = logging.handlers.RotatingFileHandler(
            self.log_dir / filename,
            maxBytes=self.max_file_size,
            backupCount=self.backup_count,
            encoding='utf-8'
        )
        handler.setLevel(level)
        handler.setFormatter(self._file_formatter)
        return handler

    def _setup_component_logger(self, name: str, filename: str) -> None:
        """Give the named logger its own INFO-level file, non-propagating."""
        component_logger = logging.getLogger(name)
        component_logger.addHandler(
            self._make_file_handler(filename, logging.INFO))
        component_logger.setLevel(logging.INFO)
        # Keep component records out of the root/app log.
        component_logger.propagate = False

    def _setup_loggers(self) -> None:
        """Set up all application loggers."""
        # Start from a clean slate: drop any pre-existing root handlers.
        root_logger = logging.getLogger()
        for handler in root_logger.handlers[:]:
            root_logger.removeHandler(handler)
        root_logger.setLevel(self.log_level)

        console_formatter = logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
        # All file handlers share one formatter, chosen by configuration.
        self._file_formatter = (
            JSONFormatter() if self.enable_json_format else console_formatter
        )

        # Console handler (human-readable format regardless of JSON mode).
        if self.enable_console:
            console_handler = logging.StreamHandler(sys.stdout)
            console_handler.setLevel(self.log_level)
            console_handler.setFormatter(console_formatter)
            root_logger.addHandler(console_handler)

        # Main application log with rotation.
        root_logger.addHandler(
            self._make_file_handler("app.log", self.log_level))
        # Errors-only log for quick triage.
        root_logger.addHandler(
            self._make_file_handler("error.log", logging.ERROR))

        # Dedicated, non-propagating component logs.
        self._setup_component_logger("download", "download.log")
        self._setup_component_logger("security", "security.log")
        self._setup_component_logger("performance", "performance.log")

        # Suppress noisy third-party loggers.
        logging.getLogger("urllib3.connectionpool").setLevel(logging.WARNING)
        logging.getLogger("charset_normalizer").setLevel(logging.WARNING)
        logging.getLogger("asyncio").setLevel(logging.WARNING)
        logging.getLogger("uvicorn.access").setLevel(logging.WARNING)
|
|
|
|
|
|
class RequestLoggingMiddleware(BaseHTTPMiddleware):
    """Middleware for logging HTTP requests and responses.

    Assigns each request a UUID (visible to all loggers via
    ``request_id_var``), logs request start/completion/failure, and
    records timing metrics to the "performance" logger.
    """

    def __init__(self, app, logger_name: str = "web"):
        """Initialize the middleware.

        Args:
            app: The ASGI application being wrapped.
            logger_name: Logger used for request lifecycle events.
        """
        super().__init__(app)
        self.logger = logging.getLogger(logger_name)
        self.performance_logger = logging.getLogger("performance")

    async def dispatch(self, request: Request, call_next) -> Response:
        """Process request and log details."""
        # Generate unique request ID.  Keep the reset token so the
        # previous context value is restored afterwards instead of
        # being clobbered to None (the original `set(None)` would
        # discard any outer context).
        request_id = str(uuid.uuid4())
        token = request_id_var.set(request_id)

        # Extract client information.
        client_ip = self._get_client_ip(request)
        user_agent = request.headers.get("user-agent", "")

        # Log request start.
        start_time = time.time()
        self.logger.info(
            "Request started",
            extra={
                "method": request.method,
                "url": str(request.url),
                "client_ip": client_ip,
                "user_agent": user_agent,
                "request_size": request.headers.get("content-length", 0)
            }
        )

        try:
            response = await call_next(request)

            processing_time = time.time() - start_time

            # Log successful response.
            self.logger.info(
                "Request completed",
                extra={
                    "status_code": response.status_code,
                    "processing_time": processing_time,
                    "response_size": response.headers.get(
                        "content-length", 0)
                }
            )

            # Log performance metrics to the dedicated logger.
            self.performance_logger.info(
                "Request performance",
                extra={
                    "method": request.method,
                    "url": str(request.url),
                    "status_code": response.status_code,
                    "processing_time": processing_time,
                    "client_ip": client_ip
                }
            )

            return response

        except Exception as e:
            # Calculate processing time even for errors.
            processing_time = time.time() - start_time

            self.logger.error(
                "Request failed",
                extra={
                    "error": str(e),
                    "error_type": type(e).__name__,
                    "processing_time": processing_time
                },
                exc_info=True
            )

            # Re-raise so the framework's error handling still runs.
            raise

        finally:
            # Restore the prior request-id context.
            request_id_var.reset(token)

    def _get_client_ip(self, request: Request) -> str:
        """Extract the client IP, honoring reverse-proxy headers.

        NOTE(review): X-Forwarded-For is client-controlled; trust it
        only when deployed behind a proxy that sanitizes it.
        """
        forwarded_for = request.headers.get("x-forwarded-for")
        if forwarded_for:
            # First entry is the originating client.
            return forwarded_for.split(",")[0].strip()

        real_ip = request.headers.get("x-real-ip")
        if real_ip:
            return real_ip

        # Fall back to the direct connection's address.
        if hasattr(request, "client") and request.client:
            return request.client.host

        return "unknown"
|
|
|
|
|
|
def setup_logging(log_dir: str = "logs",
                  log_level: str = "INFO",
                  enable_json: bool = True) -> LoggingConfig:
    """Configure application-wide logging and return the config.

    Args:
        log_dir: Directory for log files
        log_level: Log level (DEBUG, INFO, WARNING, ERROR, CRITICAL)
        enable_json: Whether to use JSON formatting

    Returns:
        The freshly constructed :class:`LoggingConfig`.
    """
    config = LoggingConfig(
        log_dir=log_dir,
        log_level=log_level,
        enable_json_format=enable_json,
    )
    return config
|
|
|
|
|
|
def get_logger(name: str) -> logging.Logger:
    """Return the logger registered under *name* (typically __name__).

    Args:
        name: Logger name.

    Returns:
        The shared :class:`logging.Logger` for that name.
    """
    return logging.getLogger(name)
|
|
|
|
|
|
def log_download_progress(episode_id: str,
                          progress: float,
                          status: str,
                          speed: Optional[float] = None,
                          eta: Optional[str] = None) -> None:
    """Emit a download-progress event on the "download" logger.

    Args:
        episode_id: Episode identifier
        progress: Download progress (0.0 to 1.0)
        status: Download status
        speed: Download speed in bytes/second
        eta: Estimated time remaining
    """
    details = {
        "episode_id": episode_id,
        "progress": progress,
        "status": status,
        "speed": speed,
        "eta": eta,
    }
    logging.getLogger("download").info("Download progress", extra=details)
|
|
|
|
|
|
def log_security_event(event_type: str,
                       details: Dict[str, Any],
                       severity: str = "INFO") -> None:
    """Log security-related events to the dedicated "security" logger.

    Args:
        event_type: Type of security event (e.g. "login_failed")
        details: Event details, attached to the record as extra fields
        severity: Event severity; any logging level name. Unknown
            names fall back to INFO.
    """
    logger = logging.getLogger("security")
    # logging raises KeyError when an ``extra`` key collides with a
    # built-in LogRecord attribute ("message", "name", "msg", ...);
    # prefix such keys instead of crashing the caller.
    reserved = set(vars(logging.LogRecord("", 0, "", 0, "", (), None)))
    reserved.update({"message", "asctime"})
    safe_details = {
        (f"event_{k}" if k in reserved else k): v
        for k, v in details.items()
    }
    log_func = getattr(logger, severity.lower(), logger.info)
    log_func(
        f"Security event: {event_type}",
        extra=safe_details
    )
|
|
|
|
|
|
def cleanup_old_logs(log_dir: Union[str, Path],
                     days_to_keep: int = 30) -> None:
    """Delete log files in *log_dir* older than *days_to_keep* days.

    Files matching ``*.log*`` (which also catches rotated backups such
    as ``app.log.1``) whose modification time predates the cutoff are
    removed; per-file failures are logged and skipped.

    Args:
        log_dir: Directory containing log files
        days_to_keep: Number of days to keep log files
    """
    log_path = Path(log_dir)
    if not log_path.exists():
        return

    logger = logging.getLogger(__name__)
    cutoff_time = time.time() - days_to_keep * 86400  # 86400 s per day

    for log_file in log_path.glob("*.log*"):
        try:
            if log_file.stat().st_mtime >= cutoff_time:
                continue  # still fresh enough to keep
            log_file.unlink()
            logger.info(f"Deleted old log file: {log_file}")
        except Exception as e:
            logger.error(f"Failed to delete log file {log_file}: {e}")
|
|
|
|
|
|
# Initialize default logging configuration.
# Module-level singleton holding the active LoggingConfig; populated
# by init_logging() at application startup, otherwise None.
_default_config: Optional[LoggingConfig] = None
|
|
|
|
|
|
def init_logging(log_dir: str = "logs",
                 log_level: str = "INFO",
                 enable_json: bool = True) -> None:
    """Initialize the logging system and remember the configuration.

    Call this exactly once at application startup; the resulting
    :class:`LoggingConfig` is kept in the module-level singleton.

    Args:
        log_dir: Directory for log files
        log_level: Log level
        enable_json: Whether to use JSON formatting
    """
    global _default_config
    _default_config = setup_logging(log_dir, log_level, enable_json)
|
|
|
|
|
|
def get_request_logging_middleware() -> type:
    """Return the RequestLoggingMiddleware *class* (not an instance).

    The original annotation claimed an instance, but the function
    returns the class itself: the framework instantiates middleware,
    passing the ASGI app to ``__init__`` (e.g. via
    ``app.add_middleware(get_request_logging_middleware())``).
    """
    return RequestLoggingMiddleware