Files
BanGUI/backend/app/utils/rate_limiter.py
Lukas 277f2a467c Refactor rate limiting with exponential backoff strategy
- Update rate limiter to use exponential backoff instead of fixed limit
- Implement progressive delays for failed login attempts (0.5s, 1s, 2s, 4s, 5s max)
- Update auth router documentation and endpoint docs
- Refactor test suite to match new rate limiting behavior
- Update backend development documentation
- Clean up unused tasks documentation

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
2026-04-30 19:58:09 +02:00

209 lines
7.4 KiB
Python

"""In-memory rate limiter for IP-based request throttling.
Implements exponential backoff for failed login attempts using failure tracking.
Each wrong password attempt increments the failure count for that IP, and subsequent
attempts are blocked for a duration that grows exponentially up to a maximum.
Uses a dictionary of deques (per IP) storing timestamps of recent failures.
Old entries are cleaned up by a background task to prevent unbounded growth.
Process-local implementation — in multi-worker setups, each worker has
independent counters, so failures recorded by one worker do not throttle
requests handled by another. The effective limit therefore scales with the
number of workers.
**How It Works:**
1. A successful login resets the failure counter for that IP.
2. Each failed login (wrong password) calls record_failure() and increments the counter.
3. is_allowed() checks if enough time has passed since the last failure based on
the current failure count. The delay grows exponentially with each consecutive failure:
- 1st failure: 0.5 second penalty
- 2nd failure: 1 second penalty (0.5 * 2^1)
- 3rd failure: 2 seconds penalty (0.5 * 2^2)
- 4th failure: 4 seconds penalty (0.5 * 2^3)
- ... up to the configured maximum (default 5 seconds)
4. Penalties do not stack: the wait is measured from the most recent failure
and its length depends on how many failures are still inside the window. After
5 failed attempts the client waits the capped 5 seconds once before trying
again (not 5 seconds per attempt).
**Cleanup Lifecycle**: The rate limiter state (_failures) grows as IPs interact
with the system. To prevent unbounded memory growth during long runtimes, a
scheduled background task (rate_limiter_cleanup) calls cleanup_expired() every
30 minutes. This is safe because:
- cleanup_expired() only removes IPs with no recent failures (all timestamps
outside the rate-limit window), so active IPs are never disrupted.
- The cleanup is non-blocking and logged for observability.
- Individual requests already prune old timestamps from each IP's deque during
is_allowed() and record_failure(), so cleanup primarily handles dormant IPs.
For monitoring, check logs for "rate_limiter_cleanup" events to observe how
many IPs are being retired from memory each cleanup cycle.
"""
from __future__ import annotations
from collections import deque
from time import time
from typing import TYPE_CHECKING
import structlog
from app.utils.constants import (
LOGIN_PENALTY_BASE_SECONDS,
LOGIN_PENALTY_MAX_SECONDS,
LOGIN_PENALTY_MULTIPLIER,
)
if TYPE_CHECKING:
from collections.abc import Mapping
# Module-level structured logger; used for the "rate_limiter_cleanup" event.
log: structlog.stdlib.BoundLogger = structlog.get_logger()
# Default constructor arguments for RateLimiter: 5 attempts and a
# 60-second failure-tracking window per IP.
DEFAULT_RATE_LIMIT_ATTEMPTS = 5
DEFAULT_RATE_LIMIT_WINDOW_SECONDS = 60
class RateLimiter:
    """Track failed login attempts per IP and enforce exponential backoff.

    Failure timestamps are kept in one deque per IP address. Timestamps older
    than ``window_seconds`` are discarded whenever an IP is touched, and the
    number of timestamps that remain determines how long the IP must wait
    after its most recent failure before another attempt is allowed.
    """

    def __init__(
        self,
        max_attempts: int = DEFAULT_RATE_LIMIT_ATTEMPTS,
        window_seconds: int = DEFAULT_RATE_LIMIT_WINDOW_SECONDS,
    ) -> None:
        """Initialize the rate limiter.

        Args:
            max_attempts: Stored on the instance for backward compatibility.
                No method of this class reads it; throttling is driven by
                the backoff penalty alone.
            window_seconds: How long (seconds) a failure keeps counting
                towards the backoff penalty before it is discarded.
        """
        self.max_attempts: int = max_attempts
        self.window_seconds: int = window_seconds
        # ip address -> timestamps (oldest first) of failures in the window
        self._failures: dict[str, deque[float]] = {}

    @staticmethod
    def _prune(failures: deque[float], cutoff: float) -> None:
        """Drop timestamps older than *cutoff* from the left of *failures*."""
        while failures and failures[0] < cutoff:
            failures.popleft()

    @staticmethod
    def _penalty_for(failure_count: int) -> float:
        """Return the wait (seconds) required after *failure_count* failures.

        The first failure costs the base penalty and every further failure
        multiplies it once more, i.e. ``base * multiplier ** (count - 1)``,
        capped at the configured maximum.
        """
        exponent = max(failure_count - 1, 0)
        try:
            uncapped = LOGIN_PENALTY_BASE_SECONDS * (
                LOGIN_PENALTY_MULTIPLIER ** exponent
            )
        except OverflowError:
            # A very large failure count can overflow float arithmetic; the
            # result would be capped anyway, so return the cap directly.
            return LOGIN_PENALTY_MAX_SECONDS
        return min(uncapped, LOGIN_PENALTY_MAX_SECONDS)

    def is_allowed(self, ip_address: str) -> bool:
        """Check if a request from *ip_address* is allowed.

        Looks at the failures recorded for the IP inside the window and
        decides whether the backoff penalty since the most recent failure
        has elapsed. This method never records an attempt; failures are
        recorded separately through :meth:`record_failure`.

        Args:
            ip_address: The client IP address to rate-limit.

        Returns:
            ``True`` if the request is allowed (no recent failures, or the
            penalty period has passed), ``False`` if the IP is currently
            blocked by exponential backoff.
        """
        failures = self._failures.get(ip_address)
        if failures is None:
            # Unknown IP: allow without creating a tracking entry, so that
            # clients that never fail do not consume memory.
            return True

        now = time()
        self._prune(failures, now - self.window_seconds)
        if not failures:
            # Every failure has aged out; forget the IP right away.
            del self._failures[ip_address]
            return True

        # The wait is measured from the most recent failure only.
        elapsed_since_last = now - failures[-1]
        return elapsed_since_last >= self._penalty_for(len(failures))

    def cleanup_expired(self) -> None:
        """Remove all IPs with no recent failures (cleanup task).

        Prunes every tracked deque and deletes the entries that end up
        empty, so the tracking dictionary does not grow without bound.
        Emits a ``rate_limiter_cleanup`` debug event when anything was
        removed.
        """
        cutoff = time() - self.window_seconds
        ips_to_remove = []
        for ip_address, failures in self._failures.items():
            self._prune(failures, cutoff)
            if not failures:
                ips_to_remove.append(ip_address)

        # Delete after the loop: the dict must not change size while it is
        # being iterated.
        for ip_address in ips_to_remove:
            del self._failures[ip_address]

        if ips_to_remove:
            log.debug("rate_limiter_cleanup", removed_ips=len(ips_to_remove))

    def get_state(self) -> Mapping[str, int]:
        """Return a snapshot of current failure counts per IP.

        For debugging and monitoring. Does not modify the tracked state.

        Returns:
            A newly built mapping of IP address to the number of failures
            still inside the window; IPs with no such failures are omitted.
        """
        cutoff = time() - self.window_seconds
        state: dict[str, int] = {}
        for ip_address, failures in self._failures.items():
            active = sum(1 for ts in failures if ts >= cutoff)
            if active:
                state[ip_address] = active
        return state

    def reset(self, ip_address: str | None = None) -> None:
        """Clear tracked failures.

        Args:
            ip_address: When omitted, all tracked failures are cleared (the
                original behaviour, intended for tests). When given, only
                that IP's failures are forgotten, e.g. after a successful
                login; unknown IPs are ignored.
        """
        if ip_address is None:
            self._failures.clear()
        else:
            self._failures.pop(ip_address, None)

    # -----------------------------------------------------------------------
    # Penalty strategy for failed login attempts
    # -----------------------------------------------------------------------
    def record_failure(self, ip_address: str) -> None:
        """Record a failed login attempt.

        Appends the current time to the IP's failure history after dropping
        entries that have left the window. :meth:`is_allowed` derives the
        backoff penalty from the length of that history.

        Args:
            ip_address: The client IP address whose login attempt failed.
        """
        now = time()
        failures = self._failures.setdefault(ip_address, deque())
        self._prune(failures, now - self.window_seconds)
        failures.append(now)