Refactor blocklist schedule management into service
This commit is contained in:
@@ -28,12 +28,12 @@ import aiosqlite
|
||||
from fastapi import APIRouter, Depends, HTTPException, Query, status
|
||||
|
||||
from app.dependencies import (
|
||||
AppDep,
|
||||
AuthDep,
|
||||
Fail2BanSocketDep,
|
||||
GeoBatchLookupDep,
|
||||
HttpSessionDep,
|
||||
SchedulerDep,
|
||||
SettingsDep,
|
||||
get_db,
|
||||
)
|
||||
from app.models.blocklist import (
|
||||
@@ -48,7 +48,6 @@ from app.models.blocklist import (
|
||||
ScheduleInfo,
|
||||
)
|
||||
from app.services import blocklist_service, geo_service, jail_service
|
||||
from app.tasks import blocklist_import as blocklist_import_task
|
||||
|
||||
router: APIRouter = APIRouter(prefix="/api/blocklists", tags=["Blocklists"])
|
||||
|
||||
@@ -162,7 +161,6 @@ async def get_schedule(
|
||||
The ``next_run_at`` field is read from APScheduler if the job is active.
|
||||
|
||||
Args:
|
||||
request: Incoming request (used to query the scheduler).
|
||||
db: Application database connection (injected).
|
||||
_auth: Validated session — enforces authentication.
|
||||
|
||||
@@ -170,12 +168,7 @@ async def get_schedule(
|
||||
:class:`~app.models.blocklist.ScheduleInfo` with config and run
|
||||
times.
|
||||
"""
|
||||
job = scheduler.get_job(blocklist_import_task.JOB_ID)
|
||||
next_run_at: str | None = None
|
||||
if job is not None and job.next_run_time is not None:
|
||||
next_run_at = job.next_run_time.isoformat()
|
||||
|
||||
return await blocklist_service.get_schedule_info(db, next_run_at)
|
||||
return await blocklist_service.get_schedule_info_with_runtime(db, scheduler)
|
||||
|
||||
|
||||
@router.put(
|
||||
@@ -188,29 +181,29 @@ async def update_schedule(
|
||||
db: DbDep,
|
||||
_auth: AuthDep,
|
||||
scheduler: SchedulerDep,
|
||||
app: AppDep,
|
||||
http_session: HttpSessionDep,
|
||||
settings: SettingsDep,
|
||||
) -> ScheduleInfo:
|
||||
"""Persist a new schedule configuration and reschedule the import job.
|
||||
|
||||
Args:
|
||||
payload: New :class:`~app.models.blocklist.ScheduleConfig`.
|
||||
app: FastAPI application instance used to reschedule the import job.
|
||||
db: Application database connection (injected).
|
||||
_auth: Validated session — enforces authentication.
|
||||
scheduler: Shared APScheduler instance (injected).
|
||||
http_session: Shared HTTP session used by the scheduler job.
|
||||
settings: Current application settings used by the scheduler job.
|
||||
|
||||
Returns:
|
||||
Updated :class:`~app.models.blocklist.ScheduleInfo`.
|
||||
"""
|
||||
await blocklist_service.set_schedule(db, payload)
|
||||
# Reschedule the background job immediately.
|
||||
blocklist_import_task.reschedule(app)
|
||||
|
||||
job = scheduler.get_job(blocklist_import_task.JOB_ID)
|
||||
next_run_at: str | None = None
|
||||
if job is not None and job.next_run_time is not None:
|
||||
next_run_at = job.next_run_time.isoformat()
|
||||
|
||||
return await blocklist_service.get_schedule_info(db, next_run_at)
|
||||
return await blocklist_service.update_schedule(
|
||||
db,
|
||||
scheduler,
|
||||
http_session,
|
||||
settings,
|
||||
payload,
|
||||
)
|
||||
|
||||
|
||||
@router.get(
|
||||
|
||||
@@ -30,6 +30,7 @@ from app.models.blocklist import (
|
||||
ImportSourceResult,
|
||||
PreviewResponse,
|
||||
ScheduleConfig,
|
||||
ScheduleFrequency,
|
||||
ScheduleInfo,
|
||||
)
|
||||
from app.repositories import blocklist_repo, import_log_repo, settings_repo
|
||||
@@ -40,7 +41,9 @@ if TYPE_CHECKING:
|
||||
|
||||
import aiohttp
|
||||
import aiosqlite
|
||||
from apscheduler.schedulers.asyncio import AsyncIOScheduler
|
||||
|
||||
from app.config import Settings
|
||||
from app.models.geo import GeoBatchLookup
|
||||
|
||||
log: structlog.stdlib.BoundLogger = structlog.get_logger()
|
||||
@@ -505,6 +508,66 @@ async def import_all(
|
||||
|
||||
_DEFAULT_SCHEDULE = ScheduleConfig()
|
||||
|
||||
#: Stable APScheduler job id for the blocklist import job.
|
||||
JOB_ID: str = "blocklist_import"
|
||||
|
||||
|
||||
def _get_job_next_run_at(scheduler: AsyncIOScheduler) -> str | None:
|
||||
"""Return the next scheduled run time as an ISO 8601 string."""
|
||||
job = scheduler.get_job(JOB_ID)
|
||||
if job is None or job.next_run_time is None:
|
||||
return None
|
||||
return job.next_run_time.isoformat()
|
||||
|
||||
|
||||
def schedule_blocklist_job(
|
||||
scheduler: AsyncIOScheduler,
|
||||
settings: Settings,
|
||||
http_session: aiohttp.ClientSession,
|
||||
config: ScheduleConfig,
|
||||
) -> None:
|
||||
"""Register or replace the scheduled blocklist import job."""
|
||||
from app.tasks import blocklist_import as blocklist_import_task
|
||||
|
||||
if scheduler.get_job(JOB_ID):
|
||||
scheduler.remove_job(JOB_ID)
|
||||
|
||||
kwargs: dict[str, object] = {
|
||||
"settings": settings,
|
||||
"http_session": http_session,
|
||||
}
|
||||
|
||||
if config.frequency == ScheduleFrequency.hourly:
|
||||
trigger_type = "interval"
|
||||
trigger_kwargs = {"hours": config.interval_hours}
|
||||
elif config.frequency == ScheduleFrequency.weekly:
|
||||
trigger_type = "cron"
|
||||
trigger_kwargs = {
|
||||
"day_of_week": config.day_of_week,
|
||||
"hour": config.hour,
|
||||
"minute": config.minute,
|
||||
}
|
||||
else:
|
||||
trigger_type = "cron"
|
||||
trigger_kwargs = {
|
||||
"hour": config.hour,
|
||||
"minute": config.minute,
|
||||
}
|
||||
|
||||
scheduler.add_job(
|
||||
blocklist_import_task._run_import_with_resources,
|
||||
trigger=trigger_type,
|
||||
id=JOB_ID,
|
||||
kwargs=kwargs,
|
||||
**trigger_kwargs,
|
||||
)
|
||||
log.info(
|
||||
"blocklist_import_scheduled",
|
||||
frequency=config.frequency,
|
||||
trigger=trigger_type,
|
||||
trigger_kwargs=trigger_kwargs,
|
||||
)
|
||||
|
||||
|
||||
async def get_schedule(db: aiosqlite.Connection) -> ScheduleConfig:
|
||||
"""Read the import schedule config from the settings table.
|
||||
@@ -576,6 +639,28 @@ async def get_schedule_info(
|
||||
)
|
||||
|
||||
|
||||
async def get_schedule_info_with_runtime(
|
||||
db: aiosqlite.Connection,
|
||||
scheduler: AsyncIOScheduler,
|
||||
) -> ScheduleInfo:
|
||||
"""Return schedule info enriched with runtime scheduler metadata."""
|
||||
next_run_at = _get_job_next_run_at(scheduler)
|
||||
return await get_schedule_info(db, next_run_at)
|
||||
|
||||
|
||||
async def update_schedule(
|
||||
db: aiosqlite.Connection,
|
||||
scheduler: AsyncIOScheduler,
|
||||
http_session: aiohttp.ClientSession,
|
||||
settings: Settings,
|
||||
config: ScheduleConfig,
|
||||
) -> ScheduleInfo:
|
||||
"""Persist a new schedule config and re-register the scheduled job."""
|
||||
await set_schedule(db, config)
|
||||
schedule_blocklist_job(scheduler, settings, http_session, config)
|
||||
return await get_schedule_info(db, _get_job_next_run_at(scheduler))
|
||||
|
||||
|
||||
async def list_import_logs(
|
||||
db: aiosqlite.Connection,
|
||||
*,
|
||||
|
||||
@@ -18,7 +18,6 @@ from typing import TYPE_CHECKING, Any
|
||||
import structlog
|
||||
|
||||
from app.db import open_db
|
||||
from app.models.blocklist import ScheduleFrequency
|
||||
from app.services import blocklist_service, jail_service
|
||||
from app.utils.runtime_state import get_effective_settings
|
||||
|
||||
@@ -130,46 +129,11 @@ def _apply_schedule(app: FastAPI, config: Any) -> None:
|
||||
app: FastAPI application instance.
|
||||
config: :class:`~app.models.blocklist.ScheduleConfig` to apply.
|
||||
"""
|
||||
scheduler = app.state.scheduler
|
||||
from app.services import blocklist_service
|
||||
|
||||
kwargs: dict[str, Any] = {
|
||||
"settings": get_effective_settings(app),
|
||||
"http_session": app.state.http_session,
|
||||
}
|
||||
trigger_type: str
|
||||
trigger_kwargs: dict[str, Any]
|
||||
|
||||
if config.frequency == ScheduleFrequency.hourly:
|
||||
trigger_type = "interval"
|
||||
trigger_kwargs = {"hours": config.interval_hours}
|
||||
elif config.frequency == ScheduleFrequency.weekly:
|
||||
trigger_type = "cron"
|
||||
trigger_kwargs = {
|
||||
"day_of_week": config.day_of_week,
|
||||
"hour": config.hour,
|
||||
"minute": config.minute,
|
||||
}
|
||||
else: # daily (default)
|
||||
trigger_type = "cron"
|
||||
trigger_kwargs = {
|
||||
"hour": config.hour,
|
||||
"minute": config.minute,
|
||||
}
|
||||
|
||||
# Remove existing job if it exists, then add new one.
|
||||
if scheduler.get_job(JOB_ID):
|
||||
scheduler.remove_job(JOB_ID)
|
||||
|
||||
scheduler.add_job(
|
||||
_run_import_with_resources,
|
||||
trigger=trigger_type,
|
||||
id=JOB_ID,
|
||||
kwargs=kwargs,
|
||||
**trigger_kwargs,
|
||||
)
|
||||
log.info(
|
||||
"blocklist_import_scheduled",
|
||||
frequency=config.frequency,
|
||||
trigger=trigger_type,
|
||||
trigger_kwargs=trigger_kwargs,
|
||||
blocklist_service.schedule_blocklist_job(
|
||||
app.state.scheduler,
|
||||
get_effective_settings(app),
|
||||
app.state.http_session,
|
||||
config,
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user