Extract shared raw config file helpers and simplify raw_config_io_service
This commit is contained in:
230
backend/app/services/config_file_helpers.py
Normal file
230
backend/app/services/config_file_helpers.py
Normal file
@@ -0,0 +1,230 @@
|
|||||||
|
"""Shared helpers for raw fail2ban config file operations.
|
||||||
|
|
||||||
|
This module contains generic path validation, directory scanning, file read/write,
|
||||||
|
and creation helpers used by :mod:`app.services.raw_config_io_service`.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import re
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
from app.exceptions import (
|
||||||
|
ConfigDirError,
|
||||||
|
ConfigFileExistsError,
|
||||||
|
ConfigFileNameError,
|
||||||
|
ConfigFileNotFoundError,
|
||||||
|
ConfigFileWriteError,
|
||||||
|
)
|
||||||
|
from app.models.file_config import ConfFileContent, ConfFileEntry, ConfFilesResponse
|
||||||
|
|
||||||
|
_MAX_CONTENT_BYTES: int = 512 * 1024 # 512 KB – hard cap on file write size
|
||||||
|
_CONF_EXTENSIONS: tuple[str, str] = (".conf", ".local")
|
||||||
|
_SAFE_NAME_RE: re.Pattern[str] = re.compile(r"^[A-Za-z0-9][A-Za-z0-9._-]{0,127}$")
|
||||||
|
|
||||||
|
|
||||||
|
def _resolve_subdir(config_dir: str, subdir: str) -> Path:
|
||||||
|
"""Resolve the requested subdirectory inside the config directory.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
config_dir: The top-level fail2ban configuration directory.
|
||||||
|
subdir: The subdirectory under the config directory.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
A resolved Path for the requested subdirectory.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
ConfigDirError: If the config directory does not exist or is not a directory.
|
||||||
|
"""
|
||||||
|
base = Path(config_dir).resolve()
|
||||||
|
if not base.is_dir():
|
||||||
|
raise ConfigDirError(f"fail2ban config directory not found: {config_dir!r}")
|
||||||
|
return base / subdir
|
||||||
|
|
||||||
|
|
||||||
|
def _assert_within(base: Path, target: Path) -> None:
|
||||||
|
"""Validate that *target* is contained within *base*.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
base: The allowed root directory.
|
||||||
|
target: The path to validate.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
ConfigFileNameError: If *target* is outside *base*.
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
target.relative_to(base)
|
||||||
|
except ValueError as exc:
|
||||||
|
raise ConfigFileNameError(
|
||||||
|
f"Path {str(target)!r} escapes config directory {str(base)!r}"
|
||||||
|
) from exc
|
||||||
|
|
||||||
|
|
||||||
|
def _validate_new_name(name: str) -> None:
|
||||||
|
"""Validate a new config file base name.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
name: Proposed base name without extension.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
ConfigFileNameError: If the name is invalid.
|
||||||
|
"""
|
||||||
|
if not _SAFE_NAME_RE.match(name):
|
||||||
|
raise ConfigFileNameError(
|
||||||
|
f"Invalid config file name {name!r}. "
|
||||||
|
"Use only alphanumeric characters, hyphens, underscores, and dots; "
|
||||||
|
"must start with an alphanumeric character."
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def _validate_content(content: str) -> None:
|
||||||
|
"""Validate raw file content before writing.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
content: Proposed file content.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
ConfigFileWriteError: If content exceeds the maximum allowed size.
|
||||||
|
"""
|
||||||
|
if len(content.encode("utf-8")) > _MAX_CONTENT_BYTES:
|
||||||
|
raise ConfigFileWriteError(
|
||||||
|
f"Content exceeds maximum allowed size of {_MAX_CONTENT_BYTES // 1024} KB."
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def _list_conf_files(subdir: Path) -> ConfFilesResponse:
|
||||||
|
"""List supported config files in *subdir*.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
subdir: Directory containing raw config files.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
A :class:`~app.models.file_config.ConfFilesResponse` with names and file
|
||||||
|
metadata.
|
||||||
|
"""
|
||||||
|
resolved_subdir = subdir.resolve()
|
||||||
|
if not resolved_subdir.is_dir():
|
||||||
|
return ConfFilesResponse(files=[], total=0)
|
||||||
|
|
||||||
|
files: list[ConfFileEntry] = []
|
||||||
|
for path in sorted(resolved_subdir.iterdir()):
|
||||||
|
if not path.is_file():
|
||||||
|
continue
|
||||||
|
if path.suffix not in _CONF_EXTENSIONS:
|
||||||
|
continue
|
||||||
|
_assert_within(resolved_subdir, path.resolve())
|
||||||
|
files.append(ConfFileEntry(name=path.stem, filename=path.name))
|
||||||
|
return ConfFilesResponse(files=files, total=len(files))
|
||||||
|
|
||||||
|
|
||||||
|
def _read_conf_file(subdir: Path, name: str) -> ConfFileContent:
|
||||||
|
"""Read a single config file by base name or exact filename.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
subdir: Containing directory.
|
||||||
|
name: Base name or exact filename.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
:class:`~app.models.file_config.ConfFileContent` for the resolved file.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
ConfigFileNameError: If *name* is unsafe.
|
||||||
|
ConfigFileNotFoundError: If no matching file exists.
|
||||||
|
"""
|
||||||
|
resolved_subdir = subdir.resolve()
|
||||||
|
if "." in name and not name.startswith("."):
|
||||||
|
candidates = [resolved_subdir / name]
|
||||||
|
else:
|
||||||
|
candidates = [resolved_subdir / (name + ext) for ext in _CONF_EXTENSIONS]
|
||||||
|
|
||||||
|
for path in candidates:
|
||||||
|
resolved = path.resolve()
|
||||||
|
_assert_within(resolved_subdir, resolved)
|
||||||
|
if resolved.is_file():
|
||||||
|
content = resolved.read_text(encoding="utf-8", errors="replace")
|
||||||
|
return ConfFileContent(
|
||||||
|
name=resolved.stem,
|
||||||
|
filename=resolved.name,
|
||||||
|
content=content,
|
||||||
|
)
|
||||||
|
|
||||||
|
raise ConfigFileNotFoundError(name)
|
||||||
|
|
||||||
|
|
||||||
|
def _write_conf_file(subdir: Path, name: str, content: str) -> Path:
|
||||||
|
"""Overwrite an existing config file in *subdir*.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
subdir: Containing directory.
|
||||||
|
name: Base name or filename.
|
||||||
|
content: New file content.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
The resolved path of the written file.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
ConfigFileNameError: If *name* is unsafe.
|
||||||
|
ConfigFileNotFoundError: If no matching file exists.
|
||||||
|
ConfigFileWriteError: If the file cannot be written.
|
||||||
|
"""
|
||||||
|
resolved_subdir = subdir.resolve()
|
||||||
|
_validate_content(content)
|
||||||
|
|
||||||
|
if "." in name and not name.startswith("."):
|
||||||
|
candidates = [resolved_subdir / name]
|
||||||
|
else:
|
||||||
|
candidates = [resolved_subdir / (name + ext) for ext in _CONF_EXTENSIONS]
|
||||||
|
|
||||||
|
target: Path | None = None
|
||||||
|
for path in candidates:
|
||||||
|
resolved = path.resolve()
|
||||||
|
_assert_within(resolved_subdir, resolved)
|
||||||
|
if resolved.is_file():
|
||||||
|
target = resolved
|
||||||
|
break
|
||||||
|
|
||||||
|
if target is None:
|
||||||
|
raise ConfigFileNotFoundError(name)
|
||||||
|
|
||||||
|
try:
|
||||||
|
target.write_text(content, encoding="utf-8")
|
||||||
|
except OSError as exc:
|
||||||
|
raise ConfigFileWriteError(f"Cannot write {name!r}: {exc}") from exc
|
||||||
|
|
||||||
|
return target
|
||||||
|
|
||||||
|
|
||||||
|
def _create_conf_file(subdir: Path, name: str, content: str) -> str:
|
||||||
|
"""Create a new .conf file in *subdir*.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
subdir: Containing directory.
|
||||||
|
name: Base name for the new file.
|
||||||
|
content: Initial file content.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
The filename of the created file.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
ConfigFileNameError: If the name is invalid.
|
||||||
|
ConfigFileExistsError: If a conflicting file already exists.
|
||||||
|
ConfigFileWriteError: If the file cannot be written.
|
||||||
|
"""
|
||||||
|
resolved_subdir = subdir.resolve()
|
||||||
|
_validate_new_name(name)
|
||||||
|
_validate_content(content)
|
||||||
|
|
||||||
|
for ext in _CONF_EXTENSIONS:
|
||||||
|
existing = (resolved_subdir / (name + ext)).resolve()
|
||||||
|
_assert_within(resolved_subdir, existing)
|
||||||
|
if existing.exists():
|
||||||
|
raise ConfigFileExistsError(name + ext)
|
||||||
|
|
||||||
|
target = (resolved_subdir / (name + ".conf")).resolve()
|
||||||
|
_assert_within(resolved_subdir, target)
|
||||||
|
try:
|
||||||
|
target.write_text(content, encoding="utf-8")
|
||||||
|
except OSError as exc:
|
||||||
|
raise ConfigFileWriteError(f"Cannot create {name!r}: {exc}") from exc
|
||||||
|
|
||||||
|
return target.name
|
||||||
@@ -41,6 +41,16 @@ from app.models.file_config import (
|
|||||||
JailConfigFileContent,
|
JailConfigFileContent,
|
||||||
JailConfigFilesResponse,
|
JailConfigFilesResponse,
|
||||||
)
|
)
|
||||||
|
from app.services.config_file_helpers import (
|
||||||
|
_CONF_EXTENSIONS,
|
||||||
|
_assert_within,
|
||||||
|
_create_conf_file,
|
||||||
|
_list_conf_files,
|
||||||
|
_read_conf_file,
|
||||||
|
_resolve_subdir,
|
||||||
|
_validate_content,
|
||||||
|
_write_conf_file,
|
||||||
|
)
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
from app.models.config import (
|
from app.models.config import (
|
||||||
@@ -54,96 +64,6 @@ if TYPE_CHECKING:
|
|||||||
|
|
||||||
log: structlog.stdlib.BoundLogger = structlog.get_logger()
|
log: structlog.stdlib.BoundLogger = structlog.get_logger()
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
# Constants
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
|
|
||||||
_MAX_CONTENT_BYTES: int = 512 * 1024 # 512 KB – hard cap on file write size
|
|
||||||
_CONF_EXTENSIONS: tuple[str, str] = (".conf", ".local")
|
|
||||||
|
|
||||||
# Allowed characters in a new file's base name. Tighter than the OS allows
|
|
||||||
# on purpose: alphanumeric, hyphen, underscore, dot (but not leading dot).
|
|
||||||
_SAFE_NAME_RE: re.Pattern[str] = re.compile(r"^[A-Za-z0-9][A-Za-z0-9._-]{0,127}$")
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
# Custom exceptions
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
# Internal path helpers
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
|
|
||||||
|
|
||||||
def _resolve_subdir(config_dir: str, subdir: str) -> Path:
|
|
||||||
"""Resolve and return the path of *subdir* inside *config_dir*.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
config_dir: The top-level fail2ban config directory.
|
|
||||||
subdir: Subdirectory name (e.g. ``"jail.d"``).
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
Resolved :class:`~pathlib.Path` to the subdirectory.
|
|
||||||
|
|
||||||
Raises:
|
|
||||||
ConfigDirError: If *config_dir* does not exist or is not a directory.
|
|
||||||
"""
|
|
||||||
base = Path(config_dir).resolve()
|
|
||||||
if not base.is_dir():
|
|
||||||
raise ConfigDirError(f"fail2ban config directory not found: {config_dir!r}")
|
|
||||||
return base / subdir
|
|
||||||
|
|
||||||
|
|
||||||
def _assert_within(base: Path, target: Path) -> None:
|
|
||||||
"""Raise :class:`ConfigFileNameError` if *target* is outside *base*.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
base: The allowed root directory (resolved).
|
|
||||||
target: The path to validate (resolved).
|
|
||||||
|
|
||||||
Raises:
|
|
||||||
ConfigFileNameError: If *target* would escape *base*.
|
|
||||||
"""
|
|
||||||
try:
|
|
||||||
target.relative_to(base)
|
|
||||||
except ValueError as err:
|
|
||||||
raise ConfigFileNameError(
|
|
||||||
f"Path {str(target)!r} escapes config directory {str(base)!r}"
|
|
||||||
) from err
|
|
||||||
|
|
||||||
|
|
||||||
def _validate_new_name(name: str) -> None:
|
|
||||||
"""Validate a base name for a new config file.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
name: The proposed base name (without extension).
|
|
||||||
|
|
||||||
Raises:
|
|
||||||
ConfigFileNameError: If *name* contains invalid characters or patterns.
|
|
||||||
"""
|
|
||||||
if not _SAFE_NAME_RE.match(name):
|
|
||||||
raise ConfigFileNameError(
|
|
||||||
f"Invalid config file name {name!r}. "
|
|
||||||
"Use only alphanumeric characters, hyphens, underscores, and dots; "
|
|
||||||
"must start with an alphanumeric character."
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
def _validate_content(content: str) -> None:
|
|
||||||
"""Reject content that exceeds the size limit.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
content: The proposed file content.
|
|
||||||
|
|
||||||
Raises:
|
|
||||||
ConfigFileWriteError: If *content* exceeds :data:`_MAX_CONTENT_BYTES`.
|
|
||||||
"""
|
|
||||||
if len(content.encode("utf-8")) > _MAX_CONTENT_BYTES:
|
|
||||||
raise ConfigFileWriteError(
|
|
||||||
f"Content exceeds maximum allowed size of {_MAX_CONTENT_BYTES // 1024} KB."
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
# Internal helpers — INI parsing / patching
|
# Internal helpers — INI parsing / patching
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
@@ -431,145 +351,6 @@ async def write_jail_config_file(
|
|||||||
await run_blocking( _do)
|
await run_blocking( _do)
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
# Internal helpers — generic conf file listing / reading / writing
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
|
|
||||||
|
|
||||||
def _list_conf_files(subdir: Path) -> ConfFilesResponse:
|
|
||||||
"""List ``.conf`` and ``.local`` files in *subdir*.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
subdir: Resolved path to the directory to scan.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
:class:`~app.models.file_config.ConfFilesResponse`.
|
|
||||||
"""
|
|
||||||
if not subdir.is_dir():
|
|
||||||
return ConfFilesResponse(files=[], total=0)
|
|
||||||
|
|
||||||
files: list[ConfFileEntry] = []
|
|
||||||
for path in sorted(subdir.iterdir()):
|
|
||||||
if not path.is_file():
|
|
||||||
continue
|
|
||||||
if path.suffix not in _CONF_EXTENSIONS:
|
|
||||||
continue
|
|
||||||
_assert_within(subdir.resolve(), path.resolve())
|
|
||||||
files.append(ConfFileEntry(name=path.stem, filename=path.name))
|
|
||||||
return ConfFilesResponse(files=files, total=len(files))
|
|
||||||
|
|
||||||
|
|
||||||
def _read_conf_file(subdir: Path, name: str) -> ConfFileContent:
|
|
||||||
"""Read a single conf file by base name.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
subdir: Resolved path to the containing directory.
|
|
||||||
name: Base name with optional extension. If no extension is given,
|
|
||||||
``.conf`` is tried first, then ``.local``.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
:class:`~app.models.file_config.ConfFileContent`.
|
|
||||||
|
|
||||||
Raises:
|
|
||||||
ConfigFileNameError: If *name* is unsafe.
|
|
||||||
ConfigFileNotFoundError: If no matching file is found.
|
|
||||||
"""
|
|
||||||
resolved_subdir = subdir.resolve()
|
|
||||||
# Accept names with or without extension.
|
|
||||||
if "." in name and not name.startswith("."):
|
|
||||||
candidates = [resolved_subdir / name]
|
|
||||||
else:
|
|
||||||
candidates = [resolved_subdir / (name + ext) for ext in _CONF_EXTENSIONS]
|
|
||||||
|
|
||||||
for path in candidates:
|
|
||||||
resolved = path.resolve()
|
|
||||||
_assert_within(resolved_subdir, resolved)
|
|
||||||
if resolved.is_file():
|
|
||||||
content = resolved.read_text(encoding="utf-8", errors="replace")
|
|
||||||
return ConfFileContent(
|
|
||||||
name=resolved.stem,
|
|
||||||
filename=resolved.name,
|
|
||||||
content=content,
|
|
||||||
)
|
|
||||||
raise ConfigFileNotFoundError(name)
|
|
||||||
|
|
||||||
|
|
||||||
def _write_conf_file(subdir: Path, name: str, content: str) -> None:
|
|
||||||
"""Overwrite or create a conf file.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
subdir: Resolved path to the containing directory.
|
|
||||||
name: Base name with optional extension.
|
|
||||||
content: New file content.
|
|
||||||
|
|
||||||
Raises:
|
|
||||||
ConfigFileNameError: If *name* is unsafe.
|
|
||||||
ConfigFileNotFoundError: If *name* does not match an existing file
|
|
||||||
(use :func:`_create_conf_file` for new files).
|
|
||||||
ConfigFileWriteError: If the file cannot be written.
|
|
||||||
"""
|
|
||||||
resolved_subdir = subdir.resolve()
|
|
||||||
_validate_content(content)
|
|
||||||
|
|
||||||
# Accept names with or without extension.
|
|
||||||
if "." in name and not name.startswith("."):
|
|
||||||
candidates = [resolved_subdir / name]
|
|
||||||
else:
|
|
||||||
candidates = [resolved_subdir / (name + ext) for ext in _CONF_EXTENSIONS]
|
|
||||||
|
|
||||||
target: Path | None = None
|
|
||||||
for path in candidates:
|
|
||||||
resolved = path.resolve()
|
|
||||||
_assert_within(resolved_subdir, resolved)
|
|
||||||
if resolved.is_file():
|
|
||||||
target = resolved
|
|
||||||
break
|
|
||||||
|
|
||||||
if target is None:
|
|
||||||
raise ConfigFileNotFoundError(name)
|
|
||||||
|
|
||||||
try:
|
|
||||||
target.write_text(content, encoding="utf-8")
|
|
||||||
except OSError as exc:
|
|
||||||
raise ConfigFileWriteError(f"Cannot write {name!r}: {exc}") from exc
|
|
||||||
|
|
||||||
|
|
||||||
def _create_conf_file(subdir: Path, name: str, content: str) -> str:
|
|
||||||
"""Create a new ``.conf`` file in *subdir*.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
subdir: Resolved path to the containing directory.
|
|
||||||
name: Base name for the new file (without extension).
|
|
||||||
content: Initial file content.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
The filename that was created (e.g. ``myfilter.conf``).
|
|
||||||
|
|
||||||
Raises:
|
|
||||||
ConfigFileNameError: If *name* is invalid.
|
|
||||||
ConfigFileExistsError: If a ``.conf`` or ``.local`` file with *name* already exists.
|
|
||||||
ConfigFileWriteError: If the file cannot be written.
|
|
||||||
"""
|
|
||||||
resolved_subdir = subdir.resolve()
|
|
||||||
_validate_new_name(name)
|
|
||||||
_validate_content(content)
|
|
||||||
|
|
||||||
for ext in _CONF_EXTENSIONS:
|
|
||||||
existing = (resolved_subdir / (name + ext)).resolve()
|
|
||||||
_assert_within(resolved_subdir, existing)
|
|
||||||
if existing.exists():
|
|
||||||
raise ConfigFileExistsError(name + ext)
|
|
||||||
|
|
||||||
target = (resolved_subdir / (name + ".conf")).resolve()
|
|
||||||
_assert_within(resolved_subdir, target)
|
|
||||||
try:
|
|
||||||
target.write_text(content, encoding="utf-8")
|
|
||||||
except OSError as exc:
|
|
||||||
raise ConfigFileWriteError(f"Cannot create {name!r}: {exc}") from exc
|
|
||||||
|
|
||||||
return target.name
|
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
# Public API — filter files (Task 4d)
|
# Public API — filter files (Task 4d)
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
|
|||||||
@@ -15,10 +15,10 @@ from app.exceptions import (
|
|||||||
ConfigFileNotFoundError,
|
ConfigFileNotFoundError,
|
||||||
ConfigFileWriteError,
|
ConfigFileWriteError,
|
||||||
)
|
)
|
||||||
|
from app.services.config_file_helpers import _validate_new_name
|
||||||
from app.services.raw_config_io_service import (
|
from app.services.raw_config_io_service import (
|
||||||
_parse_enabled,
|
_parse_enabled,
|
||||||
_set_enabled_in_content,
|
_set_enabled_in_content,
|
||||||
_validate_new_name,
|
|
||||||
create_action_file,
|
create_action_file,
|
||||||
create_filter_file,
|
create_filter_file,
|
||||||
create_jail_config_file,
|
create_jail_config_file,
|
||||||
|
|||||||
Reference in New Issue
Block a user