Files
BanGUI/backend/app/services/config_file_helpers.py

295 lines
8.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Shared helpers for raw fail2ban config file operations.
This module contains generic path validation, directory scanning, file read/write,
and creation helpers used by :mod:`app.services.raw_config_io_service`.
"""
from __future__ import annotations
import contextlib
import os
import re
import tempfile
from pathlib import Path
from app.exceptions import (
ConfigDirError,
ConfigFileExistsError,
ConfigFileNameError,
ConfigFileNotFoundError,
ConfigFileWriteError,
)
from app.models.file_config import ConfFileContent, ConfFileEntry, ConfFilesResponse
# Hard cap on the UTF-8 encoded size of content accepted for a write (512 KiB).
_MAX_CONTENT_BYTES: int = 512 * 1024
# File suffixes that are treated as config files by the helpers below.
_CONF_EXTENSIONS: tuple[str, str] = (".conf", ".local")
# Whitelist for new base names: one alphanumeric character followed by up to
# 127 characters from [A-Za-z0-9._-].  The pattern is anchored with ``\Z``
# rather than ``$`` because ``$`` also matches just before a trailing newline,
# which would let a name such as "sshd\n" through when used with ``.match()``.
_SAFE_NAME_RE: re.Pattern[str] = re.compile(r"^[A-Za-z0-9][A-Za-z0-9._-]{0,127}\Z")
def _resolve_subdir(config_dir: str, subdir: str) -> Path:
    """Build the path of *subdir* beneath the resolved config directory.

    Only *config_dir* is resolved and checked here; *subdir* is joined onto
    it as given, without being resolved or checked for existence.

    Args:
        config_dir: The top-level fail2ban configuration directory.
        subdir: Name of the subdirectory below the config directory.

    Returns:
        The resolved config directory joined with *subdir*.

    Raises:
        ConfigDirError: If the config directory does not exist or is not a
            directory.
    """
    root = Path(config_dir).resolve()
    if root.is_dir():
        return root / subdir
    raise ConfigDirError(f"fail2ban config directory not found: {config_dir!r}")
def _assert_within(base: Path, target: Path) -> None:
    """Ensure that *target* lies inside *base*.

    The check uses :meth:`pathlib.PurePath.relative_to`, so both paths are
    compared as given; the callers in this module pass resolved paths.

    Args:
        base: The allowed root directory.
        target: The path to validate.

    Raises:
        ConfigFileNameError: If *target* is not located under *base*.
    """
    try:
        target.relative_to(base)
    except ValueError as exc:
        message = f"Path {str(target)!r} escapes config directory {str(base)!r}"
        raise ConfigFileNameError(message) from exc
    else:
        return None
def _validate_new_name(name: str) -> None:
    """Validate a new config file base name.

    The whole string must match the safe-name pattern.  ``fullmatch`` is used
    instead of ``match`` so that a pattern ending in ``$`` cannot accept a
    name with a trailing newline (``$`` also matches just before one).

    Args:
        name: Proposed base name without extension.

    Raises:
        ConfigFileNameError: If the name is invalid.
    """
    if _SAFE_NAME_RE.fullmatch(name) is None:
        raise ConfigFileNameError(
            f"Invalid config file name {name!r}. "
            "Use only alphanumeric characters, hyphens, underscores, and dots; "
            "must start with an alphanumeric character."
        )
def _validate_content(content: str) -> None:
    """Validate raw file content before writing.

    The size limit applies to the UTF-8 encoded length, not the number of
    characters.  Content that cannot be encoded as UTF-8 at all (for example
    a string containing lone surrogates) is rejected with the same exception
    type instead of leaking a raw ``UnicodeEncodeError`` to the caller.

    Args:
        content: Proposed file content.

    Raises:
        ConfigFileWriteError: If content cannot be encoded as UTF-8 or
            exceeds the maximum allowed size.
    """
    try:
        encoded_size = len(content.encode("utf-8"))
    except UnicodeEncodeError as exc:
        raise ConfigFileWriteError(
            f"Content cannot be encoded as UTF-8: {exc.reason}"
        ) from exc
    if encoded_size > _MAX_CONTENT_BYTES:
        raise ConfigFileWriteError(
            f"Content exceeds maximum allowed size of {_MAX_CONTENT_BYTES // 1024} KB."
        )
def _list_conf_files(subdir: Path) -> ConfFilesResponse:
    """Enumerate the supported config files found directly in *subdir*.

    Entries are visited in sorted path order.  Only regular files whose
    suffix is one of the supported config extensions are reported; each one
    is checked to resolve to a location inside the resolved directory.

    Args:
        subdir: Directory containing raw config files.

    Returns:
        A :class:`~app.models.file_config.ConfFilesResponse` holding one
        entry (stem and filename) per file plus the total count.  The
        response is empty when *subdir* is not a directory.
    """
    root = subdir.resolve()
    entries: list[ConfFileEntry] = []
    if root.is_dir():
        for candidate in sorted(root.iterdir()):
            if candidate.is_file() and candidate.suffix in _CONF_EXTENSIONS:
                _assert_within(root, candidate.resolve())
                entries.append(
                    ConfFileEntry(name=candidate.stem, filename=candidate.name)
                )
    return ConfFilesResponse(files=entries, total=len(entries))
def _read_conf_file(subdir: Path, name: str) -> ConfFileContent:
    """Read a single config file by base name or exact filename.

    Lookup order:

    1. If *name* contains a dot and does not start with one, it is tried
       verbatim as a filename.
    2. ``name + ext`` is tried for every supported config extension.

    Step 2 is always attempted so that base names which themselves contain a
    dot (e.g. ``foo.bar`` stored as ``foo.bar.conf``) can be found by the
    same base name that the directory listing reports for them.

    Args:
        subdir: Containing directory.
        name: Base name or exact filename.

    Returns:
        :class:`~app.models.file_config.ConfFileContent` for the resolved file.

    Raises:
        ConfigFileNameError: If *name* is unsafe.
        ConfigFileNotFoundError: If no matching file exists.
    """
    resolved_subdir = subdir.resolve()
    candidates: list[Path] = []
    if "." in name and not name.startswith("."):
        candidates.append(resolved_subdir / name)
    candidates.extend(resolved_subdir / (name + ext) for ext in _CONF_EXTENSIONS)
    for path in candidates:
        resolved = path.resolve()
        # Reject anything that resolves outside the directory (``..``, symlinks).
        _assert_within(resolved_subdir, resolved)
        if resolved.is_file():
            # Undecodable bytes are replaced rather than raising.
            content = resolved.read_text(encoding="utf-8", errors="replace")
            return ConfFileContent(
                name=resolved.stem,
                filename=resolved.name,
                content=content,
            )
    raise ConfigFileNotFoundError(name)
def _write_conf_file(subdir: Path, name: str, content: str) -> Path:
    """Overwrite an existing config file in *subdir*.

    The target is located with the same lookup order as reading: *name*
    verbatim (when it contains a dot and does not start with one), then
    ``name + ext`` for every supported config extension, so base names that
    contain a dot are still found.

    The new content is written to a temporary file in the target's directory
    and then moved over the target with :func:`os.replace`.  The permission
    bits of the existing file are copied onto the temporary file first, so
    overwriting does not change the file's mode.

    Args:
        subdir: Containing directory.
        name: Base name or filename.
        content: New file content.

    Returns:
        The resolved path of the written file.

    Raises:
        ConfigFileNameError: If *name* is unsafe.
        ConfigFileNotFoundError: If no matching file exists.
        ConfigFileWriteError: If the file cannot be written.
    """
    resolved_subdir = subdir.resolve()
    _validate_content(content)
    candidates: list[Path] = []
    if "." in name and not name.startswith("."):
        candidates.append(resolved_subdir / name)
    candidates.extend(resolved_subdir / (name + ext) for ext in _CONF_EXTENSIONS)
    target: Path | None = None
    for path in candidates:
        resolved = path.resolve()
        _assert_within(resolved_subdir, resolved)
        if resolved.is_file():
            target = resolved
            break
    if target is None:
        raise ConfigFileNotFoundError(name)
    tmp_name: str | None = None
    try:
        with tempfile.NamedTemporaryFile(
            mode="w",
            encoding="utf-8",
            dir=target.parent,
            delete=False,
            suffix=".tmp",
        ) as tmp:
            # Record the name before writing so a failed write is cleaned up.
            tmp_name = tmp.name
            tmp.write(content)
        # Carry the existing file's permission bits over to the replacement.
        os.chmod(tmp_name, target.stat().st_mode & 0o7777)
        os.replace(tmp_name, target)
    except OSError as exc:
        if tmp_name is not None:
            # Best-effort removal of the leftover temporary file.
            with contextlib.suppress(OSError):
                os.unlink(tmp_name)
        raise ConfigFileWriteError(f"Cannot write {name!r}: {exc}") from exc
    return target
def _create_conf_file(subdir: Path, name: str, content: str) -> str:
    """Create a new .conf file in *subdir*.

    Creation is refused when a file with the same base name already exists
    under any supported config extension.  The content is written to a
    temporary file in the target's directory and then moved into place with
    :func:`os.replace`.

    Args:
        subdir: Containing directory.
        name: Base name for the new file.
        content: Initial file content.

    Returns:
        The filename of the created file.

    Raises:
        ConfigFileNameError: If the name is invalid.
        ConfigFileExistsError: If a conflicting file already exists.
        ConfigFileWriteError: If the file cannot be written.
    """
    resolved_subdir = subdir.resolve()
    _validate_new_name(name)
    _validate_content(content)
    for ext in _CONF_EXTENSIONS:
        existing = (resolved_subdir / (name + ext)).resolve()
        _assert_within(resolved_subdir, existing)
        if existing.exists():
            raise ConfigFileExistsError(name + ext)
    target = (resolved_subdir / (name + ".conf")).resolve()
    _assert_within(resolved_subdir, target)
    tmp_name: str | None = None
    try:
        with tempfile.NamedTemporaryFile(
            mode="w",
            encoding="utf-8",
            dir=target.parent,
            delete=False,
            suffix=".tmp",
        ) as tmp:
            # Record the name before writing so a failed write is cleaned up.
            tmp_name = tmp.name
            tmp.write(content)
        os.replace(tmp_name, target)
    except OSError as exc:
        if tmp_name is not None:
            # Best-effort removal of the leftover temporary file.
            with contextlib.suppress(OSError):
                os.unlink(tmp_name)
        raise ConfigFileWriteError(f"Cannot create {name!r}: {exc}") from exc
    return target.name
def atomic_write(path: Path, content: str) -> None:
    """Write content to a file atomically using temp file + rename.

    The content is first written to a temporary file in the same directory
    as the target and then moved over the target with :func:`os.replace`, so
    a process killed mid-write leaves the existing target untouched.  If any
    step fails, the temporary file is removed on a best-effort basis.

    Args:
        path: Target file path.
        content: Content to write.

    Raises:
        ConfigFileWriteError: If the content is rejected by validation or
            the file cannot be written.
    """
    _validate_content(content)
    tmp_name: str | None = None
    try:
        with tempfile.NamedTemporaryFile(
            mode="w",
            encoding="utf-8",
            dir=path.parent,
            delete=False,
            suffix=".tmp",
        ) as tmp:
            # Record the name before writing so a failed write is cleaned up.
            tmp_name = tmp.name
            tmp.write(content)
        os.replace(tmp_name, path)
    except OSError as exc:
        if tmp_name is not None:
            # Best-effort removal of the leftover temporary file.
            with contextlib.suppress(OSError):
                os.unlink(tmp_name)
        raise ConfigFileWriteError(f"Cannot write {path.name!r}: {exc}") from exc