diff --git a/backend/app/services/config_file_helpers.py b/backend/app/services/config_file_helpers.py new file mode 100644 index 0000000..f2e3b1a --- /dev/null +++ b/backend/app/services/config_file_helpers.py @@ -0,0 +1,230 @@ +"""Shared helpers for raw fail2ban config file operations. + +This module contains generic path validation, directory scanning, file read/write, +and creation helpers used by :mod:`app.services.raw_config_io_service`. +""" + +from __future__ import annotations + +import re +from pathlib import Path + +from app.exceptions import ( + ConfigDirError, + ConfigFileExistsError, + ConfigFileNameError, + ConfigFileNotFoundError, + ConfigFileWriteError, +) +from app.models.file_config import ConfFileContent, ConfFileEntry, ConfFilesResponse + +_MAX_CONTENT_BYTES: int = 512 * 1024 # 512 KB – hard cap on file write size +_CONF_EXTENSIONS: tuple[str, str] = (".conf", ".local") +_SAFE_NAME_RE: re.Pattern[str] = re.compile(r"^[A-Za-z0-9][A-Za-z0-9._-]{0,127}$") + + +def _resolve_subdir(config_dir: str, subdir: str) -> Path: + """Resolve the requested subdirectory inside the config directory. + + Args: + config_dir: The top-level fail2ban configuration directory. + subdir: The subdirectory under the config directory. + + Returns: + A resolved Path for the requested subdirectory. + + Raises: + ConfigDirError: If the config directory does not exist or is not a directory. + """ + base = Path(config_dir).resolve() + if not base.is_dir(): + raise ConfigDirError(f"fail2ban config directory not found: {config_dir!r}") + return base / subdir + + +def _assert_within(base: Path, target: Path) -> None: + """Validate that *target* is contained within *base*. + + Args: + base: The allowed root directory. + target: The path to validate. + + Raises: + ConfigFileNameError: If *target* is outside *base*. + """ + try: + target.relative_to(base) + except ValueError as exc: + raise ConfigFileNameError( + f"Path {str(target)!r} escapes config directory {str(base)!r}" + ) from exc + + +def _validate_new_name(name: str) -> None: + """Validate a new config file base name. + + Args: + name: Proposed base name without extension. + + Raises: + ConfigFileNameError: If the name is invalid. + """ + if not _SAFE_NAME_RE.match(name): + raise ConfigFileNameError( + f"Invalid config file name {name!r}. " + "Use only alphanumeric characters, hyphens, underscores, and dots; " + "must start with an alphanumeric character." + ) + + +def _validate_content(content: str) -> None: + """Validate raw file content before writing. + + Args: + content: Proposed file content. + + Raises: + ConfigFileWriteError: If content exceeds the maximum allowed size. + """ + if len(content.encode("utf-8")) > _MAX_CONTENT_BYTES: + raise ConfigFileWriteError( + f"Content exceeds maximum allowed size of {_MAX_CONTENT_BYTES // 1024} KB." + ) + + +def _list_conf_files(subdir: Path) -> ConfFilesResponse: + """List supported config files in *subdir*. + + Args: + subdir: Directory containing raw config files. + + Returns: + A :class:`~app.models.file_config.ConfFilesResponse` with names and file + metadata. + """ + resolved_subdir = subdir.resolve() + if not resolved_subdir.is_dir(): + return ConfFilesResponse(files=[], total=0) + + files: list[ConfFileEntry] = [] + for path in sorted(resolved_subdir.iterdir()): + if not path.is_file(): + continue + if path.suffix not in _CONF_EXTENSIONS: + continue + _assert_within(resolved_subdir, path.resolve()) + files.append(ConfFileEntry(name=path.stem, filename=path.name)) + return ConfFilesResponse(files=files, total=len(files)) + + +def _read_conf_file(subdir: Path, name: str) -> ConfFileContent: + """Read a single config file by base name or exact filename. + + Args: + subdir: Containing directory. + name: Base name or exact filename. + + Returns: + :class:`~app.models.file_config.ConfFileContent` for the resolved file. + + Raises: + ConfigFileNameError: If *name* is unsafe. + ConfigFileNotFoundError: If no matching file exists. + """ + resolved_subdir = subdir.resolve() + if "." in name and not name.startswith("."): + candidates = [resolved_subdir / name] + else: + candidates = [resolved_subdir / (name + ext) for ext in _CONF_EXTENSIONS] + + for path in candidates: + resolved = path.resolve() + _assert_within(resolved_subdir, resolved) + if resolved.is_file(): + content = resolved.read_text(encoding="utf-8", errors="replace") + return ConfFileContent( + name=resolved.stem, + filename=resolved.name, + content=content, + ) + + raise ConfigFileNotFoundError(name) + + +def _write_conf_file(subdir: Path, name: str, content: str) -> Path: + """Overwrite an existing config file in *subdir*. + + Args: + subdir: Containing directory. + name: Base name or filename. + content: New file content. + + Returns: + The resolved path of the written file. + + Raises: + ConfigFileNameError: If *name* is unsafe. + ConfigFileNotFoundError: If no matching file exists. + ConfigFileWriteError: If the file cannot be written. + """ + resolved_subdir = subdir.resolve() + _validate_content(content) + + if "." in name and not name.startswith("."): + candidates = [resolved_subdir / name] + else: + candidates = [resolved_subdir / (name + ext) for ext in _CONF_EXTENSIONS] + + target: Path | None = None + for path in candidates: + resolved = path.resolve() + _assert_within(resolved_subdir, resolved) + if resolved.is_file(): + target = resolved + break + + if target is None: + raise ConfigFileNotFoundError(name) + + try: + target.write_text(content, encoding="utf-8") + except OSError as exc: + raise ConfigFileWriteError(f"Cannot write {name!r}: {exc}") from exc + + return target + + +def _create_conf_file(subdir: Path, name: str, content: str) -> str: + """Create a new .conf file in *subdir*. + + Args: + subdir: Containing directory. + name: Base name for the new file. + content: Initial file content. + + Returns: + The filename of the created file. + + Raises: + ConfigFileNameError: If the name is invalid. + ConfigFileExistsError: If a conflicting file already exists. + ConfigFileWriteError: If the file cannot be written. + """ + resolved_subdir = subdir.resolve() + _validate_new_name(name) + _validate_content(content) + + for ext in _CONF_EXTENSIONS: + existing = (resolved_subdir / (name + ext)).resolve() + _assert_within(resolved_subdir, existing) + if existing.exists(): + raise ConfigFileExistsError(name + ext) + + target = (resolved_subdir / (name + ".conf")).resolve() + _assert_within(resolved_subdir, target) + try: + target.write_text(content, encoding="utf-8") + except OSError as exc: + raise ConfigFileWriteError(f"Cannot create {name!r}: {exc}") from exc + + return target.name diff --git a/backend/app/services/raw_config_io_service.py b/backend/app/services/raw_config_io_service.py index d773008..154ea69 100644 --- a/backend/app/services/raw_config_io_service.py +++ b/backend/app/services/raw_config_io_service.py @@ -41,6 +41,16 @@ from app.models.file_config import ( JailConfigFileContent, JailConfigFilesResponse, ) +from app.services.config_file_helpers import ( + _CONF_EXTENSIONS, + _assert_within, + _create_conf_file, + _list_conf_files, + _read_conf_file, + _resolve_subdir, + _validate_content, + _write_conf_file, +) if TYPE_CHECKING: from app.models.config import ( @@ -54,96 +64,6 @@ if TYPE_CHECKING: log: structlog.stdlib.BoundLogger = structlog.get_logger() -# --------------------------------------------------------------------------- -# Constants -# --------------------------------------------------------------------------- - -_MAX_CONTENT_BYTES: int = 512 * 1024 # 512 KB – hard cap on file write size -_CONF_EXTENSIONS: tuple[str, str] = (".conf", ".local") - -# Allowed characters in a new file's base name. Tighter than the OS allows -# on purpose: alphanumeric, hyphen, underscore, dot (but not leading dot). -_SAFE_NAME_RE: re.Pattern[str] = re.compile(r"^[A-Za-z0-9][A-Za-z0-9._-]{0,127}$") - -# --------------------------------------------------------------------------- -# Custom exceptions -# --------------------------------------------------------------------------- - - -# --------------------------------------------------------------------------- -# Internal path helpers -# --------------------------------------------------------------------------- - - -def _resolve_subdir(config_dir: str, subdir: str) -> Path: - """Resolve and return the path of *subdir* inside *config_dir*. - - Args: - config_dir: The top-level fail2ban config directory. - subdir: Subdirectory name (e.g. ``"jail.d"``). - - Returns: - Resolved :class:`~pathlib.Path` to the subdirectory. - - Raises: - ConfigDirError: If *config_dir* does not exist or is not a directory. - """ - base = Path(config_dir).resolve() - if not base.is_dir(): - raise ConfigDirError(f"fail2ban config directory not found: {config_dir!r}") - return base / subdir - - -def _assert_within(base: Path, target: Path) -> None: - """Raise :class:`ConfigFileNameError` if *target* is outside *base*. - - Args: - base: The allowed root directory (resolved). - target: The path to validate (resolved). - - Raises: - ConfigFileNameError: If *target* would escape *base*. - """ - try: - target.relative_to(base) - except ValueError as err: - raise ConfigFileNameError( - f"Path {str(target)!r} escapes config directory {str(base)!r}" - ) from err - - -def _validate_new_name(name: str) -> None: - """Validate a base name for a new config file. - - Args: - name: The proposed base name (without extension). - - Raises: - ConfigFileNameError: If *name* contains invalid characters or patterns. - """ - if not _SAFE_NAME_RE.match(name): - raise ConfigFileNameError( - f"Invalid config file name {name!r}. " - "Use only alphanumeric characters, hyphens, underscores, and dots; " - "must start with an alphanumeric character." - ) - - -def _validate_content(content: str) -> None: - """Reject content that exceeds the size limit. - - Args: - content: The proposed file content. - - Raises: - ConfigFileWriteError: If *content* exceeds :data:`_MAX_CONTENT_BYTES`. - """ - if len(content.encode("utf-8")) > _MAX_CONTENT_BYTES: - raise ConfigFileWriteError( - f"Content exceeds maximum allowed size of {_MAX_CONTENT_BYTES // 1024} KB." - ) - - # --------------------------------------------------------------------------- # Internal helpers — INI parsing / patching # --------------------------------------------------------------------------- @@ -431,145 +351,6 @@ async def write_jail_config_file( await run_blocking( _do) -# --------------------------------------------------------------------------- -# Internal helpers — generic conf file listing / reading / writing -# --------------------------------------------------------------------------- - - -def _list_conf_files(subdir: Path) -> ConfFilesResponse: - """List ``.conf`` and ``.local`` files in *subdir*. - - Args: - subdir: Resolved path to the directory to scan. - - Returns: - :class:`~app.models.file_config.ConfFilesResponse`. - """ - if not subdir.is_dir(): - return ConfFilesResponse(files=[], total=0) - - files: list[ConfFileEntry] = [] - for path in sorted(subdir.iterdir()): - if not path.is_file(): - continue - if path.suffix not in _CONF_EXTENSIONS: - continue - _assert_within(subdir.resolve(), path.resolve()) - files.append(ConfFileEntry(name=path.stem, filename=path.name)) - return ConfFilesResponse(files=files, total=len(files)) - - -def _read_conf_file(subdir: Path, name: str) -> ConfFileContent: - """Read a single conf file by base name. - - Args: - subdir: Resolved path to the containing directory. - name: Base name with optional extension. If no extension is given, - ``.conf`` is tried first, then ``.local``. - - Returns: - :class:`~app.models.file_config.ConfFileContent`. - - Raises: - ConfigFileNameError: If *name* is unsafe. - ConfigFileNotFoundError: If no matching file is found. - """ - resolved_subdir = subdir.resolve() - # Accept names with or without extension. - if "." in name and not name.startswith("."): - candidates = [resolved_subdir / name] - else: - candidates = [resolved_subdir / (name + ext) for ext in _CONF_EXTENSIONS] - - for path in candidates: - resolved = path.resolve() - _assert_within(resolved_subdir, resolved) - if resolved.is_file(): - content = resolved.read_text(encoding="utf-8", errors="replace") - return ConfFileContent( - name=resolved.stem, - filename=resolved.name, - content=content, - ) - raise ConfigFileNotFoundError(name) - - -def _write_conf_file(subdir: Path, name: str, content: str) -> None: - """Overwrite or create a conf file. - - Args: - subdir: Resolved path to the containing directory. - name: Base name with optional extension. - content: New file content. - - Raises: - ConfigFileNameError: If *name* is unsafe. - ConfigFileNotFoundError: If *name* does not match an existing file - (use :func:`_create_conf_file` for new files). - ConfigFileWriteError: If the file cannot be written. - """ - resolved_subdir = subdir.resolve() - _validate_content(content) - - # Accept names with or without extension. - if "." in name and not name.startswith("."): - candidates = [resolved_subdir / name] - else: - candidates = [resolved_subdir / (name + ext) for ext in _CONF_EXTENSIONS] - - target: Path | None = None - for path in candidates: - resolved = path.resolve() - _assert_within(resolved_subdir, resolved) - if resolved.is_file(): - target = resolved - break - - if target is None: - raise ConfigFileNotFoundError(name) - - try: - target.write_text(content, encoding="utf-8") - except OSError as exc: - raise ConfigFileWriteError(f"Cannot write {name!r}: {exc}") from exc - - -def _create_conf_file(subdir: Path, name: str, content: str) -> str: - """Create a new ``.conf`` file in *subdir*. - - Args: - subdir: Resolved path to the containing directory. - name: Base name for the new file (without extension). - content: Initial file content. - - Returns: - The filename that was created (e.g. ``myfilter.conf``). - - Raises: - ConfigFileNameError: If *name* is invalid. - ConfigFileExistsError: If a ``.conf`` or ``.local`` file with *name* already exists. - ConfigFileWriteError: If the file cannot be written. - """ - resolved_subdir = subdir.resolve() - _validate_new_name(name) - _validate_content(content) - - for ext in _CONF_EXTENSIONS: - existing = (resolved_subdir / (name + ext)).resolve() - _assert_within(resolved_subdir, existing) - if existing.exists(): - raise ConfigFileExistsError(name + ext) - - target = (resolved_subdir / (name + ".conf")).resolve() - _assert_within(resolved_subdir, target) - try: - target.write_text(content, encoding="utf-8") - except OSError as exc: - raise ConfigFileWriteError(f"Cannot create {name!r}: {exc}") from exc - - return target.name - - # --------------------------------------------------------------------------- # Public API — filter files (Task 4d) # --------------------------------------------------------------------------- diff --git a/backend/tests/test_services/test_file_config_service.py b/backend/tests/test_services/test_file_config_service.py index 10a8084..4087e93 100644 --- a/backend/tests/test_services/test_file_config_service.py +++ b/backend/tests/test_services/test_file_config_service.py @@ -15,10 +15,10 @@ from app.exceptions import ( ConfigFileNotFoundError, ConfigFileWriteError, ) +from app.services.config_file_helpers import _validate_new_name from app.services.raw_config_io_service import ( _parse_enabled, _set_enabled_in_content, - _validate_new_name, create_action_file, create_filter_file, create_jail_config_file,