Files
BanGUI/backend/app/tasks/blocklist_import.py
Lukas 18036d53bf Fix issue #31: Make schedule reschedule deterministic and observable
Replace fire-and-forget reschedule pattern with proper async/await:
- Changed reschedule() from fire-and-forget to awaitable async function
- Errors are now properly propagated instead of silently failing
- Added structured logging for reschedule start and completion
- Schedule updates are now deterministic and observable to callers

Changes:
- app/tasks/blocklist_import.py: Convert reschedule to async, remove asyncio.ensure_future
- tests/test_tasks/test_blocklist_import.py: Add tests for error propagation and logging
- Docs/Features.md: Document scheduling reliability guarantees

All 15 blocklist_import tests pass with 100% coverage.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
2026-04-29 19:24:55 +02:00

128 lines
4.2 KiB
Python

"""External blocklist import background task.
Registers an APScheduler job that downloads all enabled blocklist sources,
validates their entries, and applies bans via fail2ban on a configurable
schedule. The default schedule is daily at 03:00 UTC; it is stored in the
application :class:`~app.models.blocklist.ScheduleConfig` settings and can
be updated at runtime through the blocklist router.
The scheduler job ID is ``"blocklist_import"`` — using a stable id means
re-registering the job (e.g. after a schedule update) safely replaces the
existing entry without creating duplicates.
"""
from __future__ import annotations
from typing import TYPE_CHECKING, Any
import structlog
from app.services import ban_service, blocklist_service
from app.tasks.db import task_db
from app.utils.runtime_state import get_effective_settings
if TYPE_CHECKING:
from aiohttp import ClientSession
from fastapi import FastAPI
from app.config import Settings
#: Module-level structlog logger used by the functions in this module.
log: structlog.stdlib.BoundLogger = structlog.get_logger()
#: Stable APScheduler job id so the job can be replaced without duplicates.
JOB_ID: str = "blocklist_import"
async def _run_import_with_resources(settings: Settings, http_session: ClientSession) -> None:
    """Run one blocklist import pass; this is the APScheduler callback.

    Opens a task-scoped database handle via ``task_db``, passes it to
    :func:`blocklist_service.import_all` together with the HTTP session and
    the fail2ban socket path, and logs the summary counters of the result.
    Any exception raised during the import is logged and not re-raised.

    Args:
        settings: The resolved application settings; supplies the fail2ban
            socket path and is handed to ``task_db`` for database access.
        http_session: The shared aiohttp session used for blocklist downloads.
    """
    fail2ban_socket: str = settings.fail2ban_socket
    log.info("blocklist_import_starting")
    try:
        async with task_db(settings) as conn:
            outcome = await blocklist_service.import_all(
                conn,
                http_session,
                fail2ban_socket,
                ban_ip=ban_service.ban_ip,
            )
        summary = {
            "total_imported": outcome.total_imported,
            "total_skipped": outcome.total_skipped,
            "errors": outcome.errors_count,
        }
        log.info("blocklist_import_finished", **summary)
    except Exception:
        # Boundary of the scheduled job: record the traceback and return
        # normally instead of letting the error escape the callback.
        log.exception("blocklist_import_unexpected_error")
#: Public alias of :func:`_run_import_with_resources`; this is the callable
#: that ``_apply_schedule`` passes to ``blocklist_service.schedule_blocklist_job``.
run_import_with_resources = _run_import_with_resources
async def _run_import(app: FastAPI) -> None:
    """Run the blocklist import using resources taken from *app*.

    Args:
        app: The :class:`fastapi.FastAPI` application instance; its effective
            settings and ``app.state.http_session`` are forwarded to
            :func:`_run_import_with_resources`.
    """
    effective_settings = get_effective_settings(app)
    session = app.state.http_session
    await _run_import_with_resources(effective_settings, session)
async def register(app: FastAPI) -> None:
    """Add (or replace) the blocklist import job in the application scheduler.

    Loads the persisted :class:`~app.models.blocklist.ScheduleConfig` through
    a task-scoped database handle and passes it to :func:`_apply_schedule`,
    which installs the job on the scheduler.

    Should be called inside the lifespan handler after the scheduler and
    database have been initialised.

    Args:
        app: The :class:`fastapi.FastAPI` application instance whose
            ``app.state.scheduler`` will receive the job.
    """
    effective_settings = get_effective_settings(app)
    async with task_db(effective_settings) as conn:
        schedule = await blocklist_service.get_schedule(conn)
    _apply_schedule(app, schedule)
async def reschedule(app: FastAPI) -> None:
    """Re-register the blocklist import job with the latest schedule config.

    Called by the blocklist router after a schedule update so changes take
    effect immediately without a server restart. The call is awaited end to
    end: when it returns, the schedule has been read from the database and
    handed to :func:`_apply_schedule`.

    A ``blocklist_reschedule_applying`` event is logged before the schedule
    is applied and ``blocklist_reschedule_applied`` after it succeeds. On
    failure a ``blocklist_reschedule_failed`` event with the traceback is
    logged and the original exception is re-raised unchanged.

    Args:
        app: The :class:`fastapi.FastAPI` application instance.

    Raises:
        Exception: If retrieving the schedule or applying it fails.
    """
    try:
        settings = get_effective_settings(app)
        async with task_db(settings) as db:
            config = await blocklist_service.get_schedule(db)
        log.info("blocklist_reschedule_applying", frequency=config.frequency)
        _apply_schedule(app, config)
    except Exception:
        # Make the failure observable in the logs, then let the caller see
        # the very same exception via a bare ``raise``.
        log.exception("blocklist_reschedule_failed")
        raise
    log.info("blocklist_reschedule_applied")
def _apply_schedule(app: FastAPI, config: Any) -> None:
    """Install the blocklist import job on the scheduler for *config*.

    Delegates to ``blocklist_service.schedule_blocklist_job``, passing the
    application's scheduler, its effective settings, the shared HTTP session,
    the schedule configuration and the import callback.

    Args:
        app: FastAPI application instance.
        config: :class:`~app.models.blocklist.ScheduleConfig` to apply.
    """
    # Resolved from ``app.services`` at call time rather than through the
    # module-level name of the same spelling.
    from app.services import blocklist_service

    scheduler = app.state.scheduler
    effective_settings = get_effective_settings(app)
    session = app.state.http_session
    blocklist_service.schedule_blocklist_job(
        scheduler,
        effective_settings,
        session,
        config,
        run_import_with_resources,
    )