Refactor backend services and utilities

- Update service layer implementations
- Improve configuration handling utilities
- Update documentation tasks

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
This commit is contained in:
2026-04-25 18:39:30 +02:00
parent 83452ffc23
commit 420ea18fd9
12 changed files with 52 additions and 83 deletions

View File

@@ -31,6 +31,7 @@ from app.models.jail import (
)
from app.services import geo_service
from app.utils.config_file_utils import start_daemon, wait_for_fail2ban
from app.utils.constants import FAIL2BAN_SOCKET_TIMEOUT
from app.utils.fail2ban_client import (
Fail2BanClient,
Fail2BanCommand,
@@ -73,8 +74,6 @@ class IpLookupResult(TypedDict):
# Constants
# ---------------------------------------------------------------------------
_SOCKET_TIMEOUT: float = 10.0
# Capability detection for optional fail2ban transmitter commands (backend, idle).
# These commands are not supported in all fail2ban versions. Caching the result
# avoids sending unsupported commands every polling cycle and spamming the
@@ -223,7 +222,7 @@ async def list_jails(socket_path: str) -> JailListResponse:
~app.utils.fail2ban_client.Fail2BanConnectionError: If the socket
cannot be reached.
"""
client = Fail2BanClient(socket_path=socket_path, timeout=_SOCKET_TIMEOUT)
client = Fail2BanClient(socket_path=socket_path, timeout=FAIL2BAN_SOCKET_TIMEOUT)
# 1. Fetch global status to get jail names.
global_status = to_dict(ok(await client.send(["status"])))
@@ -376,7 +375,7 @@ async def get_jail(socket_path: str, name: str) -> JailDetailResponse:
~app.utils.fail2ban_client.Fail2BanConnectionError: If the socket
cannot be reached.
"""
client = Fail2BanClient(socket_path=socket_path, timeout=_SOCKET_TIMEOUT)
client = Fail2BanClient(socket_path=socket_path, timeout=FAIL2BAN_SOCKET_TIMEOUT)
# Verify the jail exists by sending a status command first.
try:
@@ -493,7 +492,7 @@ async def start_jail(socket_path: str, name: str) -> None:
~app.utils.fail2ban_client.Fail2BanConnectionError: If the socket
cannot be reached.
"""
client = Fail2BanClient(socket_path=socket_path, timeout=_SOCKET_TIMEOUT)
client = Fail2BanClient(socket_path=socket_path, timeout=FAIL2BAN_SOCKET_TIMEOUT)
try:
ok(await client.send(["start", name]))
log.info("jail_started", jail=name)
@@ -518,7 +517,7 @@ async def stop_jail(socket_path: str, name: str) -> None:
~app.utils.fail2ban_client.Fail2BanConnectionError: If the socket
cannot be reached.
"""
client = Fail2BanClient(socket_path=socket_path, timeout=_SOCKET_TIMEOUT)
client = Fail2BanClient(socket_path=socket_path, timeout=FAIL2BAN_SOCKET_TIMEOUT)
try:
ok(await client.send(["stop", name]))
log.info("jail_stopped", jail=name)
@@ -548,7 +547,7 @@ async def set_idle(socket_path: str, name: str, *, on: bool) -> None:
cannot be reached.
"""
state = "on" if on else "off"
client = Fail2BanClient(socket_path=socket_path, timeout=_SOCKET_TIMEOUT)
client = Fail2BanClient(socket_path=socket_path, timeout=FAIL2BAN_SOCKET_TIMEOUT)
try:
ok(await client.send(["set", name, "idle", state]))
log.info("jail_idle_toggled", jail=name, idle=on)
@@ -578,7 +577,7 @@ async def reload_jail(socket_path: str, name: str) -> None:
~app.utils.fail2ban_client.Fail2BanConnectionError: If the socket
cannot be reached.
"""
client = Fail2BanClient(socket_path=socket_path, timeout=_SOCKET_TIMEOUT)
client = Fail2BanClient(socket_path=socket_path, timeout=FAIL2BAN_SOCKET_TIMEOUT)
try:
ok(await client.send(["reload", name, [], [["start", name]]]))
log.info("jail_reloaded", jail=name)
@@ -608,7 +607,7 @@ async def restart(socket_path: str) -> None:
~app.utils.fail2ban_client.Fail2BanConnectionError: If the socket
cannot be reached.
"""
client = Fail2BanClient(socket_path=socket_path, timeout=_SOCKET_TIMEOUT)
client = Fail2BanClient(socket_path=socket_path, timeout=FAIL2BAN_SOCKET_TIMEOUT)
try:
ok(await client.send(["stop"]))
log.info("fail2ban_stopped_for_restart")
@@ -619,7 +618,7 @@ async def restart(socket_path: str) -> None:
async def restart_daemon(
socket_path: str,
start_cmd_parts: list[str],
max_wait_seconds: float = _SOCKET_TIMEOUT,
max_wait_seconds: float = FAIL2BAN_SOCKET_TIMEOUT,
) -> bool:
"""Restart the fail2ban daemon and verify it comes back online.
@@ -781,7 +780,7 @@ async def get_jail_banned_ips(
# Clamp page_size to the allowed maximum.
page_size = min(page_size, _MAX_PAGE_SIZE)
client = Fail2BanClient(socket_path=socket_path, timeout=_SOCKET_TIMEOUT)
client = Fail2BanClient(socket_path=socket_path, timeout=FAIL2BAN_SOCKET_TIMEOUT)
# Verify the jail exists.
try:
@@ -896,7 +895,7 @@ async def get_ignore_list(socket_path: str, name: str) -> list[str]:
~app.utils.fail2ban_client.Fail2BanConnectionError: If the socket
cannot be reached.
"""
client = Fail2BanClient(socket_path=socket_path, timeout=_SOCKET_TIMEOUT)
client = Fail2BanClient(socket_path=socket_path, timeout=FAIL2BAN_SOCKET_TIMEOUT)
try:
raw = ok(await client.send(["get", name, "ignoreip"]))
return ensure_list(raw)
@@ -926,7 +925,7 @@ async def add_ignore_ip(socket_path: str, name: str, ip: str) -> None:
except ValueError as exc:
raise ValueError(f"Invalid IP address or network: {ip!r}") from exc
client = Fail2BanClient(socket_path=socket_path, timeout=_SOCKET_TIMEOUT)
client = Fail2BanClient(socket_path=socket_path, timeout=FAIL2BAN_SOCKET_TIMEOUT)
try:
ok(await client.send(["set", name, "addignoreip", ip]))
log.info("ignore_ip_added", jail=name, ip=ip)
@@ -950,7 +949,7 @@ async def del_ignore_ip(socket_path: str, name: str, ip: str) -> None:
~app.utils.fail2ban_client.Fail2BanConnectionError: If the socket
cannot be reached.
"""
client = Fail2BanClient(socket_path=socket_path, timeout=_SOCKET_TIMEOUT)
client = Fail2BanClient(socket_path=socket_path, timeout=FAIL2BAN_SOCKET_TIMEOUT)
try:
ok(await client.send(["set", name, "delignoreip", ip]))
log.info("ignore_ip_removed", jail=name, ip=ip)
@@ -975,7 +974,7 @@ async def get_ignore_self(socket_path: str, name: str) -> bool:
~app.utils.fail2ban_client.Fail2BanConnectionError: If the socket
cannot be reached.
"""
client = Fail2BanClient(socket_path=socket_path, timeout=_SOCKET_TIMEOUT)
client = Fail2BanClient(socket_path=socket_path, timeout=FAIL2BAN_SOCKET_TIMEOUT)
try:
raw = ok(await client.send(["get", name, "ignoreself"]))
return bool(raw)
@@ -1000,7 +999,7 @@ async def set_ignore_self(socket_path: str, name: str, *, on: bool) -> None:
cannot be reached.
"""
value = "true" if on else "false"
client = Fail2BanClient(socket_path=socket_path, timeout=_SOCKET_TIMEOUT)
client = Fail2BanClient(socket_path=socket_path, timeout=FAIL2BAN_SOCKET_TIMEOUT)
try:
ok(await client.send(["set", name, "ignoreself", value]))
log.info("ignore_self_toggled", jail=name, on=on)
@@ -1047,7 +1046,7 @@ async def lookup_ip(
except ValueError as exc:
raise ValueError(f"Invalid IP address: {ip!r}") from exc
client = Fail2BanClient(socket_path=socket_path, timeout=_SOCKET_TIMEOUT)
client = Fail2BanClient(socket_path=socket_path, timeout=FAIL2BAN_SOCKET_TIMEOUT)
with contextlib.suppress(ValueError, Fail2BanConnectionError):
# Use fail2ban's "banned <ip>" command which checks all jails.
@@ -1120,7 +1119,7 @@ async def unban_all_ips(socket_path: str) -> int:
~app.utils.fail2ban_client.Fail2BanConnectionError: If the socket
cannot be reached.
"""
client = Fail2BanClient(socket_path=socket_path, timeout=_SOCKET_TIMEOUT)
client = Fail2BanClient(socket_path=socket_path, timeout=FAIL2BAN_SOCKET_TIMEOUT)
count: int = int(str(ok(await client.send(["unban", "--all"])) or 0))
log.info("all_ips_unbanned", count=count)
return count