Files
BanGUI/backend/app/tasks/blocklist_import.py

144 lines
4.4 KiB
Python

"""External blocklist import background task.
Registers an APScheduler job that downloads all enabled blocklist sources,
validates their entries, and applies bans via fail2ban on a configurable
schedule. The default schedule is daily at 03:00 UTC; it is stored in the
application :class:`~app.models.blocklist.ScheduleConfig` settings and can
be updated at runtime through the blocklist router.
The scheduler job ID is ``"blocklist_import"`` — using a stable id means
re-registering the job (e.g. after a schedule update) safely replaces the
existing entry without creating duplicates.
"""
from __future__ import annotations
from typing import TYPE_CHECKING, Any
import structlog
from app.db import open_db
from app.services import blocklist_service, jail_service
from app.utils.runtime_state import get_effective_settings
if TYPE_CHECKING:
import aiosqlite
from aiohttp import ClientSession
from fastapi import FastAPI
from app.config import Settings
#: Module-level structlog logger used by the import task functions in this file.
log: structlog.stdlib.BoundLogger = structlog.get_logger()
#: Stable APScheduler job id so the job can be replaced without duplicates.
JOB_ID: str = "blocklist_import"
async def _get_db(settings: Settings) -> tuple[aiosqlite.Connection, bool]:
    """Open a database connection for the configured database path.

    Args:
        settings: Application settings; only ``database_path`` is read.

    Returns:
        A ``(connection, close_when_done)`` pair. The flag is always ``True``
        here, meaning the caller is responsible for closing the connection.
    """
    return await open_db(settings.database_path), True
async def _run_import_with_resources(settings: Settings, http_session: ClientSession) -> None:
    """Import every enabled blocklist source using the given resources.

    Opens its own database connection, delegates the actual work to
    ``blocklist_service.import_all`` and logs a summary of the outcome.
    Any exception raised by the import (or by the summary logging) is logged
    and swallowed; the database connection is closed in every case.

    Args:
        settings: Resolved application settings; supplies the database path
            and the fail2ban socket path.
        http_session: aiohttp session passed through to the import service
            for downloading blocklists.
    """
    connection, must_close = await _get_db(settings)
    fail2ban_socket: str = settings.fail2ban_socket
    log.info("blocklist_import_starting")
    try:
        outcome = await blocklist_service.import_all(
            connection,
            http_session,
            fail2ban_socket,
            ban_ip=jail_service.ban_ip,
        )
        # Collect the summary fields first; keyword order matches the log call
        # this replaces, and the attribute reads stay inside the ``try``.
        summary = {
            "total_imported": outcome.total_imported,
            "total_skipped": outcome.total_skipped,
            "errors": outcome.errors_count,
        }
        log.info("blocklist_import_finished", **summary)
    except Exception:
        # Boundary of the scheduled job: record the failure instead of raising.
        log.exception("blocklist_import_unexpected_error")
    finally:
        if must_close:
            await connection.close()
#: Public alias of :func:`_run_import_with_resources`; this is the callable
#: that ``_apply_schedule`` hands to ``blocklist_service.schedule_blocklist_job``.
run_import_with_resources = _run_import_with_resources
async def _run_import(app: FastAPI) -> None:
    """Run the blocklist import with resources taken from *app*.

    Args:
        app: FastAPI application; its effective settings and
            ``app.state.http_session`` are forwarded to
            :func:`_run_import_with_resources`.
    """
    effective_settings = get_effective_settings(app)
    shared_session = app.state.http_session
    await _run_import_with_resources(effective_settings, shared_session)
async def register(app: FastAPI) -> None:
    """Install (or replace) the blocklist import job on the app scheduler.

    Loads the persisted :class:`~app.models.blocklist.ScheduleConfig` through
    ``blocklist_service.get_schedule`` on a short-lived database connection,
    then passes it to :func:`_apply_schedule`. Intended to be called from the
    lifespan handler once the scheduler and database are initialised.

    Args:
        app: The :class:`fastapi.FastAPI` application instance whose
            ``app.state.scheduler`` will receive the job.
    """
    connection, must_close = await _get_db(get_effective_settings(app))
    try:
        schedule = await blocklist_service.get_schedule(connection)
    finally:
        # Release the connection before touching the scheduler.
        if must_close:
            await connection.close()
    _apply_schedule(app, schedule)
#: Strong references to in-flight reschedule tasks. The event loop itself only
#: keeps weak references to tasks, so an otherwise unreferenced task could be
#: garbage-collected before it completes.
_pending_reschedules: set[Any] = set()


def reschedule(app: FastAPI) -> None:
    """Re-register the blocklist import job with the latest schedule config.

    Called by the blocklist router after a schedule update so changes take
    effect immediately without a server restart. The work is scheduled on the
    event loop as a background task and this function returns right away; the
    task simply runs :func:`register`, which reloads the persisted schedule
    and re-applies it.

    A reference to the task is held until it finishes, and a failure inside
    the task is logged as ``blocklist_reschedule_failed`` instead of only
    surfacing as an unretrieved task exception.

    Args:
        app: The :class:`fastapi.FastAPI` application instance.
    """
    import asyncio  # noqa: PLC0415

    def _on_done(finished: asyncio.Future[None]) -> None:
        # Drop the strong reference now that the task is complete.
        _pending_reschedules.discard(finished)
        if finished.cancelled():
            return
        error = finished.exception()
        if error is not None:
            log.error("blocklist_reschedule_failed", exc_info=error)

    task = asyncio.ensure_future(register(app))
    _pending_reschedules.add(task)
    task.add_done_callback(_on_done)
def _apply_schedule(app: FastAPI, config: Any) -> None:
    """Add or replace the scheduler job described by *config*.

    Thin adapter around ``blocklist_service.schedule_blocklist_job``: it
    gathers the scheduler, effective settings and HTTP session from *app* and
    passes them on together with the import callable.

    Args:
        app: FastAPI application instance.
        config: :class:`~app.models.blocklist.ScheduleConfig` to apply.
    """
    # NOTE(review): ``blocklist_service`` is also imported at module level, so
    # this function-scope import looks redundant — confirm before removing.
    from app.services import blocklist_service

    schedule_job = blocklist_service.schedule_blocklist_job
    scheduler = app.state.scheduler
    effective_settings = get_effective_settings(app)
    shared_session = app.state.http_session
    schedule_job(
        scheduler,
        effective_settings,
        shared_session,
        config,
        run_import_with_resources,
    )