Normalise IP addresses across backend
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
This commit is contained in:
@@ -9,6 +9,8 @@ from __future__ import annotations
|
||||
import ipaddress
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from app.utils.ip_utils import normalise_ip
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from fastapi import Request
|
||||
|
||||
@@ -50,7 +52,7 @@ def get_client_ip(
|
||||
|
||||
# If no trusted proxies are configured, use immediate IP directly
|
||||
if not trusted_proxies:
|
||||
return immediate_ip
|
||||
return normalise_ip(immediate_ip)
|
||||
|
||||
# Check if the immediate connection is from a trusted proxy
|
||||
if not _is_trusted_proxy(immediate_ip, trusted_proxies):
|
||||
@@ -64,15 +66,15 @@ def get_client_ip(
|
||||
# Take the first IP in the list
|
||||
client_ip = forwarded_for.split(",")[0].strip()
|
||||
if client_ip:
|
||||
return client_ip
|
||||
return normalise_ip(client_ip)
|
||||
|
||||
# Fall back to X-Real-IP
|
||||
real_ip = request.headers.get("X-Real-IP", "").strip()
|
||||
if real_ip:
|
||||
return real_ip
|
||||
return normalise_ip(real_ip)
|
||||
|
||||
# No forwarded headers found, use immediate connection
|
||||
return immediate_ip
|
||||
return normalise_ip(immediate_ip)
|
||||
|
||||
|
||||
def _is_trusted_proxy(ip: str, trusted_proxies: list[str]) -> bool:
|
||||
|
||||
@@ -56,6 +56,7 @@ from app.utils.constants import (
|
||||
LOGIN_PENALTY_MAX_SECONDS,
|
||||
LOGIN_PENALTY_MULTIPLIER,
|
||||
)
|
||||
from app.utils.ip_utils import normalise_ip
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from collections.abc import Mapping
|
||||
@@ -104,6 +105,7 @@ class RateLimiter:
|
||||
``True`` if the request is allowed (past penalty period), ``False``
|
||||
if currently blocked by exponential backoff.
|
||||
"""
|
||||
ip_address = normalise_ip(ip_address)
|
||||
now = time()
|
||||
|
||||
if ip_address not in self._failures:
|
||||
@@ -192,6 +194,7 @@ class RateLimiter:
|
||||
Args:
|
||||
ip_address: The client IP address whose login attempt failed.
|
||||
"""
|
||||
ip_address = normalise_ip(ip_address)
|
||||
now = time()
|
||||
|
||||
if ip_address not in self._failures:
|
||||
@@ -294,6 +297,7 @@ class GlobalRateLimiter:
|
||||
A tuple of (is_allowed, retry_after_seconds). If is_allowed is True,
|
||||
retry_after_seconds is 0. If False, it's the estimated time to wait.
|
||||
"""
|
||||
ip_address = normalise_ip(ip_address)
|
||||
now = time()
|
||||
|
||||
if ip_address not in self._requests:
|
||||
@@ -347,6 +351,7 @@ class GlobalRateLimiter:
|
||||
"""
|
||||
now = time()
|
||||
|
||||
ip_address = normalise_ip(ip_address)
|
||||
requests = self._get_bucket_deque(bucket, ip_address, max_requests, window_seconds)
|
||||
cutoff = now - window_seconds
|
||||
|
||||
|
||||
Reference in New Issue
Block a user