Files
BanGUI/backend/app/models/ban.py
Lukas baf45c6c62 feat: Task 4 — paginated banned-IPs section on jail detail page
Backend:
- Add JailBannedIpsResponse Pydantic model (ban.py)
- Add get_jail_banned_ips() service: server-side pagination, optional
  IP substring search, geo enrichment on page slice only (jail_service.py)
- Add GET /api/jails/{name}/banned endpoint with page/page_size/search
  query params, 400/404/502 error handling (routers/jails.py)
- 23 new tests: 13 service tests + 10 router tests (all passing)

Frontend:
- Add JailBannedIpsResponse TS interface (types/jail.ts)
- Add jailBanned endpoint helper (api/endpoints.ts)
- Add fetchJailBannedIps() API function (api/jails.ts)
- Add BannedIpsSection component: Fluent UI DataGrid, debounced search
  (300 ms), prev/next pagination, page-size dropdown, per-row unban
  button, loading spinner, empty state, error MessageBar (BannedIpsSection.tsx)
- Mount BannedIpsSection in JailDetailPage between stats and patterns
- 12 new Vitest tests for BannedIpsSection (all passing)
2026-03-14 16:28:43 +01:00

336 lines
11 KiB
Python

"""Ban management Pydantic models.
Request, response, and domain models for the ban, dashboard, and jail
endpoints.
"""
import math
from typing import Literal
from pydantic import BaseModel, ConfigDict, Field
# ---------------------------------------------------------------------------
# Time-range selector
# ---------------------------------------------------------------------------
#: Preset identifiers accepted as the time window of the dashboard views.
TimeRange = Literal["24h", "7d", "30d", "365d"]

#: Length of every preset window, expressed in seconds (86 400 s per day).
TIME_RANGE_SECONDS: dict[str, int] = {
    "24h": 86_400,
    "7d": 7 * 86_400,
    "30d": 30 * 86_400,
    "365d": 365 * 86_400,
}
class BanRequest(BaseModel):
    """Payload for ``POST /api/bans`` (ban an IP)."""

    # Strict validation is enabled for the whole model.
    model_config = ConfigDict(strict=True)

    # Both fields are required; neither declares a default.
    ip: str = Field(
        ...,
        description="IP address to ban.",
    )
    jail: str = Field(
        ...,
        description="Jail in which to apply the ban.",
    )
class UnbanRequest(BaseModel):
    """Payload for ``DELETE /api/bans`` (unban an IP)."""

    model_config = ConfigDict(strict=True)

    # Required: the address whose ban should be lifted.
    ip: str = Field(..., description="IP address to unban.")

    # Optional jail scope; omitted means ``None``.
    jail: str | None = Field(
        description="Jail to remove the ban from. ``null`` means all jails.",
        default=None,
    )

    # Optional flag; omitted means ``False``.
    unban_all: bool = Field(
        description="When ``true`` the IP is unbanned from every jail.",
        default=False,
    )
#: Closed set of labels describing where a ban originated.
BanOrigin = Literal["blocklist", "selfblock"]

#: Name of the jail used by the blocklist import service.
BLOCKLIST_JAIL: str = "blocklist-import"
def _derive_origin(jail: str) -> BanOrigin:
    """Map a jail name onto the origin label of its bans.

    Args:
        jail: Name of the jail that issued the ban.

    Returns:
        ``"blocklist"`` if *jail* equals ``BLOCKLIST_JAIL``; every other
        jail name yields ``"selfblock"``.
    """
    # Only the dedicated blocklist-import jail counts as a blocklist origin.
    if jail == BLOCKLIST_JAIL:
        return "blocklist"
    return "selfblock"
class Ban(BaseModel):
    """Domain model representing a single active or historical ban record."""

    model_config = ConfigDict(strict=True)

    # Required identity and start of the ban.
    ip: str = Field(
        ...,
        description="Banned IP address.",
    )
    jail: str = Field(
        ...,
        description="Jail that issued the ban.",
    )
    banned_at: str = Field(
        ...,
        description="ISO 8601 UTC timestamp of the ban.",
    )

    # Optional expiry; defaults to ``None``.
    expires_at: str | None = Field(
        description="ISO 8601 UTC expiry timestamp, or ``null`` if permanent.",
        default=None,
    )

    # Required counter, constrained to be at least 1.
    ban_count: int = Field(
        ...,
        description="Number of times this IP was banned.",
        ge=1,
    )

    # Optional geo information; defaults to ``None``.
    country: str | None = Field(
        description="ISO 3166-1 alpha-2 country code resolved from the IP.",
        default=None,
    )

    # Required origin label, restricted to the ``BanOrigin`` literals.
    origin: BanOrigin = Field(
        ...,
        description="Whether this ban came from a blocklist import or fail2ban itself.",
    )
class BanResponse(BaseModel):
    """Response containing a single ban record."""

    model_config = ConfigDict(strict=True)

    # The wrapped record; required, no default.
    ban: Ban
class BanListResponse(BaseModel):
    """Paginated list of ban records."""

    model_config = ConfigDict(strict=True)

    # A fresh empty list is created per instance when ``bans`` is omitted.
    bans: list[Ban] = Field(default_factory=list)

    # Required, non-negative.
    total: int = Field(
        ...,
        description="Total number of matching records.",
        ge=0,
    )
class ActiveBan(BaseModel):
    """A currently active ban entry returned by ``GET /api/bans/active``."""

    model_config = ConfigDict(strict=True)

    # Required: which address is banned and in which jail.
    ip: str = Field(
        ...,
        description="Banned IP address.",
    )
    jail: str = Field(
        ...,
        description="Jail holding the ban.",
    )

    # Optional timing information; both default to ``None``.
    banned_at: str | None = Field(
        description="ISO 8601 UTC start of the ban.",
        default=None,
    )
    expires_at: str | None = Field(
        description="ISO 8601 UTC expiry, or ``null`` if permanent.",
        default=None,
    )

    # Defaults to 1 and may never be lower than 1.
    ban_count: int = Field(
        description="Running ban count for this IP.",
        default=1,
        ge=1,
    )

    # Optional geo information; defaults to ``None``.
    country: str | None = Field(
        description="ISO 3166-1 alpha-2 country code.",
        default=None,
    )
class ActiveBanListResponse(BaseModel):
    """List of all currently active bans across all jails."""

    model_config = ConfigDict(strict=True)

    # Descriptions added so the generated API schema documents these fields
    # in the same way the other list responses in this module do.
    bans: list[ActiveBan] = Field(
        default_factory=list,
        description="Currently active ban entries.",
    )
    total: int = Field(
        ...,
        ge=0,
        description="Total number of active bans.",
    )
class UnbanAllResponse(BaseModel):
    """Response for ``DELETE /api/bans/all``."""

    model_config = ConfigDict(strict=True)

    # Both fields are required; ``count`` must be non-negative.
    message: str = Field(
        ...,
        description="Human-readable summary of the operation.",
    )
    count: int = Field(
        ...,
        description="Number of IPs that were unbanned.",
        ge=0,
    )
# ---------------------------------------------------------------------------
# Dashboard ban-list view models
# ---------------------------------------------------------------------------
class DashboardBanItem(BaseModel):
    """A single row in the dashboard ban-list table.

    Populated from the fail2ban database and enriched with geo data.
    """

    model_config = ConfigDict(strict=True)

    # Required core columns.
    ip: str = Field(..., description="Banned IP address.")
    jail: str = Field(..., description="Jail that issued the ban.")
    banned_at: str = Field(..., description="ISO 8601 UTC timestamp of the ban.")

    # Optional context; defaults to ``None``.
    service: str | None = Field(
        description="First matched log line — used as context for the ban.",
        default=None,
    )

    # Optional geo / network enrichment; every field defaults to ``None``.
    country_code: str | None = Field(
        description="ISO 3166-1 alpha-2 country code, or ``null`` if unknown.",
        default=None,
    )
    country_name: str | None = Field(
        description="Human-readable country name, or ``null`` if unknown.",
        default=None,
    )
    asn: str | None = Field(
        description="Autonomous System Number string (e.g. ``'AS3320'``).",
        default=None,
    )
    org: str | None = Field(
        description="Organisation name associated with the IP.",
        default=None,
    )

    # Required counter (at least 1) and origin label.
    ban_count: int = Field(
        ...,
        description="How many times this IP was banned.",
        ge=1,
    )
    origin: BanOrigin = Field(
        ...,
        description="Whether this ban came from a blocklist import or fail2ban itself.",
    )
class DashboardBanListResponse(BaseModel):
    """Paginated dashboard ban-list response."""

    model_config = ConfigDict(strict=True)

    # A fresh empty list is created per instance when ``items`` is omitted.
    items: list[DashboardBanItem] = Field(
        default_factory=list,
        description="Dashboard ban rows for the current page.",
    )
    total: int = Field(
        ...,
        ge=0,
        description="Total bans in the selected time window.",
    )
    # Pagination fields now carry schema descriptions, matching the wording
    # used by the other paginated response model in this module.
    page: int = Field(..., ge=1, description="Current page number (1-based).")
    page_size: int = Field(..., ge=1, description="Number of items per page.")
class BansByCountryResponse(BaseModel):
    """Response for the bans-by-country aggregation endpoint.

    Contains a per-country ban count, a human-readable country name map, and
    the full (un-paginated) ban list for the selected time window so the
    frontend can render both the world map and its companion table from a
    single request.
    """

    model_config = ConfigDict(strict=True)

    # Both mappings default to a fresh empty dict per instance.
    countries: dict[str, int] = Field(
        description="ISO 3166-1 alpha-2 country code → ban count.",
        default_factory=dict,
    )
    country_names: dict[str, str] = Field(
        description="ISO 3166-1 alpha-2 country code → human-readable country name.",
        default_factory=dict,
    )

    # Defaults to a fresh empty list per instance.
    bans: list[DashboardBanItem] = Field(
        description="All bans in the selected time window (up to the server limit).",
        default_factory=list,
    )

    # Required, non-negative.
    total: int = Field(
        ...,
        description="Total ban count in the window.",
        ge=0,
    )
# ---------------------------------------------------------------------------
# Trend endpoint models
# ---------------------------------------------------------------------------
#: Width of one trend bucket, in seconds, for every time-range preset.
BUCKET_SECONDS: dict[str, int] = {
    "24h": 3_600,  # hourly buckets: 24 across the window
    "7d": 21_600,  # six-hour buckets: 28 across the window
    "30d": 86_400,  # daily buckets: 30 across the window
    "365d": 604_800,  # weekly buckets: 53 once rounded up
}

#: Short display label of the bucket width for every time-range preset.
BUCKET_SIZE_LABEL: dict[str, str] = {
    "24h": "1h",
    "7d": "6h",
    "30d": "1d",
    "365d": "7d",
}
def bucket_count(range_: TimeRange) -> int:
    """Compute how many buckets are required to span *range_* entirely.

    Args:
        range_: One of the supported time-range presets.

    Returns:
        The window length divided by the bucket width, rounded up, so a
        trailing partial bucket is still counted.

    Raises:
        KeyError: If *range_* is not a key of the preset tables.
    """
    window_seconds = TIME_RANGE_SECONDS[range_]
    bucket_seconds = BUCKET_SECONDS[range_]
    # Round up so a window that is not an exact multiple keeps its last bucket.
    return math.ceil(window_seconds / bucket_seconds)
class BanTrendBucket(BaseModel):
    """A single time bucket in the ban trend series."""

    model_config = ConfigDict(strict=True)

    # Both fields are required; ``count`` must be non-negative.
    timestamp: str = Field(
        ...,
        description="ISO 8601 UTC start of the bucket.",
    )
    count: int = Field(
        ...,
        description="Number of bans that started in this bucket.",
        ge=0,
    )
class BanTrendResponse(BaseModel):
    """Response for the ``GET /api/dashboard/bans/trend`` endpoint."""

    model_config = ConfigDict(strict=True)

    # Defaults to a fresh empty list per instance.
    buckets: list[BanTrendBucket] = Field(
        description="Time-ordered list of ban-count buckets covering the full window.",
        default_factory=list,
    )

    # Required label string.
    bucket_size: str = Field(
        ...,
        description="Human-readable bucket size label (e.g. '1h', '6h', '1d', '7d').",
    )
# ---------------------------------------------------------------------------
# By-jail endpoint models
# ---------------------------------------------------------------------------
class JailBanCount(BaseModel):
    """A single jail entry in the bans-by-jail aggregation."""

    model_config = ConfigDict(strict=True)

    # Both fields are required; ``count`` must be non-negative.
    jail: str = Field(
        ...,
        description="Jail name.",
    )
    count: int = Field(
        ...,
        description="Number of bans recorded in this jail.",
        ge=0,
    )
class BansByJailResponse(BaseModel):
    """Response for the ``GET /api/dashboard/bans/by-jail`` endpoint."""

    model_config = ConfigDict(strict=True)

    # Defaults to a fresh empty list per instance.
    jails: list[JailBanCount] = Field(
        description="Jails ordered by ban count descending.",
        default_factory=list,
    )

    # Required, non-negative.
    total: int = Field(
        ...,
        description="Total ban count in the selected window.",
        ge=0,
    )
# ---------------------------------------------------------------------------
# Jail-specific paginated bans
# ---------------------------------------------------------------------------
class JailBannedIpsResponse(BaseModel):
    """Paginated response for ``GET /api/jails/{name}/banned``.

    Contains only the current page of active ban entries for a single jail,
    geo-enriched exclusively for the page slice to avoid rate-limit issues.
    """

    model_config = ConfigDict(strict=True)

    # Defaults to a fresh empty list per instance.
    items: list[ActiveBan] = Field(
        description="Active ban entries for the current page.",
        default_factory=list,
    )

    # Required, non-negative.
    total: int = Field(
        ...,
        description="Total matching entries (after applying the search filter).",
        ge=0,
    )

    # Required pagination fields; both must be at least 1.
    page: int = Field(
        ...,
        description="Current page number (1-based).",
        ge=1,
    )
    page_size: int = Field(
        ...,
        description="Number of items per page.",
        ge=1,
    )