diff --git a/Docs/Backend-Development.md b/Docs/Backend-Development.md index 22b8fdf..13e217e 100644 --- a/Docs/Backend-Development.md +++ b/Docs/Backend-Development.md @@ -1196,6 +1196,99 @@ async def test_migration_2_is_atomic(tmp_path: Path) -> None: --- +## 6.3 Database Transactions + +Database transactions ensure atomicity for multi-step operations and prevent race conditions when concurrent requests interact with the database. BanGUI uses **SQLite with WAL (Write-Ahead Logging)** mode, which enables concurrent readers but serializes writers. + +### When to Use Explicit Transactions + +**Use `BEGIN IMMEDIATE ... COMMIT` for:** + +1. **Multi-step logical operations** — Operations that should succeed or fail as a unit. Example: + ```python + # Bad — two separate operations, race condition window exists + await db.execute("INSERT INTO sessions ...") + await db.commit() + + # Good — atomic single operation, no need for explicit transaction + ``` + +2. **Operations that combine multiple queries with shared state** — When the operation outcome depends on reading and then writing based on that read: + ```python + # Bad — race condition: another request checks between our read and write + existing_run = await import_run_repo.get_by_source_and_hash(db, source_id, content_hash) + if existing_run is None: + run_id = await import_run_repo.create_pending(db, source_id, content_hash) + + # Good — atomic: both operations within same transaction boundary + try: + await db.execute("BEGIN IMMEDIATE") + cursor = await db.execute("INSERT INTO import_runs ...") + await db.commit() + except aiosqlite.IntegrityError: + # Another request won the race; fetch the existing record + existing = await import_run_repo.get_by_source_and_hash(...) + ... + ``` + +3. **Bulk operations that should be all-or-nothing** — For example, upserting positive and negative geo cache entries: + ```python + try: + await db.execute("BEGIN IMMEDIATE") + await bulk_upsert_entries(db, positive_rows) + await bulk_upsert_neg_entries(db, negative_ips) + await db.commit() + except Exception: + await db.rollback() + raise + ``` + +**Do NOT use explicit transactions for:** + +- Single SQL statements — SQLite guarantees atomic writes for individual statements. No explicit transaction needed. +- Read-only queries — Queries do not modify data and do not need transaction boundaries. + +### Transaction Pattern + +Always use this pattern for wrapped operations: + +```python +try: + await db.execute("BEGIN IMMEDIATE") + # ... perform all operations ... + await db.commit() +except Exception: + await db.rollback() + raise +``` + +- **`BEGIN IMMEDIATE`** — Acquires a write lock immediately, preventing other writers from entering the transaction window. This is critical for crash-safety and consistency. +- **`COMMIT`** — Persists all changes. +- **`ROLLBACK`** — Rolls back on any exception, ensuring the database is left in a consistent state. + +### Handling Race Condition Errors + +When a `UNIQUE` constraint violation occurs due to a race condition (two concurrent requests attempt the same insert), the database raises `aiosqlite.IntegrityError`. **Handle this at the call site** by retrying the lookup: + +```python +try: + run_id = await import_run_repo.create_pending(db, source_id, content_hash) +except aiosqlite.IntegrityError: + # Another concurrent request created it first + existing = await import_run_repo.get_by_source_and_hash(db, source_id, content_hash) + if existing is None: + raise RuntimeError("Constraint error indicates row exists but lookup failed") + run_id = existing.id + log.info("lost_race", run_id=run_id) +``` + +This approach: +1. Lets the database constraint prevent data corruption. +2. Gracefully handles the concurrent case in application logic. +3. Avoids unnecessary locking overhead for the common case (no concurrent writers). + +--- + ## 7. Structured Logging Policy All logging in BanGUI services and tasks must use **structlog** for consistent, queryable event tracking. This policy defines when and how to log at each level. diff --git a/Docs/Tasks.md b/Docs/Tasks.md index 53c7e06..81766c3 100644 --- a/Docs/Tasks.md +++ b/Docs/Tasks.md @@ -1,48 +1,3 @@ -## [CRITICAL] Health check endpoint returns wrong status code - -**Where found** - -- `backend/app/routers/health.py` — always returns 200, even when fail2ban offline - -**Why this is needed** - -Docker health checks interpret 200 as "healthy". If fail2ban offline but backend returns 200, Docker thinks container healthy and doesn't restart it. - -**Goal** - -Return 503 Service Unavailable when fail2ban is offline. - -**What to do** - -1. Change health endpoint to return 503 when offline: - ```python - if not server_status.online: - return JSONResponse( - status_code=503, - content={"status": "unavailable", "fail2ban": "offline"} - ) - ``` - -2. Update Docker health check to expect 503 as "unhealthy" - -**Possible traps and issues** - -- Returning 503 causes orchestration tools to restart container -- If fail2ban restarts frequently, health check becomes flaky -- Consider gradual degradation - -**Docs changes needed** - -- Update `Docker/Dockerfile.backend` health check documentation -- Update `Docs/Deployment.md` § Health Checks - -**Doc references** - -- `backend/app/routers/health.py` -- `Docker/Dockerfile.backend` - ---- - ## [IMPORTANT] Database transactions lack explicit isolation **Where found** diff --git a/backend/app/repositories/geo_cache_repo.py b/backend/app/repositories/geo_cache_repo.py index c379991..68c9d92 100644 --- a/backend/app/repositories/geo_cache_repo.py +++ b/backend/app/repositories/geo_cache_repo.py @@ -18,7 +18,6 @@ if TYPE_CHECKING: from app.models.geo import GeoCacheEntry - # Alias for backward compatibility with protocols GeoCacheRow = GeoCacheEntry @@ -109,9 +108,17 @@ async def upsert_entry_and_commit( asn: str | None, org: str | None, ) -> None: - """Insert or update a resolved geo cache entry and commit.""" - await upsert_entry(db, ip, country_code, country_name, asn, org) - await db.commit() + """Insert or update a resolved geo cache entry and commit. + + Wraps the upsert in an explicit transaction to ensure atomicity. + """ + try: + await db.execute("BEGIN IMMEDIATE") + await upsert_entry(db, ip, country_code, country_name, asn, org) + await db.commit() + except Exception: + await db.rollback() + raise async def upsert_neg_entry(db: aiosqlite.Connection, ip: str) -> None: @@ -127,9 +134,17 @@ async def upsert_neg_entry(db: aiosqlite.Connection, ip: str) -> None: async def upsert_neg_entry_and_commit(db: aiosqlite.Connection, ip: str) -> None: - """Record a failed lookup attempt and commit the transaction.""" - await upsert_neg_entry(db, ip) - await db.commit() + """Record a failed lookup attempt and commit the transaction. + + Wraps the upsert in an explicit transaction to ensure atomicity. + """ + try: + await db.execute("BEGIN IMMEDIATE") + await upsert_neg_entry(db, ip) + await db.commit() + except Exception: + await db.rollback() + raise async def bulk_upsert_entries( @@ -173,17 +188,33 @@ async def bulk_upsert_entries_and_commit( db: aiosqlite.Connection, rows: Sequence[tuple[str, str | None, str | None, str | None, str | None]], ) -> int: - """Bulk insert or update multiple geo cache entries and commit.""" - count = await bulk_upsert_entries(db, rows) - await db.commit() - return count + """Bulk insert or update multiple geo cache entries and commit. + + Wraps the bulk upsert in an explicit transaction to ensure atomicity. + """ + try: + await db.execute("BEGIN IMMEDIATE") + count = await bulk_upsert_entries(db, rows) + await db.commit() + return count + except Exception: + await db.rollback() + raise async def bulk_upsert_neg_entries_and_commit(db: aiosqlite.Connection, ips: list[str]) -> int: - """Bulk insert negative lookup entries and commit.""" - count = await bulk_upsert_neg_entries(db, ips) - await db.commit() - return count + """Bulk insert negative lookup entries and commit. + + Wraps the bulk upsert in an explicit transaction to ensure atomicity. + """ + try: + await db.execute("BEGIN IMMEDIATE") + count = await bulk_upsert_neg_entries(db, ips) + await db.commit() + return count + except Exception: + await db.rollback() + raise async def bulk_upsert_entries_and_neg_entries_and_commit( @@ -191,17 +222,34 @@ async def bulk_upsert_entries_and_neg_entries_and_commit( rows: Sequence[tuple[str, str | None, str | None, str | None, str | None]], ips: list[str], ) -> tuple[int, int]: - """Persist positive and negative geo cache rows together, then commit.""" + """Persist positive and negative geo cache rows together, then commit. + + Wraps both upserts in a single transaction to ensure atomicity. + Either all rows are persisted or none are. + + Args: + db: Active aiosqlite connection. + rows: Sequence of (ip, country_code, country_name, asn, org) tuples. + ips: List of IP strings for negative entries (failed lookups). + + Returns: + A tuple (positive_count, negative_count) of rows persisted. + """ positive_count = 0 negative_count = 0 - if rows: - positive_count = await bulk_upsert_entries(db, rows) - if ips: - negative_count = await bulk_upsert_neg_entries(db, ips) + try: + await db.execute("BEGIN IMMEDIATE") + if rows: + positive_count = await bulk_upsert_entries(db, rows) + if ips: + negative_count = await bulk_upsert_neg_entries(db, ips) - if rows or ips: - await db.commit() + if rows or ips: + await db.commit() + except Exception: + await db.rollback() + raise return positive_count, negative_count diff --git a/backend/app/repositories/import_run_repo.py b/backend/app/repositories/import_run_repo.py index 24cf1cb..6e0a4df 100644 --- a/backend/app/repositories/import_run_repo.py +++ b/backend/app/repositories/import_run_repo.py @@ -68,6 +68,10 @@ async def create_pending( ) -> int: """Create a pending import run entry. + Wraps the insert in an explicit transaction to ensure atomicity and enable + proper error handling if a UNIQUE(source_id, content_hash) constraint + violation occurs due to concurrent requests. + Args: db: Active aiosqlite connection. source_id: FK to ``blocklist_sources.id``. @@ -75,16 +79,26 @@ async def create_pending( Returns: Primary key of the inserted row. + + Raises: + aiosqlite.IntegrityError: If a row with this (source_id, content_hash) + already exists (constraint violation). The caller should catch this + and retry the lookup to get the existing run's ID. """ - cursor = await db.execute( - """ - INSERT INTO import_runs (source_id, content_hash, status) - VALUES (?, ?, 'pending') - """, - (source_id, content_hash), - ) - await db.commit() - return int(cursor.lastrowid) # type: ignore[arg-type] + try: + await db.execute("BEGIN IMMEDIATE") + cursor = await db.execute( + """ + INSERT INTO import_runs (source_id, content_hash, status) + VALUES (?, ?, 'pending') + """, + (source_id, content_hash), + ) + await db.commit() + return int(cursor.lastrowid) # type: ignore[arg-type] + except Exception: + await db.rollback() + raise async def mark_completed( @@ -95,24 +109,31 @@ async def mark_completed( ) -> None: """Mark an import run as completed with final counts. + Wraps the update in an explicit transaction to ensure atomicity. + Args: db: Active aiosqlite connection. run_id: Primary key of the import run. imported_count: Number of IPs successfully banned. skipped_count: Number of entries skipped (invalid or CIDR). """ - await db.execute( - """ - UPDATE import_runs - SET status = 'completed', - imported_count = ?, - skipped_count = ?, - updated_at = strftime('%Y-%m-%dT%H:%M:%fZ', 'now') - WHERE id = ? - """, - (imported_count, skipped_count, run_id), - ) - await db.commit() + try: + await db.execute("BEGIN IMMEDIATE") + await db.execute( + """ + UPDATE import_runs + SET status = 'completed', + imported_count = ?, + skipped_count = ?, + updated_at = strftime('%Y-%m-%dT%H:%M:%fZ', 'now') + WHERE id = ? + """, + (imported_count, skipped_count, run_id), + ) + await db.commit() + except Exception: + await db.rollback() + raise async def mark_failed( @@ -122,19 +143,26 @@ async def mark_failed( ) -> None: """Mark an import run as failed with error details. + Wraps the update in an explicit transaction to ensure atomicity. + Args: db: Active aiosqlite connection. run_id: Primary key of the import run. error_message: Error description. """ - await db.execute( - """ - UPDATE import_runs - SET status = 'failed', - error_message = ?, - updated_at = strftime('%Y-%m-%dT%H:%M:%fZ', 'now') - WHERE id = ? - """, - (error_message, run_id), - ) - await db.commit() + try: + await db.execute("BEGIN IMMEDIATE") + await db.execute( + """ + UPDATE import_runs + SET status = 'failed', + error_message = ?, + updated_at = strftime('%Y-%m-%dT%H:%M:%fZ', 'now') + WHERE id = ? + """, + (error_message, run_id), + ) + await db.commit() + except Exception: + await db.rollback() + raise diff --git a/backend/app/services/blocklist_import_workflow.py b/backend/app/services/blocklist_import_workflow.py index 30cf0ab..00709d2 100644 --- a/backend/app/services/blocklist_import_workflow.py +++ b/backend/app/services/blocklist_import_workflow.py @@ -15,6 +15,7 @@ import hashlib from typing import TYPE_CHECKING import aiohttp +import aiosqlite import structlog from app.models.blocklist import BlocklistSource, ImportSourceResult @@ -26,8 +27,6 @@ from app.services.blocklist_parser import BlocklistParser if TYPE_CHECKING: from collections.abc import Awaitable, Callable - import aiosqlite - from app.services.geo_cache import GeoCache log: structlog.stdlib.BoundLogger = structlog.get_logger() @@ -191,17 +190,41 @@ class BlocklistImportWorkflow: # --- Create or update pending import run entry --- if existing_run is None: - run_id = await import_run_repo.create_pending( - db, - source.id, - content_hash, - ) - log.info( - "blocklist_import_tracking_created", - source_id=source.id, - run_id=run_id, - content_hash=content_hash[:8], - ) + try: + run_id = await import_run_repo.create_pending( + db, + source.id, + content_hash, + ) + log.info( + "blocklist_import_tracking_created", + source_id=source.id, + run_id=run_id, + content_hash=content_hash[:8], + ) + except aiosqlite.IntegrityError as e: + # Race condition: another request created the same import between + # our check and this insert. Fetch the existing run and use its ID. + existing_run = await import_run_repo.get_by_source_and_hash( + db, + source.id, + content_hash, + ) + if existing_run is None: + # Unexpected: the constraint error indicates a row exists, but + # we can't find it. This should not happen in normal operation. + raise RuntimeError( + f"Integrity error indicates import exists, " + f"but lookup failed for source_id={source.id}, " + f"content_hash={content_hash[:8]}" + ) from e + run_id = existing_run.id + log.info( + "blocklist_import_lost_race", + source_id=source.id, + run_id=run_id, + content_hash=content_hash[:8], + ) else: # Retry case: existing run is pending or failed, try again run_id = existing_run.id