feat: Stage 4 — fail2ban connection and server status

This commit is contained in:
2026-02-28 21:48:03 +01:00
parent a41a99dad4
commit 60683da3ca
13 changed files with 1085 additions and 18 deletions

View File

@@ -0,0 +1,194 @@
"""Tests for the dashboard router (GET /api/dashboard/status)."""
from __future__ import annotations
from pathlib import Path
import aiosqlite
import pytest
from httpx import ASGITransport, AsyncClient
from app.config import Settings
from app.db import init_db
from app.main import create_app
from app.models.server import ServerStatus
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
_SETUP_PAYLOAD = {
"master_password": "testpassword1",
"database_path": "bangui.db",
"fail2ban_socket": "/var/run/fail2ban/fail2ban.sock",
"timezone": "UTC",
"session_duration_minutes": 60,
}
@pytest.fixture
async def dashboard_client(tmp_path: Path) -> AsyncClient: # type: ignore[misc]
"""Provide an authenticated ``AsyncClient`` with a pre-seeded server status.
Unlike the shared ``client`` fixture this one also exposes access to
``app.state`` via the app instance so we can seed the status cache.
"""
settings = Settings(
database_path=str(tmp_path / "dashboard_test.db"),
fail2ban_socket="/tmp/fake_fail2ban.sock",
session_secret="test-dashboard-secret",
session_duration_minutes=60,
timezone="UTC",
log_level="debug",
)
app = create_app(settings=settings)
db: aiosqlite.Connection = await aiosqlite.connect(settings.database_path)
db.row_factory = aiosqlite.Row
await init_db(db)
app.state.db = db
# Pre-seed a server status so the endpoint has something to return.
app.state.server_status = ServerStatus(
online=True,
version="1.0.2",
active_jails=2,
total_bans=10,
total_failures=5,
)
transport = ASGITransport(app=app)
async with AsyncClient(transport=transport, base_url="http://test") as ac:
# Complete setup so the middleware doesn't redirect.
resp = await ac.post("/api/setup", json=_SETUP_PAYLOAD)
assert resp.status_code == 201
# Login to get a session cookie.
login_resp = await ac.post(
"/api/auth/login",
json={"password": _SETUP_PAYLOAD["master_password"]},
)
assert login_resp.status_code == 200
yield ac
await db.close()
@pytest.fixture
async def offline_dashboard_client(tmp_path: Path) -> AsyncClient: # type: ignore[misc]
"""Like ``dashboard_client`` but with an offline server status."""
settings = Settings(
database_path=str(tmp_path / "dashboard_offline_test.db"),
fail2ban_socket="/tmp/fake_fail2ban.sock",
session_secret="test-dashboard-offline-secret",
session_duration_minutes=60,
timezone="UTC",
log_level="debug",
)
app = create_app(settings=settings)
db: aiosqlite.Connection = await aiosqlite.connect(settings.database_path)
db.row_factory = aiosqlite.Row
await init_db(db)
app.state.db = db
app.state.server_status = ServerStatus(online=False)
transport = ASGITransport(app=app)
async with AsyncClient(transport=transport, base_url="http://test") as ac:
resp = await ac.post("/api/setup", json=_SETUP_PAYLOAD)
assert resp.status_code == 201
login_resp = await ac.post(
"/api/auth/login",
json={"password": _SETUP_PAYLOAD["master_password"]},
)
assert login_resp.status_code == 200
yield ac
await db.close()
# ---------------------------------------------------------------------------
# Tests
# ---------------------------------------------------------------------------
class TestDashboardStatus:
"""GET /api/dashboard/status."""
async def test_returns_200_when_authenticated(
self, dashboard_client: AsyncClient
) -> None:
"""Authenticated request returns HTTP 200."""
response = await dashboard_client.get("/api/dashboard/status")
assert response.status_code == 200
async def test_returns_401_when_unauthenticated(
self, client: AsyncClient
) -> None:
"""Unauthenticated request returns HTTP 401."""
# Complete setup so the middleware allows the request through.
await client.post("/api/setup", json=_SETUP_PAYLOAD)
response = await client.get("/api/dashboard/status")
assert response.status_code == 401
async def test_response_shape_when_online(
self, dashboard_client: AsyncClient
) -> None:
"""Response contains the expected ``status`` object shape."""
response = await dashboard_client.get("/api/dashboard/status")
body = response.json()
assert "status" in body
status = body["status"]
assert "online" in status
assert "version" in status
assert "active_jails" in status
assert "total_bans" in status
assert "total_failures" in status
async def test_cached_values_returned_when_online(
self, dashboard_client: AsyncClient
) -> None:
"""Endpoint returns the exact values from ``app.state.server_status``."""
response = await dashboard_client.get("/api/dashboard/status")
status = response.json()["status"]
assert status["online"] is True
assert status["version"] == "1.0.2"
assert status["active_jails"] == 2
assert status["total_bans"] == 10
assert status["total_failures"] == 5
async def test_offline_status_returned_correctly(
self, offline_dashboard_client: AsyncClient
) -> None:
"""Endpoint returns online=False when the cache holds an offline snapshot."""
response = await offline_dashboard_client.get("/api/dashboard/status")
assert response.status_code == 200
status = response.json()["status"]
assert status["online"] is False
assert status["version"] is None
assert status["active_jails"] == 0
assert status["total_bans"] == 0
assert status["total_failures"] == 0
async def test_returns_offline_when_state_not_initialised(
self, client: AsyncClient
) -> None:
"""Endpoint returns online=False as a safe default if the cache is absent."""
# Setup + login so the endpoint is reachable.
await client.post("/api/setup", json=_SETUP_PAYLOAD)
await client.post(
"/api/auth/login",
json={"password": _SETUP_PAYLOAD["master_password"]},
)
# server_status is not set on app.state in the shared `client` fixture.
response = await client.get("/api/dashboard/status")
assert response.status_code == 200
status = response.json()["status"]
assert status["online"] is False

View File

@@ -0,0 +1,263 @@
"""Tests for health_service.probe()."""
from __future__ import annotations
from typing import Any
from unittest.mock import AsyncMock, patch
import pytest
from app.models.server import ServerStatus
from app.services import health_service
from app.utils.fail2ban_client import Fail2BanConnectionError, Fail2BanProtocolError
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
_SOCKET = "/fake/fail2ban.sock"
def _make_send(responses: dict[str, Any]) -> AsyncMock:
"""Build an ``AsyncMock`` for ``Fail2BanClient.send`` keyed by command[0].
For the ``["status", jail_name]`` command the key is
``"status:<jail_name>"``.
"""
async def _side_effect(command: list[str]) -> Any:
key = f"status:{command[1]}" if len(command) >= 2 and command[0] == "status" else command[0]
if key not in responses:
raise KeyError(f"Unexpected command key {key!r} in mock")
return responses[key]
mock = AsyncMock(side_effect=_side_effect)
return mock
# ---------------------------------------------------------------------------
# Happy path
# ---------------------------------------------------------------------------
class TestProbeOnline:
"""Verify probe() correctly parses a healthy fail2ban response."""
async def test_online_flag_is_true(self) -> None:
"""status.online is True when ping succeeds."""
send = _make_send(
{
"ping": (0, "pong"),
"version": (0, "1.0.2"),
"status": (0, [("Number of jail", 0), ("Jail list", "")]),
}
)
with patch("app.services.health_service.Fail2BanClient") as mock_client:
mock_client.return_value.send = send
result: ServerStatus = await health_service.probe(_SOCKET)
assert result.online is True
async def test_version_parsed(self) -> None:
"""status.version contains the version string returned by fail2ban."""
send = _make_send(
{
"ping": (0, "pong"),
"version": (0, "1.1.0"),
"status": (0, [("Number of jail", 0), ("Jail list", "")]),
}
)
with patch("app.services.health_service.Fail2BanClient") as mock_client:
mock_client.return_value.send = send
result = await health_service.probe(_SOCKET)
assert result.version == "1.1.0"
async def test_active_jails_count(self) -> None:
"""status.active_jails reflects the jail count from the status command."""
send = _make_send(
{
"ping": (0, "pong"),
"version": (0, "1.0.2"),
"status": (0, [("Number of jail", 2), ("Jail list", "sshd, nginx")]),
"status:sshd": (
0,
[
("Filter", [("Currently failed", 3), ("Total failed", 100)]),
("Actions", [("Currently banned", 1), ("Total banned", 50)]),
],
),
"status:nginx": (
0,
[
("Filter", [("Currently failed", 2), ("Total failed", 50)]),
("Actions", [("Currently banned", 0), ("Total banned", 10)]),
],
),
}
)
with patch("app.services.health_service.Fail2BanClient") as mock_client:
mock_client.return_value.send = send
result = await health_service.probe(_SOCKET)
assert result.active_jails == 2
async def test_total_bans_aggregated(self) -> None:
"""status.total_bans sums 'Currently banned' across all jails."""
send = _make_send(
{
"ping": (0, "pong"),
"version": (0, "1.0.2"),
"status": (0, [("Number of jail", 2), ("Jail list", "sshd, nginx")]),
"status:sshd": (
0,
[
("Filter", [("Currently failed", 3), ("Total failed", 100)]),
("Actions", [("Currently banned", 4), ("Total banned", 50)]),
],
),
"status:nginx": (
0,
[
("Filter", [("Currently failed", 1), ("Total failed", 20)]),
("Actions", [("Currently banned", 2), ("Total banned", 15)]),
],
),
}
)
with patch("app.services.health_service.Fail2BanClient") as mock_client:
mock_client.return_value.send = send
result = await health_service.probe(_SOCKET)
assert result.total_bans == 6 # 4 + 2
async def test_total_failures_aggregated(self) -> None:
"""status.total_failures sums 'Currently failed' across all jails."""
send = _make_send(
{
"ping": (0, "pong"),
"version": (0, "1.0.2"),
"status": (0, [("Number of jail", 2), ("Jail list", "sshd, nginx")]),
"status:sshd": (
0,
[
("Filter", [("Currently failed", 3), ("Total failed", 100)]),
("Actions", [("Currently banned", 1), ("Total banned", 50)]),
],
),
"status:nginx": (
0,
[
("Filter", [("Currently failed", 2), ("Total failed", 20)]),
("Actions", [("Currently banned", 0), ("Total banned", 10)]),
],
),
}
)
with patch("app.services.health_service.Fail2BanClient") as mock_client:
mock_client.return_value.send = send
result = await health_service.probe(_SOCKET)
assert result.total_failures == 5 # 3 + 2
async def test_empty_jail_list(self) -> None:
"""Probe succeeds with zero jails — no per-jail queries are made."""
send = _make_send(
{
"ping": (0, "pong"),
"version": (0, "1.0.2"),
"status": (0, [("Number of jail", 0), ("Jail list", "")]),
}
)
with patch("app.services.health_service.Fail2BanClient") as mock_client:
mock_client.return_value.send = send
result = await health_service.probe(_SOCKET)
assert result.online is True
assert result.active_jails == 0
assert result.total_bans == 0
assert result.total_failures == 0
# ---------------------------------------------------------------------------
# Error handling
# ---------------------------------------------------------------------------
class TestProbeOffline:
"""Verify probe() returns online=False when the daemon is unreachable."""
async def test_connection_error_returns_offline(self) -> None:
"""Fail2BanConnectionError → online=False."""
with patch("app.services.health_service.Fail2BanClient") as mock_client:
mock_client.return_value.send = AsyncMock(
side_effect=Fail2BanConnectionError("socket not found", _SOCKET)
)
result = await health_service.probe(_SOCKET)
assert result.online is False
assert result.version is None
async def test_protocol_error_returns_offline(self) -> None:
"""Fail2BanProtocolError → online=False."""
with patch("app.services.health_service.Fail2BanClient") as mock_client:
mock_client.return_value.send = AsyncMock(
side_effect=Fail2BanProtocolError("bad pickle")
)
result = await health_service.probe(_SOCKET)
assert result.online is False
async def test_bad_ping_response_returns_offline(self) -> None:
"""An unexpected ping response → online=False (defensive guard)."""
send = _make_send({"ping": (0, "NOTPONG")})
with patch("app.services.health_service.Fail2BanClient") as mock_client:
mock_client.return_value.send = send
result = await health_service.probe(_SOCKET)
assert result.online is False
async def test_error_code_in_ping_returns_offline(self) -> None:
"""An error return code in the ping response → online=False."""
send = _make_send({"ping": (1, "ERROR")})
with patch("app.services.health_service.Fail2BanClient") as mock_client:
mock_client.return_value.send = send
result = await health_service.probe(_SOCKET)
assert result.online is False
async def test_per_jail_error_is_tolerated(self) -> None:
"""A parse error on an individual jail's status does not break the probe."""
send = _make_send(
{
"ping": (0, "pong"),
"version": (0, "1.0.2"),
"status": (0, [("Number of jail", 1), ("Jail list", "sshd")]),
# Return garbage to trigger parse tolerance.
"status:sshd": (0, "INVALID"),
}
)
with patch("app.services.health_service.Fail2BanClient") as mock_client:
mock_client.return_value.send = send
result = await health_service.probe(_SOCKET)
# The service should still be online even if per-jail parsing fails.
assert result.online is True
assert result.total_bans == 0
assert result.total_failures == 0
@pytest.mark.parametrize("version_return", [(1, "ERROR"), (0, None)])
async def test_version_failure_is_tolerated(self, version_return: tuple[int, Any]) -> None:
"""A failed or null version response does not prevent a successful probe."""
send = _make_send(
{
"ping": (0, "pong"),
"version": version_return,
"status": (0, [("Number of jail", 0), ("Jail list", "")]),
}
)
with patch("app.services.health_service.Fail2BanClient") as mock_client:
mock_client.return_value.send = send
result = await health_service.probe(_SOCKET)
assert result.online is True