Files
BanGUI/backend/app/services/config_file_service.py

1946 lines
63 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Fail2ban jail configuration file parser and activator.
Parses the full set of fail2ban jail configuration files
(``jail.conf``, ``jail.local``, ``jail.d/*.conf``, ``jail.d/*.local``)
to discover all defined jails — both active and inactive — and provides
functions to activate or deactivate them by writing ``.local`` override
files.
Merge order (fail2ban convention):
1. ``jail.conf``
2. ``jail.local``
3. ``jail.d/*.conf`` (alphabetical)
4. ``jail.d/*.local`` (alphabetical)
Security note: the ``activate_jail`` and ``deactivate_jail`` callers must
supply a validated jail name. This module validates the name against an
allowlist pattern before constructing any filesystem paths to prevent
directory traversal.
"""
from __future__ import annotations
import asyncio
import configparser
import contextlib
import io
import os
import re
import tempfile
from pathlib import Path
from typing import cast
import structlog
from app.exceptions import (
ActionAlreadyExistsError,
ActionNameError,
ActionNotFoundError,
ActionReadonlyError,
ConfigWriteError,
FilterAlreadyExistsError,
FilterInvalidRegexError,
FilterNameError,
FilterNotFoundError,
FilterReadonlyError,
JailAlreadyActiveError,
JailAlreadyInactiveError,
JailNameError,
JailNotFoundError,
JailNotFoundInConfigError,
)
import app.services.jail_service as _jail_service_module
from app.models.config import (
ActionConfig,
ActionConfigUpdate,
ActionCreateRequest,
ActionListResponse,
ActionUpdateRequest,
ActivateJailRequest,
AssignActionRequest,
AssignFilterRequest,
BantimeEscalation,
FilterConfig,
FilterConfigUpdate,
FilterCreateRequest,
FilterListResponse,
FilterUpdateRequest,
InactiveJail,
InactiveJailListResponse,
JailActivationResponse,
JailValidationIssue,
JailValidationResult,
RollbackResponse,
)
from app.utils import conffile_parser
from app.utils.constants import FAIL2BAN_TRUTHY_VALUES
from app.utils.async_utils import run_blocking
from app.utils.fail2ban_client import (
Fail2BanClient,
Fail2BanConnectionError,
Fail2BanResponse,
)
log: structlog.stdlib.BoundLogger = structlog.get_logger()
# Proxy object for jail reload operations. Tests can patch
# app.services.config_file_service.jail_service.reload_all as needed.
class _JailServiceProxy:
async def reload_all(
self,
socket_path: str,
include_jails: list[str] | None = None,
exclude_jails: list[str] | None = None,
) -> None:
kwargs: dict[str, list[str]] = {}
if include_jails is not None:
kwargs["include_jails"] = include_jails
if exclude_jails is not None:
kwargs["exclude_jails"] = exclude_jails
await _jail_service_module.reload_all(socket_path, **kwargs)
jail_service = _JailServiceProxy()
async def _reload_all(
socket_path: str,
include_jails: list[str] | None = None,
exclude_jails: list[str] | None = None,
) -> None:
"""Reload fail2ban jails using the configured hook or default helper."""
await jail_service.reload_all(
socket_path,
include_jails=include_jails,
exclude_jails=exclude_jails,
)
# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
_SOCKET_TIMEOUT: float = 10.0
# Allowlist pattern for jail names used in path construction.
_SAFE_JAIL_NAME_RE: re.Pattern[str] = re.compile(r"^[A-Za-z0-9][A-Za-z0-9._-]{0,127}$")
# Sections that are not jail definitions.
_META_SECTIONS: frozenset[str] = frozenset({"INCLUDES", "DEFAULT"})
# False-ish values for the ``enabled`` key.
_FALSE_VALUES: frozenset[str] = frozenset({"false", "no", "0"})
# ---------------------------------------------------------------------------
# Internal helpers
# ---------------------------------------------------------------------------
def _safe_jail_name(name: str) -> str:
"""Validate *name* and return it unchanged or raise :class:`JailNameError`.
Args:
name: Proposed jail name.
Returns:
The name unchanged if valid.
Raises:
JailNameError: If *name* contains unsafe characters.
"""
if not _SAFE_JAIL_NAME_RE.match(name):
raise JailNameError(
f"Jail name {name!r} contains invalid characters. "
"Only alphanumeric characters, hyphens, underscores, and dots are "
"allowed; must start with an alphanumeric character."
)
return name
def _safe_filter_name(name: str) -> str:
"""Validate *name* and return it unchanged or raise :class:`FilterNameError`.
Args:
name: Proposed filter name (without extension).
Returns:
The name unchanged if valid.
Raises:
FilterNameError: If *name* contains unsafe characters.
"""
if not _SAFE_FILTER_NAME_RE.match(name):
raise FilterNameError(
f"Filter name {name!r} contains invalid characters. "
"Only alphanumeric characters, hyphens, underscores, and dots are "
"allowed; must start with an alphanumeric character."
)
return name
def _ordered_config_files(config_dir: Path) -> list[Path]:
"""Return all jail config files in fail2ban merge order.
Args:
config_dir: The fail2ban configuration root directory.
Returns:
List of paths in ascending priority order (later entries override
earlier ones).
"""
files: list[Path] = []
jail_conf = config_dir / "jail.conf"
if jail_conf.is_file():
files.append(jail_conf)
jail_local = config_dir / "jail.local"
if jail_local.is_file():
files.append(jail_local)
jail_d = config_dir / "jail.d"
if jail_d.is_dir():
files.extend(sorted(jail_d.glob("*.conf")))
files.extend(sorted(jail_d.glob("*.local")))
return files
def _build_parser() -> configparser.RawConfigParser:
"""Create a :class:`configparser.RawConfigParser` for fail2ban configs.
Returns:
Parser with interpolation disabled and case-sensitive option names.
"""
parser = configparser.RawConfigParser(interpolation=None, strict=False)
# fail2ban keys are lowercase but preserve case to be safe.
parser.optionxform = str # type: ignore[assignment]
return parser
def _is_truthy(value: str) -> bool:
"""Return ``True`` if *value* is a fail2ban boolean true string.
Args:
value: Raw string from config (e.g. ``"true"``, ``"yes"``, ``"1"``).
Returns:
``True`` when the value represents enabled.
"""
return value.strip().lower() in FAIL2BAN_TRUTHY_VALUES
def _parse_int_safe(value: str) -> int | None:
"""Parse *value* as int, returning ``None`` on failure.
Args:
value: Raw string to parse.
Returns:
Integer value, or ``None``.
"""
try:
return int(value.strip())
except (ValueError, AttributeError):
return None
def _parse_time_to_seconds(value: str | None, default: int) -> int:
"""Convert a fail2ban time string (e.g. ``1h``, ``10m``, ``3600``) to seconds.
Supports the suffixes ``s`` (seconds), ``m`` (minutes), ``h`` (hours),
``d`` (days), ``w`` (weeks), and plain integers (already seconds).
``-1`` is treated as a permanent ban and returned as-is.
Args:
value: Raw time string from config, or ``None``.
default: Value to return when ``value`` is absent or unparseable.
Returns:
Duration in seconds, or ``-1`` for permanent, or ``default`` on failure.
"""
if not value:
return default
stripped = value.strip()
if stripped == "-1":
return -1
multipliers: dict[str, int] = {
"w": 604800,
"d": 86400,
"h": 3600,
"m": 60,
"s": 1,
}
for suffix, factor in multipliers.items():
if stripped.endswith(suffix) and len(stripped) > 1:
try:
return int(stripped[:-1]) * factor
except ValueError:
return default
try:
return int(stripped)
except ValueError:
return default
def _parse_multiline(raw: str) -> list[str]:
"""Split a multi-line INI value into individual non-blank lines.
Args:
raw: Raw multi-line string from configparser.
Returns:
List of stripped, non-empty, non-comment strings.
"""
result: list[str] = []
for line in raw.splitlines():
stripped = line.strip()
if stripped and not stripped.startswith("#"):
result.append(stripped)
return result
def _resolve_filter(raw_filter: str, jail_name: str, mode: str) -> str:
"""Resolve fail2ban variable placeholders in a filter string.
Handles the common default ``%(__name__)s[mode=%(mode)s]`` pattern that
fail2ban uses so the filter name displayed to the user is readable.
Args:
raw_filter: Raw ``filter`` value from config (may contain ``%()s``).
jail_name: The jail's section name, used to substitute ``%(__name__)s``.
mode: The jail's ``mode`` value, used to substitute ``%(mode)s``.
Returns:
Human-readable filter string.
"""
result = raw_filter.replace("%(__name__)s", jail_name)
result = result.replace("%(mode)s", mode)
return result
def _parse_jails_sync(
config_dir: Path,
) -> tuple[dict[str, dict[str, str]], dict[str, str]]:
"""Synchronously parse all jail configs and return merged definitions.
This is a CPU-bound / IO-bound sync function; callers must dispatch to
an executor for async use.
Args:
config_dir: The fail2ban configuration root directory.
Returns:
A two-tuple ``(jails, source_files)`` where:
- ``jails``: ``{jail_name: {key: value}}`` merged settings for each
jail with DEFAULT values already applied.
- ``source_files``: ``{jail_name: str(path)}`` path of the file that
last defined each jail section (for display in the UI).
"""
parser = _build_parser()
files = _ordered_config_files(config_dir)
# Track which file each section came from (last write wins).
source_files: dict[str, str] = {}
for path in files:
try:
single = _build_parser()
single.read(str(path), encoding="utf-8")
for section in single.sections():
if section not in _META_SECTIONS:
source_files[section] = str(path)
except (configparser.Error, OSError) as exc:
log.warning("jail_config_read_error", path=str(path), error=str(exc))
# Full merged parse: configparser applies DEFAULT values to every section.
try:
parser.read([str(p) for p in files], encoding="utf-8")
except configparser.Error as exc:
log.warning("jail_config_parse_error", error=str(exc))
jails: dict[str, dict[str, str]] = {}
for section in parser.sections():
if section in _META_SECTIONS:
continue
try:
# items() merges DEFAULT values automatically.
jails[section] = dict(parser.items(section))
except configparser.Error as exc:
log.warning("jail_section_parse_error", section=section, error=str(exc))
log.debug("jails_parsed", count=len(jails), config_dir=str(config_dir))
return jails, source_files
def _build_inactive_jail(
name: str,
settings: dict[str, str],
source_file: str,
config_dir: Path | None = None,
) -> InactiveJail:
"""Construct an :class:`~app.models.config.InactiveJail` from raw settings.
Args:
name: Jail section name.
settings: Merged key→value dict (DEFAULT values already applied).
source_file: Path of the file that last defined this section.
config_dir: Absolute path to the fail2ban configuration directory, used
to check whether a ``jail.d/{name}.local`` override file exists.
Returns:
Populated :class:`~app.models.config.InactiveJail`.
"""
raw_filter = settings.get("filter", "")
mode = settings.get("mode", "normal")
filter_name = _resolve_filter(raw_filter, name, mode) if raw_filter else name
raw_action = settings.get("action", "")
actions = _parse_multiline(raw_action) if raw_action else []
raw_logpath = settings.get("logpath", "")
logpath = _parse_multiline(raw_logpath) if raw_logpath else []
enabled_raw = settings.get("enabled", "false")
enabled = _is_truthy(enabled_raw)
maxretry_raw = settings.get("maxretry", "")
maxretry = _parse_int_safe(maxretry_raw)
# Extended fields for full GUI display
ban_time_seconds = _parse_time_to_seconds(settings.get("bantime"), 600)
find_time_seconds = _parse_time_to_seconds(settings.get("findtime"), 600)
log_encoding = settings.get("logencoding") or "auto"
backend = settings.get("backend") or "auto"
date_pattern = settings.get("datepattern") or None
use_dns = settings.get("usedns") or "warn"
prefregex = settings.get("prefregex") or ""
fail_regex = _parse_multiline(settings.get("failregex", ""))
ignore_regex = _parse_multiline(settings.get("ignoreregex", ""))
# Ban-time escalation
esc_increment = _is_truthy(settings.get("bantime.increment", "false"))
esc_factor_raw = settings.get("bantime.factor")
esc_factor = float(esc_factor_raw) if esc_factor_raw else None
esc_formula = settings.get("bantime.formula") or None
esc_multipliers = settings.get("bantime.multipliers") or None
esc_max_raw = settings.get("bantime.maxtime")
esc_max_time = _parse_time_to_seconds(esc_max_raw, 0) if esc_max_raw else None
esc_rnd_raw = settings.get("bantime.rndtime")
esc_rnd_time = _parse_time_to_seconds(esc_rnd_raw, 0) if esc_rnd_raw else None
esc_overall = _is_truthy(settings.get("bantime.overalljails", "false"))
bantime_escalation = (
BantimeEscalation(
increment=esc_increment,
factor=esc_factor,
formula=esc_formula,
multipliers=esc_multipliers,
max_time=esc_max_time,
rnd_time=esc_rnd_time,
overall_jails=esc_overall,
)
if esc_increment
else None
)
return InactiveJail(
name=name,
filter=filter_name,
actions=actions,
port=settings.get("port") or None,
logpath=logpath,
bantime=settings.get("bantime") or None,
findtime=settings.get("findtime") or None,
maxretry=maxretry,
ban_time_seconds=ban_time_seconds,
find_time_seconds=find_time_seconds,
log_encoding=log_encoding,
backend=backend,
date_pattern=date_pattern,
use_dns=use_dns,
prefregex=prefregex,
fail_regex=fail_regex,
ignore_regex=ignore_regex,
bantime_escalation=bantime_escalation,
source_file=source_file,
enabled=enabled,
has_local_override=((config_dir / "jail.d" / f"{name}.local").is_file() if config_dir is not None else False),
)
async def _get_active_jail_names(socket_path: str) -> set[str]:
"""Fetch the set of currently running jail names from fail2ban.
Returns an empty set gracefully if fail2ban is unreachable.
Args:
socket_path: Path to the fail2ban Unix domain socket.
Returns:
Set of active jail names, or empty set on connection failure.
"""
try:
client = Fail2BanClient(socket_path=socket_path, timeout=_SOCKET_TIMEOUT)
def _to_dict_inner(pairs: object) -> dict[str, object]:
if not isinstance(pairs, (list, tuple)):
return {}
result: dict[str, object] = {}
for item in pairs:
try:
k, v = item
result[str(k)] = v
except (TypeError, ValueError):
pass
return result
def _ok(response: object) -> object:
code, data = cast("Fail2BanResponse", response)
if code != 0:
raise ValueError(f"fail2ban error {code}: {data!r}")
return data
status_raw = _ok(await client.send(["status"]))
status_dict = _to_dict_inner(status_raw)
jail_list_raw: str = str(status_dict.get("Jail list", "") or "").strip()
if not jail_list_raw:
return set()
return {j.strip() for j in jail_list_raw.split(",") if j.strip()}
except Fail2BanConnectionError:
log.warning("fail2ban_unreachable_during_inactive_list")
return set()
except Exception as exc: # noqa: BLE001
log.warning("fail2ban_status_error_during_inactive_list", error=str(exc))
return set()
# ---------------------------------------------------------------------------
# Validation helpers (Task 3)
# ---------------------------------------------------------------------------
# Seconds to wait between fail2ban liveness probes after a reload.
_POST_RELOAD_PROBE_INTERVAL: float = 2.0
# Maximum number of post-reload probe attempts (initial attempt + retries).
_POST_RELOAD_MAX_ATTEMPTS: int = 4
def _extract_action_base_name(action_str: str) -> str | None:
"""Return the base action name from an action assignment string.
Returns ``None`` for complex fail2ban expressions that cannot be resolved
to a single filename (e.g. ``%(action_)s`` interpolations or multi-token
composite actions).
Args:
action_str: A single line from the jail's ``action`` setting.
Returns:
Simple base name suitable for a filesystem lookup, or ``None``.
"""
if "%" in action_str or "$" in action_str:
return None
base = action_str.split("[")[0].strip()
if _SAFE_ACTION_NAME_RE.match(base):
return base
return None
def _validate_jail_config_sync(
config_dir: Path,
name: str,
) -> JailValidationResult:
"""Run synchronous pre-activation checks on a jail configuration.
Validates:
1. Filter file existence in ``filter.d/``.
2. Action file existence in ``action.d/`` (for resolvable action names).
3. Regex compilation for every ``failregex`` and ``ignoreregex`` pattern.
4. Log path existence on disk (generates warnings, not errors).
Args:
config_dir: The fail2ban configuration root directory.
name: Validated jail name.
Returns:
:class:`~app.models.config.JailValidationResult` with any issues found.
"""
issues: list[JailValidationIssue] = []
all_jails, _ = _parse_jails_sync(config_dir)
settings = all_jails.get(name)
if settings is None:
return JailValidationResult(
jail_name=name,
valid=False,
issues=[
JailValidationIssue(
field="name",
message=f"Jail {name!r} not found in config files.",
)
],
)
filter_d = config_dir / "filter.d"
action_d = config_dir / "action.d"
# 1. Filter existence check.
raw_filter = settings.get("filter", "")
if raw_filter:
mode = settings.get("mode", "normal")
resolved = _resolve_filter(raw_filter, name, mode)
base_filter = _extract_filter_base_name(resolved)
if base_filter:
conf_ok = (filter_d / f"{base_filter}.conf").is_file()
local_ok = (filter_d / f"{base_filter}.local").is_file()
if not conf_ok and not local_ok:
issues.append(
JailValidationIssue(
field="filter",
message=(f"Filter file not found: filter.d/{base_filter}.conf (or .local)"),
)
)
# 2. Action existence check.
raw_action = settings.get("action", "")
if raw_action:
for action_line in _parse_multiline(raw_action):
action_name = _extract_action_base_name(action_line)
if action_name:
conf_ok = (action_d / f"{action_name}.conf").is_file()
local_ok = (action_d / f"{action_name}.local").is_file()
if not conf_ok and not local_ok:
issues.append(
JailValidationIssue(
field="action",
message=(f"Action file not found: action.d/{action_name}.conf (or .local)"),
)
)
# 3. failregex compilation.
for pattern in _parse_multiline(settings.get("failregex", "")):
try:
re.compile(pattern)
except re.error as exc:
issues.append(
JailValidationIssue(
field="failregex",
message=f"Invalid regex pattern: {exc}",
)
)
# 4. ignoreregex compilation.
for pattern in _parse_multiline(settings.get("ignoreregex", "")):
try:
re.compile(pattern)
except re.error as exc:
issues.append(
JailValidationIssue(
field="ignoreregex",
message=f"Invalid regex pattern: {exc}",
)
)
# 5. Log path existence (warning only — paths may be created at runtime).
raw_logpath = settings.get("logpath", "")
if raw_logpath:
for log_path in _parse_multiline(raw_logpath):
# Skip glob patterns and fail2ban variable references.
if "*" in log_path or "?" in log_path or "%(" in log_path:
continue
if not Path(log_path).exists():
issues.append(
JailValidationIssue(
field="logpath",
message=f"Log file not found on disk: {log_path}",
)
)
valid = len(issues) == 0
log.debug(
"jail_validation_complete",
jail=name,
valid=valid,
issue_count=len(issues),
)
return JailValidationResult(jail_name=name, valid=valid, issues=issues)
async def _probe_fail2ban_running(socket_path: str) -> bool:
"""Return ``True`` if the fail2ban socket responds to a ping.
Args:
socket_path: Path to the fail2ban Unix domain socket.
Returns:
``True`` when fail2ban is reachable, ``False`` otherwise.
"""
try:
client = Fail2BanClient(socket_path=socket_path, timeout=5.0)
resp = await client.send(["ping"])
return isinstance(resp, (list, tuple)) and resp[0] == 0
except Exception: # noqa: BLE001
return False
async def wait_for_fail2ban(
socket_path: str,
max_wait_seconds: float = 10.0,
poll_interval: float = 2.0,
) -> bool:
"""Poll the fail2ban socket until it responds or the timeout expires.
Args:
socket_path: Path to the fail2ban Unix domain socket.
max_wait_seconds: Total time budget in seconds.
poll_interval: Delay between probe attempts in seconds.
Returns:
``True`` if fail2ban came online within the budget.
"""
elapsed = 0.0
while elapsed < max_wait_seconds:
if await _probe_fail2ban_running(socket_path):
return True
await asyncio.sleep(poll_interval)
elapsed += poll_interval
return False
async def start_daemon(start_cmd_parts: list[str]) -> bool:
"""Start the fail2ban daemon using *start_cmd_parts*.
Uses :func:`asyncio.create_subprocess_exec` (no shell interpretation)
to avoid command injection.
Args:
start_cmd_parts: Command and arguments, e.g.
``["fail2ban-client", "start"]``.
Returns:
``True`` when the process exited with code 0.
"""
if not start_cmd_parts:
log.warning("fail2ban_start_cmd_empty")
return False
try:
proc = await asyncio.create_subprocess_exec(
*start_cmd_parts,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
await asyncio.wait_for(proc.wait(), timeout=30.0)
success = proc.returncode == 0
if not success:
log.warning(
"fail2ban_start_cmd_nonzero",
cmd=start_cmd_parts,
returncode=proc.returncode,
)
return success
except (TimeoutError, OSError) as exc:
log.warning("fail2ban_start_cmd_error", cmd=start_cmd_parts, error=str(exc))
return False
def _write_local_override_sync(
config_dir: Path,
jail_name: str,
enabled: bool,
overrides: dict[str, object],
) -> None:
"""Write a ``jail.d/{name}.local`` file atomically.
Always writes to ``jail.d/{jail_name}.local``. If the file already
exists it is replaced entirely. The write is atomic: content is
written to a temp file first, then renamed into place.
Args:
config_dir: The fail2ban configuration root directory.
jail_name: Validated jail name (used as filename stem).
enabled: Value to write for ``enabled =``.
overrides: Optional setting overrides (bantime, findtime, maxretry,
port, logpath).
Raises:
ConfigWriteError: If writing fails.
"""
jail_d = config_dir / "jail.d"
try:
jail_d.mkdir(parents=True, exist_ok=True)
except OSError as exc:
raise ConfigWriteError(f"Cannot create jail.d directory: {exc}") from exc
local_path = jail_d / f"{jail_name}.local"
lines: list[str] = [
"# Managed by BanGUI — do not edit manually",
"",
f"[{jail_name}]",
"",
f"enabled = {'true' if enabled else 'false'}",
# Provide explicit banaction defaults so fail2ban can resolve the
# %(banaction)s interpolation used in the built-in action_ chain.
"banaction = iptables-multiport",
"banaction_allports = iptables-allports",
]
if overrides.get("bantime") is not None:
lines.append(f"bantime = {overrides['bantime']}")
if overrides.get("findtime") is not None:
lines.append(f"findtime = {overrides['findtime']}")
if overrides.get("maxretry") is not None:
lines.append(f"maxretry = {overrides['maxretry']}")
if overrides.get("port") is not None:
lines.append(f"port = {overrides['port']}")
if overrides.get("logpath"):
paths: list[str] = cast("list[str]", overrides["logpath"])
if paths:
lines.append(f"logpath = {paths[0]}")
for p in paths[1:]:
lines.append(f" {p}")
content = "\n".join(lines) + "\n"
try:
with tempfile.NamedTemporaryFile(
mode="w",
encoding="utf-8",
dir=jail_d,
delete=False,
suffix=".tmp",
) as tmp:
tmp.write(content)
tmp_name = tmp.name
os.replace(tmp_name, local_path)
except OSError as exc:
# Clean up temp file if rename failed.
with contextlib.suppress(OSError):
os.unlink(tmp_name) # noqa: F821 — only reachable when tmp_name is set
raise ConfigWriteError(f"Failed to write {local_path}: {exc}") from exc
log.info(
"jail_local_written",
jail=jail_name,
path=str(local_path),
enabled=enabled,
)
def _restore_local_file_sync(local_path: Path, original_content: bytes | None) -> None:
"""Restore a ``.local`` file to its pre-activation state.
If *original_content* is ``None``, the file is deleted (it did not exist
before the activation). Otherwise the original bytes are written back
atomically via a temp-file rename.
Args:
local_path: Absolute path to the ``.local`` file to restore.
original_content: Original raw bytes to write back, or ``None`` to
delete the file.
Raises:
ConfigWriteError: If the write or delete operation fails.
"""
if original_content is None:
try:
local_path.unlink(missing_ok=True)
except OSError as exc:
raise ConfigWriteError(f"Failed to delete {local_path} during rollback: {exc}") from exc
return
tmp_name: str | None = None
try:
with tempfile.NamedTemporaryFile(
mode="wb",
dir=local_path.parent,
delete=False,
suffix=".tmp",
) as tmp:
tmp.write(original_content)
tmp_name = tmp.name
os.replace(tmp_name, local_path)
except OSError as exc:
with contextlib.suppress(OSError):
if tmp_name is not None:
os.unlink(tmp_name)
raise ConfigWriteError(f"Failed to restore {local_path} during rollback: {exc}") from exc
def _validate_regex_patterns(patterns: list[str]) -> None:
"""Validate each pattern in *patterns* using Python's ``re`` module.
Args:
patterns: List of regex strings to validate.
Raises:
FilterInvalidRegexError: If any pattern fails to compile.
"""
for pattern in patterns:
try:
re.compile(pattern)
except re.error as exc:
raise FilterInvalidRegexError(pattern, str(exc)) from exc
def _write_filter_local_sync(filter_d: Path, name: str, content: str) -> None:
"""Write *content* to ``filter.d/{name}.local`` atomically.
The write is atomic: content is written to a temp file first, then
renamed into place. The ``filter.d/`` directory is created if absent.
Args:
filter_d: Path to the ``filter.d`` directory.
name: Validated filter base name (used as filename stem).
content: Full serialized filter content to write.
Raises:
ConfigWriteError: If writing fails.
"""
try:
filter_d.mkdir(parents=True, exist_ok=True)
except OSError as exc:
raise ConfigWriteError(f"Cannot create filter.d directory: {exc}") from exc
local_path = filter_d / f"{name}.local"
try:
with tempfile.NamedTemporaryFile(
mode="w",
encoding="utf-8",
dir=filter_d,
delete=False,
suffix=".tmp",
) as tmp:
tmp.write(content)
tmp_name = tmp.name
os.replace(tmp_name, local_path)
except OSError as exc:
with contextlib.suppress(OSError):
os.unlink(tmp_name) # noqa: F821
raise ConfigWriteError(f"Failed to write {local_path}: {exc}") from exc
log.info("filter_local_written", filter=name, path=str(local_path))
def _set_jail_local_key_sync(
config_dir: Path,
jail_name: str,
key: str,
value: str,
) -> None:
"""Update ``jail.d/{jail_name}.local`` to set a single key in the jail section.
If the ``.local`` file already exists it is read, the key is updated (or
added), and the file is written back atomically without disturbing other
settings. If the file does not exist a new one is created containing
only the BanGUI header comment, the jail section, and the requested key.
Args:
config_dir: The fail2ban configuration root directory.
jail_name: Validated jail name (used as section name and filename stem).
key: Config key to set inside the jail section.
value: Config value to assign.
Raises:
ConfigWriteError: If writing fails.
"""
jail_d = config_dir / "jail.d"
try:
jail_d.mkdir(parents=True, exist_ok=True)
except OSError as exc:
raise ConfigWriteError(f"Cannot create jail.d directory: {exc}") from exc
local_path = jail_d / f"{jail_name}.local"
parser = _build_parser()
if local_path.is_file():
try:
parser.read(str(local_path), encoding="utf-8")
except (configparser.Error, OSError) as exc:
log.warning(
"jail_local_read_for_update_error",
jail=jail_name,
error=str(exc),
)
if not parser.has_section(jail_name):
parser.add_section(jail_name)
parser.set(jail_name, key, value)
# Serialize: write a BanGUI header then the parser output.
buf = io.StringIO()
buf.write("# Managed by BanGUI — do not edit manually\n\n")
parser.write(buf)
content = buf.getvalue()
try:
with tempfile.NamedTemporaryFile(
mode="w",
encoding="utf-8",
dir=jail_d,
delete=False,
suffix=".tmp",
) as tmp:
tmp.write(content)
tmp_name = tmp.name
os.replace(tmp_name, local_path)
except OSError as exc:
with contextlib.suppress(OSError):
os.unlink(tmp_name) # noqa: F821
raise ConfigWriteError(f"Failed to write {local_path}: {exc}") from exc
log.info(
"jail_local_key_set",
jail=jail_name,
key=key,
path=str(local_path),
)
# Public shared helpers for config file services.
ordered_config_files = _ordered_config_files
build_parser = _build_parser
is_truthy = _is_truthy
parse_multiline = _parse_multiline
parse_jails_sync = _parse_jails_sync
build_inactive_jail = _build_inactive_jail
get_active_jail_names = _get_active_jail_names
validate_jail_config_sync = _validate_jail_config_sync
set_jail_local_key_sync = _set_jail_local_key_sync
safe_jail_name = _safe_jail_name
safe_filter_name = _safe_filter_name
# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------
async def list_inactive_jails(
config_dir: str,
socket_path: str,
) -> InactiveJailListResponse:
"""Delegate to the canonical jail config service."""
from app.services.jail_config_service import list_inactive_jails as _delegate
return await _delegate(config_dir, socket_path)
async def activate_jail(
config_dir: str,
socket_path: str,
name: str,
req: ActivateJailRequest,
) -> JailActivationResponse:
"""Delegate to the canonical jail config service."""
from app.services.jail_config_service import _activate_jail as _delegate
return await _delegate(config_dir, socket_path, name, req)
async def _rollback_activation_async(
config_dir: str,
name: str,
socket_path: str,
original_content: bytes | None,
) -> bool:
"""Restore the pre-activation ``.local`` file and reload fail2ban.
Called internally by :func:`activate_jail` when the activation fails after
the config file was already written. Tries to:
1. Restore the original file content (or delete the file if it was newly
created by the activation attempt).
2. Reload fail2ban so the daemon runs with the restored configuration.
3. Probe fail2ban to confirm it came back up.
Args:
config_dir: Absolute path to the fail2ban configuration directory.
name: Name of the jail whose ``.local`` file should be restored.
socket_path: Path to the fail2ban Unix domain socket.
original_content: Raw bytes of the original ``.local`` file, or
``None`` if the file did not exist before the activation.
Returns:
``True`` if fail2ban is responsive again after the rollback, ``False``
if recovery also failed.
"""
local_path = Path(config_dir) / "jail.d" / f"{name}.local"
# Step 1 — restore original file (or delete it).
try:
await run_blocking(_restore_local_file_sync, local_path, original_content)
log.info("jail_activation_rollback_file_restored", jail=name)
except ConfigWriteError as exc:
log.error("jail_activation_rollback_restore_failed", jail=name, error=str(exc))
return False
# Step 2 — reload fail2ban with the restored config.
try:
await _reload_all(socket_path)
log.info("jail_activation_rollback_reload_ok", jail=name)
except Exception as exc: # noqa: BLE001
log.warning("jail_activation_rollback_reload_failed", jail=name, error=str(exc))
return False
# Step 3 — wait for fail2ban to come back.
for attempt in range(_POST_RELOAD_MAX_ATTEMPTS):
if attempt > 0:
await asyncio.sleep(_POST_RELOAD_PROBE_INTERVAL)
if await _probe_fail2ban_running(socket_path):
log.info("jail_activation_rollback_recovered", jail=name)
return True
log.warning("jail_activation_rollback_still_down", jail=name)
return False
async def deactivate_jail(
config_dir: str,
socket_path: str,
name: str,
) -> JailActivationResponse:
"""Delegate to the canonical jail config service."""
from app.services.jail_config_service import _deactivate_jail as _delegate
return await _delegate(config_dir, socket_path, name)
async def delete_jail_local_override(
config_dir: str,
socket_path: str,
name: str,
) -> None:
"""Delegate to the canonical jail config service."""
from app.services.jail_config_service import delete_jail_local_override as _delegate
return await _delegate(config_dir, socket_path, name)
async def validate_jail_config(
config_dir: str,
name: str,
) -> JailValidationResult:
"""Delegate to the canonical jail config service."""
from app.services.jail_config_service import validate_jail_config as _delegate
return await _delegate(config_dir, name)
async def rollback_jail(
config_dir: str,
socket_path: str,
name: str,
start_cmd_parts: list[str],
) -> RollbackResponse:
"""Delegate to the canonical jail config helper."""
from app.services.jail_config_service import _rollback_jail as _delegate
return await _delegate(config_dir, socket_path, name, start_cmd_parts)
# ---------------------------------------------------------------------------
# Filter discovery helpers (Task 2.1)
# ---------------------------------------------------------------------------
# Allowlist pattern for filter names used in path construction.
_SAFE_FILTER_NAME_RE: re.Pattern[str] = re.compile(r"^[A-Za-z0-9][A-Za-z0-9._-]{0,127}$")
def _extract_filter_base_name(filter_raw: str) -> str:
"""Extract the base filter name from a raw fail2ban filter string.
fail2ban jail configs may specify a filter with an optional mode suffix,
e.g. ``sshd``, ``sshd[mode=aggressive]``, or
``%(__name__)s[mode=%(mode)s]``. This function strips the ``[…]`` mode
block and any leading/trailing whitespace to return just the file-system
base name used to look up ``filter.d/{name}.conf``.
Args:
filter_raw: Raw ``filter`` value from a jail config (already
with ``%(__name__)s`` substituted by the caller).
Returns:
Base filter name, e.g. ``"sshd"``.
"""
bracket = filter_raw.find("[")
if bracket != -1:
return filter_raw[:bracket].strip()
return filter_raw.strip()
def _build_filter_to_jails_map(
all_jails: dict[str, dict[str, str]],
active_names: set[str],
) -> dict[str, list[str]]:
"""Return a mapping of filter base name → list of active jail names.
Iterates over every jail whose name is in *active_names*, resolves its
``filter`` config key, and records the jail against the base filter name.
Args:
all_jails: Merged jail config dict — ``{jail_name: {key: value}}``.
active_names: Set of jail names currently running in fail2ban.
Returns:
``{filter_base_name: [jail_name, …]}``.
"""
mapping: dict[str, list[str]] = {}
for jail_name, settings in all_jails.items():
if jail_name not in active_names:
continue
raw_filter = settings.get("filter", "")
mode = settings.get("mode", "normal")
resolved = _resolve_filter(raw_filter, jail_name, mode) if raw_filter else jail_name
base = _extract_filter_base_name(resolved)
if base:
mapping.setdefault(base, []).append(jail_name)
return mapping
def _parse_filters_sync(
filter_d: Path,
) -> list[tuple[str, str, str, bool, str]]:
"""Synchronously scan ``filter.d/`` and return per-filter tuples.
Each tuple contains:
- ``name`` — filter base name (``"sshd"``).
- ``filename`` — actual filename (``"sshd.conf"`` or ``"sshd.local"``).
- ``content`` — merged file content (``conf`` overridden by ``local``).
- ``has_local`` — whether a ``.local`` override exists alongside a ``.conf``.
- ``source_path`` — absolute path to the primary (``conf``) source file, or
to the ``.local`` file for user-created (local-only) filters.
Also discovers ``.local``-only files (user-created filters with no
corresponding ``.conf``). These are returned with ``has_local = False``
and ``source_path`` pointing to the ``.local`` file itself.
Args:
filter_d: Path to the ``filter.d`` directory.
Returns:
List of ``(name, filename, content, has_local, source_path)`` tuples,
sorted by name.
"""
if not filter_d.is_dir():
log.warning("filter_d_not_found", path=str(filter_d))
return []
conf_names: set[str] = set()
results: list[tuple[str, str, str, bool, str]] = []
# ---- .conf-based filters (with optional .local override) ----------------
for conf_path in sorted(filter_d.glob("*.conf")):
if not conf_path.is_file():
continue
name = conf_path.stem
filename = conf_path.name
conf_names.add(name)
local_path = conf_path.with_suffix(".local")
has_local = local_path.is_file()
try:
content = conf_path.read_text(encoding="utf-8")
except OSError as exc:
log.warning("filter_read_error", name=name, path=str(conf_path), error=str(exc))
continue
if has_local:
try:
local_content = local_path.read_text(encoding="utf-8")
# Append local content after conf so configparser reads local
# values last (higher priority).
content = content + "\n" + local_content
except OSError as exc:
log.warning(
"filter_local_read_error",
name=name,
path=str(local_path),
error=str(exc),
)
results.append((name, filename, content, has_local, str(conf_path)))
# ---- .local-only filters (user-created, no corresponding .conf) ----------
for local_path in sorted(filter_d.glob("*.local")):
if not local_path.is_file():
continue
name = local_path.stem
if name in conf_names:
# Already covered above as a .conf filter with a .local override.
continue
try:
content = local_path.read_text(encoding="utf-8")
except OSError as exc:
log.warning(
"filter_local_read_error",
name=name,
path=str(local_path),
error=str(exc),
)
continue
results.append((name, local_path.name, content, False, str(local_path)))
results.sort(key=lambda t: t[0])
log.debug("filters_scanned", count=len(results), filter_d=str(filter_d))
return results
# ---------------------------------------------------------------------------
# Public API — filter discovery (Task 2.1)
# ---------------------------------------------------------------------------
async def list_filters(
config_dir: str,
socket_path: str,
) -> FilterListResponse:
"""Delegate to the canonical filter config service."""
from app.services.filter_config_service import list_filters as _delegate
return await _delegate(config_dir, socket_path)
async def get_filter(
config_dir: str,
socket_path: str,
name: str,
) -> FilterConfig:
"""Delegate to the canonical filter config service."""
from app.services.filter_config_service import get_filter as _delegate
return await _delegate(config_dir, socket_path, name)
# ---------------------------------------------------------------------------
# Public API — filter write operations (Task 2.2)
# ---------------------------------------------------------------------------
async def update_filter(
config_dir: str,
socket_path: str,
name: str,
req: FilterUpdateRequest,
do_reload: bool = False,
) -> FilterConfig:
"""Delegate to the canonical filter config service."""
from app.services.filter_config_service import update_filter as _delegate
return await _delegate(config_dir, socket_path, name, req, do_reload=do_reload)
async def create_filter(
config_dir: str,
socket_path: str,
req: FilterCreateRequest,
do_reload: bool = False,
) -> FilterConfig:
"""Delegate to the canonical filter config service."""
from app.services.filter_config_service import create_filter as _delegate
return await _delegate(config_dir, socket_path, req, do_reload=do_reload)
async def delete_filter(
config_dir: str,
name: str,
) -> None:
"""Delegate to the canonical filter config service."""
from app.services.filter_config_service import delete_filter as _delegate
return await _delegate(config_dir, name)
async def assign_filter_to_jail(
config_dir: str,
socket_path: str,
jail_name: str,
req: AssignFilterRequest,
do_reload: bool = False,
) -> None:
"""Delegate to the canonical filter config service."""
from app.services.filter_config_service import assign_filter_to_jail as _delegate
return await _delegate(config_dir, socket_path, jail_name, req, do_reload=do_reload)
# ---------------------------------------------------------------------------
# Action discovery helpers (Task 3.1)
# ---------------------------------------------------------------------------
# Allowlist pattern for action names used in path construction.
_SAFE_ACTION_NAME_RE: re.Pattern[str] = re.compile(r"^[A-Za-z0-9][A-Za-z0-9._-]{0,127}$")
def _safe_action_name(name: str) -> str:
"""Validate *name* and return it unchanged or raise :class:`ActionNameError`.
Args:
name: Proposed action name (without extension).
Returns:
The name unchanged if valid.
Raises:
ActionNameError: If *name* contains unsafe characters.
"""
if not _SAFE_ACTION_NAME_RE.match(name):
raise ActionNameError(
f"Action name {name!r} contains invalid characters. "
"Only alphanumeric characters, hyphens, underscores, and dots are "
"allowed; must start with an alphanumeric character."
)
return name
def _build_action_to_jails_map(
all_jails: dict[str, dict[str, str]],
active_names: set[str],
) -> dict[str, list[str]]:
"""Return a mapping of action base name → list of active jail names.
Iterates over every jail whose name is in *active_names*, resolves each
entry in its ``action`` config key to an action base name (stripping
``[…]`` parameter blocks), and records the jail against each base name.
Args:
all_jails: Merged jail config dict — ``{jail_name: {key: value}}``.
active_names: Set of jail names currently running in fail2ban.
Returns:
``{action_base_name: [jail_name, …]}``.
"""
mapping: dict[str, list[str]] = {}
for jail_name, settings in all_jails.items():
if jail_name not in active_names:
continue
raw_action = settings.get("action", "")
if not raw_action:
continue
for line in raw_action.splitlines():
stripped = line.strip()
if not stripped or stripped.startswith("#"):
continue
# Strip optional [key=value] parameter block to get the base name.
bracket = stripped.find("[")
base = stripped[:bracket].strip() if bracket != -1 else stripped
if base:
mapping.setdefault(base, []).append(jail_name)
return mapping
def _parse_actions_sync(
action_d: Path,
) -> list[tuple[str, str, str, bool, str]]:
"""Synchronously scan ``action.d/`` and return per-action tuples.
Each tuple contains:
- ``name`` — action base name (``"iptables"``).
- ``filename`` — actual filename (``"iptables.conf"``).
- ``content`` — merged file content (``conf`` overridden by ``local``).
- ``has_local`` — whether a ``.local`` override exists alongside a ``.conf``.
- ``source_path`` — absolute path to the primary (``conf``) source file, or
to the ``.local`` file for user-created (local-only) actions.
Also discovers ``.local``-only files (user-created actions with no
corresponding ``.conf``).
Args:
action_d: Path to the ``action.d`` directory.
Returns:
List of ``(name, filename, content, has_local, source_path)`` tuples,
sorted by name.
"""
if not action_d.is_dir():
log.warning("action_d_not_found", path=str(action_d))
return []
conf_names: set[str] = set()
results: list[tuple[str, str, str, bool, str]] = []
# ---- .conf-based actions (with optional .local override) ----------------
for conf_path in sorted(action_d.glob("*.conf")):
if not conf_path.is_file():
continue
name = conf_path.stem
filename = conf_path.name
conf_names.add(name)
local_path = conf_path.with_suffix(".local")
has_local = local_path.is_file()
try:
content = conf_path.read_text(encoding="utf-8")
except OSError as exc:
log.warning("action_read_error", name=name, path=str(conf_path), error=str(exc))
continue
if has_local:
try:
local_content = local_path.read_text(encoding="utf-8")
content = content + "\n" + local_content
except OSError as exc:
log.warning(
"action_local_read_error",
name=name,
path=str(local_path),
error=str(exc),
)
results.append((name, filename, content, has_local, str(conf_path)))
# ---- .local-only actions (user-created, no corresponding .conf) ----------
for local_path in sorted(action_d.glob("*.local")):
if not local_path.is_file():
continue
name = local_path.stem
if name in conf_names:
continue
try:
content = local_path.read_text(encoding="utf-8")
except OSError as exc:
log.warning(
"action_local_read_error",
name=name,
path=str(local_path),
error=str(exc),
)
continue
results.append((name, local_path.name, content, False, str(local_path)))
results.sort(key=lambda t: t[0])
log.debug("actions_scanned", count=len(results), action_d=str(action_d))
return results
def _append_jail_action_sync(
config_dir: Path,
jail_name: str,
action_entry: str,
) -> None:
"""Append an action entry to the ``action`` key in ``jail.d/{jail_name}.local``.
If the ``.local`` file already contains an ``action`` key under the jail
section, the new entry is appended as an additional line (multi-line
configparser format) unless it is already present. If no ``action`` key
exists, one is created.
Args:
config_dir: The fail2ban configuration root directory.
jail_name: Validated jail name.
action_entry: Full action string including any ``[…]`` parameters.
Raises:
ConfigWriteError: If writing fails.
"""
jail_d = config_dir / "jail.d"
try:
jail_d.mkdir(parents=True, exist_ok=True)
except OSError as exc:
raise ConfigWriteError(f"Cannot create jail.d directory: {exc}") from exc
local_path = jail_d / f"{jail_name}.local"
parser = _build_parser()
if local_path.is_file():
try:
parser.read(str(local_path), encoding="utf-8")
except (configparser.Error, OSError) as exc:
log.warning(
"jail_local_read_for_update_error",
jail=jail_name,
error=str(exc),
)
if not parser.has_section(jail_name):
parser.add_section(jail_name)
existing_raw = parser.get(jail_name, "action") if parser.has_option(jail_name, "action") else ""
existing_lines = [
line.strip() for line in existing_raw.splitlines() if line.strip() and not line.strip().startswith("#")
]
# Extract base names from existing entries for duplicate checking.
def _base(entry: str) -> str:
bracket = entry.find("[")
return entry[:bracket].strip() if bracket != -1 else entry.strip()
new_base = _base(action_entry)
if not any(_base(e) == new_base for e in existing_lines):
existing_lines.append(action_entry)
if existing_lines:
# configparser multi-line: continuation lines start with whitespace.
new_value = existing_lines[0] + "".join(f"\n {line}" for line in existing_lines[1:])
parser.set(jail_name, "action", new_value)
else:
parser.set(jail_name, "action", action_entry)
buf = io.StringIO()
buf.write("# Managed by BanGUI — do not edit manually\n\n")
parser.write(buf)
content = buf.getvalue()
try:
with tempfile.NamedTemporaryFile(
mode="w",
encoding="utf-8",
dir=jail_d,
delete=False,
suffix=".tmp",
) as tmp:
tmp.write(content)
tmp_name = tmp.name
os.replace(tmp_name, local_path)
except OSError as exc:
with contextlib.suppress(OSError):
os.unlink(tmp_name) # noqa: F821
raise ConfigWriteError(f"Failed to write {local_path}: {exc}") from exc
log.info(
"jail_action_appended",
jail=jail_name,
action=action_entry,
path=str(local_path),
)
def _remove_jail_action_sync(
config_dir: Path,
jail_name: str,
action_name: str,
) -> None:
"""Remove an action entry from the ``action`` key in ``jail.d/{jail_name}.local``.
Reads the ``.local`` file, removes any ``action`` entries whose base name
matches *action_name*, and writes the result back atomically. If no
``.local`` file exists, this is a no-op.
Args:
config_dir: The fail2ban configuration root directory.
jail_name: Validated jail name.
action_name: Base name of the action to remove (without ``[…]``).
Raises:
ConfigWriteError: If writing fails.
"""
jail_d = config_dir / "jail.d"
local_path = jail_d / f"{jail_name}.local"
if not local_path.is_file():
return
parser = _build_parser()
try:
parser.read(str(local_path), encoding="utf-8")
except (configparser.Error, OSError) as exc:
log.warning(
"jail_local_read_for_update_error",
jail=jail_name,
error=str(exc),
)
return
if not parser.has_section(jail_name) or not parser.has_option(jail_name, "action"):
return
existing_raw = parser.get(jail_name, "action")
existing_lines = [
line.strip() for line in existing_raw.splitlines() if line.strip() and not line.strip().startswith("#")
]
def _base(entry: str) -> str:
bracket = entry.find("[")
return entry[:bracket].strip() if bracket != -1 else entry.strip()
filtered = [e for e in existing_lines if _base(e) != action_name]
if len(filtered) == len(existing_lines):
# Action was not found — silently return (idempotent).
return
if filtered:
new_value = filtered[0] + "".join(f"\n {line}" for line in filtered[1:])
parser.set(jail_name, "action", new_value)
else:
parser.remove_option(jail_name, "action")
buf = io.StringIO()
buf.write("# Managed by BanGUI — do not edit manually\n\n")
parser.write(buf)
content = buf.getvalue()
try:
with tempfile.NamedTemporaryFile(
mode="w",
encoding="utf-8",
dir=jail_d,
delete=False,
suffix=".tmp",
) as tmp:
tmp.write(content)
tmp_name = tmp.name
os.replace(tmp_name, local_path)
except OSError as exc:
with contextlib.suppress(OSError):
os.unlink(tmp_name) # noqa: F821
raise ConfigWriteError(f"Failed to write {local_path}: {exc}") from exc
log.info(
"jail_action_removed",
jail=jail_name,
action=action_name,
path=str(local_path),
)
def _write_action_local_sync(action_d: Path, name: str, content: str) -> None:
"""Write *content* to ``action.d/{name}.local`` atomically.
The write is atomic: content is written to a temp file first, then
renamed into place. The ``action.d/`` directory is created if absent.
Args:
action_d: Path to the ``action.d`` directory.
name: Validated action base name (used as filename stem).
content: Full serialized action content to write.
Raises:
ConfigWriteError: If writing fails.
"""
try:
action_d.mkdir(parents=True, exist_ok=True)
except OSError as exc:
raise ConfigWriteError(f"Cannot create action.d directory: {exc}") from exc
local_path = action_d / f"{name}.local"
try:
with tempfile.NamedTemporaryFile(
mode="w",
encoding="utf-8",
dir=action_d,
delete=False,
suffix=".tmp",
) as tmp:
tmp.write(content)
tmp_name = tmp.name
os.replace(tmp_name, local_path)
except OSError as exc:
with contextlib.suppress(OSError):
os.unlink(tmp_name) # noqa: F821
raise ConfigWriteError(f"Failed to write {local_path}: {exc}") from exc
log.info("action_local_written", action=name, path=str(local_path))
# ---------------------------------------------------------------------------
# Public API — action discovery (Task 3.1)
# ---------------------------------------------------------------------------
async def list_actions(
config_dir: str,
socket_path: str,
) -> ActionListResponse:
"""Delegate to the canonical action config service."""
from app.services.action_config_service import list_actions as _delegate
return await _delegate(config_dir, socket_path)
async def get_action(
config_dir: str,
socket_path: str,
name: str,
) -> ActionConfig:
"""Delegate to the canonical action config service."""
from app.services.action_config_service import get_action as _delegate
return await _delegate(config_dir, socket_path, name)
# ---------------------------------------------------------------------------
# Public API — action write operations (Task 3.2)
# ---------------------------------------------------------------------------
async def update_action(
config_dir: str,
socket_path: str,
name: str,
req: ActionUpdateRequest,
do_reload: bool = False,
) -> ActionConfig:
"""Delegate to the canonical action config service."""
from app.services.action_config_service import update_action as _delegate
return await _delegate(config_dir, socket_path, name, req, do_reload=do_reload)
async def create_action(
config_dir: str,
socket_path: str,
req: ActionCreateRequest,
do_reload: bool = False,
) -> ActionConfig:
"""Delegate to the canonical action config service."""
from app.services.action_config_service import create_action as _delegate
return await _delegate(config_dir, socket_path, req, do_reload=do_reload)
async def delete_action(
config_dir: str,
name: str,
) -> None:
"""Delegate to the canonical action config service."""
from app.services.action_config_service import delete_action as _delegate
return await _delegate(config_dir, name)
async def assign_action_to_jail(
config_dir: str,
socket_path: str,
jail_name: str,
req: AssignActionRequest,
do_reload: bool = False,
) -> None:
"""Add an action to a jail by updating the jail's ``.local`` file.
Appends ``{req.action_name}[{params}]`` (or just ``{req.action_name}`` when
no params are given) to the ``action`` key in the ``[{jail_name}]`` section
of ``jail.d/{jail_name}.local``. If the action is already listed it is not
duplicated. If the ``.local`` file does not exist it is created.
Args:
config_dir: Absolute path to the fail2ban configuration directory.
socket_path: Path to the fail2ban Unix domain socket.
jail_name: Name of the jail to update.
req: Request containing the action name and optional parameters.
do_reload: When ``True``, trigger a full fail2ban reload after writing.
Raises:
JailNameError: If *jail_name* contains invalid characters.
ActionNameError: If ``req.action_name`` contains invalid characters.
JailNotFoundInConfigError: If *jail_name* is not defined in any config
file.
ActionNotFoundError: If ``req.action_name`` does not exist in
``action.d/``.
ConfigWriteError: If writing fails.
"""
_safe_jail_name(jail_name)
_safe_action_name(req.action_name)
all_jails, _src = await run_blocking(_parse_jails_sync, Path(config_dir))
if jail_name not in all_jails:
raise JailNotFoundInConfigError(jail_name)
action_d = Path(config_dir) / "action.d"
def _check_action() -> None:
if (
not (action_d / f"{req.action_name}.conf").is_file()
and not (action_d / f"{req.action_name}.local").is_file()
):
raise ActionNotFoundError(req.action_name)
await run_blocking(_check_action)
# Build the action string with optional parameters.
if req.params:
param_str = ", ".join(f"{k}={v}" for k, v in sorted(req.params.items()))
action_entry = f"{req.action_name}[{param_str}]"
else:
action_entry = req.action_name
await run_blocking(
_append_jail_action_sync,
Path(config_dir),
jail_name,
action_entry,
)
if do_reload:
try:
await _reload_all(socket_path)
except Exception as exc: # noqa: BLE001
log.warning(
"reload_after_assign_action_failed",
jail=jail_name,
action=req.action_name,
error=str(exc),
)
log.info(
"action_assigned_to_jail",
jail=jail_name,
action=req.action_name,
reload=do_reload,
)
async def remove_action_from_jail(
config_dir: str,
socket_path: str,
jail_name: str,
action_name: str,
do_reload: bool = False,
) -> None:
"""Remove an action from a jail's ``.local`` config.
Reads ``jail.d/{jail_name}.local``, removes the line(s) that reference
``{action_name}`` from the ``action`` key (including any ``[…]`` parameter
blocks), and writes the file back atomically.
Args:
config_dir: Absolute path to the fail2ban configuration directory.
socket_path: Path to the fail2ban Unix domain socket.
jail_name: Name of the jail to update.
action_name: Base name of the action to remove.
do_reload: When ``True``, trigger a full fail2ban reload after writing.
Raises:
JailNameError: If *jail_name* contains invalid characters.
ActionNameError: If *action_name* contains invalid characters.
JailNotFoundInConfigError: If *jail_name* is not defined in any config.
ConfigWriteError: If writing fails.
"""
_safe_jail_name(jail_name)
_safe_action_name(action_name)
all_jails, _src = await run_blocking(_parse_jails_sync, Path(config_dir))
if jail_name not in all_jails:
raise JailNotFoundInConfigError(jail_name)
await run_blocking(
_remove_jail_action_sync,
Path(config_dir),
jail_name,
action_name,
)
if do_reload:
try:
await _reload_all(socket_path)
except Exception as exc: # noqa: BLE001
log.warning(
"reload_after_remove_action_failed",
jail=jail_name,
action=action_name,
error=str(exc),
)
log.info(
"action_removed_from_jail",
jail=jail_name,
action=action_name,
reload=do_reload,
)