refactoring-backend #3

Merged
lukas.pupkalipinski merged 403 commits from refactoring-backend into main 2026-05-20 20:23:46 +02:00
10 changed files with 122 additions and 94 deletions
Showing only changes of commit ae81a8f5be - Show all commits

View File

@@ -6,7 +6,8 @@ Reference: `Docs/Refactoring.md` for full analysis of each issue.
---
## Open Issues
## Completed Issues
1. Move runtime application state out of `app.state`
- Goal: Remove process-local mutable business state from FastAPI application state and centralise it in a cluster-safe, testable runtime state abstraction.
@@ -63,6 +64,8 @@ Reference: `Docs/Refactoring.md` for full analysis of each issue.
- Issue: `history_sync`, `geo_re_resolve`, `geo_cache_flush`, and `blocklist_import` read shared resources directly from `app.state` and register jobs against `app.state.scheduler`, which couples task implementation to FastAPI internals.
- Propose: Introduce a scheduler/task bootstrap abstraction that accepts injected settings, database providers, HTTP sessions, and scheduler handles, and move task resource access out of the `app.state` internals.
- Test: Add unit tests for task registration and execution with fake resource providers and a mock scheduler, without needing a real FastAPI instance.
- Status: completed
- Completed: Refactored background task modules to accept explicit scheduler job resources, removed direct `app.state` dependency from task execution callbacks, and updated unit tests to verify registration with injected settings, http session, and runtime state.
9. Remove brittle scheduler bootstrap logic from blocklist job registration
- Goal: Ensure job registration is deterministic and compatible with both synchronous and asynchronous startup contexts.
@@ -118,6 +121,6 @@ Reference: `Docs/Refactoring.md` for full analysis of each issue.
- Issue: `startup_shared_resources` creates shared resources like `aiohttp.ClientSession` and geo cache initialization from the initial environment-loaded settings, then later applies persisted runtime overrides to `app.state.settings`, producing a fragile startup ordering.
- Propose: Split startup into phases that first resolve bootstrap and runtime persisted configuration, then construct shared resources and register scheduled jobs using those effective settings.
- Test: Add startup tests asserting that when persisted runtime settings differ from bootstrap settings, the final initialized resources are built from the resolved effective settings, not the original bootstrap values.
- Status: completed

View File

@@ -24,6 +24,8 @@ from app.utils.runtime_state import get_effective_settings
if TYPE_CHECKING:
import aiosqlite
from aiohttp import ClientSession
from app.config import Settings
if TYPE_CHECKING:
from fastapi import FastAPI
@@ -34,25 +36,19 @@ log: structlog.stdlib.BoundLogger = structlog.get_logger()
JOB_ID: str = "blocklist_import"
async def _get_db(app: Any) -> tuple[aiosqlite.Connection, bool]:
settings = get_effective_settings(app)
async def _get_db(settings: "Settings") -> tuple[aiosqlite.Connection, bool]:
db = await open_db(settings.database_path)
return db, True
async def _run_import(app: Any) -> None:
async def _run_import_with_resources(settings: "Settings", http_session: "ClientSession") -> None:
"""APScheduler callback that imports all enabled blocklist sources.
Reads shared resources from ``app.state`` and delegates to
:func:`~app.services.blocklist_service.import_all`.
Args:
app: The :class:`fastapi.FastAPI` application instance passed via
APScheduler ``kwargs``.
settings: The resolved application settings used for database access.
http_session: The shared aiohttp session used for blocklist downloads.
"""
db, close_db = await _get_db(app)
settings = get_effective_settings(app)
http_session = app.state.http_session
db, close_db = await _get_db(settings)
socket_path: str = settings.fail2ban_socket
log.info("blocklist_import_starting")
@@ -75,6 +71,10 @@ async def _run_import(app: Any) -> None:
await db.close()
async def _run_import(app: FastAPI) -> None:
await _run_import_with_resources(get_effective_settings(app), app.state.http_session)
async def register(app: FastAPI) -> None:
"""Add (or replace) the blocklist import job in the application scheduler.
@@ -88,7 +88,8 @@ async def register(app: FastAPI) -> None:
app: The :class:`fastapi.FastAPI` application instance whose
``app.state.scheduler`` will receive the job.
"""
db, close_db = await _get_db(app)
settings = get_effective_settings(app)
db, close_db = await _get_db(settings)
try:
config = await blocklist_service.get_schedule(db)
finally:
@@ -110,7 +111,8 @@ def reschedule(app: FastAPI) -> None:
import asyncio # noqa: PLC0415
async def _do_reschedule() -> None:
db, close_db = await _get_db(app)
settings = get_effective_settings(app)
db, close_db = await _get_db(settings)
try:
config = await blocklist_service.get_schedule(db)
finally:
@@ -130,7 +132,10 @@ def _apply_schedule(app: FastAPI, config: Any) -> None:
"""
scheduler = app.state.scheduler
kwargs: dict[str, Any] = {"app": app}
kwargs: dict[str, Any] = {
"settings": get_effective_settings(app),
"http_session": app.state.http_session,
}
trigger_type: str
trigger_kwargs: dict[str, Any]
@@ -156,7 +161,7 @@ def _apply_schedule(app: FastAPI, config: Any) -> None:
scheduler.remove_job(JOB_ID)
scheduler.add_job(
_run_import,
_run_import_with_resources,
trigger=trigger_type,
id=JOB_ID,
kwargs=kwargs,

View File

@@ -20,6 +20,7 @@ from app.utils.runtime_state import get_effective_settings
if TYPE_CHECKING:
import aiosqlite
from app.config import Settings
from app.services import geo_service
if TYPE_CHECKING:
@@ -34,23 +35,18 @@ GEO_FLUSH_INTERVAL: int = 60
JOB_ID: str = "geo_cache_flush"
async def _get_db(app: Any) -> tuple[aiosqlite.Connection, bool]:
settings = get_effective_settings(app)
async def _get_db(settings: "Settings") -> tuple[aiosqlite.Connection, bool]:
db = await open_db(settings.database_path)
return db, True
async def _run_flush(app: Any) -> None:
async def _run_flush_with_settings(settings: "Settings") -> None:
"""Flush the geo service dirty set to the application database.
Reads shared resources from ``app.state`` and delegates to
:func:`~app.services.geo_service.flush_dirty`.
Args:
app: The :class:`fastapi.FastAPI` application instance passed via
APScheduler ``kwargs``.
settings: The resolved application settings used for database access.
"""
db, close_db = await _get_db(app)
db, close_db = await _get_db(settings)
try:
count = await geo_service.flush_dirty(db)
finally:
@@ -61,6 +57,10 @@ async def _run_flush(app: Any) -> None:
log.debug("geo_cache_flush_ran", flushed=count)
async def _run_flush(app: FastAPI) -> None:
await _run_flush_with_settings(get_effective_settings(app))
def register(app: FastAPI) -> None:
"""Add (or replace) the geo cache flush job in the application scheduler.
@@ -71,11 +71,12 @@ def register(app: FastAPI) -> None:
app: The :class:`fastapi.FastAPI` application instance whose
``app.state.scheduler`` will receive the job.
"""
settings = get_effective_settings(app)
app.state.scheduler.add_job(
_run_flush,
_run_flush_with_settings,
trigger="interval",
seconds=GEO_FLUSH_INTERVAL,
kwargs={"app": app},
kwargs={"settings": settings},
id=JOB_ID,
replace_existing=True,
)

View File

@@ -26,6 +26,8 @@ from app.utils.runtime_state import get_effective_settings
if TYPE_CHECKING:
import aiosqlite
from aiohttp import ClientSession
from app.config import Settings
from app.services import geo_service
if TYPE_CHECKING:
@@ -40,24 +42,19 @@ GEO_RE_RESOLVE_INTERVAL: int = 600
JOB_ID: str = "geo_re_resolve"
async def _get_db(app: FastAPI) -> tuple[aiosqlite.Connection, bool]:
settings = get_effective_settings(app)
async def _get_db(settings: "Settings") -> tuple[aiosqlite.Connection, bool]:
db = await open_db(settings.database_path)
return db, True
async def _run_re_resolve(app: FastAPI) -> None:
async def _run_re_resolve_with_resources(settings: "Settings", http_session: "ClientSession") -> None:
"""Query NULL-country IPs from the database and re-resolve them.
Reads shared resources from ``app.state`` and delegates to
:func:`~app.services.geo_service.lookup_batch`.
Args:
app: The :class:`fastapi.FastAPI` application instance passed via
APScheduler ``kwargs``.
settings: The resolved application settings used for database access.
http_session: The shared aiohttp session used for external lookups.
"""
db, close_db = await _get_db(app)
http_session = app.state.http_session
db, close_db = await _get_db(settings)
try:
# Fetch all IPs with NULL country_code from the persistent cache.
@@ -89,6 +86,10 @@ async def _run_re_resolve(app: FastAPI) -> None:
await db.close()
async def _run_re_resolve(app: FastAPI) -> None:
await _run_re_resolve_with_resources(get_effective_settings(app), app.state.http_session)
def register(app: FastAPI) -> None:
"""Add (or replace) the geo re-resolve job in the application scheduler.
@@ -102,11 +103,12 @@ def register(app: FastAPI) -> None:
app: The :class:`fastapi.FastAPI` application instance whose
``app.state.scheduler`` will receive the job.
"""
settings = get_effective_settings(app)
app.state.scheduler.add_job(
_run_re_resolve,
_run_re_resolve_with_resources,
trigger="interval",
seconds=GEO_RE_RESOLVE_INTERVAL,
kwargs={"app": app},
kwargs={"settings": settings, "http_session": app.state.http_session},
id=JOB_ID,
replace_existing=True,
)

View File

@@ -25,9 +25,14 @@ import structlog
from app.models.config import PendingRecovery
from app.models.server import ServerStatus
from app.services import health_service
from app.utils.runtime_state import get_effective_settings
from app.utils.runtime_state import (
RuntimeState,
get_effective_settings,
get_runtime_state,
)
if TYPE_CHECKING: # pragma: no cover
from app.config import Settings
from fastapi import FastAPI
log: structlog.stdlib.BoundLogger = structlog.get_logger()
@@ -48,29 +53,21 @@ HEALTH_CHECK_INTERVAL: int = 30
_ACTIVATION_CRASH_WINDOW: int = 60
async def _run_probe(app: FastAPI) -> None:
"""Probe fail2ban and cache the result on *app.state*.
Detects online/offline state transitions. When fail2ban goes offline
within :data:`_ACTIVATION_CRASH_WINDOW` seconds of the last jail
activation, writes a :class:`~app.models.config.PendingRecovery` record to
``app.state.pending_recovery``.
This is the APScheduler job callback. It reads ``fail2ban_socket`` from
the effective runtime settings, runs the health probe, and writes the
result to ``app.state.server_status``.
async def _run_probe_with_resources(settings: "Settings", runtime_state: RuntimeState) -> None:
"""Probe fail2ban and cache the result on the runtime state.
Args:
app: The :class:`fastapi.FastAPI` application instance passed by the
scheduler via the ``kwargs`` mechanism.
settings: The resolved application settings used for the probe.
runtime_state: The mutable runtime state manager.
"""
settings = get_effective_settings(app)
socket_path: str = settings.fail2ban_socket
prev_status: ServerStatus = getattr(
app.state, "server_status", ServerStatus(online=False)
runtime_state,
"server_status",
ServerStatus(online=False),
)
status: ServerStatus = await health_service.probe(socket_path)
app.state.server_status = status
runtime_state.server_status = status
now = datetime.datetime.now(tz=datetime.UTC)
@@ -78,11 +75,9 @@ async def _run_probe(app: FastAPI) -> None:
if status.online and not prev_status.online:
log.info("fail2ban_came_online", version=status.version)
# Clear any pending recovery once fail2ban is back online.
existing: PendingRecovery | None = getattr(
app.state, "pending_recovery", None
)
existing: PendingRecovery | None = getattr(runtime_state, "pending_recovery", None)
if existing is not None and not existing.recovered:
app.state.pending_recovery = PendingRecovery(
runtime_state.pending_recovery = PendingRecovery(
jail_name=existing.jail_name,
activated_at=existing.activated_at,
detected_at=existing.detected_at,
@@ -96,9 +91,7 @@ async def _run_probe(app: FastAPI) -> None:
elif not status.online and prev_status.online:
log.warning("fail2ban_went_offline")
# Check whether this crash happened shortly after a jail activation.
last_activation: ActivationRecord | None = getattr(
app.state, "last_activation", None
)
last_activation: ActivationRecord | None = getattr(runtime_state, "last_activation", None)
if last_activation is not None:
activated_at: datetime.datetime = last_activation["at"]
seconds_since = (now - activated_at).total_seconds()
@@ -106,11 +99,9 @@ async def _run_probe(app: FastAPI) -> None:
jail_name: str = last_activation["jail_name"]
# Only create a new record when there is not already an
# unresolved one for the same jail.
current: PendingRecovery | None = getattr(
app.state, "pending_recovery", None
)
current: PendingRecovery | None = getattr(runtime_state, "pending_recovery", None)
if current is None or current.recovered:
app.state.pending_recovery = PendingRecovery(
runtime_state.pending_recovery = PendingRecovery(
jail_name=jail_name,
activated_at=activated_at,
detected_at=now,
@@ -129,6 +120,13 @@ async def _run_probe(app: FastAPI) -> None:
)
async def _run_probe(app: FastAPI) -> None:
await _run_probe_with_resources(
get_effective_settings(app),
get_runtime_state(app),
)
async def run_probe(app: FastAPI) -> None:
"""Run a single health probe outside the scheduled job context."""
await _run_probe(app)
@@ -147,17 +145,20 @@ def register(app: FastAPI) -> None:
# Initialise the cache with an offline placeholder so the dashboard
# endpoint is always able to return a valid response even before the
# first probe fires.
app.state.server_status = ServerStatus(online=False)
settings = get_effective_settings(app)
runtime_state = get_runtime_state(app)
runtime_state.server_status = ServerStatus(online=False)
# Initialise activation tracking state.
app.state.last_activation = None
app.state.pending_recovery = None
runtime_state.last_activation = None
runtime_state.pending_recovery = None
app.state.scheduler.add_job(
_run_probe,
_run_probe_with_resources,
trigger="interval",
seconds=HEALTH_CHECK_INTERVAL,
kwargs={"app": app},
kwargs={"settings": settings, "runtime_state": runtime_state},
id="health_check",
replace_existing=True,
# Fire immediately on startup too, so the UI isn't dark for 30 s.

View File

@@ -11,6 +11,7 @@ from typing import TYPE_CHECKING
if TYPE_CHECKING:
import aiosqlite
from app.config import Settings
import structlog
@@ -34,8 +35,7 @@ HISTORY_SYNC_INTERVAL: int = 300
BACKFILL_WINDOW: int = 648000
async def _get_db(app: FastAPI) -> tuple[aiosqlite.Connection, bool]:
settings = get_effective_settings(app)
async def _get_db(settings: "Settings") -> tuple[aiosqlite.Connection, bool]:
db = await open_db(settings.database_path)
return db, True
@@ -48,10 +48,9 @@ async def _get_last_archive_ts(db) -> int | None:
return int(row[0])
async def _run_sync(app: FastAPI) -> None:
settings = get_effective_settings(app)
async def _run_sync_with_settings(settings: "Settings") -> None:
socket_path: str = settings.fail2ban_socket
db, close_db = await _get_db(app)
db, close_db = await _get_db(settings)
try:
last_ts = await _get_last_archive_ts(db)
@@ -107,16 +106,21 @@ async def _run_sync(app: FastAPI) -> None:
await db.close()
async def _run_sync(app: FastAPI) -> None:
await _run_sync_with_settings(get_effective_settings(app))
def register(app: FastAPI) -> None:
"""Register the history sync periodic job.
Should be called after scheduler startup, from the lifespan handler.
"""
settings = get_effective_settings(app)
app.state.scheduler.add_job(
_run_sync,
_run_sync_with_settings,
trigger="interval",
seconds=HISTORY_SYNC_INTERVAL,
kwargs={"app": app},
kwargs={"settings": settings},
id=JOB_ID,
replace_existing=True,
next_run_time=datetime.datetime.now(tz=datetime.UTC),

View File

@@ -280,16 +280,21 @@ class TestApplySchedule:
_, kwargs = scheduler.add_job.call_args
assert kwargs["id"] == JOB_ID
def test_apply_schedule_passes_app_in_kwargs(self) -> None:
"""The scheduled job must receive ``app`` as a kwarg for state access."""
def test_apply_schedule_passes_resources_in_kwargs(self) -> None:
"""The scheduled job must receive explicit resources instead of app."""
scheduler = _make_scheduler()
app = self._make_app_with_scheduler(scheduler)
app.state.settings = MagicMock(database_path="/tmp/fake.db")
app.state.http_session = MagicMock()
config = ScheduleConfig(frequency=ScheduleFrequency.daily)
_apply_schedule(app, config)
_, kwargs = scheduler.add_job.call_args
assert kwargs["kwargs"] == {"app": app}
assert kwargs["kwargs"] == {
"settings": app.state.settings,
"http_session": app.state.http_session,
}
# ---------------------------------------------------------------------------

View File

@@ -140,11 +140,11 @@ class TestRegister:
_, kwargs = app.state.scheduler.add_job.call_args
assert kwargs["replace_existing"] is True
def test_register_passes_app_in_kwargs(self) -> None:
"""The scheduled job must receive ``app`` as a kwarg for state access."""
def test_register_passes_settings_in_kwargs(self) -> None:
"""The scheduled job must receive settings as a kwarg instead of app."""
app = _make_app()
register(app)
_, kwargs = app.state.scheduler.add_job.call_args
assert kwargs["kwargs"] == {"app": app}
assert kwargs["kwargs"] == {"settings": app.state.settings}

View File

@@ -16,6 +16,7 @@ import pytest
from app.models.config import PendingRecovery
from app.models.server import ServerStatus
from app.tasks.health_check import HEALTH_CHECK_INTERVAL, _run_probe, register
from app.utils.runtime_state import ApplicationState, RuntimeState
# ---------------------------------------------------------------------------
# Helpers
@@ -31,12 +32,15 @@ def _make_app(prev_online: bool = False) -> MagicMock:
Returns:
A :class:`unittest.mock.MagicMock` that mimics ``fastapi.FastAPI``.
"""
runtime_state = RuntimeState(
server_status=ServerStatus(online=prev_online),
pending_recovery=None,
last_activation=None,
)
app = MagicMock()
app.state.settings.fail2ban_socket = "/var/run/fail2ban/fail2ban.sock"
app.state.server_status = ServerStatus(online=prev_online)
app.state = ApplicationState(runtime_state)
app.state.settings = MagicMock(fail2ban_socket="/var/run/fail2ban/fail2ban.sock")
app.state.scheduler = MagicMock()
app.state.last_activation = None
app.state.pending_recovery = None
return app
@@ -232,14 +236,17 @@ class TestRegister:
_, kwargs = app.state.scheduler.add_job.call_args
assert kwargs["replace_existing"] is True
def test_register_passes_app_in_kwargs(self) -> None:
"""The scheduled job must receive ``app`` as a kwarg for state access."""
def test_register_passes_resources_in_kwargs(self) -> None:
"""The scheduled job must receive explicit resources instead of app."""
app = _make_app()
register(app)
_, kwargs = app.state.scheduler.add_job.call_args
assert kwargs["kwargs"] == {"app": app}
assert kwargs["kwargs"] == {
"settings": app.state.settings,
"runtime_state": app.state.runtime_state,
}
def test_register_initialises_last_activation_none(self) -> None:
"""``register`` must set ``app.state.last_activation = None``."""

View File

@@ -26,7 +26,7 @@ class TestHistorySyncTask:
fake_scheduler.add_job.assert_called_once()
called_args, called_kwargs = fake_scheduler.add_job.call_args
assert called_kwargs["id"] == history_sync.JOB_ID
assert called_kwargs["kwargs"]["app"] == app
assert called_kwargs["kwargs"]["settings"] is app.state.settings
async def test_backfill_window_is_7_5_days(self) -> None:
assert history_sync.BACKFILL_WINDOW == 648000