- Remove structlog dependency from backend/pyproject.toml - Add app.utils.logging_compat shim for keyword-arg logging API - Add app.utils.json_formatter for JSON log output with extra fields - Update all backend modules to use logging_compat.get_logger() - Update docstrings in log_sanitizer.py and json_formatter.py - Update test comment in test_async_utils.py - Record 406 failing tests in Docs/Tasks.md for tracking
97 lines
3.3 KiB
Python
97 lines
3.3 KiB
Python
"""DNS-rebinding protection for HTTP client connections.
|
|
|
|
This module provides a custom socket factory for aiohttp.TCPConnector that
|
|
validates resolved IP addresses before connection to prevent DNS-rebinding
|
|
attacks. A DNS-rebinding attack occurs when:
|
|
|
|
1. A blocklist URL is validated at create/update time (ip_utils.validate_blocklist_url)
|
|
which confirms it resolves to a public IP
|
|
2. The attacker's DNS server later responds with a different (private) IP
|
|
3. When the HTTP client connects, it reaches the private IP instead
|
|
|
|
The custom socket factory validates that every socket address is public
|
|
(not private or reserved) before the socket is created, closing the window
|
|
for DNS-rebinding attacks between validation time and actual connection time.
|
|
|
|
Reference:
|
|
https://cheatsheetseries.owasp.org/cheatsheets/Server_Side_Request_Forgery_Prevention_Cheat_Sheet.html
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import ipaddress
|
|
import socket
|
|
from typing import TYPE_CHECKING
|
|
|
|
from app.utils.logging_compat import get_logger
|
|
|
|
from app.utils.ip_utils import is_private_ip
|
|
|
|
if TYPE_CHECKING:
|
|
from collections.abc import Callable
|
|
|
|
log = get_logger(__name__)
|
|
|
|
|
|
def create_dns_validated_socket_factory() -> (
|
|
Callable[
|
|
[tuple[int | socket.AddressFamily, int | socket.SocketKind, int, str, tuple[str, int]]],
|
|
socket.socket,
|
|
]
|
|
):
|
|
"""Create a socket factory function that validates IP addresses before connection.
|
|
|
|
The returned factory checks that all resolved addresses are public before
|
|
creating the socket. This prevents DNS-rebinding attacks where a hostname
|
|
initially resolves to a public IP but later resolves to a private IP.
|
|
|
|
Returns:
|
|
A socket factory callable that validates IPs before socket creation.
|
|
"""
|
|
|
|
def validated_socket_factory(
|
|
address_info: tuple[int | socket.AddressFamily, int | socket.SocketKind, int, str, tuple[str, int]],
|
|
) -> socket.socket:
|
|
"""Create a socket after validating the target IP address.
|
|
|
|
Args:
|
|
address_info: Tuple of (family, socktype, proto, canonname, sockaddr)
|
|
where sockaddr is (ip, port) for IPv4 or (ip, port, flowinfo, scope_id)
|
|
for IPv6.
|
|
|
|
Returns:
|
|
A newly created socket.
|
|
|
|
Raises:
|
|
OSError: If the IP address is private/reserved or invalid.
|
|
"""
|
|
family, socktype, proto, canonname, sockaddr = address_info
|
|
|
|
# Extract IP from sockaddr tuple
|
|
ip_str: str = sockaddr[0]
|
|
|
|
try:
|
|
# Validate that the IP is public
|
|
if is_private_ip(ip_str):
|
|
log.warning(
|
|
"dns_rebinding_attempt_blocked",
|
|
resolved_ip=ip_str,
|
|
)
|
|
raise OSError(
|
|
f"DNS rebinding attack detected: resolved IP '{ip_str}' is "
|
|
"private/reserved. Blocklist URLs must resolve to public addresses."
|
|
)
|
|
except ipaddress.AddressValueError as exc:
|
|
log.error(
|
|
"dns_validation_invalid_ip",
|
|
ip_address=ip_str,
|
|
error=str(exc),
|
|
)
|
|
raise OSError(f"Invalid IP address: {ip_str}") from exc
|
|
|
|
# IP is valid and public, create the socket
|
|
sock = socket.socket(family, socktype, proto)
|
|
return sock
|
|
|
|
return validated_socket_factory
|