Files
BanGUI/backend/app/services/blocklist_ban_executor.py
Lukas 7ec80fdeec refactor(logging): replace structlog with stdlib logging compat layer
- Remove structlog dependency from backend/pyproject.toml
- Add app.utils.logging_compat shim for keyword-arg logging API
- Add app.utils.json_formatter for JSON log output with extra fields
- Update all backend modules to use logging_compat.get_logger()
- Update docstrings in log_sanitizer.py and json_formatter.py
- Update test comment in test_async_utils.py
- Record 406 failing tests in Docs/Tasks.md for tracking
2026-05-10 13:37:54 +02:00

85 lines
2.5 KiB
Python

"""Blocklist ban executor component.
Executes bans via fail2ban for a list of IP addresses, handling errors and
logging failures.
"""
from __future__ import annotations
from typing import TYPE_CHECKING
from app.utils.logging_compat import get_logger
from app.exceptions import JailNotFoundError, JailOperationError
if TYPE_CHECKING:
from collections.abc import Awaitable, Callable
log = get_logger(__name__)
class BanExecutor:
"""Executes bans via fail2ban for blocklist-sourced IPs."""
def __init__(
self,
ban_ip: Callable[[str, str, str], Awaitable[None]],
) -> None:
"""Initialize the ban executor.
Args:
ban_ip: Async callable that bans an IP in a jail.
Signature: async def ban_ip(socket_path: str, jail: str, ip: str) -> None
"""
self.ban_ip = ban_ip
async def ban_ips(
self,
socket_path: str,
jail: str,
ips: list[str],
) -> tuple[int, int, str | None]:
"""Ban a list of IPs in the specified fail2ban jail.
On first JailNotFoundError, stops processing (the jail doesn't exist).
On JailOperationError, records the error but continues with next IPs.
Other exceptions are treated as fatal and raised.
Args:
socket_path: Path to fail2ban Unix socket.
jail: Name of the fail2ban jail.
ips: List of IP addresses to ban.
Returns:
Tuple of (successful bans count, failed bans count, first error or None).
Raises:
Exception: If an unexpected error occurs (not JailNotFoundError or
JailOperationError).
"""
successful = 0
failed = 0
first_error: str | None = None
for ip in ips:
try:
await self.ban_ip(socket_path, jail, ip)
successful += 1
except JailNotFoundError as exc:
# Jail doesn't exist — no point continuing
first_error = str(exc)
log.warning(
"blocklist_jail_not_found",
jail=jail,
error=str(exc),
)
break
except JailOperationError as exc:
# Individual ban failed, but continue
failed += 1
if first_error is None:
first_error = str(exc)
log.debug("blocklist_ban_failed", ip=ip, error=str(exc))
return successful, failed, first_error