"""Log helper service. Contains regex test and log preview helpers that are independent of fail2ban socket operations. """ from __future__ import annotations import asyncio import re from pathlib import Path from app.models.config import ( LogPreviewLine, LogPreviewRequest, LogPreviewResponse, RegexTestRequest, RegexTestResponse, ) def test_regex(request: RegexTestRequest) -> RegexTestResponse: """Test a regex pattern against a sample log line. Args: request: The regex test payload. Returns: RegexTestResponse with match result, groups and optional error. """ try: compiled = re.compile(request.fail_regex) except re.error as exc: return RegexTestResponse(matched=False, groups=[], error=str(exc)) match = compiled.search(request.log_line) if match is None: return RegexTestResponse(matched=False) groups: list[str] = list(match.groups() or []) return RegexTestResponse(matched=True, groups=[str(g) for g in groups if g is not None]) async def preview_log(req: LogPreviewRequest) -> LogPreviewResponse: """Inspect the last lines of a log file and evaluate regex matches. Args: req: Log preview request. Returns: LogPreviewResponse with lines, total_lines and matched_count, or error. """ try: compiled = re.compile(req.fail_regex) except re.error as exc: return LogPreviewResponse( lines=[], total_lines=0, matched_count=0, regex_error=str(exc), ) path = Path(req.log_path) if not path.is_file(): return LogPreviewResponse( lines=[], total_lines=0, matched_count=0, regex_error=f"File not found: {req.log_path!r}", ) try: raw_lines = await asyncio.get_event_loop().run_in_executor( None, _read_tail_lines, str(path), req.num_lines, ) except OSError as exc: return LogPreviewResponse( lines=[], total_lines=0, matched_count=0, regex_error=f"Cannot read file: {exc}", ) result_lines: list[LogPreviewLine] = [] matched_count = 0 for line in raw_lines: m = compiled.search(line) groups = [str(g) for g in (m.groups() or []) if g is not None] if m else [] result_lines.append( LogPreviewLine(line=line, matched=(m is not None), groups=groups), ) if m: matched_count += 1 return LogPreviewResponse( lines=result_lines, total_lines=len(result_lines), matched_count=matched_count, ) def _read_tail_lines(file_path: str, num_lines: int) -> list[str]: """Read the last *num_lines* from *file_path* in a memory-efficient way.""" chunk_size = 8192 raw_lines: list[bytes] = [] with open(file_path, "rb") as fh: fh.seek(0, 2) end_pos = fh.tell() if end_pos == 0: return [] buf = b"" pos = end_pos while len(raw_lines) <= num_lines and pos > 0: read_size = min(chunk_size, pos) pos -= read_size fh.seek(pos) chunk = fh.read(read_size) buf = chunk + buf raw_lines = buf.split(b"\n") if pos > 0 and len(raw_lines) > 1: raw_lines = raw_lines[1:] return [ln.decode("utf-8", errors="replace").rstrip() for ln in raw_lines[-num_lines:] if ln.strip()]