167 lines
6.1 KiB
Python
167 lines
6.1 KiB
Python
"""Tests for the geo cache flush background task.
|
|
|
|
Validates that :func:`~app.tasks.geo_cache_flush._run_flush_with_resources` correctly
|
|
delegates to :meth:`~app.services.geo_cache.GeoCache.flush_dirty` and only logs
|
|
when entries were actually flushed, and that
|
|
:func:`~app.tasks.geo_cache_flush.register` configures the APScheduler job
|
|
with the correct interval and stable job ID.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
from unittest.mock import AsyncMock, MagicMock, patch
|
|
|
|
import pytest
|
|
|
|
from app.services.geo_cache import GeoCache
|
|
from app.tasks.geo_cache_flush import GEO_FLUSH_INTERVAL, JOB_ID, _run_flush, register
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _make_app(flush_count: int = 0) -> MagicMock:
|
|
"""Build a minimal mock ``app`` for geo cache flush task tests.
|
|
|
|
Args:
|
|
flush_count: The value returned by the mocked ``flush_dirty`` call.
|
|
|
|
Returns:
|
|
A :class:`unittest.mock.MagicMock` that mimics ``fastapi.FastAPI``.
|
|
"""
|
|
app = MagicMock()
|
|
app.state.db = MagicMock()
|
|
app.state.db.close = AsyncMock()
|
|
app.state.scheduler = MagicMock()
|
|
app.state.settings = MagicMock(database_path="/tmp/fake.db")
|
|
app.state.runtime_settings = None
|
|
return app
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Tests for _run_flush
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestRunFlush:
|
|
"""Tests for :func:`~app.tasks.geo_cache_flush._run_flush`."""
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_run_flush_calls_flush_dirty_with_db(self) -> None:
|
|
"""``_run_flush_with_resources`` must call ``geo_cache.flush_dirty`` with a db."""
|
|
geo_cache = GeoCache()
|
|
settings = MagicMock(database_path="/tmp/fake.db")
|
|
|
|
with patch(
|
|
"app.tasks.db.task_db",
|
|
MagicMock(
|
|
return_value=AsyncMock(
|
|
__aenter__=AsyncMock(return_value=MagicMock()),
|
|
__aexit__=AsyncMock(return_value=False),
|
|
)
|
|
),
|
|
), patch.object(
|
|
geo_cache, "flush_dirty", new_callable=AsyncMock, return_value=0
|
|
) as mock_flush:
|
|
from app.tasks.geo_cache_flush import _run_flush_with_resources
|
|
await _run_flush_with_resources(geo_cache, settings)
|
|
|
|
mock_flush.assert_awaited_once()
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_run_flush_logs_when_entries_flushed(self) -> None:
|
|
"""``_run_flush_with_resources`` must emit a debug log when ``flush_dirty`` returns > 0."""
|
|
geo_cache = GeoCache()
|
|
settings = MagicMock(database_path="/tmp/fake.db")
|
|
|
|
with patch(
|
|
"app.tasks.db.task_db",
|
|
MagicMock(
|
|
return_value=AsyncMock(
|
|
__aenter__=AsyncMock(return_value=MagicMock()),
|
|
__aexit__=AsyncMock(return_value=False),
|
|
)
|
|
),
|
|
), patch.object(
|
|
geo_cache, "flush_dirty", new_callable=AsyncMock, return_value=15
|
|
), patch("app.tasks.geo_cache_flush.log") as mock_log:
|
|
from app.tasks.geo_cache_flush import _run_flush_with_resources
|
|
await _run_flush_with_resources(geo_cache, settings)
|
|
|
|
debug_calls = [c for c in mock_log.debug.call_args_list if c[0][0] == "geo_cache_flush_ran"]
|
|
assert len(debug_calls) == 1
|
|
assert debug_calls[0][1]["flushed"] == 15
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_run_flush_does_not_log_when_nothing_to_flush(self) -> None:
|
|
"""``_run_flush_with_resources`` must not emit any log when ``flush_dirty`` returns 0."""
|
|
geo_cache = GeoCache()
|
|
settings = MagicMock(database_path="/tmp/fake.db")
|
|
|
|
with patch(
|
|
"app.tasks.db.task_db",
|
|
MagicMock(
|
|
return_value=AsyncMock(
|
|
__aenter__=AsyncMock(return_value=MagicMock()),
|
|
__aexit__=AsyncMock(return_value=False),
|
|
)
|
|
),
|
|
), patch.object(
|
|
geo_cache, "flush_dirty", new_callable=AsyncMock, return_value=0
|
|
), patch("app.tasks.geo_cache_flush.log") as mock_log:
|
|
from app.tasks.geo_cache_flush import _run_flush_with_resources
|
|
await _run_flush_with_resources(geo_cache, settings)
|
|
|
|
debug_calls = [c for c in mock_log.debug.call_args_list if c[0][0] == "geo_cache_flush_ran"]
|
|
assert debug_calls == []
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Tests for register
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestRegister:
|
|
"""Tests for :func:`~app.tasks.geo_cache_flush.register`."""
|
|
|
|
def test_register_adds_interval_job_to_scheduler(self) -> None:
|
|
"""``register`` must add a job with an ``"interval"`` trigger."""
|
|
app = _make_app()
|
|
|
|
register(app)
|
|
|
|
app.state.scheduler.add_job.assert_called_once()
|
|
_, kwargs = app.state.scheduler.add_job.call_args
|
|
assert kwargs["trigger"] == "interval"
|
|
assert kwargs["seconds"] == GEO_FLUSH_INTERVAL
|
|
|
|
def test_register_uses_stable_job_id(self) -> None:
|
|
"""``register`` must use the module-level ``JOB_ID`` constant."""
|
|
app = _make_app()
|
|
|
|
register(app)
|
|
|
|
_, kwargs = app.state.scheduler.add_job.call_args
|
|
assert kwargs["id"] == JOB_ID
|
|
|
|
def test_register_sets_replace_existing(self) -> None:
|
|
"""``register`` must use ``replace_existing=True`` to avoid duplicate jobs."""
|
|
app = _make_app()
|
|
|
|
register(app)
|
|
|
|
_, kwargs = app.state.scheduler.add_job.call_args
|
|
assert kwargs["replace_existing"] is True
|
|
|
|
def test_register_passes_settings_in_kwargs(self) -> None:
|
|
"""The scheduled job must receive geo_cache and settings as kwargs instead of app."""
|
|
app = _make_app()
|
|
app.state.geo_cache = GeoCache()
|
|
|
|
register(app)
|
|
|
|
_, kwargs = app.state.scheduler.add_job.call_args
|
|
assert "geo_cache" in kwargs["kwargs"]
|
|
assert "settings" in kwargs["kwargs"]
|