diff --git a/Docs/Tasks.md b/Docs/Tasks.md index 0b70d56..3017889 100644 --- a/Docs/Tasks.md +++ b/Docs/Tasks.md @@ -6,7 +6,8 @@ Reference: `Docs/Refactoring.md` for full analysis of each issue. --- -## Open Issues +## Completed Issues + 1. Move runtime application state out of `app.state` - Goal: Remove process-local mutable business state from FastAPI application state and centralise it in a cluster-safe, testable runtime state abstraction. @@ -63,6 +64,8 @@ Reference: `Docs/Refactoring.md` for full analysis of each issue. - Issue: `history_sync`, `geo_re_resolve`, `geo_cache_flush`, and `blocklist_import` read shared resources directly from `app.state` and register jobs against `app.state.scheduler`, which couples task implementation to FastAPI internals. - Propose: Introduce a scheduler/task bootstrap abstraction that accepts injected settings, database providers, HTTP sessions, and scheduler handles, and move task resource access out of the `app.state` internals. - Test: Add unit tests for task registration and execution with fake resource providers and a mock scheduler, without needing a real FastAPI instance. + - Status: completed + - Completed: Refactored background task modules to accept explicit scheduler job resources, removed direct `app.state` dependency from task execution callbacks, and updated unit tests to verify registration with injected settings, http session, and runtime state. 9. Remove brittle scheduler bootstrap logic from blocklist job registration - Goal: Ensure job registration is deterministic and compatible with both synchronous and asynchronous startup contexts. @@ -118,6 +121,6 @@ Reference: `Docs/Refactoring.md` for full analysis of each issue. - Issue: `startup_shared_resources` creates shared resources like `aiohttp.ClientSession` and geo cache initialization from the initial environment-loaded settings, then later applies persisted runtime overrides to `app.state.settings`, producing a fragile startup ordering. - Propose: Split startup into phases that first resolve bootstrap and runtime persisted configuration, then construct shared resources and register scheduled jobs using those effective settings. - Test: Add startup tests asserting that when persisted runtime settings differ from bootstrap settings, the final initialized resources are built from the resolved effective settings, not the original bootstrap values. - - Status: completed + diff --git a/backend/app/tasks/blocklist_import.py b/backend/app/tasks/blocklist_import.py index 03ab51a..f8b5c5e 100644 --- a/backend/app/tasks/blocklist_import.py +++ b/backend/app/tasks/blocklist_import.py @@ -24,6 +24,8 @@ from app.utils.runtime_state import get_effective_settings if TYPE_CHECKING: import aiosqlite + from aiohttp import ClientSession + from app.config import Settings if TYPE_CHECKING: from fastapi import FastAPI @@ -34,25 +36,19 @@ log: structlog.stdlib.BoundLogger = structlog.get_logger() JOB_ID: str = "blocklist_import" -async def _get_db(app: Any) -> tuple[aiosqlite.Connection, bool]: - settings = get_effective_settings(app) +async def _get_db(settings: "Settings") -> tuple[aiosqlite.Connection, bool]: db = await open_db(settings.database_path) return db, True -async def _run_import(app: Any) -> None: +async def _run_import_with_resources(settings: "Settings", http_session: "ClientSession") -> None: """APScheduler callback that imports all enabled blocklist sources. - Reads shared resources from ``app.state`` and delegates to - :func:`~app.services.blocklist_service.import_all`. - Args: - app: The :class:`fastapi.FastAPI` application instance passed via - APScheduler ``kwargs``. + settings: The resolved application settings used for database access. + http_session: The shared aiohttp session used for blocklist downloads. """ - db, close_db = await _get_db(app) - settings = get_effective_settings(app) - http_session = app.state.http_session + db, close_db = await _get_db(settings) socket_path: str = settings.fail2ban_socket log.info("blocklist_import_starting") @@ -75,6 +71,10 @@ async def _run_import(app: Any) -> None: await db.close() +async def _run_import(app: FastAPI) -> None: + await _run_import_with_resources(get_effective_settings(app), app.state.http_session) + + async def register(app: FastAPI) -> None: """Add (or replace) the blocklist import job in the application scheduler. @@ -88,7 +88,8 @@ async def register(app: FastAPI) -> None: app: The :class:`fastapi.FastAPI` application instance whose ``app.state.scheduler`` will receive the job. """ - db, close_db = await _get_db(app) + settings = get_effective_settings(app) + db, close_db = await _get_db(settings) try: config = await blocklist_service.get_schedule(db) finally: @@ -110,7 +111,8 @@ def reschedule(app: FastAPI) -> None: import asyncio # noqa: PLC0415 async def _do_reschedule() -> None: - db, close_db = await _get_db(app) + settings = get_effective_settings(app) + db, close_db = await _get_db(settings) try: config = await blocklist_service.get_schedule(db) finally: @@ -130,7 +132,10 @@ def _apply_schedule(app: FastAPI, config: Any) -> None: """ scheduler = app.state.scheduler - kwargs: dict[str, Any] = {"app": app} + kwargs: dict[str, Any] = { + "settings": get_effective_settings(app), + "http_session": app.state.http_session, + } trigger_type: str trigger_kwargs: dict[str, Any] @@ -156,7 +161,7 @@ def _apply_schedule(app: FastAPI, config: Any) -> None: scheduler.remove_job(JOB_ID) scheduler.add_job( - _run_import, + _run_import_with_resources, trigger=trigger_type, id=JOB_ID, kwargs=kwargs, diff --git a/backend/app/tasks/geo_cache_flush.py b/backend/app/tasks/geo_cache_flush.py index 5614d9d..a8449a9 100644 --- a/backend/app/tasks/geo_cache_flush.py +++ b/backend/app/tasks/geo_cache_flush.py @@ -20,6 +20,7 @@ from app.utils.runtime_state import get_effective_settings if TYPE_CHECKING: import aiosqlite + from app.config import Settings from app.services import geo_service if TYPE_CHECKING: @@ -34,23 +35,18 @@ GEO_FLUSH_INTERVAL: int = 60 JOB_ID: str = "geo_cache_flush" -async def _get_db(app: Any) -> tuple[aiosqlite.Connection, bool]: - settings = get_effective_settings(app) +async def _get_db(settings: "Settings") -> tuple[aiosqlite.Connection, bool]: db = await open_db(settings.database_path) return db, True -async def _run_flush(app: Any) -> None: +async def _run_flush_with_settings(settings: "Settings") -> None: """Flush the geo service dirty set to the application database. - Reads shared resources from ``app.state`` and delegates to - :func:`~app.services.geo_service.flush_dirty`. - Args: - app: The :class:`fastapi.FastAPI` application instance passed via - APScheduler ``kwargs``. + settings: The resolved application settings used for database access. """ - db, close_db = await _get_db(app) + db, close_db = await _get_db(settings) try: count = await geo_service.flush_dirty(db) finally: @@ -61,6 +57,10 @@ async def _run_flush(app: Any) -> None: log.debug("geo_cache_flush_ran", flushed=count) +async def _run_flush(app: FastAPI) -> None: + await _run_flush_with_settings(get_effective_settings(app)) + + def register(app: FastAPI) -> None: """Add (or replace) the geo cache flush job in the application scheduler. @@ -71,11 +71,12 @@ def register(app: FastAPI) -> None: app: The :class:`fastapi.FastAPI` application instance whose ``app.state.scheduler`` will receive the job. """ + settings = get_effective_settings(app) app.state.scheduler.add_job( - _run_flush, + _run_flush_with_settings, trigger="interval", seconds=GEO_FLUSH_INTERVAL, - kwargs={"app": app}, + kwargs={"settings": settings}, id=JOB_ID, replace_existing=True, ) diff --git a/backend/app/tasks/geo_re_resolve.py b/backend/app/tasks/geo_re_resolve.py index 2894057..770fe0d 100644 --- a/backend/app/tasks/geo_re_resolve.py +++ b/backend/app/tasks/geo_re_resolve.py @@ -26,6 +26,8 @@ from app.utils.runtime_state import get_effective_settings if TYPE_CHECKING: import aiosqlite + from aiohttp import ClientSession + from app.config import Settings from app.services import geo_service if TYPE_CHECKING: @@ -40,24 +42,19 @@ GEO_RE_RESOLVE_INTERVAL: int = 600 JOB_ID: str = "geo_re_resolve" -async def _get_db(app: FastAPI) -> tuple[aiosqlite.Connection, bool]: - settings = get_effective_settings(app) +async def _get_db(settings: "Settings") -> tuple[aiosqlite.Connection, bool]: db = await open_db(settings.database_path) return db, True -async def _run_re_resolve(app: FastAPI) -> None: +async def _run_re_resolve_with_resources(settings: "Settings", http_session: "ClientSession") -> None: """Query NULL-country IPs from the database and re-resolve them. - Reads shared resources from ``app.state`` and delegates to - :func:`~app.services.geo_service.lookup_batch`. - Args: - app: The :class:`fastapi.FastAPI` application instance passed via - APScheduler ``kwargs``. + settings: The resolved application settings used for database access. + http_session: The shared aiohttp session used for external lookups. """ - db, close_db = await _get_db(app) - http_session = app.state.http_session + db, close_db = await _get_db(settings) try: # Fetch all IPs with NULL country_code from the persistent cache. @@ -89,6 +86,10 @@ async def _run_re_resolve(app: FastAPI) -> None: await db.close() +async def _run_re_resolve(app: FastAPI) -> None: + await _run_re_resolve_with_resources(get_effective_settings(app), app.state.http_session) + + def register(app: FastAPI) -> None: """Add (or replace) the geo re-resolve job in the application scheduler. @@ -102,11 +103,12 @@ def register(app: FastAPI) -> None: app: The :class:`fastapi.FastAPI` application instance whose ``app.state.scheduler`` will receive the job. """ + settings = get_effective_settings(app) app.state.scheduler.add_job( - _run_re_resolve, + _run_re_resolve_with_resources, trigger="interval", seconds=GEO_RE_RESOLVE_INTERVAL, - kwargs={"app": app}, + kwargs={"settings": settings, "http_session": app.state.http_session}, id=JOB_ID, replace_existing=True, ) diff --git a/backend/app/tasks/health_check.py b/backend/app/tasks/health_check.py index 80afd1f..fa71a5e 100644 --- a/backend/app/tasks/health_check.py +++ b/backend/app/tasks/health_check.py @@ -25,9 +25,14 @@ import structlog from app.models.config import PendingRecovery from app.models.server import ServerStatus from app.services import health_service -from app.utils.runtime_state import get_effective_settings +from app.utils.runtime_state import ( + RuntimeState, + get_effective_settings, + get_runtime_state, +) if TYPE_CHECKING: # pragma: no cover + from app.config import Settings from fastapi import FastAPI log: structlog.stdlib.BoundLogger = structlog.get_logger() @@ -48,29 +53,21 @@ HEALTH_CHECK_INTERVAL: int = 30 _ACTIVATION_CRASH_WINDOW: int = 60 -async def _run_probe(app: FastAPI) -> None: - """Probe fail2ban and cache the result on *app.state*. - - Detects online/offline state transitions. When fail2ban goes offline - within :data:`_ACTIVATION_CRASH_WINDOW` seconds of the last jail - activation, writes a :class:`~app.models.config.PendingRecovery` record to - ``app.state.pending_recovery``. - - This is the APScheduler job callback. It reads ``fail2ban_socket`` from - the effective runtime settings, runs the health probe, and writes the - result to ``app.state.server_status``. +async def _run_probe_with_resources(settings: "Settings", runtime_state: RuntimeState) -> None: + """Probe fail2ban and cache the result on the runtime state. Args: - app: The :class:`fastapi.FastAPI` application instance passed by the - scheduler via the ``kwargs`` mechanism. + settings: The resolved application settings used for the probe. + runtime_state: The mutable runtime state manager. """ - settings = get_effective_settings(app) socket_path: str = settings.fail2ban_socket prev_status: ServerStatus = getattr( - app.state, "server_status", ServerStatus(online=False) + runtime_state, + "server_status", + ServerStatus(online=False), ) status: ServerStatus = await health_service.probe(socket_path) - app.state.server_status = status + runtime_state.server_status = status now = datetime.datetime.now(tz=datetime.UTC) @@ -78,11 +75,9 @@ async def _run_probe(app: FastAPI) -> None: if status.online and not prev_status.online: log.info("fail2ban_came_online", version=status.version) # Clear any pending recovery once fail2ban is back online. - existing: PendingRecovery | None = getattr( - app.state, "pending_recovery", None - ) + existing: PendingRecovery | None = getattr(runtime_state, "pending_recovery", None) if existing is not None and not existing.recovered: - app.state.pending_recovery = PendingRecovery( + runtime_state.pending_recovery = PendingRecovery( jail_name=existing.jail_name, activated_at=existing.activated_at, detected_at=existing.detected_at, @@ -96,9 +91,7 @@ async def _run_probe(app: FastAPI) -> None: elif not status.online and prev_status.online: log.warning("fail2ban_went_offline") # Check whether this crash happened shortly after a jail activation. - last_activation: ActivationRecord | None = getattr( - app.state, "last_activation", None - ) + last_activation: ActivationRecord | None = getattr(runtime_state, "last_activation", None) if last_activation is not None: activated_at: datetime.datetime = last_activation["at"] seconds_since = (now - activated_at).total_seconds() @@ -106,11 +99,9 @@ async def _run_probe(app: FastAPI) -> None: jail_name: str = last_activation["jail_name"] # Only create a new record when there is not already an # unresolved one for the same jail. - current: PendingRecovery | None = getattr( - app.state, "pending_recovery", None - ) + current: PendingRecovery | None = getattr(runtime_state, "pending_recovery", None) if current is None or current.recovered: - app.state.pending_recovery = PendingRecovery( + runtime_state.pending_recovery = PendingRecovery( jail_name=jail_name, activated_at=activated_at, detected_at=now, @@ -129,6 +120,13 @@ async def _run_probe(app: FastAPI) -> None: ) +async def _run_probe(app: FastAPI) -> None: + await _run_probe_with_resources( + get_effective_settings(app), + get_runtime_state(app), + ) + + async def run_probe(app: FastAPI) -> None: """Run a single health probe outside the scheduled job context.""" await _run_probe(app) @@ -147,17 +145,20 @@ def register(app: FastAPI) -> None: # Initialise the cache with an offline placeholder so the dashboard # endpoint is always able to return a valid response even before the # first probe fires. - app.state.server_status = ServerStatus(online=False) + settings = get_effective_settings(app) + runtime_state = get_runtime_state(app) + + runtime_state.server_status = ServerStatus(online=False) # Initialise activation tracking state. - app.state.last_activation = None - app.state.pending_recovery = None + runtime_state.last_activation = None + runtime_state.pending_recovery = None app.state.scheduler.add_job( - _run_probe, + _run_probe_with_resources, trigger="interval", seconds=HEALTH_CHECK_INTERVAL, - kwargs={"app": app}, + kwargs={"settings": settings, "runtime_state": runtime_state}, id="health_check", replace_existing=True, # Fire immediately on startup too, so the UI isn't dark for 30 s. diff --git a/backend/app/tasks/history_sync.py b/backend/app/tasks/history_sync.py index 49ec4e7..595d1e9 100644 --- a/backend/app/tasks/history_sync.py +++ b/backend/app/tasks/history_sync.py @@ -11,6 +11,7 @@ from typing import TYPE_CHECKING if TYPE_CHECKING: import aiosqlite + from app.config import Settings import structlog @@ -34,8 +35,7 @@ HISTORY_SYNC_INTERVAL: int = 300 BACKFILL_WINDOW: int = 648000 -async def _get_db(app: FastAPI) -> tuple[aiosqlite.Connection, bool]: - settings = get_effective_settings(app) +async def _get_db(settings: "Settings") -> tuple[aiosqlite.Connection, bool]: db = await open_db(settings.database_path) return db, True @@ -48,10 +48,9 @@ async def _get_last_archive_ts(db) -> int | None: return int(row[0]) -async def _run_sync(app: FastAPI) -> None: - settings = get_effective_settings(app) +async def _run_sync_with_settings(settings: "Settings") -> None: socket_path: str = settings.fail2ban_socket - db, close_db = await _get_db(app) + db, close_db = await _get_db(settings) try: last_ts = await _get_last_archive_ts(db) @@ -107,16 +106,21 @@ async def _run_sync(app: FastAPI) -> None: await db.close() +async def _run_sync(app: FastAPI) -> None: + await _run_sync_with_settings(get_effective_settings(app)) + + def register(app: FastAPI) -> None: """Register the history sync periodic job. Should be called after scheduler startup, from the lifespan handler. """ + settings = get_effective_settings(app) app.state.scheduler.add_job( - _run_sync, + _run_sync_with_settings, trigger="interval", seconds=HISTORY_SYNC_INTERVAL, - kwargs={"app": app}, + kwargs={"settings": settings}, id=JOB_ID, replace_existing=True, next_run_time=datetime.datetime.now(tz=datetime.UTC), diff --git a/backend/tests/test_tasks/test_blocklist_import.py b/backend/tests/test_tasks/test_blocklist_import.py index 246aec4..1d4621a 100644 --- a/backend/tests/test_tasks/test_blocklist_import.py +++ b/backend/tests/test_tasks/test_blocklist_import.py @@ -280,16 +280,21 @@ class TestApplySchedule: _, kwargs = scheduler.add_job.call_args assert kwargs["id"] == JOB_ID - def test_apply_schedule_passes_app_in_kwargs(self) -> None: - """The scheduled job must receive ``app`` as a kwarg for state access.""" + def test_apply_schedule_passes_resources_in_kwargs(self) -> None: + """The scheduled job must receive explicit resources instead of app.""" scheduler = _make_scheduler() app = self._make_app_with_scheduler(scheduler) + app.state.settings = MagicMock(database_path="/tmp/fake.db") + app.state.http_session = MagicMock() config = ScheduleConfig(frequency=ScheduleFrequency.daily) _apply_schedule(app, config) _, kwargs = scheduler.add_job.call_args - assert kwargs["kwargs"] == {"app": app} + assert kwargs["kwargs"] == { + "settings": app.state.settings, + "http_session": app.state.http_session, + } # --------------------------------------------------------------------------- diff --git a/backend/tests/test_tasks/test_geo_cache_flush.py b/backend/tests/test_tasks/test_geo_cache_flush.py index be7ed5d..078f341 100644 --- a/backend/tests/test_tasks/test_geo_cache_flush.py +++ b/backend/tests/test_tasks/test_geo_cache_flush.py @@ -140,11 +140,11 @@ class TestRegister: _, kwargs = app.state.scheduler.add_job.call_args assert kwargs["replace_existing"] is True - def test_register_passes_app_in_kwargs(self) -> None: - """The scheduled job must receive ``app`` as a kwarg for state access.""" + def test_register_passes_settings_in_kwargs(self) -> None: + """The scheduled job must receive settings as a kwarg instead of app.""" app = _make_app() register(app) _, kwargs = app.state.scheduler.add_job.call_args - assert kwargs["kwargs"] == {"app": app} + assert kwargs["kwargs"] == {"settings": app.state.settings} diff --git a/backend/tests/test_tasks/test_health_check.py b/backend/tests/test_tasks/test_health_check.py index 0af33f1..556132c 100644 --- a/backend/tests/test_tasks/test_health_check.py +++ b/backend/tests/test_tasks/test_health_check.py @@ -16,6 +16,7 @@ import pytest from app.models.config import PendingRecovery from app.models.server import ServerStatus from app.tasks.health_check import HEALTH_CHECK_INTERVAL, _run_probe, register +from app.utils.runtime_state import ApplicationState, RuntimeState # --------------------------------------------------------------------------- # Helpers @@ -31,12 +32,15 @@ def _make_app(prev_online: bool = False) -> MagicMock: Returns: A :class:`unittest.mock.MagicMock` that mimics ``fastapi.FastAPI``. """ + runtime_state = RuntimeState( + server_status=ServerStatus(online=prev_online), + pending_recovery=None, + last_activation=None, + ) app = MagicMock() - app.state.settings.fail2ban_socket = "/var/run/fail2ban/fail2ban.sock" - app.state.server_status = ServerStatus(online=prev_online) + app.state = ApplicationState(runtime_state) + app.state.settings = MagicMock(fail2ban_socket="/var/run/fail2ban/fail2ban.sock") app.state.scheduler = MagicMock() - app.state.last_activation = None - app.state.pending_recovery = None return app @@ -232,14 +236,17 @@ class TestRegister: _, kwargs = app.state.scheduler.add_job.call_args assert kwargs["replace_existing"] is True - def test_register_passes_app_in_kwargs(self) -> None: - """The scheduled job must receive ``app`` as a kwarg for state access.""" + def test_register_passes_resources_in_kwargs(self) -> None: + """The scheduled job must receive explicit resources instead of app.""" app = _make_app() register(app) _, kwargs = app.state.scheduler.add_job.call_args - assert kwargs["kwargs"] == {"app": app} + assert kwargs["kwargs"] == { + "settings": app.state.settings, + "runtime_state": app.state.runtime_state, + } def test_register_initialises_last_activation_none(self) -> None: """``register`` must set ``app.state.last_activation = None``.""" diff --git a/backend/tests/test_tasks/test_history_sync.py b/backend/tests/test_tasks/test_history_sync.py index d5a29a8..2179774 100644 --- a/backend/tests/test_tasks/test_history_sync.py +++ b/backend/tests/test_tasks/test_history_sync.py @@ -26,7 +26,7 @@ class TestHistorySyncTask: fake_scheduler.add_job.assert_called_once() called_args, called_kwargs = fake_scheduler.add_job.call_args assert called_kwargs["id"] == history_sync.JOB_ID - assert called_kwargs["kwargs"]["app"] == app + assert called_kwargs["kwargs"]["settings"] is app.state.settings async def test_backfill_window_is_7_5_days(self) -> None: assert history_sync.BACKFILL_WINDOW == 648000