Files
BanGUI/backend/app/services/log_service.py
Lukas b634ce876a refactor: Extract fail2ban response utilities into shared module
Consolidate duplicate _ok(), _to_dict(), ensure_list(), and is_not_found_error()
functions from 6 service modules into a single canonical implementation at
backend/app/utils/fail2ban_response.py.

Changes:
- Create fail2ban_response.py with canonical implementations
- Remove local duplicates from: ban_service, jail_service, config_service,
  health_service, server_service, config_file_utils
- Update all imports to use shared module
- Add comprehensive docstrings and examples
- Update Architecture.md and Backend-Development.md documentation

Benefits:
- Single source of truth for response parsing logic
- Eliminates code duplication across service layer
- Improves maintainability and consistency
- Enables centralized bug fixes and improvements

Tests: All 228 service tests passing, no regressions

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
2026-04-23 15:11:21 +02:00

291 lines
8.0 KiB
Python

"""Log helper service.
Contains regex test, log preview, and fail2ban log reading helpers.
"""
from __future__ import annotations
import asyncio
import re
from pathlib import Path
import structlog
from app.exceptions import ConfigOperationError
from app.models.config import (
Fail2BanLogResponse,
LogPreviewLine,
LogPreviewRequest,
LogPreviewResponse,
RegexTestRequest,
RegexTestResponse,
)
from app.utils.async_utils import run_blocking
from app.utils.fail2ban_client import (
Fail2BanClient,
Fail2BanConnectionError,
Fail2BanProtocolError,
)
from app.utils.fail2ban_response import ok
# Module-level structlog logger used for the structured events emitted below.
log: structlog.stdlib.BoundLogger = structlog.get_logger()
# Timeout handed to Fail2BanClient when it is constructed in this module
# (presumably seconds — confirm against the client implementation).
_SOCKET_TIMEOUT: float = 10.0
# Log targets that are not file paths. The configured target is upper-cased
# before being compared against this set, and a match is rejected because
# there is no file to read.
_NON_FILE_LOG_TARGETS: frozenset[str] = frozenset(
{"STDOUT", "STDERR", "SYSLOG", "SYSTEMD-JOURNAL"}
)
# Directories a resolved log path must live under before this module reads it.
_SAFE_LOG_PREFIXES: tuple[str, ...] = ("/var/log", "/config/log")
def _count_file_lines(file_path: str) -> int:
    """Count the total number of lines in *file_path* synchronously.

    The file is read in binary mode in 64 KiB chunks so it never has to be
    held in memory at once.

    Args:
        file_path: Path of the file to inspect.

    Returns:
        The number of lines. A final line that is not terminated by a
        newline is counted as a line; an empty file yields ``0``.

    Raises:
        OSError: If the file cannot be opened or read.
    """
    count = 0
    last_byte = b""
    with open(file_path, "rb") as fh:
        while chunk := fh.read(65536):
            count += chunk.count(b"\n")
            # Remember how the data ends so an unterminated final line
            # can be accounted for once the whole file has been read.
            last_byte = chunk[-1:]
    if last_byte and last_byte != b"\n":
        # Non-empty file whose last line has no trailing newline: counting
        # newline bytes alone would miss that line.
        count += 1
    return count
async def _safe_get(
    client: Fail2BanClient,
    command: list[str],
    default: object | None = None,
) -> object | None:
    """Send *command* through *client*, falling back to *default* on failure.

    Args:
        client: Client whose ``send`` coroutine is awaited with *command*.
        command: The command to send.
        default: Value returned when sending or unwrapping the reply fails.

    Returns:
        The reply after it has been passed through ``ok``, or *default* when
        a connection error, protocol error, ``OSError`` or ``ValueError`` is
        raised along the way.
    """
    recoverable = (
        Fail2BanConnectionError,
        Fail2BanProtocolError,
        OSError,
        ValueError,
    )
    try:
        reply = await client.send(command)
        return ok(reply)
    except recoverable:
        # Best-effort lookup: callers supply a sensible default instead of
        # handling these failures themselves.
        return default
async def _safe_get_typed(
    client: Fail2BanClient,
    command: list[str],
    default: object | None = None,
) -> object | None:
    """Send *command* and return the result, or *default* if it fails.

    This is a thin pass-through to ``_safe_get``; it performs no conversion
    of its own.

    Args:
        client: Client used to send the command.
        command: The command to send.
        default: Fallback value returned when the command fails.

    Returns:
        Whatever ``_safe_get`` returns for the same arguments.
    """
    result = await _safe_get(client, command, default)
    return result
async def read_fail2ban_log(
    socket_path: str,
    lines: int,
    filter_text: str | None = None,
) -> Fail2BanLogResponse:
    """Read the tail of the fail2ban daemon log file.

    Queries the fail2ban socket for the current log target and log level,
    validates that the target is a regular file inside one of the allowed
    log directories, then returns the last *lines* entries, optionally
    filtered by *filter_text*.

    Args:
        socket_path: Path of the fail2ban socket passed to the client.
        lines: Number of trailing log lines to read from the file.
        filter_text: Optional substring. When non-empty, only tail lines
            containing it are returned; the filter is applied after the
            tail has been read, so fewer than *lines* entries may remain.

    Returns:
        Fail2BanLogResponse carrying the resolved log path, the (filtered)
        lines, the total line count of the file, the log level and the raw
        log target.

    Raises:
        ConfigOperationError: If fail2ban logs to a non-file target, the
            target path cannot be resolved, the resolved path lies outside
            the allowed directories, or the file does not exist.
    """
    client = Fail2BanClient(socket_path=socket_path, timeout=_SOCKET_TIMEOUT)

    # Both lookups are best-effort: a failed query falls back to the default.
    log_level_raw, log_target_raw = await asyncio.gather(
        _safe_get_typed(client, ["get", "loglevel"], "INFO"),
        _safe_get_typed(client, ["get", "logtarget"], "STDOUT"),
    )
    log_level = str(log_level_raw or "INFO").upper()
    log_target = str(log_target_raw or "STDOUT")

    if log_target.upper() in _NON_FILE_LOG_TARGETS:
        raise ConfigOperationError(
            f"fail2ban is logging to {log_target!r}. "
            "File-based log viewing is only available when fail2ban logs "
            "to a file path."
        )

    try:
        resolved = Path(log_target).resolve()
    except (ValueError, OSError) as exc:
        raise ConfigOperationError(
            f"Cannot resolve log target path {log_target!r}: {exc}"
        ) from exc
    resolved_str = str(resolved)

    # Compare path components rather than raw string prefixes: a plain
    # ``startswith("/var/log")`` would also accept sibling directories such
    # as ``/var/log-other`` or ``/var/logs``.
    if not any(
        resolved.is_relative_to(Path(prefix))
        for prefix in _SAFE_LOG_PREFIXES
    ):
        raise ConfigOperationError(
            f"Log path {resolved_str!r} is outside the allowed directory. "
            "Only paths under /var/log or /config/log are permitted."
        )

    if not resolved.is_file():
        raise ConfigOperationError(f"Log file not found: {resolved_str!r}")

    # Counting and tail-reading are independent blocking jobs; run them
    # concurrently off the event loop.
    total_lines, raw_lines = await asyncio.gather(
        run_blocking(_count_file_lines, resolved_str),
        run_blocking(_read_tail_lines, resolved_str, lines),
    )

    filtered = (
        [ln for ln in raw_lines if filter_text in ln]
        if filter_text
        else raw_lines
    )

    log.info(
        "fail2ban_log_read",
        log_path=resolved_str,
        lines_requested=lines,
        lines_returned=len(filtered),
        filter_active=filter_text is not None,
    )

    return Fail2BanLogResponse(
        log_path=resolved_str,
        lines=filtered,
        total_lines=total_lines,
        log_level=log_level,
        log_target=log_target,
    )
def test_regex(request: RegexTestRequest) -> RegexTestResponse:
    """Check whether a regex pattern matches a sample log line.

    Args:
        request: Payload providing ``fail_regex`` (the pattern) and
            ``log_line`` (the text to search).

    Returns:
        RegexTestResponse. An invalid pattern yields ``matched=False`` with
        the compile error message; a successful search yields
        ``matched=True`` together with the captured groups as strings,
        leaving out groups that did not participate in the match.
    """
    try:
        pattern = re.compile(request.fail_regex)
    except re.error as exc:
        return RegexTestResponse(matched=False, groups=[], error=str(exc))

    found = pattern.search(request.log_line)
    if found is not None:
        captured = [str(value) for value in found.groups() if value is not None]
        return RegexTestResponse(matched=True, groups=captured)

    return RegexTestResponse(matched=False)
async def preview_log(req: LogPreviewRequest) -> LogPreviewResponse:
    """Read the tail of a log file and report which lines match a regex.

    Args:
        req: Request providing ``fail_regex``, ``log_path`` and
            ``num_lines``.

    Returns:
        LogPreviewResponse. On success it holds one entry per tail line
        (with its match flag and captured groups), the number of entries as
        ``total_lines`` and the number of matching entries. Every failure
        (invalid pattern, unresolvable path, path outside the allowed
        directories, missing file, read error) is reported as an empty
        response whose ``regex_error`` carries the message.
    """

    def _failure(message: str) -> LogPreviewResponse:
        # All error outcomes share the same empty-result shape.
        return LogPreviewResponse(
            lines=[],
            total_lines=0,
            matched_count=0,
            regex_error=message,
        )

    try:
        pattern = re.compile(req.fail_regex)
    except re.error as exc:
        return _failure(str(exc))

    candidate = Path(req.log_path)
    try:
        resolved = candidate.resolve(strict=False)
    except (ValueError, OSError) as exc:
        return _failure(f"Cannot resolve log path {req.log_path!r}: {exc}")

    inside_allowed_dir = False
    for prefix in _SAFE_LOG_PREFIXES:
        if resolved.is_relative_to(Path(prefix)):
            inside_allowed_dir = True
            break
    if not inside_allowed_dir:
        return _failure(
            f"Log path {str(resolved)!r} is outside the allowed directory. "
            "Only paths under /var/log or /config/log are permitted."
        )

    if not resolved.is_file():
        return _failure(f"File not found: {req.log_path!r}")

    try:
        tail = await run_blocking(_read_tail_lines, str(resolved), req.num_lines)
    except OSError as exc:
        return _failure(f"Cannot read file: {exc}")

    preview: list[LogPreviewLine] = []
    hits = 0
    for text in tail:
        found = pattern.search(text)
        if found is None:
            preview.append(LogPreviewLine(line=text, matched=False, groups=[]))
            continue
        hits += 1
        # Groups that did not participate in the match are left out.
        captured = [str(value) for value in found.groups() if value is not None]
        preview.append(LogPreviewLine(line=text, matched=True, groups=captured))

    return LogPreviewResponse(
        lines=preview,
        total_lines=len(preview),
        matched_count=hits,
    )
def _read_tail_lines(file_path: str, num_lines: int) -> list[str]:
    """Read the last *num_lines* non-blank lines from *file_path*.

    The file is read backwards from its end in fixed-size chunks until
    enough complete, non-blank lines have been collected or the start of
    the file is reached, so large files are not loaded in full.

    Args:
        file_path: Path of the file to read.
        num_lines: Maximum number of lines to return. Values of zero or
            below yield an empty list.

    Returns:
        Up to *num_lines* lines in file order, decoded as UTF-8 (invalid
        bytes are replaced) with trailing whitespace removed. Lines that
        are empty or whitespace-only are skipped and do not count towards
        *num_lines*.

    Raises:
        OSError: If the file cannot be opened or read.
    """
    if num_lines <= 0:
        # Guard explicitly: slicing with ``[-0:]`` would return everything.
        return []

    chunk_size = 8192
    kept: list[bytes] = []
    with open(file_path, "rb") as fh:
        pos = fh.seek(0, 2)  # jump to the end; seek() returns the new offset
        buf = b""
        while pos > 0:
            read_size = min(chunk_size, pos)
            pos -= read_size
            fh.seek(pos)
            buf = fh.read(read_size) + buf
            pieces = buf.split(b"\n")
            if pos > 0:
                # The first piece may start in the middle of a line, so it
                # is ignored until more of the file has been read.
                pieces = pieces[1:]
            # Filter blanks before counting so the empty piece after a
            # trailing newline does not use up one of the requested slots.
            kept = [piece for piece in pieces if piece.strip()]
            if len(kept) >= num_lines:
                break

    return [
        ln.decode("utf-8", errors="replace").rstrip()
        for ln in kept[-num_lines:]
    ]