Refactor geo caching and service layer tests

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
This commit is contained in:
2026-04-25 18:15:31 +02:00
parent 654dbdb000
commit d467190eb1
7 changed files with 218 additions and 150 deletions

View File

@@ -1,7 +1,7 @@
"""Tests for the geo cache flush background task.
Validates that :func:`~app.tasks.geo_cache_flush._run_flush` correctly
delegates to :func:`~app.services.geo_service.flush_dirty` and only logs
Validates that :func:`~app.tasks.geo_cache_flush._run_flush_with_resources` correctly
delegates to :meth:`~app.services.geo_cache.GeoCache.flush_dirty` and only logs
when entries were actually flushed, and that
:func:`~app.tasks.geo_cache_flush.register` configures the APScheduler job
with the correct interval and stable job ID.
@@ -13,6 +13,7 @@ from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from app.services.geo_cache import GeoCache
from app.tasks.geo_cache_flush import GEO_FLUSH_INTERVAL, JOB_ID, _run_flush, register
# ---------------------------------------------------------------------------
@@ -48,37 +49,45 @@ class TestRunFlush:
@pytest.mark.asyncio
async def test_run_flush_calls_flush_dirty_with_db(self) -> None:
"""``_run_flush`` must call ``geo_service.flush_dirty`` with ``app.state.db``."""
app = _make_app()
"""``_run_flush_with_resources`` must call ``geo_cache.flush_dirty`` with a db."""
geo_cache = GeoCache()
settings = MagicMock(database_path="/tmp/fake.db")
with patch(
"app.tasks.db.open_db",
new_callable=AsyncMock,
return_value=app.state.db,
), patch(
"app.tasks.geo_cache_flush.geo_service.flush_dirty",
new_callable=AsyncMock,
return_value=0,
"app.tasks.db.task_db",
MagicMock(
return_value=AsyncMock(
__aenter__=AsyncMock(return_value=MagicMock()),
__aexit__=AsyncMock(return_value=False),
)
),
), patch.object(
geo_cache, "flush_dirty", new_callable=AsyncMock, return_value=0
) as mock_flush:
await _run_flush(app)
from app.tasks.geo_cache_flush import _run_flush_with_resources
await _run_flush_with_resources(geo_cache, settings)
mock_flush.assert_awaited_once_with(app.state.db)
mock_flush.assert_awaited_once()
@pytest.mark.asyncio
async def test_run_flush_logs_when_entries_flushed(self) -> None:
"""``_run_flush`` must emit a debug log when ``flush_dirty`` returns > 0."""
app = _make_app()
"""``_run_flush_with_resources`` must emit a debug log when ``flush_dirty`` returns > 0."""
geo_cache = GeoCache()
settings = MagicMock(database_path="/tmp/fake.db")
with patch(
"app.tasks.db.open_db",
new_callable=AsyncMock,
return_value=app.state.db,
), patch(
"app.tasks.geo_cache_flush.geo_service.flush_dirty",
new_callable=AsyncMock,
return_value=15,
"app.tasks.db.task_db",
MagicMock(
return_value=AsyncMock(
__aenter__=AsyncMock(return_value=MagicMock()),
__aexit__=AsyncMock(return_value=False),
)
),
), patch.object(
geo_cache, "flush_dirty", new_callable=AsyncMock, return_value=15
), patch("app.tasks.geo_cache_flush.log") as mock_log:
await _run_flush(app)
from app.tasks.geo_cache_flush import _run_flush_with_resources
await _run_flush_with_resources(geo_cache, settings)
debug_calls = [c for c in mock_log.debug.call_args_list if c[0][0] == "geo_cache_flush_ran"]
assert len(debug_calls) == 1
@@ -86,19 +95,23 @@ class TestRunFlush:
@pytest.mark.asyncio
async def test_run_flush_does_not_log_when_nothing_to_flush(self) -> None:
"""``_run_flush`` must not emit any log when ``flush_dirty`` returns 0."""
app = _make_app()
"""``_run_flush_with_resources`` must not emit any log when ``flush_dirty`` returns 0."""
geo_cache = GeoCache()
settings = MagicMock(database_path="/tmp/fake.db")
with patch(
"app.tasks.db.open_db",
new_callable=AsyncMock,
return_value=app.state.db,
), patch(
"app.tasks.geo_cache_flush.geo_service.flush_dirty",
new_callable=AsyncMock,
return_value=0,
"app.tasks.db.task_db",
MagicMock(
return_value=AsyncMock(
__aenter__=AsyncMock(return_value=MagicMock()),
__aexit__=AsyncMock(return_value=False),
)
),
), patch.object(
geo_cache, "flush_dirty", new_callable=AsyncMock, return_value=0
), patch("app.tasks.geo_cache_flush.log") as mock_log:
await _run_flush(app)
from app.tasks.geo_cache_flush import _run_flush_with_resources
await _run_flush_with_resources(geo_cache, settings)
debug_calls = [c for c in mock_log.debug.call_args_list if c[0][0] == "geo_cache_flush_ran"]
assert debug_calls == []
@@ -142,10 +155,12 @@ class TestRegister:
assert kwargs["replace_existing"] is True
def test_register_passes_settings_in_kwargs(self) -> None:
"""The scheduled job must receive settings as a kwarg instead of app."""
"""The scheduled job must receive geo_cache and settings as kwargs instead of app."""
app = _make_app()
app.state.geo_cache = GeoCache()
register(app)
_, kwargs = app.state.scheduler.add_job.call_args
assert kwargs["kwargs"] == {"settings": app.state.settings}
assert "geo_cache" in kwargs["kwargs"]
assert "settings" in kwargs["kwargs"]