"""In-memory rate limiter for IP-based request throttling. Implements exponential backoff for failed login attempts using failure tracking. Each wrong password attempt increments the failure count for that IP, and subsequent attempts are blocked for a duration that grows exponentially up to a maximum. Uses a dictionary of deques (per IP) storing timestamps of recent failures. Old entries are cleaned up by a background task to prevent unbounded growth. Process-local implementation — in multi-worker setups, each worker has independent counters. This constraint limits the blast radius of brute-force attacks to a single worker. **How It Works:** 1. A successful login resets the failure counter for that IP. 2. Each failed login (wrong password) calls record_failure() and increments the counter. 3. is_allowed() checks if enough time has passed since the last failure based on the current failure count. The delay grows exponentially with each consecutive failure: - 1st failure: 0.5 second penalty - 2nd failure: 1 second penalty (0.5 * 2^1) - 3rd failure: 2 seconds penalty (0.5 * 2^2) - 4th failure: 4 seconds penalty (0.5 * 2^3) - ... up to the configured maximum (default 5 seconds) 4. Penalties are cumulative within the window: if an attacker makes 5 failed attempts, they must wait the full 5 seconds before trying again (not 5 seconds per attempt). **Cleanup Lifecycle**: The rate limiter state (_failures) grows as IPs interact with the system. To prevent unbounded memory growth during long runtimes, a scheduled background task (rate_limiter_cleanup) calls cleanup_expired() every 30 minutes. This is safe because: - cleanup_expired() only removes IPs with no recent failures (all timestamps outside the rate-limit window), so active IPs are never disrupted. - The cleanup is non-blocking and logged for observability. - Individual requests already prune old timestamps from each IP's deque during is_allowed() and record_failure(), so cleanup primarily handles dormant IPs. For monitoring, check logs for "rate_limiter_cleanup" events to observe how many IPs are being retired from memory each cleanup cycle. """ from __future__ import annotations from collections import deque from time import time from typing import TYPE_CHECKING import structlog from app.utils.constants import ( LOGIN_PENALTY_BASE_SECONDS, LOGIN_PENALTY_MAX_SECONDS, LOGIN_PENALTY_MULTIPLIER, ) if TYPE_CHECKING: from collections.abc import Mapping log: structlog.stdlib.BoundLogger = structlog.get_logger() # 5 attempts per minute per IP (300 seconds) DEFAULT_RATE_LIMIT_ATTEMPTS = 5 DEFAULT_RATE_LIMIT_WINDOW_SECONDS = 60 class RateLimiter: """Track and enforce request rate limits per IP address. Stores attempt timestamps in per-IP deques, removing old entries outside the rate limit window. """ def __init__( self, max_attempts: int = DEFAULT_RATE_LIMIT_ATTEMPTS, window_seconds: int = DEFAULT_RATE_LIMIT_WINDOW_SECONDS, ) -> None: """Initialize the rate limiter. Args: max_attempts: Maximum attempts allowed within the window. (Deprecated: now only used for cleanup window size) window_seconds: Time window (seconds) for rate limit. """ self.max_attempts: int = max_attempts self.window_seconds: int = window_seconds self._failures: dict[str, deque[float]] = {} def is_allowed(self, ip_address: str) -> bool: """Check if a request from *ip_address* is allowed. Checks if the IP has accumulated failures that would currently block the attempt due to penalty backoff. Does NOT record a new attempt — that happens only on successful password verification. Args: ip_address: The client IP address to rate-limit. Returns: ``True`` if the request is allowed (past penalty period), ``False`` if currently blocked by exponential backoff. """ now = time() if ip_address not in self._failures: self._failures[ip_address] = deque() failures = self._failures[ip_address] cutoff = now - self.window_seconds # Remove old failures outside the window while failures and failures[0] < cutoff: failures.popleft() # If no recent failures, request is allowed if not failures: return True # Calculate accumulated penalty: how much time must pass before # the next attempt is allowed, based on failure count failure_count = len(failures) penalty = min( LOGIN_PENALTY_BASE_SECONDS * (LOGIN_PENALTY_MULTIPLIER ** failure_count), LOGIN_PENALTY_MAX_SECONDS, ) # Check if enough time has passed since the last failure time_since_last_failure = now - failures[-1] return time_since_last_failure >= penalty def cleanup_expired(self) -> None: """Remove all IPs with no recent failures (cleanup task). Called periodically by the background task to prevent unbounded growth of the tracking dictionary. """ now = time() cutoff = now - self.window_seconds ips_to_remove = [] for ip_address, failures in self._failures.items(): # Remove old failures while failures and failures[0] < cutoff: failures.popleft() # Mark IP for removal if no failures remain if not failures: ips_to_remove.append(ip_address) for ip_address in ips_to_remove: del self._failures[ip_address] if ips_to_remove: log.debug("rate_limiter_cleanup", removed_ips=len(ips_to_remove)) def get_state(self) -> Mapping[str, int]: """Return a read-only view of current failure counts per IP. For debugging and monitoring. Returns: A mapping of IP addresses to their failure counts. """ now = time() cutoff = now - self.window_seconds result = {} for ip_address, failures in self._failures.items(): # Count non-expired failures count = sum(1 for ts in failures if ts >= cutoff) if count > 0: result[ip_address] = count return result def reset(self) -> None: """Clear all tracked failures (for testing).""" self._failures.clear() # --------------------------------------------------------------------------- # Penalty strategy for failed login attempts # --------------------------------------------------------------------------- def record_failure(self, ip_address: str) -> None: """Record a failed login attempt. Tracks failures per IP to enable exponential backoff in is_allowed(). The penalty delay is automatically calculated in is_allowed() based on the failure count, providing transparent brute-force resistance. Args: ip_address: The client IP address whose login attempt failed. """ now = time() if ip_address not in self._failures: self._failures[ip_address] = deque() failures = self._failures[ip_address] cutoff = now - self.window_seconds # Remove old failures outside the window while failures and failures[0] < cutoff: failures.popleft() # Record this failure failures.append(now)