"""Utilities shared by fail2ban-related services.""" from __future__ import annotations import json from datetime import UTC, datetime import structlog log: structlog.stdlib.BoundLogger = structlog.get_logger() def escape_like(s: str) -> str: """Escape SQLite LIKE wildcard characters in a string. SQLite's LIKE operator treats % (any sequence) and _ (any single char) as wildcards. This function escapes them to prevent unintended matches. Args: s: The string to escape. Returns: The escaped string where backslashes, %, and _ are escaped. """ return s.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_") async def ensure_fail2ban_indexes(db_path: str) -> None: """Create performance indexes on the fail2ban bans table if missing. The fail2ban database schema does not include an index on timeofban alone, only composite indexes (jail, timeofban) and (jail, ip). Queries that filter by timeofban >= X ORDER BY timeofban DESC require a full table scan. This function adds the missing index idempotently (CREATE INDEX IF NOT EXISTS) each time the application starts. The overhead of the check is negligible compared to the query speedup on large tables. Args: db_path: Path to the fail2ban SQLite database. """ import aiosqlite index_name = "idx_bans_timeofban_desc" # Check existing indexes using read-only connection async with aiosqlite.connect(f"file:{db_path}?mode=ro", uri=True) as db: async with db.execute( "SELECT name FROM sqlite_master WHERE type='index' AND tbl_name='bans'" ) as cursor: rows = await cursor.fetchall() existing = [str(r[0]) for r in rows] if index_name not in existing: log.info("creating_fail2ban_bans_index", index=index_name, db_path=db_path) async with aiosqlite.connect(db_path) as db: await db.execute(f"CREATE INDEX IF NOT EXISTS {index_name} ON bans(timeofban DESC);") await db.commit() log.info("fail2ban_bans_index_created", index=index_name) else: log.debug("fail2ban_bans_index_exists", index=index_name) def ts_to_iso(unix_ts: int) -> str: """Convert a Unix timestamp to an ISO 8601 UTC string.""" return datetime.fromtimestamp(unix_ts, tz=UTC).isoformat() def parse_data_json(raw: object) -> tuple[list[str], int]: """Extract matches and failure count from the fail2ban bans.data value.""" if raw is None: return [], 0 obj: dict[str, object] = {} if isinstance(raw, str): try: parsed = json.loads(raw) if isinstance(parsed, dict): obj = parsed except json.JSONDecodeError: return [], 0 elif isinstance(raw, dict): obj = raw raw_matches = obj.get("matches") matches = [str(m) for m in raw_matches] if isinstance(raw_matches, list) else [] raw_failures = obj.get("failures") failures = 0 if isinstance(raw_failures, (int, float, str)): try: failures = int(raw_failures) except (ValueError, TypeError): failures = 0 return matches, failures