From 63b48849a7f94fef5b3d03ee84df89e2aae5e9f9 Mon Sep 17 00:00:00 2001 From: Lukas Date: Fri, 13 Mar 2026 13:47:09 +0100 Subject: [PATCH] feat(backend): add conf-file parser and extend config models - Add conffile_parser.py: reads, writes and manipulates fail2ban .conf files while preserving comments and section structure - Extend config models with ActionConfig, FilterConfig, ConfFileContent, and related Pydantic schemas for jails, actions, and filters --- backend/app/models/config.py | 180 ++++++ backend/app/services/conffile_parser.py | 695 ++++++++++++++++++++++++ 2 files changed, 875 insertions(+) create mode 100644 backend/app/services/conffile_parser.py diff --git a/backend/app/models/config.py b/backend/app/models/config.py index 2a1ff29..f817cdf 100644 --- a/backend/app/models/config.py +++ b/backend/app/models/config.py @@ -267,3 +267,183 @@ class MapColorThresholdsUpdate(BaseModel): ..., gt=0, description="Ban count for yellow." ) threshold_low: int = Field(..., gt=0, description="Ban count for green.") + + +# --------------------------------------------------------------------------- +# Parsed filter file models +# --------------------------------------------------------------------------- + + +class FilterConfig(BaseModel): + """Structured representation of a ``filter.d/*.conf`` file.""" + + model_config = ConfigDict(strict=True) + + name: str = Field(..., description="Filter base name, e.g. ``sshd``.") + filename: str = Field(..., description="Actual filename, e.g. ``sshd.conf``.") + # [INCLUDES] + before: str | None = Field(default=None, description="Included file read before this one.") + after: str | None = Field(default=None, description="Included file read after this one.") + # [DEFAULT] — free-form key=value pairs + variables: dict[str, str] = Field( + default_factory=dict, + description="Free-form ``[DEFAULT]`` section variables.", + ) + # [Definition] + prefregex: str | None = Field( + default=None, + description="Prefix regex prepended to every failregex.", + ) + failregex: list[str] = Field( + default_factory=list, + description="Failure detection regex patterns (one per list entry).", + ) + ignoreregex: list[str] = Field( + default_factory=list, + description="Regex patterns that bypass ban logic.", + ) + maxlines: int | None = Field( + default=None, + description="Maximum number of log lines accumulated for a single match attempt.", + ) + datepattern: str | None = Field( + default=None, + description="Custom date-parsing pattern, or ``None`` for auto-detect.", + ) + journalmatch: str | None = Field( + default=None, + description="Systemd journal match expression.", + ) + + +class FilterConfigUpdate(BaseModel): + """Partial update payload for a parsed filter file. + + Only explicitly set (non-``None``) fields are written back. + """ + + model_config = ConfigDict(strict=True) + + before: str | None = Field(default=None) + after: str | None = Field(default=None) + variables: dict[str, str] | None = Field(default=None) + prefregex: str | None = Field(default=None) + failregex: list[str] | None = Field(default=None) + ignoreregex: list[str] | None = Field(default=None) + maxlines: int | None = Field(default=None) + datepattern: str | None = Field(default=None) + journalmatch: str | None = Field(default=None) + + +# --------------------------------------------------------------------------- +# Parsed action file models +# --------------------------------------------------------------------------- + + +class ActionConfig(BaseModel): + """Structured representation of an ``action.d/*.conf`` file.""" + + model_config = ConfigDict(strict=True) + + name: str = Field(..., description="Action base name, e.g. ``iptables``.") + filename: str = Field(..., description="Actual filename, e.g. ``iptables.conf``.") + # [INCLUDES] + before: str | None = Field(default=None) + after: str | None = Field(default=None) + # [Definition] — well-known lifecycle commands + actionstart: str | None = Field( + default=None, + description="Executed at jail start or first ban.", + ) + actionstop: str | None = Field( + default=None, + description="Executed at jail stop.", + ) + actioncheck: str | None = Field( + default=None, + description="Executed before each ban.", + ) + actionban: str | None = Field( + default=None, + description="Executed to ban an IP. Tags: ````, ````, ````.", + ) + actionunban: str | None = Field( + default=None, + description="Executed to unban an IP.", + ) + actionflush: str | None = Field( + default=None, + description="Executed to flush all bans on shutdown.", + ) + # [Definition] — extra variables not covered by the well-known keys + definition_vars: dict[str, str] = Field( + default_factory=dict, + description="Additional ``[Definition]`` variables.", + ) + # [Init] — runtime-configurable parameters + init_vars: dict[str, str] = Field( + default_factory=dict, + description="Runtime parameters that can be overridden per jail.", + ) + + +class ActionConfigUpdate(BaseModel): + """Partial update payload for a parsed action file.""" + + model_config = ConfigDict(strict=True) + + before: str | None = Field(default=None) + after: str | None = Field(default=None) + actionstart: str | None = Field(default=None) + actionstop: str | None = Field(default=None) + actioncheck: str | None = Field(default=None) + actionban: str | None = Field(default=None) + actionunban: str | None = Field(default=None) + actionflush: str | None = Field(default=None) + definition_vars: dict[str, str] | None = Field(default=None) + init_vars: dict[str, str] | None = Field(default=None) + + +# --------------------------------------------------------------------------- +# Jail file config models (Task 6.1) +# --------------------------------------------------------------------------- + + +class JailSectionConfig(BaseModel): + """Settings within a single [jailname] section of a jail.d file.""" + + model_config = ConfigDict(strict=True) + + enabled: bool | None = Field(default=None, description="Whether this jail is enabled.") + port: str | None = Field(default=None, description="Port(s) to monitor (e.g. 'ssh' or '22,2222').") + filter: str | None = Field(default=None, description="Filter name to use (e.g. 'sshd').") + logpath: list[str] = Field(default_factory=list, description="Log file paths to monitor.") + maxretry: int | None = Field(default=None, ge=1, description="Failures before banning.") + findtime: int | None = Field(default=None, ge=1, description="Time window in seconds for counting failures.") + bantime: int | None = Field(default=None, description="Ban duration in seconds. -1 for permanent.") + action: list[str] = Field(default_factory=list, description="Action references.") + backend: str | None = Field(default=None, description="Log monitoring backend.") + extra: dict[str, str] = Field(default_factory=dict, description="Additional settings not captured by named fields.") + + +class JailFileConfig(BaseModel): + """Structured representation of a jail.d/*.conf file.""" + + model_config = ConfigDict(strict=True) + + filename: str = Field(..., description="Filename including extension (e.g. 'sshd.conf').") + jails: dict[str, JailSectionConfig] = Field( + default_factory=dict, + description="Mapping of jail name → settings for each [section] in the file.", + ) + + +class JailFileConfigUpdate(BaseModel): + """Partial update payload for a jail.d file.""" + + model_config = ConfigDict(strict=True) + + jails: dict[str, JailSectionConfig] | None = Field( + default=None, + description="Jail section updates. Only jails present in this dict are updated.", + ) diff --git a/backend/app/services/conffile_parser.py b/backend/app/services/conffile_parser.py new file mode 100644 index 0000000..2dbc9eb --- /dev/null +++ b/backend/app/services/conffile_parser.py @@ -0,0 +1,695 @@ +"""Fail2ban INI-style configuration file parser and serializer. + +Provides structured parsing and serialization for ``filter.d/*.conf`` and +``action.d/*.conf`` files, mirroring fail2ban's own ``RawConfigParser``-based +reading logic. + +Key design decisions: +- Uses :class:`configparser.RawConfigParser` with ``interpolation=None`` so + fail2ban-style ``%`` / ``<>`` tags are preserved verbatim. +- Multi-line values (lines that begin with whitespace) are handled by + configparser automatically; the raw string is then post-processed to split + ``failregex``/``ignoreregex`` into individual patterns. +- Section ordering in serialized output: ``[INCLUDES]`` → ``[DEFAULT]`` → + ``[Definition]`` → ``[Init]``. Unknown extra sections from action files + (e.g. ``[ipt_oneport]``) are intentionally discarded because the structured + model does not capture them — users should edit those sections via the raw + (Export) tab. +""" + +from __future__ import annotations + +import configparser +import contextlib +import io +from typing import TYPE_CHECKING + +import structlog + +if TYPE_CHECKING: + from pathlib import Path + +from app.models.config import ( + ActionConfig, + ActionConfigUpdate, + FilterConfig, + FilterConfigUpdate, + JailFileConfig, + JailFileConfigUpdate, + JailSectionConfig, +) + +log: structlog.stdlib.BoundLogger = structlog.get_logger() + +# --------------------------------------------------------------------------- +# Constants — well-known Definition keys for action files +# --------------------------------------------------------------------------- + +_ACTION_LIFECYCLE_KEYS: frozenset[str] = frozenset( + { + "actionstart", + "actionstop", + "actioncheck", + "actionban", + "actionunban", + "actionflush", + } +) + + +# --------------------------------------------------------------------------- +# Internal helpers +# --------------------------------------------------------------------------- + + +def _make_parser() -> configparser.RawConfigParser: + """Create a :class:`configparser.RawConfigParser` configured for fail2ban. + + Returns: + A parser with interpolation disabled, case-sensitive keys, and a + ``DEFAULT`` section that does not inherit into other sections. + """ + parser = configparser.RawConfigParser( + # Disable interpolation so fail2ban % / <> tags survive unchanged. + interpolation=None, + # Preserve original key casing (fail2ban keys are lowercase but some + # custom config files may use mixed case). + strict=False, + ) + # Keys are case-sensitive in fail2ban. + parser.optionxform = str # type: ignore[assignment] + return parser + + +def _split_multiline_patterns(raw: str) -> list[str]: + """Split a raw multi-line configparser value into individual patterns. + + Each non-blank, non-comment line becomes a separate entry. + + Args: + raw: The raw multi-line string from configparser (may include blank + lines and ``#`` comments). + + Returns: + List of stripped non-empty, non-comment pattern strings. + """ + result: list[str] = [] + for line in raw.splitlines(): + stripped = line.strip() + if stripped and not stripped.startswith("#"): + result.append(stripped) + return result + + +def _get_opt(parser: configparser.RawConfigParser, section: str, key: str) -> str | None: + """Return the value of *key* in *section*, or ``None`` if absent. + + Args: + parser: Populated parser instance. + section: Section name. + key: Option name. + + Returns: + Option value string, or ``None``. + """ + if parser.has_section(section) and parser.has_option(section, key): + return parser.get(section, key) + return None + + +def _section_dict( + parser: configparser.RawConfigParser, section: str, skip: frozenset[str] | None = None +) -> dict[str, str]: + """Return all key-value pairs from *section* as a plain dict. + + Args: + parser: Populated parser instance. + section: Section name. + skip: Optional set of keys to exclude. + + Returns: + Dict of option → value for the section. + """ + if not parser.has_section(section): + return {} + drop = skip or frozenset() + return { + k: v + for k, v in parser.items(section) + if not k.startswith("__") and k not in drop # __ keys come from DEFAULT inheritance + } + + +# --------------------------------------------------------------------------- +# Filter file parser / serializer +# --------------------------------------------------------------------------- + + +def parse_filter_file(content: str, name: str = "", filename: str = "") -> FilterConfig: + """Parse a ``filter.d/*.conf`` file into a :class:`~app.models.config.FilterConfig`. + + Args: + content: Raw file content (UTF-8 string). + name: Filter base name (e.g. ``"sshd"``). Used only to populate the + ``name`` field on the returned model. + filename: Actual filename (e.g. ``"sshd.conf"``). + + Returns: + Populated :class:`~app.models.config.FilterConfig`. + """ + parser = _make_parser() + try: + parser.read_string(content) + except configparser.Error as exc: + log.warning("filter_parse_error", name=name, error=str(exc)) + + # [INCLUDES] + before = _get_opt(parser, "INCLUDES", "before") + after = _get_opt(parser, "INCLUDES", "after") + + # [DEFAULT] — all keys that aren't hidden configparser internals + # configparser stores DEFAULT keys accessible from every section; we + # reconstruct them by reading DEFAULT directly. + variables: dict[str, str] = {} + if parser.defaults(): + variables = dict(parser.defaults()) + + # [Definition] + prefregex = _get_opt(parser, "Definition", "prefregex") + + raw_failregex = _get_opt(parser, "Definition", "failregex") or "" + failregex = _split_multiline_patterns(raw_failregex) + + raw_ignoreregex = _get_opt(parser, "Definition", "ignoreregex") or "" + ignoreregex = _split_multiline_patterns(raw_ignoreregex) + + maxlines_raw = _get_opt(parser, "Definition", "maxlines") + maxlines: int | None = None + if maxlines_raw is not None: + with contextlib.suppress(ValueError): + maxlines = int(maxlines_raw.strip()) + + datepattern = _get_opt(parser, "Definition", "datepattern") + journalmatch = _get_opt(parser, "Definition", "journalmatch") + + log.debug("filter_parsed", name=name, failregex_count=len(failregex)) + return FilterConfig( + name=name, + filename=filename, + before=before, + after=after, + variables=variables, + prefregex=prefregex, + failregex=failregex, + ignoreregex=ignoreregex, + maxlines=maxlines, + datepattern=datepattern, + journalmatch=journalmatch, + ) + + +def serialize_filter_config(cfg: FilterConfig) -> str: + """Serialize a :class:`~app.models.config.FilterConfig` to a ``.conf`` string. + + The output preserves the canonical fail2ban INI section ordering: + ``[INCLUDES]`` → ``[DEFAULT]`` → ``[Definition]``. + + Args: + cfg: The filter configuration to serialize. + + Returns: + UTF-8 string suitable for writing to a ``.conf`` file. + """ + buf = io.StringIO() + + # [INCLUDES] + if cfg.before is not None or cfg.after is not None: + buf.write("[INCLUDES]\n\n") + if cfg.before is not None: + buf.write(f"before = {cfg.before}\n") + if cfg.after is not None: + buf.write(f"after = {cfg.after}\n") + buf.write("\n") + + # [DEFAULT] + if cfg.variables: + buf.write("[DEFAULT]\n\n") + for key, value in cfg.variables.items(): + buf.write(f"{key} = {value}\n") + buf.write("\n") + + # [Definition] + buf.write("[Definition]\n\n") + + if cfg.prefregex is not None: + buf.write(f"prefregex = {cfg.prefregex}\n\n") + + if cfg.failregex: + buf.write("failregex = " + cfg.failregex[0] + "\n") + for pattern in cfg.failregex[1:]: + buf.write(f" {pattern}\n") + buf.write("\n") + + if cfg.ignoreregex: + buf.write("ignoreregex = " + cfg.ignoreregex[0] + "\n") + for pattern in cfg.ignoreregex[1:]: + buf.write(f" {pattern}\n") + buf.write("\n") + + if cfg.maxlines is not None: + buf.write(f"maxlines = {cfg.maxlines}\n\n") + + if cfg.datepattern is not None: + buf.write(f"datepattern = {cfg.datepattern}\n\n") + + if cfg.journalmatch is not None: + buf.write(f"journalmatch = {cfg.journalmatch}\n\n") + + return buf.getvalue() + + +def merge_filter_update(cfg: FilterConfig, update: FilterConfigUpdate) -> FilterConfig: + """Apply a partial :class:`~app.models.config.FilterConfigUpdate` onto *cfg*. + + Only fields that are explicitly set (not ``None``) in *update* are written. + Returns a new :class:`~app.models.config.FilterConfig` with the merged + values; the original is not mutated. + + Args: + cfg: Current filter configuration. + update: Partial update to apply. + + Returns: + Updated :class:`~app.models.config.FilterConfig`. + """ + return FilterConfig( + name=cfg.name, + filename=cfg.filename, + before=update.before if update.before is not None else cfg.before, + after=update.after if update.after is not None else cfg.after, + variables=update.variables if update.variables is not None else cfg.variables, + prefregex=update.prefregex if update.prefregex is not None else cfg.prefregex, + failregex=update.failregex if update.failregex is not None else cfg.failregex, + ignoreregex=update.ignoreregex if update.ignoreregex is not None else cfg.ignoreregex, + maxlines=update.maxlines if update.maxlines is not None else cfg.maxlines, + datepattern=update.datepattern if update.datepattern is not None else cfg.datepattern, + journalmatch=update.journalmatch if update.journalmatch is not None else cfg.journalmatch, + ) + + +# --------------------------------------------------------------------------- +# Action file parser / serializer +# --------------------------------------------------------------------------- + + +def parse_action_file(content: str, name: str = "", filename: str = "") -> ActionConfig: + """Parse an ``action.d/*.conf`` file into a :class:`~app.models.config.ActionConfig`. + + Args: + content: Raw file content (UTF-8 string). + name: Action base name (e.g. ``"iptables"``). + filename: Actual filename (e.g. ``"iptables.conf"``). + + Returns: + Populated :class:`~app.models.config.ActionConfig`. + """ + parser = _make_parser() + try: + parser.read_string(content) + except configparser.Error as exc: + log.warning("action_parse_error", name=name, error=str(exc)) + + # [INCLUDES] + before = _get_opt(parser, "INCLUDES", "before") + after = _get_opt(parser, "INCLUDES", "after") + + # [Definition] — extract well-known lifecycle keys, rest goes to definition_vars + def_lifecycle: dict[str, str | None] = dict.fromkeys(_ACTION_LIFECYCLE_KEYS) + definition_vars: dict[str, str] = {} + + if parser.has_section("Definition"): + for key, value in parser.items("Definition"): + if key in _ACTION_LIFECYCLE_KEYS: + def_lifecycle[key] = value + else: + definition_vars[key] = value + + # [Init] — all keys go into init_vars (multiple [Init?...] sections are ignored) + init_vars: dict[str, str] = {} + if parser.has_section("Init"): + for key, value in parser.items("Init"): + init_vars[key] = value + + log.debug("action_parsed", name=name, init_vars_count=len(init_vars)) + return ActionConfig( + name=name, + filename=filename, + before=before, + after=after, + actionstart=def_lifecycle.get("actionstart"), + actionstop=def_lifecycle.get("actionstop"), + actioncheck=def_lifecycle.get("actioncheck"), + actionban=def_lifecycle.get("actionban"), + actionunban=def_lifecycle.get("actionunban"), + actionflush=def_lifecycle.get("actionflush"), + definition_vars=definition_vars, + init_vars=init_vars, + ) + + +def serialize_action_config(cfg: ActionConfig) -> str: + """Serialize an :class:`~app.models.config.ActionConfig` to a ``.conf`` string. + + Section order: ``[INCLUDES]`` → ``[Definition]`` → ``[Init]``. + + Args: + cfg: The action configuration to serialize. + + Returns: + UTF-8 string suitable for writing to a ``.conf`` file. + """ + buf = io.StringIO() + + # [INCLUDES] + if cfg.before is not None or cfg.after is not None: + buf.write("[INCLUDES]\n\n") + if cfg.before is not None: + buf.write(f"before = {cfg.before}\n") + if cfg.after is not None: + buf.write(f"after = {cfg.after}\n") + buf.write("\n") + + # [Definition] + buf.write("[Definition]\n\n") + + # Lifecycle commands first (in canonical order) + _lifecycle_order = ( + "actionstart", + "actionstop", + "actioncheck", + "actionban", + "actionunban", + "actionflush", + ) + for key in _lifecycle_order: + value = getattr(cfg, key) + if value is not None: + lines = value.splitlines() + if lines: + buf.write(f"{key} = {lines[0]}\n") + for extra in lines[1:]: + buf.write(f" {extra}\n") + buf.write("\n") + + # Extra definition variables + for key, value in cfg.definition_vars.items(): + lines = value.splitlines() + if lines: + buf.write(f"{key} = {lines[0]}\n") + for extra in lines[1:]: + buf.write(f" {extra}\n") + if cfg.definition_vars: + buf.write("\n") + + # [Init] + if cfg.init_vars: + buf.write("[Init]\n\n") + for key, value in cfg.init_vars.items(): + buf.write(f"{key} = {value}\n") + buf.write("\n") + + return buf.getvalue() + + +def merge_action_update(cfg: ActionConfig, update: ActionConfigUpdate) -> ActionConfig: + """Apply a partial :class:`~app.models.config.ActionConfigUpdate` onto *cfg*. + + Args: + cfg: Current action configuration. + update: Partial update to apply. + + Returns: + Updated :class:`~app.models.config.ActionConfig`. + """ + return ActionConfig( + name=cfg.name, + filename=cfg.filename, + before=update.before if update.before is not None else cfg.before, + after=update.after if update.after is not None else cfg.after, + actionstart=update.actionstart if update.actionstart is not None else cfg.actionstart, + actionstop=update.actionstop if update.actionstop is not None else cfg.actionstop, + actioncheck=update.actioncheck if update.actioncheck is not None else cfg.actioncheck, + actionban=update.actionban if update.actionban is not None else cfg.actionban, + actionunban=update.actionunban if update.actionunban is not None else cfg.actionunban, + actionflush=update.actionflush if update.actionflush is not None else cfg.actionflush, + definition_vars=update.definition_vars + if update.definition_vars is not None + else cfg.definition_vars, + init_vars=update.init_vars if update.init_vars is not None else cfg.init_vars, + ) + + +# --------------------------------------------------------------------------- +# Convenience helpers for reading/writing files +# --------------------------------------------------------------------------- + + +def read_and_parse_filter(path: Path) -> FilterConfig: + """Read *path* and return a parsed :class:`~app.models.config.FilterConfig`. + + Args: + path: Absolute path to the filter file. + + Returns: + Parsed filter config. + """ + content = path.read_text(encoding="utf-8") + return parse_filter_file(content, name=path.stem, filename=path.name) + + +def read_and_parse_action(path: Path) -> ActionConfig: + """Read *path* and return a parsed :class:`~app.models.config.ActionConfig`. + + Args: + path: Absolute path to the action file. + + Returns: + Parsed action config. + """ + content = path.read_text(encoding="utf-8") + return parse_action_file(content, name=path.stem, filename=path.name) + + +# --------------------------------------------------------------------------- +# Jail file parser / serializer (Task 6.1) +# --------------------------------------------------------------------------- + +# Keys handled by named fields in JailSectionConfig. +_JAIL_NAMED_KEYS: frozenset[str] = frozenset( + { + "enabled", + "port", + "filter", + "logpath", + "maxretry", + "findtime", + "bantime", + "action", + "backend", + } +) + + +def _parse_bool(value: str) -> bool | None: + """Parse a fail2ban boolean string. + + Args: + value: Raw string value from config (e.g. "true", "false", "yes", "no", "1", "0"). + + Returns: + Boolean, or ``None`` if the value is not a recognised boolean token. + """ + lower = value.strip().lower() + if lower in {"true", "yes", "1"}: + return True + if lower in {"false", "no", "0"}: + return False + return None + + +def _parse_int(value: str) -> int | None: + """Parse an integer string, returning ``None`` on failure. + + Args: + value: Raw string value from config. + + Returns: + Integer, or ``None``. + """ + with contextlib.suppress(ValueError): + return int(value.strip()) + return None + + +def _parse_multiline_list(raw: str) -> list[str]: + """Split a multi-line configparser value into a list of non-blank lines. + + Args: + raw: Raw multi-line string from configparser. + + Returns: + List of stripped, non-empty, non-comment strings. + """ + result: list[str] = [] + for line in raw.splitlines(): + stripped = line.strip() + if stripped and not stripped.startswith("#"): + result.append(stripped) + return result + + +def parse_jail_file(content: str, filename: str = "") -> JailFileConfig: + """Parse a ``jail.d/*.conf`` file into a :class:`~app.models.config.JailFileConfig`. + + Each INI section in the file maps to a jail. The ``[DEFAULT]`` section (if + present) is silently ignored — fail2ban merges it with jail sections, but + the structured model represents per-jail settings only. + + Args: + content: Raw file content (UTF-8 string). + filename: Filename (e.g. ``"sshd.conf"``). + + Returns: + Populated :class:`~app.models.config.JailFileConfig`. + """ + parser = _make_parser() + try: + parser.read_string(content) + except configparser.Error as exc: + log.warning("jail_file_parse_error", filename=filename, error=str(exc)) + + jails: dict[str, JailSectionConfig] = {} + for section in parser.sections(): + # Skip meta-sections used by fail2ban include system. + if section in {"INCLUDES", "DEFAULT"}: + continue + + items = dict(parser.items(section)) + + enabled_raw = items.get("enabled") + enabled = _parse_bool(enabled_raw) if enabled_raw is not None else None + + port = items.get("port") + filter_name = items.get("filter") + backend = items.get("backend") + + logpath_raw = items.get("logpath", "") + logpath = _parse_multiline_list(logpath_raw) + + action_raw = items.get("action", "") + action = _parse_multiline_list(action_raw) + + maxretry = _parse_int(items.get("maxretry", "")) if "maxretry" in items else None + findtime = _parse_int(items.get("findtime", "")) if "findtime" in items else None + bantime = _parse_int(items.get("bantime", "")) if "bantime" in items else None + + extra: dict[str, str] = { + k: v for k, v in items.items() if k not in _JAIL_NAMED_KEYS + } + + jails[section] = JailSectionConfig( + enabled=enabled, + port=port, + filter=filter_name, + logpath=logpath, + maxretry=maxretry, + findtime=findtime, + bantime=bantime, + action=action, + backend=backend, + extra=extra, + ) + + log.debug("jail_file_parsed", filename=filename, jail_count=len(jails)) + return JailFileConfig(filename=filename, jails=jails) + + +def serialize_jail_file_config(cfg: JailFileConfig) -> str: + """Serialize a :class:`~app.models.config.JailFileConfig` to a fail2ban INI string. + + Args: + cfg: Structured jail file configuration. + + Returns: + UTF-8 file content suitable for writing to a ``jail.d/*.conf`` file. + """ + buf = io.StringIO() + buf.write(f"# Generated by BanGUI — {cfg.filename}\n") + + for jail_name, jail in cfg.jails.items(): + buf.write(f"\n[{jail_name}]\n\n") + + if jail.enabled is not None: + buf.write(f"enabled = {'true' if jail.enabled else 'false'}\n") + if jail.port is not None: + buf.write(f"port = {jail.port}\n") + if jail.filter is not None: + buf.write(f"filter = {jail.filter}\n") + if jail.backend is not None: + buf.write(f"backend = {jail.backend}\n") + if jail.maxretry is not None: + buf.write(f"maxretry = {jail.maxretry}\n") + if jail.findtime is not None: + buf.write(f"findtime = {jail.findtime}\n") + if jail.bantime is not None: + buf.write(f"bantime = {jail.bantime}\n") + + if jail.logpath: + first, *rest = jail.logpath + buf.write(f"logpath = {first}\n") + for path in rest: + buf.write(f" {path}\n") + + if jail.action: + first_action, *rest_actions = jail.action + buf.write(f"action = {first_action}\n") + for a in rest_actions: + buf.write(f" {a}\n") + + for key, value in jail.extra.items(): + buf.write(f"{key} = {value}\n") + + return buf.getvalue() + + +def merge_jail_file_update(cfg: JailFileConfig, update: JailFileConfigUpdate) -> JailFileConfig: + """Apply a partial :class:`~app.models.config.JailFileConfigUpdate` onto *cfg*. + + Only jails present in ``update.jails`` are replaced; other jails are left + unchanged. + + Args: + cfg: Current jail file configuration. + update: Partial update to apply. + + Returns: + Updated :class:`~app.models.config.JailFileConfig`. + """ + if update.jails is None: + return cfg + merged = dict(cfg.jails) + merged.update(update.jails) + return JailFileConfig(filename=cfg.filename, jails=merged) + + +def read_and_parse_jail_file(path: Path) -> JailFileConfig: + """Read *path* and return a parsed :class:`~app.models.config.JailFileConfig`. + + Args: + path: Absolute path to the jail config file. + + Returns: + Parsed jail file config. + """ + content = path.read_text(encoding="utf-8") + return parse_jail_file(content, filename=path.name)