Files
BanGUI/backend/app/models/config.py
Lukas 94bdabe622 TASK-016: Validate delete_log_path query parameter with allowlist
- Extract path validation logic into shared helper function in
  backend/app/utils/path_utils.py (validate_log_path)
- Refactor AddLogPathRequest to use the helper function
- Apply the same validation to DELETE /api/config/jails/{name}/logpath
  endpoint by validating the log_path query parameter
- Return HTTP 422 with descriptive error if validation fails
- Add comprehensive unit tests for path validation
- Update Backend-Development.md with usage examples

This prevents path-traversal attacks on the delete_log_path endpoint
by ensuring all log paths are within allowlisted directories.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
2026-04-26 14:04:21 +02:00

1113 lines
39 KiB
Python

"""Configuration view/edit Pydantic models.
Request, response, and domain models for the config router and service.
"""
import datetime
from pathlib import Path
from typing import Literal
from pydantic import BaseModel, ConfigDict, Field, field_validator
from app.config import get_settings
from app.utils.path_utils import validate_log_path
DNSMode = Literal["yes", "warn", "no", "raw"]
LogEncoding = Literal["auto", "ascii", "utf-8", "UTF-8", "latin-1"]
BackendType = Literal["auto", "polling", "pyinotify", "systemd", "gamin"]
LogLevel = Literal["CRITICAL", "ERROR", "WARNING", "NOTICE", "INFO", "DEBUG"]
LogTarget = Literal["STDOUT", "STDERR", "SYSLOG"]
# ---------------------------------------------------------------------------
# Ban-time escalation
# ---------------------------------------------------------------------------
class BantimeEscalation(BaseModel):
"""Incremental ban-time escalation configuration for a jail."""
model_config = ConfigDict(strict=True)
increment: bool = Field(
default=False,
description="Whether incremental banning is enabled.",
)
factor: float | None = Field(
default=None,
description="Multiplier applied to the base ban time on each repeat offence.",
)
formula: str | None = Field(
default=None,
description="Python expression evaluated to compute the escalated ban time.",
)
multipliers: str | None = Field(
default=None,
description="Space-separated integers used as per-offence multipliers.",
)
max_time: int | None = Field(
default=None,
description="Maximum ban duration in seconds when escalation is active.",
)
rnd_time: int | None = Field(
default=None,
description="Random jitter (seconds) added to each escalated ban time.",
)
overall_jails: bool = Field(
default=False,
description="Count repeat offences across all jails, not just the current one.",
)
class BantimeEscalationUpdate(BaseModel):
"""Partial update payload for ban-time escalation settings."""
model_config = ConfigDict(strict=True)
increment: bool | None = Field(default=None)
factor: float | None = Field(default=None)
formula: str | None = Field(default=None)
multipliers: str | None = Field(default=None)
max_time: int | None = Field(default=None)
rnd_time: int | None = Field(default=None)
overall_jails: bool | None = Field(default=None)
# ---------------------------------------------------------------------------
# Jail configuration models
# ---------------------------------------------------------------------------
class JailConfig(BaseModel):
"""Configuration snapshot of a single jail (editable fields)."""
model_config = ConfigDict(strict=True)
name: str = Field(..., description="Jail name as configured in fail2ban.")
ban_time: int = Field(..., description="Ban duration in seconds. -1 for permanent.")
max_retry: int = Field(..., ge=1, description="Number of failures before a ban is issued.")
find_time: int = Field(..., ge=1, description="Time window (seconds) for counting failures.")
fail_regex: list[str] = Field(default_factory=list, description="Failure detection regex patterns.")
ignore_regex: list[str] = Field(default_factory=list, description="Regex patterns that bypass the ban logic.")
log_paths: list[str] = Field(default_factory=list, description="Monitored log files.")
date_pattern: str | None = Field(default=None, description="Custom date pattern for log parsing.")
log_encoding: LogEncoding = Field(default="UTF-8", description="Log file encoding.")
backend: BackendType = Field(default="polling", description="Log monitoring backend.")
use_dns: DNSMode = Field(default="warn", description="DNS lookup mode: yes | warn | no | raw.")
prefregex: str = Field(default="", description="Prefix regex prepended to every failregex; empty means disabled.")
actions: list[str] = Field(default_factory=list, description="Names of actions attached to this jail.")
bantime_escalation: BantimeEscalation | None = Field(
default=None,
description="Incremental ban-time escalation settings, or None if not configured.",
)
class JailConfigResponse(BaseModel):
"""Response for ``GET /api/config/jails/{name}``."""
model_config = ConfigDict(strict=True)
jail: JailConfig
class JailConfigListResponse(BaseModel):
"""Response for ``GET /api/config/jails``."""
model_config = ConfigDict(strict=True)
jails: list[JailConfig] = Field(default_factory=list)
total: int = Field(..., ge=0)
class JailConfigUpdate(BaseModel):
"""Payload for ``PUT /api/config/jails/{name}``."""
model_config = ConfigDict(strict=True)
ban_time: int | None = Field(default=None, description="Ban duration in seconds. -1 for permanent.")
max_retry: int | None = Field(default=None, ge=1)
find_time: int | None = Field(default=None, ge=1)
fail_regex: list[str] | None = Field(default=None, description="Failure detection regex patterns.")
ignore_regex: list[str] | None = Field(default=None)
prefregex: str | None = Field(default=None, description="Prefix regex; None = skip, '' = clear, non-empty = set.")
date_pattern: str | None = Field(default=None)
dns_mode: DNSMode | None = Field(default=None, description="DNS lookup mode: yes | warn | no | raw.")
backend: BackendType | None = Field(default=None, description="Log monitoring backend.")
log_encoding: LogEncoding | None = Field(default=None, description="Log file encoding.")
enabled: bool | None = Field(default=None)
bantime_escalation: BantimeEscalationUpdate | None = Field(
default=None,
description="Incremental ban-time escalation settings to update.",
)
# ---------------------------------------------------------------------------
# Regex tester models
# ---------------------------------------------------------------------------
class RegexTestRequest(BaseModel):
"""Payload for ``POST /api/config/regex-test``."""
model_config = ConfigDict(strict=True)
log_line: str = Field(..., description="Sample log line to test against.")
fail_regex: str = Field(..., description="Regex pattern to match.")
class RegexTestResponse(BaseModel):
"""Result of a regex test."""
model_config = ConfigDict(strict=True)
matched: bool = Field(..., description="Whether the pattern matched the log line.")
groups: list[str] = Field(
default_factory=list,
description="Named groups captured by a successful match.",
)
error: str | None = Field(
default=None,
description="Compilation error message if the regex is invalid.",
)
# ---------------------------------------------------------------------------
# Global config models
# ---------------------------------------------------------------------------
class GlobalConfigResponse(BaseModel):
"""Response for ``GET /api/config/global``."""
model_config = ConfigDict(strict=True)
log_level: LogLevel
log_target: str = Field(..., description="Log target: STDOUT, STDERR, SYSLOG, or a validated file path.")
db_purge_age: int = Field(..., description="Seconds after which ban records are purged from the fail2ban DB.")
db_max_matches: int = Field(..., description="Maximum stored log-line matches per ban record.")
@field_validator("log_target", mode="after")
@classmethod
def validate_log_target(cls, value: str) -> str:
"""Validate that log_target is either a special value or a valid file path.
Args:
value: The log target to validate.
Returns:
The validated log target.
Raises:
ValueError: If the target is not a special value and not in allowed directories.
"""
if value.upper() in ("STDOUT", "STDERR", "SYSLOG"):
return value
settings = get_settings()
try:
resolved_path = Path(value).resolve()
except (OSError, RuntimeError) as e:
raise ValueError(f"Cannot resolve path {value!r}: {e}") from e
for allowed_dir in settings.allowed_log_dirs:
allowed_path = Path(allowed_dir).resolve()
try:
resolved_path.relative_to(allowed_path)
return value
except ValueError:
continue
allowed_dirs_str = ", ".join(settings.allowed_log_dirs)
raise ValueError(
f"Log target {value!r} is outside allowed directories: {allowed_dirs_str}"
)
class GlobalConfigUpdate(BaseModel):
"""Payload for ``PUT /api/config/global``."""
model_config = ConfigDict(strict=True)
log_level: LogLevel | None = Field(
default=None,
description="Log level: CRITICAL, ERROR, WARNING, NOTICE, INFO, or DEBUG.",
)
log_target: str | None = Field(
default=None,
description="Log target: STDOUT, STDERR, SYSLOG, or a validated file path.",
)
db_purge_age: int | None = Field(default=None, ge=0)
db_max_matches: int | None = Field(default=None, ge=0)
@field_validator("log_target", mode="after")
@classmethod
def validate_log_target(cls, value: str | None) -> str | None:
"""Validate that log_target is either a special value or a valid file path.
Args:
value: The log target to validate, or None.
Returns:
The validated log target, or None if input was None.
Raises:
ValueError: If the target is not a special value and not in allowed directories.
"""
if value is None:
return None
if value.upper() in ("STDOUT", "STDERR", "SYSLOG"):
return value
settings = get_settings()
try:
resolved_path = Path(value).resolve()
except (OSError, RuntimeError) as e:
raise ValueError(f"Cannot resolve path {value!r}: {e}") from e
for allowed_dir in settings.allowed_log_dirs:
allowed_path = Path(allowed_dir).resolve()
try:
resolved_path.relative_to(allowed_path)
return value
except ValueError:
continue
allowed_dirs_str = ", ".join(settings.allowed_log_dirs)
raise ValueError(
f"Log target {value!r} is outside allowed directories: {allowed_dirs_str}"
)
# ---------------------------------------------------------------------------
# Log observation / preview models
# ---------------------------------------------------------------------------
class AddLogPathRequest(BaseModel):
"""Payload for ``POST /api/config/jails/{name}/logpath``."""
model_config = ConfigDict(strict=True)
log_path: str = Field(..., description="Absolute path to the log file to monitor.")
tail: bool = Field(
default=True,
description="If true, monitor from current end of file (tail). If false, read from the beginning.",
)
@field_validator("log_path", mode="after")
@classmethod
def validate_log_path_field(cls, value: str) -> str:
"""Validate that the log path is within allowed directories.
Args:
value: The log path to validate.
Returns:
The validated log path.
Raises:
ValueError: If the path is outside allowed log directories.
"""
return validate_log_path(value)
class LogPreviewRequest(BaseModel):
"""Payload for ``POST /api/config/preview-log``."""
model_config = ConfigDict(strict=True)
log_path: str = Field(..., description="Absolute path to the log file to preview.")
fail_regex: str = Field(..., description="Regex pattern to test against log lines.")
num_lines: int = Field(default=200, ge=1, le=5000, description="Number of lines to read from the end of the file.")
class LogPreviewLine(BaseModel):
"""A single log line with match information."""
model_config = ConfigDict(strict=True)
line: str
matched: bool
groups: list[str] = Field(default_factory=list)
class LogPreviewResponse(BaseModel):
"""Response for ``POST /api/config/preview-log``."""
model_config = ConfigDict(strict=True)
lines: list[LogPreviewLine] = Field(default_factory=list)
total_lines: int = Field(..., ge=0)
matched_count: int = Field(..., ge=0)
regex_error: str | None = Field(default=None, description="Set if the regex failed to compile.")
# ---------------------------------------------------------------------------
# Map color threshold models
# ---------------------------------------------------------------------------
class MapColorThresholdsResponse(BaseModel):
"""Response for ``GET /api/config/map-thresholds``."""
model_config = ConfigDict(strict=True)
threshold_high: int = Field(
..., description="Ban count for red coloring."
)
threshold_medium: int = Field(
..., description="Ban count for yellow coloring."
)
threshold_low: int = Field(
..., description="Ban count for green coloring."
)
class MapColorThresholdsUpdate(BaseModel):
"""Payload for ``PUT /api/config/map-thresholds``."""
model_config = ConfigDict(strict=True)
threshold_high: int = Field(..., gt=0, description="Ban count for red.")
threshold_medium: int = Field(
..., gt=0, description="Ban count for yellow."
)
threshold_low: int = Field(..., gt=0, description="Ban count for green.")
# ---------------------------------------------------------------------------
# Parsed filter file models
# ---------------------------------------------------------------------------
class FilterConfig(BaseModel):
"""Structured representation of a ``filter.d/*.conf`` file.
The ``active``, ``used_by_jails``, ``source_file``, and
``has_local_override`` fields are populated by
:func:`~app.services.filter_config_service.list_filters` and
:func:`~app.services.filter_config_service.get_filter`. When the model is
returned from the raw file-based endpoints (``/filters/{name}/parsed``),
these fields carry their default values.
"""
model_config = ConfigDict(strict=True)
name: str = Field(..., description="Filter base name, e.g. ``sshd``.")
filename: str = Field(..., description="Actual filename, e.g. ``sshd.conf``.")
# [INCLUDES]
before: str | None = Field(default=None, description="Included file read before this one.")
after: str | None = Field(default=None, description="Included file read after this one.")
# [DEFAULT] — free-form key=value pairs
variables: dict[str, str] = Field(
default_factory=dict,
description="Free-form ``[DEFAULT]`` section variables.",
)
# [Definition]
prefregex: str | None = Field(
default=None,
description="Prefix regex prepended to every failregex.",
)
failregex: list[str] = Field(
default_factory=list,
description="Failure detection regex patterns (one per list entry).",
)
ignoreregex: list[str] = Field(
default_factory=list,
description="Regex patterns that bypass ban logic.",
)
maxlines: int | None = Field(
default=None,
description="Maximum number of log lines accumulated for a single match attempt.",
)
datepattern: str | None = Field(
default=None,
description="Custom date-parsing pattern, or ``None`` for auto-detect.",
)
journalmatch: str | None = Field(
default=None,
description="Systemd journal match expression.",
)
# Active-status fields — populated by filter_config_service.list_filters /
# get_filter; default to safe "inactive" values when not computed.
active: bool = Field(
default=False,
description=(
"``True`` when this filter is referenced by at least one currently "
"enabled (running) jail."
),
)
used_by_jails: list[str] = Field(
default_factory=list,
description=(
"Names of currently enabled jails that reference this filter. "
"Empty when ``active`` is ``False``."
),
)
source_file: str = Field(
default="",
description="Absolute path to the ``.conf`` source file for this filter.",
)
has_local_override: bool = Field(
default=False,
description=(
"``True`` when a ``.local`` override file exists alongside the "
"base ``.conf`` file."
),
)
class FilterConfigUpdate(BaseModel):
"""Partial update payload for a parsed filter file.
Only explicitly set (non-``None``) fields are written back.
"""
model_config = ConfigDict(strict=True)
before: str | None = Field(default=None)
after: str | None = Field(default=None)
variables: dict[str, str] | None = Field(default=None)
prefregex: str | None = Field(default=None)
failregex: list[str] | None = Field(default=None)
ignoreregex: list[str] | None = Field(default=None)
maxlines: int | None = Field(default=None)
datepattern: str | None = Field(default=None)
journalmatch: str | None = Field(default=None)
class FilterUpdateRequest(BaseModel):
"""Payload for ``PUT /api/config/filters/{name}``.
Accepts only the user-editable ``[Definition]`` fields. Fields left as
``None`` are not changed; the existing value from the merged conf/local is
preserved.
"""
model_config = ConfigDict(strict=True)
failregex: list[str] | None = Field(
default=None,
description="Updated failure-detection regex patterns. ``None`` = keep existing.",
)
ignoreregex: list[str] | None = Field(
default=None,
description="Updated bypass-ban regex patterns. ``None`` = keep existing.",
)
datepattern: str | None = Field(
default=None,
description="Custom date-parsing pattern. ``None`` = keep existing.",
)
journalmatch: str | None = Field(
default=None,
description="Systemd journal match expression. ``None`` = keep existing.",
)
class FilterCreateRequest(BaseModel):
"""Payload for ``POST /api/config/filters``.
Creates a new user-defined filter at ``filter.d/{name}.local``.
"""
model_config = ConfigDict(strict=True)
name: str = Field(
...,
description="Filter base name (e.g. ``my-custom-filter``). Must not already exist in ``filter.d/``.",
)
failregex: list[str] = Field(
default_factory=list,
description="Failure-detection regex patterns.",
)
ignoreregex: list[str] = Field(
default_factory=list,
description="Regex patterns that bypass ban logic.",
)
prefregex: str | None = Field(
default=None,
description="Prefix regex prepended to every failregex.",
)
datepattern: str | None = Field(
default=None,
description="Custom date-parsing pattern.",
)
journalmatch: str | None = Field(
default=None,
description="Systemd journal match expression.",
)
class AssignFilterRequest(BaseModel):
"""Payload for ``POST /api/config/jails/{jail_name}/filter``."""
model_config = ConfigDict(strict=True)
filter_name: str = Field(
...,
description="Filter base name to assign to the jail (e.g. ``sshd``).",
)
class FilterListResponse(BaseModel):
"""Response for ``GET /api/config/filters``."""
model_config = ConfigDict(strict=True)
filters: list[FilterConfig] = Field(
default_factory=list,
description=(
"All discovered filters, each annotated with active/inactive status "
"and the jails that reference them."
),
)
total: int = Field(..., ge=0, description="Total number of filters found.")
# ---------------------------------------------------------------------------
# Parsed action file models
# ---------------------------------------------------------------------------
class ActionConfig(BaseModel):
"""Structured representation of an ``action.d/*.conf`` file."""
model_config = ConfigDict(strict=True)
name: str = Field(..., description="Action base name, e.g. ``iptables``.")
filename: str = Field(..., description="Actual filename, e.g. ``iptables.conf``.")
# [INCLUDES]
before: str | None = Field(default=None)
after: str | None = Field(default=None)
# [Definition] — well-known lifecycle commands
actionstart: str | None = Field(
default=None,
description="Executed at jail start or first ban.",
)
actionstop: str | None = Field(
default=None,
description="Executed at jail stop.",
)
actioncheck: str | None = Field(
default=None,
description="Executed before each ban.",
)
actionban: str | None = Field(
default=None,
description="Executed to ban an IP. Tags: ``<ip>``, ``<name>``, ``<port>``.",
)
actionunban: str | None = Field(
default=None,
description="Executed to unban an IP.",
)
actionflush: str | None = Field(
default=None,
description="Executed to flush all bans on shutdown.",
)
# [Definition] — extra variables not covered by the well-known keys
definition_vars: dict[str, str] = Field(
default_factory=dict,
description="Additional ``[Definition]`` variables.",
)
# [Init] — runtime-configurable parameters
init_vars: dict[str, str] = Field(
default_factory=dict,
description="Runtime parameters that can be overridden per jail.",
)
# Active-status fields — populated by action_config_service.list_actions /
# get_action; default to safe "inactive" values when not computed.
active: bool = Field(
default=False,
description=(
"``True`` when this action is referenced by at least one currently "
"enabled (running) jail."
),
)
used_by_jails: list[str] = Field(
default_factory=list,
description=(
"Names of currently enabled jails that reference this action. "
"Empty when ``active`` is ``False``."
),
)
source_file: str = Field(
default="",
description="Absolute path to the ``.conf`` source file for this action.",
)
has_local_override: bool = Field(
default=False,
description=(
"``True`` when a ``.local`` override file exists alongside the "
"base ``.conf`` file."
),
)
class ActionConfigUpdate(BaseModel):
"""Partial update payload for a parsed action file."""
model_config = ConfigDict(strict=True)
before: str | None = Field(default=None)
after: str | None = Field(default=None)
actionstart: str | None = Field(default=None)
actionstop: str | None = Field(default=None)
actioncheck: str | None = Field(default=None)
actionban: str | None = Field(default=None)
actionunban: str | None = Field(default=None)
actionflush: str | None = Field(default=None)
definition_vars: dict[str, str] | None = Field(default=None)
init_vars: dict[str, str] | None = Field(default=None)
class ActionListResponse(BaseModel):
"""Response for ``GET /api/config/actions``."""
model_config = ConfigDict(strict=True)
actions: list[ActionConfig] = Field(
default_factory=list,
description=(
"All discovered actions, each annotated with active/inactive status "
"and the jails that reference them."
),
)
total: int = Field(..., ge=0, description="Total number of actions found.")
class ActionUpdateRequest(BaseModel):
"""Payload for ``PUT /api/config/actions/{name}``.
Accepts only the user-editable ``[Definition]`` lifecycle fields and
``[Init]`` parameters. Fields left as ``None`` are not changed.
"""
model_config = ConfigDict(strict=True)
actionstart: str | None = Field(
default=None,
description="Updated ``actionstart`` command. ``None`` = keep existing.",
)
actionstop: str | None = Field(
default=None,
description="Updated ``actionstop`` command. ``None`` = keep existing.",
)
actioncheck: str | None = Field(
default=None,
description="Updated ``actioncheck`` command. ``None`` = keep existing.",
)
actionban: str | None = Field(
default=None,
description="Updated ``actionban`` command. ``None`` = keep existing.",
)
actionunban: str | None = Field(
default=None,
description="Updated ``actionunban`` command. ``None`` = keep existing.",
)
actionflush: str | None = Field(
default=None,
description="Updated ``actionflush`` command. ``None`` = keep existing.",
)
definition_vars: dict[str, str] | None = Field(
default=None,
description="Additional ``[Definition]`` variables to set. ``None`` = keep existing.",
)
init_vars: dict[str, str] | None = Field(
default=None,
description="``[Init]`` parameters to set. ``None`` = keep existing.",
)
class ActionCreateRequest(BaseModel):
"""Payload for ``POST /api/config/actions``.
Creates a new user-defined action at ``action.d/{name}.local``.
"""
model_config = ConfigDict(strict=True)
name: str = Field(
...,
description="Action base name (e.g. ``my-custom-action``). Must not already exist.",
)
actionstart: str | None = Field(default=None, description="Command to execute at jail start.")
actionstop: str | None = Field(default=None, description="Command to execute at jail stop.")
actioncheck: str | None = Field(default=None, description="Command to execute before each ban.")
actionban: str | None = Field(default=None, description="Command to execute to ban an IP.")
actionunban: str | None = Field(default=None, description="Command to execute to unban an IP.")
actionflush: str | None = Field(default=None, description="Command to flush all bans on shutdown.")
definition_vars: dict[str, str] = Field(
default_factory=dict,
description="Additional ``[Definition]`` variables.",
)
init_vars: dict[str, str] = Field(
default_factory=dict,
description="``[Init]`` runtime parameters.",
)
class AssignActionRequest(BaseModel):
"""Payload for ``POST /api/config/jails/{jail_name}/action``."""
model_config = ConfigDict(strict=True)
action_name: str = Field(
...,
description="Action base name to add to the jail (e.g. ``iptables-multiport``).",
)
params: dict[str, str] = Field(
default_factory=dict,
description=(
"Optional per-jail action parameters written as "
"``action_name[key=value, ...]`` in the jail config."
),
)
# ---------------------------------------------------------------------------
# Jail file config models (Task 6.1)
# ---------------------------------------------------------------------------
class JailSectionConfig(BaseModel):
"""Settings within a single [jailname] section of a jail.d file."""
model_config = ConfigDict(strict=True)
enabled: bool | None = Field(default=None, description="Whether this jail is enabled.")
port: str | None = Field(default=None, description="Port(s) to monitor (e.g. 'ssh' or '22,2222').")
filter: str | None = Field(default=None, description="Filter name to use (e.g. 'sshd').")
logpath: list[str] = Field(default_factory=list, description="Log file paths to monitor.")
maxretry: int | None = Field(default=None, ge=1, description="Failures before banning.")
findtime: int | None = Field(default=None, ge=1, description="Time window in seconds for counting failures.")
bantime: int | None = Field(default=None, description="Ban duration in seconds. -1 for permanent.")
action: list[str] = Field(default_factory=list, description="Action references.")
backend: BackendType | None = Field(default=None, description="Log monitoring backend.")
extra: dict[str, str] = Field(default_factory=dict, description="Additional settings not captured by named fields.")
class JailFileConfig(BaseModel):
"""Structured representation of a jail.d/*.conf file."""
model_config = ConfigDict(strict=True)
filename: str = Field(..., description="Filename including extension (e.g. 'sshd.conf').")
jails: dict[str, JailSectionConfig] = Field(
default_factory=dict,
description="Mapping of jail name → settings for each [section] in the file.",
)
class JailFileConfigUpdate(BaseModel):
"""Partial update payload for a jail.d file."""
model_config = ConfigDict(strict=True)
jails: dict[str, JailSectionConfig] | None = Field(
default=None,
description="Jail section updates. Only jails present in this dict are updated.",
)
# ---------------------------------------------------------------------------
# Inactive jail models (Stage 1)
# ---------------------------------------------------------------------------
class InactiveJail(BaseModel):
"""A jail defined in fail2ban config files that is not currently active.
A jail is considered inactive when its ``enabled`` key is ``false`` (or
absent from the config, since fail2ban defaults to disabled) **or** when it
is explicitly enabled in config but fail2ban is not reporting it as
running.
"""
model_config = ConfigDict(strict=True)
name: str = Field(..., description="Jail name from the config section header.")
filter: str = Field(
...,
description=(
"Filter name used by this jail. May include fail2ban mode suffix, "
"e.g. ``sshd[mode=normal]``."
),
)
actions: list[str] = Field(
default_factory=list,
description="Action references listed in the config (raw strings).",
)
port: str | None = Field(
default=None,
description="Port(s) to monitor, e.g. ``ssh`` or ``22,2222``.",
)
logpath: list[str] = Field(
default_factory=list,
description="Log file paths to monitor.",
)
bantime: str | None = Field(
default=None,
description="Ban duration as a raw config string, e.g. ``10m`` or ``-1``.",
)
findtime: str | None = Field(
default=None,
description="Failure-counting window as a raw config string, e.g. ``10m``.",
)
maxretry: int | None = Field(
default=None,
description="Number of failures before a ban is issued.",
)
# ---- Extended fields for full GUI display ----
ban_time_seconds: int = Field(
default=600,
description="Ban duration in seconds, parsed from bantime string.",
)
find_time_seconds: int = Field(
default=600,
description="Failure-counting window in seconds, parsed from findtime string.",
)
log_encoding: LogEncoding = Field(
default="auto",
description="Log encoding, e.g. ``utf-8`` or ``auto``.",
)
backend: BackendType = Field(
default="auto",
description="Log-monitoring backend, e.g. ``auto``, ``pyinotify``, ``polling``.",
)
date_pattern: str | None = Field(
default=None,
description="Date pattern for log parsing, or None for auto-detect.",
)
use_dns: DNSMode = Field(
default="warn",
description="DNS resolution mode: ``yes``, ``warn``, ``no``, or ``raw``.",
)
prefregex: str = Field(
default="",
description="Prefix regex prepended to every failregex.",
)
fail_regex: list[str] = Field(
default_factory=list,
description="List of failure regex patterns.",
)
ignore_regex: list[str] = Field(
default_factory=list,
description="List of ignore regex patterns.",
)
bantime_escalation: BantimeEscalation | None = Field(
default=None,
description="Ban-time escalation configuration, if enabled.",
)
source_file: str = Field(
...,
description="Absolute path to the config file where this jail is defined.",
)
enabled: bool = Field(
...,
description=(
"Effective ``enabled`` value from the merged config. ``False`` for "
"inactive jails that appear in this list."
),
)
has_local_override: bool = Field(
default=False,
description=(
"``True`` when a ``jail.d/{name}.local`` file exists for this jail. "
"Only meaningful for inactive jails; indicates that a cleanup action "
"is available."
),
)
class InactiveJailListResponse(BaseModel):
"""Response for ``GET /api/config/jails/inactive``."""
model_config = ConfigDict(strict=True)
jails: list[InactiveJail] = Field(default_factory=list)
total: int = Field(..., ge=0)
class ActivateJailRequest(BaseModel):
"""Optional override values when activating an inactive jail.
All fields are optional. Omitted fields are not written to the
``.local`` override file so that fail2ban falls back to its default
values.
"""
model_config = ConfigDict(strict=True)
bantime: str | None = Field(
default=None,
description="Override ban duration, e.g. ``1h`` or ``3600``.",
)
findtime: str | None = Field(
default=None,
description="Override failure-counting window, e.g. ``10m``.",
)
maxretry: int | None = Field(
default=None,
ge=1,
description="Override maximum failures before a ban.",
)
port: str | None = Field(
default=None,
description="Override port(s) to monitor.",
)
logpath: list[str] | None = Field(
default=None,
description="Override log file paths.",
)
class JailActivationResponse(BaseModel):
"""Response for jail activation and deactivation endpoints."""
model_config = ConfigDict(strict=True)
name: str = Field(..., description="Name of the affected jail.")
active: bool = Field(
...,
description="New activation state: ``True`` after activate, ``False`` after deactivate.",
)
message: str = Field(..., description="Human-readable result message.")
fail2ban_running: bool = Field(
default=True,
description=(
"Whether the fail2ban daemon is still running after the activation "
"and reload. ``False`` signals that the daemon may have crashed."
),
)
validation_warnings: list[str] = Field(
default_factory=list,
description="Non-fatal warnings from the pre-activation validation step.",
)
recovered: bool | None = Field(
default=None,
description=(
"Set when activation failed after writing the config file. "
"``True`` means the system automatically rolled back the change and "
"restarted fail2ban. ``False`` means the rollback itself also "
"failed and manual intervention is required. ``None`` when "
"activation succeeded or failed before the file was written."
),
)
# ---------------------------------------------------------------------------
# Jail validation models (Task 3)
# ---------------------------------------------------------------------------
class JailValidationIssue(BaseModel):
"""A single issue found during pre-activation validation of a jail config."""
model_config = ConfigDict(strict=True)
field: str = Field(
...,
description="Config field associated with this issue, e.g. 'filter', 'failregex', 'logpath'.",
)
message: str = Field(..., description="Human-readable description of the issue.")
class JailValidationResult(BaseModel):
"""Result of pre-activation validation of a single jail configuration."""
model_config = ConfigDict(strict=True)
jail_name: str = Field(..., description="Name of the validated jail.")
valid: bool = Field(..., description="True when no issues were found.")
issues: list[JailValidationIssue] = Field(
default_factory=list,
description="Validation issues found. Empty when valid=True.",
)
# ---------------------------------------------------------------------------
# Rollback response model (Task 3)
# ---------------------------------------------------------------------------
class RollbackResponse(BaseModel):
"""Response for ``POST /api/config/jails/{name}/rollback``."""
model_config = ConfigDict(strict=True)
jail_name: str = Field(..., description="Name of the jail that was disabled.")
disabled: bool = Field(
...,
description="Whether the jail's .local override was successfully written with enabled=false.",
)
fail2ban_running: bool = Field(
...,
description="Whether fail2ban is online after the rollback attempt.",
)
active_jails: int = Field(
default=0,
ge=0,
description="Number of currently active jails after a successful restart.",
)
message: str = Field(..., description="Human-readable result message.")
# ---------------------------------------------------------------------------
# Pending recovery model (Task 3)
# ---------------------------------------------------------------------------
class PendingRecovery(BaseModel):
"""Records a probable activation-caused fail2ban crash pending user action."""
model_config = ConfigDict(strict=True)
jail_name: str = Field(
...,
description="Name of the jail whose activation likely caused the crash.",
)
activated_at: datetime.datetime = Field(
...,
description="ISO-8601 UTC timestamp of when the jail was activated.",
)
detected_at: datetime.datetime = Field(
...,
description="ISO-8601 UTC timestamp of when the crash was detected.",
)
recovered: bool = Field(
default=False,
description="Whether fail2ban has been successfully restarted.",
)
# ---------------------------------------------------------------------------
# fail2ban log viewer models
# ---------------------------------------------------------------------------
class Fail2BanLogResponse(BaseModel):
"""Response for ``GET /api/config/fail2ban-log``."""
model_config = ConfigDict(strict=True)
log_path: str = Field(..., description="Resolved absolute path of the log file being read.")
lines: list[str] = Field(default_factory=list, description="Log lines returned (tail, optionally filtered).")
total_lines: int = Field(..., ge=0, description="Total number of lines in the file before filtering.")
log_level: str = Field(..., description="Current fail2ban log level.")
log_target: str = Field(..., description="Current fail2ban log target (file path or special value).")
class ServiceStatusResponse(BaseModel):
"""Response for ``GET /api/config/service-status``."""
model_config = ConfigDict(strict=True)
online: bool = Field(..., description="Whether fail2ban is reachable via its socket.")
version: str | None = Field(default=None, description="BanGUI application version (or None when offline).")
jail_count: int = Field(default=0, ge=0, description="Number of currently active jails.")
total_bans: int = Field(default=0, ge=0, description="Aggregated current ban count across all jails.")
total_failures: int = Field(default=0, ge=0, description="Aggregated current failure count across all jails.")
log_level: str = Field(default="UNKNOWN", description="Current fail2ban log level.")
log_target: str = Field(default="UNKNOWN", description="Current fail2ban log target.")