- Task 1: Mark imported blocklist IP addresses
- Add BanOrigin type and _derive_origin() to ban.py model
- Populate origin field in ban_service list_bans() and bans_by_country()
- BanTable and MapPage companion table show origin badge column
- Tests: origin derivation in test_ban_service.py and test_dashboard.py
- Task 2: Add origin filter to dashboard and world map
- ban_service: _origin_sql_filter() helper; origin param on list_bans()
and bans_by_country()
- dashboard router: optional origin query param forwarded to service
- Frontend: BanOriginFilter type + BAN_ORIGIN_FILTER_LABELS in ban.ts
- fetchBans / fetchBansByCountry forward origin to API
- useBans / useMapData accept and pass origin; page resets on change
- BanTable accepts origin prop; DashboardPage adds segmented filter
- MapPage adds origin Select next to time-range picker
- Tests: origin filter assertions in test_ban_service and test_dashboard
214 lines
6.8 KiB
Python
214 lines
6.8 KiB
Python
"""Ban management Pydantic models.
|
|
|
|
Request, response, and domain models used by the ban router and service.
|
|
"""
|
|
|
|
from typing import Literal
|
|
|
|
from pydantic import BaseModel, ConfigDict, Field
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Time-range selector
|
|
# ---------------------------------------------------------------------------
|
|
|
|
#: The four supported time-range presets for the dashboard views.
|
|
TimeRange = Literal["24h", "7d", "30d", "365d"]
|
|
|
|
#: Number of seconds represented by each preset.
|
|
TIME_RANGE_SECONDS: dict[str, int] = {
|
|
"24h": 24 * 3600,
|
|
"7d": 7 * 24 * 3600,
|
|
"30d": 30 * 24 * 3600,
|
|
"365d": 365 * 24 * 3600,
|
|
}
|
|
|
|
|
|
class BanRequest(BaseModel):
|
|
"""Payload for ``POST /api/bans`` (ban an IP)."""
|
|
|
|
model_config = ConfigDict(strict=True)
|
|
|
|
ip: str = Field(..., description="IP address to ban.")
|
|
jail: str = Field(..., description="Jail in which to apply the ban.")
|
|
|
|
|
|
class UnbanRequest(BaseModel):
|
|
"""Payload for ``DELETE /api/bans`` (unban an IP)."""
|
|
|
|
model_config = ConfigDict(strict=True)
|
|
|
|
ip: str = Field(..., description="IP address to unban.")
|
|
jail: str | None = Field(
|
|
default=None,
|
|
description="Jail to remove the ban from. ``null`` means all jails.",
|
|
)
|
|
unban_all: bool = Field(
|
|
default=False,
|
|
description="When ``true`` the IP is unbanned from every jail.",
|
|
)
|
|
|
|
|
|
#: Discriminator literal for the origin of a ban.
|
|
BanOrigin = Literal["blocklist", "selfblock"]
|
|
|
|
#: Jail name used by the blocklist import service.
|
|
BLOCKLIST_JAIL: str = "blocklist-import"
|
|
|
|
|
|
def _derive_origin(jail: str) -> BanOrigin:
|
|
"""Derive the ban origin from the jail name.
|
|
|
|
Args:
|
|
jail: The jail that issued the ban.
|
|
|
|
Returns:
|
|
``"blocklist"`` when the jail is the dedicated blocklist-import
|
|
jail, ``"selfblock"`` otherwise.
|
|
"""
|
|
return "blocklist" if jail == BLOCKLIST_JAIL else "selfblock"
|
|
|
|
|
|
class Ban(BaseModel):
|
|
"""Domain model representing a single active or historical ban record."""
|
|
|
|
model_config = ConfigDict(strict=True)
|
|
|
|
ip: str = Field(..., description="Banned IP address.")
|
|
jail: str = Field(..., description="Jail that issued the ban.")
|
|
banned_at: str = Field(..., description="ISO 8601 UTC timestamp of the ban.")
|
|
expires_at: str | None = Field(
|
|
default=None,
|
|
description="ISO 8601 UTC expiry timestamp, or ``null`` if permanent.",
|
|
)
|
|
ban_count: int = Field(..., ge=1, description="Number of times this IP was banned.")
|
|
country: str | None = Field(
|
|
default=None,
|
|
description="ISO 3166-1 alpha-2 country code resolved from the IP.",
|
|
)
|
|
origin: BanOrigin = Field(
|
|
...,
|
|
description="Whether this ban came from a blocklist import or fail2ban itself.",
|
|
)
|
|
|
|
|
|
class BanResponse(BaseModel):
|
|
"""Response containing a single ban record."""
|
|
|
|
model_config = ConfigDict(strict=True)
|
|
|
|
ban: Ban
|
|
|
|
|
|
class BanListResponse(BaseModel):
|
|
"""Paginated list of ban records."""
|
|
|
|
model_config = ConfigDict(strict=True)
|
|
|
|
bans: list[Ban] = Field(default_factory=list)
|
|
total: int = Field(..., ge=0, description="Total number of matching records.")
|
|
|
|
|
|
class ActiveBan(BaseModel):
|
|
"""A currently active ban entry returned by ``GET /api/bans/active``."""
|
|
|
|
model_config = ConfigDict(strict=True)
|
|
|
|
ip: str = Field(..., description="Banned IP address.")
|
|
jail: str = Field(..., description="Jail holding the ban.")
|
|
banned_at: str | None = Field(default=None, description="ISO 8601 UTC start of the ban.")
|
|
expires_at: str | None = Field(
|
|
default=None,
|
|
description="ISO 8601 UTC expiry, or ``null`` if permanent.",
|
|
)
|
|
ban_count: int = Field(default=1, ge=1, description="Running ban count for this IP.")
|
|
country: str | None = Field(default=None, description="ISO 3166-1 alpha-2 country code.")
|
|
|
|
|
|
class ActiveBanListResponse(BaseModel):
|
|
"""List of all currently active bans across all jails."""
|
|
|
|
model_config = ConfigDict(strict=True)
|
|
|
|
bans: list[ActiveBan] = Field(default_factory=list)
|
|
total: int = Field(..., ge=0)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Dashboard ban-list view models
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class DashboardBanItem(BaseModel):
|
|
"""A single row in the dashboard ban-list table.
|
|
|
|
Populated from the fail2ban database and enriched with geo data.
|
|
"""
|
|
|
|
model_config = ConfigDict(strict=True)
|
|
|
|
ip: str = Field(..., description="Banned IP address.")
|
|
jail: str = Field(..., description="Jail that issued the ban.")
|
|
banned_at: str = Field(..., description="ISO 8601 UTC timestamp of the ban.")
|
|
service: str | None = Field(
|
|
default=None,
|
|
description="First matched log line — used as context for the ban.",
|
|
)
|
|
country_code: str | None = Field(
|
|
default=None,
|
|
description="ISO 3166-1 alpha-2 country code, or ``null`` if unknown.",
|
|
)
|
|
country_name: str | None = Field(
|
|
default=None,
|
|
description="Human-readable country name, or ``null`` if unknown.",
|
|
)
|
|
asn: str | None = Field(
|
|
default=None,
|
|
description="Autonomous System Number string (e.g. ``'AS3320'``).",
|
|
)
|
|
org: str | None = Field(
|
|
default=None,
|
|
description="Organisation name associated with the IP.",
|
|
)
|
|
ban_count: int = Field(..., ge=1, description="How many times this IP was banned.")
|
|
origin: BanOrigin = Field(
|
|
...,
|
|
description="Whether this ban came from a blocklist import or fail2ban itself.",
|
|
)
|
|
|
|
|
|
class DashboardBanListResponse(BaseModel):
|
|
"""Paginated dashboard ban-list response."""
|
|
|
|
model_config = ConfigDict(strict=True)
|
|
|
|
items: list[DashboardBanItem] = Field(default_factory=list)
|
|
total: int = Field(..., ge=0, description="Total bans in the selected time window.")
|
|
page: int = Field(..., ge=1)
|
|
page_size: int = Field(..., ge=1)
|
|
|
|
|
|
class BansByCountryResponse(BaseModel):
|
|
"""Response for the bans-by-country aggregation endpoint.
|
|
|
|
Contains a per-country ban count, a human-readable country name map, and
|
|
the full (un-paginated) ban list for the selected time window so the
|
|
frontend can render both the world map and its companion table from a
|
|
single request.
|
|
"""
|
|
|
|
model_config = ConfigDict(strict=True)
|
|
|
|
countries: dict[str, int] = Field(
|
|
default_factory=dict,
|
|
description="ISO 3166-1 alpha-2 country code → ban count.",
|
|
)
|
|
country_names: dict[str, str] = Field(
|
|
default_factory=dict,
|
|
description="ISO 3166-1 alpha-2 country code → human-readable country name.",
|
|
)
|
|
bans: list[DashboardBanItem] = Field(
|
|
default_factory=list,
|
|
description="All bans in the selected time window (up to the server limit).",
|
|
)
|
|
total: int = Field(..., ge=0, description="Total ban count in the window.")
|