"""File-based fail2ban configuration service. Provides functions to list, read, and write files in the fail2ban configuration directory (``jail.d/``, ``filter.d/``, ``action.d/``). All file operations are synchronous (wrapped in :func:`app.utils.async_utils.run_blocking` by callers that need async behaviour) because the config files are small and infrequently touched — the overhead of async I/O is not warranted here. Security note: every path-related helper validates that the resolved path stays strictly inside the configured config directory to prevent directory traversal attacks. """ from __future__ import annotations import asyncio from app.utils.async_utils import run_blocking from app.exceptions import ( ConfigDirError, ConfigFileExistsError, ConfigFileNameError, ConfigFileNotFoundError, ConfigFileWriteError, ) import configparser import re from pathlib import Path from typing import TYPE_CHECKING import structlog from app.models.file_config import ( ConfFileContent, ConfFileCreateRequest, ConfFileEntry, ConfFilesResponse, ConfFileUpdateRequest, JailConfigFile, JailConfigFileContent, JailConfigFilesResponse, ) from app.services.config_file_helpers import ( _CONF_EXTENSIONS, _assert_within, _create_conf_file, _list_conf_files, _read_conf_file, _resolve_subdir, _validate_content, _write_conf_file, ) if TYPE_CHECKING: from app.models.config import ( ActionConfig, ActionConfigUpdate, FilterConfig, FilterConfigUpdate, JailFileConfig, JailFileConfigUpdate, ) log: structlog.stdlib.BoundLogger = structlog.get_logger() # --------------------------------------------------------------------------- # Internal helpers — INI parsing / patching # --------------------------------------------------------------------------- def _parse_enabled(path: Path) -> bool: """Return the ``enabled`` value for the primary section in *path*. Reads the INI file with :mod:`configparser` and looks for an ``enabled`` key in the section whose name matches the file stem (or in ``DEFAULT``). Returns ``True`` if the key is absent (fail2ban's own default). Args: path: Path to a ``.conf`` or ``.local`` jail config file. Returns: ``True`` if the jail is (or defaults to) enabled, ``False`` otherwise. """ cp = configparser.ConfigParser( # Treat all keys case-insensitively; interpolation disabled because # fail2ban uses %(variables)s which would confuse configparser. interpolation=None, ) try: cp.read(str(path), encoding="utf-8") except configparser.Error: return True # Unreadable files are treated as enabled (safe default). jail_name = path.stem # Prefer the jail-specific section; fall back to DEFAULT. for section in (jail_name, "DEFAULT"): if cp.has_option(section, "enabled"): raw = cp.get(section, "enabled").strip().lower() return raw in ("true", "1", "yes") return True def _set_enabled_in_content(content: str, enabled: bool) -> str: """Return *content* with the first ``enabled = …`` line replaced. If no ``enabled`` line exists, appends one to the last ``[section]`` block found in the file. Args: content: Current raw file content. enabled: New value for the ``enabled`` key. Returns: Modified file content as a string. """ value = "true" if enabled else "false" # Try to replace an existing "enabled = ..." line (inside any section). pattern = re.compile( r"^(\s*enabled\s*=\s*).*$", re.MULTILINE | re.IGNORECASE, ) if pattern.search(content): return pattern.sub(rf"\g<1>{value}", content, count=1) # No existing enabled line. Find the last [section] header and append # the enabled setting right after it. section_pattern = re.compile(r"^\[([^\[\]]+)\]\s*$", re.MULTILINE) matches = list(section_pattern.finditer(content)) if matches: # Insert after the last section header line. last_match = matches[-1] insert_pos = last_match.end() return content[:insert_pos] + f"\nenabled = {value}" + content[insert_pos:] # No section found at all — prepend a minimal block. return f"[DEFAULT]\nenabled = {value}\n\n" + content # --------------------------------------------------------------------------- # Public API — jail config files (Task 4a) # --------------------------------------------------------------------------- async def list_jail_config_files(config_dir: str) -> JailConfigFilesResponse: """List all jail config files in ``/jail.d/``. Only ``.conf`` and ``.local`` files are returned. The ``enabled`` state is parsed from each file's content. Args: config_dir: Path to the fail2ban configuration directory. Returns: :class:`~app.models.file_config.JailConfigFilesResponse`. Raises: ConfigDirError: If *config_dir* does not exist. """ def _do() -> JailConfigFilesResponse: jail_d = _resolve_subdir(config_dir, "jail.d") if not jail_d.is_dir(): log.warning("jail_d_not_found", config_dir=config_dir) return JailConfigFilesResponse(files=[], total=0) files: list[JailConfigFile] = [] for path in sorted(jail_d.iterdir()): if not path.is_file(): continue if path.suffix not in _CONF_EXTENSIONS: continue _assert_within(jail_d.resolve(), path.resolve()) files.append( JailConfigFile( name=path.stem, filename=path.name, enabled=_parse_enabled(path), ) ) log.info("jail_config_files_listed", count=len(files)) return JailConfigFilesResponse(files=files, total=len(files)) return await run_blocking( _do) async def get_jail_config_file(config_dir: str, filename: str) -> JailConfigFileContent: """Return the content and metadata of a single jail config file. Args: config_dir: Path to the fail2ban configuration directory. filename: The filename (e.g. ``sshd.conf``) — must end in ``.conf`` or ``.local``. Returns: :class:`~app.models.file_config.JailConfigFileContent`. Raises: ConfigFileNameError: If *filename* is unsafe. ConfigFileNotFoundError: If the file does not exist. ConfigDirError: If the config directory does not exist. """ def _do() -> JailConfigFileContent: jail_d = _resolve_subdir(config_dir, "jail.d").resolve() if not jail_d.is_dir(): raise ConfigFileNotFoundError(filename) path = (jail_d / filename).resolve() _assert_within(jail_d, path) if path.suffix not in _CONF_EXTENSIONS: raise ConfigFileNameError( f"Invalid file extension for {filename!r}. " "Only .conf and .local files are supported." ) if not path.is_file(): raise ConfigFileNotFoundError(filename) content = path.read_text(encoding="utf-8", errors="replace") return JailConfigFileContent( name=path.stem, filename=path.name, enabled=_parse_enabled(path), content=content, ) return await run_blocking( _do) async def set_jail_config_enabled( config_dir: str, filename: str, enabled: bool, ) -> None: """Set the ``enabled`` flag in a jail config file. Reads the file, modifies (or inserts) the ``enabled`` key, and writes it back. The update preserves all other content including comments. Args: config_dir: Path to the fail2ban configuration directory. filename: The filename (e.g. ``sshd.conf``). enabled: New value for the ``enabled`` key. Raises: ConfigFileNameError: If *filename* is unsafe. ConfigFileNotFoundError: If the file does not exist. ConfigFileWriteError: If the file cannot be written. ConfigDirError: If the config directory does not exist. """ def _do() -> None: jail_d = _resolve_subdir(config_dir, "jail.d").resolve() if not jail_d.is_dir(): raise ConfigFileNotFoundError(filename) path = (jail_d / filename).resolve() _assert_within(jail_d, path) if path.suffix not in _CONF_EXTENSIONS: raise ConfigFileNameError( f"Only .conf and .local files are supported, got {filename!r}." ) if not path.is_file(): raise ConfigFileNotFoundError(filename) original = path.read_text(encoding="utf-8", errors="replace") updated = _set_enabled_in_content(original, enabled) try: path.write_text(updated, encoding="utf-8") except OSError as exc: raise ConfigFileWriteError( f"Cannot write {filename!r}: {exc}" ) from exc log.info( "jail_config_file_enabled_set", filename=filename, enabled=enabled, ) await run_blocking( _do) async def create_jail_config_file( config_dir: str, req: ConfFileCreateRequest, ) -> str: """Create a new jail.d config file. Args: config_dir: Path to the fail2ban configuration directory. req: :class:`~app.models.file_config.ConfFileCreateRequest`. Returns: The filename that was created. Raises: ConfigFileExistsError: If a file with that name already exists. ConfigFileNameError: If the name is invalid. ConfigFileWriteError: If the file cannot be created. ConfigDirError: If *config_dir* does not exist. """ def _do() -> str: jail_d = _resolve_subdir(config_dir, "jail.d") filename = _create_conf_file(jail_d, req.name, req.content) log.info("jail_config_file_created", filename=filename) return filename return await run_blocking( _do) async def write_jail_config_file( config_dir: str, filename: str, req: ConfFileUpdateRequest, ) -> None: """Overwrite an existing jail.d config file with new raw content. Args: config_dir: Path to the fail2ban configuration directory. filename: Filename including extension (e.g. ``sshd.conf``). req: :class:`~app.models.file_config.ConfFileUpdateRequest` with new content. Raises: ConfigFileNotFoundError: If the file does not exist. ConfigFileNameError: If *filename* is unsafe or has a bad extension. ConfigFileWriteError: If the file cannot be written. ConfigDirError: If *config_dir* does not exist. """ def _do() -> None: jail_d = _resolve_subdir(config_dir, "jail.d").resolve() if not jail_d.is_dir(): raise ConfigFileNotFoundError(filename) path = (jail_d / filename).resolve() _assert_within(jail_d, path) if path.suffix not in _CONF_EXTENSIONS: raise ConfigFileNameError( f"Only .conf and .local files are supported, got {filename!r}." ) if not path.is_file(): raise ConfigFileNotFoundError(filename) try: path.write_text(req.content, encoding="utf-8") except OSError as exc: raise ConfigFileWriteError( f"Cannot write {filename!r}: {exc}" ) from exc log.info("jail_config_file_written", filename=filename) await run_blocking( _do) # --------------------------------------------------------------------------- # Public API — filter files (Task 4d) # --------------------------------------------------------------------------- async def list_filter_files(config_dir: str) -> ConfFilesResponse: """List all filter definition files in ``/filter.d/``. Args: config_dir: Path to the fail2ban configuration directory. Returns: :class:`~app.models.file_config.ConfFilesResponse`. Raises: ConfigDirError: If *config_dir* does not exist. """ def _do() -> ConfFilesResponse: filter_d = _resolve_subdir(config_dir, "filter.d") result = _list_conf_files(filter_d) log.info("filter_files_listed", count=result.total) return result return await run_blocking( _do) async def get_filter_file(config_dir: str, name: str) -> ConfFileContent: """Return the content of a filter definition file. Args: config_dir: Path to the fail2ban configuration directory. name: Base name (with or without ``.conf``/``.local`` extension). Returns: :class:`~app.models.file_config.ConfFileContent`. Raises: ConfigFileNotFoundError: If no matching file is found. ConfigDirError: If *config_dir* does not exist. """ def _do() -> ConfFileContent: filter_d = _resolve_subdir(config_dir, "filter.d") return _read_conf_file(filter_d, name) return await run_blocking( _do) async def write_filter_file( config_dir: str, name: str, req: ConfFileUpdateRequest, ) -> None: """Overwrite an existing filter definition file. Args: config_dir: Path to the fail2ban configuration directory. name: Base name of the file to update (with or without extension). req: :class:`~app.models.file_config.ConfFileUpdateRequest` with new content. Raises: ConfigFileNotFoundError: If no matching file is found. ConfigFileWriteError: If the file cannot be written. ConfigDirError: If *config_dir* does not exist. """ def _do() -> None: filter_d = _resolve_subdir(config_dir, "filter.d") _write_conf_file(filter_d, name, req.content) log.info("filter_file_written", name=name) await run_blocking( _do) async def create_filter_file( config_dir: str, req: ConfFileCreateRequest, ) -> str: """Create a new filter definition file. Args: config_dir: Path to the fail2ban configuration directory. req: :class:`~app.models.file_config.ConfFileCreateRequest`. Returns: The filename that was created. Raises: ConfigFileExistsError: If a file with that name already exists. ConfigFileNameError: If the name is invalid. ConfigFileWriteError: If the file cannot be created. ConfigDirError: If *config_dir* does not exist. """ def _do() -> str: filter_d = _resolve_subdir(config_dir, "filter.d") filename = _create_conf_file(filter_d, req.name, req.content) log.info("filter_file_created", filename=filename) return filename return await run_blocking( _do) # --------------------------------------------------------------------------- # Public API — action files (Task 4e) # --------------------------------------------------------------------------- async def list_action_files(config_dir: str) -> ConfFilesResponse: """List all action definition files in ``/action.d/``. Args: config_dir: Path to the fail2ban configuration directory. Returns: :class:`~app.models.file_config.ConfFilesResponse`. Raises: ConfigDirError: If *config_dir* does not exist. """ def _do() -> ConfFilesResponse: action_d = _resolve_subdir(config_dir, "action.d") result = _list_conf_files(action_d) log.info("action_files_listed", count=result.total) return result return await run_blocking( _do) async def get_action_file(config_dir: str, name: str) -> ConfFileContent: """Return the content of an action definition file. Args: config_dir: Path to the fail2ban configuration directory. name: Base name (with or without ``.conf``/``.local`` extension). Returns: :class:`~app.models.file_config.ConfFileContent`. Raises: ConfigFileNotFoundError: If no matching file is found. ConfigDirError: If *config_dir* does not exist. """ def _do() -> ConfFileContent: action_d = _resolve_subdir(config_dir, "action.d") return _read_conf_file(action_d, name) return await run_blocking( _do) async def write_action_file( config_dir: str, name: str, req: ConfFileUpdateRequest, ) -> None: """Overwrite an existing action definition file. Args: config_dir: Path to the fail2ban configuration directory. name: Base name of the file to update. req: :class:`~app.models.file_config.ConfFileUpdateRequest` with new content. Raises: ConfigFileNotFoundError: If no matching file is found. ConfigFileWriteError: If the file cannot be written. ConfigDirError: If *config_dir* does not exist. """ def _do() -> None: action_d = _resolve_subdir(config_dir, "action.d") _write_conf_file(action_d, name, req.content) log.info("action_file_written", name=name) await run_blocking( _do) async def create_action_file( config_dir: str, req: ConfFileCreateRequest, ) -> str: """Create a new action definition file. Args: config_dir: Path to the fail2ban configuration directory. req: :class:`~app.models.file_config.ConfFileCreateRequest`. Returns: The filename that was created. Raises: ConfigFileExistsError: If a file with that name already exists. ConfigFileNameError: If the name is invalid. ConfigFileWriteError: If the file cannot be created. ConfigDirError: If *config_dir* does not exist. """ def _do() -> str: action_d = _resolve_subdir(config_dir, "action.d") filename = _create_conf_file(action_d, req.name, req.content) log.info("action_file_created", filename=filename) return filename return await run_blocking( _do) # --------------------------------------------------------------------------- # Public API — structured (parsed) filter files (Task 2.1) # --------------------------------------------------------------------------- async def get_parsed_filter_file(config_dir: str, name: str) -> FilterConfig: """Parse a filter definition file and return its structured representation. Reads the raw ``.conf``/``.local`` file from ``filter.d/``, parses it with :func:`~app.utils.conffile_parser.parse_filter_file`, and returns the result. Args: config_dir: Path to the fail2ban configuration directory. name: Base name with or without extension. Returns: :class:`~app.models.config.FilterConfig`. Raises: ConfigFileNotFoundError: If no matching file is found. ConfigDirError: If *config_dir* does not exist. """ from app.utils.conffile_parser import parse_filter_file # avoid circular imports def _do() -> FilterConfig: filter_d = _resolve_subdir(config_dir, "filter.d") raw = _read_conf_file(filter_d, name) result = parse_filter_file(raw.content, name=raw.name, filename=raw.filename) log.debug("filter_file_parsed", name=raw.name) return result return await run_blocking( _do) async def update_parsed_filter_file( config_dir: str, name: str, update: FilterConfigUpdate, ) -> None: """Apply a structured partial update to a filter definition file. Reads the existing file, merges *update* onto it, serializes to INI format, and writes the result back to disk. Args: config_dir: Path to the fail2ban configuration directory. name: Base name of the file to update. update: Partial fields to apply. Raises: ConfigFileNotFoundError: If no matching file is found. ConfigFileWriteError: If the file cannot be written. ConfigDirError: If *config_dir* does not exist. """ from app.utils.conffile_parser import ( # avoid circular imports merge_filter_update, parse_filter_file, serialize_filter_config, ) def _do() -> None: filter_d = _resolve_subdir(config_dir, "filter.d") raw = _read_conf_file(filter_d, name) current = parse_filter_file(raw.content, name=raw.name, filename=raw.filename) merged = merge_filter_update(current, update) new_content = serialize_filter_config(merged) _validate_content(new_content) _write_conf_file(filter_d, name, new_content) log.info("filter_file_updated_parsed", name=name) await run_blocking( _do) # --------------------------------------------------------------------------- # Public API — structured (parsed) action files (Task 3.1) # --------------------------------------------------------------------------- async def get_parsed_action_file(config_dir: str, name: str) -> ActionConfig: """Parse an action definition file and return its structured representation. Args: config_dir: Path to the fail2ban configuration directory. name: Base name with or without extension. Returns: :class:`~app.models.config.ActionConfig`. Raises: ConfigFileNotFoundError: If no matching file is found. ConfigDirError: If *config_dir* does not exist. """ from app.utils.conffile_parser import parse_action_file # avoid circular imports def _do() -> ActionConfig: action_d = _resolve_subdir(config_dir, "action.d") raw = _read_conf_file(action_d, name) result = parse_action_file(raw.content, name=raw.name, filename=raw.filename) log.debug("action_file_parsed", name=raw.name) return result return await run_blocking( _do) async def update_parsed_action_file( config_dir: str, name: str, update: ActionConfigUpdate, ) -> None: """Apply a structured partial update to an action definition file. Args: config_dir: Path to the fail2ban configuration directory. name: Base name of the file to update. update: Partial fields to apply. Raises: ConfigFileNotFoundError: If no matching file is found. ConfigFileWriteError: If the file cannot be written. ConfigDirError: If *config_dir* does not exist. """ from app.utils.conffile_parser import ( # avoid circular imports merge_action_update, parse_action_file, serialize_action_config, ) def _do() -> None: action_d = _resolve_subdir(config_dir, "action.d") raw = _read_conf_file(action_d, name) current = parse_action_file(raw.content, name=raw.name, filename=raw.filename) merged = merge_action_update(current, update) new_content = serialize_action_config(merged) _validate_content(new_content) _write_conf_file(action_d, name, new_content) log.info("action_file_updated_parsed", name=name) await run_blocking( _do) async def get_parsed_jail_file(config_dir: str, filename: str) -> JailFileConfig: """Parse a jail.d config file into a structured :class:`~app.models.config.JailFileConfig`. Args: config_dir: Path to the fail2ban configuration directory. filename: Filename including extension (e.g. ``"sshd.conf"``). Returns: :class:`~app.models.config.JailFileConfig`. Raises: ConfigFileNotFoundError: If no matching file is found. ConfigDirError: If *config_dir* does not exist. """ from app.utils.conffile_parser import parse_jail_file # avoid circular imports def _do() -> JailFileConfig: jail_d = _resolve_subdir(config_dir, "jail.d") raw = _read_conf_file(jail_d, filename) result = parse_jail_file(raw.content, filename=raw.filename) log.debug("jail_file_parsed", filename=raw.filename) return result return await run_blocking( _do) async def update_parsed_jail_file( config_dir: str, filename: str, update: JailFileConfigUpdate, ) -> None: """Apply a structured partial update to a jail.d config file. Args: config_dir: Path to the fail2ban configuration directory. filename: Filename including extension (e.g. ``"sshd.conf"``). update: Partial fields to apply. Raises: ConfigFileNotFoundError: If no matching file is found. ConfigFileWriteError: If the file cannot be written. ConfigDirError: If *config_dir* does not exist. """ from app.utils.conffile_parser import ( # avoid circular imports merge_jail_file_update, parse_jail_file, serialize_jail_file_config, ) def _do() -> None: jail_d = _resolve_subdir(config_dir, "jail.d") raw = _read_conf_file(jail_d, filename) current = parse_jail_file(raw.content, filename=raw.filename) merged = merge_jail_file_update(current, update) new_content = serialize_jail_file_config(merged) _validate_content(new_content) _write_conf_file(jail_d, filename, new_content) log.info("jail_file_updated_parsed", filename=filename) await run_blocking( _do)