No canonical snake_case/camelCase serialization policy

This commit is contained in:
2026-04-28 21:27:26 +02:00
parent b27765928a
commit ad21590f60
14 changed files with 186 additions and 475 deletions

View File

@@ -7,10 +7,10 @@ import datetime
from pathlib import Path
from typing import Literal
from pydantic import BaseModel, ConfigDict, Field, field_validator
from pydantic import Field, field_validator
from app.config import get_settings
from app.models.response import CollectionResponse
from app.models.response import BanGuiBaseModel, CollectionResponse
from app.utils.path_utils import validate_log_path
DNSMode = Literal["yes", "warn", "no", "raw"]
@@ -23,12 +23,9 @@ LogTarget = Literal["STDOUT", "STDERR", "SYSLOG"]
# Ban-time escalation
# ---------------------------------------------------------------------------
class BantimeEscalation(BaseModel):
class BantimeEscalation(BanGuiBaseModel):
"""Incremental ban-time escalation configuration for a jail."""
model_config = ConfigDict(strict=True)
increment: bool = Field(
default=False,
description="Whether incremental banning is enabled.",
@@ -58,12 +55,9 @@ class BantimeEscalation(BaseModel):
description="Count repeat offences across all jails, not just the current one.",
)
class BantimeEscalationUpdate(BaseModel):
class BantimeEscalationUpdate(BanGuiBaseModel):
"""Partial update payload for ban-time escalation settings."""
model_config = ConfigDict(strict=True)
increment: bool | None = Field(default=None)
factor: float | None = Field(default=None)
formula: str | None = Field(default=None)
@@ -72,17 +66,13 @@ class BantimeEscalationUpdate(BaseModel):
rnd_time: int | None = Field(default=None)
overall_jails: bool | None = Field(default=None)
# ---------------------------------------------------------------------------
# Jail configuration models
# ---------------------------------------------------------------------------
class JailConfig(BaseModel):
class JailConfig(BanGuiBaseModel):
"""Configuration snapshot of a single jail (editable fields)."""
model_config = ConfigDict(strict=True)
name: str = Field(..., description="Jail name as configured in fail2ban.")
ban_time: int = Field(..., description="Ban duration in seconds. -1 for permanent.")
max_retry: int = Field(..., ge=1, description="Number of failures before a ban is issued.")
@@ -101,15 +91,11 @@ class JailConfig(BaseModel):
description="Incremental ban-time escalation settings, or None if not configured.",
)
class JailConfigResponse(BaseModel):
class JailConfigResponse(BanGuiBaseModel):
"""Response for ``GET /api/config/jails/{name}``."""
model_config = ConfigDict(strict=True)
jail: JailConfig
class JailConfigListResponse(CollectionResponse[JailConfig]):
"""Response for ``GET /api/config/jails``.
@@ -118,12 +104,9 @@ class JailConfigListResponse(CollectionResponse[JailConfig]):
pass
class JailConfigUpdate(BaseModel):
class JailConfigUpdate(BanGuiBaseModel):
"""Payload for ``PUT /api/config/jails/{name}``."""
model_config = ConfigDict(strict=True)
ban_time: int | None = Field(default=None, description="Ban duration in seconds. -1 for permanent.")
max_retry: int | None = Field(default=None, ge=1)
find_time: int | None = Field(default=None, ge=1)
@@ -140,26 +123,19 @@ class JailConfigUpdate(BaseModel):
description="Incremental ban-time escalation settings to update.",
)
# ---------------------------------------------------------------------------
# Regex tester models
# ---------------------------------------------------------------------------
class RegexTestRequest(BaseModel):
class RegexTestRequest(BanGuiBaseModel):
"""Payload for ``POST /api/config/regex-test``."""
model_config = ConfigDict(strict=True)
log_line: str = Field(..., description="Sample log line to test against.")
fail_regex: str = Field(..., description="Regex pattern to match.")
class RegexTestResponse(BaseModel):
class RegexTestResponse(BanGuiBaseModel):
"""Result of a regex test."""
model_config = ConfigDict(strict=True)
matched: bool = Field(..., description="Whether the pattern matched the log line.")
groups: list[str] = Field(
default_factory=list,
@@ -170,17 +146,13 @@ class RegexTestResponse(BaseModel):
description="Compilation error message if the regex is invalid.",
)
# ---------------------------------------------------------------------------
# Global config models
# ---------------------------------------------------------------------------
class GlobalConfigResponse(BaseModel):
class GlobalConfigResponse(BanGuiBaseModel):
"""Response for ``GET /api/config/global``."""
model_config = ConfigDict(strict=True)
log_level: LogLevel
log_target: str = Field(..., description="Log target: STDOUT, STDERR, SYSLOG, or a validated file path.")
db_purge_age: int = Field(..., description="Seconds after which ban records are purged from the fail2ban DB.")
@@ -222,12 +194,9 @@ class GlobalConfigResponse(BaseModel):
f"Log target {value!r} is outside allowed directories: {allowed_dirs_str}"
)
class GlobalConfigUpdate(BaseModel):
class GlobalConfigUpdate(BanGuiBaseModel):
"""Payload for ``PUT /api/config/global``."""
model_config = ConfigDict(strict=True)
log_level: LogLevel | None = Field(
default=None,
description="Log level: CRITICAL, ERROR, WARNING, NOTICE, INFO, or DEBUG.",
@@ -278,17 +247,13 @@ class GlobalConfigUpdate(BaseModel):
f"Log target {value!r} is outside allowed directories: {allowed_dirs_str}"
)
# ---------------------------------------------------------------------------
# Log observation / preview models
# ---------------------------------------------------------------------------
class AddLogPathRequest(BaseModel):
class AddLogPathRequest(BanGuiBaseModel):
"""Payload for ``POST /api/config/jails/{name}/logpath``."""
model_config = ConfigDict(strict=True)
log_path: str = Field(..., description="Absolute path to the log file to monitor.")
tail: bool = Field(
default=True,
@@ -311,49 +276,35 @@ class AddLogPathRequest(BaseModel):
"""
return validate_log_path(value)
class LogPreviewRequest(BaseModel):
class LogPreviewRequest(BanGuiBaseModel):
"""Payload for ``POST /api/config/preview-log``."""
model_config = ConfigDict(strict=True)
log_path: str = Field(..., description="Absolute path to the log file to preview.")
fail_regex: str = Field(..., description="Regex pattern to test against log lines.")
num_lines: int = Field(default=200, ge=1, le=5000, description="Number of lines to read from the end of the file.")
class LogPreviewLine(BaseModel):
class LogPreviewLine(BanGuiBaseModel):
"""A single log line with match information."""
model_config = ConfigDict(strict=True)
line: str
matched: bool
groups: list[str] = Field(default_factory=list)
class LogPreviewResponse(BaseModel):
class LogPreviewResponse(BanGuiBaseModel):
"""Response for ``POST /api/config/preview-log``."""
model_config = ConfigDict(strict=True)
lines: list[LogPreviewLine] = Field(default_factory=list)
total_lines: int = Field(..., ge=0)
matched_count: int = Field(..., ge=0)
regex_error: str | None = Field(default=None, description="Set if the regex failed to compile.")
# ---------------------------------------------------------------------------
# Map color threshold models
# ---------------------------------------------------------------------------
class MapColorThresholdsResponse(BaseModel):
class MapColorThresholdsResponse(BanGuiBaseModel):
"""Response for ``GET /api/config/map-thresholds``."""
model_config = ConfigDict(strict=True)
threshold_high: int = Field(
..., description="Ban count for red coloring."
)
@@ -364,25 +315,20 @@ class MapColorThresholdsResponse(BaseModel):
..., description="Ban count for green coloring."
)
class MapColorThresholdsUpdate(BaseModel):
class MapColorThresholdsUpdate(BanGuiBaseModel):
"""Payload for ``PUT /api/config/map-thresholds``."""
model_config = ConfigDict(strict=True)
threshold_high: int = Field(..., gt=0, description="Ban count for red.")
threshold_medium: int = Field(
..., gt=0, description="Ban count for yellow."
)
threshold_low: int = Field(..., gt=0, description="Ban count for green.")
# ---------------------------------------------------------------------------
# Parsed filter file models
# ---------------------------------------------------------------------------
class FilterConfig(BaseModel):
class FilterConfig(BanGuiBaseModel):
"""Structured representation of a ``filter.d/*.conf`` file.
The ``active``, ``used_by_jails``, ``source_file``, and
@@ -393,8 +339,6 @@ class FilterConfig(BaseModel):
these fields carry their default values.
"""
model_config = ConfigDict(strict=True)
name: str = Field(..., description="Filter base name, e.g. ``sshd``.")
filename: str = Field(..., description="Actual filename, e.g. ``sshd.conf``.")
# [INCLUDES]
@@ -458,15 +402,12 @@ class FilterConfig(BaseModel):
),
)
class FilterConfigUpdate(BaseModel):
class FilterConfigUpdate(BanGuiBaseModel):
"""Partial update payload for a parsed filter file.
Only explicitly set (non-``None``) fields are written back.
"""
model_config = ConfigDict(strict=True)
before: str | None = Field(default=None)
after: str | None = Field(default=None)
variables: dict[str, str] | None = Field(default=None)
@@ -477,8 +418,7 @@ class FilterConfigUpdate(BaseModel):
datepattern: str | None = Field(default=None)
journalmatch: str | None = Field(default=None)
class FilterUpdateRequest(BaseModel):
class FilterUpdateRequest(BanGuiBaseModel):
"""Payload for ``PUT /api/config/filters/{name}``.
Accepts only the user-editable ``[Definition]`` fields. Fields left as
@@ -486,8 +426,6 @@ class FilterUpdateRequest(BaseModel):
preserved.
"""
model_config = ConfigDict(strict=True)
failregex: list[str] | None = Field(
default=None,
description="Updated failure-detection regex patterns. ``None`` = keep existing.",
@@ -505,15 +443,12 @@ class FilterUpdateRequest(BaseModel):
description="Systemd journal match expression. ``None`` = keep existing.",
)
class FilterCreateRequest(BaseModel):
class FilterCreateRequest(BanGuiBaseModel):
"""Payload for ``POST /api/config/filters``.
Creates a new user-defined filter at ``filter.d/{name}.local``.
"""
model_config = ConfigDict(strict=True)
name: str = Field(
...,
description="Filter base name (e.g. ``my-custom-filter``). Must not already exist in ``filter.d/``.",
@@ -539,23 +474,17 @@ class FilterCreateRequest(BaseModel):
description="Systemd journal match expression.",
)
class AssignFilterRequest(BaseModel):
class AssignFilterRequest(BanGuiBaseModel):
"""Payload for ``POST /api/config/jails/{jail_name}/filter``."""
model_config = ConfigDict(strict=True)
filter_name: str = Field(
...,
description="Filter base name to assign to the jail (e.g. ``sshd``).",
)
class FilterListResponse(BaseModel):
class FilterListResponse(BanGuiBaseModel):
"""Response for ``GET /api/config/filters``."""
model_config = ConfigDict(strict=True)
filters: list[FilterConfig] = Field(
default_factory=list,
description=(
@@ -565,17 +494,13 @@ class FilterListResponse(BaseModel):
)
total: int = Field(..., ge=0, description="Total number of filters found.")
# ---------------------------------------------------------------------------
# Parsed action file models
# ---------------------------------------------------------------------------
class ActionConfig(BaseModel):
class ActionConfig(BanGuiBaseModel):
"""Structured representation of an ``action.d/*.conf`` file."""
model_config = ConfigDict(strict=True)
name: str = Field(..., description="Action base name, e.g. ``iptables``.")
filename: str = Field(..., description="Actual filename, e.g. ``iptables.conf``.")
# [INCLUDES]
@@ -644,12 +569,9 @@ class ActionConfig(BaseModel):
),
)
class ActionConfigUpdate(BaseModel):
class ActionConfigUpdate(BanGuiBaseModel):
"""Partial update payload for a parsed action file."""
model_config = ConfigDict(strict=True)
before: str | None = Field(default=None)
after: str | None = Field(default=None)
actionstart: str | None = Field(default=None)
@@ -661,12 +583,9 @@ class ActionConfigUpdate(BaseModel):
definition_vars: dict[str, str] | None = Field(default=None)
init_vars: dict[str, str] | None = Field(default=None)
class ActionListResponse(BaseModel):
class ActionListResponse(BanGuiBaseModel):
"""Response for ``GET /api/config/actions``."""
model_config = ConfigDict(strict=True)
actions: list[ActionConfig] = Field(
default_factory=list,
description=(
@@ -676,16 +595,13 @@ class ActionListResponse(BaseModel):
)
total: int = Field(..., ge=0, description="Total number of actions found.")
class ActionUpdateRequest(BaseModel):
class ActionUpdateRequest(BanGuiBaseModel):
"""Payload for ``PUT /api/config/actions/{name}``.
Accepts only the user-editable ``[Definition]`` lifecycle fields and
``[Init]`` parameters. Fields left as ``None`` are not changed.
"""
model_config = ConfigDict(strict=True)
actionstart: str | None = Field(
default=None,
description="Updated ``actionstart`` command. ``None`` = keep existing.",
@@ -719,15 +635,12 @@ class ActionUpdateRequest(BaseModel):
description="``[Init]`` parameters to set. ``None`` = keep existing.",
)
class ActionCreateRequest(BaseModel):
class ActionCreateRequest(BanGuiBaseModel):
"""Payload for ``POST /api/config/actions``.
Creates a new user-defined action at ``action.d/{name}.local``.
"""
model_config = ConfigDict(strict=True)
name: str = Field(
...,
description="Action base name (e.g. ``my-custom-action``). Must not already exist.",
@@ -747,12 +660,9 @@ class ActionCreateRequest(BaseModel):
description="``[Init]`` runtime parameters.",
)
class AssignActionRequest(BaseModel):
class AssignActionRequest(BanGuiBaseModel):
"""Payload for ``POST /api/config/jails/{jail_name}/action``."""
model_config = ConfigDict(strict=True)
action_name: str = Field(
...,
description="Action base name to add to the jail (e.g. ``iptables-multiport``).",
@@ -765,17 +675,13 @@ class AssignActionRequest(BaseModel):
),
)
# ---------------------------------------------------------------------------
# Jail file config models (Task 6.1)
# ---------------------------------------------------------------------------
class JailSectionConfig(BaseModel):
class JailSectionConfig(BanGuiBaseModel):
"""Settings within a single [jailname] section of a jail.d file."""
model_config = ConfigDict(strict=True)
enabled: bool | None = Field(default=None, description="Whether this jail is enabled.")
port: str | None = Field(default=None, description="Port(s) to monitor (e.g. 'ssh' or '22,2222').")
filter: str | None = Field(default=None, description="Filter name to use (e.g. 'sshd').")
@@ -787,36 +693,28 @@ class JailSectionConfig(BaseModel):
backend: BackendType | None = Field(default=None, description="Log monitoring backend.")
extra: dict[str, str] = Field(default_factory=dict, description="Additional settings not captured by named fields.")
class JailFileConfig(BaseModel):
class JailFileConfig(BanGuiBaseModel):
"""Structured representation of a jail.d/*.conf file."""
model_config = ConfigDict(strict=True)
filename: str = Field(..., description="Filename including extension (e.g. 'sshd.conf').")
jails: dict[str, JailSectionConfig] = Field(
default_factory=dict,
description="Mapping of jail name → settings for each [section] in the file.",
)
class JailFileConfigUpdate(BaseModel):
class JailFileConfigUpdate(BanGuiBaseModel):
"""Partial update payload for a jail.d file."""
model_config = ConfigDict(strict=True)
jails: dict[str, JailSectionConfig] | None = Field(
default=None,
description="Jail section updates. Only jails present in this dict are updated.",
)
# ---------------------------------------------------------------------------
# Inactive jail models (Stage 1)
# ---------------------------------------------------------------------------
class InactiveJail(BaseModel):
class InactiveJail(BanGuiBaseModel):
"""A jail defined in fail2ban config files that is not currently active.
A jail is considered inactive when its ``enabled`` key is ``false`` (or
@@ -825,8 +723,6 @@ class InactiveJail(BaseModel):
running.
"""
model_config = ConfigDict(strict=True)
name: str = Field(..., description="Jail name from the config section header.")
filter: str = Field(
...,
@@ -920,7 +816,6 @@ class InactiveJail(BaseModel):
),
)
class InactiveJailListResponse(CollectionResponse[InactiveJail]):
"""Response for ``GET /api/config/jails/inactive``.
@@ -929,8 +824,7 @@ class InactiveJailListResponse(CollectionResponse[InactiveJail]):
pass
class ActivateJailRequest(BaseModel):
class ActivateJailRequest(BanGuiBaseModel):
"""Optional override values when activating an inactive jail.
All fields are optional. Omitted fields are not written to the
@@ -938,8 +832,6 @@ class ActivateJailRequest(BaseModel):
values.
"""
model_config = ConfigDict(strict=True)
bantime: str | None = Field(
default=None,
description="Override ban duration, e.g. ``1h`` or ``3600``.",
@@ -962,12 +854,9 @@ class ActivateJailRequest(BaseModel):
description="Override log file paths.",
)
class JailActivationResponse(BaseModel):
class JailActivationResponse(BanGuiBaseModel):
"""Response for jail activation and deactivation endpoints."""
model_config = ConfigDict(strict=True)
name: str = Field(..., description="Name of the affected jail.")
active: bool = Field(
...,
@@ -996,29 +885,22 @@ class JailActivationResponse(BaseModel):
),
)
# ---------------------------------------------------------------------------
# Jail validation models (Task 3)
# ---------------------------------------------------------------------------
class JailValidationIssue(BaseModel):
class JailValidationIssue(BanGuiBaseModel):
"""A single issue found during pre-activation validation of a jail config."""
model_config = ConfigDict(strict=True)
field: str = Field(
...,
description="Config field associated with this issue, e.g. 'filter', 'failregex', 'logpath'.",
)
message: str = Field(..., description="Human-readable description of the issue.")
class JailValidationResult(BaseModel):
class JailValidationResult(BanGuiBaseModel):
"""Result of pre-activation validation of a single jail configuration."""
model_config = ConfigDict(strict=True)
jail_name: str = Field(..., description="Name of the validated jail.")
valid: bool = Field(..., description="True when no issues were found.")
issues: list[JailValidationIssue] = Field(
@@ -1026,17 +908,13 @@ class JailValidationResult(BaseModel):
description="Validation issues found. Empty when valid=True.",
)
# ---------------------------------------------------------------------------
# Rollback response model (Task 3)
# ---------------------------------------------------------------------------
class RollbackResponse(BaseModel):
class RollbackResponse(BanGuiBaseModel):
"""Response for ``POST /api/config/jails/{name}/rollback``."""
model_config = ConfigDict(strict=True)
jail_name: str = Field(..., description="Name of the jail that was disabled.")
disabled: bool = Field(
...,
@@ -1053,17 +931,13 @@ class RollbackResponse(BaseModel):
)
message: str = Field(..., description="Human-readable result message.")
# ---------------------------------------------------------------------------
# Pending recovery model (Task 3)
# ---------------------------------------------------------------------------
class PendingRecovery(BaseModel):
class PendingRecovery(BanGuiBaseModel):
"""Records a probable activation-caused fail2ban crash pending user action."""
model_config = ConfigDict(strict=True)
jail_name: str = Field(
...,
description="Name of the jail whose activation likely caused the crash.",
@@ -1081,29 +955,22 @@ class PendingRecovery(BaseModel):
description="Whether fail2ban has been successfully restarted.",
)
# ---------------------------------------------------------------------------
# fail2ban log viewer models
# ---------------------------------------------------------------------------
class Fail2BanLogResponse(BaseModel):
class Fail2BanLogResponse(BanGuiBaseModel):
"""Response for ``GET /api/config/fail2ban-log``."""
model_config = ConfigDict(strict=True)
log_path: str = Field(..., description="Resolved absolute path of the log file being read.")
lines: list[str] = Field(default_factory=list, description="Log lines returned (tail, optionally filtered).")
total_lines: int = Field(..., ge=0, description="Total number of lines in the file before filtering.")
log_level: str = Field(..., description="Current fail2ban log level.")
log_target: str = Field(..., description="Current fail2ban log target (file path or special value).")
class ServiceStatusResponse(BaseModel):
class ServiceStatusResponse(BanGuiBaseModel):
"""Response for ``GET /api/config/service-status``."""
model_config = ConfigDict(strict=True)
online: bool = Field(..., description="Whether fail2ban is reachable via its socket.")
version: str | None = Field(default=None, description="BanGUI application version (or None when offline).")
jail_count: int = Field(default=0, ge=0, description="Number of currently active jails.")