"""Tests for the geo cache flush background task. Validates that :func:`~app.tasks.geo_cache_flush._run_flush_with_resources` correctly delegates to :meth:`~app.services.geo_cache.GeoCache.flush_dirty` and only logs when entries were actually flushed, and that :func:`~app.tasks.geo_cache_flush.register` configures the APScheduler job with the correct interval and stable job ID. """ from __future__ import annotations from unittest.mock import AsyncMock, MagicMock, patch import pytest from app.services.geo_cache import GeoCache from app.tasks.geo_cache_flush import GEO_FLUSH_INTERVAL, JOB_ID, _run_flush, register # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _make_app(flush_count: int = 0) -> MagicMock: """Build a minimal mock ``app`` for geo cache flush task tests. Args: flush_count: The value returned by the mocked ``flush_dirty`` call. Returns: A :class:`unittest.mock.MagicMock` that mimics ``fastapi.FastAPI``. """ app = MagicMock() app.state.db = MagicMock() app.state.db.close = AsyncMock() app.state.scheduler = MagicMock() app.state.settings = MagicMock(database_path="/tmp/fake.db") app.state.runtime_settings = None return app # --------------------------------------------------------------------------- # Tests for _run_flush # --------------------------------------------------------------------------- class TestRunFlush: """Tests for :func:`~app.tasks.geo_cache_flush._run_flush`.""" @pytest.mark.asyncio async def test_run_flush_calls_flush_dirty_with_db(self) -> None: """``_run_flush_with_resources`` must call ``geo_cache.flush_dirty`` with a db.""" geo_cache = GeoCache() settings = MagicMock(database_path="/tmp/fake.db") with patch( "app.tasks.db.task_db", MagicMock( return_value=AsyncMock( __aenter__=AsyncMock(return_value=MagicMock()), __aexit__=AsyncMock(return_value=False), ) ), ), patch.object( geo_cache, "flush_dirty", new_callable=AsyncMock, return_value=0 ) as mock_flush: from app.tasks.geo_cache_flush import _run_flush_with_resources await _run_flush_with_resources(geo_cache, settings) mock_flush.assert_awaited_once() @pytest.mark.asyncio async def test_run_flush_logs_when_entries_flushed(self) -> None: """``_run_flush_with_resources`` must emit a debug log when ``flush_dirty`` returns > 0.""" geo_cache = GeoCache() settings = MagicMock(database_path="/tmp/fake.db") with patch( "app.tasks.db.task_db", MagicMock( return_value=AsyncMock( __aenter__=AsyncMock(return_value=MagicMock()), __aexit__=AsyncMock(return_value=False), ) ), ), patch.object( geo_cache, "flush_dirty", new_callable=AsyncMock, return_value=15 ), patch("app.tasks.geo_cache_flush.log") as mock_log: from app.tasks.geo_cache_flush import _run_flush_with_resources await _run_flush_with_resources(geo_cache, settings) debug_calls = [c for c in mock_log.debug.call_args_list if c[0][0] == "geo_cache_flush_ran"] assert len(debug_calls) == 1 assert debug_calls[0][1]["flushed"] == 15 @pytest.mark.asyncio async def test_run_flush_does_not_log_when_nothing_to_flush(self) -> None: """``_run_flush_with_resources`` must not emit any log when ``flush_dirty`` returns 0.""" geo_cache = GeoCache() settings = MagicMock(database_path="/tmp/fake.db") with patch( "app.tasks.db.task_db", MagicMock( return_value=AsyncMock( __aenter__=AsyncMock(return_value=MagicMock()), __aexit__=AsyncMock(return_value=False), ) ), ), patch.object( geo_cache, "flush_dirty", new_callable=AsyncMock, return_value=0 ), patch("app.tasks.geo_cache_flush.log") as mock_log: from app.tasks.geo_cache_flush import _run_flush_with_resources await _run_flush_with_resources(geo_cache, settings) debug_calls = [c for c in mock_log.debug.call_args_list if c[0][0] == "geo_cache_flush_ran"] assert debug_calls == [] # --------------------------------------------------------------------------- # Tests for register # --------------------------------------------------------------------------- class TestRegister: """Tests for :func:`~app.tasks.geo_cache_flush.register`.""" def test_register_adds_interval_job_to_scheduler(self) -> None: """``register`` must add a job with an ``"interval"`` trigger.""" app = _make_app() register(app) app.state.scheduler.add_job.assert_called_once() _, kwargs = app.state.scheduler.add_job.call_args assert kwargs["trigger"] == "interval" assert kwargs["seconds"] == GEO_FLUSH_INTERVAL def test_register_uses_stable_job_id(self) -> None: """``register`` must use the module-level ``JOB_ID`` constant.""" app = _make_app() register(app) _, kwargs = app.state.scheduler.add_job.call_args assert kwargs["id"] == JOB_ID def test_register_sets_replace_existing(self) -> None: """``register`` must use ``replace_existing=True`` to avoid duplicate jobs.""" app = _make_app() register(app) _, kwargs = app.state.scheduler.add_job.call_args assert kwargs["replace_existing"] is True def test_register_passes_settings_in_kwargs(self) -> None: """The scheduled job must receive geo_cache and settings as kwargs instead of app.""" app = _make_app() app.state.geo_cache = GeoCache() register(app) _, kwargs = app.state.scheduler.add_job.call_args assert "geo_cache" in kwargs["kwargs"] assert "settings" in kwargs["kwargs"]