"""IP address and CIDR range validation and normalisation utilities. All IP handling in BanGUI goes through these helpers to enforce consistency and prevent malformed addresses from reaching fail2ban. """ import asyncio import ipaddress import socket from urllib.parse import urlparse def is_valid_ip(address: str) -> bool: """Return ``True`` if *address* is a valid IPv4 or IPv6 address. Args: address: The string to validate. Returns: ``True`` if the string represents a valid IP address, ``False`` otherwise. """ try: ipaddress.ip_address(address) return True except ValueError: return False def is_valid_network(cidr: str) -> bool: """Return ``True`` if *cidr* is a valid IPv4 or IPv6 network in CIDR notation. Args: cidr: The string to validate, e.g. ``"192.168.0.0/24"``. Returns: ``True`` if the string is a valid CIDR network, ``False`` otherwise. """ try: ipaddress.ip_network(cidr, strict=False) return True except ValueError: return False def is_valid_ip_or_network(value: str) -> bool: """Return ``True`` if *value* is a valid IP address or CIDR network. Args: value: The string to validate. Returns: ``True`` if the string is a valid IP address or CIDR range. """ return is_valid_ip(value) or is_valid_network(value) def normalise_ip(address: str) -> str: """Return a normalised string representation of an IP address. IPv6 addresses are compressed to their canonical short form. IPv4 addresses are returned unchanged. Args: address: A valid IP address string. Returns: Normalised IP address string. Raises: ValueError: If *address* is not a valid IP address. """ return str(ipaddress.ip_address(address)) def normalise_network(cidr: str) -> str: """Return a normalised string representation of a CIDR network. Host bits are masked to produce the network address. Args: cidr: A valid CIDR network string, e.g. ``"192.168.1.5/24"``. Returns: Normalised network string, e.g. ``"192.168.1.0/24"``. Raises: ValueError: If *cidr* is not a valid network. """ return str(ipaddress.ip_network(cidr, strict=False)) def ip_version(address: str) -> int: """Return 4 or 6 depending on the IP version of *address*. Args: address: A valid IP address string. Returns: ``4`` for IPv4, ``6`` for IPv6. Raises: ValueError: If *address* is not a valid IP address. """ return ipaddress.ip_address(address).version def is_private_ip(address: str) -> bool: """Return ``True`` if *address* is a private or reserved IP address. Private ranges include: - RFC 1918: 10.0.0.0/8, 172.16.0.0/12, 192.168.0.0/16 - Loopback: 127.0.0.0/8 (IPv4), ::1/128 (IPv6) - Link-local: 169.254.0.0/16 (IPv4), fe80::/10 (IPv6) - IPv6 ULA: fc00::/7 - Multicast and other reserved ranges Args: address: A valid IP address string. Returns: ``True`` if the address is private or reserved, ``False`` if it is public. Raises: ValueError: If *address* is not a valid IP address. """ ip = ipaddress.ip_address(address) return ( ip.is_private or ip.is_loopback or ip.is_link_local or ip.is_multicast or ip.is_reserved ) async def validate_blocklist_url(url: str) -> None: """Validate that a blocklist URL points to a public HTTP(S) endpoint. Checks that: - The URL uses HTTP or HTTPS scheme - The hostname resolves to a public (non-private, non-reserved) IP address - IPv4-mapped IPv6 addresses are checked against IPv4 private ranges Performs DNS resolution asynchronously to check the resolved IP. This is a point-in-time check; the application uses a DNS-validated HTTP connector that performs runtime re-validation at connection time to prevent DNS-rebinding attacks where the same hostname resolves to a different (private) IP address after this initial validation. Args: url: The blocklist URL to validate. Raises: ValueError: If the URL has an invalid scheme, hostname cannot be resolved, or the resolved IP is private/reserved. """ try: parsed = urlparse(url) except Exception as exc: raise ValueError(f"Invalid URL format: {exc}") from exc if parsed.scheme not in ("http", "https"): raise ValueError( f"Invalid scheme '{parsed.scheme}': only http and https are allowed" ) if not parsed.hostname: raise ValueError("URL has no hostname") hostname = parsed.hostname try: loop = asyncio.get_event_loop() addrinfo = await loop.run_in_executor( None, socket.getaddrinfo, hostname, parsed.port or 80, socket.AF_UNSPEC, socket.SOCK_STREAM, ) except socket.gaierror as exc: raise ValueError(f"Cannot resolve hostname '{hostname}': {exc}") from exc except Exception as exc: raise ValueError(f"DNS resolution error for '{hostname}': {exc}") from exc if not addrinfo: raise ValueError(f"No address resolved for hostname '{hostname}'") for family, socktype, proto, canonname, sockaddr in addrinfo: ip_str: str = sockaddr[0] # type: ignore[assignment] try: if is_private_ip(ip_str): raise ValueError( f"Hostname '{hostname}' resolves to private/reserved IP: {ip_str}" ) except ipaddress.AddressValueError as exc: raise ValueError(f"Invalid IP address: {ip_str}") from exc