Refactor backend configuration and authentication
- Add comprehensive documentation for backend development - Improve client IP detection with utility functions and tests - Update auth router with better error handling - Refactor config module with environment-based settings Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
This commit is contained in:
@@ -4,6 +4,7 @@ Follows pydantic-settings patterns: all values are prefixed with BANGUI_
|
||||
and validated at startup via the Settings singleton.
|
||||
"""
|
||||
|
||||
import ipaddress
|
||||
import shlex
|
||||
from typing import Literal
|
||||
|
||||
@@ -180,6 +181,62 @@ class Settings(BaseSettings):
|
||||
"In production, leave unset (defaults to false) to avoid exposing API schema."
|
||||
),
|
||||
)
|
||||
trusted_proxies: str | list[str] = Field(
|
||||
default_factory=list,
|
||||
description=(
|
||||
"Comma-separated list of trusted reverse proxy IP addresses or CIDR ranges. "
|
||||
"Only requests from these IPs/ranges are allowed to set X-Forwarded-For and X-Real-IP headers. "
|
||||
"Examples: '192.168.1.1' or '10.0.0.0/8' or '192.168.1.1,10.0.0.0/8'. "
|
||||
"Leave empty to disable proxy header forwarding (default). "
|
||||
"This is critical for correct client IP extraction behind reverse proxies like nginx."
|
||||
),
|
||||
)
|
||||
|
||||
@field_validator("trusted_proxies", mode="before")
|
||||
@classmethod
|
||||
def _normalize_trusted_proxies(cls, value: str | list[str] | None) -> list[str]:
|
||||
"""Normalize trusted_proxies from comma-separated string to list.
|
||||
|
||||
Args:
|
||||
value: A comma-separated string or list of trusted proxy IPs/CIDRs.
|
||||
|
||||
Returns:
|
||||
A list of normalized proxy IP/CIDR strings.
|
||||
"""
|
||||
if value is None:
|
||||
return []
|
||||
if isinstance(value, str):
|
||||
return [proxy.strip() for proxy in value.split(",") if proxy.strip()]
|
||||
return value
|
||||
|
||||
@field_validator("trusted_proxies", mode="after")
|
||||
@classmethod
|
||||
def _validate_trusted_proxies(cls, value: list[str]) -> list[str]:
|
||||
"""Validate trusted_proxies as valid IPs or CIDR ranges.
|
||||
|
||||
Args:
|
||||
value: A list of proxy IP addresses or CIDR ranges.
|
||||
|
||||
Returns:
|
||||
The validated list.
|
||||
|
||||
Raises:
|
||||
ValueError: If any item is not a valid IP address or CIDR range.
|
||||
"""
|
||||
for proxy in value:
|
||||
try:
|
||||
# Try to parse as a CIDR network first
|
||||
ipaddress.ip_network(proxy, strict=False)
|
||||
except ValueError:
|
||||
try:
|
||||
# Fall back to parsing as a single IP address
|
||||
ipaddress.ip_address(proxy)
|
||||
except ValueError as exc:
|
||||
raise ValueError(
|
||||
f"Invalid IP address or CIDR range: {proxy!r}. "
|
||||
f"Expected format: '192.168.1.1' or '10.0.0.0/8'"
|
||||
) from exc
|
||||
return value
|
||||
|
||||
@field_validator("fail2ban_start_command", mode="after")
|
||||
@classmethod
|
||||
@@ -222,4 +279,4 @@ def get_settings() -> Settings:
|
||||
A validated :class:`Settings` object. Raises :class:`pydantic.ValidationError`
|
||||
if required keys are absent or values fail validation.
|
||||
"""
|
||||
return Settings() # pydantic-settings populates required fields from env vars
|
||||
return Settings() # type: ignore[call-arg] # pydantic-settings populates required fields from env vars
|
||||
|
||||
@@ -41,10 +41,6 @@ log: structlog.stdlib.BoundLogger = structlog.get_logger()
|
||||
|
||||
router = APIRouter(prefix="/api/auth", tags=["auth"])
|
||||
|
||||
# Trusted proxy IPs that can set X-Forwarded-For header.
|
||||
# By default, none are trusted. In production behind nginx, add the nginx container IP.
|
||||
_TRUSTED_PROXIES: list[str] = []
|
||||
|
||||
|
||||
@router.post(
|
||||
"/login",
|
||||
@@ -73,7 +69,7 @@ async def login(
|
||||
response: FastAPI response object used to set the cookie.
|
||||
request: The incoming HTTP request (used to extract client IP).
|
||||
session_ctx: Session service context containing db and repository.
|
||||
settings: Application settings (used for session duration).
|
||||
settings: Application settings (used for session duration and trusted proxies).
|
||||
rate_limiter: The login rate limiter (per IP).
|
||||
|
||||
Returns:
|
||||
@@ -83,7 +79,7 @@ async def login(
|
||||
AuthenticationError: if the password is incorrect.
|
||||
RateLimitError: if the rate limit is exceeded.
|
||||
"""
|
||||
client_ip = get_client_ip(request, trusted_proxies=_TRUSTED_PROXIES)
|
||||
client_ip = get_client_ip(request, trusted_proxies=settings.trusted_proxies)
|
||||
|
||||
if not rate_limiter.is_allowed(client_ip):
|
||||
log.warning("login_rate_limit_exceeded", client_ip=client_ip)
|
||||
|
||||
@@ -6,13 +6,16 @@ Only trusts these headers when the request comes from a known trusted proxy.
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import ipaddress
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from fastapi import Request
|
||||
|
||||
|
||||
def get_client_ip(request: Request, trusted_proxies: list[str] | None = None) -> str:
|
||||
def get_client_ip(
|
||||
request: Request, trusted_proxies: str | list[str] | None = None
|
||||
) -> str:
|
||||
"""Extract the client IP address from a request.
|
||||
|
||||
When the request comes from a trusted proxy, reads the real IP from
|
||||
@@ -20,12 +23,12 @@ def get_client_ip(request: Request, trusted_proxies: list[str] | None = None) ->
|
||||
connection source (request.client.host).
|
||||
|
||||
X-Forwarded-For can be spoofed by the client, so we only trust it if
|
||||
the request comes from a known proxy IP.
|
||||
the request comes from a known proxy IP or CIDR range.
|
||||
|
||||
Args:
|
||||
request: The incoming FastAPI request.
|
||||
trusted_proxies: Optional list of trusted proxy IP addresses. If None,
|
||||
only uses request.client.host.
|
||||
trusted_proxies: Optional list of trusted proxy IP addresses or CIDR ranges,
|
||||
or a comma-separated string. If None, only uses request.client.host.
|
||||
|
||||
Returns:
|
||||
The best-guess client IP address suitable for rate limiting.
|
||||
@@ -34,10 +37,23 @@ def get_client_ip(request: Request, trusted_proxies: list[str] | None = None) ->
|
||||
return "0.0.0.0"
|
||||
|
||||
immediate_ip = request.client.host
|
||||
trusted_proxies = trusted_proxies or []
|
||||
|
||||
# If the immediate connection is not from a trusted proxy, use it directly.
|
||||
if immediate_ip not in trusted_proxies:
|
||||
# Normalize trusted_proxies to a list
|
||||
if isinstance(trusted_proxies, str):
|
||||
trusted_proxies = [
|
||||
proxy.strip()
|
||||
for proxy in trusted_proxies.split(",")
|
||||
if proxy.strip()
|
||||
]
|
||||
elif trusted_proxies is None:
|
||||
trusted_proxies = []
|
||||
|
||||
# If no trusted proxies are configured, use immediate IP directly
|
||||
if not trusted_proxies:
|
||||
return immediate_ip
|
||||
|
||||
# Check if the immediate connection is from a trusted proxy
|
||||
if not _is_trusted_proxy(immediate_ip, trusted_proxies):
|
||||
return immediate_ip
|
||||
|
||||
# Proxy is trusted, check for forwarded headers.
|
||||
@@ -57,3 +73,41 @@ def get_client_ip(request: Request, trusted_proxies: list[str] | None = None) ->
|
||||
|
||||
# No forwarded headers found, use immediate connection
|
||||
return immediate_ip
|
||||
|
||||
|
||||
def _is_trusted_proxy(ip: str, trusted_proxies: list[str]) -> bool:
|
||||
"""Check if an IP is in the list of trusted proxies.
|
||||
|
||||
Supports both single IP addresses and CIDR ranges.
|
||||
|
||||
Args:
|
||||
ip: The IP address to check.
|
||||
trusted_proxies: List of trusted proxy IP addresses or CIDR ranges.
|
||||
|
||||
Returns:
|
||||
True if the IP is trusted, False otherwise.
|
||||
"""
|
||||
try:
|
||||
ip_obj = ipaddress.ip_address(ip)
|
||||
except ValueError:
|
||||
# Invalid IP format
|
||||
return False
|
||||
|
||||
for proxy in trusted_proxies:
|
||||
try:
|
||||
# Try to parse as a CIDR network
|
||||
network = ipaddress.ip_network(proxy, strict=False)
|
||||
if ip_obj in network:
|
||||
return True
|
||||
except ValueError:
|
||||
try:
|
||||
# Fall back to single IP address
|
||||
proxy_ip = ipaddress.ip_address(proxy)
|
||||
if ip_obj == proxy_ip:
|
||||
return True
|
||||
except ValueError:
|
||||
# Invalid proxy format, skip it
|
||||
continue
|
||||
|
||||
return False
|
||||
|
||||
|
||||
@@ -158,3 +158,157 @@ def test_session_secret_error_message_includes_guidance() -> None:
|
||||
error_msg = str(exc_info.value)
|
||||
# Verify the error mentions the constraint
|
||||
assert "session_secret" in error_msg
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# trusted_proxies configuration tests
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_trusted_proxies_default_is_empty_list() -> None:
|
||||
"""By default, trusted_proxies is an empty list (no trusted proxies)."""
|
||||
settings = Settings(
|
||||
database_path="/tmp/test.db",
|
||||
fail2ban_socket="/tmp/fake_fail2ban.sock",
|
||||
fail2ban_config_dir="/tmp/fail2ban",
|
||||
session_secret="a" * 32,
|
||||
)
|
||||
assert settings.trusted_proxies == []
|
||||
|
||||
|
||||
def test_trusted_proxies_accepts_single_ip() -> None:
|
||||
"""Single IP address is accepted."""
|
||||
settings = Settings(
|
||||
database_path="/tmp/test.db",
|
||||
fail2ban_socket="/tmp/fake_fail2ban.sock",
|
||||
fail2ban_config_dir="/tmp/fail2ban",
|
||||
session_secret="a" * 32,
|
||||
trusted_proxies="192.168.1.1",
|
||||
)
|
||||
assert settings.trusted_proxies == ["192.168.1.1"]
|
||||
|
||||
|
||||
def test_trusted_proxies_accepts_single_cidr() -> None:
|
||||
"""Single CIDR range is accepted."""
|
||||
settings = Settings(
|
||||
database_path="/tmp/test.db",
|
||||
fail2ban_socket="/tmp/fake_fail2ban.sock",
|
||||
fail2ban_config_dir="/tmp/fail2ban",
|
||||
session_secret="a" * 32,
|
||||
trusted_proxies="10.0.0.0/8",
|
||||
)
|
||||
assert settings.trusted_proxies == ["10.0.0.0/8"]
|
||||
|
||||
|
||||
def test_trusted_proxies_accepts_comma_separated_list() -> None:
|
||||
"""Comma-separated list of IPs and CIDRs is accepted."""
|
||||
settings = Settings(
|
||||
database_path="/tmp/test.db",
|
||||
fail2ban_socket="/tmp/fake_fail2ban.sock",
|
||||
fail2ban_config_dir="/tmp/fail2ban",
|
||||
session_secret="a" * 32,
|
||||
trusted_proxies="192.168.1.1,10.0.0.0/8,172.16.0.0/12",
|
||||
)
|
||||
assert settings.trusted_proxies == ["192.168.1.1", "10.0.0.0/8", "172.16.0.0/12"]
|
||||
|
||||
|
||||
def test_trusted_proxies_accepts_list() -> None:
|
||||
"""List of IPs and CIDRs is accepted."""
|
||||
settings = Settings(
|
||||
database_path="/tmp/test.db",
|
||||
fail2ban_socket="/tmp/fake_fail2ban.sock",
|
||||
fail2ban_config_dir="/tmp/fail2ban",
|
||||
session_secret="a" * 32,
|
||||
trusted_proxies=["192.168.1.1", "10.0.0.0/8"],
|
||||
)
|
||||
assert settings.trusted_proxies == ["192.168.1.1", "10.0.0.0/8"]
|
||||
|
||||
|
||||
def test_trusted_proxies_strips_whitespace() -> None:
|
||||
"""Whitespace around IPs is stripped."""
|
||||
settings = Settings(
|
||||
database_path="/tmp/test.db",
|
||||
fail2ban_socket="/tmp/fake_fail2ban.sock",
|
||||
fail2ban_config_dir="/tmp/fail2ban",
|
||||
session_secret="a" * 32,
|
||||
trusted_proxies=" 192.168.1.1 , 10.0.0.0/8 ",
|
||||
)
|
||||
assert settings.trusted_proxies == ["192.168.1.1", "10.0.0.0/8"]
|
||||
|
||||
|
||||
def test_trusted_proxies_rejects_invalid_ip() -> None:
|
||||
"""Invalid IP address is rejected."""
|
||||
with pytest.raises(ValidationError) as exc_info:
|
||||
Settings(
|
||||
database_path="/tmp/test.db",
|
||||
fail2ban_socket="/tmp/fake_fail2ban.sock",
|
||||
fail2ban_config_dir="/tmp/fail2ban",
|
||||
session_secret="a" * 32,
|
||||
trusted_proxies="not-an-ip",
|
||||
)
|
||||
error_msg = str(exc_info.value)
|
||||
assert "trusted_proxies" in error_msg or "Invalid IP" in error_msg
|
||||
|
||||
|
||||
def test_trusted_proxies_rejects_invalid_cidr() -> None:
|
||||
"""Invalid CIDR range is rejected."""
|
||||
with pytest.raises(ValidationError) as exc_info:
|
||||
Settings(
|
||||
database_path="/tmp/test.db",
|
||||
fail2ban_socket="/tmp/fake_fail2ban.sock",
|
||||
fail2ban_config_dir="/tmp/fail2ban",
|
||||
session_secret="a" * 32,
|
||||
trusted_proxies="10.0.0.0/33", # Invalid - /33 is out of range for IPv4
|
||||
)
|
||||
error_msg = str(exc_info.value)
|
||||
assert "trusted_proxies" in error_msg or "Invalid IP" in error_msg
|
||||
|
||||
|
||||
def test_trusted_proxies_rejects_one_invalid_in_list() -> None:
|
||||
"""One invalid IP in a list causes entire list to be rejected."""
|
||||
with pytest.raises(ValidationError) as exc_info:
|
||||
Settings(
|
||||
database_path="/tmp/test.db",
|
||||
fail2ban_socket="/tmp/fake_fail2ban.sock",
|
||||
fail2ban_config_dir="/tmp/fail2ban",
|
||||
session_secret="a" * 32,
|
||||
trusted_proxies="192.168.1.1,invalid-ip,10.0.0.0/8",
|
||||
)
|
||||
error_msg = str(exc_info.value)
|
||||
assert "trusted_proxies" in error_msg or "Invalid IP" in error_msg
|
||||
|
||||
|
||||
def test_trusted_proxies_accepts_ipv6_address() -> None:
|
||||
"""IPv6 address is accepted."""
|
||||
settings = Settings(
|
||||
database_path="/tmp/test.db",
|
||||
fail2ban_socket="/tmp/fake_fail2ban.sock",
|
||||
fail2ban_config_dir="/tmp/fail2ban",
|
||||
session_secret="a" * 32,
|
||||
trusted_proxies="2001:db8::1",
|
||||
)
|
||||
assert settings.trusted_proxies == ["2001:db8::1"]
|
||||
|
||||
|
||||
def test_trusted_proxies_accepts_ipv6_cidr() -> None:
|
||||
"""IPv6 CIDR range is accepted."""
|
||||
settings = Settings(
|
||||
database_path="/tmp/test.db",
|
||||
fail2ban_socket="/tmp/fake_fail2ban.sock",
|
||||
fail2ban_config_dir="/tmp/fail2ban",
|
||||
session_secret="a" * 32,
|
||||
trusted_proxies="2001:db8::/32",
|
||||
)
|
||||
assert settings.trusted_proxies == ["2001:db8::/32"]
|
||||
|
||||
|
||||
def test_trusted_proxies_accepts_mixed_ipv4_and_ipv6() -> None:
|
||||
"""Mixed IPv4 and IPv6 addresses and ranges are accepted."""
|
||||
settings = Settings(
|
||||
database_path="/tmp/test.db",
|
||||
fail2ban_socket="/tmp/fake_fail2ban.sock",
|
||||
fail2ban_config_dir="/tmp/fail2ban",
|
||||
session_secret="a" * 32,
|
||||
trusted_proxies="192.168.1.0/24,2001:db8::/32,10.0.0.1",
|
||||
)
|
||||
assert settings.trusted_proxies == ["192.168.1.0/24", "2001:db8::/32", "10.0.0.1"]
|
||||
|
||||
257
backend/tests/test_utils/test_client_ip.py
Normal file
257
backend/tests/test_utils/test_client_ip.py
Normal file
@@ -0,0 +1,257 @@
|
||||
"""Tests for client IP extraction with proxy support."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from unittest.mock import MagicMock
|
||||
|
||||
import pytest
|
||||
|
||||
from app.utils.client_ip import _is_trusted_proxy, get_client_ip
|
||||
|
||||
|
||||
class TestGetClientIp:
|
||||
"""Tests for get_client_ip function."""
|
||||
|
||||
def test_returns_immediate_ip_when_no_trusted_proxies(self) -> None:
|
||||
"""When no trusted proxies are configured, returns immediate IP."""
|
||||
request = MagicMock()
|
||||
request.client.host = "192.168.1.100"
|
||||
request.headers.get.return_value = ""
|
||||
|
||||
result = get_client_ip(request, trusted_proxies=None)
|
||||
|
||||
assert result == "192.168.1.100"
|
||||
|
||||
def test_returns_immediate_ip_when_trusted_proxies_empty(self) -> None:
|
||||
"""When trusted_proxies list is empty, returns immediate IP."""
|
||||
request = MagicMock()
|
||||
request.client.host = "192.168.1.100"
|
||||
request.headers.get.return_value = ""
|
||||
|
||||
result = get_client_ip(request, trusted_proxies=[])
|
||||
|
||||
assert result == "192.168.1.100"
|
||||
|
||||
def test_returns_immediate_ip_when_not_from_proxy(self) -> None:
|
||||
"""When immediate IP is not a trusted proxy, ignores forwarded headers."""
|
||||
request = MagicMock()
|
||||
request.client.host = "203.0.113.1" # random public IP
|
||||
request.headers.get = MagicMock(side_effect=lambda key, default="": {
|
||||
"X-Forwarded-For": "192.168.1.50",
|
||||
"X-Real-IP": "192.168.1.60",
|
||||
}.get(key, default))
|
||||
|
||||
result = get_client_ip(request, trusted_proxies=["10.0.0.1"])
|
||||
|
||||
assert result == "203.0.113.1"
|
||||
|
||||
def test_returns_forwarded_for_when_from_trusted_ip(self) -> None:
|
||||
"""When from trusted proxy IP, extracts X-Forwarded-For header."""
|
||||
request = MagicMock()
|
||||
request.client.host = "10.0.0.1" # trusted proxy IP
|
||||
request.headers.get = MagicMock(side_effect=lambda key, default="": {
|
||||
"X-Forwarded-For": "203.0.113.100",
|
||||
"X-Real-IP": "203.0.113.200",
|
||||
}.get(key, default))
|
||||
|
||||
result = get_client_ip(request, trusted_proxies=["10.0.0.1"])
|
||||
|
||||
assert result == "203.0.113.100"
|
||||
|
||||
def test_returns_first_ip_from_forwarded_for_chain(self) -> None:
|
||||
"""When X-Forwarded-For has multiple IPs, uses the first (leftmost)."""
|
||||
request = MagicMock()
|
||||
request.client.host = "10.0.0.1"
|
||||
request.headers.get = MagicMock(side_effect=lambda key, default="": {
|
||||
"X-Forwarded-For": "203.0.113.100, 10.0.0.2, 10.0.0.3",
|
||||
}.get(key, default))
|
||||
|
||||
result = get_client_ip(request, trusted_proxies=["10.0.0.1"])
|
||||
|
||||
assert result == "203.0.113.100"
|
||||
|
||||
def test_returns_real_ip_when_forwarded_for_missing(self) -> None:
|
||||
"""Falls back to X-Real-IP when X-Forwarded-For is absent."""
|
||||
request = MagicMock()
|
||||
request.client.host = "10.0.0.1"
|
||||
request.headers.get = MagicMock(side_effect=lambda key, default="": {
|
||||
"X-Real-IP": "203.0.113.200",
|
||||
}.get(key, default))
|
||||
|
||||
result = get_client_ip(request, trusted_proxies=["10.0.0.1"])
|
||||
|
||||
assert result == "203.0.113.200"
|
||||
|
||||
def test_returns_immediate_ip_when_no_forwarded_headers(self) -> None:
|
||||
"""When proxy is trusted but no forwarded headers, returns immediate IP."""
|
||||
request = MagicMock()
|
||||
request.client.host = "10.0.0.1"
|
||||
request.headers.get.return_value = ""
|
||||
|
||||
result = get_client_ip(request, trusted_proxies=["10.0.0.1"])
|
||||
|
||||
assert result == "10.0.0.1"
|
||||
|
||||
def test_returns_default_when_no_client_info(self) -> None:
|
||||
"""When request.client is None, returns default IP."""
|
||||
request = MagicMock()
|
||||
request.client = None
|
||||
|
||||
result = get_client_ip(request)
|
||||
|
||||
assert result == "0.0.0.0"
|
||||
|
||||
def test_strips_whitespace_from_headers(self) -> None:
|
||||
"""Whitespace in headers is properly stripped."""
|
||||
request = MagicMock()
|
||||
request.client.host = "10.0.0.1"
|
||||
request.headers.get = MagicMock(side_effect=lambda key, default="": {
|
||||
"X-Forwarded-For": " 203.0.113.100 ",
|
||||
}.get(key, default))
|
||||
|
||||
result = get_client_ip(request, trusted_proxies=["10.0.0.1"])
|
||||
|
||||
assert result == "203.0.113.100"
|
||||
|
||||
def test_trusts_ip_in_cidr_range(self) -> None:
|
||||
"""Trusts proxy when its IP falls within configured CIDR range."""
|
||||
request = MagicMock()
|
||||
request.client.host = "10.0.0.50" # within 10.0.0.0/8
|
||||
request.headers.get = MagicMock(side_effect=lambda key, default="": {
|
||||
"X-Forwarded-For": "203.0.113.100",
|
||||
}.get(key, default))
|
||||
|
||||
result = get_client_ip(request, trusted_proxies=["10.0.0.0/8"])
|
||||
|
||||
assert result == "203.0.113.100"
|
||||
|
||||
def test_rejects_ip_outside_cidr_range(self) -> None:
|
||||
"""Rejects proxy when its IP is outside configured CIDR range."""
|
||||
request = MagicMock()
|
||||
request.client.host = "192.168.1.1" # outside 10.0.0.0/8
|
||||
request.headers.get = MagicMock(side_effect=lambda key, default="": {
|
||||
"X-Forwarded-For": "203.0.113.100",
|
||||
}.get(key, default))
|
||||
|
||||
result = get_client_ip(request, trusted_proxies=["10.0.0.0/8"])
|
||||
|
||||
assert result == "192.168.1.1"
|
||||
|
||||
def test_handles_multiple_trusted_proxies_and_ranges(self) -> None:
|
||||
"""Handles mix of individual IPs and CIDR ranges."""
|
||||
request = MagicMock()
|
||||
request.client.host = "10.0.0.50"
|
||||
request.headers.get = MagicMock(side_effect=lambda key, default="": {
|
||||
"X-Forwarded-For": "203.0.113.100",
|
||||
}.get(key, default))
|
||||
|
||||
# Multiple trusted proxies with CIDR ranges
|
||||
result = get_client_ip(
|
||||
request,
|
||||
trusted_proxies=["192.168.1.1", "10.0.0.0/8", "172.16.0.0/12"],
|
||||
)
|
||||
|
||||
assert result == "203.0.113.100"
|
||||
|
||||
def test_handles_ipv6_addresses(self) -> None:
|
||||
"""Handles IPv6 proxy addresses and CIDR ranges."""
|
||||
request = MagicMock()
|
||||
request.client.host = "2001:db8::1"
|
||||
request.headers.get = MagicMock(side_effect=lambda key, default="": {
|
||||
"X-Forwarded-For": "203.0.113.100",
|
||||
}.get(key, default))
|
||||
|
||||
result = get_client_ip(request, trusted_proxies=["2001:db8::/32"])
|
||||
|
||||
assert result == "203.0.113.100"
|
||||
|
||||
def test_handles_mixed_ipv4_and_ipv6(self) -> None:
|
||||
"""Handles IPv4 and IPv6 in the same trusted_proxies list."""
|
||||
request = MagicMock()
|
||||
request.client.host = "2001:db8::50"
|
||||
request.headers.get = MagicMock(side_effect=lambda key, default="": {
|
||||
"X-Forwarded-For": "203.0.113.100",
|
||||
}.get(key, default))
|
||||
|
||||
result = get_client_ip(
|
||||
request,
|
||||
trusted_proxies=["192.168.1.0/24", "2001:db8::/32"],
|
||||
)
|
||||
|
||||
assert result == "203.0.113.100"
|
||||
|
||||
|
||||
class TestIsTrustedProxy:
|
||||
"""Tests for _is_trusted_proxy helper function."""
|
||||
|
||||
def test_matches_exact_ip(self) -> None:
|
||||
"""Exact IP match is recognized."""
|
||||
assert _is_trusted_proxy("192.168.1.1", ["192.168.1.1"])
|
||||
|
||||
def test_rejects_different_ip(self) -> None:
|
||||
"""Different IP is not recognized."""
|
||||
assert not _is_trusted_proxy("192.168.1.2", ["192.168.1.1"])
|
||||
|
||||
def test_matches_ip_in_cidr_range(self) -> None:
|
||||
"""IP within CIDR range is recognized."""
|
||||
assert _is_trusted_proxy("10.0.0.50", ["10.0.0.0/8"])
|
||||
assert _is_trusted_proxy("10.255.255.255", ["10.0.0.0/8"])
|
||||
|
||||
def test_rejects_ip_outside_cidr_range(self) -> None:
|
||||
"""IP outside CIDR range is not recognized."""
|
||||
assert not _is_trusted_proxy("11.0.0.1", ["10.0.0.0/8"])
|
||||
assert not _is_trusted_proxy("9.255.255.255", ["10.0.0.0/8"])
|
||||
|
||||
def test_handles_multiple_ranges(self) -> None:
|
||||
"""Checks against all ranges, returns True on first match."""
|
||||
result = _is_trusted_proxy(
|
||||
"192.168.1.50",
|
||||
["10.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16"],
|
||||
)
|
||||
assert result is True
|
||||
|
||||
def test_returns_false_when_no_match_in_multiple_ranges(self) -> None:
|
||||
"""Returns False when IP doesn't match any range."""
|
||||
result = _is_trusted_proxy(
|
||||
"203.0.113.1",
|
||||
["10.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16"],
|
||||
)
|
||||
assert result is False
|
||||
|
||||
def test_handles_ipv6_single_address(self) -> None:
|
||||
"""IPv6 single address matching."""
|
||||
assert _is_trusted_proxy("2001:db8::1", ["2001:db8::1"])
|
||||
assert not _is_trusted_proxy("2001:db8::2", ["2001:db8::1"])
|
||||
|
||||
def test_handles_ipv6_cidr_range(self) -> None:
|
||||
"""IPv6 CIDR range matching."""
|
||||
assert _is_trusted_proxy("2001:db8::50", ["2001:db8::/32"])
|
||||
assert _is_trusted_proxy("2001:db8:ffff:ffff:ffff:ffff:ffff:ffff", ["2001:db8::/32"])
|
||||
assert not _is_trusted_proxy("2001:db9::1", ["2001:db8::/32"])
|
||||
|
||||
def test_returns_false_for_invalid_ip(self) -> None:
|
||||
"""Invalid IP format returns False."""
|
||||
assert not _is_trusted_proxy("not-an-ip", ["192.168.1.0/24"])
|
||||
|
||||
def test_skips_invalid_trusted_proxies(self) -> None:
|
||||
"""Invalid entries in trusted_proxies list are skipped."""
|
||||
# Should not crash and should check valid entries
|
||||
result = _is_trusted_proxy(
|
||||
"192.168.1.1",
|
||||
["invalid", "192.168.1.0/24", "also-invalid"],
|
||||
)
|
||||
assert result is True
|
||||
|
||||
def test_empty_trusted_proxies_list(self) -> None:
|
||||
"""Empty trusted_proxies list always returns False."""
|
||||
assert not _is_trusted_proxy("192.168.1.1", [])
|
||||
|
||||
def test_handles_subnet_boundary_cases(self) -> None:
|
||||
"""Handles edge cases like /32 (single IP) and /0 (all IPs)."""
|
||||
# /32 is single IP
|
||||
assert _is_trusted_proxy("10.0.0.1", ["10.0.0.1/32"])
|
||||
assert not _is_trusted_proxy("10.0.0.2", ["10.0.0.1/32"])
|
||||
|
||||
# /0 should match any IPv4
|
||||
assert _is_trusted_proxy("192.168.1.1", ["0.0.0.0/0"])
|
||||
assert _is_trusted_proxy("203.0.113.100", ["0.0.0.0/0"])
|
||||
Reference in New Issue
Block a user