Files
BanGUI/backend/app/utils/ip_utils.py
Lukas cc4370c50d feat: Add runtime DNS-rebinding protection for blocklist HTTP connections
## Problem
The blocklist URL validation at create/update time has a TOCTOU (time-of-check-to-time-of-use) window.
An attacker can perform a DNS-rebinding attack where:
1. User adds blocklist URL pointing to attacker.com
2. At create time, attacker.com resolves to a public IP → validation passes
3. Later, when fetching, attacker.com resolves to 192.168.1.1 (internal network)
4. HTTP client connects to the private IP, potentially accessing internal services

## Solution
Add runtime destination IP validation at connection time via a custom socket factory:

- Created 'dns_validated_connector.py' with create_dns_validated_socket_factory() that validates
  all resolved IPs before socket creation
- HTTP session now uses the validated socket factory, protecting all blocklist imports globally
- Rejects connections to RFC 1918 private ranges, loopback, link-local, ULA, multicast, and
  reserved addresses (IPv4 and IPv6)
- Added comprehensive test coverage with 13 test cases

## Changes
- backend/app/services/dns_validated_connector.py: Custom socket factory with IP validation
- backend/app/startup.py: Use DNS-validated socket factory in HTTP session creation
- backend/app/utils/ip_utils.py: Updated docstring explaining runtime validation
- backend/app/services/blocklist_downloader.py: Updated module docstring
- backend/app/services/blocklist_service.py: Updated docstrings explaining two-layer protection
- backend/tests/test_services/test_dns_validated_connector.py: Test suite for socket factory
- Docs/Architekture.md: Added detailed section on DNS-rebinding protection

## Testing
- All 13 DNS validation tests pass
- All blocklist downloader tests pass (unaffected by changes)
- Linting: ruff, mypy pass with --strict
- Test coverage: 90% line coverage on dns_validated_connector.py

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
2026-04-29 19:10:51 +02:00

200 lines
5.7 KiB
Python

"""IP address and CIDR range validation and normalisation utilities.
All IP handling in BanGUI goes through these helpers to enforce consistency
and prevent malformed addresses from reaching fail2ban.
"""
import asyncio
import ipaddress
import socket
from urllib.parse import urlparse
def is_valid_ip(address: str) -> bool:
    """Tell whether *address* parses as a single IPv4 or IPv6 address.

    Args:
        address: Candidate address string.

    Returns:
        ``True`` when :func:`ipaddress.ip_address` accepts the string,
        ``False`` when it rejects it with ``ValueError``.
    """
    try:
        ipaddress.ip_address(address)
    except ValueError:
        # Parsing failed, so this is not an IP address.
        return False
    else:
        return True
def is_valid_network(cidr: str) -> bool:
    """Tell whether *cidr* parses as an IPv4 or IPv6 network.

    Parsing is non-strict, so a value with host bits set (for example
    ``"192.168.1.5/24"``) is still accepted.

    Args:
        cidr: Candidate network string, e.g. ``"192.168.0.0/24"``.

    Returns:
        ``True`` when :func:`ipaddress.ip_network` accepts the string,
        ``False`` when it rejects it with ``ValueError``.
    """
    parses = True
    try:
        ipaddress.ip_network(cidr, strict=False)
    except ValueError:
        parses = False
    return parses
def is_valid_ip_or_network(value: str) -> bool:
    """Tell whether *value* is either a single IP address or a CIDR network.

    Args:
        value: Candidate string.

    Returns:
        ``True`` if :func:`is_valid_ip` or :func:`is_valid_network` accepts
        the string, ``False`` if both reject it.
    """
    # Try the single-address check first; only fall back to the network
    # check when that fails.
    if is_valid_ip(value):
        return True
    return is_valid_network(value)
def normalise_ip(address: str) -> str:
    """Return the canonical textual form of an IP address.

    The value is parsed with :func:`ipaddress.ip_address` and rendered back
    to a string, so IPv6 addresses come out in compressed form while IPv4
    addresses keep their dotted-quad form.

    Args:
        address: A valid IP address string.

    Returns:
        The normalised address string.

    Raises:
        ValueError: If *address* is not a valid IP address.
    """
    parsed_address = ipaddress.ip_address(address)
    return str(parsed_address)
def normalise_network(cidr: str) -> str:
    """Return the canonical textual form of a CIDR network.

    The value is parsed non-strictly, so any host bits are masked off and the
    result names the network address itself.

    Args:
        cidr: A valid CIDR network string, e.g. ``"192.168.1.5/24"``.

    Returns:
        The normalised network string, e.g. ``"192.168.1.0/24"``.

    Raises:
        ValueError: If *cidr* is not a valid network.
    """
    parsed_network = ipaddress.ip_network(cidr, strict=False)
    return str(parsed_network)
def ip_version(address: str) -> int:
    """Report which IP protocol version *address* belongs to.

    Args:
        address: A valid IP address string.

    Returns:
        ``4`` for an IPv4 address, ``6`` for an IPv6 address.

    Raises:
        ValueError: If *address* is not a valid IP address.
    """
    parsed_address = ipaddress.ip_address(address)
    return parsed_address.version
def is_private_ip(address: str) -> bool:
    """Return ``True`` if *address* is a private or reserved IP address.

    Addresses treated as private/reserved:

    - RFC 1918: 10.0.0.0/8, 172.16.0.0/12, 192.168.0.0/16
    - Loopback: 127.0.0.0/8 (IPv4), ::1/128 (IPv6)
    - Link-local: 169.254.0.0/16 (IPv4), fe80::/10 (IPv6)
    - IPv6 ULA: fc00::/7
    - Shared address space / carrier-grade NAT (RFC 6598): 100.64.0.0/10
    - Unspecified addresses (``0.0.0.0``, ``::``)
    - Multicast and other reserved ranges

    IPv4-mapped IPv6 addresses (``::ffff:a.b.c.d``) are classified by the
    embedded IPv4 address, so the result does not depend on how the running
    Python version treats the ``::ffff:0:0/96`` block.

    Args:
        address: A valid IP address string.

    Returns:
        ``True`` if the address is private or reserved, ``False`` if it is
        public.

    Raises:
        ValueError: If *address* is not a valid IP address.
    """
    ip = ipaddress.ip_address(address)

    # Judge an IPv4-mapped IPv6 address by the IPv4 address it carries: a
    # connection to ``::ffff:192.168.1.1`` reaches 192.168.1.1.
    if isinstance(ip, ipaddress.IPv6Address) and ip.ipv4_mapped is not None:
        ip = ip.ipv4_mapped

    return (
        ip.is_private
        # ``is_private`` deliberately excludes 100.64.0.0/10, but that range
        # is not globally routable either; ``is_global`` accounts for it.
        or not ip.is_global
        or ip.is_loopback
        or ip.is_link_local
        or ip.is_multicast
        or ip.is_reserved
        or ip.is_unspecified
    )
async def validate_blocklist_url(url: str) -> None:
    """Validate that a blocklist URL points to a public HTTP(S) endpoint.

    Checks that:

    - The URL uses the HTTP or HTTPS scheme
    - The URL has a hostname and, if a port is given, a valid port
    - Every address the hostname resolves to (IPv4 and IPv6) is accepted as
      public by :func:`is_private_ip`

    DNS resolution runs in the event loop's default executor so the loop is
    not blocked. This is a point-in-time check: the hostname may resolve to a
    different (private) address later (DNS rebinding), so the HTTP connector
    must re-validate the destination at connection time.

    Args:
        url: The blocklist URL to validate.

    Raises:
        ValueError: If the URL is malformed, has an invalid scheme, hostname
            or port, cannot be resolved, or resolves to a private/reserved
            IP address.
    """
    try:
        parsed = urlparse(url)
    except Exception as exc:
        raise ValueError(f"Invalid URL format: {exc}") from exc

    if parsed.scheme not in ("http", "https"):
        raise ValueError(
            f"Invalid scheme '{parsed.scheme}': only http and https are allowed"
        )

    hostname = parsed.hostname
    if not hostname:
        raise ValueError("URL has no hostname")

    # ``parsed.port`` is evaluated lazily and raises ValueError for a
    # non-numeric or out-of-range port. Report that as a URL problem instead
    # of letting it surface as a DNS resolution error.
    try:
        explicit_port = parsed.port
    except ValueError as exc:
        raise ValueError(f"Invalid port in URL: {exc}") from exc
    port = explicit_port or (443 if parsed.scheme == "https" else 80)

    # We are inside a coroutine, so a loop is guaranteed to be running.
    loop = asyncio.get_running_loop()
    try:
        addrinfo = await loop.run_in_executor(
            None,
            socket.getaddrinfo,
            hostname,
            port,
            socket.AF_UNSPEC,
            socket.SOCK_STREAM,
        )
    except socket.gaierror as exc:
        raise ValueError(f"Cannot resolve hostname '{hostname}': {exc}") from exc
    except Exception as exc:
        # Keep the "raises ValueError only" contract for any other failure
        # (e.g. an un-encodable hostname).
        raise ValueError(f"DNS resolution error for '{hostname}': {exc}") from exc

    if not addrinfo:
        raise ValueError(f"No address resolved for hostname '{hostname}'")

    # Each entry is (family, type, proto, canonname, sockaddr); only the
    # address part of sockaddr matters here. Every candidate must be public,
    # because the HTTP client may connect to any of them.
    for *_, sockaddr in addrinfo:
        ip_str = str(sockaddr[0])
        try:
            private = is_private_ip(ip_str)
        except ValueError as exc:
            # ipaddress signals unparsable input with ValueError.
            raise ValueError(f"Invalid IP address: {ip_str}") from exc
        if private:
            raise ValueError(
                f"Hostname '{hostname}' resolves to private/reserved IP: {ip_str}"
            )