- Backend: extend activate_jail() with pre-validation and 4-attempt post-reload
health probe; add validate_jail_config() and rollback_jail() service functions
- Backend: new endpoints POST /api/config/jails/{name}/validate,
GET /api/config/pending-recovery, POST /api/config/jails/{name}/rollback
- Backend: extend JailActivationResponse with fail2ban_running + validation_warnings;
add JailValidationIssue, JailValidationResult, PendingRecovery, RollbackResponse models
- Backend: health_check task tracks last_activation and creates PendingRecovery
record when fail2ban goes offline within 60 s of an activation
- Backend: add fail2ban_start_command setting (configurable start cmd for rollback)
- Frontend: ActivateJailDialog — pre-validation on open, crash-detected callback,
extended spinner text during activation+verify
- Frontend: JailsTab — Validate Config button for inactive jails, validation
result panels (blocking errors + advisory warnings)
- Frontend: RecoveryBanner component — polls pending-recovery, shows full-width
alert with Disable & Restart / View Logs buttons
- Frontend: MainLayout — mount RecoveryBanner at layout level
- Tests: 19 new backend service tests (validate, rollback, filter/action parsing)
+ 6 health_check crash-detection tests + 11 router tests; 5 RecoveryBanner
frontend tests; fix mock setup in existing activate_jail tests
351 lines
12 KiB
Python
351 lines
12 KiB
Python
"""Tests for the health-check background task.
|
|
|
|
Validates that :func:`~app.tasks.health_check._run_probe` correctly stores
|
|
the probe result on ``app.state.server_status``, logs online/offline
|
|
transitions, and that :func:`~app.tasks.health_check.register` configures
|
|
the scheduler and primes the initial status.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import datetime
|
|
from unittest.mock import AsyncMock, MagicMock, patch
|
|
|
|
import pytest
|
|
|
|
from app.models.config import PendingRecovery
|
|
from app.models.server import ServerStatus
|
|
from app.tasks.health_check import HEALTH_CHECK_INTERVAL, _run_probe, register
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _make_app(prev_online: bool = False) -> MagicMock:
|
|
"""Build a minimal mock ``app`` for health-check task tests.
|
|
|
|
Args:
|
|
prev_online: Whether the previous ``server_status`` was online.
|
|
|
|
Returns:
|
|
A :class:`unittest.mock.MagicMock` that mimics ``fastapi.FastAPI``.
|
|
"""
|
|
app = MagicMock()
|
|
app.state.settings.fail2ban_socket = "/var/run/fail2ban/fail2ban.sock"
|
|
app.state.server_status = ServerStatus(online=prev_online)
|
|
app.state.scheduler = MagicMock()
|
|
app.state.last_activation = None
|
|
app.state.pending_recovery = None
|
|
return app
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Tests for _run_probe
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestRunProbe:
|
|
"""Tests for :func:`~app.tasks.health_check._run_probe`."""
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_run_probe_updates_server_status(self) -> None:
|
|
"""``_run_probe`` must store the probe result on ``app.state.server_status``."""
|
|
app = _make_app(prev_online=False)
|
|
new_status = ServerStatus(online=True, version="0.11.2", active_jails=3)
|
|
|
|
with patch(
|
|
"app.tasks.health_check.health_service.probe",
|
|
new_callable=AsyncMock,
|
|
return_value=new_status,
|
|
):
|
|
await _run_probe(app)
|
|
|
|
assert app.state.server_status is new_status
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_run_probe_logs_came_online_transition(self) -> None:
|
|
"""When fail2ban comes online, ``"fail2ban_came_online"`` must be logged."""
|
|
app = _make_app(prev_online=False)
|
|
new_status = ServerStatus(online=True, version="0.11.2", active_jails=2)
|
|
|
|
with patch(
|
|
"app.tasks.health_check.health_service.probe",
|
|
new_callable=AsyncMock,
|
|
return_value=new_status,
|
|
), patch("app.tasks.health_check.log") as mock_log:
|
|
await _run_probe(app)
|
|
|
|
online_calls = [c for c in mock_log.info.call_args_list if c[0][0] == "fail2ban_came_online"]
|
|
assert len(online_calls) == 1
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_run_probe_logs_went_offline_transition(self) -> None:
|
|
"""When fail2ban goes offline, ``"fail2ban_went_offline"`` must be logged."""
|
|
app = _make_app(prev_online=True)
|
|
new_status = ServerStatus(online=False)
|
|
|
|
with patch(
|
|
"app.tasks.health_check.health_service.probe",
|
|
new_callable=AsyncMock,
|
|
return_value=new_status,
|
|
), patch("app.tasks.health_check.log") as mock_log:
|
|
await _run_probe(app)
|
|
|
|
offline_calls = [c for c in mock_log.warning.call_args_list if c[0][0] == "fail2ban_went_offline"]
|
|
assert len(offline_calls) == 1
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_run_probe_stable_online_no_transition_log(self) -> None:
|
|
"""When status stays online, no transition events must be emitted."""
|
|
app = _make_app(prev_online=True)
|
|
new_status = ServerStatus(online=True, version="0.11.2", active_jails=1)
|
|
|
|
with patch(
|
|
"app.tasks.health_check.health_service.probe",
|
|
new_callable=AsyncMock,
|
|
return_value=new_status,
|
|
), patch("app.tasks.health_check.log") as mock_log:
|
|
await _run_probe(app)
|
|
|
|
transition_calls = [
|
|
c
|
|
for c in mock_log.info.call_args_list
|
|
if c[0][0] in ("fail2ban_came_online", "fail2ban_went_offline")
|
|
]
|
|
transition_calls += [
|
|
c
|
|
for c in mock_log.warning.call_args_list
|
|
if c[0][0] in ("fail2ban_came_online", "fail2ban_went_offline")
|
|
]
|
|
assert transition_calls == []
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_run_probe_stable_offline_no_transition_log(self) -> None:
|
|
"""When status stays offline, no transition events must be emitted."""
|
|
app = _make_app(prev_online=False)
|
|
new_status = ServerStatus(online=False)
|
|
|
|
with patch(
|
|
"app.tasks.health_check.health_service.probe",
|
|
new_callable=AsyncMock,
|
|
return_value=new_status,
|
|
), patch("app.tasks.health_check.log") as mock_log:
|
|
await _run_probe(app)
|
|
|
|
transition_calls = [
|
|
c
|
|
for c in mock_log.info.call_args_list
|
|
if c[0][0] == "fail2ban_came_online"
|
|
]
|
|
transition_calls += [
|
|
c
|
|
for c in mock_log.warning.call_args_list
|
|
if c[0][0] == "fail2ban_went_offline"
|
|
]
|
|
assert transition_calls == []
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_run_probe_uses_socket_path_from_settings(self) -> None:
|
|
"""``_run_probe`` must pass the socket path from ``app.state.settings``."""
|
|
expected_socket = "/custom/fail2ban.sock"
|
|
app = _make_app()
|
|
app.state.settings.fail2ban_socket = expected_socket
|
|
new_status = ServerStatus(online=False)
|
|
|
|
with patch(
|
|
"app.tasks.health_check.health_service.probe",
|
|
new_callable=AsyncMock,
|
|
return_value=new_status,
|
|
) as mock_probe:
|
|
await _run_probe(app)
|
|
|
|
mock_probe.assert_awaited_once_with(expected_socket)
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_run_probe_uses_default_offline_status_when_state_missing(self) -> None:
|
|
"""``_run_probe`` must handle missing ``server_status`` on first run."""
|
|
app = _make_app()
|
|
# Simulate first run: no previous server_status attribute set yet.
|
|
del app.state.server_status
|
|
new_status = ServerStatus(online=True, version="0.11.2", active_jails=0)
|
|
|
|
with (
|
|
patch(
|
|
"app.tasks.health_check.health_service.probe",
|
|
new_callable=AsyncMock,
|
|
return_value=new_status,
|
|
),
|
|
patch("app.tasks.health_check.log"),
|
|
):
|
|
# Must not raise even with no prior status.
|
|
await _run_probe(app)
|
|
|
|
assert app.state.server_status is new_status
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Tests for register
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestRegister:
|
|
"""Tests for :func:`~app.tasks.health_check.register`."""
|
|
|
|
def test_register_adds_interval_job_to_scheduler(self) -> None:
|
|
"""``register`` must add a job with an ``"interval"`` trigger."""
|
|
app = _make_app()
|
|
|
|
register(app)
|
|
|
|
app.state.scheduler.add_job.assert_called_once()
|
|
_, kwargs = app.state.scheduler.add_job.call_args
|
|
assert kwargs["trigger"] == "interval"
|
|
assert kwargs["seconds"] == HEALTH_CHECK_INTERVAL
|
|
|
|
def test_register_primes_offline_server_status(self) -> None:
|
|
"""``register`` must set an initial offline status before the first probe fires."""
|
|
app = _make_app()
|
|
# Reset any value set by _make_app.
|
|
del app.state.server_status
|
|
|
|
register(app)
|
|
|
|
assert isinstance(app.state.server_status, ServerStatus)
|
|
assert app.state.server_status.online is False
|
|
|
|
def test_register_uses_stable_job_id(self) -> None:
|
|
"""``register`` must register the job under the stable id ``"health_check"``."""
|
|
app = _make_app()
|
|
|
|
register(app)
|
|
|
|
_, kwargs = app.state.scheduler.add_job.call_args
|
|
assert kwargs["id"] == "health_check"
|
|
|
|
def test_register_sets_replace_existing(self) -> None:
|
|
"""``register`` must use ``replace_existing=True`` to avoid duplicate jobs."""
|
|
app = _make_app()
|
|
|
|
register(app)
|
|
|
|
_, kwargs = app.state.scheduler.add_job.call_args
|
|
assert kwargs["replace_existing"] is True
|
|
|
|
def test_register_passes_app_in_kwargs(self) -> None:
|
|
"""The scheduled job must receive ``app`` as a kwarg for state access."""
|
|
app = _make_app()
|
|
|
|
register(app)
|
|
|
|
_, kwargs = app.state.scheduler.add_job.call_args
|
|
assert kwargs["kwargs"] == {"app": app}
|
|
|
|
def test_register_initialises_last_activation_none(self) -> None:
|
|
"""``register`` must set ``app.state.last_activation = None``."""
|
|
app = _make_app()
|
|
|
|
register(app)
|
|
|
|
assert app.state.last_activation is None
|
|
|
|
def test_register_initialises_pending_recovery_none(self) -> None:
|
|
"""``register`` must set ``app.state.pending_recovery = None``."""
|
|
app = _make_app()
|
|
|
|
register(app)
|
|
|
|
assert app.state.pending_recovery is None
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Crash detection (Task 3)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestCrashDetection:
|
|
"""Tests for activation-crash detection in _run_probe."""
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_crash_within_window_creates_pending_recovery(self) -> None:
|
|
"""An online→offline transition within 60 s of activation must set pending_recovery."""
|
|
app = _make_app(prev_online=True)
|
|
now = datetime.datetime.now(tz=datetime.timezone.utc)
|
|
app.state.last_activation = {
|
|
"jail_name": "sshd",
|
|
"at": now - datetime.timedelta(seconds=10),
|
|
}
|
|
app.state.pending_recovery = None
|
|
|
|
offline_status = ServerStatus(online=False)
|
|
|
|
with patch(
|
|
"app.tasks.health_check.health_service.probe",
|
|
new_callable=AsyncMock,
|
|
return_value=offline_status,
|
|
):
|
|
await _run_probe(app)
|
|
|
|
assert app.state.pending_recovery is not None
|
|
assert isinstance(app.state.pending_recovery, PendingRecovery)
|
|
assert app.state.pending_recovery.jail_name == "sshd"
|
|
assert app.state.pending_recovery.recovered is False
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_crash_outside_window_does_not_create_pending_recovery(self) -> None:
|
|
"""A crash more than 60 s after activation must NOT set pending_recovery."""
|
|
app = _make_app(prev_online=True)
|
|
app.state.last_activation = {
|
|
"jail_name": "sshd",
|
|
"at": datetime.datetime.now(tz=datetime.timezone.utc)
|
|
- datetime.timedelta(seconds=120),
|
|
}
|
|
app.state.pending_recovery = None
|
|
|
|
with patch(
|
|
"app.tasks.health_check.health_service.probe",
|
|
new_callable=AsyncMock,
|
|
return_value=ServerStatus(online=False),
|
|
):
|
|
await _run_probe(app)
|
|
|
|
assert app.state.pending_recovery is None
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_came_online_marks_pending_recovery_resolved(self) -> None:
|
|
"""An offline→online transition must mark an existing pending_recovery as recovered."""
|
|
app = _make_app(prev_online=False)
|
|
activated_at = datetime.datetime.now(tz=datetime.timezone.utc) - datetime.timedelta(seconds=30)
|
|
detected_at = datetime.datetime.now(tz=datetime.timezone.utc)
|
|
app.state.pending_recovery = PendingRecovery(
|
|
jail_name="sshd",
|
|
activated_at=activated_at,
|
|
detected_at=detected_at,
|
|
recovered=False,
|
|
)
|
|
|
|
with patch(
|
|
"app.tasks.health_check.health_service.probe",
|
|
new_callable=AsyncMock,
|
|
return_value=ServerStatus(online=True),
|
|
):
|
|
await _run_probe(app)
|
|
|
|
assert app.state.pending_recovery.recovered is True
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_crash_without_recent_activation_does_nothing(self) -> None:
|
|
"""A crash when last_activation is None must not create a pending_recovery."""
|
|
app = _make_app(prev_online=True)
|
|
app.state.last_activation = None
|
|
app.state.pending_recovery = None
|
|
|
|
with patch(
|
|
"app.tasks.health_check.health_service.probe",
|
|
new_callable=AsyncMock,
|
|
return_value=ServerStatus(online=False),
|
|
):
|
|
await _run_probe(app)
|
|
|
|
assert app.state.pending_recovery is None
|