Stage 9: ban history — backend service, router, frontend history page

- history.py models: HistoryBanItem, HistoryListResponse, IpTimelineEvent, IpDetailResponse
- history_service.py: list_history() with dynamic WHERE clauses (range/jail/ip
  prefix/all-time), get_ip_detail() with timeline aggregation
- history.py router: GET /api/history + GET /api/history/{ip} (404 for unknown)
- Fixed latent bug in ban_service._parse_data_json: json.loads('null') -> None
  -> AttributeError; now checks isinstance(parsed, dict) before assigning obj
- 317 tests pass (27 new), ruff + mypy clean (46 files)
- types/history.ts, api/history.ts, hooks/useHistory.ts created
- HistoryPage.tsx: filter bar (time range/jail/IP), DataGrid table,
  high-ban-count row highlighting, per-IP IpDetailView with timeline,
  pagination
- Frontend tsc + ESLint clean (0 errors/warnings)
- Tasks.md Stage 9 marked done
This commit is contained in:
2026-03-01 15:09:22 +01:00
parent 54313fd3e0
commit b8f3a1c562
12 changed files with 2050 additions and 50 deletions

View File

@@ -0,0 +1,333 @@
"""Tests for the history router (GET /api/history, GET /api/history/{ip})."""
from __future__ import annotations
from pathlib import Path
from unittest.mock import AsyncMock, MagicMock, patch
import aiosqlite
import pytest
from httpx import ASGITransport, AsyncClient
from app.config import Settings
from app.db import init_db
from app.main import create_app
from app.models.history import (
HistoryBanItem,
HistoryListResponse,
IpDetailResponse,
IpTimelineEvent,
)
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
_SETUP_PAYLOAD = {
"master_password": "testpassword1",
"database_path": "bangui.db",
"fail2ban_socket": "/var/run/fail2ban/fail2ban.sock",
"timezone": "UTC",
"session_duration_minutes": 60,
}
def _make_history_item(ip: str = "1.2.3.4", jail: str = "sshd") -> HistoryBanItem:
"""Build a single ``HistoryBanItem`` for use in test responses."""
return HistoryBanItem(
ip=ip,
jail=jail,
banned_at="2026-03-01T10:00:00+00:00",
ban_count=3,
failures=5,
matches=["Mar 1 10:00:00 host sshd[123]: Failed password for root"],
country_code="DE",
country_name="Germany",
asn="AS3320",
org="Telekom",
)
def _make_history_list(n: int = 2) -> HistoryListResponse:
"""Build a mock ``HistoryListResponse`` with *n* items."""
items = [_make_history_item(ip=f"1.2.3.{i}") for i in range(n)]
return HistoryListResponse(items=items, total=n, page=1, page_size=100)
def _make_ip_detail(ip: str = "1.2.3.4") -> IpDetailResponse:
"""Build a mock ``IpDetailResponse`` for *ip*."""
events = [
IpTimelineEvent(
jail="sshd",
banned_at="2026-03-01T10:00:00+00:00",
ban_count=3,
failures=5,
matches=["Mar 1 10:00:00 host sshd[123]: Failed password for root"],
),
IpTimelineEvent(
jail="sshd",
banned_at="2026-02-28T08:00:00+00:00",
ban_count=2,
failures=5,
matches=[],
),
]
return IpDetailResponse(
ip=ip,
total_bans=2,
total_failures=10,
last_ban_at="2026-03-01T10:00:00+00:00",
country_code="DE",
country_name="Germany",
asn="AS3320",
org="Telekom",
timeline=events,
)
# ---------------------------------------------------------------------------
# Fixture
# ---------------------------------------------------------------------------
@pytest.fixture
async def history_client(tmp_path: Path) -> AsyncClient: # type: ignore[misc]
"""Provide an authenticated ``AsyncClient`` for history endpoint tests."""
settings = Settings(
database_path=str(tmp_path / "history_test.db"),
fail2ban_socket="/tmp/fake_fail2ban.sock",
session_secret="test-history-secret",
session_duration_minutes=60,
timezone="UTC",
log_level="debug",
)
app = create_app(settings=settings)
db: aiosqlite.Connection = await aiosqlite.connect(settings.database_path)
db.row_factory = aiosqlite.Row
await init_db(db)
app.state.db = db
app.state.http_session = MagicMock()
transport = ASGITransport(app=app)
async with AsyncClient(transport=transport, base_url="http://test") as ac:
resp = await ac.post("/api/setup", json=_SETUP_PAYLOAD)
assert resp.status_code == 201
login_resp = await ac.post(
"/api/auth/login",
json={"password": _SETUP_PAYLOAD["master_password"]},
)
assert login_resp.status_code == 200
yield ac
await db.close()
# ---------------------------------------------------------------------------
# GET /api/history
# ---------------------------------------------------------------------------
class TestHistoryList:
"""GET /api/history — paginated history list."""
async def test_returns_200_when_authenticated(
self, history_client: AsyncClient
) -> None:
"""Authenticated request returns HTTP 200."""
with patch(
"app.routers.history.history_service.list_history",
new=AsyncMock(return_value=_make_history_list()),
):
response = await history_client.get("/api/history")
assert response.status_code == 200
async def test_returns_401_when_unauthenticated(
self, client: AsyncClient
) -> None:
"""Unauthenticated request returns HTTP 401."""
await client.post("/api/setup", json=_SETUP_PAYLOAD)
response = await client.get("/api/history")
assert response.status_code == 401
async def test_response_shape(self, history_client: AsyncClient) -> None:
"""Response body contains the expected keys."""
mock_response = _make_history_list(n=1)
with patch(
"app.routers.history.history_service.list_history",
new=AsyncMock(return_value=mock_response),
):
response = await history_client.get("/api/history")
body = response.json()
assert "items" in body
assert "total" in body
assert "page" in body
assert "page_size" in body
assert body["total"] == 1
item = body["items"][0]
assert "ip" in item
assert "jail" in item
assert "banned_at" in item
assert "ban_count" in item
assert "failures" in item
assert "matches" in item
assert item["country_code"] == "DE"
async def test_forwards_jail_filter(self, history_client: AsyncClient) -> None:
"""The ``jail`` query parameter is forwarded to the service."""
mock_fn = AsyncMock(return_value=_make_history_list(n=0))
with patch(
"app.routers.history.history_service.list_history",
new=mock_fn,
):
await history_client.get("/api/history?jail=nginx")
_args, kwargs = mock_fn.call_args
assert kwargs.get("jail") == "nginx"
async def test_forwards_ip_filter(self, history_client: AsyncClient) -> None:
"""The ``ip`` query parameter is forwarded as ``ip_filter`` to the service."""
mock_fn = AsyncMock(return_value=_make_history_list(n=0))
with patch(
"app.routers.history.history_service.list_history",
new=mock_fn,
):
await history_client.get("/api/history?ip=192.168")
_args, kwargs = mock_fn.call_args
assert kwargs.get("ip_filter") == "192.168"
async def test_forwards_time_range(self, history_client: AsyncClient) -> None:
"""The ``range`` query parameter is forwarded as ``range_`` to the service."""
mock_fn = AsyncMock(return_value=_make_history_list(n=0))
with patch(
"app.routers.history.history_service.list_history",
new=mock_fn,
):
await history_client.get("/api/history?range=7d")
_args, kwargs = mock_fn.call_args
assert kwargs.get("range_") == "7d"
async def test_empty_result(self, history_client: AsyncClient) -> None:
"""An empty history returns items=[] and total=0."""
with patch(
"app.routers.history.history_service.list_history",
new=AsyncMock(
return_value=HistoryListResponse(items=[], total=0, page=1, page_size=100)
),
):
response = await history_client.get("/api/history")
body = response.json()
assert body["items"] == []
assert body["total"] == 0
# ---------------------------------------------------------------------------
# GET /api/history/{ip}
# ---------------------------------------------------------------------------
class TestIpHistory:
"""GET /api/history/{ip} — per-IP detail."""
async def test_returns_200_when_authenticated(
self, history_client: AsyncClient
) -> None:
"""Authenticated request returns HTTP 200 for a known IP."""
with patch(
"app.routers.history.history_service.get_ip_detail",
new=AsyncMock(return_value=_make_ip_detail("1.2.3.4")),
):
response = await history_client.get("/api/history/1.2.3.4")
assert response.status_code == 200
async def test_returns_401_when_unauthenticated(
self, client: AsyncClient
) -> None:
"""Unauthenticated request returns HTTP 401."""
await client.post("/api/setup", json=_SETUP_PAYLOAD)
response = await client.get("/api/history/1.2.3.4")
assert response.status_code == 401
async def test_returns_404_for_unknown_ip(
self, history_client: AsyncClient
) -> None:
"""Returns 404 when the IP has no records in the database."""
with patch(
"app.routers.history.history_service.get_ip_detail",
new=AsyncMock(return_value=None),
):
response = await history_client.get("/api/history/9.9.9.9")
assert response.status_code == 404
async def test_response_shape(self, history_client: AsyncClient) -> None:
"""Response body contains the expected keys and nested timeline."""
mock_detail = _make_ip_detail("1.2.3.4")
with patch(
"app.routers.history.history_service.get_ip_detail",
new=AsyncMock(return_value=mock_detail),
):
response = await history_client.get("/api/history/1.2.3.4")
body = response.json()
assert body["ip"] == "1.2.3.4"
assert body["total_bans"] == 2
assert body["total_failures"] == 10
assert body["country_code"] == "DE"
assert "timeline" in body
assert len(body["timeline"]) == 2
event = body["timeline"][0]
assert "jail" in event
assert "banned_at" in event
assert "ban_count" in event
assert "failures" in event
assert "matches" in event
async def test_aggregation_sums_failures(
self, history_client: AsyncClient
) -> None:
"""total_failures reflects the sum across all timeline events."""
mock_detail = _make_ip_detail("10.0.0.1")
mock_detail = IpDetailResponse(
ip="10.0.0.1",
total_bans=3,
total_failures=15,
last_ban_at="2026-03-01T10:00:00+00:00",
country_code=None,
country_name=None,
asn=None,
org=None,
timeline=[
IpTimelineEvent(
jail="sshd",
banned_at="2026-03-01T10:00:00+00:00",
ban_count=3,
failures=7,
matches=[],
),
IpTimelineEvent(
jail="sshd",
banned_at="2026-02-28T08:00:00+00:00",
ban_count=2,
failures=8,
matches=[],
),
],
)
with patch(
"app.routers.history.history_service.get_ip_detail",
new=AsyncMock(return_value=mock_detail),
):
response = await history_client.get("/api/history/10.0.0.1")
assert response.status_code == 200
body = response.json()
assert body["total_failures"] == 15
assert body["total_bans"] == 3

View File

@@ -0,0 +1,341 @@
"""Tests for history_service.list_history() and history_service.get_ip_detail()."""
from __future__ import annotations
import json
import time
from pathlib import Path
from typing import Any
from unittest.mock import AsyncMock, patch
import aiosqlite
import pytest
from app.services import history_service
# ---------------------------------------------------------------------------
# Time helpers
# ---------------------------------------------------------------------------
_NOW: int = int(time.time())
_ONE_HOUR_AGO: int = _NOW - 3600
_TWO_DAYS_AGO: int = _NOW - 2 * 24 * 3600
_THIRTY_DAYS_AGO: int = _NOW - 30 * 24 * 3600
# ---------------------------------------------------------------------------
# DB fixture helpers
# ---------------------------------------------------------------------------
async def _create_f2b_db(path: str, rows: list[dict[str, Any]]) -> None:
"""Create a minimal fail2ban SQLite database with the given ban rows."""
async with aiosqlite.connect(path) as db:
await db.execute(
"CREATE TABLE jails ("
"name TEXT NOT NULL UNIQUE, "
"enabled INTEGER NOT NULL DEFAULT 1"
")"
)
await db.execute(
"CREATE TABLE bans ("
"jail TEXT NOT NULL, "
"ip TEXT, "
"timeofban INTEGER NOT NULL, "
"bantime INTEGER NOT NULL, "
"bancount INTEGER NOT NULL DEFAULT 1, "
"data JSON"
")"
)
for row in rows:
await db.execute(
"INSERT INTO bans (jail, ip, timeofban, bantime, bancount, data) "
"VALUES (?, ?, ?, ?, ?, ?)",
(
row["jail"],
row["ip"],
row["timeofban"],
row.get("bantime", 3600),
row.get("bancount", 1),
json.dumps(row["data"]) if "data" in row else None,
),
)
await db.commit()
@pytest.fixture
async def f2b_db_path(tmp_path: Path) -> str: # type: ignore[misc]
"""Return the path to a test fail2ban SQLite database."""
path = str(tmp_path / "fail2ban_test.sqlite3")
await _create_f2b_db(
path,
[
{
"jail": "sshd",
"ip": "1.2.3.4",
"timeofban": _ONE_HOUR_AGO,
"bantime": 3600,
"bancount": 3,
"data": {
"matches": ["Mar 1 sshd[1]: Failed password for root"],
"failures": 5,
},
},
{
"jail": "nginx",
"ip": "5.6.7.8",
"timeofban": _ONE_HOUR_AGO,
"bantime": 7200,
"bancount": 1,
"data": {"matches": ["GET /admin HTTP/1.1"], "failures": 3},
},
{
"jail": "sshd",
"ip": "1.2.3.4",
"timeofban": _TWO_DAYS_AGO,
"bantime": 3600,
"bancount": 2,
"data": {"failures": 5},
},
{
"jail": "sshd",
"ip": "9.0.0.1",
"timeofban": _THIRTY_DAYS_AGO,
"bantime": 3600,
"bancount": 1,
"data": None,
},
],
)
return path
# ---------------------------------------------------------------------------
# list_history tests
# ---------------------------------------------------------------------------
class TestListHistory:
"""history_service.list_history()."""
async def test_returns_all_bans_with_no_filter(
self, f2b_db_path: str
) -> None:
"""No filter returns every record in the database."""
with patch(
"app.services.history_service._get_fail2ban_db_path",
new=AsyncMock(return_value=f2b_db_path),
):
result = await history_service.list_history("fake_socket")
assert result.total == 4
assert len(result.items) == 4
async def test_time_range_filter_excludes_old_bans(
self, f2b_db_path: str
) -> None:
"""The ``range_`` filter excludes bans older than the window."""
with patch(
"app.services.history_service._get_fail2ban_db_path",
new=AsyncMock(return_value=f2b_db_path),
):
# "24h" window should include only the two recent bans
result = await history_service.list_history(
"fake_socket", range_="24h"
)
assert result.total == 2
async def test_jail_filter(self, f2b_db_path: str) -> None:
"""Jail filter restricts results to bans from that jail."""
with patch(
"app.services.history_service._get_fail2ban_db_path",
new=AsyncMock(return_value=f2b_db_path),
):
result = await history_service.list_history("fake_socket", jail="nginx")
assert result.total == 1
assert result.items[0].jail == "nginx"
async def test_ip_prefix_filter(self, f2b_db_path: str) -> None:
"""IP prefix filter restricts results to matching IPs."""
with patch(
"app.services.history_service._get_fail2ban_db_path",
new=AsyncMock(return_value=f2b_db_path),
):
result = await history_service.list_history(
"fake_socket", ip_filter="1.2.3"
)
assert result.total == 2
for item in result.items:
assert item.ip.startswith("1.2.3")
async def test_combined_filters(self, f2b_db_path: str) -> None:
"""Jail + IP prefix filters applied together narrow the result set."""
with patch(
"app.services.history_service._get_fail2ban_db_path",
new=AsyncMock(return_value=f2b_db_path),
):
result = await history_service.list_history(
"fake_socket", jail="sshd", ip_filter="1.2.3.4"
)
# 2 sshd bans for 1.2.3.4
assert result.total == 2
async def test_unknown_ip_returns_empty(self, f2b_db_path: str) -> None:
"""Filtering by a non-existent IP returns an empty result set."""
with patch(
"app.services.history_service._get_fail2ban_db_path",
new=AsyncMock(return_value=f2b_db_path),
):
result = await history_service.list_history(
"fake_socket", ip_filter="99.99.99.99"
)
assert result.total == 0
assert result.items == []
async def test_failures_extracted_from_data(
self, f2b_db_path: str
) -> None:
"""``failures`` field is parsed from the JSON ``data`` column."""
with patch(
"app.services.history_service._get_fail2ban_db_path",
new=AsyncMock(return_value=f2b_db_path),
):
result = await history_service.list_history(
"fake_socket", ip_filter="5.6.7.8"
)
assert result.total == 1
assert result.items[0].failures == 3
async def test_matches_extracted_from_data(
self, f2b_db_path: str
) -> None:
"""``matches`` list is parsed from the JSON ``data`` column."""
with patch(
"app.services.history_service._get_fail2ban_db_path",
new=AsyncMock(return_value=f2b_db_path),
):
result = await history_service.list_history(
"fake_socket", ip_filter="1.2.3.4", range_="24h"
)
# Most recent record for 1.2.3.4 has a matches list
recent = result.items[0]
assert len(recent.matches) == 1
assert "Failed password" in recent.matches[0]
async def test_null_data_column_handled_gracefully(
self, f2b_db_path: str
) -> None:
"""Records with ``data=NULL`` produce failures=0 and matches=[]."""
with patch(
"app.services.history_service._get_fail2ban_db_path",
new=AsyncMock(return_value=f2b_db_path),
):
result = await history_service.list_history(
"fake_socket", ip_filter="9.0.0.1"
)
assert result.total == 1
item = result.items[0]
assert item.failures == 0
assert item.matches == []
async def test_pagination(self, f2b_db_path: str) -> None:
"""Pagination returns the correct slice."""
with patch(
"app.services.history_service._get_fail2ban_db_path",
new=AsyncMock(return_value=f2b_db_path),
):
result = await history_service.list_history(
"fake_socket", page=1, page_size=2
)
assert result.total == 4
assert len(result.items) == 2
assert result.page == 1
assert result.page_size == 2
# ---------------------------------------------------------------------------
# get_ip_detail tests
# ---------------------------------------------------------------------------
class TestGetIpDetail:
"""history_service.get_ip_detail()."""
async def test_returns_none_for_unknown_ip(
self, f2b_db_path: str
) -> None:
"""Returns ``None`` when the IP has no records in the database."""
with patch(
"app.services.history_service._get_fail2ban_db_path",
new=AsyncMock(return_value=f2b_db_path),
):
result = await history_service.get_ip_detail("fake_socket", "99.99.99.99")
assert result is None
async def test_returns_ip_detail_for_known_ip(
self, f2b_db_path: str
) -> None:
"""Returns an IpDetailResponse with correct totals for a known IP."""
with patch(
"app.services.history_service._get_fail2ban_db_path",
new=AsyncMock(return_value=f2b_db_path),
):
result = await history_service.get_ip_detail("fake_socket", "1.2.3.4")
assert result is not None
assert result.ip == "1.2.3.4"
assert result.total_bans == 2
assert result.total_failures == 10 # 5 + 5
async def test_timeline_ordered_newest_first(
self, f2b_db_path: str
) -> None:
"""Timeline events are ordered newest-first."""
with patch(
"app.services.history_service._get_fail2ban_db_path",
new=AsyncMock(return_value=f2b_db_path),
):
result = await history_service.get_ip_detail("fake_socket", "1.2.3.4")
assert result is not None
assert len(result.timeline) == 2
# First event should be the most recent
assert result.timeline[0].banned_at > result.timeline[1].banned_at
async def test_last_ban_at_is_most_recent(self, f2b_db_path: str) -> None:
"""``last_ban_at`` matches the banned_at of the first timeline event."""
with patch(
"app.services.history_service._get_fail2ban_db_path",
new=AsyncMock(return_value=f2b_db_path),
):
result = await history_service.get_ip_detail("fake_socket", "1.2.3.4")
assert result is not None
assert result.last_ban_at == result.timeline[0].banned_at
async def test_geo_enrichment_applied_when_provided(
self, f2b_db_path: str
) -> None:
"""Geolocation is applied when a geo_enricher is provided."""
from app.services.geo_service import GeoInfo
mock_geo = GeoInfo(
country_code="US",
country_name="United States",
asn="AS15169",
org="Google",
)
fake_enricher = AsyncMock(return_value=mock_geo)
with patch(
"app.services.history_service._get_fail2ban_db_path",
new=AsyncMock(return_value=f2b_db_path),
):
result = await history_service.get_ip_detail(
"fake_socket", "1.2.3.4", geo_enricher=fake_enricher
)
assert result is not None
assert result.country_code == "US"
assert result.country_name == "United States"
assert result.asn == "AS15169"
assert result.org == "Google"