- tests/test_tasks/test_blocklist_import.py: 14 tests, 96% coverage
- tests/test_tasks/test_health_check.py: 12 tests, 100% coverage
- tests/test_tasks/test_geo_cache_flush.py: 8 tests, 100% coverage
- tests/test_services/test_fail2ban_client.py: 24 new tests, 96% coverage

Total: 50 new tests (628 → 678 passing). Overall coverage 85% → 87%.
ruff, mypy --strict, tsc, and eslint all clean.
239 lines
8.5 KiB
Python
239 lines
8.5 KiB
Python
"""Tests for the health-check background task.

Validates that :func:`~app.tasks.health_check._run_probe` correctly stores
the probe result on ``app.state.server_status``, logs online/offline
transitions, and that :func:`~app.tasks.health_check.register` configures
the scheduler and primes the initial status.
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
from unittest.mock import AsyncMock, MagicMock, patch
|
|
|
|
import pytest
|
|
|
|
from app.models.server import ServerStatus
|
|
from app.tasks.health_check import HEALTH_CHECK_INTERVAL, _run_probe, register
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _make_app(prev_online: bool = False) -> MagicMock:
    """Create the stand-in ``app`` object used by the health-check tests.

    The returned mock carries the three ``app.state`` attributes the tests
    rely on: ``settings.fail2ban_socket``, ``server_status`` and
    ``scheduler``.

    Args:
        prev_online: Value of ``online`` on the pre-existing
            ``app.state.server_status``.

    Returns:
        A :class:`unittest.mock.MagicMock` standing in for ``fastapi.FastAPI``.
    """
    fake_app = MagicMock()
    state = fake_app.state
    state.settings.fail2ban_socket = "/var/run/fail2ban/fail2ban.sock"
    state.server_status = ServerStatus(online=prev_online)
    state.scheduler = MagicMock()
    return fake_app
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Tests for _run_probe
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestRunProbe:
    """Tests for :func:`~app.tasks.health_check._run_probe`.

    Every test patches ``health_service.probe`` with an
    :class:`~unittest.mock.AsyncMock` that returns a prepared
    :class:`ServerStatus`.  The logging tests additionally patch the ``log``
    attribute of ``app.tasks.health_check`` and inspect the recorded calls.
    """

    # Patch targets and event names shared by the tests below.
    _PROBE_TARGET = "app.tasks.health_check.health_service.probe"
    _LOG_TARGET = "app.tasks.health_check.log"
    _TRANSITION_EVENTS = ("fail2ban_came_online", "fail2ban_went_offline")

    @staticmethod
    def _events(log_method: MagicMock) -> list[object]:
        """Return the first positional argument of each call to ``log_method``.

        Calls made without positional arguments are skipped rather than
        indexed, so a keyword-only log call cannot crash a test with
        ``IndexError``.

        Args:
            log_method: A mocked logger method such as ``mock_log.info``.

        Returns:
            The first positional argument of every recorded call, in call order.
        """
        return [recorded.args[0] for recorded in log_method.call_args_list if recorded.args]

    @classmethod
    def _transition_events(cls, mock_log: MagicMock) -> list[object]:
        """Return every transition event logged at info or warning level.

        Both event names are checked at both levels so that a transition
        logged at an unexpected level is still detected.

        Args:
            mock_log: The mock that replaced ``app.tasks.health_check.log``.

        Returns:
            The transition event names found, info calls first.
        """
        emitted = cls._events(mock_log.info) + cls._events(mock_log.warning)
        return [event for event in emitted if event in cls._TRANSITION_EVENTS]

    @pytest.mark.asyncio
    async def test_run_probe_updates_server_status(self) -> None:
        """``_run_probe`` must store the probe result on ``app.state.server_status``."""
        app = _make_app(prev_online=False)
        new_status = ServerStatus(online=True, version="0.11.2", active_jails=3)

        with patch(self._PROBE_TARGET, new_callable=AsyncMock, return_value=new_status):
            await _run_probe(app)

        assert app.state.server_status is new_status

    @pytest.mark.asyncio
    async def test_run_probe_logs_came_online_transition(self) -> None:
        """When fail2ban comes online, ``"fail2ban_came_online"`` must be logged."""
        app = _make_app(prev_online=False)
        new_status = ServerStatus(online=True, version="0.11.2", active_jails=2)

        with (
            patch(self._PROBE_TARGET, new_callable=AsyncMock, return_value=new_status),
            patch(self._LOG_TARGET) as mock_log,
        ):
            await _run_probe(app)

        # Exactly one info-level event announces the offline -> online change.
        assert self._events(mock_log.info).count("fail2ban_came_online") == 1

    @pytest.mark.asyncio
    async def test_run_probe_logs_went_offline_transition(self) -> None:
        """When fail2ban goes offline, ``"fail2ban_went_offline"`` must be logged."""
        app = _make_app(prev_online=True)
        new_status = ServerStatus(online=False)

        with (
            patch(self._PROBE_TARGET, new_callable=AsyncMock, return_value=new_status),
            patch(self._LOG_TARGET) as mock_log,
        ):
            await _run_probe(app)

        # Exactly one warning-level event announces the online -> offline change.
        assert self._events(mock_log.warning).count("fail2ban_went_offline") == 1

    @pytest.mark.asyncio
    async def test_run_probe_stable_online_no_transition_log(self) -> None:
        """When status stays online, no transition events must be emitted."""
        app = _make_app(prev_online=True)
        new_status = ServerStatus(online=True, version="0.11.2", active_jails=1)

        with (
            patch(self._PROBE_TARGET, new_callable=AsyncMock, return_value=new_status),
            patch(self._LOG_TARGET) as mock_log,
        ):
            await _run_probe(app)

        assert self._transition_events(mock_log) == []

    @pytest.mark.asyncio
    async def test_run_probe_stable_offline_no_transition_log(self) -> None:
        """When status stays offline, no transition events must be emitted."""
        app = _make_app(prev_online=False)
        new_status = ServerStatus(online=False)

        with (
            patch(self._PROBE_TARGET, new_callable=AsyncMock, return_value=new_status),
            patch(self._LOG_TARGET) as mock_log,
        ):
            await _run_probe(app)

        # Same check as the stable-online case: either event at either level fails.
        assert self._transition_events(mock_log) == []

    @pytest.mark.asyncio
    async def test_run_probe_uses_socket_path_from_settings(self) -> None:
        """``_run_probe`` must pass the socket path from ``app.state.settings``."""
        expected_socket = "/custom/fail2ban.sock"
        app = _make_app()
        app.state.settings.fail2ban_socket = expected_socket
        new_status = ServerStatus(online=False)

        with patch(
            self._PROBE_TARGET,
            new_callable=AsyncMock,
            return_value=new_status,
        ) as mock_probe:
            await _run_probe(app)

        mock_probe.assert_awaited_once_with(expected_socket)

    @pytest.mark.asyncio
    async def test_run_probe_uses_default_offline_status_when_state_missing(self) -> None:
        """``_run_probe`` must handle missing ``server_status`` on first run."""
        app = _make_app()
        # Simulate first run: deleting the attribute makes later reads of
        # ``app.state.server_status`` raise ``AttributeError`` until it is set again.
        del app.state.server_status
        new_status = ServerStatus(online=True, version="0.11.2", active_jails=0)

        with (
            patch(self._PROBE_TARGET, new_callable=AsyncMock, return_value=new_status),
            patch(self._LOG_TARGET),
        ):
            # Must not raise even with no prior status.
            await _run_probe(app)

        assert app.state.server_status is new_status
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Tests for register
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestRegister:
    """Tests for :func:`~app.tasks.health_check.register`.

    Each test calls ``register`` on a fresh mock app and inspects either the
    keyword arguments passed to ``app.state.scheduler.add_job`` or the
    resulting ``app.state.server_status``.
    """

    def test_register_adds_interval_job_to_scheduler(self) -> None:
        """A single job with an ``"interval"`` trigger must be scheduled."""
        fake_app = _make_app()
        add_job = fake_app.state.scheduler.add_job

        register(fake_app)

        add_job.assert_called_once()
        job_kwargs = add_job.call_args.kwargs
        assert job_kwargs["trigger"] == "interval"
        assert job_kwargs["seconds"] == HEALTH_CHECK_INTERVAL

    def test_register_primes_offline_server_status(self) -> None:
        """An offline ``ServerStatus`` must be in place once ``register`` returns."""
        fake_app = _make_app()
        # Drop the status installed by _make_app so that register must set it.
        del fake_app.state.server_status

        register(fake_app)

        primed = fake_app.state.server_status
        assert isinstance(primed, ServerStatus)
        assert primed.online is False

    def test_register_uses_stable_job_id(self) -> None:
        """The job must be registered under the fixed id ``"health_check"``."""
        fake_app = _make_app()

        register(fake_app)

        job_kwargs = fake_app.state.scheduler.add_job.call_args.kwargs
        assert job_kwargs["id"] == "health_check"

    def test_register_sets_replace_existing(self) -> None:
        """``replace_existing=True`` must be passed so the job is not duplicated."""
        fake_app = _make_app()

        register(fake_app)

        job_kwargs = fake_app.state.scheduler.add_job.call_args.kwargs
        assert job_kwargs["replace_existing"] is True

    def test_register_passes_app_in_kwargs(self) -> None:
        """``app`` must be forwarded to the scheduled job through ``kwargs``."""
        fake_app = _make_app()

        register(fake_app)

        job_kwargs = fake_app.state.scheduler.add_job.call_args.kwargs
        assert job_kwargs["kwargs"] == {"app": fake_app}
|