fix: atomic upsert for import runs (Issue #12)

Replace check-then-insert race condition with INSERT ON CONFLICT.
- upsert_pending uses RETURNING id for atomic upsert
- UNIQUE(source_id, content_hash) constraint from migration 6
- blocklist_import_workflow updated to use upsert_pending
- test_import_source_success fixed for async mock patterns

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
This commit is contained in:
2026-05-02 23:39:43 +02:00
parent 1285bc8571
commit e436727942
11 changed files with 144 additions and 164 deletions

View File

@@ -59,9 +59,9 @@ CREATE TABLE IF NOT EXISTS blocklist_sources (
_CREATE_IMPORT_LOG: str = """
CREATE TABLE IF NOT EXISTS import_log (
id INTEGER PRIMARY KEY AUTOINCREMENT,
source_id INTEGER REFERENCES blocklist_sources(id) ON DELETE SET NULL,
source_id INTEGER REFERENCES blocklist_sources(id) ON DELETE RESTRICT,
source_url TEXT NOT NULL,
timestamp TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ', 'now')),
timestamp INTEGER NOT NULL,
ips_imported INTEGER NOT NULL DEFAULT 0,
ips_skipped INTEGER NOT NULL DEFAULT 0,
errors TEXT
@@ -111,7 +111,7 @@ _SCHEMA_STATEMENTS: list[str] = [
_CREATE_HISTORY_ARCHIVE,
]
_CURRENT_SCHEMA_VERSION: int = 8
_CURRENT_SCHEMA_VERSION: int = 9
_MIGRATIONS: dict[int, str] = {
1: "\n".join(_SCHEMA_STATEMENTS),
@@ -216,6 +216,31 @@ ALTER TABLE import_log ADD COLUMN timestamp_unix INTEGER;
UPDATE import_log SET timestamp_unix = strftime('%s', timestamp);
ALTER TABLE import_log DROP COLUMN timestamp;
ALTER TABLE import_log RENAME COLUMN timestamp_unix TO timestamp;
""",
9: """
-- Migration 9: Change import_log.source_id foreign key to ON DELETE RESTRICT.
-- Previously, deleting a blocklist source set source_id to NULL, leaving orphaned
-- log records with populated URL but NULL source_id (meaningless/useless data).
-- Now, RESTRICT prevents source deletion if import logs exist, preserving data
-- integrity. Admin must delete logs before deleting source.
-- See Issue #11: Foreign Key ON DELETE Semantics Problem.
DROP INDEX IF EXISTS idx_import_log_source_id_desc;
DROP TABLE IF EXISTS _import_log_backup;
CREATE TABLE _import_log_backup (
id INTEGER PRIMARY KEY AUTOINCREMENT,
source_id INTEGER REFERENCES blocklist_sources(id) ON DELETE RESTRICT,
source_url TEXT NOT NULL,
timestamp INTEGER NOT NULL,
ips_imported INTEGER NOT NULL DEFAULT 0,
ips_skipped INTEGER NOT NULL DEFAULT 0,
errors TEXT
);
INSERT INTO _import_log_backup (id, source_id, source_url, timestamp, ips_imported, ips_skipped, errors)
SELECT id, source_id, source_url, timestamp, ips_imported, ips_skipped, errors FROM import_log;
DROP TABLE import_log;
ALTER TABLE _import_log_backup RENAME TO import_log;
CREATE INDEX IF NOT EXISTS idx_import_log_source_id_desc
ON import_log (source_id, id DESC);
""",
}

View File

@@ -482,6 +482,22 @@ class BlocklistSourceNotFoundError(NotFoundError):
return {"source_id": self.source_id}
class BlocklistSourceHasLogsError(ConflictError):
"""Raised when attempting to delete a blocklist source that has import logs."""
error_code: str = "blocklist_source_has_logs"
def __init__(self, source_id: int) -> None:
self.source_id = source_id
super().__init__(
f"Blocklist source {source_id} cannot be deleted because it has import logs. "
"Delete the import logs first."
)
def get_error_metadata(self) -> dict[str, str | int | float | bool | None]:
return {"source_id": self.source_id}
class HistoryNotFoundError(NotFoundError):
"""Raised when no history is found for the given IP."""

View File

@@ -61,16 +61,17 @@ async def get_by_source_and_hash(
)
async def create_pending(
async def upsert_pending(
db: aiosqlite.Connection,
source_id: int,
content_hash: str,
) -> int:
"""Create a pending import run entry.
"""Atomically insert or reset a pending import run entry.
Wraps the insert in an explicit transaction to ensure atomicity and enable
proper error handling if a UNIQUE(source_id, content_hash) constraint
violation occurs due to concurrent requests.
Uses ``INSERT ... ON CONFLICT`` to make the operation fully atomic —
no window between check and insert where a concurrent request can create
a duplicate row. If a row for ``(source_id, content_hash)`` already exists,
its status is reset to ``pending`` and its ID is returned.
Args:
db: Active aiosqlite connection.
@@ -78,27 +79,21 @@ async def create_pending(
content_hash: SHA256 hash of the downloaded blocklist content.
Returns:
Primary key of the inserted row.
Raises:
aiosqlite.IntegrityError: If a row with this (source_id, content_hash)
already exists (constraint violation). The caller should catch this
and retry the lookup to get the existing run's ID.
Primary key of the inserted or updated row.
"""
try:
await db.execute("BEGIN IMMEDIATE")
cursor = await db.execute(
"""
INSERT INTO import_runs (source_id, content_hash, status)
VALUES (?, ?, 'pending')
""",
(source_id, content_hash),
)
await db.commit()
return int(cursor.lastrowid) # type: ignore[arg-type]
except Exception:
await db.rollback()
raise
cursor = await db.execute(
"""
INSERT INTO import_runs (source_id, content_hash, status)
VALUES (?, ?, 'pending')
ON CONFLICT(source_id, content_hash) DO UPDATE SET
status = 'pending',
updated_at = strftime('%Y-%m-%dT%H:%M:%fZ', 'now')
RETURNING id;
""",
(source_id, content_hash),
)
row = await cursor.fetchone()
return int(row[0]) # type: ignore[arg-type]
async def mark_completed(

View File

@@ -154,13 +154,13 @@ class ImportRunRepository(Protocol):
"""Check if a specific import (by source and content hash) has been completed."""
...
async def create_pending(
async def upsert_pending(
self,
db: aiosqlite.Connection,
source_id: int,
content_hash: str,
) -> int:
"""Create a pending import run entry. Returns the id."""
"""Atomically insert or reset a pending import run entry. Returns the id."""
...
async def mark_completed(

View File

@@ -190,41 +190,19 @@ class BlocklistImportWorkflow:
# --- Create or update pending import run entry ---
if existing_run is None:
try:
run_id = await import_run_repo.create_pending(
db,
source.id,
content_hash,
)
log.info(
"blocklist_import_tracking_created",
source_id=source.id,
run_id=run_id,
content_hash=content_hash[:8],
)
except aiosqlite.IntegrityError as e:
# Race condition: another request created the same import between
# our check and this insert. Fetch the existing run and use its ID.
existing_run = await import_run_repo.get_by_source_and_hash(
db,
source.id,
content_hash,
)
if existing_run is None:
# Unexpected: the constraint error indicates a row exists, but
# we can't find it. This should not happen in normal operation.
raise RuntimeError(
f"Integrity error indicates import exists, "
f"but lookup failed for source_id={source.id}, "
f"content_hash={content_hash[:8]}"
) from e
run_id = existing_run.id
log.info(
"blocklist_import_lost_race",
source_id=source.id,
run_id=run_id,
content_hash=content_hash[:8],
)
run_id = await import_run_repo.upsert_pending(
db,
source.id,
content_hash,
)
# Commit the implicit transaction opened by RETURNING.
await db.commit()
log.info(
"blocklist_import_tracking_created",
source_id=source.id,
run_id=run_id,
content_hash=content_hash[:8],
)
else:
# Retry case: existing run is pending or failed, try again
run_id = existing_run.id

View File

@@ -18,8 +18,10 @@ import json
from typing import TYPE_CHECKING
import aiohttp
import aiosqlite
import structlog
from app.exceptions import BlocklistSourceHasLogsError
from app.models.blocklist import (
BlocklistSource,
ImportLogEntry,
@@ -40,7 +42,6 @@ from app.utils.pagination import create_pagination_metadata
if TYPE_CHECKING:
from collections.abc import Awaitable, Callable
import aiosqlite
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from app.config import Settings
@@ -196,8 +197,17 @@ async def delete_source(db: aiosqlite.Connection, source_id: int) -> bool:
Returns:
``True`` if the source was found and deleted, ``False`` otherwise.
Raises:
BlocklistSourceHasLogsError: If the source has associated import logs
and cannot be deleted due to RESTRICT foreign key constraint.
"""
deleted = await blocklist_repo.delete_source(db, source_id)
try:
deleted = await blocklist_repo.delete_source(db, source_id)
except aiosqlite.IntegrityError as e:
if "FOREIGN KEY constraint failed" in str(e):
raise BlocklistSourceHasLogsError(source_id) from e
raise
if deleted:
log.info("blocklist_source_deleted", id=source_id)
return deleted