Files
BanGUI/backend/app/utils/config_writer.py
Lukas 6e4797d71e feat: config file parser + writer utilities with full test coverage (Tasks 4.1, 4.2)
- Fail2BanConfigParser class: merge order, include directives (before/after),
  variable interpolation %(var)s, split_multiline, ordered_conf_files
- config_writer: write_local_override, remove_local_key, delete_local_file
  with atomic writes (os.replace), per-file threading locks, .local-only guard
- 79 tests in tests/test_utils/ (all passing)
- mypy --strict: 60 source files, 0 errors
- ruff: all checks passed
2026-03-13 19:38:03 +01:00

304 lines
9.4 KiB
Python

"""Atomic config file writer for fail2ban ``.local`` override files.
All write operations are atomic: content is first written to a temporary file
in the same directory as the target, then :func:`os.replace` is used to rename
it into place. This guarantees that a crash or power failure during the write
never leaves a partially-written file behind.
A per-file :class:`threading.Lock` prevents concurrent writes from the same
process from racing.
Security constraints
--------------------
- Every write function asserts that the target path **ends in ``.local``**.
This prevents accidentally writing to ``.conf`` files (which belong to the
fail2ban package and should never be modified by BanGUI).
Public functions
----------------
- :func:`write_local_override` — create or update keys inside a ``.local`` file.
- :func:`remove_local_key` — remove a single key from a ``.local`` file.
- :func:`delete_local_file` — delete an entire ``.local`` file.
"""
from __future__ import annotations
import configparser
import contextlib
import io
import os
import tempfile
import threading
from typing import TYPE_CHECKING
import structlog
if TYPE_CHECKING:
from pathlib import Path
# Module-level structlog logger shared by every function in this file; each
# call passes an event name plus key/value context (path, section, key, ...).
log: structlog.stdlib.BoundLogger = structlog.get_logger()
# ---------------------------------------------------------------------------
# Per-file lock registry
# ---------------------------------------------------------------------------
# Registry of per-file locks: maps the string form of a resolved absolute
# path to the threading.Lock that serialises writes to that file.
# Entries are created lazily by ``_get_file_lock``.
_locks: dict[str, threading.Lock] = {}
# Guards lookups/insertions on the ``_locks`` dict itself, so two threads
# asking for the same path at the same time end up with the same lock object.
_registry_lock: threading.Lock = threading.Lock()
def _get_file_lock(path: Path) -> threading.Lock:
    """Look up (or lazily create) the lock that serialises access to *path*.

    Locks are keyed by ``str(path.resolve())``, so every spelling that
    resolves to the same absolute path shares one lock.

    Args:
        path: Target file path (need not exist yet).

    Returns:
        The :class:`threading.Lock` registered for the resolved *path*.
    """
    resolved = str(path.resolve())
    # Hold the registry lock for the whole lookup-or-insert so that two
    # threads cannot each register a different lock for the same file.
    with _registry_lock:
        try:
            return _locks[resolved]
        except KeyError:
            file_lock = _locks[resolved] = threading.Lock()
            return file_lock
# ---------------------------------------------------------------------------
# Internal helpers
# ---------------------------------------------------------------------------
def _assert_local_file(path: Path) -> None:
    """Reject any *path* whose suffix is not exactly ``.local``.

    Safety guard used by the write functions so that ``.conf`` files are
    never modified by accident.

    Args:
        path: Path to validate.

    Raises:
        ValueError: When ``path.suffix`` is anything other than ``.local``.
    """
    if path.suffix == ".local":
        return
    message = (
        f"Refusing to write to non-.local file: {path!r}. "
        "Only .local override files may be modified by BanGUI."
    )
    raise ValueError(message)
def _make_parser() -> configparser.RawConfigParser:
    """Build the parser used to read and write ``.local`` files.

    The parser has interpolation disabled, is non-strict (``strict=False``)
    and keeps option names exactly as written instead of lower-casing them.

    Returns:
        A new, empty :class:`configparser.RawConfigParser`.
    """
    cfg = configparser.RawConfigParser(strict=False, interpolation=None)
    # ``optionxform`` is applied to every option name; ``str`` leaves the
    # name untouched, which makes option names case-sensitive.
    cfg.optionxform = str  # type: ignore[assignment]
    return cfg
def _read_or_new_parser(path: Path) -> configparser.RawConfigParser:
    """Read *path* into a parser, or return a fresh empty parser.

    If *path* is not an existing regular file, an empty parser is returned.
    If reading, decoding or parsing fails, the failure is logged as a warning
    and **not** re-raised. The parser is then returned in whatever state it
    reached, i.e. empty when the file could not be read or decoded.

    Args:
        path: Path to the ``.local`` file to read.

    Returns:
        Populated (or empty) :class:`configparser.RawConfigParser`.
    """
    parser = _make_parser()
    if not path.is_file():
        return parser
    try:
        parser.read_string(path.read_text(encoding="utf-8"))
    except (OSError, UnicodeDecodeError, configparser.Error) as exc:
        # UnicodeDecodeError is not an OSError: without catching it here a
        # non-UTF-8 file would escape the "never re-raise" contract, and --
        # being a ValueError subclass -- would look to callers like the
        # ".local suffix" guard having fired.
        log.warning("local_file_read_error", path=str(path), error=str(exc))
    return parser
def _write_parser_atomic(
    parser: configparser.RawConfigParser,
    path: Path,
) -> None:
    """Write *parser* contents to *path* atomically.

    The serialised content is written to a temporary file in the same
    directory as *path*, flushed and fsynced, and then renamed over *path*
    with :func:`os.replace`. The parent directory is created if missing.
    If *path* already exists, its permission bits are copied to the
    replacement; a brand-new file keeps the mode chosen by
    :func:`tempfile.mkstemp`. The temporary file is removed on failure.

    Args:
        parser: Populated parser whose contents should be written.
        path: Destination ``.local`` file path.

    Raises:
        OSError: On filesystem errors (propagated to caller).
    """
    buf = io.StringIO()
    parser.write(buf)
    content = buf.getvalue()

    path.parent.mkdir(parents=True, exist_ok=True)

    # mkstemp always creates an owner-only file; remember the mode of the
    # file being replaced so the rename does not silently tighten it.
    try:
        existing_mode: int | None = path.stat().st_mode & 0o7777
    except OSError:
        existing_mode = None  # Target does not exist yet (or cannot be stat'ed).

    fd, tmp_path_str = tempfile.mkstemp(
        dir=str(path.parent),
        prefix=f".{path.name}.tmp",
        suffix="",
    )
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            f.write(content)
            # Push the data to disk *before* the rename; otherwise a power
            # failure can leave the new name pointing at incomplete data.
            f.flush()
            os.fsync(f.fileno())
        if existing_mode is not None:
            # Best effort: failing to restore the mode must not abort the write.
            with contextlib.suppress(OSError):
                os.chmod(tmp_path_str, existing_mode)
        os.replace(tmp_path_str, str(path))
    except BaseException:
        # Also covers KeyboardInterrupt/SystemExit so no stray temp file is
        # left next to the target; the original exception is re-raised.
        with contextlib.suppress(OSError):
            os.unlink(tmp_path_str)
        raise
# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------
def write_local_override(
    base_path: Path,
    section: str,
    key_values: dict[str, str],
) -> None:
    """Create or update keys in a ``.local`` override file.

    If the file already exists, only the specified *key_values* are written
    under *section*; all other sections and keys are preserved.
    If the file does not exist, it is created with the given *section* and
    *key_values*.

    The parser's default section (``DEFAULT``) is accepted as *section*; its
    keys are stored without trying to add it as a regular section.

    The write is **atomic**: a temporary file is written and renamed into place.

    Args:
        base_path: Absolute path to the ``.local`` file (e.g.
            ``filter.d/sshd.local``). The parent directory is created if it
            does not already exist.
        section: INI section name (e.g. ``"Definition"``, ``"Init"``).
        key_values: Mapping of option name → value to write/update.

    Raises:
        ValueError: If *base_path* does not end with ``.local``.
        OSError: On filesystem errors while writing (propagated from the
            atomic write helper).
    """
    _assert_local_file(base_path)
    with _get_file_lock(base_path):
        parser = _read_or_new_parser(base_path)
        # has_section() never reports the default section and add_section()
        # rejects its name with ValueError, whereas set() accepts it -- so
        # only regular sections need to be created up front.
        if section != parser.default_section and not parser.has_section(section):
            parser.add_section(section)
        for key, value in key_values.items():
            parser.set(section, key, value)
        _write_parser_atomic(parser, base_path)
        # Log only after the write succeeded, so the event is never emitted
        # for a write that actually failed.
        log.info(
            "local_override_written",
            path=str(base_path),
            section=section,
            keys=sorted(key_values),
        )
def remove_local_key(base_path: Path, section: str, key: str) -> None:
    """Remove a single key from a ``.local`` override file.

    Post-removal cleanup:

    - If a regular section has no options left after key removal, the
      section is also removed. (Options inherited from the default section
      count as "left", so such a section header is kept.)
    - If neither regular sections nor default-section keys remain, the file
      is deleted.

    This function is a no-op when the file, section, or key does not exist.
    A key that *section* merely inherits from the default section is treated
    as not existing in *section*. Passing the parser's default section
    (``DEFAULT``) as *section* removes the key from that section.

    Args:
        base_path: Path to the ``.local`` file to update.
        section: INI section containing the key.
        key: Option name to remove.

    Raises:
        ValueError: If *base_path* does not end with ``.local``.
        OSError: On filesystem errors while rewriting the file (propagated
            from the atomic write helper).
    """
    _assert_local_file(base_path)
    if not base_path.is_file():
        return
    with _get_file_lock(base_path):
        parser = _read_or_new_parser(base_path)
        is_default = section == parser.default_section
        # has_section() never reports the default section, so it is only
        # consulted for regular sections.
        if not is_default and not parser.has_section(section):
            return  # Nothing to remove.
        # remove_option() reports whether the key existed in this very
        # section, ignoring values inherited from the default section.
        if not parser.remove_option(section, key):
            return  # Nothing to remove.
        # Remove a regular section once it has no remaining options.
        if not is_default and not parser.options(section):
            parser.remove_section(section)
        # Delete the file only when it is really empty: sections() does not
        # list the default section, so its keys must be checked separately.
        if not parser.sections() and not parser.defaults():
            try:
                base_path.unlink()
            except OSError as exc:
                # Best effort, as before -- but do not claim success.
                log.warning(
                    "local_file_delete_failed", path=str(base_path), error=str(exc)
                )
            else:
                log.info("local_file_deleted_empty", path=str(base_path))
            return
        _write_parser_atomic(parser, base_path)
        # Log only after the rewrite succeeded.
        log.info(
            "local_key_removed",
            path=str(base_path),
            section=section,
            key=key,
        )
def delete_local_file(path: Path, *, allow_orphan: bool = False) -> None:
    """Remove a ``.local`` override file from disk.

    Unless *allow_orphan* is set, the deletion is refused when no ``.conf``
    file with the same stem sits next to *path*: such an *orphan* ``.local``
    may be the only copy of a user-defined config.

    Args:
        path: Path to the ``.local`` file to delete.
        allow_orphan: When ``True``, skip the check for a sibling ``.conf``
            file and delete regardless.

    Raises:
        ValueError: If *path* does not end with ``.local``.
        FileNotFoundError: If *path* does not exist.
        OSError: If no corresponding ``.conf`` exists and *allow_orphan* is
            ``False``, or if the deletion itself fails.
    """
    _assert_local_file(path)

    if not path.is_file():
        raise FileNotFoundError(f"Local file not found: {path!r}")

    # Short-circuit: the sibling .conf is only looked up when the orphan
    # guard is active.
    if not (allow_orphan or path.with_suffix(".conf").is_file()):
        raise OSError(
            f"No corresponding .conf file found for {path!r}. "
            "Pass allow_orphan=True to delete a local-only file."
        )

    with _get_file_lock(path):
        try:
            path.unlink()
            log.info("local_file_deleted", path=str(path))
        except OSError as err:
            # Record the failure, then let the caller see the original error.
            log.error("local_file_delete_failed", path=str(path), error=str(err))
            raise