- Add TYPE_CHECKING guards for runtime-expensive imports (aiohttp, aiosqlite) - Reorganize imports to follow PEP 8 conventions - Convert TypeAlias to modern PEP 695 type syntax (where appropriate) - Use Sequence/Mapping from collections.abc for type hints (covariant) - Replace string literals with cast() for improved type inference - Fix casting of Fail2BanResponse and TypedDict patterns - Add IpLookupResult TypedDict for precise return type annotation - Reformat overlong lines for readability (120 char limit) - Add asyncio_mode and filterwarnings to pytest config - Update test fixtures with improved type hints This improves mypy type checking and makes type relationships explicit.
171 lines
4.6 KiB
Python
171 lines
4.6 KiB
Python
"""Import log repository.
|
|
|
|
Persists and queries blocklist import run records in the ``import_log``
|
|
table. All methods are plain async functions that accept a
|
|
:class:`aiosqlite.Connection`.
|
|
"""
|
|
|
|
from __future__ import annotations

import math
from typing import TYPE_CHECKING, TypedDict, cast

if TYPE_CHECKING:
    from collections.abc import Mapping

    import aiosqlite
|
|
|
|
|
|
class ImportLogRow(TypedDict):
    """Row shape returned by queries on the import_log table."""

    id: int  # primary key of the import_log row
    source_id: int | None  # FK to blocklist_sources.id; None if the source was deleted
    source_url: str  # URL that was downloaded for this import
    timestamp: str  # when the import ran (stored as text by SQLite)
    ips_imported: int  # number of IPs successfully applied as bans
    ips_skipped: int  # number of lines skipped (invalid or CIDR)
    errors: str | None  # error message, or None if the import succeeded
|
|
|
|
|
|
async def add_log(
    db: aiosqlite.Connection,
    *,
    source_id: int | None,
    source_url: str,
    ips_imported: int,
    ips_skipped: int,
    errors: str | None,
) -> int:
    """Insert a new import log entry and return its id.

    Args:
        db: Active aiosqlite connection.
        source_id: FK to ``blocklist_sources.id``, or ``None`` if the source
            has been deleted since the import ran.
        source_url: URL that was downloaded.
        ips_imported: Number of IPs successfully applied as bans.
        ips_skipped: Number of lines that were skipped (invalid or CIDR).
        errors: Error message string, or ``None`` if the import succeeded.

    Returns:
        Primary key of the inserted row.

    Raises:
        RuntimeError: If the INSERT produced no row id (should not happen
            for a successful INSERT on a rowid table).
    """
    cursor = await db.execute(
        """
        INSERT INTO import_log (source_id, source_url, ips_imported, ips_skipped, errors)
        VALUES (?, ?, ?, ?, ?)
        """,
        (source_id, source_url, ips_imported, ips_skipped, errors),
    )
    await db.commit()
    # ``lastrowid`` is typed ``int | None``; surface the (impossible for a
    # successful INSERT) None case explicitly instead of hiding it behind a
    # ``type: ignore`` on an ``int()`` conversion.
    row_id = cursor.lastrowid
    if row_id is None:
        msg = "INSERT into import_log produced no row id"
        raise RuntimeError(msg)
    return row_id
|
|
|
|
|
|
async def list_logs(
    db: aiosqlite.Connection,
    *,
    source_id: int | None = None,
    page: int = 1,
    page_size: int = 50,
) -> tuple[list[ImportLogRow], int]:
    """Return a paginated list of import log entries.

    Args:
        db: Active aiosqlite connection.
        source_id: If given, filter to logs for this source only.
        page: 1-based page index.
        page_size: Number of items per page.

    Returns:
        A 2-tuple ``(items, total)`` where *items* is a list of dicts and
        *total* is the count of all matching rows (ignoring pagination).
    """
    # Optional per-source filter; the same WHERE clause and bound argument
    # feed both the count query and the row query.
    filter_clause = "" if source_id is None else " WHERE source_id = ?"
    filter_args: list[object] = [] if source_id is None else [source_id]

    # Total matching rows, ignoring pagination.
    count_sql = f"SELECT COUNT(*) FROM import_log{filter_clause}"  # noqa: S608
    async with db.execute(count_sql, filter_args) as cursor:
        first = await cursor.fetchone()
    total: int = int(first[0]) if first else 0

    # Requested page of rows, newest first.
    rows_sql = f"""
        SELECT id, source_id, source_url, timestamp, ips_imported, ips_skipped, errors
        FROM import_log{filter_clause}
        ORDER BY id DESC
        LIMIT ? OFFSET ?
        """  # noqa: S608
    row_args = [*filter_args, page_size, (page - 1) * page_size]
    async with db.execute(rows_sql, row_args) as cursor:
        fetched = await cursor.fetchall()
        items = [_row_to_dict(record) for record in fetched]

    return items, total
|
|
|
|
|
|
async def get_last_log(db: aiosqlite.Connection) -> ImportLogRow | None:
    """Return the most recent import log entry across all sources.

    Args:
        db: Active aiosqlite connection.

    Returns:
        The latest log entry as a dict, or ``None`` if no logs exist.
    """
    query = """
        SELECT id, source_id, source_url, timestamp, ips_imported, ips_skipped, errors
        FROM import_log
        ORDER BY id DESC
        LIMIT 1
        """
    async with db.execute(query) as cursor:
        newest = await cursor.fetchone()
    if newest is None:
        return None
    return _row_to_dict(newest)
|
|
|
|
|
|
def compute_total_pages(total: int, page_size: int) -> int:
    """Return the total number of pages for a given total and page size.

    Uses exact integer ceiling division rather than ``math.ceil`` on a
    float quotient, which can round incorrectly once ``total`` exceeds the
    53-bit precision of a float.

    Args:
        total: Total number of items.
        page_size: Items per page.

    Returns:
        Number of pages (minimum 1).
    """
    if total == 0:
        return 1
    # Exact ceiling division: -(-a // b) == ceil(a / b) for integer a, b > 0.
    return -(-total // page_size)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Internal helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _row_to_dict(row: object) -> ImportLogRow:
|
|
"""Convert an aiosqlite row to a plain Python dict.
|
|
|
|
Args:
|
|
row: An :class:`aiosqlite.Row` or similar mapping returned by a cursor.
|
|
|
|
Returns:
|
|
Dict mapping column names to Python values.
|
|
"""
|
|
mapping = cast("Mapping[str, object]", row)
|
|
return cast("ImportLogRow", dict(mapping))
|