"""Fail2ban INI-style configuration file parser and serializer. Provides structured parsing and serialization for ``filter.d/*.conf`` and ``action.d/*.conf`` files, mirroring fail2ban's own ``RawConfigParser``-based reading logic. Key design decisions: - Uses :class:`configparser.RawConfigParser` with ``interpolation=None`` so fail2ban-style ``%`` / ``<>`` tags are preserved verbatim. - Multi-line values (lines that begin with whitespace) are handled by configparser automatically; the raw string is then post-processed to split ``failregex``/``ignoreregex`` into individual patterns. - Section ordering in serialized output: ``[INCLUDES]`` → ``[DEFAULT]`` → ``[Definition]`` → ``[Init]``. Unknown extra sections from action files (e.g. ``[ipt_oneport]``) are intentionally discarded because the structured model does not capture them — users should edit those sections via the raw (Export) tab. """ from __future__ import annotations import configparser import contextlib import io from typing import TYPE_CHECKING import structlog if TYPE_CHECKING: from pathlib import Path from app.models.config import ( ActionConfig, ActionConfigUpdate, FilterConfig, FilterConfigUpdate, JailFileConfig, JailFileConfigUpdate, JailSectionConfig, ) log: structlog.stdlib.BoundLogger = structlog.get_logger() # --------------------------------------------------------------------------- # Constants — well-known Definition keys for action files # --------------------------------------------------------------------------- _ACTION_LIFECYCLE_KEYS: frozenset[str] = frozenset( { "actionstart", "actionstop", "actioncheck", "actionban", "actionunban", "actionflush", } ) # --------------------------------------------------------------------------- # Internal helpers # --------------------------------------------------------------------------- def _make_parser() -> configparser.RawConfigParser: """Create a :class:`configparser.RawConfigParser` configured for fail2ban. 
Returns: A parser with interpolation disabled, case-sensitive keys, and a ``DEFAULT`` section that does not inherit into other sections. """ parser = configparser.RawConfigParser( # Disable interpolation so fail2ban % / <> tags survive unchanged. interpolation=None, # Preserve original key casing (fail2ban keys are lowercase but some # custom config files may use mixed case). strict=False, ) # Keys are case-sensitive in fail2ban. parser.optionxform = str # type: ignore[assignment] return parser def _split_multiline_patterns(raw: str) -> list[str]: """Split a raw multi-line configparser value into individual patterns. Each non-blank, non-comment line becomes a separate entry. Args: raw: The raw multi-line string from configparser (may include blank lines and ``#`` comments). Returns: List of stripped non-empty, non-comment pattern strings. """ result: list[str] = [] for line in raw.splitlines(): stripped = line.strip() if stripped and not stripped.startswith("#"): result.append(stripped) return result def _get_opt(parser: configparser.RawConfigParser, section: str, key: str) -> str | None: """Return the value of *key* in *section*, or ``None`` if absent. Args: parser: Populated parser instance. section: Section name. key: Option name. Returns: Option value string, or ``None``. """ if parser.has_section(section) and parser.has_option(section, key): return parser.get(section, key) return None def _section_dict( parser: configparser.RawConfigParser, section: str, skip: frozenset[str] | None = None ) -> dict[str, str]: """Return all key-value pairs from *section* as a plain dict. Args: parser: Populated parser instance. section: Section name. skip: Optional set of keys to exclude. Returns: Dict of option → value for the section. 
""" if not parser.has_section(section): return {} drop = skip or frozenset() return { k: v for k, v in parser.items(section) if not k.startswith("__") and k not in drop # __ keys come from DEFAULT inheritance } # --------------------------------------------------------------------------- # Filter file parser / serializer # --------------------------------------------------------------------------- def parse_filter_file(content: str, name: str = "", filename: str = "") -> FilterConfig: """Parse a ``filter.d/*.conf`` file into a :class:`~app.models.config.FilterConfig`. Args: content: Raw file content (UTF-8 string). name: Filter base name (e.g. ``"sshd"``). Used only to populate the ``name`` field on the returned model. filename: Actual filename (e.g. ``"sshd.conf"``). Returns: Populated :class:`~app.models.config.FilterConfig`. """ parser = _make_parser() try: parser.read_string(content) except configparser.Error as exc: log.warning("filter_parse_error", name=name, error=str(exc)) # [INCLUDES] before = _get_opt(parser, "INCLUDES", "before") after = _get_opt(parser, "INCLUDES", "after") # [DEFAULT] — all keys that aren't hidden configparser internals # configparser stores DEFAULT keys accessible from every section; we # reconstruct them by reading DEFAULT directly. 
variables: dict[str, str] = {} if parser.defaults(): variables = dict(parser.defaults()) # [Definition] prefregex = _get_opt(parser, "Definition", "prefregex") raw_failregex = _get_opt(parser, "Definition", "failregex") or "" failregex = _split_multiline_patterns(raw_failregex) raw_ignoreregex = _get_opt(parser, "Definition", "ignoreregex") or "" ignoreregex = _split_multiline_patterns(raw_ignoreregex) maxlines_raw = _get_opt(parser, "Definition", "maxlines") maxlines: int | None = None if maxlines_raw is not None: with contextlib.suppress(ValueError): maxlines = int(maxlines_raw.strip()) datepattern = _get_opt(parser, "Definition", "datepattern") journalmatch = _get_opt(parser, "Definition", "journalmatch") log.debug("filter_parsed", name=name, failregex_count=len(failregex)) return FilterConfig( name=name, filename=filename, before=before, after=after, variables=variables, prefregex=prefregex, failregex=failregex, ignoreregex=ignoreregex, maxlines=maxlines, datepattern=datepattern, journalmatch=journalmatch, ) def serialize_filter_config(cfg: FilterConfig) -> str: """Serialize a :class:`~app.models.config.FilterConfig` to a ``.conf`` string. The output preserves the canonical fail2ban INI section ordering: ``[INCLUDES]`` → ``[DEFAULT]`` → ``[Definition]``. Args: cfg: The filter configuration to serialize. Returns: UTF-8 string suitable for writing to a ``.conf`` file. 
""" buf = io.StringIO() # [INCLUDES] if cfg.before is not None or cfg.after is not None: buf.write("[INCLUDES]\n\n") if cfg.before is not None: buf.write(f"before = {cfg.before}\n") if cfg.after is not None: buf.write(f"after = {cfg.after}\n") buf.write("\n") # [DEFAULT] if cfg.variables: buf.write("[DEFAULT]\n\n") for key, value in cfg.variables.items(): buf.write(f"{key} = {value}\n") buf.write("\n") # [Definition] buf.write("[Definition]\n\n") if cfg.prefregex is not None: buf.write(f"prefregex = {cfg.prefregex}\n\n") if cfg.failregex: buf.write("failregex = " + cfg.failregex[0] + "\n") for pattern in cfg.failregex[1:]: buf.write(f" {pattern}\n") buf.write("\n") if cfg.ignoreregex: buf.write("ignoreregex = " + cfg.ignoreregex[0] + "\n") for pattern in cfg.ignoreregex[1:]: buf.write(f" {pattern}\n") buf.write("\n") if cfg.maxlines is not None: buf.write(f"maxlines = {cfg.maxlines}\n\n") if cfg.datepattern is not None: buf.write(f"datepattern = {cfg.datepattern}\n\n") if cfg.journalmatch is not None: buf.write(f"journalmatch = {cfg.journalmatch}\n\n") return buf.getvalue() def merge_filter_update(cfg: FilterConfig, update: FilterConfigUpdate) -> FilterConfig: """Apply a partial :class:`~app.models.config.FilterConfigUpdate` onto *cfg*. Only fields that are explicitly set (not ``None``) in *update* are written. Returns a new :class:`~app.models.config.FilterConfig` with the merged values; the original is not mutated. Args: cfg: Current filter configuration. update: Partial update to apply. Returns: Updated :class:`~app.models.config.FilterConfig`. 
""" return FilterConfig( name=cfg.name, filename=cfg.filename, before=update.before if update.before is not None else cfg.before, after=update.after if update.after is not None else cfg.after, variables=update.variables if update.variables is not None else cfg.variables, prefregex=update.prefregex if update.prefregex is not None else cfg.prefregex, failregex=update.failregex if update.failregex is not None else cfg.failregex, ignoreregex=update.ignoreregex if update.ignoreregex is not None else cfg.ignoreregex, maxlines=update.maxlines if update.maxlines is not None else cfg.maxlines, datepattern=update.datepattern if update.datepattern is not None else cfg.datepattern, journalmatch=update.journalmatch if update.journalmatch is not None else cfg.journalmatch, ) # --------------------------------------------------------------------------- # Action file parser / serializer # --------------------------------------------------------------------------- def parse_action_file(content: str, name: str = "", filename: str = "") -> ActionConfig: """Parse an ``action.d/*.conf`` file into a :class:`~app.models.config.ActionConfig`. Args: content: Raw file content (UTF-8 string). name: Action base name (e.g. ``"iptables"``). filename: Actual filename (e.g. ``"iptables.conf"``). Returns: Populated :class:`~app.models.config.ActionConfig`. 
""" parser = _make_parser() try: parser.read_string(content) except configparser.Error as exc: log.warning("action_parse_error", name=name, error=str(exc)) # [INCLUDES] before = _get_opt(parser, "INCLUDES", "before") after = _get_opt(parser, "INCLUDES", "after") # [Definition] — extract well-known lifecycle keys, rest goes to definition_vars def_lifecycle: dict[str, str | None] = dict.fromkeys(_ACTION_LIFECYCLE_KEYS) definition_vars: dict[str, str] = {} if parser.has_section("Definition"): for key, value in parser.items("Definition"): if key in _ACTION_LIFECYCLE_KEYS: def_lifecycle[key] = value else: definition_vars[key] = value # [Init] — all keys go into init_vars (multiple [Init?...] sections are ignored) init_vars: dict[str, str] = {} if parser.has_section("Init"): for key, value in parser.items("Init"): init_vars[key] = value log.debug("action_parsed", name=name, init_vars_count=len(init_vars)) return ActionConfig( name=name, filename=filename, before=before, after=after, actionstart=def_lifecycle.get("actionstart"), actionstop=def_lifecycle.get("actionstop"), actioncheck=def_lifecycle.get("actioncheck"), actionban=def_lifecycle.get("actionban"), actionunban=def_lifecycle.get("actionunban"), actionflush=def_lifecycle.get("actionflush"), definition_vars=definition_vars, init_vars=init_vars, ) def serialize_action_config(cfg: ActionConfig) -> str: """Serialize an :class:`~app.models.config.ActionConfig` to a ``.conf`` string. Section order: ``[INCLUDES]`` → ``[Definition]`` → ``[Init]``. Args: cfg: The action configuration to serialize. Returns: UTF-8 string suitable for writing to a ``.conf`` file. 
""" buf = io.StringIO() # [INCLUDES] if cfg.before is not None or cfg.after is not None: buf.write("[INCLUDES]\n\n") if cfg.before is not None: buf.write(f"before = {cfg.before}\n") if cfg.after is not None: buf.write(f"after = {cfg.after}\n") buf.write("\n") # [Definition] buf.write("[Definition]\n\n") # Lifecycle commands first (in canonical order) _lifecycle_order = ( "actionstart", "actionstop", "actioncheck", "actionban", "actionunban", "actionflush", ) for key in _lifecycle_order: value = getattr(cfg, key) if value is not None: lines = value.splitlines() if lines: buf.write(f"{key} = {lines[0]}\n") for extra in lines[1:]: buf.write(f" {extra}\n") buf.write("\n") # Extra definition variables for key, value in cfg.definition_vars.items(): lines = value.splitlines() if lines: buf.write(f"{key} = {lines[0]}\n") for extra in lines[1:]: buf.write(f" {extra}\n") if cfg.definition_vars: buf.write("\n") # [Init] if cfg.init_vars: buf.write("[Init]\n\n") for key, value in cfg.init_vars.items(): buf.write(f"{key} = {value}\n") buf.write("\n") return buf.getvalue() def merge_action_update(cfg: ActionConfig, update: ActionConfigUpdate) -> ActionConfig: """Apply a partial :class:`~app.models.config.ActionConfigUpdate` onto *cfg*. Args: cfg: Current action configuration. update: Partial update to apply. Returns: Updated :class:`~app.models.config.ActionConfig`. 
""" return ActionConfig( name=cfg.name, filename=cfg.filename, before=update.before if update.before is not None else cfg.before, after=update.after if update.after is not None else cfg.after, actionstart=update.actionstart if update.actionstart is not None else cfg.actionstart, actionstop=update.actionstop if update.actionstop is not None else cfg.actionstop, actioncheck=update.actioncheck if update.actioncheck is not None else cfg.actioncheck, actionban=update.actionban if update.actionban is not None else cfg.actionban, actionunban=update.actionunban if update.actionunban is not None else cfg.actionunban, actionflush=update.actionflush if update.actionflush is not None else cfg.actionflush, definition_vars=update.definition_vars if update.definition_vars is not None else cfg.definition_vars, init_vars=update.init_vars if update.init_vars is not None else cfg.init_vars, ) # --------------------------------------------------------------------------- # Convenience helpers for reading/writing files # --------------------------------------------------------------------------- def read_and_parse_filter(path: Path) -> FilterConfig: """Read *path* and return a parsed :class:`~app.models.config.FilterConfig`. Args: path: Absolute path to the filter file. Returns: Parsed filter config. """ content = path.read_text(encoding="utf-8") return parse_filter_file(content, name=path.stem, filename=path.name) def read_and_parse_action(path: Path) -> ActionConfig: """Read *path* and return a parsed :class:`~app.models.config.ActionConfig`. Args: path: Absolute path to the action file. Returns: Parsed action config. """ content = path.read_text(encoding="utf-8") return parse_action_file(content, name=path.stem, filename=path.name) # --------------------------------------------------------------------------- # Jail file parser / serializer (Task 6.1) # --------------------------------------------------------------------------- # Keys handled by named fields in JailSectionConfig. 
_JAIL_NAMED_KEYS: frozenset[str] = frozenset(
    {
        "enabled",
        "port",
        "filter",
        "logpath",
        "maxretry",
        "findtime",
        "bantime",
        "action",
        "backend",
    }
)


def _parse_bool(value: str) -> bool | None:
    """Parse a fail2ban boolean string.

    Matching is case-insensitive and ignores surrounding whitespace.

    Args:
        value: Raw string value from config (e.g. "true", "false", "yes",
            "no", "on", "off", "1", "0").

    Returns:
        Boolean, or ``None`` if the value is not a recognised boolean token.
    """
    token = value.strip().lower()
    if token in {"true", "yes", "on", "1"}:
        return True
    if token in {"false", "no", "off", "0"}:
        return False
    return None


def _parse_int(value: str) -> int | None:
    """Parse an integer string, returning ``None`` on failure.

    Args:
        value: Raw string value from config.

    Returns:
        Integer, or ``None`` when *value* is not a valid integer literal
        (for instance a fail2ban time abbreviation such as ``10m``).
    """
    try:
        return int(value.strip())
    except ValueError:
        return None


def _parse_multiline_list(raw: str) -> list[str]:
    """Split a multi-line configparser value into a list of non-blank lines.

    Args:
        raw: Raw multi-line string from configparser.

    Returns:
        List of stripped, non-empty, non-comment strings.
    """
    candidates = (line.strip() for line in raw.splitlines())
    return [text for text in candidates if text and not text.startswith("#")]


def _format_jail_option(key: str, value: object) -> str:
    """Render ``key = value`` with indented continuation lines.

    configparser only treats a physical line as the continuation of the
    previous option when it is indented, so every line after the first gets a
    leading space.

    Args:
        key: Option name.
        value: Option value; converted with :func:`str`.

    Returns:
        The rendered option, terminated by a newline.
    """
    first, *rest = str(value).splitlines() or [""]
    rendered = [f"{key} = {first}\n"]
    rendered.extend(f" {continuation}\n" for continuation in rest)
    return "".join(rendered)


def parse_jail_file(content: str, filename: str = "") -> JailFileConfig:
    """Parse a ``jail.d/*.conf`` file into a :class:`~app.models.config.JailFileConfig`.

    Each INI section in the file maps to a jail.  ``[INCLUDES]`` and
    ``[DEFAULT]`` never become jails themselves; note, however, that
    configparser folds ``[DEFAULT]`` options into the items of every section,
    so those values do show up on each parsed jail.

    Typed keys (``enabled``, ``maxretry``, ``findtime``, ``bantime``) whose
    value cannot be converted — e.g. ``bantime = 10m`` — leave the typed field
    as ``None`` and keep the raw string in ``extra`` so it is not lost when
    the file is written back.

    A :class:`configparser.Error` raised while reading is logged as a warning
    and parsing continues with whatever the parser managed to read.

    Args:
        content: Raw file content (UTF-8 string).
        filename: Filename (e.g. ``"sshd.conf"``).

    Returns:
        Populated :class:`~app.models.config.JailFileConfig`.
    """
    parser = _make_parser()
    try:
        parser.read_string(content)
    except configparser.Error as exc:
        log.warning("jail_file_parse_error", filename=filename, error=str(exc))

    jails: dict[str, JailSectionConfig] = {}
    for section in parser.sections():
        # Skip meta-sections used by fail2ban include system.
        if section in {"INCLUDES", "DEFAULT"}:
            continue

        items = dict(parser.items(section))

        enabled = _parse_bool(items["enabled"]) if "enabled" in items else None
        maxretry = _parse_int(items["maxretry"]) if "maxretry" in items else None
        findtime = _parse_int(items["findtime"]) if "findtime" in items else None
        bantime = _parse_int(items["bantime"]) if "bantime" in items else None
        converted: dict[str, object] = {
            "enabled": enabled,
            "maxretry": maxretry,
            "findtime": findtime,
            "bantime": bantime,
        }

        # Everything without a named field, plus typed keys that are present
        # but could not be converted (their raw text must survive).
        extra: dict[str, str] = {
            k: v
            for k, v in items.items()
            if k not in _JAIL_NAMED_KEYS or (k in converted and converted[k] is None)
        }

        jails[section] = JailSectionConfig(
            enabled=enabled,
            port=items.get("port"),
            filter=items.get("filter"),
            logpath=_parse_multiline_list(items.get("logpath", "")),
            maxretry=maxretry,
            findtime=findtime,
            bantime=bantime,
            action=_parse_multiline_list(items.get("action", "")),
            backend=items.get("backend"),
            extra=extra,
        )

    log.debug("jail_file_parsed", filename=filename, jail_count=len(jails))
    return JailFileConfig(filename=filename, jails=jails)


def serialize_jail_file_config(cfg: JailFileConfig) -> str:
    """Serialize a :class:`~app.models.config.JailFileConfig` to a fail2ban INI string.

    Per jail, the named fields are written first (only those that are set),
    followed by the ``extra`` options.  Multi-line values are written with
    indented continuation lines so configparser can read them back.  An
    ``extra`` entry is skipped when the named field of the same key was
    written, so the typed value always wins over a stale raw one.

    Args:
        cfg: Structured jail file configuration.

    Returns:
        UTF-8 file content suitable for writing to a ``jail.d/*.conf`` file.
    """
    buf = io.StringIO()
    buf.write(f"# Generated by BanGUI — {cfg.filename}\n")

    for jail_name, jail in cfg.jails.items():
        buf.write(f"\n[{jail_name}]\n\n")

        enabled_text: str | None = None
        if jail.enabled is not None:
            enabled_text = "true" if jail.enabled else "false"

        # (key, value) in output order; None means "not set, do not write".
        named: tuple[tuple[str, object], ...] = (
            ("enabled", enabled_text),
            ("port", jail.port),
            ("filter", jail.filter),
            ("backend", jail.backend),
            ("maxretry", jail.maxretry),
            ("findtime", jail.findtime),
            ("bantime", jail.bantime),
            # Lists become one multi-line value; empty lists are omitted.
            ("logpath", "\n".join(jail.logpath) if jail.logpath else None),
            ("action", "\n".join(jail.action) if jail.action else None),
        )

        written: set[str] = set()
        for key, value in named:
            if value is not None:
                buf.write(_format_jail_option(key, value))
                written.add(key)

        for key, value in jail.extra.items():
            if key not in written:
                buf.write(_format_jail_option(key, value))

    return buf.getvalue()


def merge_jail_file_update(cfg: JailFileConfig, update: JailFileConfigUpdate) -> JailFileConfig:
    """Apply a partial :class:`~app.models.config.JailFileConfigUpdate` onto *cfg*.

    Only jails present in ``update.jails`` are replaced (as whole sections);
    other jails are left unchanged.  When ``update.jails`` is ``None`` the
    original *cfg* object is returned as-is.

    Args:
        cfg: Current jail file configuration.
        update: Partial update to apply.

    Returns:
        Updated :class:`~app.models.config.JailFileConfig`.
    """
    if update.jails is None:
        return cfg
    merged = {**cfg.jails, **update.jails}
    return JailFileConfig(filename=cfg.filename, jails=merged)


def read_and_parse_jail_file(path: Path) -> JailFileConfig:
    """Read *path* and return a parsed :class:`~app.models.config.JailFileConfig`.

    The file is decoded as UTF-8 and the model's ``filename`` is the final
    path component.

    Args:
        path: Absolute path to the jail config file.

    Returns:
        Parsed jail file config.
    """
    content = path.read_text(encoding="utf-8")
    return parse_jail_file(content, filename=path.name)