"""Tests for the history router (GET /api/history, GET /api/history/{ip}).""" from __future__ import annotations from pathlib import Path from unittest.mock import AsyncMock, MagicMock, patch import aiosqlite import pytest from httpx import ASGITransport, AsyncClient from app.config import Settings from app.db import init_db from app.main import create_app from app.models.history import ( HistoryBanItem, HistoryListResponse, IpDetailResponse, IpTimelineEvent, ) # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- _SETUP_PAYLOAD = { "master_password": "testpassword1", "database_path": "bangui.db", "fail2ban_socket": "/var/run/fail2ban/fail2ban.sock", "timezone": "UTC", "session_duration_minutes": 60, } def _make_history_item(ip: str = "1.2.3.4", jail: str = "sshd") -> HistoryBanItem: """Build a single ``HistoryBanItem`` for use in test responses.""" return HistoryBanItem( ip=ip, jail=jail, banned_at="2026-03-01T10:00:00+00:00", ban_count=3, failures=5, matches=["Mar 1 10:00:00 host sshd[123]: Failed password for root"], country_code="DE", country_name="Germany", asn="AS3320", org="Telekom", ) def _make_history_list(n: int = 2) -> HistoryListResponse: """Build a mock ``HistoryListResponse`` with *n* items.""" items = [_make_history_item(ip=f"1.2.3.{i}") for i in range(n)] return HistoryListResponse(items=items, total=n, page=1, page_size=100) def _make_ip_detail(ip: str = "1.2.3.4") -> IpDetailResponse: """Build a mock ``IpDetailResponse`` for *ip*.""" events = [ IpTimelineEvent( jail="sshd", banned_at="2026-03-01T10:00:00+00:00", ban_count=3, failures=5, matches=["Mar 1 10:00:00 host sshd[123]: Failed password for root"], ), IpTimelineEvent( jail="sshd", banned_at="2026-02-28T08:00:00+00:00", ban_count=2, failures=5, matches=[], ), ] return IpDetailResponse( ip=ip, total_bans=2, total_failures=10, last_ban_at="2026-03-01T10:00:00+00:00", country_code="DE", country_name="Germany", asn="AS3320", org="Telekom", timeline=events, ) # --------------------------------------------------------------------------- # Fixture # --------------------------------------------------------------------------- @pytest.fixture async def history_client(tmp_path: Path) -> AsyncClient: # type: ignore[misc] """Provide an authenticated ``AsyncClient`` for history endpoint tests.""" settings = Settings( database_path=str(tmp_path / "history_test.db"), fail2ban_socket="/tmp/fake_fail2ban.sock", session_secret="test-history-secret", session_duration_minutes=60, timezone="UTC", log_level="debug", ) app = create_app(settings=settings) db: aiosqlite.Connection = await aiosqlite.connect(settings.database_path) db.row_factory = aiosqlite.Row await init_db(db) app.state.db = db app.state.http_session = MagicMock() transport = ASGITransport(app=app) async with AsyncClient(transport=transport, base_url="http://test") as ac: resp = await ac.post("/api/setup", json=_SETUP_PAYLOAD) assert resp.status_code == 201 login_resp = await ac.post( "/api/auth/login", json={"password": _SETUP_PAYLOAD["master_password"]}, ) assert login_resp.status_code == 200 yield ac await db.close() # --------------------------------------------------------------------------- # GET /api/history # --------------------------------------------------------------------------- class TestHistoryList: """GET /api/history — paginated history list.""" async def test_returns_200_when_authenticated( self, history_client: AsyncClient ) -> None: """Authenticated request returns HTTP 200.""" with patch( "app.routers.history.history_service.list_history", new=AsyncMock(return_value=_make_history_list()), ): response = await history_client.get("/api/history") assert response.status_code == 200 async def test_returns_401_when_unauthenticated( self, client: AsyncClient ) -> None: """Unauthenticated request returns HTTP 401.""" await client.post("/api/setup", json=_SETUP_PAYLOAD) response = await client.get("/api/history") assert response.status_code == 401 async def test_response_shape(self, history_client: AsyncClient) -> None: """Response body contains the expected keys.""" mock_response = _make_history_list(n=1) with patch( "app.routers.history.history_service.list_history", new=AsyncMock(return_value=mock_response), ): response = await history_client.get("/api/history") body = response.json() assert "items" in body assert "total" in body assert "page" in body assert "page_size" in body assert body["total"] == 1 item = body["items"][0] assert "ip" in item assert "jail" in item assert "banned_at" in item assert "ban_count" in item assert "failures" in item assert "matches" in item assert item["country_code"] == "DE" async def test_forwards_jail_filter(self, history_client: AsyncClient) -> None: """The ``jail`` query parameter is forwarded to the service.""" mock_fn = AsyncMock(return_value=_make_history_list(n=0)) with patch( "app.routers.history.history_service.list_history", new=mock_fn, ): await history_client.get("/api/history?jail=nginx") _args, kwargs = mock_fn.call_args assert kwargs.get("jail") == "nginx" async def test_forwards_ip_filter(self, history_client: AsyncClient) -> None: """The ``ip`` query parameter is forwarded as ``ip_filter`` to the service.""" mock_fn = AsyncMock(return_value=_make_history_list(n=0)) with patch( "app.routers.history.history_service.list_history", new=mock_fn, ): await history_client.get("/api/history?ip=192.168") _args, kwargs = mock_fn.call_args assert kwargs.get("ip_filter") == "192.168" async def test_forwards_time_range(self, history_client: AsyncClient) -> None: """The ``range`` query parameter is forwarded as ``range_`` to the service.""" mock_fn = AsyncMock(return_value=_make_history_list(n=0)) with patch( "app.routers.history.history_service.list_history", new=mock_fn, ): await history_client.get("/api/history?range=7d") _args, kwargs = mock_fn.call_args assert kwargs.get("range_") == "7d" async def test_forwards_origin_filter(self, history_client: AsyncClient) -> None: """The ``origin`` query parameter is forwarded to the service.""" mock_fn = AsyncMock(return_value=_make_history_list(n=0)) with patch( "app.routers.history.history_service.list_history", new=mock_fn, ): await history_client.get("/api/history?origin=blocklist") _args, kwargs = mock_fn.call_args assert kwargs.get("origin") == "blocklist" async def test_empty_result(self, history_client: AsyncClient) -> None: """An empty history returns items=[] and total=0.""" with patch( "app.routers.history.history_service.list_history", new=AsyncMock( return_value=HistoryListResponse(items=[], total=0, page=1, page_size=100) ), ): response = await history_client.get("/api/history") body = response.json() assert body["items"] == [] assert body["total"] == 0 # --------------------------------------------------------------------------- # GET /api/history/{ip} # --------------------------------------------------------------------------- class TestIpHistory: """GET /api/history/{ip} — per-IP detail.""" async def test_returns_200_when_authenticated( self, history_client: AsyncClient ) -> None: """Authenticated request returns HTTP 200 for a known IP.""" with patch( "app.routers.history.history_service.get_ip_detail", new=AsyncMock(return_value=_make_ip_detail("1.2.3.4")), ): response = await history_client.get("/api/history/1.2.3.4") assert response.status_code == 200 async def test_returns_401_when_unauthenticated( self, client: AsyncClient ) -> None: """Unauthenticated request returns HTTP 401.""" await client.post("/api/setup", json=_SETUP_PAYLOAD) response = await client.get("/api/history/1.2.3.4") assert response.status_code == 401 async def test_returns_404_for_unknown_ip( self, history_client: AsyncClient ) -> None: """Returns 404 when the IP has no records in the database.""" with patch( "app.routers.history.history_service.get_ip_detail", new=AsyncMock(return_value=None), ): response = await history_client.get("/api/history/9.9.9.9") assert response.status_code == 404 async def test_response_shape(self, history_client: AsyncClient) -> None: """Response body contains the expected keys and nested timeline.""" mock_detail = _make_ip_detail("1.2.3.4") with patch( "app.routers.history.history_service.get_ip_detail", new=AsyncMock(return_value=mock_detail), ): response = await history_client.get("/api/history/1.2.3.4") body = response.json() assert body["ip"] == "1.2.3.4" assert body["total_bans"] == 2 assert body["total_failures"] == 10 assert body["country_code"] == "DE" assert "timeline" in body assert len(body["timeline"]) == 2 event = body["timeline"][0] assert "jail" in event assert "banned_at" in event assert "ban_count" in event assert "failures" in event assert "matches" in event async def test_aggregation_sums_failures( self, history_client: AsyncClient ) -> None: """total_failures reflects the sum across all timeline events.""" mock_detail = _make_ip_detail("10.0.0.1") mock_detail = IpDetailResponse( ip="10.0.0.1", total_bans=3, total_failures=15, last_ban_at="2026-03-01T10:00:00+00:00", country_code=None, country_name=None, asn=None, org=None, timeline=[ IpTimelineEvent( jail="sshd", banned_at="2026-03-01T10:00:00+00:00", ban_count=3, failures=7, matches=[], ), IpTimelineEvent( jail="sshd", banned_at="2026-02-28T08:00:00+00:00", ban_count=2, failures=8, matches=[], ), ], ) with patch( "app.routers.history.history_service.get_ip_detail", new=AsyncMock(return_value=mock_detail), ): response = await history_client.get("/api/history/10.0.0.1") assert response.status_code == 200 body = response.json() assert body["total_failures"] == 15 assert body["total_bans"] == 3