"""DNS-rebinding protection for HTTP client connections. This module provides a custom socket factory for aiohttp.TCPConnector that validates resolved IP addresses before connection to prevent DNS-rebinding attacks. A DNS-rebinding attack occurs when: 1. A blocklist URL is validated at create/update time (ip_utils.validate_blocklist_url) which confirms it resolves to a public IP 2. The attacker's DNS server later responds with a different (private) IP 3. When the HTTP client connects, it reaches the private IP instead The custom socket factory validates that every socket address is public (not private or reserved) before the socket is created, closing the window for DNS-rebinding attacks between validation time and actual connection time. Reference: https://cheatsheetseries.owasp.org/cheatsheets/Server_Side_Request_Forgery_Prevention_Cheat_Sheet.html """ from __future__ import annotations import ipaddress import socket from typing import TYPE_CHECKING from app.utils.logging_compat import get_logger from app.utils.ip_utils import is_private_ip if TYPE_CHECKING: from collections.abc import Callable log = get_logger(__name__) def create_dns_validated_socket_factory() -> ( Callable[ [tuple[int | socket.AddressFamily, int | socket.SocketKind, int, str, tuple[str, int]]], socket.socket, ] ): """Create a socket factory function that validates IP addresses before connection. The returned factory checks that all resolved addresses are public before creating the socket. This prevents DNS-rebinding attacks where a hostname initially resolves to a public IP but later resolves to a private IP. Returns: A socket factory callable that validates IPs before socket creation. """ def validated_socket_factory( address_info: tuple[int | socket.AddressFamily, int | socket.SocketKind, int, str, tuple[str, int]], ) -> socket.socket: """Create a socket after validating the target IP address. Args: address_info: Tuple of (family, socktype, proto, canonname, sockaddr) where sockaddr is (ip, port) for IPv4 or (ip, port, flowinfo, scope_id) for IPv6. Returns: A newly created socket. Raises: OSError: If the IP address is private/reserved or invalid. """ family, socktype, proto, canonname, sockaddr = address_info # Extract IP from sockaddr tuple ip_str: str = sockaddr[0] try: # Validate that the IP is public if is_private_ip(ip_str): log.warning( "dns_rebinding_attempt_blocked", resolved_ip=ip_str, ) raise OSError( f"DNS rebinding attack detected: resolved IP '{ip_str}' is " "private/reserved. Blocklist URLs must resolve to public addresses." ) except ipaddress.AddressValueError as exc: log.error( "dns_validation_invalid_ip", ip_address=ip_str, error=str(exc), ) raise OSError(f"Invalid IP address: {ip_str}") from exc # IP is valid and public, create the socket sock = socket.socket(family, socktype, proto) return sock return validated_socket_factory