- Remove structlog dependency from backend/pyproject.toml - Add app.utils.logging_compat shim for keyword-arg logging API - Add app.utils.json_formatter for JSON log output with extra fields - Update all backend modules to use logging_compat.get_logger() - Update docstrings in log_sanitizer.py and json_formatter.py - Update test comment in test_async_utils.py - Record 406 failing tests in Docs/Tasks.md for tracking
114 lines
3.4 KiB
Python
114 lines
3.4 KiB
Python
"""Blocklist parser and validator component.
|
|
|
|
Parses blocklist text content and validates individual entries as IP addresses
|
|
or CIDR networks. Separates valid IPs from invalid/CIDR entries.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
from app.utils.logging_compat import get_logger
|
|
|
|
from app.utils.ip_utils import is_valid_ip, is_valid_network, normalise_ip
|
|
|
|
log = get_logger(__name__)
|
|
|
|
|
|
class ParsedBlocklist:
    """Result of parsing a blocklist text.

    Holds the accepted individual IP addresses together with a count of the
    entries that were rejected.
    """

    def __init__(
        self,
        valid_ips: list[str],
        skipped_entries: int,
    ) -> None:
        """Initialize parsed result.

        Args:
            valid_ips: List of valid individual IP addresses.
            skipped_entries: Count of rejected entries (CIDR ranges and
                malformed lines). Blank lines and ``#`` comments are dropped
                by :meth:`BlocklistParser.parse` before counting, so they are
                not included here.
        """
        self.valid_ips = valid_ips
        self.skipped_entries = skipped_entries

    def __repr__(self) -> str:
        """Return a compact summary suitable for logs and debugging.

        Only the number of addresses is shown: a blocklist can hold a very
        large number of entries and dumping them all would flood log output.
        """
        return (
            f"{type(self).__name__}("
            f"valid_ips=<{len(self.valid_ips)} addresses>, "
            f"skipped_entries={self.skipped_entries})"
        )

    @property
    def total_entries(self) -> int:
        """Total number of entries processed (valid plus skipped)."""
        return len(self.valid_ips) + self.skipped_entries
|
|
|
|
|
|
class BlocklistParser:
    """Parses and validates blocklist text content."""

    @staticmethod
    def _candidate_entries(lines: list[str]) -> list[str]:
        """Return stripped lines that are neither blank nor ``#`` comments."""
        stripped_lines = (line.strip() for line in lines)
        return [
            entry
            for entry in stripped_lines
            if entry and not entry.startswith("#")
        ]

    @staticmethod
    def parse(content: str) -> ParsedBlocklist:
        """Parse blocklist text and extract valid individual IP addresses.

        Lines starting with '#' are treated as comments and skipped.
        Empty lines are skipped. Neither is counted. CIDR ranges and
        malformed entries are skipped but counted. Only entries accepted by
        ``is_valid_ip`` are extracted, after normalisation.

        Args:
            content: Raw blocklist text content.

        Returns:
            :class:`ParsedBlocklist` with valid IPs and skip count.
        """
        valid_ips: list[str] = []
        skipped = 0

        for entry in BlocklistParser._candidate_entries(content.splitlines()):
            # Accept only individual IP addresses; CIDRs and malformed
            # entries both fall through to the skip counter.
            if is_valid_ip(entry):
                valid_ips.append(normalise_ip(entry))
            else:
                skipped += 1

        return ParsedBlocklist(valid_ips=valid_ips, skipped_entries=skipped)

    @staticmethod
    def parse_with_stats(
        content: str,
        *,
        sample_lines: int = 20,
    ) -> tuple[list[str], dict[str, int]]:
        """Parse blocklist and return sample of valid entries with statistics.

        Used by preview functionality to show sample entries and counts.

        Note:
            Unlike :meth:`parse`, this method counts CIDR networks as valid
            and includes them (unmodified) in the sample.

        Args:
            content: Raw blocklist text content.
            sample_lines: Maximum number of sample entries to return. A value
                of zero or less yields an empty sample; counts are unaffected.

        Returns:
            Tuple of (sample entries list, stats dict with keys: total_lines,
            valid_count, skipped_count). ``total_lines`` is the number of raw
            lines in ``content``, including blank lines and comments.
        """
        lines = content.splitlines()
        entries: list[str] = []
        valid = 0
        skipped = 0

        for entry in BlocklistParser._candidate_entries(lines):
            # Evaluate once and reuse for both the validity check and the
            # normalise-or-keep decision below.
            entry_is_ip = is_valid_ip(entry)
            if not (entry_is_ip or is_valid_network(entry)):
                skipped += 1
                continue

            valid += 1
            if len(entries) < sample_lines:
                # Normalise individual IPs; keep networks as-is
                entries.append(normalise_ip(entry) if entry_is_ip else entry)

        return entries, {
            "total_lines": len(lines),
            "valid_count": valid,
            "skipped_count": skipped,
        }
|