Files
BanGUI/backend/app/services/config_file_service.py

1137 lines
37 KiB
Python

"""Fail2ban jail configuration file parser and activator.
Parses the full set of fail2ban jail configuration files
(``jail.conf``, ``jail.local``, ``jail.d/*.conf``, ``jail.d/*.local``)
to discover all defined jails — both active and inactive — and provides
functions to activate or deactivate them by writing ``.local`` override
files.
Merge order (fail2ban convention):
1. ``jail.conf``
2. ``jail.local``
3. ``jail.d/*.conf`` (alphabetical)
4. ``jail.d/*.local`` (alphabetical)
Security note: the ``activate_jail`` and ``deactivate_jail`` callers must
supply a validated jail name. This module validates the name against an
allowlist pattern before constructing any filesystem paths to prevent
directory traversal.
"""
from __future__ import annotations
import asyncio
import configparser
import contextlib
import io
import os
import re
import tempfile
from pathlib import Path
from typing import cast
import structlog
from app.exceptions import (
ActionAlreadyExistsError,
ActionNameError,
ActionNotFoundError,
ActionReadonlyError,
ConfigWriteError,
FilterAlreadyExistsError,
FilterInvalidRegexError,
FilterNameError,
FilterNotFoundError,
FilterReadonlyError,
JailAlreadyActiveError,
JailAlreadyInactiveError,
JailNameError,
JailNotFoundError,
JailNotFoundInConfigError,
)
import app.services.jail_service as _jail_service_module
from app.models.config import (
ActionConfig,
ActionConfigUpdate,
ActionCreateRequest,
ActionListResponse,
ActionUpdateRequest,
ActivateJailRequest,
AssignActionRequest,
AssignFilterRequest,
BantimeEscalation,
FilterConfig,
FilterConfigUpdate,
FilterCreateRequest,
FilterListResponse,
FilterUpdateRequest,
InactiveJail,
InactiveJailListResponse,
JailActivationResponse,
JailValidationIssue,
JailValidationResult,
RollbackResponse,
)
from app.utils import conffile_parser
from app.services import action_config_service as _action_config_service
from app.services import filter_config_service as _filter_config_service
from app.services import jail_config_service as _jail_config_service
from app.utils.config_file_utils import (
_build_inactive_jail,
_build_parser,
_extract_action_base_name,
_get_active_jail_names,
_is_truthy,
_parse_int_safe,
_parse_jails_sync,
_parse_multiline,
_probe_fail2ban_running,
_resolve_filter,
_safe_filter_name,
_safe_jail_name,
_set_jail_local_key_sync,
start_daemon as _start_daemon,
_validate_jail_config_sync,
wait_for_fail2ban as _wait_for_fail2ban,
)
from app.utils.constants import FAIL2BAN_TRUTHY_VALUES
from app.utils.async_utils import run_blocking
from app.utils.fail2ban_client import (
Fail2BanClient,
Fail2BanConnectionError,
Fail2BanResponse,
)
log: structlog.stdlib.BoundLogger = structlog.get_logger()
# Proxy object for jail reload operations. Tests can patch
# app.services.config_file_service.jail_service.reload_all as needed.
class _JailServiceProxy:
async def reload_all(
self,
socket_path: str,
include_jails: list[str] | None = None,
exclude_jails: list[str] | None = None,
) -> None:
kwargs: dict[str, list[str]] = {}
if include_jails is not None:
kwargs["include_jails"] = include_jails
if exclude_jails is not None:
kwargs["exclude_jails"] = exclude_jails
await _jail_service_module.reload_all(socket_path, **kwargs)
jail_service = _JailServiceProxy()
_write_local_override_sync = _jail_config_service._write_local_override_sync
async def _reload_all(
socket_path: str,
include_jails: list[str] | None = None,
exclude_jails: list[str] | None = None,
) -> None:
"""Reload fail2ban jails using the configured hook or default helper."""
await jail_service.reload_all(
socket_path,
include_jails=include_jails,
exclude_jails=exclude_jails,
)
# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
# The core config file helper functions are implemented in
# ``app.utils.config_file_utils`` so the config sub-services can import
# shared parsing helpers without creating a circular import path.
# ---------------------------------------------------------------------------
# Internal helpers
# ---------------------------------------------------------------------------
def _ordered_config_files(config_dir: Path) -> list[Path]:
"""Return all jail config files in fail2ban merge order.
Args:
config_dir: The fail2ban configuration root directory.
Returns:
List of paths in ascending priority order (later entries override
earlier ones).
"""
files: list[Path] = []
jail_conf = config_dir / 'jail.conf'
if jail_conf.is_file():
files.append(jail_conf)
jail_local = config_dir / 'jail.local'
if jail_local.is_file():
files.append(jail_local)
jail_d = config_dir / 'jail.d'
if jail_d.is_dir():
files.extend(sorted(jail_d.glob('*.conf')))
files.extend(sorted(jail_d.glob('*.local')))
return files
# Public shared helpers for config file services.
ordered_config_files = _ordered_config_files
build_parser = _build_parser
is_truthy = _is_truthy
parse_multiline = _parse_multiline
parse_jails_sync = _parse_jails_sync
build_inactive_jail = _build_inactive_jail
get_active_jail_names = _get_active_jail_names
validate_jail_config_sync = _validate_jail_config_sync
set_jail_local_key_sync = _set_jail_local_key_sync
safe_jail_name = _safe_jail_name
safe_filter_name = _safe_filter_name
_POST_RELOAD_MAX_ATTEMPTS = _jail_config_service._POST_RELOAD_MAX_ATTEMPTS
_validate_regex_patterns = _jail_config_service._validate_regex_patterns
_write_filter_local_sync = _filter_config_service._write_filter_local_sync
_write_action_local_sync = _action_config_service._write_action_local_sync
_append_jail_action_sync = _action_config_service._append_jail_action_sync
_remove_jail_action_sync = _action_config_service._remove_jail_action_sync
# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------
async def list_inactive_jails(
config_dir: str,
socket_path: str,
) -> InactiveJailListResponse:
"""Delegate to the canonical jail config service."""
return await _jail_config_service.list_inactive_jails(config_dir, socket_path)
async def activate_jail(
config_dir: str,
socket_path: str,
name: str,
req: ActivateJailRequest,
) -> JailActivationResponse:
"""Delegate to the canonical jail config service."""
return await _jail_config_service._activate_jail(config_dir, socket_path, name, req)
async def _rollback_activation_async(
config_dir: str,
name: str,
socket_path: str,
original_content: bytes | None,
) -> bool:
"""Restore the pre-activation ``.local`` file and reload fail2ban.
Called internally by :func:`activate_jail` when the activation fails after
the config file was already written. Tries to:
1. Restore the original file content (or delete the file if it was newly
created by the activation attempt).
2. Reload fail2ban so the daemon runs with the restored configuration.
3. Probe fail2ban to confirm it came back up.
Args:
config_dir: Absolute path to the fail2ban configuration directory.
name: Name of the jail whose ``.local`` file should be restored.
socket_path: Path to the fail2ban Unix domain socket.
original_content: Raw bytes of the original ``.local`` file, or
``None`` if the file did not exist before the activation.
Returns:
``True`` if fail2ban is responsive again after the rollback, ``False``
if recovery also failed.
"""
local_path = Path(config_dir) / "jail.d" / f"{name}.local"
# Step 1 — restore original file (or delete it).
try:
await run_blocking(_restore_local_file_sync, local_path, original_content)
log.info("jail_activation_rollback_file_restored", jail=name)
except ConfigWriteError as exc:
log.error("jail_activation_rollback_restore_failed", jail=name, error=str(exc))
return False
# Step 2 — reload fail2ban with the restored config.
try:
await _reload_all(socket_path)
log.info("jail_activation_rollback_reload_ok", jail=name)
except Exception as exc: # noqa: BLE001
log.warning("jail_activation_rollback_reload_failed", jail=name, error=str(exc))
return False
# Step 3 — wait for fail2ban to come back.
for attempt in range(_POST_RELOAD_MAX_ATTEMPTS):
if attempt > 0:
await asyncio.sleep(_POST_RELOAD_PROBE_INTERVAL)
if await _probe_fail2ban_running(socket_path):
log.info("jail_activation_rollback_recovered", jail=name)
return True
log.warning("jail_activation_rollback_still_down", jail=name)
return False
async def deactivate_jail(
config_dir: str,
socket_path: str,
name: str,
) -> JailActivationResponse:
"""Delegate to the canonical jail config service."""
return await _jail_config_service._deactivate_jail(config_dir, socket_path, name)
async def delete_jail_local_override(
config_dir: str,
socket_path: str,
name: str,
) -> None:
"""Delegate to the canonical jail config service."""
return await _jail_config_service.delete_jail_local_override(config_dir, socket_path, name)
async def validate_jail_config(
config_dir: str,
name: str,
) -> JailValidationResult:
"""Delegate to the canonical jail config service."""
return await _jail_config_service.validate_jail_config(config_dir, name)
async def rollback_jail(
config_dir: str,
socket_path: str,
name: str,
start_cmd_parts: list[str],
) -> RollbackResponse:
"""Delegate to the canonical jail config helper."""
return await _jail_config_service._rollback_jail(config_dir, socket_path, name, start_cmd_parts)
async def start_daemon(start_cmd_parts: list[str]) -> bool:
"""Start fail2ban using the configured command."""
return await _start_daemon(start_cmd_parts)
async def wait_for_fail2ban(
socket_path: str,
max_wait_seconds: float,
poll_interval: float = 0.5,
) -> bool:
"""Probe the fail2ban socket until it is responsive or the timeout expires."""
return await _wait_for_fail2ban(socket_path, max_wait_seconds, poll_interval)
# ---------------------------------------------------------------------------
# Filter discovery helpers (Task 2.1)
# ---------------------------------------------------------------------------
# Allowlist pattern for filter names used in path construction.
_SAFE_FILTER_NAME_RE: re.Pattern[str] = re.compile(r"^[A-Za-z0-9][A-Za-z0-9._-]{0,127}$")
def _extract_filter_base_name(filter_raw: str) -> str:
"""Extract the base filter name from a raw fail2ban filter string.
fail2ban jail configs may specify a filter with an optional mode suffix,
e.g. ``sshd``, ``sshd[mode=aggressive]``, or
``%(__name__)s[mode=%(mode)s]``. This function strips the ``[…]`` mode
block and any leading/trailing whitespace to return just the file-system
base name used to look up ``filter.d/{name}.conf``.
Args:
filter_raw: Raw ``filter`` value from a jail config (already
with ``%(__name__)s`` substituted by the caller).
Returns:
Base filter name, e.g. ``"sshd"``.
"""
bracket = filter_raw.find("[")
if bracket != -1:
return filter_raw[:bracket].strip()
return filter_raw.strip()
def _build_filter_to_jails_map(
all_jails: dict[str, dict[str, str]],
active_names: set[str],
) -> dict[str, list[str]]:
"""Return a mapping of filter base name → list of active jail names.
Iterates over every jail whose name is in *active_names*, resolves its
``filter`` config key, and records the jail against the base filter name.
Args:
all_jails: Merged jail config dict — ``{jail_name: {key: value}}``.
active_names: Set of jail names currently running in fail2ban.
Returns:
``{filter_base_name: [jail_name, …]}``.
"""
mapping: dict[str, list[str]] = {}
for jail_name, settings in all_jails.items():
if jail_name not in active_names:
continue
raw_filter = settings.get("filter", "")
mode = settings.get("mode", "normal")
resolved = _resolve_filter(raw_filter, jail_name, mode) if raw_filter else jail_name
base = _extract_filter_base_name(resolved)
if base:
mapping.setdefault(base, []).append(jail_name)
return mapping
def _parse_filters_sync(
filter_d: Path,
) -> list[tuple[str, str, str, bool, str]]:
"""Synchronously scan ``filter.d/`` and return per-filter tuples.
Each tuple contains:
- ``name`` — filter base name (``"sshd"``).
- ``filename`` — actual filename (``"sshd.conf"`` or ``"sshd.local"``).
- ``content`` — merged file content (``conf`` overridden by ``local``).
- ``has_local`` — whether a ``.local`` override exists alongside a ``.conf``.
- ``source_path`` — absolute path to the primary (``conf``) source file, or
to the ``.local`` file for user-created (local-only) filters.
Also discovers ``.local``-only files (user-created filters with no
corresponding ``.conf``). These are returned with ``has_local = False``
and ``source_path`` pointing to the ``.local`` file itself.
Args:
filter_d: Path to the ``filter.d`` directory.
Returns:
List of ``(name, filename, content, has_local, source_path)`` tuples,
sorted by name.
"""
if not filter_d.is_dir():
log.warning("filter_d_not_found", path=str(filter_d))
return []
conf_names: set[str] = set()
results: list[tuple[str, str, str, bool, str]] = []
# ---- .conf-based filters (with optional .local override) ----------------
for conf_path in sorted(filter_d.glob("*.conf")):
if not conf_path.is_file():
continue
name = conf_path.stem
filename = conf_path.name
conf_names.add(name)
local_path = conf_path.with_suffix(".local")
has_local = local_path.is_file()
try:
content = conf_path.read_text(encoding="utf-8")
except OSError as exc:
log.warning("filter_read_error", name=name, path=str(conf_path), error=str(exc))
continue
if has_local:
try:
local_content = local_path.read_text(encoding="utf-8")
# Append local content after conf so configparser reads local
# values last (higher priority).
content = content + "\n" + local_content
except OSError as exc:
log.warning(
"filter_local_read_error",
name=name,
path=str(local_path),
error=str(exc),
)
results.append((name, filename, content, has_local, str(conf_path)))
# ---- .local-only filters (user-created, no corresponding .conf) ----------
for local_path in sorted(filter_d.glob("*.local")):
if not local_path.is_file():
continue
name = local_path.stem
if name in conf_names:
# Already covered above as a .conf filter with a .local override.
continue
try:
content = local_path.read_text(encoding="utf-8")
except OSError as exc:
log.warning(
"filter_local_read_error",
name=name,
path=str(local_path),
error=str(exc),
)
continue
results.append((name, local_path.name, content, False, str(local_path)))
results.sort(key=lambda t: t[0])
log.debug("filters_scanned", count=len(results), filter_d=str(filter_d))
return results
# ---------------------------------------------------------------------------
# Public API — filter discovery (Task 2.1)
# ---------------------------------------------------------------------------
async def list_filters(
config_dir: str,
socket_path: str,
) -> FilterListResponse:
"""Delegate to the canonical filter config service."""
return await _filter_config_service.list_filters(config_dir, socket_path)
async def get_filter(
config_dir: str,
socket_path: str,
name: str,
) -> FilterConfig:
"""Delegate to the canonical filter config service."""
return await _filter_config_service.get_filter(config_dir, socket_path, name)
# ---------------------------------------------------------------------------
# Public API — filter write operations (Task 2.2)
# ---------------------------------------------------------------------------
async def update_filter(
config_dir: str,
socket_path: str,
name: str,
req: FilterUpdateRequest,
do_reload: bool = False,
) -> FilterConfig:
"""Delegate to the canonical filter config service."""
return await _filter_config_service.update_filter(
config_dir,
socket_path,
name,
req,
do_reload=do_reload,
)
async def create_filter(
config_dir: str,
socket_path: str,
req: FilterCreateRequest,
do_reload: bool = False,
) -> FilterConfig:
"""Delegate to the canonical filter config service."""
return await _filter_config_service.create_filter(
config_dir,
socket_path,
req,
do_reload=do_reload,
)
async def delete_filter(
config_dir: str,
name: str,
) -> None:
"""Delegate to the canonical filter config service."""
return await _filter_config_service.delete_filter(config_dir, name)
async def assign_filter_to_jail(
config_dir: str,
socket_path: str,
jail_name: str,
req: AssignFilterRequest,
do_reload: bool = False,
) -> None:
"""Delegate to the canonical filter config service."""
return await _filter_config_service.assign_filter_to_jail(
config_dir,
socket_path,
jail_name,
req,
do_reload=do_reload,
)
# ---------------------------------------------------------------------------
# Action discovery helpers (Task 3.1)
# ---------------------------------------------------------------------------
# Allowlist pattern for action names used in path construction.
_SAFE_ACTION_NAME_RE: re.Pattern[str] = re.compile(r"^[A-Za-z0-9][A-Za-z0-9._-]{0,127}$")
def _safe_action_name(name: str) -> str:
"""Validate *name* and return it unchanged or raise :class:`ActionNameError`.
Args:
name: Proposed action name (without extension).
Returns:
The name unchanged if valid.
Raises:
ActionNameError: If *name* contains unsafe characters.
"""
if not _SAFE_ACTION_NAME_RE.match(name):
raise ActionNameError(
f"Action name {name!r} contains invalid characters. "
"Only alphanumeric characters, hyphens, underscores, and dots are "
"allowed; must start with an alphanumeric character."
)
return name
def _build_action_to_jails_map(
all_jails: dict[str, dict[str, str]],
active_names: set[str],
) -> dict[str, list[str]]:
"""Return a mapping of action base name → list of active jail names.
Iterates over every jail whose name is in *active_names*, resolves each
entry in its ``action`` config key to an action base name (stripping
``[…]`` parameter blocks), and records the jail against each base name.
Args:
all_jails: Merged jail config dict — ``{jail_name: {key: value}}``.
active_names: Set of jail names currently running in fail2ban.
Returns:
``{action_base_name: [jail_name, …]}``.
"""
mapping: dict[str, list[str]] = {}
for jail_name, settings in all_jails.items():
if jail_name not in active_names:
continue
raw_action = settings.get("action", "")
if not raw_action:
continue
for line in raw_action.splitlines():
stripped = line.strip()
if not stripped or stripped.startswith("#"):
continue
# Strip optional [key=value] parameter block to get the base name.
bracket = stripped.find("[")
base = stripped[:bracket].strip() if bracket != -1 else stripped
if base:
mapping.setdefault(base, []).append(jail_name)
return mapping
def _parse_actions_sync(
action_d: Path,
) -> list[tuple[str, str, str, bool, str]]:
"""Synchronously scan ``action.d/`` and return per-action tuples.
Each tuple contains:
- ``name`` — action base name (``"iptables"``).
- ``filename`` — actual filename (``"iptables.conf"``).
- ``content`` — merged file content (``conf`` overridden by ``local``).
- ``has_local`` — whether a ``.local`` override exists alongside a ``.conf``.
- ``source_path`` — absolute path to the primary (``conf``) source file, or
to the ``.local`` file for user-created (local-only) actions.
Also discovers ``.local``-only files (user-created actions with no
corresponding ``.conf``).
Args:
action_d: Path to the ``action.d`` directory.
Returns:
List of ``(name, filename, content, has_local, source_path)`` tuples,
sorted by name.
"""
if not action_d.is_dir():
log.warning("action_d_not_found", path=str(action_d))
return []
conf_names: set[str] = set()
results: list[tuple[str, str, str, bool, str]] = []
# ---- .conf-based actions (with optional .local override) ----------------
for conf_path in sorted(action_d.glob("*.conf")):
if not conf_path.is_file():
continue
name = conf_path.stem
filename = conf_path.name
conf_names.add(name)
local_path = conf_path.with_suffix(".local")
has_local = local_path.is_file()
try:
content = conf_path.read_text(encoding="utf-8")
except OSError as exc:
log.warning("action_read_error", name=name, path=str(conf_path), error=str(exc))
continue
if has_local:
try:
local_content = local_path.read_text(encoding="utf-8")
content = content + "\n" + local_content
except OSError as exc:
log.warning(
"action_local_read_error",
name=name,
path=str(local_path),
error=str(exc),
)
results.append((name, filename, content, has_local, str(conf_path)))
# ---- .local-only actions (user-created, no corresponding .conf) ----------
for local_path in sorted(action_d.glob("*.local")):
if not local_path.is_file():
continue
name = local_path.stem
if name in conf_names:
continue
try:
content = local_path.read_text(encoding="utf-8")
except OSError as exc:
log.warning(
"action_local_read_error",
name=name,
path=str(local_path),
error=str(exc),
)
continue
results.append((name, local_path.name, content, False, str(local_path)))
results.sort(key=lambda t: t[0])
log.debug("actions_scanned", count=len(results), action_d=str(action_d))
return results
def _append_jail_action_sync(
config_dir: Path,
jail_name: str,
action_entry: str,
) -> None:
"""Append an action entry to the ``action`` key in ``jail.d/{jail_name}.local``.
If the ``.local`` file already contains an ``action`` key under the jail
section, the new entry is appended as an additional line (multi-line
configparser format) unless it is already present. If no ``action`` key
exists, one is created.
Args:
config_dir: The fail2ban configuration root directory.
jail_name: Validated jail name.
action_entry: Full action string including any ``[…]`` parameters.
Raises:
ConfigWriteError: If writing fails.
"""
jail_d = config_dir / "jail.d"
try:
jail_d.mkdir(parents=True, exist_ok=True)
except OSError as exc:
raise ConfigWriteError(f"Cannot create jail.d directory: {exc}") from exc
local_path = jail_d / f"{jail_name}.local"
parser = _build_parser()
if local_path.is_file():
try:
parser.read(str(local_path), encoding="utf-8")
except (configparser.Error, OSError) as exc:
log.warning(
"jail_local_read_for_update_error",
jail=jail_name,
error=str(exc),
)
if not parser.has_section(jail_name):
parser.add_section(jail_name)
existing_raw = parser.get(jail_name, "action") if parser.has_option(jail_name, "action") else ""
existing_lines = [
line.strip() for line in existing_raw.splitlines() if line.strip() and not line.strip().startswith("#")
]
# Extract base names from existing entries for duplicate checking.
def _base(entry: str) -> str:
bracket = entry.find("[")
return entry[:bracket].strip() if bracket != -1 else entry.strip()
new_base = _base(action_entry)
if not any(_base(e) == new_base for e in existing_lines):
existing_lines.append(action_entry)
if existing_lines:
# configparser multi-line: continuation lines start with whitespace.
new_value = existing_lines[0] + "".join(f"\n {line}" for line in existing_lines[1:])
parser.set(jail_name, "action", new_value)
else:
parser.set(jail_name, "action", action_entry)
buf = io.StringIO()
buf.write("# Managed by BanGUI — do not edit manually\n\n")
parser.write(buf)
content = buf.getvalue()
try:
with tempfile.NamedTemporaryFile(
mode="w",
encoding="utf-8",
dir=jail_d,
delete=False,
suffix=".tmp",
) as tmp:
tmp.write(content)
tmp_name = tmp.name
os.replace(tmp_name, local_path)
except OSError as exc:
with contextlib.suppress(OSError):
os.unlink(tmp_name) # noqa: F821
raise ConfigWriteError(f"Failed to write {local_path}: {exc}") from exc
log.info(
"jail_action_appended",
jail=jail_name,
action=action_entry,
path=str(local_path),
)
def _remove_jail_action_sync(
config_dir: Path,
jail_name: str,
action_name: str,
) -> None:
"""Remove an action entry from the ``action`` key in ``jail.d/{jail_name}.local``.
Reads the ``.local`` file, removes any ``action`` entries whose base name
matches *action_name*, and writes the result back atomically. If no
``.local`` file exists, this is a no-op.
Args:
config_dir: The fail2ban configuration root directory.
jail_name: Validated jail name.
action_name: Base name of the action to remove (without ``[…]``).
Raises:
ConfigWriteError: If writing fails.
"""
jail_d = config_dir / "jail.d"
local_path = jail_d / f"{jail_name}.local"
if not local_path.is_file():
return
parser = _build_parser()
try:
parser.read(str(local_path), encoding="utf-8")
except (configparser.Error, OSError) as exc:
log.warning(
"jail_local_read_for_update_error",
jail=jail_name,
error=str(exc),
)
return
if not parser.has_section(jail_name) or not parser.has_option(jail_name, "action"):
return
existing_raw = parser.get(jail_name, "action")
existing_lines = [
line.strip() for line in existing_raw.splitlines() if line.strip() and not line.strip().startswith("#")
]
def _base(entry: str) -> str:
bracket = entry.find("[")
return entry[:bracket].strip() if bracket != -1 else entry.strip()
filtered = [e for e in existing_lines if _base(e) != action_name]
if len(filtered) == len(existing_lines):
# Action was not found — silently return (idempotent).
return
if filtered:
new_value = filtered[0] + "".join(f"\n {line}" for line in filtered[1:])
parser.set(jail_name, "action", new_value)
else:
parser.remove_option(jail_name, "action")
buf = io.StringIO()
buf.write("# Managed by BanGUI — do not edit manually\n\n")
parser.write(buf)
content = buf.getvalue()
try:
with tempfile.NamedTemporaryFile(
mode="w",
encoding="utf-8",
dir=jail_d,
delete=False,
suffix=".tmp",
) as tmp:
tmp.write(content)
tmp_name = tmp.name
os.replace(tmp_name, local_path)
except OSError as exc:
with contextlib.suppress(OSError):
os.unlink(tmp_name) # noqa: F821
raise ConfigWriteError(f"Failed to write {local_path}: {exc}") from exc
log.info(
"jail_action_removed",
jail=jail_name,
action=action_name,
path=str(local_path),
)
def _write_action_local_sync(action_d: Path, name: str, content: str) -> None:
"""Write *content* to ``action.d/{name}.local`` atomically.
The write is atomic: content is written to a temp file first, then
renamed into place. The ``action.d/`` directory is created if absent.
Args:
action_d: Path to the ``action.d`` directory.
name: Validated action base name (used as filename stem).
content: Full serialized action content to write.
Raises:
ConfigWriteError: If writing fails.
"""
try:
action_d.mkdir(parents=True, exist_ok=True)
except OSError as exc:
raise ConfigWriteError(f"Cannot create action.d directory: {exc}") from exc
local_path = action_d / f"{name}.local"
try:
with tempfile.NamedTemporaryFile(
mode="w",
encoding="utf-8",
dir=action_d,
delete=False,
suffix=".tmp",
) as tmp:
tmp.write(content)
tmp_name = tmp.name
os.replace(tmp_name, local_path)
except OSError as exc:
with contextlib.suppress(OSError):
os.unlink(tmp_name) # noqa: F821
raise ConfigWriteError(f"Failed to write {local_path}: {exc}") from exc
log.info("action_local_written", action=name, path=str(local_path))
# ---------------------------------------------------------------------------
# Public API — action discovery (Task 3.1)
# ---------------------------------------------------------------------------
async def list_actions(
config_dir: str,
socket_path: str,
) -> ActionListResponse:
"""Delegate to the canonical action config service."""
return await _action_config_service.list_actions(config_dir, socket_path)
async def get_action(
config_dir: str,
socket_path: str,
name: str,
) -> ActionConfig:
"""Delegate to the canonical action config service."""
return await _action_config_service.get_action(config_dir, socket_path, name)
# ---------------------------------------------------------------------------
# Public API — action write operations (Task 3.2)
# ---------------------------------------------------------------------------
async def update_action(
config_dir: str,
socket_path: str,
name: str,
req: ActionUpdateRequest,
do_reload: bool = False,
) -> ActionConfig:
"""Delegate to the canonical action config service."""
return await _action_config_service.update_action(
config_dir,
socket_path,
name,
req,
do_reload=do_reload,
)
async def create_action(
config_dir: str,
socket_path: str,
req: ActionCreateRequest,
do_reload: bool = False,
) -> ActionConfig:
"""Delegate to the canonical action config service."""
return await _action_config_service.create_action(
config_dir,
socket_path,
req,
do_reload=do_reload,
)
async def delete_action(
config_dir: str,
name: str,
) -> None:
"""Delegate to the canonical action config service."""
return await _action_config_service.delete_action(config_dir, name)
async def assign_action_to_jail(
config_dir: str,
socket_path: str,
jail_name: str,
req: AssignActionRequest,
do_reload: bool = False,
) -> None:
"""Add an action to a jail by updating the jail's ``.local`` file.
Appends ``{req.action_name}[{params}]`` (or just ``{req.action_name}`` when
no params are given) to the ``action`` key in the ``[{jail_name}]`` section
of ``jail.d/{jail_name}.local``. If the action is already listed it is not
duplicated. If the ``.local`` file does not exist it is created.
Args:
config_dir: Absolute path to the fail2ban configuration directory.
socket_path: Path to the fail2ban Unix domain socket.
jail_name: Name of the jail to update.
req: Request containing the action name and optional parameters.
do_reload: When ``True``, trigger a full fail2ban reload after writing.
Raises:
JailNameError: If *jail_name* contains invalid characters.
ActionNameError: If ``req.action_name`` contains invalid characters.
JailNotFoundInConfigError: If *jail_name* is not defined in any config
file.
ActionNotFoundError: If ``req.action_name`` does not exist in
``action.d/``.
ConfigWriteError: If writing fails.
"""
_safe_jail_name(jail_name)
_safe_action_name(req.action_name)
all_jails, _src = await run_blocking(_parse_jails_sync, Path(config_dir))
if jail_name not in all_jails:
raise JailNotFoundInConfigError(jail_name)
action_d = Path(config_dir) / "action.d"
def _check_action() -> None:
if (
not (action_d / f"{req.action_name}.conf").is_file()
and not (action_d / f"{req.action_name}.local").is_file()
):
raise ActionNotFoundError(req.action_name)
await run_blocking(_check_action)
# Build the action string with optional parameters.
if req.params:
param_str = ", ".join(f"{k}={v}" for k, v in sorted(req.params.items()))
action_entry = f"{req.action_name}[{param_str}]"
else:
action_entry = req.action_name
await run_blocking(
_append_jail_action_sync,
Path(config_dir),
jail_name,
action_entry,
)
if do_reload:
try:
await _reload_all(socket_path)
except Exception as exc: # noqa: BLE001
log.warning(
"reload_after_assign_action_failed",
jail=jail_name,
action=req.action_name,
error=str(exc),
)
log.info(
"action_assigned_to_jail",
jail=jail_name,
action=req.action_name,
reload=do_reload,
)
async def remove_action_from_jail(
config_dir: str,
socket_path: str,
jail_name: str,
action_name: str,
do_reload: bool = False,
) -> None:
"""Remove an action from a jail's ``.local`` config.
Reads ``jail.d/{jail_name}.local``, removes the line(s) that reference
``{action_name}`` from the ``action`` key (including any ``[…]`` parameter
blocks), and writes the file back atomically.
Args:
config_dir: Absolute path to the fail2ban configuration directory.
socket_path: Path to the fail2ban Unix domain socket.
jail_name: Name of the jail to update.
action_name: Base name of the action to remove.
do_reload: When ``True``, trigger a full fail2ban reload after writing.
Raises:
JailNameError: If *jail_name* contains invalid characters.
ActionNameError: If *action_name* contains invalid characters.
JailNotFoundInConfigError: If *jail_name* is not defined in any config.
ConfigWriteError: If writing fails.
"""
_safe_jail_name(jail_name)
_safe_action_name(action_name)
all_jails, _src = await run_blocking(_parse_jails_sync, Path(config_dir))
if jail_name not in all_jails:
raise JailNotFoundInConfigError(jail_name)
await run_blocking(
_remove_jail_action_sync,
Path(config_dir),
jail_name,
action_name,
)
if do_reload:
try:
await _reload_all(socket_path)
except Exception as exc: # noqa: BLE001
log.warning(
"reload_after_remove_action_failed",
jail=jail_name,
action=action_name,
error=str(exc),
)
log.info(
"action_removed_from_jail",
jail=jail_name,
action=action_name,
reload=do_reload,
)