From 9cba5a9fcb268298047c56acfec8f593ebc0a1c7 Mon Sep 17 00:00:00 2001 From: Lukas Date: Sat, 11 Apr 2026 20:07:00 +0200 Subject: [PATCH] Refactor blocklist import registration to async startup flow --- Docs/Tasks.md | 1 + backend/app/startup.py | 2 +- backend/app/tasks/blocklist_import.py | 33 +++---------- .../tests/test_tasks/test_blocklist_import.py | 49 ++----------------- 4 files changed, 13 insertions(+), 72 deletions(-) diff --git a/Docs/Tasks.md b/Docs/Tasks.md index 899291d..0b70d56 100644 --- a/Docs/Tasks.md +++ b/Docs/Tasks.md @@ -69,6 +69,7 @@ Reference: `Docs/Refactoring.md` for full analysis of each issue. - Issue: `blocklist_import.register` uses `asyncio.get_event_loop().run_until_complete` then falls back to `asyncio.ensure_future`, which is fragile and can fail if the application is already running inside an event loop. - Propose: Refactor blocklist job registration to use a pure async bootstrap flow invoked from the lifespan handler, or resolve schedule configuration before starting the scheduler instead of forcing synchronous loop execution. - Test: Add tests validating blocklist job registration in both non-running and running event loop contexts, and verify the registration path does not raise loop state errors. + - Status: completed 10. Standardize async offloading and thread-backed sync work behind a shared executor abstraction - Goal: Remove scattered `asyncio.get_event_loop().run_in_executor` / `asyncio.ensure_future` patterns and give the backend a single, consistent way to run blocking file, CPU-bound, or compatibility work. diff --git a/backend/app/startup.py b/backend/app/startup.py index d6c7b9a..28d9f9f 100644 --- a/backend/app/startup.py +++ b/backend/app/startup.py @@ -123,7 +123,7 @@ async def startup_shared_resources( scheduler.start() health_check.register(app) - blocklist_import.register(app) + await blocklist_import.register(app) geo_cache_flush.register(app) geo_re_resolve.register(app) history_sync.register(app) diff --git a/backend/app/tasks/blocklist_import.py b/backend/app/tasks/blocklist_import.py index e905f73..03ab51a 100644 --- a/backend/app/tasks/blocklist_import.py +++ b/backend/app/tasks/blocklist_import.py @@ -13,7 +13,6 @@ existing entry without creating duplicates. from __future__ import annotations -import inspect from typing import TYPE_CHECKING, Any import structlog @@ -76,7 +75,7 @@ async def _run_import(app: Any) -> None: await db.close() -def register(app: FastAPI) -> None: +async def register(app: FastAPI) -> None: """Add (or replace) the blocklist import job in the application scheduler. Reads the persisted :class:`~app.models.blocklist.ScheduleConfig` from @@ -89,30 +88,14 @@ def register(app: FastAPI) -> None: app: The :class:`fastapi.FastAPI` application instance whose ``app.state.scheduler`` will receive the job. """ - import asyncio # noqa: PLC0415 - - async def _do_register() -> None: - db, close_db = await _get_db(app) - try: - config = await blocklist_service.get_schedule(db) - finally: - if close_db: - await db.close() - _apply_schedule(app, config) - - # APScheduler is synchronous at registration time; use asyncio to read - # the stored schedule from the DB before registering. - coro = None + db, close_db = await _get_db(app) try: - loop = asyncio.get_event_loop() - coro = _do_register() - loop.run_until_complete(coro) - except RuntimeError: - # If the current thread already has a running loop (uvicorn), schedule - # the registration as a coroutine. - if coro is not None and inspect.getcoroutinestate(coro) != inspect.CORO_CLOSED: - coro.close() - asyncio.ensure_future(_do_register()) + config = await blocklist_service.get_schedule(db) + finally: + if close_db: + await db.close() + + _apply_schedule(app, config) def reschedule(app: FastAPI) -> None: diff --git a/backend/tests/test_tasks/test_blocklist_import.py b/backend/tests/test_tasks/test_blocklist_import.py index a55fc4a..246aec4 100644 --- a/backend/tests/test_tasks/test_blocklist_import.py +++ b/backend/tests/test_tasks/test_blocklist_import.py @@ -300,10 +300,9 @@ class TestApplySchedule: class TestRegister: """Tests for :func:`~app.tasks.blocklist_import.register`.""" - def test_register_calls_apply_schedule_via_event_loop(self) -> None: + @pytest.mark.asyncio + async def test_register_calls_apply_schedule(self) -> None: """``register`` must call ``_apply_schedule`` after reading the stored config.""" - import asyncio - from app.tasks.blocklist_import import register app = MagicMock() @@ -324,52 +323,10 @@ class TestRegister: new_callable=AsyncMock, return_value=config, ), patch("app.tasks.blocklist_import._apply_schedule") as mock_apply: - # Use a fresh event loop to avoid interference from pytest-asyncio. - loop = asyncio.new_event_loop() - try: - with patch("asyncio.get_event_loop", return_value=loop): - register(app) - finally: - loop.close() + await register(app) mock_apply.assert_called_once_with(app, config) - def test_register_falls_back_to_ensure_future_on_runtime_error(self) -> None: - """When ``run_until_complete`` raises ``RuntimeError``, ``ensure_future`` is used.""" - from app.tasks.blocklist_import import register - - app = MagicMock() - app.state.db = MagicMock() - app.state.db.close = AsyncMock() - app.state.settings = MagicMock(database_path="/tmp/fake.db") - app.state.scheduler = MagicMock() - - config = ScheduleConfig(frequency=ScheduleFrequency.daily) - - mock_loop = MagicMock() - mock_loop.run_until_complete.side_effect = RuntimeError("already running") - - def _close_coro(coro: Any) -> None: - coro.close() - - with ( - patch( - "app.tasks.blocklist_import.open_db", - new_callable=AsyncMock, - return_value=app.state.db, - ), - patch( - "app.tasks.blocklist_import.blocklist_service.get_schedule", - new_callable=AsyncMock, - return_value=config, - ), - patch("asyncio.get_event_loop", return_value=mock_loop), - patch("asyncio.ensure_future", side_effect=_close_coro) as mock_ensure_future, - ): - register(app) - - mock_ensure_future.assert_called_once() - class TestReschedule: """Tests for :func:`~app.tasks.blocklist_import.reschedule`."""