feat(backend): add conf-file parser and extend config models

- Add conffile_parser.py: reads, writes and manipulates fail2ban .conf
  files while preserving comments and section structure
- Extend config models with ActionConfig, FilterConfig, ConfFileContent,
  and related Pydantic schemas for jails, actions, and filters
This commit is contained in:
2026-03-13 13:47:09 +01:00
parent d6da81131f
commit 63b48849a7
2 changed files with 875 additions and 0 deletions

View File

@@ -267,3 +267,183 @@ class MapColorThresholdsUpdate(BaseModel):
..., gt=0, description="Ban count for yellow."
)
threshold_low: int = Field(..., gt=0, description="Ban count for green.")
# ---------------------------------------------------------------------------
# Parsed filter file models
# ---------------------------------------------------------------------------
class FilterConfig(BaseModel):
"""Structured representation of a ``filter.d/*.conf`` file."""
model_config = ConfigDict(strict=True)
name: str = Field(..., description="Filter base name, e.g. ``sshd``.")
filename: str = Field(..., description="Actual filename, e.g. ``sshd.conf``.")
# [INCLUDES]
before: str | None = Field(default=None, description="Included file read before this one.")
after: str | None = Field(default=None, description="Included file read after this one.")
# [DEFAULT] — free-form key=value pairs
variables: dict[str, str] = Field(
default_factory=dict,
description="Free-form ``[DEFAULT]`` section variables.",
)
# [Definition]
prefregex: str | None = Field(
default=None,
description="Prefix regex prepended to every failregex.",
)
failregex: list[str] = Field(
default_factory=list,
description="Failure detection regex patterns (one per list entry).",
)
ignoreregex: list[str] = Field(
default_factory=list,
description="Regex patterns that bypass ban logic.",
)
maxlines: int | None = Field(
default=None,
description="Maximum number of log lines accumulated for a single match attempt.",
)
datepattern: str | None = Field(
default=None,
description="Custom date-parsing pattern, or ``None`` for auto-detect.",
)
journalmatch: str | None = Field(
default=None,
description="Systemd journal match expression.",
)
class FilterConfigUpdate(BaseModel):
"""Partial update payload for a parsed filter file.
Only explicitly set (non-``None``) fields are written back.
"""
model_config = ConfigDict(strict=True)
before: str | None = Field(default=None)
after: str | None = Field(default=None)
variables: dict[str, str] | None = Field(default=None)
prefregex: str | None = Field(default=None)
failregex: list[str] | None = Field(default=None)
ignoreregex: list[str] | None = Field(default=None)
maxlines: int | None = Field(default=None)
datepattern: str | None = Field(default=None)
journalmatch: str | None = Field(default=None)
# ---------------------------------------------------------------------------
# Parsed action file models
# ---------------------------------------------------------------------------
class ActionConfig(BaseModel):
"""Structured representation of an ``action.d/*.conf`` file."""
model_config = ConfigDict(strict=True)
name: str = Field(..., description="Action base name, e.g. ``iptables``.")
filename: str = Field(..., description="Actual filename, e.g. ``iptables.conf``.")
# [INCLUDES]
before: str | None = Field(default=None)
after: str | None = Field(default=None)
# [Definition] — well-known lifecycle commands
actionstart: str | None = Field(
default=None,
description="Executed at jail start or first ban.",
)
actionstop: str | None = Field(
default=None,
description="Executed at jail stop.",
)
actioncheck: str | None = Field(
default=None,
description="Executed before each ban.",
)
actionban: str | None = Field(
default=None,
description="Executed to ban an IP. Tags: ``<ip>``, ``<name>``, ``<port>``.",
)
actionunban: str | None = Field(
default=None,
description="Executed to unban an IP.",
)
actionflush: str | None = Field(
default=None,
description="Executed to flush all bans on shutdown.",
)
# [Definition] — extra variables not covered by the well-known keys
definition_vars: dict[str, str] = Field(
default_factory=dict,
description="Additional ``[Definition]`` variables.",
)
# [Init] — runtime-configurable parameters
init_vars: dict[str, str] = Field(
default_factory=dict,
description="Runtime parameters that can be overridden per jail.",
)
class ActionConfigUpdate(BaseModel):
"""Partial update payload for a parsed action file."""
model_config = ConfigDict(strict=True)
before: str | None = Field(default=None)
after: str | None = Field(default=None)
actionstart: str | None = Field(default=None)
actionstop: str | None = Field(default=None)
actioncheck: str | None = Field(default=None)
actionban: str | None = Field(default=None)
actionunban: str | None = Field(default=None)
actionflush: str | None = Field(default=None)
definition_vars: dict[str, str] | None = Field(default=None)
init_vars: dict[str, str] | None = Field(default=None)
# ---------------------------------------------------------------------------
# Jail file config models (Task 6.1)
# ---------------------------------------------------------------------------
class JailSectionConfig(BaseModel):
"""Settings within a single [jailname] section of a jail.d file."""
model_config = ConfigDict(strict=True)
enabled: bool | None = Field(default=None, description="Whether this jail is enabled.")
port: str | None = Field(default=None, description="Port(s) to monitor (e.g. 'ssh' or '22,2222').")
filter: str | None = Field(default=None, description="Filter name to use (e.g. 'sshd').")
logpath: list[str] = Field(default_factory=list, description="Log file paths to monitor.")
maxretry: int | None = Field(default=None, ge=1, description="Failures before banning.")
findtime: int | None = Field(default=None, ge=1, description="Time window in seconds for counting failures.")
bantime: int | None = Field(default=None, description="Ban duration in seconds. -1 for permanent.")
action: list[str] = Field(default_factory=list, description="Action references.")
backend: str | None = Field(default=None, description="Log monitoring backend.")
extra: dict[str, str] = Field(default_factory=dict, description="Additional settings not captured by named fields.")
class JailFileConfig(BaseModel):
"""Structured representation of a jail.d/*.conf file."""
model_config = ConfigDict(strict=True)
filename: str = Field(..., description="Filename including extension (e.g. 'sshd.conf').")
jails: dict[str, JailSectionConfig] = Field(
default_factory=dict,
description="Mapping of jail name → settings for each [section] in the file.",
)
class JailFileConfigUpdate(BaseModel):
"""Partial update payload for a jail.d file."""
model_config = ConfigDict(strict=True)
jails: dict[str, JailSectionConfig] | None = Field(
default=None,
description="Jail section updates. Only jails present in this dict are updated.",
)

View File

@@ -0,0 +1,695 @@
"""Fail2ban INI-style configuration file parser and serializer.
Provides structured parsing and serialization for ``filter.d/*.conf`` and
``action.d/*.conf`` files, mirroring fail2ban's own ``RawConfigParser``-based
reading logic.
Key design decisions:
- Uses :class:`configparser.RawConfigParser` with ``interpolation=None`` so
fail2ban-style ``%`` / ``<>`` tags are preserved verbatim.
- Multi-line values (lines that begin with whitespace) are handled by
configparser automatically; the raw string is then post-processed to split
``failregex``/``ignoreregex`` into individual patterns.
- Section ordering in serialized output: ``[INCLUDES]`` → ``[DEFAULT]`` →
``[Definition]`` → ``[Init]``. Unknown extra sections from action files
(e.g. ``[ipt_oneport]``) are intentionally discarded because the structured
model does not capture them — users should edit those sections via the raw
(Export) tab.
"""
from __future__ import annotations
import configparser
import contextlib
import io
from typing import TYPE_CHECKING
import structlog
if TYPE_CHECKING:
from pathlib import Path
from app.models.config import (
ActionConfig,
ActionConfigUpdate,
FilterConfig,
FilterConfigUpdate,
JailFileConfig,
JailFileConfigUpdate,
JailSectionConfig,
)
log: structlog.stdlib.BoundLogger = structlog.get_logger()
# ---------------------------------------------------------------------------
# Constants — well-known Definition keys for action files
# ---------------------------------------------------------------------------
_ACTION_LIFECYCLE_KEYS: frozenset[str] = frozenset(
{
"actionstart",
"actionstop",
"actioncheck",
"actionban",
"actionunban",
"actionflush",
}
)
# ---------------------------------------------------------------------------
# Internal helpers
# ---------------------------------------------------------------------------
def _make_parser() -> configparser.RawConfigParser:
"""Create a :class:`configparser.RawConfigParser` configured for fail2ban.
Returns:
A parser with interpolation disabled, case-sensitive keys, and a
``DEFAULT`` section that does not inherit into other sections.
"""
parser = configparser.RawConfigParser(
# Disable interpolation so fail2ban % / <> tags survive unchanged.
interpolation=None,
# Preserve original key casing (fail2ban keys are lowercase but some
# custom config files may use mixed case).
strict=False,
)
# Keys are case-sensitive in fail2ban.
parser.optionxform = str # type: ignore[assignment]
return parser
def _split_multiline_patterns(raw: str) -> list[str]:
"""Split a raw multi-line configparser value into individual patterns.
Each non-blank, non-comment line becomes a separate entry.
Args:
raw: The raw multi-line string from configparser (may include blank
lines and ``#`` comments).
Returns:
List of stripped non-empty, non-comment pattern strings.
"""
result: list[str] = []
for line in raw.splitlines():
stripped = line.strip()
if stripped and not stripped.startswith("#"):
result.append(stripped)
return result
def _get_opt(parser: configparser.RawConfigParser, section: str, key: str) -> str | None:
"""Return the value of *key* in *section*, or ``None`` if absent.
Args:
parser: Populated parser instance.
section: Section name.
key: Option name.
Returns:
Option value string, or ``None``.
"""
if parser.has_section(section) and parser.has_option(section, key):
return parser.get(section, key)
return None
def _section_dict(
parser: configparser.RawConfigParser, section: str, skip: frozenset[str] | None = None
) -> dict[str, str]:
"""Return all key-value pairs from *section* as a plain dict.
Args:
parser: Populated parser instance.
section: Section name.
skip: Optional set of keys to exclude.
Returns:
Dict of option → value for the section.
"""
if not parser.has_section(section):
return {}
drop = skip or frozenset()
return {
k: v
for k, v in parser.items(section)
if not k.startswith("__") and k not in drop # __ keys come from DEFAULT inheritance
}
# ---------------------------------------------------------------------------
# Filter file parser / serializer
# ---------------------------------------------------------------------------
def parse_filter_file(content: str, name: str = "", filename: str = "") -> FilterConfig:
"""Parse a ``filter.d/*.conf`` file into a :class:`~app.models.config.FilterConfig`.
Args:
content: Raw file content (UTF-8 string).
name: Filter base name (e.g. ``"sshd"``). Used only to populate the
``name`` field on the returned model.
filename: Actual filename (e.g. ``"sshd.conf"``).
Returns:
Populated :class:`~app.models.config.FilterConfig`.
"""
parser = _make_parser()
try:
parser.read_string(content)
except configparser.Error as exc:
log.warning("filter_parse_error", name=name, error=str(exc))
# [INCLUDES]
before = _get_opt(parser, "INCLUDES", "before")
after = _get_opt(parser, "INCLUDES", "after")
# [DEFAULT] — all keys that aren't hidden configparser internals
# configparser stores DEFAULT keys accessible from every section; we
# reconstruct them by reading DEFAULT directly.
variables: dict[str, str] = {}
if parser.defaults():
variables = dict(parser.defaults())
# [Definition]
prefregex = _get_opt(parser, "Definition", "prefregex")
raw_failregex = _get_opt(parser, "Definition", "failregex") or ""
failregex = _split_multiline_patterns(raw_failregex)
raw_ignoreregex = _get_opt(parser, "Definition", "ignoreregex") or ""
ignoreregex = _split_multiline_patterns(raw_ignoreregex)
maxlines_raw = _get_opt(parser, "Definition", "maxlines")
maxlines: int | None = None
if maxlines_raw is not None:
with contextlib.suppress(ValueError):
maxlines = int(maxlines_raw.strip())
datepattern = _get_opt(parser, "Definition", "datepattern")
journalmatch = _get_opt(parser, "Definition", "journalmatch")
log.debug("filter_parsed", name=name, failregex_count=len(failregex))
return FilterConfig(
name=name,
filename=filename,
before=before,
after=after,
variables=variables,
prefregex=prefregex,
failregex=failregex,
ignoreregex=ignoreregex,
maxlines=maxlines,
datepattern=datepattern,
journalmatch=journalmatch,
)
def serialize_filter_config(cfg: FilterConfig) -> str:
"""Serialize a :class:`~app.models.config.FilterConfig` to a ``.conf`` string.
The output preserves the canonical fail2ban INI section ordering:
``[INCLUDES]`` → ``[DEFAULT]`` → ``[Definition]``.
Args:
cfg: The filter configuration to serialize.
Returns:
UTF-8 string suitable for writing to a ``.conf`` file.
"""
buf = io.StringIO()
# [INCLUDES]
if cfg.before is not None or cfg.after is not None:
buf.write("[INCLUDES]\n\n")
if cfg.before is not None:
buf.write(f"before = {cfg.before}\n")
if cfg.after is not None:
buf.write(f"after = {cfg.after}\n")
buf.write("\n")
# [DEFAULT]
if cfg.variables:
buf.write("[DEFAULT]\n\n")
for key, value in cfg.variables.items():
buf.write(f"{key} = {value}\n")
buf.write("\n")
# [Definition]
buf.write("[Definition]\n\n")
if cfg.prefregex is not None:
buf.write(f"prefregex = {cfg.prefregex}\n\n")
if cfg.failregex:
buf.write("failregex = " + cfg.failregex[0] + "\n")
for pattern in cfg.failregex[1:]:
buf.write(f" {pattern}\n")
buf.write("\n")
if cfg.ignoreregex:
buf.write("ignoreregex = " + cfg.ignoreregex[0] + "\n")
for pattern in cfg.ignoreregex[1:]:
buf.write(f" {pattern}\n")
buf.write("\n")
if cfg.maxlines is not None:
buf.write(f"maxlines = {cfg.maxlines}\n\n")
if cfg.datepattern is not None:
buf.write(f"datepattern = {cfg.datepattern}\n\n")
if cfg.journalmatch is not None:
buf.write(f"journalmatch = {cfg.journalmatch}\n\n")
return buf.getvalue()
def merge_filter_update(cfg: FilterConfig, update: FilterConfigUpdate) -> FilterConfig:
"""Apply a partial :class:`~app.models.config.FilterConfigUpdate` onto *cfg*.
Only fields that are explicitly set (not ``None``) in *update* are written.
Returns a new :class:`~app.models.config.FilterConfig` with the merged
values; the original is not mutated.
Args:
cfg: Current filter configuration.
update: Partial update to apply.
Returns:
Updated :class:`~app.models.config.FilterConfig`.
"""
return FilterConfig(
name=cfg.name,
filename=cfg.filename,
before=update.before if update.before is not None else cfg.before,
after=update.after if update.after is not None else cfg.after,
variables=update.variables if update.variables is not None else cfg.variables,
prefregex=update.prefregex if update.prefregex is not None else cfg.prefregex,
failregex=update.failregex if update.failregex is not None else cfg.failregex,
ignoreregex=update.ignoreregex if update.ignoreregex is not None else cfg.ignoreregex,
maxlines=update.maxlines if update.maxlines is not None else cfg.maxlines,
datepattern=update.datepattern if update.datepattern is not None else cfg.datepattern,
journalmatch=update.journalmatch if update.journalmatch is not None else cfg.journalmatch,
)
# ---------------------------------------------------------------------------
# Action file parser / serializer
# ---------------------------------------------------------------------------
def parse_action_file(content: str, name: str = "", filename: str = "") -> ActionConfig:
"""Parse an ``action.d/*.conf`` file into a :class:`~app.models.config.ActionConfig`.
Args:
content: Raw file content (UTF-8 string).
name: Action base name (e.g. ``"iptables"``).
filename: Actual filename (e.g. ``"iptables.conf"``).
Returns:
Populated :class:`~app.models.config.ActionConfig`.
"""
parser = _make_parser()
try:
parser.read_string(content)
except configparser.Error as exc:
log.warning("action_parse_error", name=name, error=str(exc))
# [INCLUDES]
before = _get_opt(parser, "INCLUDES", "before")
after = _get_opt(parser, "INCLUDES", "after")
# [Definition] — extract well-known lifecycle keys, rest goes to definition_vars
def_lifecycle: dict[str, str | None] = dict.fromkeys(_ACTION_LIFECYCLE_KEYS)
definition_vars: dict[str, str] = {}
if parser.has_section("Definition"):
for key, value in parser.items("Definition"):
if key in _ACTION_LIFECYCLE_KEYS:
def_lifecycle[key] = value
else:
definition_vars[key] = value
# [Init] — all keys go into init_vars (multiple [Init?...] sections are ignored)
init_vars: dict[str, str] = {}
if parser.has_section("Init"):
for key, value in parser.items("Init"):
init_vars[key] = value
log.debug("action_parsed", name=name, init_vars_count=len(init_vars))
return ActionConfig(
name=name,
filename=filename,
before=before,
after=after,
actionstart=def_lifecycle.get("actionstart"),
actionstop=def_lifecycle.get("actionstop"),
actioncheck=def_lifecycle.get("actioncheck"),
actionban=def_lifecycle.get("actionban"),
actionunban=def_lifecycle.get("actionunban"),
actionflush=def_lifecycle.get("actionflush"),
definition_vars=definition_vars,
init_vars=init_vars,
)
def serialize_action_config(cfg: ActionConfig) -> str:
"""Serialize an :class:`~app.models.config.ActionConfig` to a ``.conf`` string.
Section order: ``[INCLUDES]`` → ``[Definition]`` → ``[Init]``.
Args:
cfg: The action configuration to serialize.
Returns:
UTF-8 string suitable for writing to a ``.conf`` file.
"""
buf = io.StringIO()
# [INCLUDES]
if cfg.before is not None or cfg.after is not None:
buf.write("[INCLUDES]\n\n")
if cfg.before is not None:
buf.write(f"before = {cfg.before}\n")
if cfg.after is not None:
buf.write(f"after = {cfg.after}\n")
buf.write("\n")
# [Definition]
buf.write("[Definition]\n\n")
# Lifecycle commands first (in canonical order)
_lifecycle_order = (
"actionstart",
"actionstop",
"actioncheck",
"actionban",
"actionunban",
"actionflush",
)
for key in _lifecycle_order:
value = getattr(cfg, key)
if value is not None:
lines = value.splitlines()
if lines:
buf.write(f"{key} = {lines[0]}\n")
for extra in lines[1:]:
buf.write(f" {extra}\n")
buf.write("\n")
# Extra definition variables
for key, value in cfg.definition_vars.items():
lines = value.splitlines()
if lines:
buf.write(f"{key} = {lines[0]}\n")
for extra in lines[1:]:
buf.write(f" {extra}\n")
if cfg.definition_vars:
buf.write("\n")
# [Init]
if cfg.init_vars:
buf.write("[Init]\n\n")
for key, value in cfg.init_vars.items():
buf.write(f"{key} = {value}\n")
buf.write("\n")
return buf.getvalue()
def merge_action_update(cfg: ActionConfig, update: ActionConfigUpdate) -> ActionConfig:
"""Apply a partial :class:`~app.models.config.ActionConfigUpdate` onto *cfg*.
Args:
cfg: Current action configuration.
update: Partial update to apply.
Returns:
Updated :class:`~app.models.config.ActionConfig`.
"""
return ActionConfig(
name=cfg.name,
filename=cfg.filename,
before=update.before if update.before is not None else cfg.before,
after=update.after if update.after is not None else cfg.after,
actionstart=update.actionstart if update.actionstart is not None else cfg.actionstart,
actionstop=update.actionstop if update.actionstop is not None else cfg.actionstop,
actioncheck=update.actioncheck if update.actioncheck is not None else cfg.actioncheck,
actionban=update.actionban if update.actionban is not None else cfg.actionban,
actionunban=update.actionunban if update.actionunban is not None else cfg.actionunban,
actionflush=update.actionflush if update.actionflush is not None else cfg.actionflush,
definition_vars=update.definition_vars
if update.definition_vars is not None
else cfg.definition_vars,
init_vars=update.init_vars if update.init_vars is not None else cfg.init_vars,
)
# ---------------------------------------------------------------------------
# Convenience helpers for reading/writing files
# ---------------------------------------------------------------------------
def read_and_parse_filter(path: Path) -> FilterConfig:
"""Read *path* and return a parsed :class:`~app.models.config.FilterConfig`.
Args:
path: Absolute path to the filter file.
Returns:
Parsed filter config.
"""
content = path.read_text(encoding="utf-8")
return parse_filter_file(content, name=path.stem, filename=path.name)
def read_and_parse_action(path: Path) -> ActionConfig:
"""Read *path* and return a parsed :class:`~app.models.config.ActionConfig`.
Args:
path: Absolute path to the action file.
Returns:
Parsed action config.
"""
content = path.read_text(encoding="utf-8")
return parse_action_file(content, name=path.stem, filename=path.name)
# ---------------------------------------------------------------------------
# Jail file parser / serializer (Task 6.1)
# ---------------------------------------------------------------------------
# Keys handled by named fields in JailSectionConfig.
_JAIL_NAMED_KEYS: frozenset[str] = frozenset(
{
"enabled",
"port",
"filter",
"logpath",
"maxretry",
"findtime",
"bantime",
"action",
"backend",
}
)
def _parse_bool(value: str) -> bool | None:
"""Parse a fail2ban boolean string.
Args:
value: Raw string value from config (e.g. "true", "false", "yes", "no", "1", "0").
Returns:
Boolean, or ``None`` if the value is not a recognised boolean token.
"""
lower = value.strip().lower()
if lower in {"true", "yes", "1"}:
return True
if lower in {"false", "no", "0"}:
return False
return None
def _parse_int(value: str) -> int | None:
"""Parse an integer string, returning ``None`` on failure.
Args:
value: Raw string value from config.
Returns:
Integer, or ``None``.
"""
with contextlib.suppress(ValueError):
return int(value.strip())
return None
def _parse_multiline_list(raw: str) -> list[str]:
"""Split a multi-line configparser value into a list of non-blank lines.
Args:
raw: Raw multi-line string from configparser.
Returns:
List of stripped, non-empty, non-comment strings.
"""
result: list[str] = []
for line in raw.splitlines():
stripped = line.strip()
if stripped and not stripped.startswith("#"):
result.append(stripped)
return result
def parse_jail_file(content: str, filename: str = "") -> JailFileConfig:
"""Parse a ``jail.d/*.conf`` file into a :class:`~app.models.config.JailFileConfig`.
Each INI section in the file maps to a jail. The ``[DEFAULT]`` section (if
present) is silently ignored — fail2ban merges it with jail sections, but
the structured model represents per-jail settings only.
Args:
content: Raw file content (UTF-8 string).
filename: Filename (e.g. ``"sshd.conf"``).
Returns:
Populated :class:`~app.models.config.JailFileConfig`.
"""
parser = _make_parser()
try:
parser.read_string(content)
except configparser.Error as exc:
log.warning("jail_file_parse_error", filename=filename, error=str(exc))
jails: dict[str, JailSectionConfig] = {}
for section in parser.sections():
# Skip meta-sections used by fail2ban include system.
if section in {"INCLUDES", "DEFAULT"}:
continue
items = dict(parser.items(section))
enabled_raw = items.get("enabled")
enabled = _parse_bool(enabled_raw) if enabled_raw is not None else None
port = items.get("port")
filter_name = items.get("filter")
backend = items.get("backend")
logpath_raw = items.get("logpath", "")
logpath = _parse_multiline_list(logpath_raw)
action_raw = items.get("action", "")
action = _parse_multiline_list(action_raw)
maxretry = _parse_int(items.get("maxretry", "")) if "maxretry" in items else None
findtime = _parse_int(items.get("findtime", "")) if "findtime" in items else None
bantime = _parse_int(items.get("bantime", "")) if "bantime" in items else None
extra: dict[str, str] = {
k: v for k, v in items.items() if k not in _JAIL_NAMED_KEYS
}
jails[section] = JailSectionConfig(
enabled=enabled,
port=port,
filter=filter_name,
logpath=logpath,
maxretry=maxretry,
findtime=findtime,
bantime=bantime,
action=action,
backend=backend,
extra=extra,
)
log.debug("jail_file_parsed", filename=filename, jail_count=len(jails))
return JailFileConfig(filename=filename, jails=jails)
def serialize_jail_file_config(cfg: JailFileConfig) -> str:
"""Serialize a :class:`~app.models.config.JailFileConfig` to a fail2ban INI string.
Args:
cfg: Structured jail file configuration.
Returns:
UTF-8 file content suitable for writing to a ``jail.d/*.conf`` file.
"""
buf = io.StringIO()
buf.write(f"# Generated by BanGUI — {cfg.filename}\n")
for jail_name, jail in cfg.jails.items():
buf.write(f"\n[{jail_name}]\n\n")
if jail.enabled is not None:
buf.write(f"enabled = {'true' if jail.enabled else 'false'}\n")
if jail.port is not None:
buf.write(f"port = {jail.port}\n")
if jail.filter is not None:
buf.write(f"filter = {jail.filter}\n")
if jail.backend is not None:
buf.write(f"backend = {jail.backend}\n")
if jail.maxretry is not None:
buf.write(f"maxretry = {jail.maxretry}\n")
if jail.findtime is not None:
buf.write(f"findtime = {jail.findtime}\n")
if jail.bantime is not None:
buf.write(f"bantime = {jail.bantime}\n")
if jail.logpath:
first, *rest = jail.logpath
buf.write(f"logpath = {first}\n")
for path in rest:
buf.write(f" {path}\n")
if jail.action:
first_action, *rest_actions = jail.action
buf.write(f"action = {first_action}\n")
for a in rest_actions:
buf.write(f" {a}\n")
for key, value in jail.extra.items():
buf.write(f"{key} = {value}\n")
return buf.getvalue()
def merge_jail_file_update(cfg: JailFileConfig, update: JailFileConfigUpdate) -> JailFileConfig:
"""Apply a partial :class:`~app.models.config.JailFileConfigUpdate` onto *cfg*.
Only jails present in ``update.jails`` are replaced; other jails are left
unchanged.
Args:
cfg: Current jail file configuration.
update: Partial update to apply.
Returns:
Updated :class:`~app.models.config.JailFileConfig`.
"""
if update.jails is None:
return cfg
merged = dict(cfg.jails)
merged.update(update.jails)
return JailFileConfig(filename=cfg.filename, jails=merged)
def read_and_parse_jail_file(path: Path) -> JailFileConfig:
"""Read *path* and return a parsed :class:`~app.models.config.JailFileConfig`.
Args:
path: Absolute path to the jail config file.
Returns:
Parsed jail file config.
"""
content = path.read_text(encoding="utf-8")
return parse_jail_file(content, filename=path.name)