Add tests for background tasks and fail2ban client utility

- tests/test_tasks/test_blocklist_import.py: 14 tests, 96% coverage
- tests/test_tasks/test_health_check.py: 12 tests, 100% coverage
- tests/test_tasks/test_geo_cache_flush.py: 8 tests, 100% coverage
- tests/test_services/test_fail2ban_client.py: 24 new tests, 96% coverage

Total: 50 new tests (628 → 678 passing). Overall coverage 85% → 87%.
ruff, mypy --strict, tsc, and eslint all clean.
This commit is contained in:
2026-03-13 10:29:22 +01:00
parent d0b8b78d12
commit d6da81131f
5 changed files with 1010 additions and 0 deletions

View File

@@ -0,0 +1,136 @@
"""Tests for the geo cache flush background task.
Validates that :func:`~app.tasks.geo_cache_flush._run_flush` correctly
delegates to :func:`~app.services.geo_service.flush_dirty` and only logs
when entries were actually flushed, and that
:func:`~app.tasks.geo_cache_flush.register` configures the APScheduler job
with the correct interval and stable job ID.
"""
from __future__ import annotations
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from app.tasks.geo_cache_flush import GEO_FLUSH_INTERVAL, JOB_ID, _run_flush, register
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _make_app(flush_count: int = 0) -> MagicMock:
"""Build a minimal mock ``app`` for geo cache flush task tests.
Args:
flush_count: The value returned by the mocked ``flush_dirty`` call.
Returns:
A :class:`unittest.mock.MagicMock` that mimics ``fastapi.FastAPI``.
"""
app = MagicMock()
app.state.db = MagicMock()
app.state.scheduler = MagicMock()
return app
# ---------------------------------------------------------------------------
# Tests for _run_flush
# ---------------------------------------------------------------------------
class TestRunFlush:
"""Tests for :func:`~app.tasks.geo_cache_flush._run_flush`."""
@pytest.mark.asyncio
async def test_run_flush_calls_flush_dirty_with_db(self) -> None:
"""``_run_flush`` must call ``geo_service.flush_dirty`` with ``app.state.db``."""
app = _make_app()
with patch(
"app.tasks.geo_cache_flush.geo_service.flush_dirty",
new_callable=AsyncMock,
return_value=0,
) as mock_flush:
await _run_flush(app)
mock_flush.assert_awaited_once_with(app.state.db)
@pytest.mark.asyncio
async def test_run_flush_logs_when_entries_flushed(self) -> None:
"""``_run_flush`` must emit a debug log when ``flush_dirty`` returns > 0."""
app = _make_app()
with patch(
"app.tasks.geo_cache_flush.geo_service.flush_dirty",
new_callable=AsyncMock,
return_value=15,
), patch("app.tasks.geo_cache_flush.log") as mock_log:
await _run_flush(app)
debug_calls = [c for c in mock_log.debug.call_args_list if c[0][0] == "geo_cache_flush_ran"]
assert len(debug_calls) == 1
assert debug_calls[0][1]["flushed"] == 15
@pytest.mark.asyncio
async def test_run_flush_does_not_log_when_nothing_to_flush(self) -> None:
"""``_run_flush`` must not emit any log when ``flush_dirty`` returns 0."""
app = _make_app()
with patch(
"app.tasks.geo_cache_flush.geo_service.flush_dirty",
new_callable=AsyncMock,
return_value=0,
), patch("app.tasks.geo_cache_flush.log") as mock_log:
await _run_flush(app)
debug_calls = [c for c in mock_log.debug.call_args_list if c[0][0] == "geo_cache_flush_ran"]
assert debug_calls == []
# ---------------------------------------------------------------------------
# Tests for register
# ---------------------------------------------------------------------------
class TestRegister:
"""Tests for :func:`~app.tasks.geo_cache_flush.register`."""
def test_register_adds_interval_job_to_scheduler(self) -> None:
"""``register`` must add a job with an ``"interval"`` trigger."""
app = _make_app()
register(app)
app.state.scheduler.add_job.assert_called_once()
_, kwargs = app.state.scheduler.add_job.call_args
assert kwargs["trigger"] == "interval"
assert kwargs["seconds"] == GEO_FLUSH_INTERVAL
def test_register_uses_stable_job_id(self) -> None:
"""``register`` must use the module-level ``JOB_ID`` constant."""
app = _make_app()
register(app)
_, kwargs = app.state.scheduler.add_job.call_args
assert kwargs["id"] == JOB_ID
def test_register_sets_replace_existing(self) -> None:
"""``register`` must use ``replace_existing=True`` to avoid duplicate jobs."""
app = _make_app()
register(app)
_, kwargs = app.state.scheduler.add_job.call_args
assert kwargs["replace_existing"] is True
def test_register_passes_app_in_kwargs(self) -> None:
"""The scheduled job must receive ``app`` as a kwarg for state access."""
app = _make_app()
register(app)
_, kwargs = app.state.scheduler.add_job.call_args
assert kwargs["kwargs"] == {"app": app}