"""Fail2ban INI-style config parser with include and interpolation support. Provides a :class:`Fail2BanConfigParser` class that wraps Python's :class:`configparser.RawConfigParser` with fail2ban-specific behaviour: - **Merge order**: ``.conf`` file first, then ``.local`` overlay, then ``*.d/`` directory overrides — each subsequent layer overwrites earlier values. - **Include directives**: ``[INCLUDES]`` sections can specify ``before`` and ``after`` filenames. ``before`` is loaded at lower priority (loaded first), ``after`` at higher priority (loaded last). Both are resolved relative to the directory of the including file. Circular includes and runaway recursion are detected and logged. - **Variable interpolation**: :meth:`interpolate` resolves ``%(variable)s`` references using the ``[DEFAULT]`` section, the ``[Init]`` section, and any caller-supplied variables. Multiple passes handle nested references. - **Multi-line values**: Handled transparently by ``configparser``; the :meth:`split_multiline` helper further strips blank lines and ``#`` comments. - **Comments**: ``configparser`` strips full-line ``#``/``;`` comments; inline comments inside multi-line values are stripped by :meth:`split_multiline`. All methods are synchronous. Call from async contexts via :func:`asyncio.get_event_loop().run_in_executor`. """ from __future__ import annotations import configparser import re from typing import TYPE_CHECKING import structlog if TYPE_CHECKING: from pathlib import Path log: structlog.stdlib.BoundLogger = structlog.get_logger() # Compiled pattern that matches fail2ban-style %(variable_name)s references. _INTERPOLATE_RE: re.Pattern[str] = re.compile(r"%\((\w+)\)s") # Guard against infinite interpolation loops. _MAX_INTERPOLATION_PASSES: int = 10 class Fail2BanConfigParser: """Parse fail2ban INI config files with include resolution and interpolation. Typical usage for a ``filter.d/`` file:: parser = Fail2BanConfigParser(config_dir=Path("/etc/fail2ban")) parser.read_with_overrides(Path("/etc/fail2ban/filter.d/sshd.conf")) section = parser.section_dict("Definition") failregex = parser.split_multiline(section.get("failregex", "")) Args: config_dir: Optional fail2ban configuration root directory. Used only by :meth:`ordered_conf_files`; pass ``None`` if not needed. max_include_depth: Maximum ``[INCLUDES]`` nesting depth before giving up. """ def __init__( self, config_dir: Path | None = None, max_include_depth: int = 10, ) -> None: self._config_dir = config_dir self._max_include_depth = max_include_depth self._parser: configparser.RawConfigParser = self._make_parser() # Tracks resolved absolute paths to detect include cycles. self._read_paths: set[Path] = set() # ------------------------------------------------------------------ # Internal helpers # ------------------------------------------------------------------ @staticmethod def _make_parser() -> configparser.RawConfigParser: """Return a case-sensitive :class:`configparser.RawConfigParser`.""" parser = configparser.RawConfigParser(interpolation=None, strict=False) # Keep original key casing (fail2ban is case-sensitive in option names). parser.optionxform = str # type: ignore[assignment] return parser def _get_include( self, include_dir: Path, tmp_parser: configparser.RawConfigParser, key: str, ) -> Path | None: """Return the resolved path for an include directive, or ``None``.""" if not tmp_parser.has_section("INCLUDES"): return None if not tmp_parser.has_option("INCLUDES", key): return None raw = tmp_parser.get("INCLUDES", key).strip() if not raw: return None return include_dir / raw # ------------------------------------------------------------------ # Public interface — reading files # ------------------------------------------------------------------ def read_file(self, path: Path, _depth: int = 0) -> None: """Read *path*, following ``[INCLUDES]`` ``before``/``after`` directives. ``before`` references are loaded before the current file (lower priority); ``after`` references are loaded after (higher priority). Circular includes are detected by tracking resolved absolute paths. Args: path: Config file to read. _depth: Current include nesting depth. Internal parameter. """ if _depth > self._max_include_depth: log.warning( "include_depth_exceeded", path=str(path), max_depth=self._max_include_depth, ) return resolved = path.resolve() if resolved in self._read_paths: log.debug("include_cycle_detected", path=str(path)) return try: content = path.read_text(encoding="utf-8") except OSError as exc: log.warning("config_read_error", path=str(path), error=str(exc)) return # Pre-scan for includes without yet committing to the main parser. tmp = self._make_parser() try: tmp.read_string(content) except configparser.Error as exc: log.warning("config_parse_error", path=str(path), error=str(exc)) return include_dir = path.parent before_path = self._get_include(include_dir, tmp, "before") after_path = self._get_include(include_dir, tmp, "after") # Load ``before`` first (lower priority than current file). if before_path is not None: self.read_file(before_path, _depth=_depth + 1) # Mark this path visited *before* merging to guard against cycles # introduced by the ``after`` include referencing the same file. self._read_paths.add(resolved) # Merge current file into the accumulating parser. try: self._parser.read_string(content, source=str(path)) except configparser.Error as exc: log.warning( "config_parse_string_error", path=str(path), error=str(exc) ) # Load ``after`` last (highest priority). if after_path is not None: self.read_file(after_path, _depth=_depth + 1) def read_with_overrides(self, conf_path: Path) -> None: """Read *conf_path* and its ``.local`` override if it exists. The ``.local`` file is read after the ``.conf`` file so its values take precedence. Include directives inside each file are still honoured. Args: conf_path: Path to the ``.conf`` file. The corresponding ``.local`` is derived by replacing the suffix with ``.local``. """ self.read_file(conf_path) local_path = conf_path.with_suffix(".local") if local_path.is_file(): self.read_file(local_path) # ------------------------------------------------------------------ # Public interface — querying parsed data # ------------------------------------------------------------------ def sections(self) -> list[str]: """Return all section names (excludes the ``[DEFAULT]`` pseudo-section). Returns: Sorted list of section names present in the parsed files. """ return list(self._parser.sections()) def has_section(self, section: str) -> bool: """Return whether *section* exists in the parsed configuration. Args: section: Section name to check. """ return self._parser.has_section(section) def get(self, section: str, key: str) -> str | None: """Return the raw value for *key* in *section*, or ``None``. Args: section: Section name. key: Option name. Returns: Raw option value string, or ``None`` if not present. """ if self._parser.has_section(section) and self._parser.has_option( section, key ): return self._parser.get(section, key) return None def section_dict( self, section: str, *, skip: frozenset[str] | None = None, ) -> dict[str, str]: """Return all key-value pairs from *section* as a plain :class:`dict`. Keys whose names start with ``__`` (configparser internals from ``DEFAULT`` inheritance) are always excluded. Args: section: Section name to read. skip: Additional key names to exclude. Returns: Mapping of option name → raw value. Empty dict if section absent. """ if not self._parser.has_section(section): return {} drop: frozenset[str] = skip or frozenset() return { k: v for k, v in self._parser.items(section) if not k.startswith("__") and k not in drop } def defaults(self) -> dict[str, str]: """Return all ``[DEFAULT]`` section key-value pairs. Returns: Dict of default keys and their values. """ return dict(self._parser.defaults()) # ------------------------------------------------------------------ # Public interface — interpolation and helpers # ------------------------------------------------------------------ def interpolate( self, value: str, extra_vars: dict[str, str] | None = None, ) -> str: """Resolve ``%(variable)s`` references in *value*. Variables are resolved in the following priority order (low → high): 1. ``[DEFAULT]`` section values. 2. ``[Init]`` section values (fail2ban action parameters). 3. *extra_vars* provided by the caller. Multiple passes are performed to handle nested references (up to :data:`_MAX_INTERPOLATION_PASSES` iterations). Unresolvable references are left unchanged. Args: value: Raw string possibly containing ``%(name)s`` placeholders. extra_vars: Optional caller-supplied variables (highest priority). Returns: String with ``%(name)s`` references substituted where possible. """ vars_: dict[str, str] = {} vars_.update(self.defaults()) vars_.update(self.section_dict("Init")) if extra_vars: vars_.update(extra_vars) def _sub(m: re.Match[str]) -> str: return vars_.get(m.group(1), m.group(0)) result = value for _ in range(_MAX_INTERPOLATION_PASSES): new = _INTERPOLATE_RE.sub(_sub, result) if new == result: break result = new return result @staticmethod def split_multiline(raw: str) -> list[str]: """Split a multi-line INI value into individual non-blank lines. Each line is stripped of surrounding whitespace. Lines that are empty or that start with ``#`` (comments) are discarded. Used for ``failregex``, ``ignoreregex``, ``action``, and ``logpath`` values which fail2ban allows to span multiple lines. Args: raw: Raw multi-line string from configparser. Returns: List of stripped, non-empty, non-comment strings. """ result: list[str] = [] for line in raw.splitlines(): stripped = line.strip() if stripped and not stripped.startswith("#"): result.append(stripped) return result # ------------------------------------------------------------------ # Class-level utility — file ordering # ------------------------------------------------------------------ @classmethod def ordered_conf_files(cls, config_dir: Path, base_name: str) -> list[Path]: """Return config files for *base_name* in fail2ban merge order. Merge order (ascending priority — later entries override earlier): 1. ``{config_dir}/{base_name}.conf`` 2. ``{config_dir}/{base_name}.local`` 3. ``{config_dir}/{base_name}.d/*.conf`` (sorted alphabetically) 4. ``{config_dir}/{base_name}.d/*.local`` (sorted alphabetically) Args: config_dir: Fail2ban configuration root directory. base_name: Config base name without extension (e.g. ``"jail"``). Returns: List of existing :class:`~pathlib.Path` objects in ascending priority order (only files that actually exist are included). """ files: list[Path] = [] conf = config_dir / f"{base_name}.conf" if conf.is_file(): files.append(conf) local = config_dir / f"{base_name}.local" if local.is_file(): files.append(local) d_dir = config_dir / f"{base_name}.d" if d_dir.is_dir(): files.extend(sorted(d_dir.glob("*.conf"))) files.extend(sorted(d_dir.glob("*.local"))) return files