"""Regex pattern validation with security checks against ReDoS attacks. Provides timeout and complexity limits to prevent catastrophic backtracking (ReDoS - Regular Expression Denial of Service). """ from __future__ import annotations import re import signal from contextlib import contextmanager from typing import TYPE_CHECKING import structlog if TYPE_CHECKING: from collections.abc import Generator logger = structlog.get_logger() # Constants for regex validation MAX_REGEX_LENGTH = 1000 REGEX_COMPILE_TIMEOUT_SECONDS = 2 class RegexTimeoutError(Exception): """Raised when regex compilation exceeds the timeout limit.""" def __init__(self, pattern: str, timeout_seconds: int) -> None: """Initialize with the pattern and timeout value. Args: pattern: The regex pattern that timed out. timeout_seconds: The timeout value in seconds. """ self.pattern = pattern self.timeout_seconds = timeout_seconds super().__init__( f"Regex pattern compilation timed out after {timeout_seconds}s " f"(possible ReDoS attack): {pattern!r}" ) def validate_regex_pattern(pattern: str) -> None: """Validate a regex pattern with length and timeout checks. Validates a regex pattern by: 1. Checking length does not exceed MAX_REGEX_LENGTH characters 2. Attempting compilation with a timeout to prevent ReDoS attacks Args: pattern: The regex pattern string to validate. Raises: ValueError: If the pattern exceeds maximum length. RegexTimeoutError: If compilation exceeds the timeout. re.error: If the pattern is syntactically invalid. Example: >>> validate_regex_pattern(r'^[a-z]+$') # OK >>> validate_regex_pattern('a' * 1001) # Raises ValueError >>> validate_regex_pattern(r'(a+)+b') # May raise RegexTimeoutError """ # Check length first (fast, no timeout needed) if len(pattern) > MAX_REGEX_LENGTH: msg = f"Regex pattern exceeds maximum length of {MAX_REGEX_LENGTH} characters: {len(pattern)} provided" logger.warning("regex_validation_length_exceeded", max_length=MAX_REGEX_LENGTH, actual_length=len(pattern)) raise ValueError(msg) # Attempt compilation with timeout try: with _timeout_context(REGEX_COMPILE_TIMEOUT_SECONDS): re.compile(pattern) except TimeoutError as exc: logger.warning( "regex_compilation_timeout", timeout_seconds=REGEX_COMPILE_TIMEOUT_SECONDS, pattern_preview=pattern[:100], ) raise RegexTimeoutError(pattern, REGEX_COMPILE_TIMEOUT_SECONDS) from exc @contextmanager def _timeout_context(timeout_seconds: int) -> Generator[None, None, None]: """Context manager to enforce a timeout using signal.alarm(). Works on Unix-like systems (Linux, macOS, etc.). On Windows or other platforms where signal.SIGALRM is unavailable, compilation proceeds without timeout (not ideal, but graceful degradation). Args: timeout_seconds: Timeout duration in seconds. Yields: None. Raises: TimeoutError: If the timeout is exceeded. Note: This uses signal.alarm() which is only available on Unix. On Windows, timeouts are not enforced (limitation of the platform). """ # Check if signal.SIGALRM is available (Unix-like systems) if not hasattr(signal, "SIGALRM"): # Windows or other platforms without SIGALRM # Just proceed without timeout (not ideal, but prevents crashes) yield return def _timeout_handler(signum: int, frame: object) -> None: raise TimeoutError("Timeout exceeded") # Set up signal handler old_handler = signal.signal(signal.SIGALRM, _timeout_handler) signal.alarm(timeout_seconds) try: yield finally: # Always disable the alarm, even if an exception occurred signal.alarm(0) signal.signal(signal.SIGALRM, old_handler)