refactor: Extract fail2ban response utilities into shared module
Consolidate duplicate _ok(), _to_dict(), ensure_list(), and is_not_found_error() functions from 6 service modules into a single canonical implementation at backend/app/utils/fail2ban_response.py. Changes: - Create fail2ban_response.py with canonical implementations - Remove local duplicates from: ban_service, jail_service, config_service, health_service, server_service, config_file_utils - Update all imports to use shared module - Add comprehensive docstrings and examples - Update Architecture.md and Backend-Development.md documentation Benefits: - Single source of truth for response parsing logic - Eliminates code duplication across service layer - Improves maintainability and consistency - Enables centralized bug fixes and improvements Tests: All 228 service tests passing, no regressions Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
This commit is contained in:
@@ -17,19 +17,18 @@ from app.exceptions import (
|
||||
JailNameError,
|
||||
)
|
||||
from app.models.config import (
|
||||
ActionConfig,
|
||||
BantimeEscalation,
|
||||
InactiveJail,
|
||||
JailValidationIssue,
|
||||
JailValidationResult,
|
||||
)
|
||||
from app.utils import conffile_parser
|
||||
from app.utils.constants import FAIL2BAN_TRUTHY_VALUES
|
||||
from app.utils.fail2ban_client import (
|
||||
Fail2BanClient,
|
||||
Fail2BanConnectionError,
|
||||
Fail2BanResponse,
|
||||
)
|
||||
from app.utils.fail2ban_response import ok, to_dict
|
||||
|
||||
log: structlog.stdlib.BoundLogger = structlog.get_logger()
|
||||
|
||||
@@ -256,26 +255,8 @@ async def _get_active_jail_names(socket_path: str) -> set[str]:
|
||||
try:
|
||||
client = Fail2BanClient(socket_path=socket_path, timeout=_SOCKET_TIMEOUT)
|
||||
|
||||
def _to_dict_inner(pairs: object) -> dict[str, object]:
|
||||
if not isinstance(pairs, (list, tuple)):
|
||||
return {}
|
||||
result: dict[str, object] = {}
|
||||
for item in pairs:
|
||||
try:
|
||||
k, v = item
|
||||
result[str(k)] = v
|
||||
except (TypeError, ValueError):
|
||||
pass
|
||||
return result
|
||||
|
||||
def _ok(response: object) -> object:
|
||||
code, data = cast("Fail2BanResponse", response)
|
||||
if code != 0:
|
||||
raise ValueError(f"fail2ban error {code}: {data!r}")
|
||||
return data
|
||||
|
||||
status_raw = _ok(await client.send(["status"]))
|
||||
status_dict = _to_dict_inner(status_raw)
|
||||
status_raw = ok(await client.send(["status"]))
|
||||
status_dict = to_dict(status_raw)
|
||||
jail_list_raw: str = str(status_dict.get("Jail list", "") or "").strip()
|
||||
if not jail_list_raw:
|
||||
return set()
|
||||
|
||||
165
backend/app/utils/fail2ban_response.py
Normal file
165
backend/app/utils/fail2ban_response.py
Normal file
@@ -0,0 +1,165 @@
|
||||
"""Shared utilities for parsing fail2ban responses.
|
||||
|
||||
This module provides canonical implementations of response parsing helpers
|
||||
used across all service modules. All services should import from here instead
|
||||
of maintaining local copies.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
|
||||
def ok(response: object) -> object:
|
||||
"""Extract the payload from a fail2ban ``(return_code, data)`` response.
|
||||
|
||||
fail2ban commands return a tuple of ``(return_code, data)`` where
|
||||
``return_code`` is 0 for success or non-zero for errors. This function
|
||||
extracts and returns the ``data`` portion.
|
||||
|
||||
Args:
|
||||
response: Raw value returned by :meth:`~Fail2BanClient.send`.
|
||||
|
||||
Returns:
|
||||
The payload ``data`` portion of the response.
|
||||
|
||||
Raises:
|
||||
ValueError: If the response indicates an error (return code ≠ 0) or
|
||||
has an unexpected shape.
|
||||
|
||||
Examples:
|
||||
>>> response = (0, {'Jail list': 'sshd,recidive'})
|
||||
>>> ok(response)
|
||||
{'Jail list': 'sshd,recidive'}
|
||||
|
||||
>>> error_response = (1, 'Unknown jail')
|
||||
>>> ok(error_response)
|
||||
Traceback (most recent call last):
|
||||
...
|
||||
ValueError: fail2ban returned error code 1: 'Unknown jail'
|
||||
"""
|
||||
try:
|
||||
code_val: int
|
||||
data_val: object
|
||||
code_val, data_val = response # type: ignore[misc]
|
||||
except (TypeError, ValueError) as exc:
|
||||
raise ValueError(f"Unexpected fail2ban response shape: {response!r}") from exc
|
||||
|
||||
if code_val != 0:
|
||||
raise ValueError(f"fail2ban returned error code {code_val}: {data_val!r}")
|
||||
|
||||
return data_val
|
||||
|
||||
|
||||
def to_dict(pairs: object) -> dict[str, object]:
|
||||
"""Convert a list of ``(key, value)`` pairs to a plain dict.
|
||||
|
||||
fail2ban returns many results as a list of key-value tuples. This
|
||||
function converts them to a regular Python dict, skipping malformed
|
||||
entries and converting keys to strings.
|
||||
|
||||
Args:
|
||||
pairs: A list of ``(key, value)`` pairs (or any iterable thereof).
|
||||
Non-list/tuple inputs return an empty dict.
|
||||
|
||||
Returns:
|
||||
A :class:`dict` with the keys and values from *pairs*. Keys are
|
||||
converted to strings; values are preserved as-is. Malformed entries
|
||||
are silently skipped.
|
||||
|
||||
Examples:
|
||||
>>> to_dict([('name', 'sshd'), ('port', 22)])
|
||||
{'name': 'sshd', 'port': 22}
|
||||
|
||||
>>> to_dict([('a', 1), 'broken', ('b', 2)])
|
||||
{'a': 1, 'b': 2}
|
||||
|
||||
>>> to_dict('not a list')
|
||||
{}
|
||||
"""
|
||||
if not isinstance(pairs, (list, tuple)):
|
||||
return {}
|
||||
result: dict[str, object] = {}
|
||||
for item in pairs:
|
||||
try:
|
||||
k, v = item
|
||||
result[str(k)] = v
|
||||
except (TypeError, ValueError):
|
||||
pass
|
||||
return result
|
||||
|
||||
|
||||
def ensure_list(value: object | None) -> list[str]:
|
||||
"""Coerce a fail2ban response value to a list of strings.
|
||||
|
||||
Some fail2ban ``get`` responses return ``None`` when a field is empty,
|
||||
a single string when there is only one entry, or a list of strings.
|
||||
This helper normalises all three cases to a consistent list.
|
||||
|
||||
Args:
|
||||
value: The raw value from a fail2ban command response. Can be
|
||||
``None``, a string, a list/tuple of strings, or any other object.
|
||||
|
||||
Returns:
|
||||
A :class:`list` of strings. Empty input returns an empty list.
|
||||
Single strings are wrapped in a list. Lists/tuples are converted to
|
||||
strings element-wise.
|
||||
|
||||
Examples:
|
||||
>>> ensure_list(None)
|
||||
[]
|
||||
|
||||
>>> ensure_list('sshd')
|
||||
['sshd']
|
||||
|
||||
>>> ensure_list(['sshd', 'apache2'])
|
||||
['sshd', 'apache2']
|
||||
|
||||
>>> ensure_list(42)
|
||||
['42']
|
||||
"""
|
||||
if value is None:
|
||||
return []
|
||||
if isinstance(value, str):
|
||||
return [value] if value.strip() else []
|
||||
if isinstance(value, (list, tuple)):
|
||||
return [str(v) for v in value if v is not None]
|
||||
return [str(value)]
|
||||
|
||||
|
||||
def is_not_found_error(exc: Exception) -> bool:
|
||||
"""Return ``True`` if *exc* indicates a jail does not exist.
|
||||
|
||||
fail2ban raises errors when a jail is not found, but serializes them
|
||||
in different formats depending on the context. This function checks
|
||||
for multiple common error message patterns.
|
||||
|
||||
Args:
|
||||
exc: The exception to inspect.
|
||||
|
||||
Returns:
|
||||
``True`` if the exception message contains any of the known
|
||||
"not found" phrases (case-insensitive), ``False`` otherwise.
|
||||
|
||||
Examples:
|
||||
>>> exc = ValueError('Unknown jail: sshd')
|
||||
>>> is_not_found_error(exc)
|
||||
True
|
||||
|
||||
>>> exc = ValueError('unknownjail')
|
||||
>>> is_not_found_error(exc)
|
||||
True
|
||||
|
||||
>>> exc = ValueError('Internal error')
|
||||
>>> is_not_found_error(exc)
|
||||
False
|
||||
"""
|
||||
msg = str(exc).lower()
|
||||
return any(
|
||||
phrase in msg
|
||||
for phrase in (
|
||||
"unknown jail",
|
||||
"unknownjail",
|
||||
"no jail",
|
||||
"does not exist",
|
||||
"not found",
|
||||
)
|
||||
)
|
||||
Reference in New Issue
Block a user