Refactor blocklist import registration to async startup flow
This commit is contained in:
@@ -69,6 +69,7 @@ Reference: `Docs/Refactoring.md` for full analysis of each issue.
|
|||||||
- Issue: `blocklist_import.register` uses `asyncio.get_event_loop().run_until_complete` then falls back to `asyncio.ensure_future`, which is fragile and can fail if the application is already running inside an event loop.
|
- Issue: `blocklist_import.register` uses `asyncio.get_event_loop().run_until_complete` then falls back to `asyncio.ensure_future`, which is fragile and can fail if the application is already running inside an event loop.
|
||||||
- Propose: Refactor blocklist job registration to use a pure async bootstrap flow invoked from the lifespan handler, or resolve schedule configuration before starting the scheduler instead of forcing synchronous loop execution.
|
- Propose: Refactor blocklist job registration to use a pure async bootstrap flow invoked from the lifespan handler, or resolve schedule configuration before starting the scheduler instead of forcing synchronous loop execution.
|
||||||
- Test: Add tests validating blocklist job registration in both non-running and running event loop contexts, and verify the registration path does not raise loop state errors.
|
- Test: Add tests validating blocklist job registration in both non-running and running event loop contexts, and verify the registration path does not raise loop state errors.
|
||||||
|
- Status: completed
|
||||||
|
|
||||||
10. Standardize async offloading and thread-backed sync work behind a shared executor abstraction
|
10. Standardize async offloading and thread-backed sync work behind a shared executor abstraction
|
||||||
- Goal: Remove scattered `asyncio.get_event_loop().run_in_executor` / `asyncio.ensure_future` patterns and give the backend a single, consistent way to run blocking file, CPU-bound, or compatibility work.
|
- Goal: Remove scattered `asyncio.get_event_loop().run_in_executor` / `asyncio.ensure_future` patterns and give the backend a single, consistent way to run blocking file, CPU-bound, or compatibility work.
|
||||||
|
|||||||
@@ -123,7 +123,7 @@ async def startup_shared_resources(
|
|||||||
scheduler.start()
|
scheduler.start()
|
||||||
|
|
||||||
health_check.register(app)
|
health_check.register(app)
|
||||||
blocklist_import.register(app)
|
await blocklist_import.register(app)
|
||||||
geo_cache_flush.register(app)
|
geo_cache_flush.register(app)
|
||||||
geo_re_resolve.register(app)
|
geo_re_resolve.register(app)
|
||||||
history_sync.register(app)
|
history_sync.register(app)
|
||||||
|
|||||||
@@ -13,7 +13,6 @@ existing entry without creating duplicates.
|
|||||||
|
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
import inspect
|
|
||||||
from typing import TYPE_CHECKING, Any
|
from typing import TYPE_CHECKING, Any
|
||||||
|
|
||||||
import structlog
|
import structlog
|
||||||
@@ -76,7 +75,7 @@ async def _run_import(app: Any) -> None:
|
|||||||
await db.close()
|
await db.close()
|
||||||
|
|
||||||
|
|
||||||
def register(app: FastAPI) -> None:
|
async def register(app: FastAPI) -> None:
|
||||||
"""Add (or replace) the blocklist import job in the application scheduler.
|
"""Add (or replace) the blocklist import job in the application scheduler.
|
||||||
|
|
||||||
Reads the persisted :class:`~app.models.blocklist.ScheduleConfig` from
|
Reads the persisted :class:`~app.models.blocklist.ScheduleConfig` from
|
||||||
@@ -89,30 +88,14 @@ def register(app: FastAPI) -> None:
|
|||||||
app: The :class:`fastapi.FastAPI` application instance whose
|
app: The :class:`fastapi.FastAPI` application instance whose
|
||||||
``app.state.scheduler`` will receive the job.
|
``app.state.scheduler`` will receive the job.
|
||||||
"""
|
"""
|
||||||
import asyncio # noqa: PLC0415
|
db, close_db = await _get_db(app)
|
||||||
|
|
||||||
async def _do_register() -> None:
|
|
||||||
db, close_db = await _get_db(app)
|
|
||||||
try:
|
|
||||||
config = await blocklist_service.get_schedule(db)
|
|
||||||
finally:
|
|
||||||
if close_db:
|
|
||||||
await db.close()
|
|
||||||
_apply_schedule(app, config)
|
|
||||||
|
|
||||||
# APScheduler is synchronous at registration time; use asyncio to read
|
|
||||||
# the stored schedule from the DB before registering.
|
|
||||||
coro = None
|
|
||||||
try:
|
try:
|
||||||
loop = asyncio.get_event_loop()
|
config = await blocklist_service.get_schedule(db)
|
||||||
coro = _do_register()
|
finally:
|
||||||
loop.run_until_complete(coro)
|
if close_db:
|
||||||
except RuntimeError:
|
await db.close()
|
||||||
# If the current thread already has a running loop (uvicorn), schedule
|
|
||||||
# the registration as a coroutine.
|
_apply_schedule(app, config)
|
||||||
if coro is not None and inspect.getcoroutinestate(coro) != inspect.CORO_CLOSED:
|
|
||||||
coro.close()
|
|
||||||
asyncio.ensure_future(_do_register())
|
|
||||||
|
|
||||||
|
|
||||||
def reschedule(app: FastAPI) -> None:
|
def reschedule(app: FastAPI) -> None:
|
||||||
|
|||||||
@@ -300,10 +300,9 @@ class TestApplySchedule:
|
|||||||
class TestRegister:
|
class TestRegister:
|
||||||
"""Tests for :func:`~app.tasks.blocklist_import.register`."""
|
"""Tests for :func:`~app.tasks.blocklist_import.register`."""
|
||||||
|
|
||||||
def test_register_calls_apply_schedule_via_event_loop(self) -> None:
|
@pytest.mark.asyncio
|
||||||
|
async def test_register_calls_apply_schedule(self) -> None:
|
||||||
"""``register`` must call ``_apply_schedule`` after reading the stored config."""
|
"""``register`` must call ``_apply_schedule`` after reading the stored config."""
|
||||||
import asyncio
|
|
||||||
|
|
||||||
from app.tasks.blocklist_import import register
|
from app.tasks.blocklist_import import register
|
||||||
|
|
||||||
app = MagicMock()
|
app = MagicMock()
|
||||||
@@ -324,52 +323,10 @@ class TestRegister:
|
|||||||
new_callable=AsyncMock,
|
new_callable=AsyncMock,
|
||||||
return_value=config,
|
return_value=config,
|
||||||
), patch("app.tasks.blocklist_import._apply_schedule") as mock_apply:
|
), patch("app.tasks.blocklist_import._apply_schedule") as mock_apply:
|
||||||
# Use a fresh event loop to avoid interference from pytest-asyncio.
|
await register(app)
|
||||||
loop = asyncio.new_event_loop()
|
|
||||||
try:
|
|
||||||
with patch("asyncio.get_event_loop", return_value=loop):
|
|
||||||
register(app)
|
|
||||||
finally:
|
|
||||||
loop.close()
|
|
||||||
|
|
||||||
mock_apply.assert_called_once_with(app, config)
|
mock_apply.assert_called_once_with(app, config)
|
||||||
|
|
||||||
def test_register_falls_back_to_ensure_future_on_runtime_error(self) -> None:
|
|
||||||
"""When ``run_until_complete`` raises ``RuntimeError``, ``ensure_future`` is used."""
|
|
||||||
from app.tasks.blocklist_import import register
|
|
||||||
|
|
||||||
app = MagicMock()
|
|
||||||
app.state.db = MagicMock()
|
|
||||||
app.state.db.close = AsyncMock()
|
|
||||||
app.state.settings = MagicMock(database_path="/tmp/fake.db")
|
|
||||||
app.state.scheduler = MagicMock()
|
|
||||||
|
|
||||||
config = ScheduleConfig(frequency=ScheduleFrequency.daily)
|
|
||||||
|
|
||||||
mock_loop = MagicMock()
|
|
||||||
mock_loop.run_until_complete.side_effect = RuntimeError("already running")
|
|
||||||
|
|
||||||
def _close_coro(coro: Any) -> None:
|
|
||||||
coro.close()
|
|
||||||
|
|
||||||
with (
|
|
||||||
patch(
|
|
||||||
"app.tasks.blocklist_import.open_db",
|
|
||||||
new_callable=AsyncMock,
|
|
||||||
return_value=app.state.db,
|
|
||||||
),
|
|
||||||
patch(
|
|
||||||
"app.tasks.blocklist_import.blocklist_service.get_schedule",
|
|
||||||
new_callable=AsyncMock,
|
|
||||||
return_value=config,
|
|
||||||
),
|
|
||||||
patch("asyncio.get_event_loop", return_value=mock_loop),
|
|
||||||
patch("asyncio.ensure_future", side_effect=_close_coro) as mock_ensure_future,
|
|
||||||
):
|
|
||||||
register(app)
|
|
||||||
|
|
||||||
mock_ensure_future.assert_called_once()
|
|
||||||
|
|
||||||
|
|
||||||
class TestReschedule:
|
class TestReschedule:
|
||||||
"""Tests for :func:`~app.tasks.blocklist_import.reschedule`."""
|
"""Tests for :func:`~app.tasks.blocklist_import.reschedule`."""
|
||||||
|
|||||||
Reference in New Issue
Block a user