BanGUI/backend/app/tasks/scheduler_lock_heartbeat.py
Lukas 52f237d5d4 Make background tasks idempotent - prevent duplicate bans on retry
CRITICAL FIX: Background tasks (especially blocklist_import) crashed mid-execution,
leaving partial state. On retry, the same bans were applied again, causing duplicates.

Solution: Content-hash based operation tracking for blocklist imports:
- Added import_runs table (migration 6) to track operations by source + content hash
- Before banning, check if this exact content has already been imported
- If completed: skip banning (already done), optionally re-warm cache
- If new or failed: proceed with ban and mark as completed or failed
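
The check described above can be sketched as follows. This is a minimal illustration, not the code from this commit: the `import_runs` columns, the status strings, and the helpers `create_schema` and `import_once` are assumptions, since the real migration 6 schema and `import_run_repo` API are not shown here. It uses the standard-library `sqlite3` module only to stay self-contained.

```python
import hashlib
import sqlite3
from typing import Callable


def create_schema(conn: sqlite3.Connection) -> None:
    # Hypothetical schema: one row per (source, content hash) pair.
    conn.execute(
        """
        CREATE TABLE IF NOT EXISTS import_runs (
            source TEXT NOT NULL,
            content_hash TEXT NOT NULL,
            status TEXT NOT NULL,
            PRIMARY KEY (source, content_hash)
        )
        """
    )


def _set_status(conn: sqlite3.Connection, source: str, digest: str, status: str) -> None:
    # The primary key makes this an upsert for the same source and content.
    conn.execute(
        "INSERT OR REPLACE INTO import_runs (source, content_hash, status) "
        "VALUES (?, ?, ?)",
        (source, digest, status),
    )
    conn.commit()


def import_once(
    conn: sqlite3.Connection,
    source: str,
    payload: bytes,
    apply_bans: Callable[[bytes], None],
) -> bool:
    """Apply bans unless this exact content was already imported.

    Returns True if bans were applied, False if the run was skipped.
    """
    digest = hashlib.sha256(payload).hexdigest()
    row = conn.execute(
        "SELECT status FROM import_runs WHERE source = ? AND content_hash = ?",
        (source, digest),
    ).fetchone()
    if row is not None and row[0] == "completed":
        return False  # already done; a retry must not ban again
    try:
        apply_bans(payload)
    except Exception:
        _set_status(conn, source, digest, "failed")  # eligible for retry
        raise
    _set_status(conn, source, digest, "completed")
    return True
```

In this sketch a run that failed partway is retried in full, so `apply_bans` itself still has to tolerate bans that were already applied before the crash.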

Changes:
- Database: Migration 6 adds import_runs table with operation state tracking
- Model: Added ImportRunEntry for import run records
- Repository: New import_run_repo module with CRUD operations
- Workflow: Updated blocklist_import_workflow to check operation history before banning
- Dependencies: Registered import_run_repo for dependency injection
- Tests: Added test_import_source_idempotent_on_retry and test_import_source_different_content_not_reused
- Documentation: Added Task Idempotency section to Backend-Development.md

Verification:
- All 7 import tests pass (5 existing + 2 new idempotency tests)
- Type checking: mypy --strict passes
- Linting: ruff passes
- No API changes, backwards compatible via automatic migration

Fixes: Background tasks not idempotent #CRITICAL

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
2026-04-30 21:54:14 +02:00


"""Scheduler lock heartbeat background task.

Registers an APScheduler job that periodically updates the scheduler lock's
heartbeat timestamp. This prevents the lock from being considered stale
if the running instance experiences temporary delays or high load.

Without this heartbeat, stale lock detection (based on TTL) could incorrectly
determine that the scheduler instance has crashed when it's merely busy, and
a new instance could take over.
"""

from __future__ import annotations

from typing import TYPE_CHECKING

import structlog

from app.tasks.db import task_db
from app.tasks.timeout_utils import run_with_timeout
from app.utils.runtime_state import get_effective_settings
from app.utils.scheduler_lock import update_scheduler_lock_heartbeat

if TYPE_CHECKING:
    from fastapi import FastAPI

    from app.config import Settings

log: structlog.stdlib.BoundLogger = structlog.get_logger()

#: How often the heartbeat job fires (seconds). Must be less than the lock TTL.
SCHEDULER_LOCK_HEARTBEAT_INTERVAL: int = 10

#: Stable APScheduler job ID; ensures re-registration replaces, not duplicates.
JOB_ID: str = "scheduler_lock_heartbeat"

#: Maximum seconds to allow for scheduler lock heartbeat to complete.
TASK_TIMEOUT_SECONDS: int = 5


async def _update_heartbeat_with_resources(settings: Settings) -> None:
    """Update the scheduler lock heartbeat timestamp.

    If the heartbeat update fails (e.g., we no longer hold the lock), log
    a warning but don't crash the scheduler. This allows the running
    application to continue even if something went wrong.

    Args:
        settings: The resolved application settings used for database access.
    """

    async def _do_update() -> None:
        async with task_db(settings) as db:
            success = await update_scheduler_lock_heartbeat(db)
            if success:
                log.debug("scheduler_lock_heartbeat_updated")
            else:
                log.warning(
                    "scheduler_lock_heartbeat_failed",
                    message="Failed to update heartbeat; we may have lost the lock.",
                )

    await run_with_timeout("scheduler_lock_heartbeat", _do_update(), TASK_TIMEOUT_SECONDS)


async def _update_heartbeat(app: FastAPI) -> None:
    """Resolve the effective settings from ``app`` and update the heartbeat."""
    await _update_heartbeat_with_resources(get_effective_settings(app))


def register(app: FastAPI) -> None:
    """Add (or replace) the scheduler lock heartbeat job.

    Must be called after the scheduler has been started (i.e., inside the
    lifespan handler, after ``scheduler.start()``).

    Args:
        app: The :class:`fastapi.FastAPI` application instance whose
            ``app.state.scheduler`` will receive the job.
    """
    settings = get_effective_settings(app)
    app.state.scheduler.add_job(
        _update_heartbeat_with_resources,
        trigger="interval",
        seconds=SCHEDULER_LOCK_HEARTBEAT_INTERVAL,
        kwargs={"settings": settings},
        id=JOB_ID,
        replace_existing=True,
    )
    log.info(
        "scheduler_lock_heartbeat_scheduled",
        interval_seconds=SCHEDULER_LOCK_HEARTBEAT_INTERVAL,
    )
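
The module docstring explains why the heartbeat interval must stay below the lock TTL. The relationship can be sketched as below. This is an illustration under stated assumptions, not the implementation in `app.utils.scheduler_lock`, which is not shown here: the `SchedulerLock` dataclass, `is_stale`, `refresh_heartbeat`, and the 30-second TTL used in the usage note are all hypothetical.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class SchedulerLock:
    """Hypothetical in-memory stand-in for the persisted scheduler lock."""

    holder_id: str
    heartbeat_at: float  # timestamp of the last heartbeat, in seconds


def is_stale(lock: SchedulerLock, now: float, ttl_seconds: float) -> bool:
    # A lock is treated as stale once no heartbeat arrived within the TTL.
    return now - lock.heartbeat_at > ttl_seconds


def refresh_heartbeat(lock: SchedulerLock, holder_id: str, now: float) -> bool:
    # Only the current holder may refresh; False mirrors the
    # "we may have lost the lock" case logged by the heartbeat task.
    if lock.holder_id != holder_id:
        return False
    lock.heartbeat_at = now
    return True
```

With an assumed TTL of 30 seconds and the 10-second interval defined above, roughly two consecutive heartbeats can be missed before `is_stale` would report the lock as abandoned.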