231 lines
6.9 KiB
Python
231 lines
6.9 KiB
Python
"""Shared helpers for raw fail2ban config file operations.
|
||
|
||
This module contains generic path validation, directory scanning, file read/write,
|
||
and creation helpers used by :mod:`app.services.raw_config_io_service`.
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import re
|
||
from pathlib import Path
|
||
|
||
from app.exceptions import (
|
||
ConfigDirError,
|
||
ConfigFileExistsError,
|
||
ConfigFileNameError,
|
||
ConfigFileNotFoundError,
|
||
ConfigFileWriteError,
|
||
)
|
||
from app.models.file_config import ConfFileContent, ConfFileEntry, ConfFilesResponse
|
||
|
||
# Upper bound on the UTF-8 encoded size of content accepted for writing.
_MAX_CONTENT_BYTES: int = 512 * 1024  # 512 KB – hard cap on file write size

# File suffixes recognised as config files, in lookup order.
_CONF_EXTENSIONS: tuple[str, str] = (".conf", ".local")

# Safe base name: starts with an alphanumeric character, followed by up to 127
# alphanumerics, dots, underscores, or hyphens.  The pattern is anchored with
# ``\Z`` rather than ``$`` because ``$`` also matches just before a trailing
# newline, which would let a name such as ``"sshd\n"`` pass validation.
_SAFE_NAME_RE: re.Pattern[str] = re.compile(r"^[A-Za-z0-9][A-Za-z0-9._-]{0,127}\Z")
|
||
|
||
|
||
def _resolve_subdir(config_dir: str, subdir: str) -> Path:
    """Build the path of *subdir* beneath the fail2ban config directory.

    Only *config_dir* is resolved and checked for existence; *subdir* is
    appended as given and the resulting path is not required to exist.

    Args:
        config_dir: The top-level fail2ban configuration directory.
        subdir: The subdirectory under the config directory.

    Returns:
        The resolved config directory joined with *subdir*.

    Raises:
        ConfigDirError: If the config directory does not exist or is not a directory.
    """
    config_root = Path(config_dir).resolve()
    if config_root.is_dir():
        return config_root / subdir
    raise ConfigDirError(f"fail2ban config directory not found: {config_dir!r}")
|
||
|
||
|
||
def _assert_within(base: Path, target: Path) -> None:
    """Validate that *target* is contained within *base*.

    The check is lexical (no filesystem access), so callers should pass
    resolved paths.  As a safeguard for unresolved input, a *target* whose
    portion below *base* still contains a ``..`` component is rejected,
    because such a path can point outside *base* even though it starts with it.

    Args:
        base: The allowed root directory.
        target: The path to validate.

    Raises:
        ConfigFileNameError: If *target* is outside *base*.
    """
    message = f"Path {str(target)!r} escapes config directory {str(base)!r}"
    try:
        relative = target.relative_to(base)
    except ValueError as exc:
        raise ConfigFileNameError(message) from exc
    # ``relative_to`` does not collapse "..", so "/base/../etc" would otherwise pass.
    if ".." in relative.parts:
        raise ConfigFileNameError(message)
|
||
|
||
|
||
def _validate_new_name(name: str) -> None:
    """Validate a new config file base name.

    The whole string must match the safe-name pattern.  ``fullmatch`` is used
    instead of ``match`` so that a name with a trailing newline (which a
    ``$``-anchored pattern would accept) is rejected.

    Args:
        name: Proposed base name without extension.

    Raises:
        ConfigFileNameError: If the name is invalid.
    """
    if _SAFE_NAME_RE.fullmatch(name) is None:
        raise ConfigFileNameError(
            f"Invalid config file name {name!r}. "
            "Use only alphanumeric characters, hyphens, underscores, and dots; "
            "must start with an alphanumeric character."
        )
|
||
|
||
|
||
def _validate_content(content: str) -> None:
    """Validate raw file content before writing.

    The size limit applies to the UTF-8 encoded length, since that is what is
    written to disk.

    Args:
        content: Proposed file content.

    Raises:
        ConfigFileWriteError: If content cannot be encoded as UTF-8 or exceeds
            the maximum allowed size.
    """
    try:
        encoded_size = len(content.encode("utf-8"))
    except UnicodeEncodeError as exc:
        # Lone surrogates (e.g. from a JSON "\ud800" escape) cannot be encoded;
        # report them as a write error instead of leaking UnicodeEncodeError.
        raise ConfigFileWriteError(
            "Content contains characters that cannot be encoded as UTF-8."
        ) from exc

    if encoded_size > _MAX_CONTENT_BYTES:
        raise ConfigFileWriteError(
            f"Content exceeds maximum allowed size of {_MAX_CONTENT_BYTES // 1024} KB."
        )
|
||
|
||
|
||
def _list_conf_files(subdir: Path) -> ConfFilesResponse:
    """Collect the ``.conf`` / ``.local`` files found directly in *subdir*.

    Entries are visited in sorted path order.  Anything that is not a regular
    file, or whose suffix is not a supported config extension, is ignored.  A
    missing or non-directory *subdir* yields an empty response.

    Args:
        subdir: Directory containing raw config files.

    Returns:
        A :class:`~app.models.file_config.ConfFilesResponse` holding one entry
        (stem and filename) per matching file plus the total count.

    Raises:
        ConfigFileNameError: If a matching entry resolves to a location outside
            the resolved *subdir*.
    """
    directory = subdir.resolve()
    entries: list[ConfFileEntry] = []

    if directory.is_dir():
        for candidate in sorted(directory.iterdir()):
            if candidate.is_file() and candidate.suffix in _CONF_EXTENSIONS:
                # Containment is checked on the resolved entry before it is listed.
                _assert_within(directory, candidate.resolve())
                entries.append(
                    ConfFileEntry(name=candidate.stem, filename=candidate.name)
                )

    return ConfFilesResponse(files=entries, total=len(entries))
|
||
|
||
|
||
def _read_conf_file(subdir: Path, name: str) -> ConfFileContent:
    """Read a single config file by base name or exact filename.

    Lookup order:

    * If *name* contains a dot (and does not start with one) it is first tried
      as an exact filename.
    * Then ``name + ext`` is tried for each supported extension.  This fallback
      also applies to dotted names, so a base name that itself contains a dot
      (e.g. ``"apache-2.4"`` stored as ``apache-2.4.conf``) can still be found.

    Args:
        subdir: Containing directory.
        name: Base name or exact filename.

    Returns:
        :class:`~app.models.file_config.ConfFileContent` for the resolved file.

    Raises:
        ConfigFileNameError: If *name* is unsafe.
        ConfigFileNotFoundError: If no matching file exists.
    """
    resolved_subdir = subdir.resolve()
    by_extension = [resolved_subdir / (name + ext) for ext in _CONF_EXTENSIONS]
    if "." in name and not name.startswith("."):
        candidates = [resolved_subdir / name, *by_extension]
    else:
        candidates = by_extension

    for path in candidates:
        resolved = path.resolve()
        # Reject traversal / symlink escapes before touching the file.
        _assert_within(resolved_subdir, resolved)
        if resolved.is_file():
            # Undecodable bytes are replaced rather than failing the read.
            content = resolved.read_text(encoding="utf-8", errors="replace")
            return ConfFileContent(
                name=resolved.stem,
                filename=resolved.name,
                content=content,
            )

    raise ConfigFileNotFoundError(name)
|
||
|
||
|
||
def _write_conf_file(subdir: Path, name: str, content: str) -> Path:
    """Overwrite an existing config file in *subdir*.

    Lookup order:

    * If *name* contains a dot (and does not start with one) it is first tried
      as an exact filename.
    * Then ``name + ext`` is tried for each supported extension.  This fallback
      also applies to dotted names, so a base name that itself contains a dot
      (e.g. ``"apache-2.4"`` stored as ``apache-2.4.conf``) can still be found.

    Only an already existing file is written; this helper never creates one.

    Args:
        subdir: Containing directory.
        name: Base name or filename.
        content: New file content.

    Returns:
        The resolved path of the written file.

    Raises:
        ConfigFileNameError: If *name* is unsafe.
        ConfigFileNotFoundError: If no matching file exists.
        ConfigFileWriteError: If the file cannot be written.
    """
    resolved_subdir = subdir.resolve()
    _validate_content(content)

    by_extension = [resolved_subdir / (name + ext) for ext in _CONF_EXTENSIONS]
    if "." in name and not name.startswith("."):
        candidates = [resolved_subdir / name, *by_extension]
    else:
        candidates = by_extension

    target: Path | None = None
    for path in candidates:
        resolved = path.resolve()
        # Reject traversal / symlink escapes before touching the file.
        _assert_within(resolved_subdir, resolved)
        if resolved.is_file():
            target = resolved
            break

    if target is None:
        raise ConfigFileNotFoundError(name)

    try:
        target.write_text(content, encoding="utf-8")
    except OSError as exc:
        raise ConfigFileWriteError(f"Cannot write {name!r}: {exc}") from exc

    return target
|
||
|
||
|
||
def _create_conf_file(subdir: Path, name: str, content: str) -> str:
    """Create a new .conf file in *subdir*.

    Creation is refused when a file with the same base name already exists
    under any supported extension.  The file itself is opened in exclusive
    creation mode, so a file that appears between the existence check and the
    write is reported as a conflict instead of being overwritten.

    Args:
        subdir: Containing directory.
        name: Base name for the new file.
        content: Initial file content.

    Returns:
        The filename of the created file.

    Raises:
        ConfigFileNameError: If the name is invalid.
        ConfigFileExistsError: If a conflicting file already exists.
        ConfigFileWriteError: If the file cannot be written.
    """
    resolved_subdir = subdir.resolve()
    _validate_new_name(name)
    _validate_content(content)

    # A same-named ``.local`` file conflicts too, not just ``.conf``.
    for ext in _CONF_EXTENSIONS:
        existing = (resolved_subdir / (name + ext)).resolve()
        _assert_within(resolved_subdir, existing)
        if existing.exists():
            raise ConfigFileExistsError(name + ext)

    target = (resolved_subdir / (name + ".conf")).resolve()
    _assert_within(resolved_subdir, target)
    try:
        # Mode "x" fails if the file exists, closing the check-then-write race.
        with target.open("x", encoding="utf-8") as handle:
            handle.write(content)
    except FileExistsError as exc:
        raise ConfigFileExistsError(name + ".conf") from exc
    except OSError as exc:
        raise ConfigFileWriteError(f"Cannot create {name!r}: {exc}") from exc

    return target.name
|