"""Import run repository for blocklist import idempotency tracking. Persists and queries import run records in the ``import_runs`` table. Enables detection of duplicate import attempts and prevents re-running bans on scheduler retry after a crash. All methods are plain async functions that accept an :class:`aiosqlite.Connection`. """ from __future__ import annotations from typing import TYPE_CHECKING if TYPE_CHECKING: import aiosqlite from app.models.blocklist import ImportRunEntry async def get_by_source_and_hash( db: aiosqlite.Connection, source_id: int, content_hash: str, ) -> ImportRunEntry | None: """Check if a specific import (by source and content hash) already exists. Args: db: Active aiosqlite connection. source_id: FK to ``blocklist_sources.id``. content_hash: SHA256 hash of the downloaded blocklist content. Returns: ImportRunEntry if found, None otherwise. """ async with db.execute( """ SELECT id, source_id, content_hash, status, imported_count, skipped_count, error_message, created_at, updated_at FROM import_runs WHERE source_id = ? AND content_hash = ? """, (source_id, content_hash), ) as cursor: row = await cursor.fetchone() if not row: return None return ImportRunEntry( id=row[0], source_id=row[1], content_hash=row[2], status=row[3], imported_count=row[4], skipped_count=row[5], error_message=row[6], created_at=row[7], updated_at=row[8], ) async def create_pending( db: aiosqlite.Connection, source_id: int, content_hash: str, ) -> int: """Create a pending import run entry. Wraps the insert in an explicit transaction to ensure atomicity and enable proper error handling if a UNIQUE(source_id, content_hash) constraint violation occurs due to concurrent requests. Args: db: Active aiosqlite connection. source_id: FK to ``blocklist_sources.id``. content_hash: SHA256 hash of the downloaded blocklist content. Returns: Primary key of the inserted row. Raises: aiosqlite.IntegrityError: If a row with this (source_id, content_hash) already exists (constraint violation). The caller should catch this and retry the lookup to get the existing run's ID. """ try: await db.execute("BEGIN IMMEDIATE") cursor = await db.execute( """ INSERT INTO import_runs (source_id, content_hash, status) VALUES (?, ?, 'pending') """, (source_id, content_hash), ) await db.commit() return int(cursor.lastrowid) # type: ignore[arg-type] except Exception: await db.rollback() raise async def mark_completed( db: aiosqlite.Connection, run_id: int, imported_count: int, skipped_count: int, ) -> None: """Mark an import run as completed with final counts. Wraps the update in an explicit transaction to ensure atomicity. Args: db: Active aiosqlite connection. run_id: Primary key of the import run. imported_count: Number of IPs successfully banned. skipped_count: Number of entries skipped (invalid or CIDR). """ try: await db.execute("BEGIN IMMEDIATE") await db.execute( """ UPDATE import_runs SET status = 'completed', imported_count = ?, skipped_count = ?, updated_at = strftime('%Y-%m-%dT%H:%M:%fZ', 'now') WHERE id = ? """, (imported_count, skipped_count, run_id), ) await db.commit() except Exception: await db.rollback() raise async def mark_failed( db: aiosqlite.Connection, run_id: int, error_message: str, ) -> None: """Mark an import run as failed with error details. Wraps the update in an explicit transaction to ensure atomicity. Args: db: Active aiosqlite connection. run_id: Primary key of the import run. error_message: Error description. """ try: await db.execute("BEGIN IMMEDIATE") await db.execute( """ UPDATE import_runs SET status = 'failed', error_message = ?, updated_at = strftime('%Y-%m-%dT%H:%M:%fZ', 'now') WHERE id = ? """, (error_message, run_id), ) await db.commit() except Exception: await db.rollback() raise