"""Import log repository. Persists and queries blocklist import run records in the ``import_log`` table. All methods are plain async functions that accept a :class:`aiosqlite.Connection`. """ from __future__ import annotations import math from typing import TYPE_CHECKING, Any if TYPE_CHECKING: import aiosqlite async def add_log( db: aiosqlite.Connection, *, source_id: int | None, source_url: str, ips_imported: int, ips_skipped: int, errors: str | None, ) -> int: """Insert a new import log entry and return its id. Args: db: Active aiosqlite connection. source_id: FK to ``blocklist_sources.id``, or ``None`` if the source has been deleted since the import ran. source_url: URL that was downloaded. ips_imported: Number of IPs successfully applied as bans. ips_skipped: Number of lines that were skipped (invalid or CIDR). errors: Error message string, or ``None`` if the import succeeded. Returns: Primary key of the inserted row. """ cursor = await db.execute( """ INSERT INTO import_log (source_id, source_url, ips_imported, ips_skipped, errors) VALUES (?, ?, ?, ?, ?) """, (source_id, source_url, ips_imported, ips_skipped, errors), ) await db.commit() return int(cursor.lastrowid) # type: ignore[arg-type] async def list_logs( db: aiosqlite.Connection, *, source_id: int | None = None, page: int = 1, page_size: int = 50, ) -> tuple[list[dict[str, Any]], int]: """Return a paginated list of import log entries. Args: db: Active aiosqlite connection. source_id: If given, filter to logs for this source only. page: 1-based page index. page_size: Number of items per page. Returns: A 2-tuple ``(items, total)`` where *items* is a list of dicts and *total* is the count of all matching rows (ignoring pagination). """ where = "" params_count: list[Any] = [] params_rows: list[Any] = [] if source_id is not None: where = " WHERE source_id = ?" params_count.append(source_id) params_rows.append(source_id) # Total count async with db.execute( f"SELECT COUNT(*) FROM import_log{where}", # noqa: S608 params_count, ) as cursor: count_row = await cursor.fetchone() total: int = int(count_row[0]) if count_row else 0 offset = (page - 1) * page_size params_rows.extend([page_size, offset]) async with db.execute( f""" SELECT id, source_id, source_url, timestamp, ips_imported, ips_skipped, errors FROM import_log{where} ORDER BY id DESC LIMIT ? OFFSET ? """, # noqa: S608 params_rows, ) as cursor: rows = await cursor.fetchall() items = [_row_to_dict(r) for r in rows] return items, total async def get_last_log(db: aiosqlite.Connection) -> dict[str, Any] | None: """Return the most recent import log entry across all sources. Args: db: Active aiosqlite connection. Returns: The latest log entry as a dict, or ``None`` if no logs exist. """ async with db.execute( """ SELECT id, source_id, source_url, timestamp, ips_imported, ips_skipped, errors FROM import_log ORDER BY id DESC LIMIT 1 """ ) as cursor: row = await cursor.fetchone() return _row_to_dict(row) if row is not None else None def compute_total_pages(total: int, page_size: int) -> int: """Return the total number of pages for a given total and page size. Args: total: Total number of items. page_size: Items per page. Returns: Number of pages (minimum 1). """ if total == 0: return 1 return math.ceil(total / page_size) # --------------------------------------------------------------------------- # Internal helpers # --------------------------------------------------------------------------- def _row_to_dict(row: Any) -> dict[str, Any]: """Convert an aiosqlite row to a plain Python dict. Args: row: An :class:`aiosqlite.Row` or sequence returned by a cursor. Returns: Dict mapping column names to Python values. """ return dict(row)