"""Atomic config file writer for fail2ban ``.local`` override files. All write operations are atomic: content is first written to a temporary file in the same directory as the target, then :func:`os.replace` is used to rename it into place. This guarantees that a crash or power failure during the write never leaves a partially-written file behind. A per-file :class:`threading.Lock` prevents concurrent writes from the same process from racing. Security constraints -------------------- - Every write function asserts that the target path **ends in ``.local``**. This prevents accidentally writing to ``.conf`` files (which belong to the fail2ban package and should never be modified by BanGUI). Public functions ---------------- - :func:`write_local_override` — create or update keys inside a ``.local`` file. - :func:`remove_local_key` — remove a single key from a ``.local`` file. - :func:`delete_local_file` — delete an entire ``.local`` file. """ from __future__ import annotations import configparser import contextlib import io import os import tempfile import threading from typing import TYPE_CHECKING import structlog if TYPE_CHECKING: from pathlib import Path log: structlog.stdlib.BoundLogger = structlog.get_logger() # --------------------------------------------------------------------------- # Per-file lock registry # --------------------------------------------------------------------------- # Maps resolved absolute path strings → threading.Lock instances. _locks: dict[str, threading.Lock] = {} # Guards the _locks dict itself. _registry_lock: threading.Lock = threading.Lock() def _get_file_lock(path: Path) -> threading.Lock: """Return the per-file :class:`threading.Lock` for *path*. The lock is created on first access and reused on subsequent calls. Args: path: Target file path (need not exist yet). Returns: :class:`threading.Lock` bound to the resolved absolute path of *path*. """ key = str(path.resolve()) with _registry_lock: if key not in _locks: _locks[key] = threading.Lock() return _locks[key] # --------------------------------------------------------------------------- # Internal helpers # --------------------------------------------------------------------------- def _assert_local_file(path: Path) -> None: """Raise :class:`ValueError` if *path* does not end with ``.local``. This is a safety guard against accidentally modifying ``.conf`` files. Args: path: Path to validate. Raises: ValueError: When *path* does not have a ``.local`` suffix. """ if path.suffix != ".local": raise ValueError( f"Refusing to write to non-.local file: {path!r}. " "Only .local override files may be modified by BanGUI." ) def _make_parser() -> configparser.RawConfigParser: """Return a case-sensitive :class:`configparser.RawConfigParser`.""" parser = configparser.RawConfigParser(interpolation=None, strict=False) parser.optionxform = str # type: ignore[assignment] return parser def _read_or_new_parser(path: Path) -> configparser.RawConfigParser: """Read *path* into a parser, or return a fresh empty parser. If the file does not exist or cannot be read, a fresh parser is returned. Any parse errors are logged as warnings (not re-raised). Args: path: Path to the ``.local`` file to read. Returns: Populated (or empty) :class:`configparser.RawConfigParser`. """ parser = _make_parser() if path.is_file(): try: content = path.read_text(encoding="utf-8") parser.read_string(content) except (OSError, configparser.Error) as exc: log.warning("local_file_read_error", path=str(path), error=str(exc)) return parser def _write_parser_atomic( parser: configparser.RawConfigParser, path: Path, ) -> None: """Write *parser* contents to *path* atomically. Writes to a temporary file in the same directory as *path*, then renames the temporary file over *path* using :func:`os.replace`. The temporary file is cleaned up on failure. Args: parser: Populated parser whose contents should be written. path: Destination ``.local`` file path. Raises: OSError: On filesystem errors (propagated to caller). """ buf = io.StringIO() parser.write(buf) content = buf.getvalue() path.parent.mkdir(parents=True, exist_ok=True) fd, tmp_path_str = tempfile.mkstemp( dir=str(path.parent), prefix=f".{path.name}.tmp", suffix="", ) try: with os.fdopen(fd, "w", encoding="utf-8") as f: f.write(content) os.replace(tmp_path_str, str(path)) except Exception: with contextlib.suppress(OSError): os.unlink(tmp_path_str) raise # --------------------------------------------------------------------------- # Public API # --------------------------------------------------------------------------- def write_local_override( base_path: Path, section: str, key_values: dict[str, str], ) -> None: """Create or update keys in a ``.local`` override file. If the file already exists, only the specified *key_values* are written under *section*; all other sections and keys are preserved. If the file does not exist, it is created with the given *section* and *key_values*. The write is **atomic**: a temporary file is written and renamed into place. Args: base_path: Absolute path to the ``.local`` file (e.g. ``filter.d/sshd.local``). The parent directory is created if it does not already exist. section: INI section name (e.g. ``"Definition"``, ``"Init"``). key_values: Mapping of option name → value to write/update. Raises: ValueError: If *base_path* does not end with ``.local``. """ _assert_local_file(base_path) lock = _get_file_lock(base_path) with lock: parser = _read_or_new_parser(base_path) if not parser.has_section(section): parser.add_section(section) for key, value in key_values.items(): parser.set(section, key, value) log.info( "local_override_written", path=str(base_path), section=section, keys=sorted(key_values), ) _write_parser_atomic(parser, base_path) def remove_local_key(base_path: Path, section: str, key: str) -> None: """Remove a single key from a ``.local`` override file. Post-removal cleanup: - If the section becomes empty after key removal, the section is also removed. - If no sections remain after section removal, the file is deleted. This function is a no-op when the file, section, or key does not exist. Args: base_path: Path to the ``.local`` file to update. section: INI section containing the key. key: Option name to remove. Raises: ValueError: If *base_path* does not end with ``.local``. """ _assert_local_file(base_path) if not base_path.is_file(): return lock = _get_file_lock(base_path) with lock: parser = _read_or_new_parser(base_path) if not parser.has_section(section) or not parser.has_option(section, key): return # Nothing to remove. parser.remove_option(section, key) # Remove the section if it has no remaining options. if not parser.options(section): parser.remove_section(section) # Delete the file entirely if it has no remaining sections. if not parser.sections(): with contextlib.suppress(OSError): base_path.unlink() log.info("local_file_deleted_empty", path=str(base_path)) return log.info( "local_key_removed", path=str(base_path), section=section, key=key, ) _write_parser_atomic(parser, base_path) def delete_local_file(path: Path, *, allow_orphan: bool = False) -> None: """Delete a ``.local`` override file. By default, refuses to delete a ``.local`` file that has no corresponding ``.conf`` file (an *orphan* ``.local``), because it may be the only copy of a user-defined config. Pass ``allow_orphan=True`` to override this guard. Args: path: Path to the ``.local`` file to delete. allow_orphan: When ``True``, delete even if no corresponding ``.conf`` exists alongside *path*. Raises: ValueError: If *path* does not end with ``.local``. FileNotFoundError: If *path* does not exist. OSError: If no corresponding ``.conf`` exists and *allow_orphan* is ``False``. """ _assert_local_file(path) if not path.is_file(): raise FileNotFoundError(f"Local file not found: {path!r}") if not allow_orphan: conf_path = path.with_suffix(".conf") if not conf_path.is_file(): raise OSError( f"No corresponding .conf file found for {path!r}. " "Pass allow_orphan=True to delete a local-only file." ) lock = _get_file_lock(path) with lock: try: path.unlink() log.info("local_file_deleted", path=str(path)) except OSError as exc: log.error( "local_file_delete_failed", path=str(path), error=str(exc) ) raise