- Reorganized dashboard router with improved structure - Enhanced ban_service with better separation of concerns - Updated history service with cleaner logic - Improved constants and configuration handling - Updated documentation of completed tasks Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
1002 lines
34 KiB
Python
1002 lines
34 KiB
Python
"""Ban service.
|
||
|
||
Queries the fail2ban SQLite database for ban history. The fail2ban database
|
||
path is obtained at runtime by sending ``get dbfile`` to the fail2ban daemon
|
||
via the Unix domain socket.
|
||
|
||
All database I/O is performed through aiosqlite opened in **read-only** mode
|
||
so BanGUI never modifies or locks the fail2ban database.
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import asyncio
|
||
import contextlib
|
||
import ipaddress
|
||
import time
|
||
from typing import TYPE_CHECKING, cast
|
||
|
||
import structlog
|
||
|
||
from app.exceptions import JailNotFoundError, JailOperationError
|
||
from app.models.ban import (
|
||
BLOCKLIST_JAIL,
|
||
BUCKET_SECONDS,
|
||
BUCKET_SIZE_LABEL,
|
||
TIME_RANGE_SECONDS,
|
||
ActiveBan,
|
||
ActiveBanListResponse,
|
||
BanOrigin,
|
||
BansByCountryResponse,
|
||
BansByJailResponse,
|
||
BanTrendBucket,
|
||
BanTrendResponse,
|
||
DashboardBanItem,
|
||
DashboardBanListResponse,
|
||
TimeRange,
|
||
_derive_origin,
|
||
bucket_count,
|
||
)
|
||
from app.models.ban import (
|
||
JailBanCount as JailBanCountModel,
|
||
)
|
||
from app.repositories import fail2ban_db_repo
|
||
from app.repositories import history_archive_repo as default_history_archive_repo
|
||
from app.services.fail2ban_metadata_service import default_fail2ban_metadata_service
|
||
from app.utils.constants import DEFAULT_PAGE_SIZE, MAX_PAGE_SIZE
|
||
from app.utils.fail2ban_client import (
|
||
Fail2BanClient,
|
||
)
|
||
from app.utils.fail2ban_db_utils import parse_data_json, ts_to_iso
|
||
from app.utils.fail2ban_response import (
|
||
is_not_found_error,
|
||
ok,
|
||
to_dict,
|
||
)
|
||
|
||
if TYPE_CHECKING:
|
||
import aiohttp
|
||
import aiosqlite
|
||
|
||
from app.models.geo import GeoBatchLookup, GeoCacheLookup, GeoEnricher, GeoInfo
|
||
from app.repositories.protocols import HistoryArchiveRepository
|
||
|
||
log: structlog.stdlib.BoundLogger = structlog.get_logger()
|
||
|
||
|
||
async def get_fail2ban_db_path(socket_path: str) -> str:
|
||
"""Return the fail2ban database path using the shared metadata cache."""
|
||
return await default_fail2ban_metadata_service.get_db_path(socket_path)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Constants
|
||
# ---------------------------------------------------------------------------
|
||
|
||
_SOCKET_TIMEOUT: float = 5.0
|
||
|
||
|
||
|
||
|
||
async def ban_ip(socket_path: str, jail: str, ip: str) -> None:
|
||
"""Ban an IP address in the specified jail."""
|
||
try:
|
||
ipaddress.ip_address(ip)
|
||
except ValueError as exc:
|
||
raise ValueError(f"Invalid IP address: {ip!r}") from exc
|
||
|
||
client = Fail2BanClient(socket_path=socket_path, timeout=_SOCKET_TIMEOUT)
|
||
|
||
try:
|
||
ok(await client.send(["set", jail, "banip", ip]))
|
||
except ValueError as exc:
|
||
if is_not_found_error(exc):
|
||
raise JailNotFoundError(jail) from exc
|
||
raise JailOperationError(str(exc)) from exc
|
||
|
||
|
||
async def unban_ip(socket_path: str, ip: str, jail: str | None = None) -> None:
|
||
"""Unban an IP address from a specific jail or all jails."""
|
||
try:
|
||
ipaddress.ip_address(ip)
|
||
except ValueError as exc:
|
||
raise ValueError(f"Invalid IP address: {ip!r}") from exc
|
||
|
||
client = Fail2BanClient(socket_path=socket_path, timeout=_SOCKET_TIMEOUT)
|
||
|
||
if jail is None:
|
||
ok(await client.send(["unban", ip]))
|
||
return
|
||
|
||
try:
|
||
ok(await client.send(["set", jail, "unbanip", ip]))
|
||
except ValueError as exc:
|
||
if is_not_found_error(exc):
|
||
raise JailNotFoundError(jail) from exc
|
||
raise JailOperationError(str(exc)) from exc
|
||
|
||
|
||
def _origin_sql_filter(origin: BanOrigin | None) -> tuple[str, tuple[str, ...]]:
|
||
"""Return a SQL fragment and its parameters for the origin filter.
|
||
|
||
Args:
|
||
origin: ``"blocklist"`` to restrict to the blocklist-import jail,
|
||
``"selfblock"`` to exclude it, or ``None`` for no restriction.
|
||
|
||
Returns:
|
||
A ``(sql_fragment, params)`` pair — the fragment starts with ``" AND"``
|
||
so it can be appended directly to an existing WHERE clause.
|
||
"""
|
||
if origin == "blocklist":
|
||
return " AND jail = ?", (BLOCKLIST_JAIL,)
|
||
if origin == "selfblock":
|
||
return " AND jail != ?", (BLOCKLIST_JAIL,)
|
||
return "", ()
|
||
|
||
|
||
def _parse_ban_entry(entry: str, jail: str) -> ActiveBan | None:
|
||
"""Parse a ban entry from ``get <jail> banip --with-time`` output."""
|
||
from datetime import UTC, datetime
|
||
|
||
try:
|
||
parts = entry.split("\t", 1)
|
||
ip = parts[0].strip()
|
||
|
||
ipaddress.ip_address(ip)
|
||
|
||
if len(parts) < 2:
|
||
return ActiveBan(
|
||
ip=ip,
|
||
jail=jail,
|
||
banned_at=None,
|
||
expires_at=None,
|
||
ban_count=1,
|
||
country=None,
|
||
)
|
||
|
||
time_part = parts[1].strip()
|
||
plus_idx = time_part.find(" + ")
|
||
if plus_idx == -1:
|
||
banned_at_str = time_part.strip()
|
||
expires_at_str: str | None = None
|
||
else:
|
||
banned_at_str = time_part[:plus_idx].strip()
|
||
remainder = time_part[plus_idx + 3 :]
|
||
eq_idx = remainder.find(" = ")
|
||
expires_at_str = remainder[eq_idx + 3 :].strip() if eq_idx != -1 else None
|
||
|
||
_date_fmt = "%Y-%m-%d %H:%M:%S"
|
||
|
||
def _to_iso(ts: str) -> str:
|
||
dt = datetime.strptime(ts, _date_fmt).replace(tzinfo=UTC)
|
||
return dt.isoformat()
|
||
|
||
banned_at_iso: str | None = None
|
||
expires_at_iso: str | None = None
|
||
|
||
with contextlib.suppress(ValueError):
|
||
banned_at_iso = _to_iso(banned_at_str)
|
||
|
||
with contextlib.suppress(ValueError):
|
||
if expires_at_str:
|
||
expires_at_iso = _to_iso(expires_at_str)
|
||
|
||
return ActiveBan(
|
||
ip=ip,
|
||
jail=jail,
|
||
banned_at=banned_at_iso,
|
||
expires_at=expires_at_iso,
|
||
ban_count=1,
|
||
country=None,
|
||
)
|
||
except (ValueError, IndexError, AttributeError) as exc:
|
||
log.debug("ban_entry_parse_error", entry=entry, jail=jail, error=str(exc))
|
||
return None
|
||
|
||
|
||
async def _enrich_bans(
|
||
bans: list[ActiveBan],
|
||
geo_enricher: GeoEnricher,
|
||
) -> list[ActiveBan]:
|
||
"""Enrich ban records with geo data asynchronously."""
|
||
geo_results: list[object | Exception] = await asyncio.gather(
|
||
*[cast("Awaitable[object]", geo_enricher(ban.ip)) for ban in bans],
|
||
return_exceptions=True,
|
||
)
|
||
enriched: list[ActiveBan] = []
|
||
for ban, geo in zip(bans, geo_results, strict=False):
|
||
if geo is not None and not isinstance(geo, Exception):
|
||
geo_info = cast("GeoInfo", geo)
|
||
enriched.append(ban.model_copy(update={"country": geo_info.country_code}))
|
||
else:
|
||
enriched.append(ban)
|
||
return enriched
|
||
|
||
|
||
async def get_active_bans(
|
||
socket_path: str,
|
||
geo_batch_lookup: GeoBatchLookup | None = None,
|
||
geo_enricher: GeoEnricher | None = None,
|
||
http_session: aiohttp.ClientSession | None = None,
|
||
app_db: aiosqlite.Connection | None = None,
|
||
) -> ActiveBanListResponse:
|
||
"""Return all currently banned IPs across every jail.
|
||
|
||
For each jail the ``get <jail> banip --with-time`` command is used
|
||
to retrieve ban start and expiry times alongside the IP address.
|
||
|
||
Geo enrichment strategy (highest priority first):
|
||
|
||
1. When *http_session* is provided the entire set of banned IPs is resolved
|
||
in a single :func:`~app.services.geo_service.lookup_batch` call (up to
|
||
100 IPs per HTTP request). This is far more efficient than concurrent
|
||
per-IP lookups and stays within ip-api.com rate limits.
|
||
2. When only *geo_enricher* is provided (legacy / test path) each IP is
|
||
resolved individually via the supplied async callable.
|
||
|
||
Args:
|
||
socket_path: Path to the fail2ban Unix domain socket.
|
||
geo_enricher: Optional async callable ``(ip) -> GeoInfo | None``.
|
||
Used to enrich each ban entry with country and ASN data.
|
||
Ignored when *http_session* is provided.
|
||
http_session: Optional shared :class:`aiohttp.ClientSession`. When
|
||
provided, :func:`~app.services.geo_service.lookup_batch` is used
|
||
for efficient bulk geo resolution.
|
||
app_db: Optional BanGUI application database connection used to
|
||
persist newly resolved geo entries across restarts. Only
|
||
meaningful when *http_session* is provided.
|
||
|
||
Returns:
|
||
:class:`~app.models.ban.ActiveBanListResponse` with all active bans.
|
||
|
||
Raises:
|
||
~app.utils.fail2ban_client.Fail2BanConnectionError: If the socket
|
||
cannot be reached.
|
||
"""
|
||
|
||
client = Fail2BanClient(socket_path=socket_path, timeout=_SOCKET_TIMEOUT)
|
||
|
||
global_status = to_dict(ok(await client.send(["status"])))
|
||
jail_list_raw: str = str(global_status.get("Jail list", "") or "").strip()
|
||
jail_names: list[str] = (
|
||
[j.strip() for j in jail_list_raw.split(",") if j.strip()]
|
||
if jail_list_raw
|
||
else []
|
||
)
|
||
|
||
if not jail_names:
|
||
return ActiveBanListResponse(bans=[], total=0)
|
||
|
||
results: list[object | Exception] = await asyncio.gather(
|
||
*[client.send(["get", jn, "banip", "--with-time"]) for jn in jail_names],
|
||
return_exceptions=True,
|
||
)
|
||
|
||
bans: list[ActiveBan] = []
|
||
for jail_name, raw_result in zip(jail_names, results, strict=False):
|
||
if isinstance(raw_result, Exception):
|
||
log.warning(
|
||
"active_bans_fetch_error",
|
||
jail=jail_name,
|
||
error=str(raw_result),
|
||
)
|
||
continue
|
||
|
||
try:
|
||
ban_list: list[str] = cast("list[str]", ok(raw_result)) or []
|
||
except (TypeError, ValueError) as exc:
|
||
log.warning(
|
||
"active_bans_parse_error",
|
||
jail=jail_name,
|
||
error=str(exc),
|
||
)
|
||
continue
|
||
|
||
for entry in ban_list:
|
||
ban = _parse_ban_entry(str(entry), jail_name)
|
||
if ban is not None:
|
||
bans.append(ban)
|
||
|
||
if http_session is not None and bans and geo_batch_lookup is not None:
|
||
all_ips: list[str] = [ban.ip for ban in bans]
|
||
try:
|
||
geo_map = await geo_batch_lookup(all_ips, http_session, db=app_db)
|
||
except Exception: # noqa: BLE001
|
||
log.warning("active_bans_batch_geo_failed")
|
||
geo_map = {}
|
||
enriched: list[ActiveBan] = []
|
||
for ban in bans:
|
||
geo = geo_map.get(ban.ip)
|
||
if geo is not None:
|
||
enriched.append(ban.model_copy(update={"country": geo.country_code}))
|
||
else:
|
||
enriched.append(ban)
|
||
bans = enriched
|
||
elif geo_enricher is not None:
|
||
bans = await _enrich_bans(bans, geo_enricher)
|
||
|
||
log.info("active_bans_fetched", total=len(bans))
|
||
return ActiveBanListResponse(bans=bans, total=len(bans))
|
||
|
||
|
||
_TIME_RANGE_SLACK_SECONDS: int = 60
|
||
|
||
|
||
def _since_unix(range_: TimeRange) -> int:
|
||
"""Return the Unix timestamp representing the start of the time window.
|
||
|
||
Uses :func:`time.time` (always UTC epoch seconds on all platforms) to be
|
||
consistent with how fail2ban stores ``timeofban`` values in its SQLite
|
||
database. fail2ban records ``time.time()`` values directly, so
|
||
comparing against a timezone-aware ``datetime.now(UTC).timestamp()`` would
|
||
theoretically produce the same number but using :func:`time.time` avoids
|
||
any tz-aware datetime pitfalls on misconfigured systems.
|
||
|
||
Args:
|
||
range_: One of the supported time-range presets.
|
||
|
||
Returns:
|
||
Unix timestamp (seconds since epoch) equal to *now − range_* with a
|
||
small slack window for clock drift and test seeding delays.
|
||
"""
|
||
seconds: int = TIME_RANGE_SECONDS[range_]
|
||
return int(time.time()) - seconds - _TIME_RANGE_SLACK_SECONDS
|
||
|
||
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Public API
|
||
# ---------------------------------------------------------------------------
|
||
|
||
|
||
async def list_bans(
|
||
socket_path: str,
|
||
range_: TimeRange,
|
||
*,
|
||
source: str = "fail2ban",
|
||
page: int = 1,
|
||
page_size: int = DEFAULT_PAGE_SIZE,
|
||
http_session: aiohttp.ClientSession | None = None,
|
||
app_db: aiosqlite.Connection | None = None,
|
||
geo_batch_lookup: GeoBatchLookup | None = None,
|
||
geo_enricher: GeoEnricher | None = None,
|
||
history_archive_repo: HistoryArchiveRepository = default_history_archive_repo,
|
||
origin: BanOrigin | None = None,
|
||
) -> DashboardBanListResponse:
|
||
"""Return a paginated list of bans within the selected time window.
|
||
|
||
Queries the fail2ban database ``bans`` table for records whose
|
||
``timeofban`` falls within the specified *range_*. Results are ordered
|
||
newest-first.
|
||
|
||
Geo enrichment strategy (highest priority first):
|
||
|
||
1. When *http_session* is provided the entire page of IPs is resolved in
|
||
one :func:`~app.services.geo_service.lookup_batch` call (up to 100 IPs
|
||
per HTTP request). This avoids the 45 req/min rate limit of the
|
||
single-IP endpoint and is the preferred production path.
|
||
2. When only *geo_enricher* is provided (legacy / test path) each IP is
|
||
resolved individually via the supplied async callable.
|
||
|
||
Args:
|
||
socket_path: Path to the fail2ban Unix domain socket.
|
||
range_: Time-range preset (``"24h"``, ``"7d"``, ``"30d"``, or
|
||
``"365d"``).
|
||
page: 1-based page number (default: ``1``).
|
||
page_size: Maximum items per page, capped at ``MAX_PAGE_SIZE``
|
||
(default: ``100``).
|
||
http_session: Optional shared :class:`aiohttp.ClientSession`. When
|
||
provided, :func:`~app.services.geo_service.lookup_batch` is used
|
||
for efficient bulk geo resolution.
|
||
app_db: Optional BanGUI application database used to persist newly
|
||
resolved geo entries and to read back cached results.
|
||
geo_enricher: Optional async callable ``(ip: str) -> GeoInfo | None``.
|
||
Used as a fallback when *http_session* is ``None`` (e.g. tests).
|
||
origin: Optional origin filter — ``"blocklist"`` restricts results to
|
||
the ``blocklist-import`` jail, ``"selfblock"`` excludes it.
|
||
|
||
Returns:
|
||
:class:`~app.models.ban.DashboardBanListResponse` containing the
|
||
paginated items and total count.
|
||
"""
|
||
|
||
since: int = _since_unix(range_)
|
||
effective_page_size: int = min(page_size, MAX_PAGE_SIZE)
|
||
offset: int = (page - 1) * effective_page_size
|
||
|
||
if source not in ("fail2ban", "archive"):
|
||
raise ValueError(f"Unsupported source: {source!r}")
|
||
|
||
if source == "archive":
|
||
if app_db is None:
|
||
raise ValueError("app_db must be provided when source is 'archive'")
|
||
|
||
rows, total = await history_archive_repo.get_archived_history(
|
||
db=app_db,
|
||
since=since,
|
||
origin=origin,
|
||
action="ban",
|
||
page=page,
|
||
page_size=effective_page_size,
|
||
)
|
||
else:
|
||
db_path: str = await get_fail2ban_db_path(socket_path)
|
||
log.info(
|
||
"ban_service_list_bans",
|
||
db_path=db_path,
|
||
since=since,
|
||
range=range_,
|
||
origin=origin,
|
||
)
|
||
|
||
rows, total = await fail2ban_db_repo.get_currently_banned(
|
||
db_path=db_path,
|
||
since=since,
|
||
origin=origin,
|
||
limit=effective_page_size,
|
||
offset=offset,
|
||
)
|
||
|
||
# Batch-resolve geo data for all IPs on this page in a single API call.
|
||
# This avoids hitting the 45 req/min single-IP rate limit when the
|
||
# page contains many bans (e.g. after a large blocklist import).
|
||
geo_map: dict[str, GeoInfo] = {}
|
||
if http_session is not None and rows and geo_batch_lookup is not None:
|
||
page_ips: list[str] = [r.ip for r in rows]
|
||
try:
|
||
geo_map = await geo_batch_lookup(page_ips, http_session, db=app_db)
|
||
except Exception: # noqa: BLE001
|
||
log.warning("ban_service_batch_geo_failed_list_bans")
|
||
|
||
items: list[DashboardBanItem] = []
|
||
for row in rows:
|
||
if source == "archive":
|
||
jail = str(row["jail"])
|
||
ip = str(row["ip"])
|
||
banned_at = ts_to_iso(int(row["timeofban"]))
|
||
ban_count = int(row["bancount"])
|
||
matches, _ = parse_data_json(row["data"])
|
||
else:
|
||
jail = row.jail
|
||
ip = row.ip
|
||
banned_at = ts_to_iso(row.timeofban)
|
||
ban_count = row.bancount
|
||
matches, _ = parse_data_json(row.data)
|
||
|
||
service: str | None = matches[0] if matches else None
|
||
|
||
country_code: str | None = None
|
||
country_name: str | None = None
|
||
asn: str | None = None
|
||
org: str | None = None
|
||
|
||
if geo_map:
|
||
geo = geo_map.get(ip)
|
||
if geo is not None:
|
||
country_code = geo.country_code
|
||
country_name = geo.country_name
|
||
asn = geo.asn
|
||
org = geo.org
|
||
elif geo_enricher is not None:
|
||
try:
|
||
geo = await geo_enricher(ip)
|
||
if geo is not None:
|
||
country_code = geo.country_code
|
||
country_name = geo.country_name
|
||
asn = geo.asn
|
||
org = geo.org
|
||
except Exception: # noqa: BLE001
|
||
log.warning("ban_service_geo_lookup_failed", ip=ip)
|
||
|
||
items.append(
|
||
DashboardBanItem(
|
||
ip=ip,
|
||
jail=jail,
|
||
banned_at=banned_at,
|
||
service=service,
|
||
country_code=country_code,
|
||
country_name=country_name,
|
||
asn=asn,
|
||
org=org,
|
||
ban_count=ban_count,
|
||
origin=_derive_origin(jail),
|
||
)
|
||
)
|
||
|
||
return DashboardBanListResponse(
|
||
items=items,
|
||
total=total,
|
||
page=page,
|
||
page_size=effective_page_size,
|
||
)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# bans_by_country
|
||
# ---------------------------------------------------------------------------
|
||
|
||
#: Maximum rows returned in the companion table alongside the map.
|
||
_MAX_COMPANION_BANS: int = 200
|
||
|
||
|
||
async def bans_by_country(
|
||
socket_path: str,
|
||
range_: TimeRange,
|
||
*,
|
||
source: str = "fail2ban",
|
||
http_session: aiohttp.ClientSession | None = None,
|
||
geo_cache_lookup: GeoCacheLookup | None = None,
|
||
geo_batch_lookup: GeoBatchLookup | None = None,
|
||
geo_enricher: GeoEnricher | None = None,
|
||
history_archive_repo: HistoryArchiveRepository = default_history_archive_repo,
|
||
app_db: aiosqlite.Connection | None = None,
|
||
origin: BanOrigin | None = None,
|
||
country_code: str | None = None,
|
||
) -> BansByCountryResponse:
|
||
"""Aggregate ban counts per country for the selected time window.
|
||
|
||
Uses a two-step strategy optimised for large datasets:
|
||
|
||
1. Queries the fail2ban DB with ``GROUP BY ip`` to get the per-IP ban
|
||
counts for all unique IPs in the window — no row-count cap.
|
||
2. Serves geo data from the in-memory cache only (non-blocking).
|
||
Any IPs not yet in the cache are scheduled for background resolution
|
||
via :func:`asyncio.create_task` so the response is returned immediately
|
||
and subsequent requests benefit from the warmed cache.
|
||
3. Returns a ``{country_code: count}`` aggregation and the 200 most
|
||
recent raw rows for the companion table.
|
||
|
||
Note:
|
||
On the very first request a large number of IPs may be uncached and
|
||
the country map will be sparse. The background task will resolve them
|
||
and the next request will return a complete map. This trade-off keeps
|
||
the endpoint fast regardless of dataset size.
|
||
|
||
Args:
|
||
socket_path: Path to the fail2ban Unix domain socket.
|
||
range_: Time-range preset.
|
||
http_session: Optional :class:`aiohttp.ClientSession` for background
|
||
geo lookups. When ``None``, only cached data is used.
|
||
geo_enricher: Legacy async ``(ip) -> GeoInfo | None`` callable;
|
||
used when *http_session* is ``None`` (e.g. tests).
|
||
app_db: Optional BanGUI application database used to persist newly
|
||
resolved geo entries across restarts.
|
||
origin: Optional origin filter — ``"blocklist"`` restricts results to
|
||
the ``blocklist-import`` jail, ``"selfblock"`` excludes it.
|
||
|
||
Returns:
|
||
:class:`~app.models.ban.BansByCountryResponse` with per-country
|
||
aggregation and the companion ban list.
|
||
"""
|
||
|
||
since: int = _since_unix(range_)
|
||
|
||
if source not in ("fail2ban", "archive"):
|
||
raise ValueError(f"Unsupported source: {source!r}")
|
||
|
||
if source == "archive":
|
||
if app_db is None:
|
||
raise ValueError("app_db must be provided when source is 'archive'")
|
||
|
||
all_rows = await history_archive_repo.get_all_archived_history(
|
||
db=app_db,
|
||
since=since,
|
||
origin=origin,
|
||
action="ban",
|
||
)
|
||
|
||
total = len(all_rows)
|
||
|
||
agg_rows = {}
|
||
for row in all_rows:
|
||
ip = str(row["ip"])
|
||
agg_rows[ip] = agg_rows.get(ip, 0) + 1
|
||
|
||
unique_ips = list(agg_rows.keys())
|
||
else:
|
||
origin_clause, origin_params = _origin_sql_filter(origin)
|
||
db_path: str = await get_fail2ban_db_path(socket_path)
|
||
log.info(
|
||
"ban_service_bans_by_country",
|
||
db_path=db_path,
|
||
since=since,
|
||
range=range_,
|
||
origin=origin,
|
||
)
|
||
|
||
# Total count and companion rows reuse the same SQL query logic.
|
||
# Passing limit=0 returns only the total from the count query.
|
||
_, total = await fail2ban_db_repo.get_currently_banned(
|
||
db_path=db_path,
|
||
since=since,
|
||
origin=origin,
|
||
limit=0,
|
||
offset=0,
|
||
)
|
||
|
||
agg_rows = await fail2ban_db_repo.get_ban_event_counts(
|
||
db_path=db_path,
|
||
since=since,
|
||
origin=origin,
|
||
)
|
||
|
||
unique_ips = [r.ip for r in agg_rows]
|
||
geo_map: dict[str, GeoInfo] = {}
|
||
|
||
if http_session is not None and unique_ips and geo_cache_lookup is not None:
|
||
# Serve only what is already in the in-memory cache — no API calls on
|
||
# the hot path. Uncached IPs are resolved asynchronously in the
|
||
# background so subsequent requests benefit from a warmer cache.
|
||
geo_map, uncached = geo_cache_lookup(unique_ips)
|
||
if uncached:
|
||
log.info(
|
||
"ban_service_geo_background_scheduled",
|
||
uncached=len(uncached),
|
||
cached=len(geo_map),
|
||
)
|
||
if geo_batch_lookup is not None:
|
||
# Fire-and-forget: lookup_batch handles rate-limiting / retries.
|
||
# The dirty-set flush task persists results to the DB.
|
||
asyncio.create_task( # noqa: RUF006
|
||
geo_batch_lookup(uncached, http_session, db=app_db),
|
||
name="geo_bans_by_country",
|
||
)
|
||
elif geo_enricher is not None and unique_ips:
|
||
# Fallback: legacy per-IP enricher (used in tests / older callers).
|
||
async def _safe_lookup(ip: str) -> tuple[str, GeoInfo | None]:
|
||
try:
|
||
return ip, await geo_enricher(ip)
|
||
except Exception: # noqa: BLE001
|
||
log.warning("ban_service_geo_lookup_failed", ip=ip)
|
||
return ip, None
|
||
|
||
results = await asyncio.gather(*(_safe_lookup(ip) for ip in unique_ips))
|
||
geo_map = {ip: geo for ip, geo in results if geo is not None}
|
||
|
||
companion_rows: list[dict[str, object] | fail2ban_db_repo.BanRecord]
|
||
if country_code is None:
|
||
if source == "archive":
|
||
companion_rows, _ = await history_archive_repo.get_archived_history(
|
||
db=app_db,
|
||
since=since,
|
||
origin=origin,
|
||
action="ban",
|
||
page=1,
|
||
page_size=_MAX_COMPANION_BANS,
|
||
)
|
||
else:
|
||
companion_rows, _ = await fail2ban_db_repo.get_currently_banned(
|
||
db_path=db_path,
|
||
since=since,
|
||
origin=origin,
|
||
limit=_MAX_COMPANION_BANS,
|
||
offset=0,
|
||
)
|
||
else:
|
||
matched_ips = [
|
||
ip
|
||
for ip, geo in geo_map.items()
|
||
if geo is not None and geo.country_code == country_code
|
||
]
|
||
|
||
if source == "archive":
|
||
if matched_ips:
|
||
companion_rows = await history_archive_repo.get_all_archived_history(
|
||
db=app_db,
|
||
since=since,
|
||
origin=origin,
|
||
action="ban",
|
||
ip_filter=matched_ips,
|
||
)
|
||
else:
|
||
companion_rows = []
|
||
else:
|
||
if matched_ips:
|
||
companion_rows, _ = await fail2ban_db_repo.get_currently_banned(
|
||
db_path=db_path,
|
||
since=since,
|
||
origin=origin,
|
||
ip_filter=matched_ips,
|
||
)
|
||
else:
|
||
companion_rows = []
|
||
|
||
# Build country aggregation from the SQL-grouped rows.
|
||
countries: dict[str, int] = {}
|
||
country_names: dict[str, str] = {}
|
||
|
||
if source == "archive":
|
||
agg_items = [
|
||
{
|
||
"ip": ip,
|
||
"event_count": count,
|
||
}
|
||
for ip, count in agg_rows.items()
|
||
]
|
||
else:
|
||
agg_items = agg_rows
|
||
|
||
for agg_row in agg_items:
|
||
if source == "archive":
|
||
ip = agg_row["ip"]
|
||
event_count = agg_row["event_count"]
|
||
else:
|
||
ip = agg_row.ip
|
||
event_count = agg_row.event_count
|
||
|
||
geo = geo_map.get(ip)
|
||
cc: str | None = geo.country_code if geo else None
|
||
cn: str | None = geo.country_name if geo else None
|
||
|
||
if cc:
|
||
countries[cc] = countries.get(cc, 0) + event_count
|
||
if cn and cc not in country_names:
|
||
country_names[cc] = cn
|
||
|
||
# Build companion table from recent rows (geo already cached from batch step).
|
||
bans: list[ActiveBan] = []
|
||
for companion_row in companion_rows:
|
||
if source == "archive":
|
||
ip = companion_row["ip"]
|
||
jail = companion_row["jail"]
|
||
banned_at = ts_to_iso(int(companion_row["timeofban"]))
|
||
ban_count = int(companion_row["bancount"])
|
||
service = None
|
||
else:
|
||
ip = companion_row.ip
|
||
jail = companion_row.jail
|
||
banned_at = ts_to_iso(companion_row.timeofban)
|
||
ban_count = companion_row.bancount
|
||
matches, _ = parse_data_json(companion_row.data)
|
||
service = matches[0] if matches else None
|
||
|
||
geo = geo_map.get(ip)
|
||
cc = geo.country_code if geo else None
|
||
cn = geo.country_name if geo else None
|
||
asn: str | None = geo.asn if geo else None
|
||
org: str | None = geo.org if geo else None
|
||
|
||
bans.append(
|
||
DashboardBanItem(
|
||
ip=ip,
|
||
jail=jail,
|
||
banned_at=banned_at,
|
||
service=service,
|
||
country_code=cc,
|
||
country_name=cn,
|
||
asn=asn,
|
||
org=org,
|
||
ban_count=ban_count,
|
||
origin=_derive_origin(jail),
|
||
)
|
||
)
|
||
|
||
return BansByCountryResponse(
|
||
countries=countries,
|
||
country_names=country_names,
|
||
bans=bans,
|
||
total=total,
|
||
)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# ban_trend
|
||
# ---------------------------------------------------------------------------
|
||
|
||
|
||
async def ban_trend(
|
||
socket_path: str,
|
||
range_: TimeRange,
|
||
*,
|
||
source: str = "fail2ban",
|
||
history_archive_repo: HistoryArchiveRepository = default_history_archive_repo,
|
||
app_db: aiosqlite.Connection | None = None,
|
||
origin: BanOrigin | None = None,
|
||
) -> BanTrendResponse:
|
||
"""Return ban counts aggregated into equal-width time buckets.
|
||
|
||
Queries the fail2ban database ``bans`` table and groups records by a
|
||
computed bucket index so the frontend can render a continuous time-series
|
||
chart. All buckets within the requested window are returned — buckets
|
||
that contain zero bans are included as zero-count entries so the
|
||
frontend always receives a complete, gap-free series.
|
||
|
||
Bucket sizes per time-range preset:
|
||
|
||
* ``24h`` → 1-hour buckets (24 total)
|
||
* ``7d`` → 6-hour buckets (28 total)
|
||
* ``30d`` → 1-day buckets (30 total)
|
||
* ``365d`` → 7-day buckets (~53 total)
|
||
|
||
Args:
|
||
socket_path: Path to the fail2ban Unix domain socket.
|
||
range_: Time-range preset (``"24h"``, ``"7d"``, ``"30d"``, or
|
||
``"365d"``).
|
||
origin: Optional origin filter — ``"blocklist"`` restricts to the
|
||
``blocklist-import`` jail, ``"selfblock"`` excludes it.
|
||
|
||
Returns:
|
||
:class:`~app.models.ban.BanTrendResponse` with a full bucket list
|
||
and the human-readable bucket-size label.
|
||
"""
|
||
since: int = _since_unix(range_)
|
||
bucket_secs: int = BUCKET_SECONDS[range_]
|
||
num_buckets: int = bucket_count(range_)
|
||
|
||
if source not in ("fail2ban", "archive"):
|
||
raise ValueError(f"Unsupported source: {source!r}")
|
||
|
||
if source == "archive":
|
||
if app_db is None:
|
||
raise ValueError("app_db must be provided when source is 'archive'")
|
||
|
||
all_rows = await history_archive_repo.get_all_archived_history(
|
||
db=app_db,
|
||
since=since,
|
||
origin=origin,
|
||
action="ban",
|
||
)
|
||
|
||
counts: list[int] = [0] * num_buckets
|
||
for row in all_rows:
|
||
timeofban = int(row["timeofban"])
|
||
bucket_index = int((timeofban - since) / bucket_secs)
|
||
if 0 <= bucket_index < num_buckets:
|
||
counts[bucket_index] += 1
|
||
|
||
log.info(
|
||
"ban_service_ban_trend",
|
||
source=source,
|
||
since=since,
|
||
range=range_,
|
||
origin=origin,
|
||
bucket_secs=bucket_secs,
|
||
num_buckets=num_buckets,
|
||
)
|
||
else:
|
||
db_path: str = await get_fail2ban_db_path(socket_path)
|
||
log.info(
|
||
"ban_service_ban_trend",
|
||
db_path=db_path,
|
||
since=since,
|
||
range=range_,
|
||
origin=origin,
|
||
bucket_secs=bucket_secs,
|
||
num_buckets=num_buckets,
|
||
)
|
||
|
||
counts = await fail2ban_db_repo.get_ban_counts_by_bucket(
|
||
db_path=db_path,
|
||
since=since,
|
||
bucket_secs=bucket_secs,
|
||
num_buckets=num_buckets,
|
||
origin=origin,
|
||
)
|
||
|
||
buckets: list[BanTrendBucket] = [
|
||
BanTrendBucket(
|
||
timestamp=ts_to_iso(since + i * bucket_secs),
|
||
count=counts[i],
|
||
)
|
||
for i in range(num_buckets)
|
||
]
|
||
|
||
return BanTrendResponse(
|
||
buckets=buckets,
|
||
bucket_size=BUCKET_SIZE_LABEL[range_],
|
||
)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# bans_by_jail
|
||
# ---------------------------------------------------------------------------
|
||
|
||
|
||
async def bans_by_jail(
|
||
socket_path: str,
|
||
range_: TimeRange,
|
||
*,
|
||
source: str = "fail2ban",
|
||
history_archive_repo: HistoryArchiveRepository = default_history_archive_repo,
|
||
app_db: aiosqlite.Connection | None = None,
|
||
origin: BanOrigin | None = None,
|
||
) -> BansByJailResponse:
|
||
"""Return ban counts aggregated per jail for the selected time window.
|
||
|
||
Queries the fail2ban database ``bans`` table, groups records by jail
|
||
name, and returns them ordered by count descending. The origin filter
|
||
is applied when provided so callers can restrict results to blocklist-
|
||
imported bans or organic fail2ban bans.
|
||
|
||
Args:
|
||
socket_path: Path to the fail2ban Unix domain socket.
|
||
range_: Time-range preset (``"24h"``, ``"7d"``, ``"30d"``, or
|
||
``"365d"``).
|
||
origin: Optional origin filter — ``"blocklist"`` restricts to the
|
||
``blocklist-import`` jail, ``"selfblock"`` excludes it.
|
||
|
||
Returns:
|
||
:class:`~app.models.ban.BansByJailResponse` with per-jail counts
|
||
sorted descending and the total ban count.
|
||
"""
|
||
since: int = _since_unix(range_)
|
||
|
||
if source not in ("fail2ban", "archive"):
|
||
raise ValueError(f"Unsupported source: {source!r}")
|
||
|
||
if source == "archive":
|
||
if app_db is None:
|
||
raise ValueError("app_db must be provided when source is 'archive'")
|
||
|
||
all_rows = await history_archive_repo.get_all_archived_history(
|
||
db=app_db,
|
||
since=since,
|
||
origin=origin,
|
||
action="ban",
|
||
)
|
||
|
||
jail_counter: dict[str, int] = {}
|
||
for row in all_rows:
|
||
jail_name = str(row["jail"])
|
||
jail_counter[jail_name] = jail_counter.get(jail_name, 0) + 1
|
||
|
||
total = sum(jail_counter.values())
|
||
jail_counts = [
|
||
JailBanCountModel(jail=jail_name, count=count)
|
||
for jail_name, count in sorted(jail_counter.items(), key=lambda x: x[1], reverse=True)
|
||
]
|
||
|
||
log.debug(
|
||
"ban_service_bans_by_jail",
|
||
source=source,
|
||
since=since,
|
||
since_iso=ts_to_iso(since),
|
||
range=range_,
|
||
origin=origin,
|
||
)
|
||
else:
|
||
origin_clause, origin_params = _origin_sql_filter(origin)
|
||
|
||
db_path: str = await get_fail2ban_db_path(socket_path)
|
||
log.debug(
|
||
"ban_service_bans_by_jail",
|
||
db_path=db_path,
|
||
since=since,
|
||
since_iso=ts_to_iso(since),
|
||
range=range_,
|
||
origin=origin,
|
||
)
|
||
|
||
total, jail_counts = await fail2ban_db_repo.get_bans_by_jail(
|
||
db_path=db_path,
|
||
since=since,
|
||
origin=origin,
|
||
)
|
||
|
||
# Diagnostic guard: if zero results were returned, check whether the table
|
||
# has *any* rows and log a warning with min/max timeofban so operators can
|
||
# diagnose timezone or filter mismatches from logs.
|
||
if total == 0:
|
||
table_row_count, min_timeofban, max_timeofban = await fail2ban_db_repo.get_bans_table_summary(db_path)
|
||
if table_row_count > 0:
|
||
log.warning(
|
||
"ban_service_bans_by_jail_empty_despite_data",
|
||
table_row_count=table_row_count,
|
||
min_timeofban=min_timeofban,
|
||
max_timeofban=max_timeofban,
|
||
since=since,
|
||
range=range_,
|
||
)
|
||
|
||
log.debug(
|
||
"ban_service_bans_by_jail_result",
|
||
total=total,
|
||
jail_count=len(jail_counts),
|
||
)
|
||
|
||
return BansByJailResponse(
|
||
jails=[JailBanCountModel(jail=j.jail, count=j.count) for j in jail_counts],
|
||
total=total,
|
||
)
|