Add advanced features: notification system, security middleware, audit logging, data validation, and caching

- Implement notification service with email, webhook, and in-app support
- Add security headers middleware (CORS, CSP, HSTS, XSS protection)
- Create comprehensive audit logging service for security events
- Add data validation utilities with Pydantic validators
- Implement cache service with in-memory and Redis backend support

All 714 tests passing
This commit is contained in:
2025-10-24 09:23:15 +02:00
parent 17e5a551e1
commit 7409ae637e
6 changed files with 3033 additions and 44 deletions

View File

@@ -0,0 +1,628 @@
"""
Data Validation Utilities for AniWorld.
This module provides Pydantic validators and business rule validation
utilities for ensuring data integrity across the application.
"""
import re
from pathlib import Path
from typing import Any, Dict, List, Optional
class ValidationError(Exception):
"""Custom validation error."""
pass
class ValidatorMixin:
"""Mixin class providing common validation utilities."""
@staticmethod
def validate_password_strength(password: str) -> str:
"""
Validate password meets security requirements.
Args:
password: Password to validate
Returns:
Validated password
Raises:
ValueError: If password doesn't meet requirements
"""
if len(password) < 8:
raise ValueError("Password must be at least 8 characters long")
if not re.search(r"[A-Z]", password):
raise ValueError(
"Password must contain at least one uppercase letter"
)
if not re.search(r"[a-z]", password):
raise ValueError(
"Password must contain at least one lowercase letter"
)
if not re.search(r"[0-9]", password):
raise ValueError("Password must contain at least one digit")
if not re.search(r"[!@#$%^&*(),.?\":{}|<>]", password):
raise ValueError(
"Password must contain at least one special character"
)
return password
@staticmethod
def validate_file_path(path: str, must_exist: bool = False) -> str:
"""
Validate file path.
Args:
path: File path to validate
must_exist: Whether the path must exist
Returns:
Validated path
Raises:
ValueError: If path is invalid
"""
if not path or not isinstance(path, str):
raise ValueError("Path must be a non-empty string")
# Check for path traversal attempts
if ".." in path or path.startswith("/"):
raise ValueError("Invalid path: path traversal not allowed")
path_obj = Path(path)
if must_exist and not path_obj.exists():
raise ValueError(f"Path does not exist: {path}")
return path
@staticmethod
def validate_url(url: str) -> str:
"""
Validate URL format.
Args:
url: URL to validate
Returns:
Validated URL
Raises:
ValueError: If URL is invalid
"""
if not url or not isinstance(url, str):
raise ValueError("URL must be a non-empty string")
url_pattern = re.compile(
r"^https?://" # http:// or https://
r"(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+[A-Z]{2,6}\.?|"
r"localhost|" # localhost
r"\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})" # IP address
r"(?::\d+)?" # optional port
r"(?:/?|[/?]\S+)$",
re.IGNORECASE,
)
if not url_pattern.match(url):
raise ValueError(f"Invalid URL format: {url}")
return url
@staticmethod
def validate_email(email: str) -> str:
"""
Validate email address format.
Args:
email: Email to validate
Returns:
Validated email
Raises:
ValueError: If email is invalid
"""
if not email or not isinstance(email, str):
raise ValueError("Email must be a non-empty string")
email_pattern = re.compile(
r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$"
)
if not email_pattern.match(email):
raise ValueError(f"Invalid email format: {email}")
return email
@staticmethod
def validate_port(port: int) -> int:
"""
Validate port number.
Args:
port: Port number to validate
Returns:
Validated port
Raises:
ValueError: If port is invalid
"""
if not isinstance(port, int):
raise ValueError("Port must be an integer")
if port < 1 or port > 65535:
raise ValueError("Port must be between 1 and 65535")
return port
@staticmethod
def validate_positive_integer(value: int, name: str = "Value") -> int:
"""
Validate positive integer.
Args:
value: Value to validate
name: Name for error messages
Returns:
Validated value
Raises:
ValueError: If value is invalid
"""
if not isinstance(value, int):
raise ValueError(f"{name} must be an integer")
if value <= 0:
raise ValueError(f"{name} must be positive")
return value
@staticmethod
def validate_non_negative_integer(value: int, name: str = "Value") -> int:
"""
Validate non-negative integer.
Args:
value: Value to validate
name: Name for error messages
Returns:
Validated value
Raises:
ValueError: If value is invalid
"""
if not isinstance(value, int):
raise ValueError(f"{name} must be an integer")
if value < 0:
raise ValueError(f"{name} cannot be negative")
return value
@staticmethod
def validate_string_length(
value: str, min_length: int = 0, max_length: Optional[int] = None, name: str = "Value"
) -> str:
"""
Validate string length.
Args:
value: String to validate
min_length: Minimum length
max_length: Maximum length (None for no limit)
name: Name for error messages
Returns:
Validated string
Raises:
ValueError: If string length is invalid
"""
if not isinstance(value, str):
raise ValueError(f"{name} must be a string")
if len(value) < min_length:
raise ValueError(
f"{name} must be at least {min_length} characters long"
)
if max_length is not None and len(value) > max_length:
raise ValueError(
f"{name} must be at most {max_length} characters long"
)
return value
@staticmethod
def validate_choice(value: Any, choices: List[Any], name: str = "Value") -> Any:
"""
Validate value is in allowed choices.
Args:
value: Value to validate
choices: List of allowed values
name: Name for error messages
Returns:
Validated value
Raises:
ValueError: If value not in choices
"""
if value not in choices:
raise ValueError(f"{name} must be one of: {', '.join(map(str, choices))}")
return value
@staticmethod
def validate_dict_keys(
data: Dict[str, Any], required_keys: List[str], name: str = "Data"
) -> Dict[str, Any]:
"""
Validate dictionary contains required keys.
Args:
data: Dictionary to validate
required_keys: List of required keys
name: Name for error messages
Returns:
Validated dictionary
Raises:
ValueError: If required keys are missing
"""
if not isinstance(data, dict):
raise ValueError(f"{name} must be a dictionary")
missing_keys = [key for key in required_keys if key not in data]
if missing_keys:
raise ValueError(
f"{name} missing required keys: {', '.join(missing_keys)}"
)
return data
def validate_episode_range(start: int, end: int) -> tuple[int, int]:
"""
Validate episode range.
Args:
start: Start episode number
end: End episode number
Returns:
Tuple of (start, end)
Raises:
ValueError: If range is invalid
"""
if start < 1:
raise ValueError("Start episode must be at least 1")
if end < start:
raise ValueError("End episode must be greater than or equal to start")
if end - start > 1000:
raise ValueError("Episode range too large (max 1000 episodes)")
return start, end
def validate_download_quality(quality: str) -> str:
"""
Validate download quality setting.
Args:
quality: Quality setting
Returns:
Validated quality
Raises:
ValueError: If quality is invalid
"""
valid_qualities = ["360p", "480p", "720p", "1080p", "best", "worst"]
if quality not in valid_qualities:
raise ValueError(
f"Invalid quality: {quality}. Must be one of: {', '.join(valid_qualities)}"
)
return quality
def validate_language(language: str) -> str:
"""
Validate language code.
Args:
language: Language code
Returns:
Validated language
Raises:
ValueError: If language is invalid
"""
valid_languages = ["ger-sub", "ger-dub", "eng-sub", "eng-dub", "jpn"]
if language not in valid_languages:
raise ValueError(
f"Invalid language: {language}. Must be one of: {', '.join(valid_languages)}"
)
return language
def validate_download_priority(priority: int) -> int:
"""
Validate download priority.
Args:
priority: Priority value
Returns:
Validated priority
Raises:
ValueError: If priority is invalid
"""
if priority < 0 or priority > 10:
raise ValueError("Priority must be between 0 and 10")
return priority
def validate_anime_url(url: str) -> str:
"""
Validate anime URL format.
Args:
url: Anime URL
Returns:
Validated URL
Raises:
ValueError: If URL is invalid
"""
if not url:
raise ValueError("URL cannot be empty")
# Check if it's a valid aniworld.to URL
if "aniworld.to" not in url and "s.to" not in url:
raise ValueError("URL must be from aniworld.to or s.to")
# Basic URL validation
ValidatorMixin.validate_url(url)
return url
def validate_series_name(name: str) -> str:
"""
Validate series name.
Args:
name: Series name
Returns:
Validated name
Raises:
ValueError: If name is invalid
"""
if not name or not name.strip():
raise ValueError("Series name cannot be empty")
if len(name) > 200:
raise ValueError("Series name too long (max 200 characters)")
# Check for invalid characters
invalid_chars = ['<', '>', ':', '"', '/', '\\', '|', '?', '*']
for char in invalid_chars:
if char in name:
raise ValueError(
f"Series name contains invalid character: {char}"
)
return name.strip()
def validate_backup_name(name: str) -> str:
"""
Validate backup file name.
Args:
name: Backup name
Returns:
Validated name
Raises:
ValueError: If name is invalid
"""
if not name or not name.strip():
raise ValueError("Backup name cannot be empty")
# Must be a valid filename
if not re.match(r"^[a-zA-Z0-9_\-\.]+$", name):
raise ValueError(
"Backup name can only contain letters, numbers, underscores, hyphens, and dots"
)
if not name.endswith(".json"):
raise ValueError("Backup name must end with .json")
return name
def validate_config_data(data: Dict[str, Any]) -> Dict[str, Any]:
"""
Validate configuration data structure.
Args:
data: Configuration data
Returns:
Validated data
Raises:
ValueError: If data is invalid
"""
required_keys = ["download_directory", "concurrent_downloads"]
ValidatorMixin.validate_dict_keys(data, required_keys, "Configuration")
# Validate download directory
if not isinstance(data["download_directory"], str):
raise ValueError("download_directory must be a string")
# Validate concurrent downloads
concurrent = data["concurrent_downloads"]
if not isinstance(concurrent, int) or concurrent < 1 or concurrent > 10:
raise ValueError("concurrent_downloads must be between 1 and 10")
# Validate quality if present
if "quality" in data:
validate_download_quality(data["quality"])
# Validate language if present
if "language" in data:
validate_language(data["language"])
return data
def sanitize_filename(filename: str) -> str:
"""
Sanitize filename for safe filesystem use.
Args:
filename: Original filename
Returns:
Sanitized filename
"""
# Remove or replace invalid characters
invalid_chars = ['<', '>', ':', '"', '/', '\\', '|', '?', '*']
for char in invalid_chars:
filename = filename.replace(char, '_')
# Remove leading/trailing spaces and dots
filename = filename.strip('. ')
# Ensure not empty
if not filename:
filename = "unnamed"
# Limit length
if len(filename) > 255:
name, ext = filename.rsplit('.', 1) if '.' in filename else (filename, '')
max_name_len = 255 - len(ext) - 1 if ext else 255
filename = name[:max_name_len] + ('.' + ext if ext else '')
return filename
def validate_jwt_token(token: str) -> str:
"""
Validate JWT token format.
Args:
token: JWT token
Returns:
Validated token
Raises:
ValueError: If token format is invalid
"""
if not token or not isinstance(token, str):
raise ValueError("Token must be a non-empty string")
# JWT tokens have 3 parts separated by dots
parts = token.split(".")
if len(parts) != 3:
raise ValueError("Invalid JWT token format")
# Each part should be base64url encoded (alphanumeric + - and _)
for part in parts:
if not re.match(r"^[A-Za-z0-9_-]+$", part):
raise ValueError("Invalid JWT token encoding")
return token
def validate_ip_address(ip: str) -> str:
"""
Validate IP address format.
Args:
ip: IP address
Returns:
Validated IP address
Raises:
ValueError: If IP is invalid
"""
if not ip or not isinstance(ip, str):
raise ValueError("IP address must be a non-empty string")
# IPv4 pattern
ipv4_pattern = re.compile(
r"^(?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}"
r"(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)$"
)
# IPv6 pattern (simplified)
ipv6_pattern = re.compile(
r"^(?:[0-9a-fA-F]{1,4}:){7}[0-9a-fA-F]{1,4}$"
)
if not ipv4_pattern.match(ip) and not ipv6_pattern.match(ip):
raise ValueError(f"Invalid IP address format: {ip}")
return ip
def validate_websocket_message(message: Dict[str, Any]) -> Dict[str, Any]:
"""
Validate WebSocket message structure.
Args:
message: WebSocket message
Returns:
Validated message
Raises:
ValueError: If message structure is invalid
"""
required_keys = ["type"]
ValidatorMixin.validate_dict_keys(message, required_keys, "WebSocket message")
valid_types = [
"download_progress",
"download_complete",
"download_failed",
"queue_update",
"error",
"system_message",
]
if message["type"] not in valid_types:
raise ValueError(
f"Invalid message type. Must be one of: {', '.join(valid_types)}"
)
return message