Files
BanGUI/backend/app/models/ban.py
Lukas 9242b4709a Add GET /api/dashboard/bans/trend endpoint
Implement time-bucketed ban aggregation for dashboard trend charts:

- Add BanTrendBucket / BanTrendResponse Pydantic models and
  BUCKET_SECONDS / BUCKET_SIZE_LABEL / bucket_count helpers to ban.py
- Add ban_service.ban_trend(): queries fail2ban DB with SQL bucket
  grouping, fills zero-count buckets, respects origin filter
- Add GET /api/dashboard/bans/trend route in dashboard.py
- 20 new tests (10 service, 10 router); 480 total pass, 83% coverage
- ruff + mypy --strict clean
2026-03-11 16:38:19 +01:00

283 lines
9.0 KiB
Python

"""Ban management Pydantic models.
Request, response, and domain models used by the ban and dashboard routers
and the ban service.
"""
import math
from typing import Literal
from pydantic import BaseModel, ConfigDict, Field
# ---------------------------------------------------------------------------
# Time-range selector
# ---------------------------------------------------------------------------
#: Preset identifiers the dashboard views accept as a time window.
TimeRange = Literal["24h", "7d", "30d", "365d"]

#: Length of each preset window in seconds (one day is 86 400 seconds).
TIME_RANGE_SECONDS: dict[str, int] = {
    "24h": 86_400,
    "7d": 7 * 86_400,
    "30d": 30 * 86_400,
    "365d": 365 * 86_400,
}
class BanRequest(BaseModel):
    """Request body for ``POST /api/bans``: ban one IP address in one jail."""

    model_config = ConfigDict(strict=True)

    # Both fields are required: neither declares a default.
    ip: str = Field(description="IP address to ban.")
    jail: str = Field(description="Jail in which to apply the ban.")
class UnbanRequest(BaseModel):
    """Request body for ``DELETE /api/bans``: lift the ban on one IP address."""

    model_config = ConfigDict(strict=True)

    ip: str = Field(description="IP address to unban.")
    # Optional target jail; defaults to ``None`` when the client omits it.
    jail: str | None = Field(
        None,
        description="Jail to remove the ban from. ``null`` means all jails.",
    )
    # Explicit flag, off unless the client sets it.
    unban_all: bool = Field(
        False,
        description="When ``true`` the IP is unbanned from every jail.",
    )
#: Name of the jail that the blocklist import service uses.
BLOCKLIST_JAIL: str = "blocklist-import"

#: Closed set of labels describing where a ban originated.
BanOrigin = Literal["blocklist", "selfblock"]
def _derive_origin(jail: str) -> BanOrigin:
    """Classify a ban by the jail that produced it.

    Args:
        jail: Name of the jail that issued the ban.

    Returns:
        ``"blocklist"`` if *jail* equals ``BLOCKLIST_JAIL``; every other
        jail name maps to ``"selfblock"``.
    """
    if jail == BLOCKLIST_JAIL:
        return "blocklist"
    return "selfblock"
class Ban(BaseModel):
    """Domain record for one ban, whether still active or historical."""

    model_config = ConfigDict(strict=True)

    ip: str = Field(description="Banned IP address.")
    jail: str = Field(description="Jail that issued the ban.")
    # Timestamps are carried as strings, not datetime objects.
    banned_at: str = Field(description="ISO 8601 UTC timestamp of the ban.")
    expires_at: str | None = Field(
        None,
        description="ISO 8601 UTC expiry timestamp, or ``null`` if permanent.",
    )
    # ge=1: a recorded ban always counts at least once.
    ban_count: int = Field(ge=1, description="Number of times this IP was banned.")
    country: str | None = Field(
        None,
        description="ISO 3166-1 alpha-2 country code resolved from the IP.",
    )
    # Required; restricted to the BanOrigin literals.
    origin: BanOrigin = Field(
        description="Whether this ban came from a blocklist import or fail2ban itself.",
    )
class BanResponse(BaseModel):
    """Response envelope wrapping exactly one ban record."""

    model_config = ConfigDict(strict=True)

    # Required: no default is declared for the wrapped record.
    ban: Ban
class BanListResponse(BaseModel):
    """Paginated collection of ban records together with the match count."""

    model_config = ConfigDict(strict=True)

    # Falls back to a fresh empty list when no records are supplied.
    bans: list[Ban] = Field(default_factory=list)
    total: int = Field(
        ge=0,
        description="Total number of matching records.",
    )
class ActiveBan(BaseModel):
    """One ban currently in force, as returned by ``GET /api/bans/active``."""

    model_config = ConfigDict(strict=True)

    ip: str = Field(description="Banned IP address.")
    jail: str = Field(description="Jail holding the ban.")
    # Unlike ``ip`` and ``jail``, every field below has a default.
    banned_at: str | None = Field(
        None,
        description="ISO 8601 UTC start of the ban.",
    )
    expires_at: str | None = Field(
        None,
        description="ISO 8601 UTC expiry, or ``null`` if permanent.",
    )
    # Defaults to 1 and may never drop below it (ge=1).
    ban_count: int = Field(
        1,
        ge=1,
        description="Running ban count for this IP.",
    )
    country: str | None = Field(
        None,
        description="ISO 3166-1 alpha-2 country code.",
    )
class ActiveBanListResponse(BaseModel):
    """Every ban currently in force, gathered across all jails."""

    model_config = ConfigDict(strict=True)

    # Falls back to a fresh empty list when no entries are supplied.
    bans: list[ActiveBan] = Field(default_factory=list)
    # Required, non-negative count.
    total: int = Field(ge=0)
class UnbanAllResponse(BaseModel):
    """Result payload of ``DELETE /api/bans/all``."""

    model_config = ConfigDict(strict=True)

    message: str = Field(
        description="Human-readable summary of the operation.",
    )
    # Required, non-negative count (ge=0).
    count: int = Field(
        ge=0,
        description="Number of IPs that were unbanned.",
    )
# ---------------------------------------------------------------------------
# Dashboard ban-list view models
# ---------------------------------------------------------------------------
class DashboardBanItem(BaseModel):
    """One row of the dashboard ban-list table.

    Rows originate in the fail2ban database and are enriched with geo data.
    """

    model_config = ConfigDict(strict=True)

    # --- Core ban data (required) -----------------------------------------
    ip: str = Field(description="Banned IP address.")
    jail: str = Field(description="Jail that issued the ban.")
    banned_at: str = Field(description="ISO 8601 UTC timestamp of the ban.")

    # --- Optional context and enrichment; each defaults to ``None`` --------
    service: str | None = Field(
        None,
        description="First matched log line — used as context for the ban.",
    )
    country_code: str | None = Field(
        None,
        description="ISO 3166-1 alpha-2 country code, or ``null`` if unknown.",
    )
    country_name: str | None = Field(
        None,
        description="Human-readable country name, or ``null`` if unknown.",
    )
    asn: str | None = Field(
        None,
        description="Autonomous System Number string (e.g. ``'AS3320'``).",
    )
    org: str | None = Field(
        None,
        description="Organisation name associated with the IP.",
    )

    # --- Counters and classification (required) ----------------------------
    ban_count: int = Field(ge=1, description="How many times this IP was banned.")
    origin: BanOrigin = Field(
        description="Whether this ban came from a blocklist import or fail2ban itself.",
    )
class DashboardBanListResponse(BaseModel):
    """One page of the dashboard ban list plus its paging metadata."""

    model_config = ConfigDict(strict=True)

    # Falls back to a fresh empty list when no rows are supplied.
    items: list[DashboardBanItem] = Field(default_factory=list)
    total: int = Field(
        ge=0,
        description="Total bans in the selected time window.",
    )
    # Both paging values are required and must be at least 1.
    page: int = Field(ge=1)
    page_size: int = Field(ge=1)
class BansByCountryResponse(BaseModel):
    """Payload of the bans-by-country aggregation endpoint.

    Bundles three things for the selected time window: ban counts per
    country, a map from country code to display name, and the complete
    (un-paginated) ban list. One request therefore gives the frontend
    everything needed for both the world map and its companion table.
    """

    model_config = ConfigDict(strict=True)

    # Both mappings fall back to a fresh empty dict.
    countries: dict[str, int] = Field(
        description="ISO 3166-1 alpha-2 country code → ban count.",
        default_factory=dict,
    )
    country_names: dict[str, str] = Field(
        description="ISO 3166-1 alpha-2 country code → human-readable country name.",
        default_factory=dict,
    )
    bans: list[DashboardBanItem] = Field(
        description="All bans in the selected time window (up to the server limit).",
        default_factory=list,
    )
    total: int = Field(ge=0, description="Total ban count in the window.")
# ---------------------------------------------------------------------------
# Trend endpoint models
# ---------------------------------------------------------------------------
#: Width of one trend bucket in seconds, keyed by time-range preset.
BUCKET_SECONDS: dict[str, int] = {
    "24h": 60 * 60,  # hourly buckets, 24 per window
    "7d": 6 * 60 * 60,  # six-hour buckets, 28 per window
    "30d": 24 * 60 * 60,  # daily buckets, 30 per window
    "365d": 7 * 24 * 60 * 60,  # weekly buckets, roughly 53 per window
}

#: Short display label for the bucket width of each time-range preset.
BUCKET_SIZE_LABEL: dict[str, str] = {
    "24h": "1h",
    "7d": "6h",
    "30d": "1d",
    "365d": "7d",
}
def bucket_count(range_: TimeRange) -> int:
    """Compute how many buckets it takes to span the *range_* window.

    Args:
        range_: Time-range preset; used as the lookup key into both
            ``TIME_RANGE_SECONDS`` and ``BUCKET_SECONDS``.

    Returns:
        The window length divided by the bucket width, rounded up, so a
        trailing partial bucket still counts as a bucket.
    """
    window_seconds = TIME_RANGE_SECONDS[range_]
    step_seconds = BUCKET_SECONDS[range_]
    return math.ceil(window_seconds / step_seconds)
class BanTrendBucket(BaseModel):
    """One point of the ban trend series: a bucket start and its ban count."""

    model_config = ConfigDict(strict=True)

    timestamp: str = Field(
        description="ISO 8601 UTC start of the bucket.",
    )
    # ge=0: zero is a valid count.
    count: int = Field(
        ge=0,
        description="Number of bans that started in this bucket.",
    )
class BanTrendResponse(BaseModel):
    """Payload returned by ``GET /api/dashboard/bans/trend``."""

    model_config = ConfigDict(strict=True)

    # Falls back to a fresh empty list when no buckets are supplied.
    buckets: list[BanTrendBucket] = Field(
        description="Time-ordered list of ban-count buckets covering the full window.",
        default_factory=list,
    )
    # Required label string; no default is declared.
    bucket_size: str = Field(
        description="Human-readable bucket size label (e.g. '1h', '6h', '1d', '7d').",
    )