116 lines
3.5 KiB
Python
116 lines
3.5 KiB
Python
"""Utilities for extracting client IP addresses from HTTP requests.
|
|
|
|
Handles X-Forwarded-For and X-Real-IP headers when behind a reverse proxy (nginx).
|
|
Only trusts these headers when the request comes from a known trusted proxy.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import ipaddress
|
|
from typing import TYPE_CHECKING
|
|
|
|
from app.utils.ip_utils import normalise_ip
|
|
|
|
if TYPE_CHECKING:
|
|
from fastapi import Request
|
|
|
|
|
|
def get_client_ip(
    request: Request, trusted_proxies: str | list[str] | None = None
) -> str:
    """Extract the client IP address from a request.

    When the immediate peer is a trusted proxy, the client address is read
    from X-Forwarded-For (leftmost entry) or, failing that, X-Real-IP.
    Otherwise the immediate connection source (request.client.host) is used.
    Every address taken from the request is passed through ``normalise_ip``
    before being returned, so the same peer always yields the same key
    regardless of which branch produced it.

    Args:
        request: The incoming FastAPI request.
        trusted_proxies: Trusted proxy IP addresses or CIDR ranges, given
            either as a list or as a comma-separated string. If None or
            empty, forwarded headers are ignored entirely.

    Returns:
        The best-guess client IP address suitable for rate limiting, or
        "0.0.0.0" when the request carries no client information.
    """
    if not request.client:
        # No peer information on the connection; return a fixed placeholder.
        return "0.0.0.0"

    immediate_ip = request.client.host

    # Accept a comma-separated string as well as a list; drop blank entries.
    if isinstance(trusted_proxies, str):
        trusted_proxies = [
            entry.strip()
            for entry in trusted_proxies.split(",")
            if entry.strip()
        ]
    elif trusted_proxies is None:
        trusted_proxies = []

    # Forwarded headers are client-controlled unless the peer is a proxy we
    # trust, so in every other case the socket peer is the answer. Both the
    # "nothing configured" and the "peer not trusted" cases normalise the
    # address so callers get a consistent key.
    if not trusted_proxies or not _is_trusted_proxy(immediate_ip, trusted_proxies):
        return normalise_ip(immediate_ip)

    # X-Forwarded-For may hold a chain: "client, proxy1, proxy2".
    # NOTE(review): the leftmost entry is only trustworthy if the proxy
    # overwrites the header. A proxy that appends to a client-supplied value
    # (e.g. nginx's $proxy_add_x_forwarded_for) leaves the leftmost entry
    # under the client's control. Confirm the proxy configuration, or switch
    # to picking the rightmost entry that is not a trusted proxy.
    forwarded_for = request.headers.get("X-Forwarded-For", "").strip()
    if forwarded_for:
        client_ip = forwarded_for.split(",")[0].strip()
        if client_ip:
            return normalise_ip(client_ip)

    # Fall back to X-Real-IP when X-Forwarded-For is absent or empty.
    real_ip = request.headers.get("X-Real-IP", "").strip()
    if real_ip:
        return normalise_ip(real_ip)

    # The trusted proxy supplied no forwarded headers; use the peer itself.
    return normalise_ip(immediate_ip)
|
|
|
|
|
|
def _is_trusted_proxy(ip: str, trusted_proxies: list[str]) -> bool:
    """Check if an IP is in the list of trusted proxies.

    Supports both single IP addresses and CIDR ranges. An IPv4-mapped IPv6
    address ("::ffff:a.b.c.d") is also checked in its embedded IPv4 form, so
    it matches IPv4 entries as well as IPv6 ones.

    Args:
        ip: The IP address to check.
        trusted_proxies: List of trusted proxy IP addresses or CIDR ranges.
            Entries that cannot be parsed are skipped.

    Returns:
        True if the IP is trusted, False otherwise (including when ``ip``
        itself is not a valid address).
    """
    try:
        ip_obj = ipaddress.ip_address(ip)
    except ValueError:
        # Not a parseable address, so it cannot match anything.
        return False

    # Peers can be reported in IPv4-mapped IPv6 form (e.g. on dual-stack
    # sockets). Check both the address as given and its embedded IPv4
    # address so an entry such as "10.0.0.0/8" still matches.
    candidates = [ip_obj]
    mapped = getattr(ip_obj, "ipv4_mapped", None)
    if mapped is not None:
        candidates.append(mapped)

    for proxy in trusted_proxies:
        # Tolerate surrounding whitespace in hand-written configuration.
        entry = proxy.strip() if isinstance(proxy, str) else proxy
        try:
            # strict=False accepts host bits in a CIDR ("10.0.0.5/8") and
            # bare addresses, which become a /32 or /128 network, so no
            # separate single-address comparison is needed.
            network = ipaddress.ip_network(entry, strict=False)
        except ValueError:
            # Invalid proxy entry, skip it.
            continue
        # Membership is False (not an error) across IP versions.
        if any(candidate in network for candidate in candidates):
            return True

    return False
|
|
|