Files
BanGUI/backend/app/models/ban.py
Lukas 0817a4cb47 fix(regex_validator): add ReDoS detection via regexploit
Detect catastrophic backtracking patterns before regex compilation
using regexploit library. Add ReDoSDetectedError exception and
_MINIMUM_STARRINESS threshold (>=3) to catch dangerous patterns
like (a+)+b. Update pyproject.toml deps, add tests for detection.
2026-05-03 00:05:33 +02:00

283 lines
10 KiB
Python

"""Ban management Pydantic models.
Request, response, and domain models used by the ban router and service.
"""
from typing import Literal
from pydantic import Field, field_validator
from app.models.response import BanGuiBaseModel, CollectionResponse, PaginatedListResponse
class BanRequest(BanGuiBaseModel):
"""Payload for ``POST /api/bans`` (ban an IP)."""
ip: str = Field(..., description="IP address to ban.")
jail: str = Field(..., description="Jail in which to apply the ban.")
class UnbanRequest(BanGuiBaseModel):
"""Payload for ``DELETE /api/bans`` (unban an IP)."""
ip: str = Field(..., description="IP address to unban.")
jail: str | None = Field(
default=None,
description="Jail to remove the ban from. ``null`` means all jails.",
)
unban_all: bool = Field(
default=False,
description="When ``true`` the IP is unbanned from every jail.",
)
#: Discriminator literal for the origin of a ban.
BanOrigin = Literal["blocklist", "selfblock"]
#: Jail name used by the blocklist import service.
BLOCKLIST_JAIL: str = "blocklist-import"
def _derive_origin(jail: str) -> BanOrigin:
"""Derive the ban origin from the jail name.
Args:
jail: The jail that issued the ban.
Returns:
``"blocklist"`` when the jail is the dedicated blocklist-import
jail, ``"selfblock"`` otherwise.
"""
return "blocklist" if jail == BLOCKLIST_JAIL else "selfblock"
class Ban(BanGuiBaseModel):
"""Domain model representing a single active or historical ban record."""
ip: str = Field(..., description="Banned IP address.")
jail: str = Field(..., description="Jail that issued the ban.")
banned_at: str = Field(..., description="ISO 8601 UTC timestamp of the ban.")
expires_at: str | None = Field(
default=None,
description="ISO 8601 UTC expiry timestamp, or ``null`` if permanent.",
)
ban_count: int = Field(..., ge=1, description="Number of times this IP was banned.")
country: str | None = Field(
default=None,
description="ISO 3166-1 alpha-2 country code resolved from the IP.",
)
origin: BanOrigin = Field(
...,
description="Whether this ban came from a blocklist import or fail2ban itself.",
)
@field_validator("country")
@classmethod
def _normalize_empty_country(cls, v: str | None) -> str | None:
"""Coerce empty strings to None for country.
Geo enrichment may produce an empty string instead of None for
unresolved IPs, which breaks frontend truthiness checks.
"""
if v == "":
return None
return v
class BanResponse(BanGuiBaseModel):
"""Response containing a single ban record."""
ban: Ban
class BanListResponse(PaginatedListResponse[Ban]):
"""Paginated list of ban records.
Request: `GET /api/bans` with optional pagination and filter parameters.
Response: Paginated collection of ban records with total count.
Note: Unlike most list endpoints, this endpoint uses `page` and `page_size`
for pagination. When using this response, ensure the router provides these fields.
"""
pass
class ActiveBan(BanGuiBaseModel):
"""A currently active ban entry returned by ``GET /api/bans/active``."""
ip: str = Field(..., description="Banned IP address.")
jail: str = Field(..., description="Jail holding the ban.")
banned_at: str | None = Field(default=None, description="ISO 8601 UTC start of the ban.")
expires_at: str | None = Field(
default=None,
description="ISO 8601 UTC expiry, or ``null`` if permanent.",
)
ban_count: int = Field(default=1, ge=1, description="Running ban count for this IP.")
country: str | None = Field(default=None, description="ISO 3166-1 alpha-2 country code.")
@field_validator("country")
@classmethod
def _normalize_empty_country(cls, v: str | None) -> str | None:
"""Coerce empty strings to None for country.
Geo enrichment may produce an empty string instead of None for
unresolved IPs, which breaks frontend truthiness checks.
"""
if v == "":
return None
return v
class ActiveBanListResponse(CollectionResponse[ActiveBan]):
"""List of all currently active bans across all jails.
Request: `GET /api/bans/active` with optional filter parameters.
Response: Non-paginated collection of currently active bans with total count.
Note: This endpoint does not support pagination. All matching bans are returned.
For paginated results, use individual jail endpoints or the dashboard ban-list view.
"""
pass
class UnbanAllResponse(BanGuiBaseModel):
"""Response for ``DELETE /api/bans/all``."""
message: str = Field(..., description="Human-readable summary of the operation.")
count: int = Field(..., ge=0, description="Number of IPs that were unbanned.")
# ---------------------------------------------------------------------------
# Dashboard ban-list view models
# ---------------------------------------------------------------------------
class DashboardBanItem(BanGuiBaseModel):
"""A single row in the dashboard ban-list table.
Populated from the fail2ban database and enriched with geo data.
"""
ip: str = Field(..., description="Banned IP address.")
jail: str = Field(..., description="Jail that issued the ban.")
banned_at: str = Field(..., description="ISO 8601 UTC timestamp of the ban.")
service: str | None = Field(
default=None,
description="First matched log line — used as context for the ban.",
)
country_code: str | None = Field(
default=None,
description="ISO 3166-1 alpha-2 country code, or ``null`` if unknown.",
)
country_name: str | None = Field(
default=None,
description="Human-readable country name, or ``null`` if unknown.",
)
asn: str | None = Field(
default=None,
description="Autonomous System Number string (e.g. ``'AS3320'``).",
)
org: str | None = Field(
default=None,
description="Organisation name associated with the IP.",
)
ban_count: int = Field(..., ge=1, description="How many times this IP was banned.")
origin: BanOrigin = Field(
...,
description="Whether this ban came from a blocklist import or fail2ban itself.",
)
@field_validator("country_code")
@classmethod
def _normalize_empty_country_code(cls, v: str | None) -> str | None:
"""Coerce empty strings to None for country_code.
The geo enrichment layer may produce an empty string instead of None
for unresolved IPs. Frontend type narrowing uses truthiness, so an
empty string would slip through ``if (ban.country_code)`` checks and
appear as a falsy-but-not-null value — breaking UI rendering.
"""
if v == "":
return None
return v
class DashboardBanListResponse(PaginatedListResponse[DashboardBanItem]):
"""Paginated dashboard ban-list response.
Request: `GET /api/dashboard/bans` with time range, page, and filter parameters.
Response: Paginated collection of dashboard ban items with geo-enrichment.
"""
pass
class BansByCountryResponse(BanGuiBaseModel):
"""Response for the bans-by-country aggregation endpoint.
Contains a per-country ban count, a human-readable country name map, and
the full (un-paginated) ban list for the selected time window so the
frontend can render both the world map and its companion table from a
single request.
"""
countries: dict[str, int] = Field(
default_factory=dict,
description="ISO 3166-1 alpha-2 country code → ban count.",
)
country_names: dict[str, str] = Field(
default_factory=dict,
description="ISO 3166-1 alpha-2 country code → human-readable country name.",
)
bans: list[DashboardBanItem] = Field(
default_factory=list,
description="All bans in the selected time window (up to the server limit).",
)
total: int = Field(..., ge=0, description="Total ban count in the window.")
# ---------------------------------------------------------------------------
# Trend endpoint models
class BanTrendBucket(BanGuiBaseModel):
"""A single time bucket in the ban trend series."""
timestamp: str = Field(..., description="ISO 8601 UTC start of the bucket.")
count: int = Field(..., ge=0, description="Number of bans that started in this bucket.")
class BanTrendResponse(BanGuiBaseModel):
"""Response for the ``GET /api/dashboard/bans/trend`` endpoint."""
buckets: list[BanTrendBucket] = Field(
default_factory=list,
description="Time-ordered list of ban-count buckets covering the full window.",
)
bucket_size: str = Field(
...,
description="Human-readable bucket size label (e.g. '1h', '6h', '1d', '7d').",
)
# ---------------------------------------------------------------------------
# By-jail endpoint models
# ---------------------------------------------------------------------------
class JailBanCount(BanGuiBaseModel):
"""A single jail entry in the bans-by-jail aggregation."""
jail: str = Field(..., description="Jail name.")
count: int = Field(..., ge=0, description="Number of bans recorded in this jail.")
class BansByJailResponse(BanGuiBaseModel):
"""Response for the ``GET /api/dashboard/bans/by-jail`` endpoint."""
jails: list[JailBanCount] = Field(
default_factory=list,
description="Jails ordered by ban count descending.",
)
total: int = Field(..., ge=0, description="Total ban count in the selected window.")
# ---------------------------------------------------------------------------
# Jail-specific paginated bans
# ---------------------------------------------------------------------------
class JailBannedIpsResponse(PaginatedListResponse[ActiveBan]):
"""Paginated response for ``GET /api/jails/{name}/banned``.
Contains only the current page of active ban entries for a single jail,
geo-enriched exclusively for the page slice to avoid rate-limit issues.
Request: `GET /api/jails/{name}/banned` with page and page_size parameters.
Response: Paginated collection of active bans for the specified jail.
"""
pass