Add Kubernetes liveness/readiness probes and middleware order validation

- Split /health into /health/live (liveness) and /health/ready (readiness)
  following Kubernetes conventions. Combined /health retained for backward
  compatibility with existing Docker HEALTHCHECK definitions.
- Add ReadyCheck and ReadyResponse models for structured readiness output.
- Add _assert_middleware_order() startup check enforcing:
  RateLimit → Csrf → CorrelationId middleware chain.
- Register CorrelationIdMiddleware, CsrfMiddleware, RateLimitMiddleware
  in create_app() with documented required order (reverse of processing).
- Add correlation.py, csrf.py, rate_limit.py middleware modules.
- Add health probe tests in test_health_probes.py.
- Update test_main.py with middleware order assertion tests.
- Update frontend useFetchData hook tests.
- Docs: update Deployment.md with Kubernetes probe config examples.
This commit is contained in:
2026-05-04 02:42:09 +02:00
parent 65fe747cba
commit eb339efcfd
13 changed files with 882 additions and 129 deletions

View File

@@ -314,13 +314,13 @@ async def _lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
def _get_error_code(exc: Exception) -> str:
"""Get the machine-readable error code from an exception.
First checks if the exception has an error_code class attribute.
Falls back to converting the exception class name to snake_case.
Args:
exc: The exception instance.
Returns:
A snake_case error code string.
"""
@@ -334,12 +334,12 @@ def _get_error_code(exc: Exception) -> str:
def _get_error_metadata(exc: Exception) -> ErrorMetadata:
"""Get structured metadata from an exception.
Calls the exception's get_error_metadata() method if available.
Args:
exc: The exception instance.
Returns:
A dictionary of metadata safe for API responses.
"""
@@ -350,12 +350,12 @@ def _get_error_metadata(exc: Exception) -> ErrorMetadata:
def _get_correlation_id(request: Request) -> str | None:
"""Extract correlation ID from request state if available.
The correlation ID is set by CorrelationIdMiddleware.
Args:
request: The incoming FastAPI request.
Returns:
The correlation ID string, or None if not present.
"""
@@ -802,7 +802,9 @@ async def _request_validation_error_handler(
_EXACT_ALLOWED: frozenset[str] = frozenset(
{
"/api/v1/setup", # GET/POST /api/v1/setup
"/api/v1/health", # Health check endpoint
"/api/v1/health", # Health check endpoint (combined)
"/api/v1/health/live", # Kubernetes liveness probe
"/api/v1/health/ready", # Kubernetes readiness probe
"/api/docs", # Swagger UI
"/api/redoc", # ReDoc
"/api/openapi.json", # OpenAPI schema
@@ -988,6 +990,48 @@ def _enforce_single_worker() -> None:
# ---------------------------------------------------------------------------
def _assert_middleware_order(app: FastAPI) -> None:
"""Assert required middleware order at startup.
Raises:
AssertionError: If middleware are not in the required order.
"""
registered = [m.cls.__name__ for m in app.user_middleware]
# Find positions; skip middleware not in the security-critical chain
order: tuple[str, ...] = (
"RateLimitMiddleware",
"CsrfMiddleware",
"CorrelationIdMiddleware",
)
positions = {name: registered.index(name) for name in order if name in registered}
# RateLimitMiddleware must be before CsrfMiddleware
if (
"RateLimitMiddleware" in positions
and "CsrfMiddleware" in positions
and positions["RateLimitMiddleware"] > positions["CsrfMiddleware"]
):
raise AssertionError(
f"Middleware order violation: RateLimitMiddleware (position {positions['RateLimitMiddleware']}) "
f"must be registered before CsrfMiddleware (position {positions['CsrfMiddleware']}). "
f"Current order: {registered}"
)
# CsrfMiddleware must be before CorrelationIdMiddleware
if (
"CsrfMiddleware" in positions
and "CorrelationIdMiddleware" in positions
and positions["CsrfMiddleware"] > positions["CorrelationIdMiddleware"]
):
raise AssertionError(
f"Middleware order violation: CsrfMiddleware (position {positions['CsrfMiddleware']}) "
f"must be registered before CorrelationIdMiddleware (position {positions['CorrelationIdMiddleware']}). "
f"Current order: {registered}"
)
def create_app(settings: Settings | None = None) -> FastAPI:
"""Create and configure the BanGUI FastAPI application.
@@ -1066,11 +1110,18 @@ def create_app(settings: Settings | None = None) -> FastAPI:
)
# --- Middleware ---
# Note: middleware is applied in reverse order of registration.
# SecurityHeadersMiddleware must run early but after CORS/CSRF so headers
# are added to all responses including error responses.
# CorrelationIdMiddleware must run first (added last) so correlation ID
# is available to all downstream handlers and loggers.
# Note: Starlette applies middleware in reverse order of registration
# (last registered = outermost; first to see request, last to see response).
#
# Required processing order (outermost → innermost):
# 1. CorrelationIdMiddleware generates/extracts correlation ID first
# 2. CsrfMiddleware CSRF validation after correlation ID is available
# 3. RateLimitMiddleware rate limiting last (needs correlation ID for logging)
#
# This requires registration order (reverse of processing):
# 1. RateLimitMiddleware (registered first → innermost for responses)
# 2. CsrfMiddleware
# 3. CorrelationIdMiddleware (registered last → outermost for requests)
app.add_middleware(CorrelationIdMiddleware)
app.add_middleware(SecurityHeadersMiddleware)
app.add_middleware(SetupRedirectMiddleware)
@@ -1083,6 +1134,11 @@ def create_app(settings: Settings | None = None) -> FastAPI:
settings=resolved_settings,
)
# Validate middleware order before returning the app.
# Raising loud errors at startup is intentional — a misconfigured middleware
# stack is a security-critical defect that must not slip through silently.
_assert_middleware_order(app)
# --- Exception handlers ---
#

View File

@@ -11,6 +11,18 @@ Correlation IDs flow through the request lifecycle:
3. Middleware stores in structlog.contextvars
4. All log entries include the correlation ID automatically
5. Error responses include the correlation ID for client-side correlation
Processing order
-----------------
This middleware must be the outermost in the security-critical chain so it
executes first on incoming requests (outermost = first to see request,
last to see response). In the required chain:
CorrelationIdMiddleware → CsrfMiddleware → RateLimitMiddleware
The registration order in ``main.py`` must be:
RateLimitMiddleware, CsrfMiddleware, CorrelationIdMiddleware
(last registered = outermost in Starlette's reverse application).
"""
from __future__ import annotations

View File

@@ -9,6 +9,16 @@ is not CSRF-vulnerable. GET, HEAD, and OPTIONS requests are also exempt.
Cross-site requests cannot set custom headers without CORS preflight, which the
backend rejects for non-allowed origins, providing defense-in-depth.
Processing order
----------------
This middleware must be the middle component in the security-critical chain:
CorrelationIdMiddleware → CsrfMiddleware → RateLimitMiddleware
It runs after CorrelationIdMiddleware has attached a correlation ID (so CSRF
rejections and downstream rate-limit errors can include it in their log
context), and before RateLimitMiddleware (so rate-limit counters are only
incremented for requests that pass CSRF checks).
"""
from __future__ import annotations

View File

@@ -20,6 +20,16 @@ scheduler lock). The startup warning log documents this constraint.
Redis-backed adapter that uses atomic INCR + EXPIRE semantics. The
check_allowed() and check_allowed_for_bucket() interfaces are designed
to make this swap-in without touching middleware or router code.
Processing order
----------------
This middleware must be the innermost in the security-critical chain:
CorrelationIdMiddleware → CsrfMiddleware → RateLimitMiddleware
Rate limiting is last so that requests blocked by CsrfMiddleware do not
consume rate-limit budget, and so that rate-limit log entries (which are
unusual and potentially suspicious) always carry a correlation ID for tracing.
"""
from __future__ import annotations

View File

@@ -480,3 +480,53 @@ class FlushLogsResponse(BanGuiBaseModel):
"""
message: str = Field(..., description="Human-readable result message from fail2ban.")
class ReadyCheck(BanGuiBaseModel):
    """Outcome of probing a single subsystem during a readiness check.

    Attributes:
        name: Subsystem name (e.g., "database", "fail2ban", "config_dir").
        healthy: ``True`` when the subsystem is reachable/operational.
        message: Optional error detail describing the failure; ``None`` by
            default.
    """

    name: str = Field(
        ...,
        description="Subsystem name.",
    )
    healthy: bool = Field(
        ...,
        description="True when the subsystem is operational.",
    )
    message: str | None = Field(default=None, description="Error detail when the check fails.")
class ReadyResponse(BanGuiBaseModel):
    """Response body returned by the ``/health/ready`` endpoint.

    Attributes:
        status: "ok" when all checks pass, "error" when at least one failed.
        checks: Per-subsystem results; an empty list when not supplied.
        failed_count: Number of checks that returned ``healthy=False``;
            validated to be non-negative.

    Example:
        ```python
        # All healthy (HTTP 200)
        {"status": "ok", "checks": [...], "failed_count": 0}
        # Some failed (HTTP 503)
        {"status": "error", "checks": [...], "failed_count": 2}
        ```
    """

    status: Literal["ok", "error"] = Field(
        ..., description="'ok' when all checks pass, 'error' when at least one fails."
    )
    checks: list[ReadyCheck] = Field(
        default_factory=list, description="Per-subsystem check results."
    )
    failed_count: int = Field(
        ..., ge=0, description="Number of checks that returned healthy=False."
    )

View File

@@ -1,27 +1,36 @@
"""Health check router.
A lightweight ``GET /api/v1/health`` endpoint that verifies the application
is running and can serve requests. Also reports the cached fail2ban liveness
state so monitoring tools and Docker health checks can observe daemon status
without probing the socket directly.
Two distinct probes following Kubernetes conventions:
Comprehensive checks performed:
- Database connectivity
- fail2ban socket reachability (via cached server_status)
- Background scheduler health
- Session cache initialization
* ``GET /api/v1/health/live`` — **Liveness** — checks that the Python process is
alive and the event loop is responsive. Always returns 200; a non-2xx answer
tells Kubernetes to *restart* the container.
* ``GET /api/v1/health/ready`` — **Readiness** — checks that all critical
sub-systems (database, fail2ban socket, config directory, scheduler) are
reachable. Returns 200 only when all pass; returns 503 with a JSON body
listing every failed check otherwise. A non-2xx answer tells Kubernetes to
*stop routing traffic* to the pod until it recovers.
The combined ``GET /api/v1/health`` endpoint is retained for backward
compatibility with existing Docker HEALTHCHECK definitions.
"""
from __future__ import annotations
from typing import Annotated, Literal
import asyncio
import os
from typing import TYPE_CHECKING, Literal
import structlog
from fastapi import APIRouter, status
from fastapi.responses import JSONResponse
from app.dependencies import AppStateDep, ServerStatusDep
from app.models.response import ComponentHealth, HealthResponse
from app.models.response import ComponentHealth, HealthResponse, ReadyCheck, ReadyResponse
if TYPE_CHECKING:
from collections.abc import Coroutine
router: APIRouter = APIRouter(prefix="/api/v1/health", tags=["Health"])
@@ -142,3 +151,164 @@ async def health_check(
components=components,
).model_dump(),
)
# --- Constants for subsystem checks ------------------------------------------ #
# Maximum time, in seconds, a single subsystem probe may take; _run_check()
# passes this value as the ``timeout`` argument of asyncio.wait_for().
SUBSYSTEM_TIMEOUT_SECONDS: float = 2.0
# --- Helper: await a check coroutine under a short timeout -------------------- #
async def _run_check(
    name: str,
    coro: Coroutine[object, object, None],
    error_msg: str,
) -> ReadyCheck:
    """Await *coro* with a bounded timeout and report the outcome.

    The coroutine is awaited directly on the event loop (no thread pool is
    involved here); ``SUBSYSTEM_TIMEOUT_SECONDS`` caps how long it may run.

    Args:
        name: Subsystem name recorded in the returned check and in the log.
        coro: Probe coroutine; it signals failure by raising.
        error_msg: Prefix for the failure message.

    Returns:
        A healthy ``ReadyCheck`` when *coro* completes, otherwise an
        unhealthy one whose message is ``"<error_msg>: <detail>"``.
    """
    try:
        await asyncio.wait_for(coro, timeout=SUBSYSTEM_TIMEOUT_SECONDS)
    except Exception as exc:  # noqa: BLE001 - a probe must never crash the endpoint
        detail = str(exc)
        if not detail:
            # wait_for() raises its timeout error without a message, which
            # used to yield a dangling "<error_msg>: ".  Substitute something
            # an operator can act on.
            if isinstance(exc, asyncio.TimeoutError):
                detail = f"timed out after {SUBSYSTEM_TIMEOUT_SECONDS:g}s"
            else:
                detail = type(exc).__name__
    else:
        return ReadyCheck(name=name, healthy=True)

    log.warning("ready_check_failed", subsystem=name, error=detail)
    return ReadyCheck(name=name, healthy=False, message=f"{error_msg}: {detail}")
# --- Liveness probe ---------------------------------------------------------- #
@router.get(
    "/live",
    summary="Process liveness probe",
    response_model=ReadyResponse,
    responses={
        200: {"description": "Process is alive"},
    },
)
async def liveness_probe() -> JSONResponse:
    """Lightweight liveness check for Kubernetes.

    Always answers HTTP 200 with a single healthy ``process`` check; being
    able to answer at all shows the process and event loop are responsive.
    A non-2xx response tells Kubernetes to restart the container.
    No subsystem checks are performed — this endpoint must be fast.
    """
    process_check = ReadyCheck(name="process", healthy=True)
    body = ReadyResponse(status="ok", checks=[process_check], failed_count=0)
    return JSONResponse(status_code=status.HTTP_200_OK, content=body.model_dump())
# --- Readiness probe --------------------------------------------------------- #
async def _check_database(app_state: AppStateDep) -> ReadyCheck:
    """Probe database connectivity by opening and closing a connection.

    Uses ``app_state.runtime_settings`` when it is set, otherwise
    ``app_state.settings``.  The probe runs through ``_run_check`` and is
    therefore subject to its timeout and error reporting.
    """
    from app.config import Settings
    from app.db import open_db

    effective_settings: Settings
    if app_state.runtime_settings is not None:
        effective_settings = app_state.runtime_settings
    else:
        effective_settings = app_state.settings

    async def _open_and_close() -> None:
        connection = await open_db(effective_settings.database_path)
        await connection.close()

    return await _run_check("database", _open_and_close(), "Connection failed")
async def _check_fail2ban(app_state: AppStateDep, server_status: ServerStatusDep) -> ReadyCheck:
    """Report fail2ban reachability from the supplied (cached) server status.

    No socket is contacted here; only ``server_status.online`` is read.
    *app_state* is accepted for signature parity with the other checks and is
    not used.
    """
    if not server_status.online:
        return ReadyCheck(name="fail2ban", healthy=False, message="Socket not reachable")
    return ReadyCheck(name="fail2ban", healthy=True)
async def _check_config_dir(app_state: AppStateDep) -> ReadyCheck:
    """Check that the fail2ban config directory is readable.

    Uses ``app_state.runtime_settings`` when it is set, otherwise
    ``app_state.settings``.  The access test runs in a worker thread and is
    wrapped by ``_run_check``, so it is subject to that helper's timeout and
    error reporting.

    Returns:
        A healthy ``ReadyCheck`` when ``os.access(..., os.R_OK)`` succeeds,
        otherwise an unhealthy one.
    """
    from app.config import Settings

    effective_settings: Settings = (
        app_state.runtime_settings if app_state.runtime_settings is not None else app_state.settings
    )

    async def _probe() -> None:
        config_path = effective_settings.fail2ban_config_dir
        # os.access() reports its verdict as a bool instead of raising, so the
        # result has to be converted into an exception; otherwise a missing or
        # unreadable path would still be reported as healthy.
        readable = await asyncio.to_thread(os.access, config_path, os.R_OK)
        if not readable:
            raise OSError("path is missing or lacks read permission")

    return await _run_check(
        "config_dir",
        _probe(),
        "Config directory not readable",
    )
async def _check_scheduler(app_state: AppStateDep) -> ReadyCheck:
    """Report whether the background scheduler is present and running.

    Outcomes:
        * ``app_state.scheduler`` is ``None``: unhealthy, "Not initialised".
        * scheduler has a truthy ``running`` attribute: healthy.
        * scheduler exists but is not running: unhealthy, "Scheduler stopped".
        * an ``AttributeError`` is raised along the way: unhealthy,
          "Not accessible".
    """
    try:
        scheduler = app_state.scheduler
        if scheduler is None:
            return ReadyCheck(name="scheduler", healthy=False, message="Not initialised")
        # A scheduler object without a ``running`` attribute counts as stopped.
        if getattr(scheduler, "running", False):
            return ReadyCheck(name="scheduler", healthy=True)
        return ReadyCheck(name="scheduler", healthy=False, message="Scheduler stopped")
    except AttributeError:
        return ReadyCheck(name="scheduler", healthy=False, message="Not accessible")
@router.get(
    "/ready",
    summary="Subsystem readiness probe",
    response_model=ReadyResponse,
    responses={
        200: {"description": "All subsystems healthy"},
        503: {"description": "One or more subsystems unreachable"},
    },
)
async def readiness_probe(
    app_state: AppStateDep,
    server_status: ServerStatusDep,
) -> JSONResponse:
    """Readiness check for Kubernetes.

    Runs four subsystem checks concurrently:

    - Database connectivity
    - fail2ban socket (via cached server status)
    - Config directory read access
    - Background scheduler liveness

    Responds with HTTP 200 only when every check is healthy, and with HTTP 503
    otherwise.  In both cases the JSON body lists all check results together
    with the number of failures.
    """
    results = await asyncio.gather(
        _check_database(app_state),
        _check_fail2ban(app_state, server_status),
        _check_config_dir(app_state),
        _check_scheduler(app_state),
    )
    # gather() preserves argument order: database, fail2ban, config_dir, scheduler.
    checks: list[ReadyCheck] = list(results)
    failed_count = len([check for check in checks if not check.healthy])
    all_ok = failed_count == 0

    payload = ReadyResponse(
        status="ok" if all_ok else "error",
        checks=checks,
        failed_count=failed_count,
    )
    return JSONResponse(
        status_code=status.HTTP_200_OK if all_ok else status.HTTP_503_SERVICE_UNAVAILABLE,
        content=payload.model_dump(),
    )

View File

@@ -12,7 +12,15 @@ from httpx import ASGITransport, AsyncClient
from app.config import Settings
from app.db import init_db
from app.exceptions import ConfigValidationError, ConfigWriteError, JailNotFoundError
from app.main import CORSMiddleware, _enforce_single_worker, _lifespan, create_app
from app.main import (
CORSMiddleware,
_assert_middleware_order,
_enforce_single_worker,
_lifespan,
create_app,
)
from app.middleware.correlation import CorrelationIdMiddleware
from app.middleware.rate_limit import RateLimitMiddleware
from app.services import setup_service
@@ -450,14 +458,23 @@ async def test_startup_loads_geo_cache_from_persisted_runtime_database(tmp_path:
exit_stack.enter_context(patch("app.services.geo_cache.GeoCache.load_cache_from_db", new=load_cache))
exit_stack.enter_context(patch("app.services.geo_cache.GeoCache.count_unresolved", new=AsyncMock(return_value=0)))
exit_stack.enter_context(patch("app.services.setup_service.is_setup_complete", new=AsyncMock(return_value=True)))
exit_stack.enter_context(patch("app.services.setup_service.get_runtime_database_path", new=AsyncMock(return_value=runtime_db_path)))
exit_stack.enter_context(patch("app.services.setup_service.get_persisted_runtime_settings", new=AsyncMock(return_value={
"database_path": runtime_db_path,
"fail2ban_socket": "/tmp/persisted.sock",
"timezone": "Europe/Berlin",
"session_duration_minutes": 123,
})))
exit_stack.enter_context(patch("app.services.setup_service.get_fail2ban_db_path", new=AsyncMock(return_value="/tmp/fail2ban/banned.tar.bz2")))
exit_stack.enter_context(patch(
"app.services.setup_service.get_runtime_database_path",
new=AsyncMock(return_value=runtime_db_path),
))
exit_stack.enter_context(patch(
"app.services.setup_service.get_persisted_runtime_settings",
new=AsyncMock(return_value={
"database_path": runtime_db_path,
"fail2ban_socket": "/tmp/persisted.sock",
"timezone": "Europe/Berlin",
"session_duration_minutes": 123,
}),
))
exit_stack.enter_context(patch(
"app.services.setup_service.get_fail2ban_db_path",
new=AsyncMock(return_value="/tmp/fail2ban/banned.tar.bz2"),
))
exit_stack.enter_context(patch("app.tasks.health_check.register"))
exit_stack.enter_context(patch("app.tasks.blocklist_import.register"))
exit_stack.enter_context(patch("app.tasks.geo_cache_flush.register"))
@@ -466,8 +483,9 @@ async def test_startup_loads_geo_cache_from_persisted_runtime_database(tmp_path:
with exit_stack:
async with _lifespan(app):
loaded_db_path = load_cache.call_args.args[0]
runtime_connections = [conn for path, conn in opened_connections if path == runtime_db_path]
runtime_connections = [
conn for path, conn in opened_connections if path == runtime_db_path
]
assert runtime_connections, "Expected runtime database to be opened"
assert app.state.runtime_settings is not None
@@ -538,6 +556,91 @@ async def test_concurrent_requests_use_request_scoped_db_connections(tmp_path: P
assert all(connection.close.await_count == 1 for connection in connections)
# ---------------------------------------------------------------------------
# Middleware order validation
# ---------------------------------------------------------------------------
def _make_settings(tmp_path: Path) -> Settings:
    """Build a minimal ``Settings`` rooted in *tmp_path*.

    Creates ``tmp_path / "fail2ban"`` on disk and points the config directory
    at it; the database path is placed under *tmp_path* as well.
    """
    config_dir = tmp_path / "fail2ban"
    config_dir.mkdir()
    database_file = tmp_path / "bangui.db"
    return Settings(
        database_path=str(database_file),
        fail2ban_socket="/tmp/fake_fail2ban.sock",
        fail2ban_config_dir=str(config_dir),
        session_secret="test-secret-key-do-not-use-in-production",
        session_duration_minutes=60,
        timezone="UTC",
        log_level="debug",
    )
def test_create_app_raises_on_incorrect_middleware_order(
    tmp_path: Path,
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """_assert_middleware_order() raises AssertionError for a broken order.

    A correctly built app has its ``CorrelationIdMiddleware`` and
    ``RateLimitMiddleware`` entries exchanged in ``user_middleware``; the
    validator must then reject the stack.  The order it accepts in
    ``user_middleware`` is:
        RateLimitMiddleware → CsrfMiddleware → CorrelationIdMiddleware
    """
    monkeypatch.setenv("TESTING", "1")
    app = create_app(settings=_make_settings(tmp_path))

    stack = app.user_middleware

    def _position(class_name: str) -> int:
        return next(i for i, entry in enumerate(stack) if entry.cls.__name__ == class_name)

    corr_idx = _position("CorrelationIdMiddleware")
    rate_idx = _position("RateLimitMiddleware")
    # Exchange the two entries in place to break the order.
    stack[corr_idx], stack[rate_idx] = stack[rate_idx], stack[corr_idx]

    with pytest.raises(AssertionError, match="must be registered before"):
        _assert_middleware_order(app)
def test_middleware_order_validation_passes_for_correct_order(
    tmp_path: Path,
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """An app built by create_app() passes _assert_middleware_order()."""
    monkeypatch.setenv("TESTING", "1")
    app = create_app(settings=_make_settings(tmp_path))
    # No pytest.raises here: any exception fails the test.
    _assert_middleware_order(app)
def test_create_app_validates_middleware_order_at_startup(
    tmp_path: Path,
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """create_app() itself fails when middleware ends up in the wrong order.

    ``Starlette.add_middleware`` is patched for the duration of the call so
    that ``CorrelationIdMiddleware`` is not added when requested but right
    after ``RateLimitMiddleware`` instead.  create_app() is then expected to
    raise the validator's AssertionError.
    """
    monkeypatch.setenv("TESTING", "1")
    settings = _make_settings(tmp_path)

    from starlette.applications import Starlette

    original_add = Starlette.add_middleware

    def swapping_add(self, middleware_cls: type, **kwargs: object) -> None:
        """Patched add_middleware that swaps CorrelationId and RateLimit."""
        if middleware_cls is CorrelationIdMiddleware:
            return  # Deferred: it is added right after RateLimitMiddleware.
        original_add(self, middleware_cls, **kwargs)
        if middleware_cls is RateLimitMiddleware:
            original_add(self, CorrelationIdMiddleware)

    with patch.object(Starlette, "add_middleware", swapping_add):
        with pytest.raises(AssertionError, match="must be registered before"):
            create_app(settings=settings)
# ---------------------------------------------------------------------------
# Single-worker enforcement
# ---------------------------------------------------------------------------

View File

@@ -0,0 +1,130 @@
"""Tests for the health-check router — liveness and readiness probes."""
from unittest.mock import MagicMock, patch
import pytest
from httpx import AsyncClient
from app.models.server import ServerStatus
from app.models.response import ReadyCheck
# ---------------------------------------------------------------------------
# GET /health/live — liveness probe
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_liveness_returns_200(client: AsyncClient) -> None:
    """The liveness endpoint ``GET /api/v1/health/live`` answers HTTP 200."""
    live = await client.get("/api/v1/health/live")
    assert live.status_code == 200
@pytest.mark.asyncio
async def test_liveness_body_is_ready_response(client: AsyncClient) -> None:
    """The liveness body has the ReadyResponse shape with no failures."""
    live = await client.get("/api/v1/health/live")
    payload: dict[str, object] = live.json()
    assert payload["status"] == "ok"
    assert payload["failed_count"] == 0
    assert "checks" in payload
    assert isinstance(payload["checks"], list)
@pytest.mark.asyncio
async def test_liveness_includes_process_check(client: AsyncClient) -> None:
    """The liveness body contains a healthy check named 'process'."""
    live = await client.get("/api/v1/health/live")
    payload: dict[str, object] = live.json()
    entries: list[dict[str, object]] = payload["checks"]  # type: ignore[assignment]
    healthy_process = [
        entry
        for entry in entries
        if entry.get("name") == "process" and entry.get("healthy") is True
    ]
    assert healthy_process
# ---------------------------------------------------------------------------
# GET /health/ready — readiness probe
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_readiness_returns_200_when_all_pass(client: AsyncClient) -> None:
    """``GET /api/v1/health/ready`` must return 200 when all subsystems pass.

    ``_run_check`` is replaced so every probe routed through it reports
    healthy without touching real resources.
    """

    def _always_healthy(name: str, coro: object, error_msg: str) -> ReadyCheck:
        # The real helper would await the probe coroutine.  Close it here so
        # it is not garbage-collected un-awaited, which would emit a
        # "coroutine ... was never awaited" RuntimeWarning.
        coro.close()  # type: ignore[attr-defined]
        return ReadyCheck(name=name, healthy=True)

    with patch("app.routers.health._run_check", side_effect=_always_healthy):
        response = await client.get("/api/v1/health/ready")
    assert response.status_code == 200
@pytest.mark.asyncio
async def test_readiness_returns_503_when_subsystem_fails(client: AsyncClient) -> None:
    """The readiness endpoint answers 503 once a single check fails."""
    # Mark fail2ban as offline in the application state behind the client.
    app_state = client._transport.app.state
    app_state.server_status = ServerStatus(online=False)
    ready = await client.get("/api/v1/health/ready")
    assert ready.status_code == 503
@pytest.mark.asyncio
async def test_readiness_body_is_ready_response(client: AsyncClient) -> None:
    """The readiness body has the ReadyResponse shape."""
    ready = await client.get("/api/v1/health/ready")
    payload: dict[str, object] = ready.json()
    assert payload["status"] in ("ok", "error")
    for required_key in ("failed_count", "checks"):
        assert required_key in payload
    assert isinstance(payload["checks"], list)
@pytest.mark.asyncio
async def test_readiness_includes_all_subsystems(client: AsyncClient) -> None:
    """The readiness body reports exactly the four expected subsystems."""
    ready = await client.get("/api/v1/health/ready")
    payload: dict[str, object] = ready.json()
    entries: list[dict[str, object]] = payload["checks"]  # type: ignore[assignment]
    reported = {entry["name"] for entry in entries}
    expected = {"database", "fail2ban", "config_dir", "scheduler"}
    assert reported == expected
@pytest.mark.asyncio
async def test_readiness_status_ok_when_all_healthy(client: AsyncClient) -> None:
    """``status`` must be 'ok' and ``failed_count`` 0 when all checks pass.

    ``_run_check`` is replaced so every probe routed through it reports
    healthy without touching real resources.
    """

    def _always_healthy(name: str, coro: object, error_msg: str) -> ReadyCheck:
        # Close the probe coroutine that the real helper would have awaited;
        # dropping it silently triggers a "coroutine ... was never awaited"
        # RuntimeWarning when it is garbage-collected.
        coro.close()  # type: ignore[attr-defined]
        return ReadyCheck(name=name, healthy=True)

    with patch("app.routers.health._run_check", side_effect=_always_healthy):
        response = await client.get("/api/v1/health/ready")
    data: dict[str, object] = response.json()
    assert data["status"] == "ok"
    assert data["failed_count"] == 0
@pytest.mark.asyncio
async def test_readiness_status_error_when_fail2ban_offline(client: AsyncClient) -> None:
    """With fail2ban offline, ``status`` is 'error' and failures are counted."""
    app_state = client._transport.app.state
    app_state.server_status = ServerStatus(online=False)
    ready = await client.get("/api/v1/health/ready")
    payload: dict[str, object] = ready.json()
    assert payload["status"] == "error"
    assert payload["failed_count"] > 0
@pytest.mark.asyncio
async def test_readiness_includes_failed_subsystem_detail(client: AsyncClient) -> None:
    """With fail2ban offline, its check is unhealthy and carries a message."""
    app_state = client._transport.app.state
    app_state.server_status = ServerStatus(online=False)
    ready = await client.get("/api/v1/health/ready")
    payload: dict[str, object] = ready.json()
    entries: list[dict[str, object]] = payload["checks"]  # type: ignore[assignment]
    fail2ban_entry = next(entry for entry in entries if entry["name"] == "fail2ban")
    assert fail2ban_entry["healthy"] is False
    assert fail2ban_entry["message"] is not None
@pytest.mark.asyncio
async def test_readiness_content_type_is_json(client: AsyncClient) -> None:
    """``/api/v1/health/ready`` responds with a JSON ``Content-Type``."""
    ready = await client.get("/api/v1/health/ready")
    content_type = ready.headers.get("content-type", "")
    assert "application/json" in content_type
@pytest.mark.asyncio
async def test_readiness_live_content_type_is_json(client: AsyncClient) -> None:
    """``/api/v1/health/live`` responds with a JSON ``Content-Type``."""
    live = await client.get("/api/v1/health/live")
    content_type = live.headers.get("content-type", "")
    assert "application/json" in content_type