"""Shared helpers for raw fail2ban config file operations. This module contains generic path validation, directory scanning, file read/write, and creation helpers used by :mod:`app.services.raw_config_io_service`. """ from __future__ import annotations import contextlib import os import re import tempfile from pathlib import Path from app.exceptions import ( ConfigDirError, ConfigFileExistsError, ConfigFileNameError, ConfigFileNotFoundError, ConfigFileWriteError, ) from app.models.file_config import ConfFileContent, ConfFileEntry, ConfFilesResponse _MAX_CONTENT_BYTES: int = 512 * 1024 # 512 KB – hard cap on file write size _CONF_EXTENSIONS: tuple[str, str] = (".conf", ".local") _SAFE_NAME_RE: re.Pattern[str] = re.compile(r"^[A-Za-z0-9][A-Za-z0-9._-]{0,127}$") def _resolve_subdir(config_dir: str, subdir: str) -> Path: """Resolve the requested subdirectory inside the config directory. Args: config_dir: The top-level fail2ban configuration directory. subdir: The subdirectory under the config directory. Returns: A resolved Path for the requested subdirectory. Raises: ConfigDirError: If the config directory does not exist or is not a directory. """ base = Path(config_dir).resolve() if not base.is_dir(): raise ConfigDirError(f"fail2ban config directory not found: {config_dir!r}") return base / subdir def _assert_within(base: Path, target: Path) -> None: """Validate that *target* is contained within *base*. Args: base: The allowed root directory. target: The path to validate. Raises: ConfigFileNameError: If *target* is outside *base*. """ try: target.relative_to(base) except ValueError as exc: raise ConfigFileNameError( f"Path {str(target)!r} escapes config directory {str(base)!r}" ) from exc def _validate_new_name(name: str) -> None: """Validate a new config file base name. Args: name: Proposed base name without extension. Raises: ConfigFileNameError: If the name is invalid. """ if not _SAFE_NAME_RE.match(name): raise ConfigFileNameError( f"Invalid config file name {name!r}. " "Use only alphanumeric characters, hyphens, underscores, and dots; " "must start with an alphanumeric character." ) def _validate_content(content: str) -> None: """Validate raw file content before writing. Args: content: Proposed file content. Raises: ConfigFileWriteError: If content exceeds the maximum allowed size. """ if len(content.encode("utf-8")) > _MAX_CONTENT_BYTES: raise ConfigFileWriteError( f"Content exceeds maximum allowed size of {_MAX_CONTENT_BYTES // 1024} KB." ) def _list_conf_files(subdir: Path) -> ConfFilesResponse: """List supported config files in *subdir*. Args: subdir: Directory containing raw config files. Returns: A :class:`~app.models.file_config.ConfFilesResponse` with names and file metadata. """ resolved_subdir = subdir.resolve() if not resolved_subdir.is_dir(): return ConfFilesResponse(files=[], total=0) files: list[ConfFileEntry] = [] for path in sorted(resolved_subdir.iterdir()): if not path.is_file(): continue if path.suffix not in _CONF_EXTENSIONS: continue _assert_within(resolved_subdir, path.resolve()) files.append(ConfFileEntry(name=path.stem, filename=path.name)) return ConfFilesResponse(files=files, total=len(files)) def _read_conf_file(subdir: Path, name: str) -> ConfFileContent: """Read a single config file by base name or exact filename. Args: subdir: Containing directory. name: Base name or exact filename. Returns: :class:`~app.models.file_config.ConfFileContent` for the resolved file. Raises: ConfigFileNameError: If *name* is unsafe. ConfigFileNotFoundError: If no matching file exists. """ resolved_subdir = subdir.resolve() if "." in name and not name.startswith("."): candidates = [resolved_subdir / name] else: candidates = [resolved_subdir / (name + ext) for ext in _CONF_EXTENSIONS] for path in candidates: resolved = path.resolve() _assert_within(resolved_subdir, resolved) if resolved.is_file(): content = resolved.read_text(encoding="utf-8", errors="replace") return ConfFileContent( name=resolved.stem, filename=resolved.name, content=content, ) raise ConfigFileNotFoundError(name) def _write_conf_file(subdir: Path, name: str, content: str) -> Path: """Overwrite an existing config file in *subdir*. Args: subdir: Containing directory. name: Base name or filename. content: New file content. Returns: The resolved path of the written file. Raises: ConfigFileNameError: If *name* is unsafe. ConfigFileNotFoundError: If no matching file exists. ConfigFileWriteError: If the file cannot be written. """ resolved_subdir = subdir.resolve() _validate_content(content) if "." in name and not name.startswith("."): candidates = [resolved_subdir / name] else: candidates = [resolved_subdir / (name + ext) for ext in _CONF_EXTENSIONS] target: Path | None = None for path in candidates: resolved = path.resolve() _assert_within(resolved_subdir, resolved) if resolved.is_file(): target = resolved break if target is None: raise ConfigFileNotFoundError(name) tmp_name: str | None = None try: with tempfile.NamedTemporaryFile( mode="w", encoding="utf-8", dir=target.parent, delete=False, suffix=".tmp", ) as tmp: tmp.write(content) tmp_name = tmp.name os.replace(tmp_name, target) except OSError as exc: with contextlib.suppress(OSError): if tmp_name is not None: os.unlink(tmp_name) raise ConfigFileWriteError(f"Cannot write {name!r}: {exc}") from exc return target def _create_conf_file(subdir: Path, name: str, content: str) -> str: """Create a new .conf file in *subdir*. Args: subdir: Containing directory. name: Base name for the new file. content: Initial file content. Returns: The filename of the created file. Raises: ConfigFileNameError: If the name is invalid. ConfigFileExistsError: If a conflicting file already exists. ConfigFileWriteError: If the file cannot be written. """ resolved_subdir = subdir.resolve() _validate_new_name(name) _validate_content(content) for ext in _CONF_EXTENSIONS: existing = (resolved_subdir / (name + ext)).resolve() _assert_within(resolved_subdir, existing) if existing.exists(): raise ConfigFileExistsError(name + ext) target = (resolved_subdir / (name + ".conf")).resolve() _assert_within(resolved_subdir, target) tmp_name: str | None = None try: with tempfile.NamedTemporaryFile( mode="w", encoding="utf-8", dir=target.parent, delete=False, suffix=".tmp", ) as tmp: tmp.write(content) tmp_name = tmp.name os.replace(tmp_name, target) except OSError as exc: with contextlib.suppress(OSError): if tmp_name is not None: os.unlink(tmp_name) raise ConfigFileWriteError(f"Cannot create {name!r}: {exc}") from exc return target.name def atomic_write(path: Path, content: str) -> None: """Write content to a file atomically using temp file + rename. This function ensures that if the process is killed during the write, the target file is not corrupted. Uses a temporary file in the same directory as the target, then atomically renames it into place. Args: path: Target file path. content: Content to write. Raises: ConfigFileWriteError: If the file cannot be written. """ _validate_content(content) tmp_name: str | None = None try: with tempfile.NamedTemporaryFile( mode="w", encoding="utf-8", dir=path.parent, delete=False, suffix=".tmp", ) as tmp: tmp.write(content) tmp_name = tmp.name os.replace(tmp_name, path) except OSError as exc: with contextlib.suppress(OSError): if tmp_name is not None: os.unlink(tmp_name) raise ConfigFileWriteError(f"Cannot write {path.name!r}: {exc}") from exc