"""External blocklist import background task. Registers an APScheduler job that downloads all enabled blocklist sources, validates their entries, and applies bans via fail2ban on a configurable schedule. The default schedule is daily at 03:00 UTC; it is stored in the application :class:`~app.models.blocklist.ScheduleConfig` settings and can be updated at runtime through the blocklist router. The scheduler job ID is ``"blocklist_import"`` — using a stable id means re-registering the job (e.g. after a schedule update) safely replaces the existing entry without creating duplicates. """ from __future__ import annotations from typing import TYPE_CHECKING, Any import structlog from app.db import open_db from app.services import blocklist_service, jail_service from app.utils.runtime_state import get_effective_settings if TYPE_CHECKING: import aiosqlite from aiohttp import ClientSession from fastapi import FastAPI from app.config import Settings log: structlog.stdlib.BoundLogger = structlog.get_logger() #: Stable APScheduler job id so the job can be replaced without duplicates. JOB_ID: str = "blocklist_import" async def _get_db(settings: Settings) -> tuple[aiosqlite.Connection, bool]: db = await open_db(settings.database_path) return db, True async def _run_import_with_resources(settings: Settings, http_session: ClientSession) -> None: """APScheduler callback that imports all enabled blocklist sources. Args: settings: The resolved application settings used for database access. http_session: The shared aiohttp session used for blocklist downloads. """ db, close_db = await _get_db(settings) socket_path: str = settings.fail2ban_socket log.info("blocklist_import_starting") try: result = await blocklist_service.import_all( db, http_session, socket_path, ban_ip=jail_service.ban_ip, ) log.info( "blocklist_import_finished", total_imported=result.total_imported, total_skipped=result.total_skipped, errors=result.errors_count, ) except Exception: log.exception("blocklist_import_unexpected_error") finally: if close_db: await db.close() async def _run_import(app: FastAPI) -> None: await _run_import_with_resources(get_effective_settings(app), app.state.http_session) async def register(app: FastAPI) -> None: """Add (or replace) the blocklist import job in the application scheduler. Reads the persisted :class:`~app.models.blocklist.ScheduleConfig` from the database and translates it into the appropriate APScheduler trigger. Should be called inside the lifespan handler after the scheduler and database have been initialised. Args: app: The :class:`fastapi.FastAPI` application instance whose ``app.state.scheduler`` will receive the job. """ settings = get_effective_settings(app) db, close_db = await _get_db(settings) try: config = await blocklist_service.get_schedule(db) finally: if close_db: await db.close() _apply_schedule(app, config) def reschedule(app: FastAPI) -> None: """Re-register the blocklist import job with the latest schedule config. Called by the blocklist router after a schedule update so changes take effect immediately without a server restart. Args: app: The :class:`fastapi.FastAPI` application instance. """ import asyncio # noqa: PLC0415 async def _do_reschedule() -> None: settings = get_effective_settings(app) db, close_db = await _get_db(settings) try: config = await blocklist_service.get_schedule(db) finally: if close_db: await db.close() _apply_schedule(app, config) asyncio.ensure_future(_do_reschedule()) def _apply_schedule(app: FastAPI, config: Any) -> None: """Add or replace the APScheduler cron/interval job for the given config. Args: app: FastAPI application instance. config: :class:`~app.models.blocklist.ScheduleConfig` to apply. """ from app.services import blocklist_service blocklist_service.schedule_blocklist_job( app.state.scheduler, get_effective_settings(app), app.state.http_session, config, )