Files
BanGUI/backend/app/utils/fail2ban_db_utils.py
Copilot 22db607875 Add fail2ban DB index management and socket-based path resolution
- New get_fail2ban_db_path() in setup_service resolves DB path from configured socket path
- New ensure_fail2ban_indexes() creates missing performance indexes on bans table
- Call ensure_fail2ban_indexes on every startup before first ban query
- Remove completed tasks from Docs/Tasks.md
- Update Docs/PERFORMANCE.md with index findings
2026-05-03 12:17:31 +02:00

97 lines
3.1 KiB
Python

"""Utilities shared by fail2ban-related services."""
from __future__ import annotations
import json
from datetime import UTC, datetime
import structlog
# Module-level structlog logger used by the helpers in this file.
log: structlog.stdlib.BoundLogger = structlog.get_logger()
def escape_like(s: str) -> str:
    """Neutralise SQLite LIKE wildcards in a string.

    In a LIKE pattern, % matches any sequence of characters and _ matches any
    single character. Prefixing each of them (and the backslash itself) with a
    backslash lets the text be matched literally.

    NOTE(review): presumably the calling query names the backslash in an
    ESCAPE clause so that these prefixes take effect; verify against callers.

    Args:
        s: The string to escape.

    Returns:
        A copy of the string with every backslash, % and _ preceded by a
        backslash.
    """
    # A single translate pass is equivalent to replacing the backslash first
    # and the wildcards afterwards: each input character is mapped exactly once.
    escape_table = str.maketrans({"\\": "\\\\", "%": "\\%", "_": "\\_"})
    return s.translate(escape_table)
async def ensure_fail2ban_indexes(db_path: str) -> None:
    """Create performance indexes on the fail2ban bans table if missing.

    The fail2ban database schema does not include an index on timeofban alone,
    only composite indexes (jail, timeofban) and (jail, ip). Queries that filter
    by timeofban >= X ORDER BY timeofban DESC require a full table scan.

    This function adds the missing index idempotently (CREATE INDEX IF NOT EXISTS)
    each time the application starts. The overhead of the check is negligible
    compared to the query speedup on large tables. The existence check runs on
    a read-only connection; a writable connection is opened only when the
    index actually has to be created.

    Database errors raised while connecting or executing are not caught here
    and propagate to the caller.

    Args:
        db_path: Path to the fail2ban SQLite database.
    """
    from urllib.parse import quote

    import aiosqlite

    index_name = "idx_bans_timeofban_desc"

    # Percent-encode the path before embedding it in the URI: a raw "?", "#"
    # or "%" in the filename would otherwise be parsed as query, fragment or
    # escape syntax and point SQLite at the wrong file.
    readonly_uri = f"file:{quote(str(db_path))}?mode=ro"

    # Check existing indexes using read-only connection
    async with aiosqlite.connect(readonly_uri, uri=True) as db:
        async with db.execute(
            "SELECT name FROM sqlite_master WHERE type='index' AND tbl_name='bans'"
        ) as cursor:
            rows = await cursor.fetchall()
    existing = {str(row[0]) for row in rows}

    if index_name in existing:
        log.debug("fail2ban_bans_index_exists", index=index_name)
        return

    log.info("creating_fail2ban_bans_index", index=index_name, db_path=db_path)
    async with aiosqlite.connect(db_path) as db:
        # index_name is a fixed constant defined above, so interpolating it
        # into the statement is safe (identifiers cannot be bound as parameters).
        await db.execute(f"CREATE INDEX IF NOT EXISTS {index_name} ON bans(timeofban DESC);")
        await db.commit()
    log.info("fail2ban_bans_index_created", index=index_name)
def ts_to_iso(unix_ts: int) -> str:
"""Convert a Unix timestamp to an ISO 8601 UTC string."""
return datetime.fromtimestamp(unix_ts, tz=UTC).isoformat()
def parse_data_json(raw: object) -> tuple[list[str], int]:
    """Extract matches and failure count from the fail2ban bans.data value.

    Args:
        raw: The stored value: a JSON string, an already-decoded dict, or
            None. Any other type is treated as carrying no data.

    Returns:
        A (matches, failures) tuple. matches is the "matches" list with every
        entry converted to str, or an empty list when the key is absent or not
        a list. failures is the "failures" value converted to int, or 0 when
        the key is absent or the value cannot be converted.
    """
    if raw is None:
        return [], 0

    obj: dict[str, object] = {}
    if isinstance(raw, str):
        try:
            parsed = json.loads(raw)
        except json.JSONDecodeError:
            return [], 0
        # Valid JSON that is not an object (list, number, ...) carries no data.
        if isinstance(parsed, dict):
            obj = parsed
    elif isinstance(raw, dict):
        obj = raw

    raw_matches = obj.get("matches")
    matches = [str(m) for m in raw_matches] if isinstance(raw_matches, list) else []

    failures = 0
    raw_failures = obj.get("failures")
    if isinstance(raw_failures, (int, float, str)):
        try:
            failures = int(raw_failures)
        except (ValueError, TypeError, OverflowError):
            # ValueError covers non-numeric strings and NaN; OverflowError
            # covers +/-Infinity, which json.loads accepts as a float literal.
            failures = 0
    return matches, failures