"""Fail2ban jail configuration file parser and activator. Parses the full set of fail2ban jail configuration files (``jail.conf``, ``jail.local``, ``jail.d/*.conf``, ``jail.d/*.local``) to discover all defined jails — both active and inactive — and provides functions to activate or deactivate them by writing ``.local`` override files. Merge order (fail2ban convention): 1. ``jail.conf`` 2. ``jail.local`` 3. ``jail.d/*.conf`` (alphabetical) 4. ``jail.d/*.local`` (alphabetical) Security note: the ``activate_jail`` and ``deactivate_jail`` callers must supply a validated jail name. This module validates the name against an allowlist pattern before constructing any filesystem paths to prevent directory traversal. """ from __future__ import annotations import asyncio import configparser import os import re import tempfile from pathlib import Path from typing import Any import structlog from app.models.config import ( ActivateJailRequest, InactiveJail, InactiveJailListResponse, JailActivationResponse, ) from app.services import jail_service from app.utils.fail2ban_client import Fail2BanClient, Fail2BanConnectionError log: structlog.stdlib.BoundLogger = structlog.get_logger() # --------------------------------------------------------------------------- # Constants # --------------------------------------------------------------------------- _SOCKET_TIMEOUT: float = 10.0 # Allowlist pattern for jail names used in path construction. _SAFE_JAIL_NAME_RE: re.Pattern[str] = re.compile( r"^[A-Za-z0-9][A-Za-z0-9._-]{0,127}$" ) # Sections that are not jail definitions. _META_SECTIONS: frozenset[str] = frozenset({"INCLUDES", "DEFAULT"}) # True-ish values for the ``enabled`` key. _TRUE_VALUES: frozenset[str] = frozenset({"true", "yes", "1"}) # False-ish values for the ``enabled`` key. _FALSE_VALUES: frozenset[str] = frozenset({"false", "no", "0"}) # --------------------------------------------------------------------------- # Custom exceptions # --------------------------------------------------------------------------- class JailNotFoundInConfigError(Exception): """Raised when the requested jail name is not defined in any config file.""" def __init__(self, name: str) -> None: """Initialise with the jail name that was not found. Args: name: The jail name that could not be located. """ self.name: str = name super().__init__(f"Jail not found in config files: {name!r}") class JailAlreadyActiveError(Exception): """Raised when trying to activate a jail that is already active.""" def __init__(self, name: str) -> None: """Initialise with the jail name. Args: name: The jail that is already active. """ self.name: str = name super().__init__(f"Jail is already active: {name!r}") class JailAlreadyInactiveError(Exception): """Raised when trying to deactivate a jail that is already inactive.""" def __init__(self, name: str) -> None: """Initialise with the jail name. Args: name: The jail that is already inactive. """ self.name: str = name super().__init__(f"Jail is already inactive: {name!r}") class JailNameError(Exception): """Raised when a jail name contains invalid characters.""" class ConfigWriteError(Exception): """Raised when writing a ``.local`` override file fails.""" # --------------------------------------------------------------------------- # Internal helpers # --------------------------------------------------------------------------- def _safe_jail_name(name: str) -> str: """Validate *name* and return it unchanged or raise :class:`JailNameError`. Args: name: Proposed jail name. Returns: The name unchanged if valid. Raises: JailNameError: If *name* contains unsafe characters. """ if not _SAFE_JAIL_NAME_RE.match(name): raise JailNameError( f"Jail name {name!r} contains invalid characters. " "Only alphanumeric characters, hyphens, underscores, and dots are " "allowed; must start with an alphanumeric character." ) return name def _ordered_config_files(config_dir: Path) -> list[Path]: """Return all jail config files in fail2ban merge order. Args: config_dir: The fail2ban configuration root directory. Returns: List of paths in ascending priority order (later entries override earlier ones). """ files: list[Path] = [] jail_conf = config_dir / "jail.conf" if jail_conf.is_file(): files.append(jail_conf) jail_local = config_dir / "jail.local" if jail_local.is_file(): files.append(jail_local) jail_d = config_dir / "jail.d" if jail_d.is_dir(): files.extend(sorted(jail_d.glob("*.conf"))) files.extend(sorted(jail_d.glob("*.local"))) return files def _build_parser() -> configparser.RawConfigParser: """Create a :class:`configparser.RawConfigParser` for fail2ban configs. Returns: Parser with interpolation disabled and case-sensitive option names. """ parser = configparser.RawConfigParser(interpolation=None, strict=False) # fail2ban keys are lowercase but preserve case to be safe. parser.optionxform = str # type: ignore[assignment] return parser def _is_truthy(value: str) -> bool: """Return ``True`` if *value* is a fail2ban boolean true string. Args: value: Raw string from config (e.g. ``"true"``, ``"yes"``, ``"1"``). Returns: ``True`` when the value represents enabled. """ return value.strip().lower() in _TRUE_VALUES def _parse_int_safe(value: str) -> int | None: """Parse *value* as int, returning ``None`` on failure. Args: value: Raw string to parse. Returns: Integer value, or ``None``. """ try: return int(value.strip()) except (ValueError, AttributeError): return None def _parse_multiline(raw: str) -> list[str]: """Split a multi-line INI value into individual non-blank lines. Args: raw: Raw multi-line string from configparser. Returns: List of stripped, non-empty, non-comment strings. """ result: list[str] = [] for line in raw.splitlines(): stripped = line.strip() if stripped and not stripped.startswith("#"): result.append(stripped) return result def _resolve_filter(raw_filter: str, jail_name: str, mode: str) -> str: """Resolve fail2ban variable placeholders in a filter string. Handles the common default ``%(__name__)s[mode=%(mode)s]`` pattern that fail2ban uses so the filter name displayed to the user is readable. Args: raw_filter: Raw ``filter`` value from config (may contain ``%()s``). jail_name: The jail's section name, used to substitute ``%(__name__)s``. mode: The jail's ``mode`` value, used to substitute ``%(mode)s``. Returns: Human-readable filter string. """ result = raw_filter.replace("%(__name__)s", jail_name) result = result.replace("%(mode)s", mode) return result def _parse_jails_sync( config_dir: Path, ) -> tuple[dict[str, dict[str, str]], dict[str, str]]: """Synchronously parse all jail configs and return merged definitions. This is a CPU-bound / IO-bound sync function; callers must dispatch to an executor for async use. Args: config_dir: The fail2ban configuration root directory. Returns: A two-tuple ``(jails, source_files)`` where: - ``jails``: ``{jail_name: {key: value}}`` – merged settings for each jail with DEFAULT values already applied. - ``source_files``: ``{jail_name: str(path)}`` – path of the file that last defined each jail section (for display in the UI). """ parser = _build_parser() files = _ordered_config_files(config_dir) # Track which file each section came from (last write wins). source_files: dict[str, str] = {} for path in files: try: single = _build_parser() single.read(str(path), encoding="utf-8") for section in single.sections(): if section not in _META_SECTIONS: source_files[section] = str(path) except (configparser.Error, OSError) as exc: log.warning("jail_config_read_error", path=str(path), error=str(exc)) # Full merged parse: configparser applies DEFAULT values to every section. try: parser.read([str(p) for p in files], encoding="utf-8") except configparser.Error as exc: log.warning("jail_config_parse_error", error=str(exc)) jails: dict[str, dict[str, str]] = {} for section in parser.sections(): if section in _META_SECTIONS: continue try: # items() merges DEFAULT values automatically. jails[section] = dict(parser.items(section)) except configparser.Error as exc: log.warning( "jail_section_parse_error", section=section, error=str(exc) ) log.debug("jails_parsed", count=len(jails), config_dir=str(config_dir)) return jails, source_files def _build_inactive_jail( name: str, settings: dict[str, str], source_file: str, ) -> InactiveJail: """Construct an :class:`~app.models.config.InactiveJail` from raw settings. Args: name: Jail section name. settings: Merged key→value dict (DEFAULT values already applied). source_file: Path of the file that last defined this section. Returns: Populated :class:`~app.models.config.InactiveJail`. """ raw_filter = settings.get("filter", "") mode = settings.get("mode", "normal") filter_name = _resolve_filter(raw_filter, name, mode) if raw_filter else name raw_action = settings.get("action", "") actions = _parse_multiline(raw_action) if raw_action else [] raw_logpath = settings.get("logpath", "") logpath = _parse_multiline(raw_logpath) if raw_logpath else [] enabled_raw = settings.get("enabled", "false") enabled = _is_truthy(enabled_raw) maxretry_raw = settings.get("maxretry", "") maxretry = _parse_int_safe(maxretry_raw) return InactiveJail( name=name, filter=filter_name, actions=actions, port=settings.get("port") or None, logpath=logpath, bantime=settings.get("bantime") or None, findtime=settings.get("findtime") or None, maxretry=maxretry, source_file=source_file, enabled=enabled, ) async def _get_active_jail_names(socket_path: str) -> set[str]: """Fetch the set of currently running jail names from fail2ban. Returns an empty set gracefully if fail2ban is unreachable. Args: socket_path: Path to the fail2ban Unix domain socket. Returns: Set of active jail names, or empty set on connection failure. """ try: client = Fail2BanClient(socket_path=socket_path, timeout=_SOCKET_TIMEOUT) def _to_dict_inner(pairs: Any) -> dict[str, Any]: if not isinstance(pairs, (list, tuple)): return {} result: dict[str, Any] = {} for item in pairs: try: k, v = item result[str(k)] = v except (TypeError, ValueError): pass return result def _ok(response: Any) -> Any: code, data = response if code != 0: raise ValueError(f"fail2ban error {code}: {data!r}") return data status_raw = _ok(await client.send(["status"])) status_dict = _to_dict_inner(status_raw) jail_list_raw: str = str(status_dict.get("Jail list", "") or "").strip() if not jail_list_raw: return set() return {j.strip() for j in jail_list_raw.split(",") if j.strip()} except Fail2BanConnectionError: log.warning("fail2ban_unreachable_during_inactive_list") return set() except Exception as exc: # noqa: BLE001 log.warning( "fail2ban_status_error_during_inactive_list", error=str(exc) ) return set() def _write_local_override_sync( config_dir: Path, jail_name: str, enabled: bool, overrides: dict[str, Any], ) -> None: """Write a ``jail.d/{name}.local`` file atomically. Always writes to ``jail.d/{jail_name}.local``. If the file already exists it is replaced entirely. The write is atomic: content is written to a temp file first, then renamed into place. Args: config_dir: The fail2ban configuration root directory. jail_name: Validated jail name (used as filename stem). enabled: Value to write for ``enabled =``. overrides: Optional setting overrides (bantime, findtime, maxretry, port, logpath). Raises: ConfigWriteError: If writing fails. """ jail_d = config_dir / "jail.d" try: jail_d.mkdir(parents=True, exist_ok=True) except OSError as exc: raise ConfigWriteError( f"Cannot create jail.d directory: {exc}" ) from exc local_path = jail_d / f"{jail_name}.local" lines: list[str] = [ "# Managed by BanGUI — do not edit manually", "", f"[{jail_name}]", "", f"enabled = {'true' if enabled else 'false'}", ] if overrides.get("bantime") is not None: lines.append(f"bantime = {overrides['bantime']}") if overrides.get("findtime") is not None: lines.append(f"findtime = {overrides['findtime']}") if overrides.get("maxretry") is not None: lines.append(f"maxretry = {overrides['maxretry']}") if overrides.get("port") is not None: lines.append(f"port = {overrides['port']}") if overrides.get("logpath"): paths: list[str] = overrides["logpath"] if paths: lines.append(f"logpath = {paths[0]}") for p in paths[1:]: lines.append(f" {p}") content = "\n".join(lines) + "\n" try: with tempfile.NamedTemporaryFile( mode="w", encoding="utf-8", dir=jail_d, delete=False, suffix=".tmp", ) as tmp: tmp.write(content) tmp_name = tmp.name os.replace(tmp_name, local_path) except OSError as exc: # Clean up temp file if rename failed. try: os.unlink(tmp_name) # noqa: F821 — only reachable when tmp_name is set except OSError: pass raise ConfigWriteError( f"Failed to write {local_path}: {exc}" ) from exc log.info( "jail_local_written", jail=jail_name, path=str(local_path), enabled=enabled, ) # --------------------------------------------------------------------------- # Public API # --------------------------------------------------------------------------- async def list_inactive_jails( config_dir: str, socket_path: str, ) -> InactiveJailListResponse: """Return all jails defined in config files that are not currently active. Parses ``jail.conf``, ``jail.local``, and ``jail.d/`` following the fail2ban merge order. A jail is considered inactive when: - Its merged ``enabled`` value is ``false`` (or absent, which defaults to ``false`` in fail2ban), **or** - Its ``enabled`` value is ``true`` in config but fail2ban does not report it as running. Args: config_dir: Absolute path to the fail2ban configuration directory. socket_path: Path to the fail2ban Unix domain socket. Returns: :class:`~app.models.config.InactiveJailListResponse` with all inactive jails. """ loop = asyncio.get_event_loop() parsed_result: tuple[dict[str, dict[str, str]], dict[str, str]] = ( await loop.run_in_executor(None, _parse_jails_sync, Path(config_dir)) ) all_jails, source_files = parsed_result active_names: set[str] = await _get_active_jail_names(socket_path) inactive: list[InactiveJail] = [] for jail_name, settings in sorted(all_jails.items()): if jail_name in active_names: # fail2ban reports this jail as running — skip it. continue source = source_files.get(jail_name, config_dir) inactive.append(_build_inactive_jail(jail_name, settings, source)) log.info( "inactive_jails_listed", total_defined=len(all_jails), active=len(active_names), inactive=len(inactive), ) return InactiveJailListResponse(jails=inactive, total=len(inactive)) async def activate_jail( config_dir: str, socket_path: str, name: str, req: ActivateJailRequest, ) -> JailActivationResponse: """Enable an inactive jail and reload fail2ban. Writes ``enabled = true`` (plus any override values from *req*) to ``jail.d/{name}.local`` and then triggers a full fail2ban reload so the jail starts immediately. Args: config_dir: Absolute path to the fail2ban configuration directory. socket_path: Path to the fail2ban Unix domain socket. name: Name of the jail to activate. Must exist in the parsed config. req: Optional override values to write alongside ``enabled = true``. Returns: :class:`~app.models.config.JailActivationResponse`. Raises: JailNameError: If *name* contains invalid characters. JailNotFoundInConfigError: If *name* is not defined in any config file. JailAlreadyActiveError: If fail2ban already reports *name* as running. ConfigWriteError: If writing the ``.local`` file fails. ~app.utils.fail2ban_client.Fail2BanConnectionError: If the fail2ban socket is unreachable during reload. """ _safe_jail_name(name) loop = asyncio.get_event_loop() all_jails, _source_files = await loop.run_in_executor( None, _parse_jails_sync, Path(config_dir) ) if name not in all_jails: raise JailNotFoundInConfigError(name) active_names = await _get_active_jail_names(socket_path) if name in active_names: raise JailAlreadyActiveError(name) overrides: dict[str, Any] = { "bantime": req.bantime, "findtime": req.findtime, "maxretry": req.maxretry, "port": req.port, "logpath": req.logpath, } await loop.run_in_executor( None, _write_local_override_sync, Path(config_dir), name, True, overrides, ) try: await jail_service.reload_all(socket_path) except Exception as exc: # noqa: BLE001 log.warning("reload_after_activate_failed", jail=name, error=str(exc)) log.info("jail_activated", jail=name) return JailActivationResponse( name=name, active=True, message=f"Jail {name!r} activated successfully.", ) async def deactivate_jail( config_dir: str, socket_path: str, name: str, ) -> JailActivationResponse: """Disable an active jail and reload fail2ban. Writes ``enabled = false`` to ``jail.d/{name}.local`` and triggers a full fail2ban reload so the jail stops immediately. Args: config_dir: Absolute path to the fail2ban configuration directory. socket_path: Path to the fail2ban Unix domain socket. name: Name of the jail to deactivate. Must exist in the parsed config. Returns: :class:`~app.models.config.JailActivationResponse`. Raises: JailNameError: If *name* contains invalid characters. JailNotFoundInConfigError: If *name* is not defined in any config file. JailAlreadyInactiveError: If fail2ban already reports *name* as not running. ConfigWriteError: If writing the ``.local`` file fails. ~app.utils.fail2ban_client.Fail2BanConnectionError: If the fail2ban socket is unreachable during reload. """ _safe_jail_name(name) loop = asyncio.get_event_loop() all_jails, _source_files = await loop.run_in_executor( None, _parse_jails_sync, Path(config_dir) ) if name not in all_jails: raise JailNotFoundInConfigError(name) active_names = await _get_active_jail_names(socket_path) if name not in active_names: raise JailAlreadyInactiveError(name) await loop.run_in_executor( None, _write_local_override_sync, Path(config_dir), name, False, {}, ) try: await jail_service.reload_all(socket_path) except Exception as exc: # noqa: BLE001 log.warning("reload_after_deactivate_failed", jail=name, error=str(exc)) log.info("jail_deactivated", jail=name) return JailActivationResponse( name=name, active=False, message=f"Jail {name!r} deactivated successfully.", )