# Backend Development — Rules & Guidelines

Rules and conventions every backend developer must follow. Read this before writing your first line of code.

---

## 1. Language & Typing

- **Python 3.12+** is the minimum version.
- **Every** function, method, and variable must have explicit type annotations — no exceptions.
- Use `str`, `int`, `float`, `bool`, `None` for primitives.
- Use `list[T]`, `dict[K, V]`, `set[T]`, `tuple[T, ...]` (lowercase, built-in generics) — never `typing.List`, `typing.Dict`, etc.
- Use `T | None` instead of `Optional[T]`.
- Use `TypeAlias`, `TypeVar`, `Protocol`, and `NewType` when they improve clarity.
- Return types are **mandatory** — including `-> None`.
- Never use `Any` unless there is no other option and a comment explains why.
- Run `mypy --strict` (or `pyright` in strict mode) — the codebase must pass with zero errors.

```python
# Good
def get_jail_by_name(name: str) -> Jail | None:
    ...

# Bad — missing types
def get_jail_by_name(name):
    ...
```

---

## 2. Core Libraries

| Purpose | Library | Notes |
|---|---|---|
| Web framework | **FastAPI** | Async endpoints only. |
| Data validation & settings | **Pydantic v2** | All request/response bodies and config models. |
| Async HTTP client | **aiohttp** (`ClientSession`) | For external calls (blocklists, IP lookups). |
| Scheduling | **APScheduler 4.x** (async) | Blocklist imports, periodic health checks. |
| Structured logging | **structlog** | Every log call must use structlog — never `print()` or `logging` directly. |
| Database | **aiosqlite** | Async SQLite access for the application database. |
| Testing | **pytest** + **pytest-asyncio** + **httpx** (`AsyncClient`) | Every feature needs tests. |
| Mocking | **unittest.mock** / **pytest-mock** | Isolate external dependencies. |
| Date & time | **datetime** (stdlib) — always timezone-aware | Use `datetime.datetime.now(datetime.UTC)`. Never naive datetimes. |
| IP / Network | **ipaddress** (stdlib) | Validate and normalise IPs and CIDR ranges. |
| Environment / config | **pydantic-settings** | Load `.env` and environment variables into typed models. |
| fail2ban integration | **fail2ban client** (bundled) | Use the local copy at [`./fail2ban-master`](../fail2ban-master). Import from [`./fail2ban-master/fail2ban/client`](../fail2ban-master/fail2ban/client) to communicate with the fail2ban socket. Do **not** install fail2ban as a pip package. |

### fail2ban Client Usage

The repository ships with a vendored copy of fail2ban located at `./fail2ban-master`. All communication with the fail2ban daemon must go through the client classes found in `./fail2ban-master/fail2ban/client`. Add the `fail2ban-master` directory to `sys.path` (or configure it in `pyproject.toml` as a path dependency) so that `from fail2ban.client ...` resolves to the bundled copy.

```python
import sys
from pathlib import Path

# Ensure the bundled fail2ban is importable
sys.path.insert(0, str(Path(__file__).resolve().parents[2] / "fail2ban-master"))

from fail2ban.client.csocket import CSocket  # noqa: E402
```

### Libraries you must NOT use

- `requests` — use `aiohttp` (async).
- `flask` — we use FastAPI.
- `celery` — we use APScheduler.
- `print()` for logging — use `structlog`.
- `json.loads` / `json.dumps` on Pydantic models — use `.model_validate()` / `.model_dump()`.

### Timestamp Handling

Timestamp consistency is critical for accurate ban history queries across the dashboard and history endpoints. Follow these rules:

**Rule 1: Use consistent UTC timestamps**

- All timestamps in the database are stored as Unix epochs (seconds since 1970-01-01 UTC).
- fail2ban stores timestamps using `time.time()`, which is always UTC epoch seconds.
- When querying fail2ban's SQLite database by timestamp, use `app.utils.time_utils.since_unix()` (not manual datetime calculations).
**Rule 2: Time-range windows include a 60-second slack**

- The `since_unix()` function includes a 60-second slack window (`TIME_RANGE_SLACK_SECONDS` in `app.utils.constants`).
- This slack accommodates:
  - Clock drift between the local system and fail2ban.
  - Test seeding delays when timestamps are manually set to exact boundaries.
- The slack ensures that dashboard and history queries return consistent row counts for the same time range.

**Rule 3: Never duplicate timestamp calculation logic**

- All services that query by time range must import and use `since_unix()`.
- Do not recalculate timestamps locally using `datetime` or `time` modules in service code.
- If you need a timestamp for a time range, use `since_unix()`.

**Example:**

```python
from app.utils.time_utils import since_unix

# Get all bans from the last 24 hours (with 60-second slack)
since_ts: int = since_unix("24h")
async with db.execute(
    "SELECT * FROM bans WHERE timeofban >= ?", (since_ts,)
) as cursor:
    rows = await cursor.fetchall()
```

### Database Performance & Indexing

Large archive datasets can degrade query performance without proper indexing. The `history_archive` table supports multiple filter patterns:

**Query Patterns (Indexed for Performance):**

1. **MAX(timeofban)** — `history_sync_task` queries for the latest timestamp to know where to resume syncing from fail2ban. This is a covering index lookup.
2. **Jail filter with time ordering** — Dashboard and API endpoints filter by `jail` and sort by `timeofban DESC` for pagination. This is accelerated by `idx_history_archive_jail_timeofban`.
3. **Time-range filter** — Queries filter by `timeofban >= since` to fetch recent records. This uses the composite index `idx_history_archive_timeofban_jail_action`, which has `timeofban` as the leading column for efficient range scans.
4. **IP filter** — Users can search by exact IP or IP prefix (using `LIKE 'prefix%'`). The `idx_history_archive_ip` index accelerates these searches.
5. **Action filter** — Queries may filter by action (`'ban'` or `'unban'`). The `idx_history_archive_action` index supports this.
6. **Purge old entries** — Background tasks delete entries older than a threshold (`timeofban < cutoff`). This uses `idx_history_archive_timeofban_jail_action`.

**Current Indexes (defined in `backend/app/db.py` Migration 5):**

- `idx_history_archive_jail_timeofban(jail, timeofban DESC)` — Composite index for jail-filtered queries.
- `idx_history_archive_timeofban_jail_action(timeofban DESC, jail, action)` — Covering index for time-range queries and MAX lookups.
- `idx_history_archive_ip(ip)` — Single-column index for IP searches.
- `idx_history_archive_action(action)` — Single-column index for action filtering.

**Benchmark Results:**

Query benchmarks (see `backend/tests/test_repositories/test_history_archive_indexing.py`) verify that common operations complete within expected thresholds on datasets with 10,000+ records:

| Operation | Time Budget | Actual (with indexes) |
|-----------|-------------|----------------------|
| MAX(timeofban) | <0.01s | ✓ Uses covering index |
| Count with jail filter | <0.10s | ✓ Covering index scan |
| List with jail + order | <0.05s | ✓ Index fully utilized |
| Time-range filter | <0.05s | ✓ Range scan on timeofban |
| Combined filters | <0.05s | ✓ Composite indexes used |

**Adding New Indexes:**

If you add new query patterns to `history_archive_repo.py`:

1. **Analyze the WHERE and ORDER BY clauses** — Identify which columns are filtered and sorted.
2. **Check EXPLAIN QUERY PLAN** in a local test:

   ```python
   async with db.execute("EXPLAIN QUERY PLAN SELECT ...") as cur:
       rows = await cur.fetchall()
       for row in rows:
           print(row[3])  # Print the plan text
   ```

3. **If the plan shows a full table scan, add an index** that matches the filter columns in order.
4. **Create a migration** in `backend/app/db.py` following the pattern from Migration 5.
5. **Add a benchmark test** to verify the new index improves query performance.

**Index Tradeoffs:**

- **Pros**: Faster SELECT queries, reduced CPU during queries.
- **Cons**: Slower INSERT/UPDATE/DELETE (indexes must be maintained), larger database file size.

For `history_archive`, the read-heavy workload justifies these indexes because:

- Inserts are batched during sync (one batch per minute), not per-request.
- Deletes happen once per day during purge.
- SELECT queries run on every API request to the history endpoint.

---

## 7.5 Cursor-Based Pagination for Large Result Sets

**Problem:** Offset-based pagination (`LIMIT ? OFFSET ?`) scans and discards every row before the offset and only then returns the requested page. On a 10M-row table, fetching the last page takes 15+ seconds because SQLite must evaluate all previous rows.

**Solution:** Use keyset pagination (cursor-based) with `WHERE id > last_id` instead of OFFSET. This leverages indexes to jump directly to the next page in O(log N) time.

### Offset vs. Cursor Pagination

| Aspect | Offset (`LIMIT ? OFFSET ?`) | Cursor (`WHERE id > ?`) |
|--------|-----|-----|
| Performance | O(N) — scans N rows to fetch | O(log N) — index jump |
| Last page on 10M rows | 15+ seconds ⚠️ | <50ms ✓ |
| API Contract | `page`, `page_size` | `cursor`, `page_size` |
| Backward nav | Stateless (any page any time) | Stateless (cursor is opaque) |
| Count query | Required (slow on large tables) | Not required |

### When to Use Cursor Pagination

- ✓ **Use cursor pagination** for large tables (>100K rows) with frequent pagination queries
- ✓ **Use cursor pagination** for real-time feeds where rows are constantly added/modified
- ✓ **Use cursor pagination** if your API already exposes cursor tokens to clients
- ✗ **Keep offset pagination** (skip cursors) for small datasets or administrative interfaces where performance is not critical

### Implementation Pattern

**1. Add indexes on sort columns:**

Cursor queries use `WHERE id > :cursor ORDER BY id ASC LIMIT :page_size`. Ensure the sort column is indexed or part of a composite index.

**2. Use cursor pagination utilities:**

```python
from app.utils.pagination import encode_cursor, decode_cursor

# Fetch next page using cursor
last_row_id = decode_cursor(cursor) if cursor else None
items, has_more = await repo.get_items_keyset(
    page_size=50,
    last_row_id=last_row_id,
)

# Encode cursor for next page
next_cursor = encode_cursor(items[-1]["id"]) if items and has_more else None
```

**3. Return cursor in pagination metadata:**

The response includes `cursor` (for cursor pagination) in addition to `page`, `page_size`, and `has_next_page`:

```json
{
  "items": [...],
  "pagination": {
    "page": 1,
    "page_size": 50,
    "total": -1,
    "total_pages": -1,
    "has_next_page": true,
    "has_prev_page": false,
    "cursor": "eyJpZCI6IDQyN30="
  }
}
```

**4. Repositories supporting cursor pagination:**

- `import_log_repo.list_logs_keyset()` — Import log with cursor pagination
- `history_archive_repo.get_archived_history_keyset()` — Archived bans with cursor pagination

Both functions return `(items, has_more)` instead of `(items, total)` to avoid expensive COUNT queries.

### Cursor Format & Security

Cursors are **opaque base64-encoded JSON** objects. Clients must not decode or modify them:

```python
# Cursor structure (internal only — never expose raw JSON to client)
{"id": 12345}

# Base64-encoded cursor sent to client:
# eyJpZCI6IDEyMzQ1fQ==

# Decode with decode_cursor() which validates the format
last_id = decode_cursor(cursor)
```

Benefits:

- ✓ **Opaque to client** — Format can evolve without breaking API compatibility
- ✓ **Deterministic** — Same row ID always produces the same cursor
- ✓ **Validated** — Invalid or malformed cursors are rejected with clear errors. Base64 is an encoding, not a signature, so a well-formed cursor carrying a different ID is still accepted.
---

## 7.6 Never Load Unbounded Result Sets

**Problem:** Loading large result sets entirely into Python memory causes:

- Memory spikes that crash containers
- Slow dashboard performance
- Unbounded database file growth

**Rule:** Never load unbounded result sets. Always use SQL aggregation or pagination.

**Anti-patterns:**

```python
# BAD — loads all rows into memory
all_rows = await history_archive_repo.get_all_archived_history(db=db, ...)

# GOOD — SQL aggregation returns lightweight counts
ip_counts = await history_archive_repo.get_ip_ban_counts(db=db, ...)
```

**SQL aggregation patterns for common operations:**

| Operation | SQL Pattern | Repository Function |
|-----------|-------------|---------------------|
| Count by IP | `SELECT ip, COUNT(*) FROM bans GROUP BY ip` | `get_ip_ban_counts()` |
| Count by jail | `SELECT jail, COUNT(*) FROM bans GROUP BY jail` | `get_jail_ban_counts()` |
| Count by time bucket | `SELECT CAST((timeofban - ?) / ? AS INTEGER), COUNT(*) ... GROUP BY bucket_idx` | `get_ban_counts_by_bucket()` |
| Paginated rows | `WHERE id < ? ORDER BY id DESC LIMIT ?` | `get_archived_history_keyset()` |

**When to use SQL aggregation:**

- Computing totals, counts, or aggregations for display
- Building country/jail/geo maps from large datasets
- Any endpoint that needs only a summary, not full row data

**When to use pagination:**

- Endpoints that return individual records for display (ban lists, history)
- Any endpoint where clients need access to specific rows

**Memory budgets for reference:**

- 1M ban records ≈ 200-400 MB if fully materialized as Python dicts
- SQL aggregation returns lightweight results: {ip, count} pairs = a few KB for same 1M records
- Keyset pagination returns only the page size (typically 50-200 rows)

---

## 3. Project Structure

```
backend/
├── app/
│   ├── __init__.py
│   ├── main.py              # FastAPI app factory, lifespan
│   ├── config.py            # Pydantic settings
│   ├── dependencies.py      # FastAPI dependency providers
│   ├── models/              # Pydantic schemas (request, response, domain)
│   ├── routers/             # FastAPI routers grouped by feature
│   ├── services/            # Business logic — one service per domain
│   ├── repositories/        # Database access layer
│   ├── tasks/               # APScheduler jobs
│   └── utils/               # Helpers, constants, shared types
├── tests/
│   ├── conftest.py
│   ├── test_routers/
│   ├── test_services/
│   └── test_repositories/
├── pyproject.toml
└── .env.example
```

- **Routers** receive requests, validate input via Pydantic, and delegate to **services**.
- **Services** contain business logic and call **repositories** or external clients.
- **Repositories** handle raw database queries — nothing else.
- Never put business logic inside routers or repositories.

### Service Dependencies and Injection

Services should **never** import other services directly: doing so creates hidden coupling and makes testing harder. Instead:

1. **Define clear service interfaces** using Protocol classes in `app/services/protocols.py`.
2. **Make dependencies explicit** by passing them as function parameters with optional defaults.
3. **Use lazy imports** for fallback singletons (not at module level).
4. **Inject services via FastAPI dependencies** when called from routers.

**Example:** The `history_service` depends on `Fail2BanMetadataService` to resolve the fail2ban database path:

```python
# Good — dependency passed as parameter
async def list_history(
    socket_path: str,
    fail2ban_metadata_service: Fail2BanMetadataService | None = None,
) -> HistoryListResponse:
    if fail2ban_metadata_service is None:
        # Lazy import fallback for backward compatibility
        from app.services.fail2ban_metadata_service import default_fail2ban_metadata_service
        fail2ban_metadata_service = default_fail2ban_metadata_service
    ...
```

Routers inject the service dependency explicitly:

```python
from app.dependencies import Fail2BanMetadataServiceDep

@router.get("/api/history")
async def get_history(
    fail2ban_metadata_service: Fail2BanMetadataServiceDep,
) -> HistoryListResponse:
    return await history_service.list_history(
        socket_path,
        fail2ban_metadata_service=fail2ban_metadata_service,
    )
```

This pattern prevents circular imports, makes services testable, and allows easy mocking in tests.

### Mutable Runtime State

All mutable runtime state (state that changes during the application's lifetime) **must** be stored in `RuntimeState` defined in `app/utils/runtime_state.py`. This centralizes state management, prevents accidental global mutable variables, and makes state management testable and synchronization-safe.

**Allowed locations for mutable state:**

1. **RuntimeState fields** — Core application state (e.g., `server_status`, `last_activation`, `pending_recovery`, `runtime_settings`). Managed through dedicated functions (e.g., `record_activation()`, `clear_pending_recovery()`).
2. **Nested service state** — Service-specific mutable state (e.g., `JailServiceState` for jail capability detection cache) is nested within `RuntimeState` as a field. Services receive their state via dependency injection.
3. **Controlled via dependencies** — State is injected into services and routers using FastAPI `Depends()`. This ensures a single source of truth and testability.

**Example — jail_service state management:**

```python
import asyncio
from dataclasses import dataclass, field

# Define service-specific state (in app/utils/runtime_state.py)
@dataclass
class JailServiceState:
    backend_cmd_supported: bool | None = None
    backend_cmd_lock: asyncio.Lock | None = None

# Nested in RuntimeState
@dataclass
class RuntimeState:
    jail_service_state: JailServiceState = field(default_factory=JailServiceState)
    ...

# Injected into services via dependency
async def list_jails(socket_path: str, state: JailServiceState) -> JailListResponse:
    backend_cmd_is_supported = await _check_backend_cmd_supported(client, name, state)
    ...

# Routers inject state through FastAPI dependencies
@router.get("/api/jails")
async def get_jails(state: JailServiceStateDep) -> JailListResponse:
    return await jail_service.list_jails(socket_path, state)
```

**Why:** Centralizing mutable state prevents race conditions, makes concurrency boundaries explicit, simplifies testing (each test gets a fresh state object), and prepares for multi-worker deployments (shared state would need to be extracted to Redis, database, or shared memory).

---

## 4. FastAPI Conventions

- Use **async def** for every endpoint — no sync endpoints.
- Every endpoint must declare explicit **response models** (`response_model=...`).
- Use **Pydantic models** for request bodies and query parameters — never raw dicts.
- Use **Depends()** for dependency injection (database sessions, services, auth).
- Group endpoints into routers by feature domain (`routers/jails.py`, `routers/bans.py`, …).
- Use appropriate HTTP status codes: `201` for creation, `204` for deletion with no body, `404` for not found, etc.
- Protected endpoints should return `401 Unauthorized` or `403 Forbidden` when the session is invalid or expired; the frontend treats these responses as a session-expiry event and redirects the user to `/login`.
- Use **HTTPException** or custom exception handlers — never return error dicts manually.
- All successful responses must use a standardized Pydantic response model. List and collection endpoints should wrap data in `items`, `total`, and optional pagination metadata. Detail endpoints must expose a single domain object under a named field (for example `jail`, `status`, or `settings`). Command endpoints must use a `CommandResponse`-style wrapper with `message` and `success`.
- **GET endpoints are read-only — never call `db.commit()` or execute INSERT/UPDATE/DELETE inside a GET handler.** If a GET path produces side-effects (e.g., caching resolved data), that write belongs in a background task, a scheduled flush, or a separate POST endpoint. Users and HTTP caches assume GET is safe (non-mutating) and idempotent.

  ```python
  # Good — pass db=None on GET so geo_service never commits
  result = await geo_service.lookup_batch(ips, http_session, db=None)

  # Bad — triggers an INSERT + COMMIT on every GET request
  result = await geo_service.lookup_batch(ips, http_session, db=app_db)
  ```

### OpenAPI Schema Synchronization

**Critical:** The OpenAPI schema is the single source of truth for frontend types. When you add, modify, or remove endpoints or response models:

1. **FastAPI automatically updates the schema** based on your Pydantic models and endpoint definitions.
2. **The frontend regenerates types** from the schema on every build: `npm run generate:types`.
3. **Ensure your Pydantic models are accurate** — they are directly serialized into the schema and used to generate frontend types.
4. **Test type generation locally** before committing:

   ```bash
   cd frontend
   npm run generate:types  # Generates src/types/generated.ts
   npm run build           # Build should succeed if types match
   ```

5. **The backend must be running** for type generation to work (the tool fetches `/api/openapi.json`).
6. **Commit generated types** alongside backend changes — they must always be in sync.

**Never:**

- Manually edit `src/types/generated.ts` on the frontend — regenerate from the schema instead.
- Commit backend changes without ensuring the frontend can regenerate types.
- Assume the OpenAPI schema is correct — verify that your Pydantic models' `Field` descriptions and types are as intended.
```python
from fastapi import APIRouter, Depends, HTTPException, status

from app.models.jail import JailResponse, JailListResponse
from app.services.jail_service import JailService

router: APIRouter = APIRouter(prefix="/api/jails", tags=["Jails"])

@router.get("/", response_model=JailListResponse)
async def list_jails(service: JailService = Depends()) -> JailListResponse:
    jails: list[JailResponse] = await service.get_all_jails()
    return JailListResponse(jails=jails)
```

### Dependency Layering: Enforcing the Repository Boundary

The **repository boundary** separates database-aware code from application logic. This is enforced through dependency injection. For a complete overview of BanGUI's DI pattern, including the composition root, service wiring, and lifecycle management, see [Architekture.md § 2.3 Dependency Wiring and Service Composition](Architekture.md#23-dependency-wiring-and-service-composition).

| Layer | Responsibilities | Dependencies |
|---|---|---|
| **Routers** | Receive requests, validate input, return responses. | Service context dependencies (SessionServiceContextDep, BlocklistServiceContextDep), settings, auth. Never raw database connections. |
| **Services** | Contain business logic, orchestrate operations. | Other services, repositories. May receive `aiosqlite.Connection` for repository operations. |
| **Repositories** | Execute all SQL queries. All database knowledge lives here. | `aiosqlite.Connection` (from callers). |

**Rule: Routers must NOT depend on `DbDep` (raw database connections).**

Instead, routers should:

1. Depend on **service context dependencies** like `SessionServiceContextDep`, `BlocklistServiceContextDep`, etc.
2. These context dependencies combine the database connection and related repositories.
3. Pass the context to services, which internally orchestrate database operations.

**Service Context Dependencies Available:**

- `SessionServiceContextDep` — Contains `db` and `session_repo` for session operations.
- `BlocklistServiceContextDep` — Contains `db`, `blocklist_repo`, `import_log_repo`, `settings_repo`.
- `SettingsServiceContextDep` — Contains `db` and `settings_repo`.
- `BanServiceContextDep` — Contains `db` and `fail2ban_db_repo`.
- `HistoryServiceContextDep` — Contains `db`, `fail2ban_db_repo`, `history_archive_repo`.

**Why:**

- **Enforcement**: Not exporting `DbDep` from the dependencies module makes it impossible for routers to accidentally bypass repositories.
- **Clarity**: Service context dependencies explicitly declare which database operations a router needs.
- **Testability**: Services and routers are easier to test when they depend on repositories (which can be mocked) rather than raw connections.

**Example:**

```python
# ✅ GOOD — router depends on service context
@router.post("/login")
async def login(
    body: LoginRequest,
    response: Response,
    session_ctx: SessionServiceContextDep,  # Contains db + session_repo
    _auth: AuthDep,
) -> LoginResponse:
    return await auth_service.login(
        session_ctx.db,
        password=body.password,
        session_repo=session_ctx.session_repo,
        ...
    )

# ❌ BAD — router depends on raw db (DbDep is not exported for this reason)
@router.post("/login")
async def login(
    body: LoginRequest,
    db: DbDep,  # ← Cannot import DbDep in routers
    _auth: AuthDep,
) -> LoginResponse:
    return await auth_service.login(db, password=body.password, ...)
```

**DEPRECATED: DbDep**

- The `DbDep` type alias is provided for backward compatibility only.
- DO NOT use in new code. Use service context dependencies instead.
- See `backend/app/dependencies.py` for available service contexts.

---

## 4.1 API Response Envelope Policy

All API responses must follow a **consistent wrapper pattern**. This standardization reduces frontend branching logic, prevents integration bugs, and makes the API easier to document and maintain.
### Response Patterns

#### Pattern 1: Paginated Lists

Use `PaginatedListResponse[T]` for endpoints that return paginated collections:

```python
from app.models.response import PaginatedListResponse

class JailListResponse(PaginatedListResponse[JailSummary]):
    """Response for ``GET /api/jails``."""
    pass

# Returns:
{
    "items": [...],              # T[]
    "pagination": {
        "page": 2,               # int: current page (1-based)
        "page_size": 20,         # int: items per page
        "total": 100,            # int: total items across all pages
        "total_pages": 5,        # int: computed total number of pages
        "has_next_page": true,   # bool: whether more pages exist
        "has_prev_page": true    # bool: whether previous pages exist
    }
}
```

**When to use:** Endpoints that support pagination parameters (`page`, `page_size`, `limit`, `offset`).

#### Pattern 2: Non-Paginated Collections

Use `CollectionResponse[T]` for endpoints that return a complete collection without pagination:

```python
from app.models.response import CollectionResponse

class JailConfigListResponse(CollectionResponse[JailConfig]):
    """Response for ``GET /api/config/jails``."""
    pass

# Returns:
{
    "items": [...],  # T[]
    "total": 42      # int: total items
}
```

**When to use:** Endpoints that return a complete collection (not paginated). The frontend can render all items without worrying about paging.

#### Pattern 3: Single-Item Detail Responses

Use **domain-specific field names** (not generic wrappers) for detail endpoints:

```python
class JailDetailResponse(BaseModel):
    """Response for ``GET /api/jails/{name}``."""
    jail: Jail
    ignore_list: list[str]
    ignore_self: bool

# Returns:
{
    "jail": { ... },        # Jail object
    "ignore_list": [...],   # Additional context
    "ignore_self": true
}
```

**When to use:** Endpoints that fetch a single entity. Use the entity name as the field (jail, status, settings, etc.).

**Field naming:**

- Primary entity uses its own name: `jail`, `status`, `settings`, etc.
- Related or supplementary data uses descriptive names: `ignore_list`, `warnings`, `metadata`, etc.
#### Pattern 4: Command/Action Responses

Use `CommandResponse` for endpoints that execute commands:

```python
from app.models.response import CommandResponse

class JailCommandResponse(CommandResponse):
    """Generic response for jail control commands."""
    jail: str  # Target identifier (optional)

# Returns:
{
    "message": "Jail 'sshd' started.",
    "success": true,
    "jail": "sshd"  # Optional: target identifier
}
```

**When to use:** POST/PUT/DELETE endpoints that perform operations (start jail, ban IP, update config, etc.).

**Fields:**

- `message: str` — Human-readable result or error description.
- `success: bool` — Operation succeeded (default: true). Use false for non-exception error handlers.
- Optional domain-specific fields (jail, ip, etc.) to identify the affected resource.

#### Pattern 5: Aggregation Responses

Use domain-specific field names for aggregated data:

```python
class BansByJailResponse(BaseModel):
    """Response for ``GET /api/dashboard/bans/by-jail``."""
    jails: list[JailBanCount]  # Aggregated per-jail data
    total: int                 # Total count across all jails

class BansByCountryResponse(BaseModel):
    """Response for ``GET /api/dashboard/bans/by-country``."""
    countries: dict[str, int]      # Country code → count
    country_names: dict[str, str]  # Country code → name
    bans: list[DashboardBanItem]   # Full list for rendering companion table
    total: int                     # Total ban count

# Returns:
{
    "jails": [
        { "jail": "sshd", "count": 42 },
        ...
    ],
    "total": 500
}
```

**When to use:** Endpoints that return computed/aggregated data. Use field names that reflect the data (jails, countries, buckets, etc.).
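As a rough illustration of how an aggregation response could be assembled, the sketch below runs a `GROUP BY` query and maps the rows onto the `jails` / `total` shape from Pattern 5. The `bans` table layout, the `bans_by_jail` helper, the dataclasses standing in for the Pydantic models, and the synchronous `sqlite3` connection are all assumptions made to keep the example self-contained; production code would go through the repository layer with `aiosqlite`.

```python
import sqlite3
from dataclasses import dataclass


@dataclass(frozen=True)
class JailBanCount:
    """Stand-in for the Pydantic model of the same name."""

    jail: str
    count: int


@dataclass(frozen=True)
class BansByJailResponse:
    """Stand-in for the aggregation response model."""

    jails: list[JailBanCount]
    total: int


def bans_by_jail(conn: sqlite3.Connection, since_ts: int) -> BansByJailResponse:
    """Aggregate ban counts per jail in SQL instead of loading every row."""
    rows: list[tuple[str, int]] = conn.execute(
        "SELECT jail, COUNT(*) FROM bans WHERE timeofban >= ? "
        "GROUP BY jail ORDER BY COUNT(*) DESC, jail ASC",
        (since_ts,),
    ).fetchall()
    jails: list[JailBanCount] = [JailBanCount(jail=name, count=count) for name, count in rows]
    # The total is derived from the per-jail counts, so no second query is needed.
    return BansByJailResponse(jails=jails, total=sum(item.count for item in jails))
```

Only one row per jail crosses from SQLite into Python, which is what keeps this kind of endpoint cheap on large tables.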
### Summary Table

| Pattern | Used for | Field Names | Example |
|---------|----------|---|---|
| **PaginatedListResponse** | Paginated collections | `items`, `pagination` (page, page_size, total, total_pages, has_next_page, has_prev_page) | `GET /api/dashboard/bans` |
| **CollectionResponse** | Complete collections | `items`, `total` | `GET /api/config/jails` |
| **Detail Response** | Single entity + metadata | Entity name + descriptors | `GET /api/jails/{name}` |
| **CommandResponse** | Action results | `message`, `success` + optional identifiers | `POST /api/jails/{name}/start` |
| **Aggregation Response** | Computed data | Domain-specific names | `GET /api/dashboard/bans/by-jail` |

### Rules

1. **Always wrap lists in `items` field** — Consistency aids frontend parsing.
   - ✅ `{ "items": [...], "total": 100 }`
   - ❌ `{ "jails": [...], "total": 100 }` (for list endpoints; OK for aggregations)
2. **Aggregation responses are exceptions** — They use domain-specific field names because the data represents computed results, not a simple list.
   - ✅ `{ "countries": {...}, "jails": [...], "total": 100 }`
3. **Every response with >1 item must include `total`** — Enables frontend to understand scale.
4. **Paginated responses must include `page` and `page_size`** — Enables the frontend to render pagination controls.
5. **No ad-hoc wrapper objects** — Don't invent new response shapes. Use the patterns above.

### Standardized Pagination Query Parameters

All paginated endpoints follow a consistent query parameter contract:

| Parameter | Type | Constraints | Default | Notes |
|---|---|---|---|---|
| `page` | int | ≥ 1 | `1` | 1-based page number (not 0-based offset). |
| `page_size` | int | 1–500 | `100` | Items per page. Clients may request smaller pages for UI reasons. |

**Implementation:**

```python
from fastapi import Query

from app.models.response import PaginatedListResponse
from app.utils.constants import DEFAULT_PAGE_SIZE
from app.utils.pagination import create_pagination_metadata

@router.get("/items")
async def get_items(
    page: int = Query(default=1, ge=1, description="1-based page number."),
    page_size: int = Query(
        default=DEFAULT_PAGE_SIZE,
        ge=1,
        le=500,
        description="Items per page (max 500).",
    ),
) -> PaginatedListResponse[Item]:  # Item is a placeholder for the endpoint's item model
    # Compute offset for database query
    offset = (page - 1) * page_size
    items = await db.fetch("SELECT * FROM items LIMIT ? OFFSET ?", page_size, offset)
    total = await db.fetchval("SELECT COUNT(*) FROM items")

    # Create pagination metadata with computed fields
    pagination = create_pagination_metadata(total, page, page_size)

    return PaginatedListResponse(
        items=items,
        pagination=pagination,
    )
```

**Helper functions** are available in `app.utils.pagination`:

```python
from app.utils.pagination import get_offset, compute_total_pages, create_pagination_metadata

# Calculate database offset from page and page_size
offset = get_offset(page, page_size)  # Equivalent to (page - 1) * page_size

# Calculate total pages for rendering pagination UI (optional)
total_pages = compute_total_pages(total, page_size)

# Create complete pagination metadata with all computed fields
pagination = create_pagination_metadata(total, page, page_size)
# Returns PaginationMetadata with: page, page_size, total, total_pages, has_next_page, has_prev_page
```

**Rules:**

1. **Use 1-based pages** — Not 0-based offsets. Page 1 is always the first page.
2. **Always provide defaults** — Use `DEFAULT_PAGE_SIZE` (100) and initial page 1.
3. **Cap maximum page_size at 500** — Prevents accidental DoS from enormous requests.
4. **Use `create_pagination_metadata()`** — Factory function computes derived fields (total_pages, has_next_page, has_prev_page) consistently.
5. **Respond with `PaginatedListResponse[T]`** — Must include `items` and `pagination` metadata object.
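To make the derived fields concrete, here is a minimal sketch of what the three helpers might compute. It is an assumption-based illustration, not the contents of `app.utils.pagination`: a frozen dataclass stands in for the real `PaginationMetadata` model, the validation shown is a guess, and the `-1` sentinel used for `total` in cursor responses is not modelled.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class PaginationMetadata:
    """Stand-in for the project's pagination metadata model."""

    page: int
    page_size: int
    total: int
    total_pages: int
    has_next_page: bool
    has_prev_page: bool


def get_offset(page: int, page_size: int) -> int:
    """Translate a 1-based page number into a row offset."""
    return (page - 1) * page_size


def compute_total_pages(total: int, page_size: int) -> int:
    """Ceiling division without floats; zero items means zero pages."""
    return (total + page_size - 1) // page_size


def create_pagination_metadata(total: int, page: int, page_size: int) -> PaginationMetadata:
    """Build the metadata object with all derived fields filled in."""
    if page < 1 or page_size < 1:
        raise ValueError("page and page_size must both be >= 1")
    total_pages: int = compute_total_pages(total, page_size)
    return PaginationMetadata(
        page=page,
        page_size=page_size,
        total=total,
        total_pages=total_pages,
        has_next_page=page < total_pages,
        has_prev_page=page > 1,
    )
```

With `total=100`, `page=2`, `page_size=20` this yields five pages with both neighbours present, matching the Pattern 1 example response.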
---

## 4.2 Error Response Schema

All error responses use a consistent machine-readable format that enables frontend code to branch reliably on error conditions without string-parsing error detail text.

### Error Response Format

Every non-2xx HTTP response body is a JSON object with this structure:

```json
{
  "code": "jail_not_found",
  "detail": "Jail 'example' not found",
  "metadata": {
    "jail_name": "example"
  },
  "correlation_id": "550e8400-e29b-41d4-a716-446655440000"
}
```

**Fields:**

- **`code`** (string, required): Machine-readable error code for client-side branching. Examples: `jail_not_found`, `rate_limit_exceeded`, `authentication_required`, `invalid_input`.
- **`detail`** (string, required): Human-readable error message. Safe for displaying to users.
- **`metadata`** (object, optional): Structured context data relevant to the error. Only includes data safe for client consumption (no sensitive internal state). Examples: offending parameter names, resource identifiers, field error counts, time windows.
- **`correlation_id`** (string | null, optional): Unique request ID for tracing this error across logs and systems. Set by the `CorrelationIdMiddleware`. Use this to correlate client-side errors with server logs for debugging.
### Exception Hierarchy & Error Codes All domain exceptions inherit from `DomainError` (defined in `backend/app/exceptions.py`) and are organized by HTTP status category: | HTTP Status | Category Class | Error Codes | Use Case | |---|---|---|---| | **404** | `NotFoundError` | `not_found`, `jail_not_found`, `filter_not_found`, `action_not_found`, `config_file_not_found`, `blocklist_source_not_found`, `history_not_found` | Requested resource does not exist | | **400** | `BadRequestError` | `invalid_input`, `config_validation_failed`, `config_operation_failed`, `jail_name_invalid`, `filter_name_invalid`, `action_name_invalid`, `config_file_name_invalid`, `filter_invalid_regex` | Invalid input, validation failure, malformed request | | **409** | `ConflictError` | `conflict`, `jail_operation_failed`, `jail_already_active`, `jail_already_inactive`, `jail_not_in_config`, `action_already_exists`, `filter_already_exists`, `config_file_exists` | State conflict, resource already exists, invalid state transition | | **500** | `OperationError` | `operation_failed`, `config_write_failed`, `config_file_write_failed`, `server_operation_failed`, `fail2ban_protocol_error` | Operation failure, write errors, unexpected failures | | **503** | `ServiceUnavailableError` | `service_unavailable`, `config_dir_unavailable`, `fail2ban_unreachable` | Infrastructure/external service issues, temporary unavailability | | **401** | `AuthenticationError` | `authentication_required` | Authentication or authorization failure, invalid/expired credentials | | **429** | `RateLimitError` | `rate_limit_exceeded` | Rate limit exceeded, too many requests | **Note on request validation errors:** Pydantic validation errors (from request body type mismatches, missing required fields, etc.) are automatically caught by the `_request_validation_error_handler` and converted to `ErrorResponse` with `code="invalid_input"`. 
The `metadata` field includes `field_errors` (count of validation failures) and `first_field` (location of the first error field) to help clients debug malformed requests.

### Implementing Error Handlers

Every exception category has a corresponding exception handler registered in `backend/app/main.py`. When a domain exception is raised:

1. FastAPI's exception handling middleware catches it.
2. The registered handler converts it to an `ErrorResponse` with the matching HTTP status code.
3. The response is serialized as JSON with `code`, `detail`, and `metadata` fields.

**Pattern for service code:**

```python
from app.exceptions import JailNotFoundError, ConfigValidationError


async def get_jail(name: str) -> Jail:
    """Raises JailNotFoundError if jail not found."""
    jail = await db.fetchone("SELECT * FROM jails WHERE name = ?", (name,))
    if jail is None:
        raise JailNotFoundError(name)  # HTTP 404, code='jail_not_found'
    return jail


async def apply_config(config: JailConfig) -> None:
    """Raises ConfigValidationError if invalid."""
    if not config.filter_name:
        raise ConfigValidationError("filter_name is required")  # HTTP 400, code='config_validation_failed'
```

### Adding New Exception Types

1. **Choose the appropriate category** based on the HTTP status (`NotFoundError` for 404, `BadRequestError` for 400, etc.).
2. **Create a subclass** in `backend/app/exceptions.py`:

   ```python
   class MySpecificError(BadRequestError):
       """Raised when X happens."""

       error_code: str = "my_specific_error"

       def __init__(self, detail_msg: str, **context: str | int | float | bool | None) -> None:
           self.context: dict[str, str | int | float | bool | None] = context
           super().__init__(detail_msg)

       def get_error_metadata(self) -> dict[str, str | int | float | bool | None]:
           """Return only safe, relevant metadata."""
           return {k: v for k, v in self.context.items() if k in ("offending_value", "constraint")}
   ```

3. **Use explicit error codes**: don't derive them from the class name. This makes them self-documenting and prevents breakage on class renames.
4. **Implement `get_error_metadata()`**: return only data safe for client consumption. Never leak internal state, file paths, or system details.
5. **Raise from service code**: never from repositories or utils. Exceptions represent business logic violations, not infrastructure errors.

### Exception Handler Hierarchy

All domain exceptions are automatically converted to `ErrorResponse` via handlers registered in `backend/app/main.py`. The handler registration order is critical:

```text
# Handlers are registered from most specific to least specific:
1. Network errors (Fail2BanConnectionError, etc.) → HTTP 502
2. Auth/rate errors (AuthenticationError, RateLimitError) → HTTP 401/429
3. Category handlers (NotFoundError, BadRequestError, ConflictError, etc.) → HTTP 404/400/409/500/503
4. DomainError catch-all → HTTP 500  # ← Catches unregistered DomainError subclasses
5. HTTPException (FastAPI built-ins) → HTTP varies
6. ValueError (Pydantic validation) → HTTP 400
7. Exception catch-all → HTTP 500  # ← Absolute last resort
```

**Important:** The `DomainError` catch-all handler (step 4) is the safety net. If you add a new `DomainError` subclass without placing it in a category (e.g., `class MyError(DomainError)` instead of `class MyError(BadRequestError)`), it will still get the correct `error_code` and `metadata` via this handler instead of silently falling through to the generic exception handler.

**Critical caveat:** Every new `DomainError` subclass **must**:

- Define an `error_code` class attribute (e.g., `error_code: str = "my_error"`).
- Override `get_error_metadata()` if it needs to return context data.
- Inherit from the appropriate category (`NotFoundError`, `BadRequestError`, `ConflictError`, `OperationError`, or `ServiceUnavailableError`).

If you forget to define `error_code` or override `get_error_metadata()`, the subclass silently inherits the parent's values, which produces misleading error codes and empty metadata. Check your tests!

**What NOT to do:**

- ❌ Don't raise `HTTPException` from service code (it bypasses the `ErrorResponse` format).
- ❌ Don't put sensitive information in `metadata` (database paths, SQL, internal IDs).
- ❌ Don't derive error codes from class names using regex (fragile and non-self-documenting).
- ❌ Don't create a `DomainError` subclass without a category (always inherit from one of the seven categories).

### Frontend Error Parsing

The frontend `ApiError` class parses error responses automatically:

```typescript
// ApiError is assumed to be exported from the same module as `api`.
import { api, ApiError } from "src/api/client";

try {
  const jail = await api.get("/jails/example");
} catch (error) {
  if (error instanceof ApiError) {
    const code = error.errorResponse?.code;
    if (code === "jail_not_found") {
      // Handle not found
      console.log("Jail does not exist:", error.errorResponse?.metadata?.jail_name);
    } else if (code === "rate_limit_exceeded") {
      // Handle rate limit
      showRateLimitModal();
    } else if (code === "authentication_required") {
      // Handle auth: the frontend framework auto-redirects to /login
      redirectToLogin();
    }
  }
}
```

The `errorResponse` field contains the parsed error object with `code`, `detail`, and `metadata` fields, enabling reliable machine-readable branching.

---

## 5. Pydantic Models

### Base Class

Every model in `app/models/` **must** inherit from `BanGuiBaseModel` (defined in `app/models/response.py`), not from `pydantic.BaseModel` directly.

```python
from datetime import datetime

from pydantic import Field

from app.models.response import BanGuiBaseModel


class BanResponse(BanGuiBaseModel):
    ip: str = Field(..., description="Banned IP address")
    jail: str = Field(..., description="Jail that issued the ban")
    banned_at: datetime = Field(..., description="UTC timestamp of the ban")
    expires_at: datetime | None = Field(None, description="UTC expiry, None if permanent")
    ban_count: int = Field(..., ge=1, description="Number of times this IP was banned")
```

`BanGuiBaseModel` sets `strict=True` and documents the naming policy.
Do **not** override `model_config` on individual models unless you have a specific, documented reason. ### API Field Naming Policy — snake_case everywhere All API field names use **`snake_case`** in Python, in the JSON wire format, and in the corresponding TypeScript interfaces. There is no `alias_generator` that converts to camelCase. - ✅ Python field: `active_jails` → JSON key: `"active_jails"` → TypeScript property: `active_jails` - ❌ Do **not** add a camelCase `alias_generator` to individual models. - ❌ Do **not** mix field name conventions within a single API response. This policy eliminates a whole class of frontend–backend contract bugs. If the naming policy ever needs to change (e.g. to emit camelCase), change `BanGuiBaseModel` once — all models update automatically. ### Other Model Rules - Validate at the boundary — once data enters a Pydantic model it is trusted. - Use `Field(...)` with descriptions for every field to keep auto-generated docs useful. - Separate **request models**, **response models**, and **domain (internal) models** — do not reuse one model for all three. - **Models are leaf nodes**: Models in `app/models/` must not import from application-layer modules (`app.services`, `app.config`, `app.utils`). Models may only import from: - Standard library and third-party packages (Pydantic, typing) - Other models in `app/models/` (sibling models) - `app.models.response` (response envelopes) - Validation that requires app-level state (e.g., `settings`, allowed directories) must happen at the router or service layer, never in model validators. 
### Using `Literal` Types for Constrained Strings

When a field should only accept a small set of predefined values, use `Literal` to enforce this at the type level:

```python
from typing import Literal

from pydantic import BaseModel, Field

LogLevel = Literal["CRITICAL", "ERROR", "WARNING", "NOTICE", "INFO", "DEBUG"]


class GlobalConfigUpdate(BaseModel):
    log_level: LogLevel | None = Field(
        default=None,
        description="Log level: CRITICAL, ERROR, WARNING, NOTICE, INFO, or DEBUG.",
    )
```

This provides:

- **Type safety**: IDEs and type checkers enforce valid values.
- **API documentation**: OpenAPI docs automatically list all allowed values.
- **Validation**: Pydantic rejects invalid values and provides a clear error message.

### Field Validators and Validation Placement

**Critical Constraint (No Import-Time Execution):** Module-level imports and every expression in a model's class body, including plain `default=` values, run when the model is **defined** (at import time), not when instances are created. Validators and `default_factory` callables run later, at instantiation, but the app-layer modules they reference must still be imported when the model module loads. For this reason:

- Validators must be **pure functions** with no side effects.
- **NEVER** import or call runtime-dependent functions: `get_settings()`, file I/O, database queries, network calls, etc.
- **NEVER** import from `app.config`, `app.utils`, `app.services`, or `app.routers` in model files.

Violating this constraint creates **hidden circular dependencies** that prevent the application from starting.

**Example of What NOT to Do:**

```python
# ❌ WRONG: the app-layer import below runs at import time
from pydantic import BaseModel, Field

from app.config import get_settings  # ← executed when Python imports this module


class ConfigModel(BaseModel):
    max_age: int = Field(default_factory=lambda: get_settings().max_log_max_age_days)
    # ↑ The factory itself is deferred to instantiation, but the model is now tied
    #   to app configuration. Writing Field(default=get_settings()...) instead
    #   would call get_settings() while this module is being imported.
```

Field validators in models should only contain logic that is **stateless and does not depend on application configuration or state**. Validators must not import from `app.config`, `app.utils`, or `app.services`.

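
The difference between definition-time and instantiation-time evaluation is easy to observe directly. The sketch below uses stdlib dataclasses so that it runs without Pydantic; the same Python evaluation order applies to a plain `Field(default=...)` expression versus a `Field(default_factory=...)` callable. `read_setting` is a hypothetical stand-in for a runtime lookup such as `get_settings()`.

```python
from dataclasses import dataclass, field

calls: list[str] = []


def read_setting(label: str) -> int:
    """Hypothetical stand-in for a runtime lookup such as get_settings()."""
    calls.append(label)
    return 30


@dataclass
class EagerModel:
    # The default expression is evaluated once, while the class body runs.
    max_age: int = read_setting("eager")


@dataclass
class LazyModel:
    # The factory is only stored here; it is called when an instance is created.
    max_age: int = field(default_factory=lambda: read_setting("lazy"))


calls_after_definition: list[str] = list(calls)
LazyModel()
calls_after_instance: list[str] = list(calls)
```

Only the eager default has run by the time both classes are defined; the factory runs on the first instantiation. Either way, the module that provides the lookup has to be importable when the model module loads, which is where circular imports come from.
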
For validation that depends on app-level state (e.g., file paths that must be within allowed directories), perform validation in the router or service layer: ```python # ✅ Good: Validation in router (has access to settings) from fastapi import APIRouter from app.config import get_settings from app.utils.path_utils import validate_log_path @router.post("/jails/{name}/logpath") async def add_log_path(name: str, body: AddLogPathRequest) -> None: # Validate before using validate_log_path(body.log_path) await config_service.add_log_path(socket_path, name, body) # ❌ Avoid: Importing from app layer in model validators # Do NOT do this in app/models/config.py: # from app.config import get_settings # @field_validator("log_path") # def validate_log_path_field(cls, value: str) -> str: # settings = get_settings() # ← Models must not import from app layer # ... ``` **Common Helper:** For shared path validation logic, use the `validate_log_path()` helper from `app.utils.path_utils` in your router or service, not in model validators. ```python from fastapi import HTTPException, status from app.utils.path_utils import validate_log_path @router.delete("/{name}/logpath") async def delete_log_path( name: str, log_path: str = Query(...), ) -> None: try: validate_log_path(log_path) except ValueError as e: raise HTTPException( status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail=str(e), ) from e # ... rest of handler ``` **Key points:** - Use `mode="after"` in model validators to validate after Pydantic's basic type coercion. - Raise `ValueError` if validation fails; Pydantic converts it to an HTTP 400 response. - For query parameters that cannot use Pydantic validators, use the `validate_log_path()` helper and raise HTTP 422. - **Never use string prefix matching** for path validation (e.g., `path.startswith("/var/log")`). The helper uses `Path.relative_to()` to prevent bypasses like `/var/log_evil/file.log`. 
- Symlinks are resolved before validating to prevent symlink-based escapes. ### Model Type Usage by Layer **Pydantic models** are mandatory for all **external-facing** data structures — anything that crosses layer boundaries or is serialized to HTTP responses. **TypedDict** may be used **only** for internal, layer-private data structures where they provide precise typing without runtime overhead. **Rules:** 1. **Routers (HTTP boundary):** All request and response types **must be Pydantic models**. FastAPI uses these for validation, serialization, and OpenAPI documentation. - Use Pydantic request models for request bodies and query parameters. - Use Pydantic response models in the `response_model` parameter. ```python # Good — Pydantic models for router layer class JailStatsRequest(BaseModel): jail_name: str class JailStatsResponse(BaseModel): jail_name: str active_bans: int @router.post("/stats", response_model=JailStatsResponse) async def get_stats(req: JailStatsRequest) -> JailStatsResponse: ... ``` 2. **Services (business logic):** Return types should be **Pydantic models** if the result is: - Returned to a router (likely — they become API responses). - Used across multiple services (shared interfaces). - Exposed to external consumers (even indirectly). If a service returns a purely internal intermediate result used by a single caller, TypedDict is acceptable but should be rare. ```python # Good — service returns Pydantic (may be used by multiple routers) async def get_jail_details(name: str) -> JailDetailResponse: ... # Acceptable — purely internal utility result def _parse_fail2ban_response(raw: str) -> ParsedResponse: """Internal helper—used only by this service.""" ... ``` 3. **Repositories (data access):** Return types may use **TypedDict** because they represent **raw database rows** that: - Are layer-private (only called by their own service). - Do not cross HTTP boundaries directly. - Benefit from lightweight typing without runtime validation. 
```python # Good — TypedDict for raw repository rows class GeoRow(TypedDict): ip: str country_code: str | None async def load_all(db: aiosqlite.Connection) -> list[GeoRow]: ... ``` If a repository result becomes part of a service's public interface (returned to routers or other services), convert it to a Pydantic model. 4. **Utilities and helpers:** Internal helper results may use TypedDict if they are not part of a public module interface. **Migration path:** Existing internal TypedDicts (e.g., `GeoCacheRow`, `ImportLogRow`) may remain as TypedDicts so long as they stay within their layer. If a type needs to cross layer boundaries (repo → service → router), convert it to a Pydantic model incrementally as you refactor that data flow. --- ## 6. Async Rules - **Never** call blocking / synchronous I/O in an async function — no `time.sleep()`, no synchronous file reads, no `requests.get()`. - Use `aiohttp.ClientSession` for HTTP calls, `aiosqlite` for database access. - Use `asyncio.TaskGroup` (Python 3.11+) when you need to run independent coroutines concurrently. - Long-running startup/shutdown logic goes into the **FastAPI lifespan** context manager. - **Never call `db.commit()` inside a loop.** With aiosqlite, every commit serialises through a background thread and forces an `fsync`. N rows × 1 commit = N fsyncs. Accumulate all writes in the loop, then issue a single `db.commit()` once after the loop ends. The difference between 5,000 commits and 1 commit can be seconds vs milliseconds. ```python # Good — one commit for the whole batch for ip, info in results.items(): await db.execute(INSERT_SQL, (ip, info.country_code, ...)) await db.commit() # ← single fsync # Bad — one fsync per row for ip, info in results.items(): await db.execute(INSERT_SQL, (ip, info.country_code, ...)) await db.commit() # ← fsync on every iteration ``` - **Prefer `executemany()` over calling `execute()` in a loop** when inserting or updating multiple rows with the same SQL template. 
aiosqlite passes the entire batch to SQLite in one call, reducing Python↔thread overhead on top of the single-commit saving.

  ```python
  # Good
  await db.executemany(INSERT_SQL, [(ip, info.country_code, ...) for ip, info in results.items()])
  await db.commit()
  ```

- Shared resources (DB connections, HTTP sessions) are created once during startup and closed during shutdown, never inside request handlers.

```python
from contextlib import asynccontextmanager
from collections.abc import AsyncGenerator

from fastapi import FastAPI
import aiohttp
import aiosqlite


@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncGenerator[None]:
    # Startup
    app.state.http_session = aiohttp.ClientSession()
    app.state.db = await aiosqlite.connect("bangui.db")
    yield
    # Shutdown
    await app.state.http_session.close()
    await app.state.db.close()
```

### Fire-and-Forget Background Tasks

When you need to spawn a background task that runs independently without waiting for the result, use `asyncio.create_task()` with the `logged_task()` helper from `app.utils.async_utils`. This ensures exceptions in background tasks are always logged and never silently discarded.

**Why this matters:** asyncio stores an unhandled exception on the task object. If nothing ever awaits the task or inspects its result, the exception surfaces only as a "Task exception was never retrieved" message from the event loop's default exception handler, emitted when the task is garbage-collected. That message arrives late, through the stdlib `asyncio` logger, and without a task name or structured context, so background errors (network failures, database writes, API timeouts) are extremely hard to trace.

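
A wrapper with the behaviour of `logged_task()` might look roughly like the sketch below. This is a hypothetical implementation written for illustration, not the code in `app.utils.async_utils`, and it uses the stdlib `logging` module only so that the snippet runs on its own; the project itself mandates structlog.

```python
import asyncio
import logging
from collections.abc import Awaitable

# Stand-in logger for this sketch; project code would use structlog instead.
log = logging.getLogger("background")


async def logged_task(coro: Awaitable[object], task_name: str) -> None:
    """Await ``coro`` and log any exception instead of letting it go unseen."""
    try:
        await coro
    except Exception:
        # exception() records the traceback; the error is deliberately not re-raised.
        log.exception("background_task_failed", extra={"task_name": task_name})


async def failing() -> None:
    """Hypothetical background job that always fails."""
    raise ValueError("boom")


async def main() -> bool:
    task = asyncio.create_task(logged_task(failing(), "demo"), name="demo")
    await task
    # The wrapper swallowed the error after logging it, so the task ended cleanly.
    return task.exception() is None


completed_cleanly: bool = asyncio.run(main())
```

Catching `Exception` rather than `BaseException` leaves `asyncio.CancelledError` alone, so cancelling the task during shutdown still works as expected.
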
**Pattern:** ```python from app.utils.async_utils import logged_task # Bad — exceptions are silently discarded asyncio.create_task(some_background_work()) # Good — exceptions are logged asyncio.create_task( logged_task(some_background_work(), "task_name"), name="task_name", ) ``` The `logged_task()` wrapper: - Wraps your coroutine to catch any exception - Logs the exception with `log.exception()` (structlog automatically captures the traceback) - Adds `task_name` to the structured log context - Never re-raises — it's safe to use with `asyncio.create_task()` Example: ```python import asyncio from app.utils.async_utils import logged_task import structlog log = structlog.get_logger() async def geo_lookup_batch(ips: list[str]) -> None: """Look up geolocation data for IPs asynchronously.""" try: for ip in ips: # May timeout, fail network call, or fail DB write location = await lookup_ip_location(ip) await db.execute(INSERT_GEO_SQL, (ip, location)) await db.commit() except Exception: # All exceptions are automatically logged by logged_task() wrapper raise # In your request handler or service: asyncio.create_task( logged_task(geo_lookup_batch(uncached_ips), "geo_cache_batch"), name="geo_cache_batch", ) ``` --- ## 6.1 Database Query Conventions ### LIKE Queries and Wildcard Escaping SQLite's `LIKE` operator treats `%` (any sequence of characters) and `_` (any single character) as wildcards. When querying with user-supplied filters that may contain these characters, you must escape them to prevent unintended matches. **The Problem:** ```python # Bad — ip_filter="10.0.0_" matches "10.0.0.1", "10.0.0.2", etc. 
ip_filter = "10.0.0_" await db.execute( "SELECT * FROM bans WHERE ip LIKE ?", (f"{ip_filter}%",) # ← wildcard characters not escaped ) ``` **The Solution:** Use the `escape_like()` helper from `app.utils.fail2ban_db_utils`: ```python from app.utils.fail2ban_db_utils import escape_like # Good — wildcard characters are escaped ip_filter = "10.0.0_" await db.execute( "SELECT * FROM bans WHERE ip LIKE ? ESCAPE '\\'", (f"{escape_like(ip_filter)}%",) # ← underscores escaped to literal ) ``` **How `escape_like()` works:** The function escapes backslashes first, then `%` and `_` signs: ```python def escape_like(s: str) -> str: return s.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_") ``` **Key rules:** 1. **Backslash escapes first** — to prevent double-escaping when the input contains backslashes. 2. **Add `ESCAPE '\\'` to the SQL** — tells SQLite which character to use for escaping. 3. **Dots are not wildcards** — they do not need escaping; normal IP addresses pass through unchanged. **Test example:** ```python assert escape_like("10.0.0_") == "10.0.0\\_" assert escape_like("10.0.0%test") == "10.0.0\\%test" assert escape_like("10.0.0.1") == "10.0.0.1" # Unchanged ``` --- ## 6.2 Database Migrations The application database schema is versioned and migrated automatically on startup via `app.db.init_db()`. ### Migration Design Principles **Migrations must be atomic.** All schema changes for a single version (DDL statements) and the `schema_migrations` record insert must be wrapped in a single `BEGIN IMMEDIATE ... COMMIT` transaction. This prevents partial migrations if a process crashes mid-migration. If a crash occurs between migration steps, the next startup will: 1. Detect the missing `schema_migrations` record. 2. Re-apply the entire migration in a single transaction (all-or-nothing). 3. Avoid data corruption or schema inconsistency. ### Writing a New Migration 1. 
**Add the DDL statements** to `_MIGRATIONS` dict in `app/db.py`: ```python _MIGRATIONS: dict[int, str] = { 1: _CREATE_INITIAL_SCHEMA, 2: """ -- Migration 2: Add new_column to users table. ALTER TABLE users ADD COLUMN new_column TEXT DEFAULT 'default_value'; CREATE INDEX idx_users_new_column ON users(new_column); """, } ``` 2. **Update `_CURRENT_SCHEMA_VERSION`** to the new version number: ```python _CURRENT_SCHEMA_VERSION: int = 2 # was 1 ``` 3. **Ensure idempotency where possible:** - Use `CREATE TABLE IF NOT EXISTS` and `CREATE INDEX IF NOT EXISTS`. - For `ALTER TABLE ADD COLUMN`, check if the column exists first using `PRAGMA table_info()` if re-applying the migration is a concern. 4. **Verify atomicity in tests:** ```python async def test_migration_2_is_atomic(tmp_path: Path) -> None: """Verify migration 2 rolls back on failure.""" db = await open_db(str(tmp_path / "test.db")) try: await db.execute("CREATE TABLE schema_migrations (version INTEGER PRIMARY KEY);") await db.commit() # Add a test migration that fails mid-way original = db_module._MIGRATIONS.copy() db_module._MIGRATIONS[99] = """ CREATE TABLE test_table (id INTEGER PRIMARY KEY); INSERT INTO nonexistent_table VALUES (1); """ try: with pytest.raises(Exception): await _apply_migration(db, 99) # Verify rollback: migration NOT recorded async with db.execute( "SELECT version FROM schema_migrations WHERE version = 99;" ) as cursor: assert await cursor.fetchone() is None # Verify rollback: table NOT created async with db.execute( "SELECT name FROM sqlite_master WHERE type='table' AND name='test_table';" ) as cursor: assert await cursor.fetchone() is None finally: db_module._MIGRATIONS = original finally: await db.close() ``` ### Common Pitfalls - **Non-idempotent statements** — `ALTER TABLE ADD COLUMN` without `IF NOT EXISTS` will fail on re-run. Use explicit checks if needed. - **Comments containing semicolons** — the migration parser strips comments correctly, but avoid unusual comment syntax. 
- **String literals with semicolons**: the parser handles these; no special escaping needed.
- **Multiple operations in one migration**: keep migrations focused. Combine related DDL but split unrelated changes.

---

## 6.3 Database Transactions

Database transactions ensure atomicity for multi-step operations and prevent race conditions when concurrent requests interact with the database. BanGUI uses **SQLite with WAL (Write-Ahead Logging)** mode, which enables concurrent readers but serializes writers.

### When to Use Explicit Transactions

**Use `BEGIN IMMEDIATE ... COMMIT` for:**

1. **Multi-step logical operations**: operations that should succeed or fail as a unit. Example:

   ```python
   # Bad: two separate operations, race condition window exists
   await db.execute("INSERT INTO sessions ...")
   await db.commit()
   # Good: atomic single operation, no need for explicit transaction
   ```

2. **Operations that combine multiple queries with shared state**: when the outcome depends on reading and then writing based on that read:

   ```python
   # Bad: race condition, another request can check between our read and write
   existing_run = await import_run_repo.get_by_source_and_hash(db, source_id, content_hash)
   if existing_run is None:
       run_id = await import_run_repo.create_pending(db, source_id, content_hash)

   # Good: atomic, both operations within the same transaction boundary
   try:
       await db.execute("BEGIN IMMEDIATE")
       cursor = await db.execute("INSERT INTO import_runs ...")
       await db.commit()
   except aiosqlite.IntegrityError:
       await db.rollback()  # close the failed transaction before reading again
       # Another request won the race; fetch the existing record
       existing = await import_run_repo.get_by_source_and_hash(...)
       ...
   ```

3. **Bulk operations that should be all-or-nothing**: for example, upserting positive and negative geo cache entries:

   ```python
   try:
       await db.execute("BEGIN IMMEDIATE")
       await bulk_upsert_entries(db, positive_rows)
       await bulk_upsert_neg_entries(db, negative_ips)
       await db.commit()
   except Exception:
       await db.rollback()
       raise
   ```

**Do NOT use explicit transactions for:**

- Single SQL statements: SQLite guarantees atomic writes for individual statements. No explicit transaction needed.
- Read-only queries: queries do not modify data and do not need transaction boundaries.

### Transaction Pattern

Always use this pattern for wrapped operations:

```python
try:
    await db.execute("BEGIN IMMEDIATE")
    # ... perform all operations ...
    await db.commit()
except Exception:
    await db.rollback()
    raise
```

- **`BEGIN IMMEDIATE`**: acquires a write lock immediately, preventing other writers from entering the transaction window. This is critical for crash-safety and consistency.
- **`COMMIT`**: persists all changes.
- **`ROLLBACK`**: rolls back on any exception, ensuring the database is left in a consistent state.

### Handling Race Condition Errors

When a `UNIQUE` constraint violation occurs due to a race condition (two concurrent requests attempt the same insert), the database raises `aiosqlite.IntegrityError`. **Handle this at the call site** by retrying the lookup:

```python
try:
    run_id = await import_run_repo.create_pending(db, source_id, content_hash)
except aiosqlite.IntegrityError:
    # Another concurrent request created it first
    existing = await import_run_repo.get_by_source_and_hash(db, source_id, content_hash)
    if existing is None:
        raise RuntimeError("Constraint error indicates row exists but lookup failed")
    run_id = existing.id
    log.info("lost_race", run_id=run_id)
```

This approach:

1. Lets the database constraint prevent data corruption.
2. Gracefully handles the concurrent case in application logic.
3.
Avoids unnecessary locking overhead for the common case (no concurrent writers). --- ## 7. Structured Logging Policy All logging in BanGUI services and tasks must use **structlog** for consistent, queryable event tracking. This policy defines when and how to log at each level. ### 7.1 Logging Levels and When to Use Them **INFO** — User-facing operations and state changes - Use for significant operational events that the operator should know about. - Examples: service startup/completion, resource creation/deletion, state transitions, successful operations with business impact. - Never excessive — keep volume reasonable to maintain log clarity. - Include relevant context: resource IDs, counts, configuration changes. ```python log.info("jail_activated", jail=name) log.info("blocklist_source_created", id=new_id, name=name, url=url) log.info("session_cleanup_ran", deleted_count=count, cutoff_time=now_iso) ``` **WARNING** — Recoverable failures, degraded functionality, unexpected but handled conditions - Use for issues that are not fatal but indicate something is wrong or suboptimal. - Examples: missing optional config, fallback behavior triggered, non-critical API call failed, parsing errors, missing resources that have workarounds. - Include error details and context to enable investigation. - Do NOT use for expected error paths (e.g., wrong password in login attempt). ```python log.warning("jail_status_parse_error", jail=name, error=str(exc)) log.warning("geo_lookup_failed", ip=ip, error=type(exc).__name__) log.warning("geoip_mmdb_not_found", path=mmdb_path) ``` **ERROR** — Fatal/unrecoverable failures within a request or task - Use for errors that prevent an operation from completing. - Examples: database write failed, critical resource is missing, state is corrupted. - Include the full error context to enable debugging. - Pair with exception handling — if you log an error, you've decided the caller should handle it or return a failure. 
```python log.error("jail_activation_rollback_restore_failed", jail=name, error=str(exc)) log.error("fail2ban_probe_parse_error", error=str(exc)) ``` **EXCEPTION** — Unhandled exceptions in background tasks and scheduled jobs - Use `log.exception()` (not `log.error()`) in catch-all exception handlers to automatically capture the full traceback. - This level is ONLY for surprises that should never happen in production. - Use in task callback exception handlers and top-level task runners. ```python try: result = await blocklist_service.import_all(...) except Exception: log.exception("blocklist_import_unexpected_error") ``` **DEBUG** — Low-level details for development and troubleshooting - Use for details too verbose for normal operation (e.g., successful lookups, parsed files, state transitions, loop iterations). - Include data samples and validation results. - Safe to leave in the code — debug logs are not emitted by default in production. ```python log.debug("geo_lookup_success_mmdb", ip=ip, country=result.country_code) log.debug("action_file_parsed", name=raw.name) log.debug("backend_cmd_supported_detected") ``` ### 7.2 Event Naming Convention All log event names must follow a consistent pattern for queryability: **Pattern:** `{domain}_{entity_or_action}[_{result_or_detail}]` - **Domain** — The service or feature area (e.g., `jail`, `blocklist`, `geo`, `auth`, `ban`). - **Entity or Action** — The noun or verb describing what happened (e.g., `activated`, `created`, `failed`, `synced`). - **Result or Detail** (optional) — Additional specificity for complex scenarios (e.g., `_restore_failed`, `_non_200`, `_no_fallback`). 
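
A naming pattern like this can be checked mechanically, for example in a unit test that scans the event names used in a module. The sketch below is one possible check, not an existing project utility: `EVENT_NAME_RE` and `is_valid_event_name` are hypothetical names, and the regex only enforces lowercase snake_case with at least two segments, not the choice of domain or terminology.

```python
import re

# Lowercase snake_case with at least two underscore-separated segments,
# e.g. "jail_activated" or "blocklist_import_starting".
EVENT_NAME_RE: re.Pattern[str] = re.compile(r"[a-z][a-z0-9]*(?:_[a-z0-9]+)+")


def is_valid_event_name(name: str) -> bool:
    """Return True when ``name`` has the {domain}_{action}[_{detail}] shape."""
    return EVENT_NAME_RE.fullmatch(name) is not None
```

Names such as `jailActivated` or a bare `jail` are rejected, while `jail_activated` and `blocklist_import_starting` pass.
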
**Examples (organized by domain):** | Domain | INFO | WARNING | ERROR | DEBUG | |--------|------|---------|-------|-------| | **jail** | `jail_activated`, `jail_deactivated`, `jail_reloaded` | `jail_status_parse_error`, `jail_rollback_failed` | `jail_activation_rollback_restore_failed` | `jail_file_parsed`, `backend_cmd_supported_detected` | | **geo** | `geo_cache_loaded_from_db`, `geo_flush_dirty_complete` | `geo_lookup_failed`, `geo_persist_failed` | - | `geo_lookup_success_mmdb`, `geo_cache_cleanup_ran` | | **blocklist** | `blocklist_import_starting`, `blocklist_source_created` | `blocklist_schedule_invalid`, `blocklist_preview_failed` | - | `blocklist_ban_failed` | | **ban** | `active_bans_fetched`, `all_ips_unbanned` | `ban_service_geo_lookup_failed` | `ban_service_geo_lookup_unexpected_error` | `ban_entry_parse_error` | | **auth** | `bangui_login_success`, `bangui_logout` | `bangui_login_wrong_password`, `bangui_login_no_hash` | - | - | | **config** | `filter_created`, `action_updated` | `filter_read_error`, `action_d_not_found` | - | `filter_file_parsed` | | **setup** | `bangui_setup_started`, `bangui_setup_completed` | - | `bangui_setup_failed` | - | **Key rules:** - Use **snake_case** (never camelCase or PascalCase). - Keep event names short but descriptive — aim for 2–4 words. - Use consistent terminology across the codebase (e.g., always `_created`, never `_added` or `_new`). - Prefix task/background job events with the job name (e.g., `blocklist_import_starting`, `session_cleanup_ran`). ### 7.3 Structured Context and Key-Value Pairs Always log with **structured context** — key-value pairs that make logs queryable and analyzable. 
**Essential patterns:**

```python
# Operation with count
log.info("active_bans_fetched", total=len(bans))

# Resource manipulation with ID
log.info("blocklist_source_deleted", id=source_id)

# State transition with reason or error
log.warning("geo_persist_failed", ip=ip, error=type(exc).__name__)

# Time-bounded operation
log.info("session_cleanup_ran", deleted_count=count, cutoff_time=now_iso)

# Config or feature change
log.info("blocklist_schedule_updated", frequency=config.frequency, hour=config.hour)

# Batch operation with metrics
log.info(
    "blocklist_import_finished",
    total_imported=result.total_imported,
    total_skipped=result.total_skipped,
    errors=result.errors_count,
)
```

**What to include:**

- Resource IDs (jail names, IP addresses, source IDs).
- Counts and metrics (rows processed, items synced, errors).
- Configuration or decision points (enabled/disabled flags, thresholds).
- Timestamps and durations for long-running operations (optional but useful).
- Error types and short error messages (use `type(exc).__name__` or `str(exc)` depending on context).

**What to NEVER log:**

- Sensitive data: passwords, tokens, session IDs, API keys, hashes.
  - For session correlation without leaking token material, use a one-way hash fragment: `hashlib.sha256(token.encode()).hexdigest()[:12]`.
  - Use numeric database IDs for entity correlation instead of raw identifiers: `session_id=session.id` instead of `token=session.token`.
- Full exception tracebacks in INFO/WARNING (use `log.exception()` only in catch-all handlers).
- Redundant system time (structlog adds `timestamp` automatically).
- User PII in most cases (name, email, phone, etc.) — unless directly relevant to debugging and anonymized.

### 7.4 Background Tasks and Scheduled Jobs

Every background task (APScheduler job) must follow this pattern:

1. **On startup:** Log `{job_name}_scheduled` with interval or cron expression.
2. **On execution:** Log `{job_name}_starting` (INFO) and `{job_name}_finished` or `{job_name}_ran` (INFO) with metrics.
3. **On exception:** Use `log.exception("{job_name}_unexpected_error")` in the top-level try/except.

```python
# app/tasks/blocklist_import.py
async def _run_import_with_resources(settings: Settings, http_session: ClientSession) -> None:
    """APScheduler callback that imports all enabled blocklist sources."""
    log.info("blocklist_import_starting")
    try:
        result = await blocklist_service.import_all(...)
        log.info(
            "blocklist_import_finished",
            total_imported=result.total_imported,
            total_skipped=result.total_skipped,
            errors=result.errors_count,
        )
    except Exception:
        log.exception("blocklist_import_unexpected_error")


# Register and log in the lifespan or register function:
log.info("blocklist_import_scheduled", interval_seconds=INTERVAL_SECONDS)
```

### 7.5 Service Functions and Methods

Service functions should log at entry/exit for significant operations, or when errors occur:

**Entry logging** (optional, use for complex or long-running operations):

```python
async def import_all(...) -> ImportResult:
    log.debug("blocklist_import_starting", count=len(sources))
    try:
        ...
    except SomeError as exc:  # bind the exception so it can be logged below
        log.warning("blocklist_import_partial_failure", imported=count, error=str(exc))
```

**Exit/success logging** (log results with metrics):

```python
async def get_all_jails(socket_path: str) -> list[JailResponse]:
    jails = await _fetch_jails(socket_path)
    log.info("jail_list_fetched", count=len(jails))
    return jails
```

**Error handling** (log with context, let caller decide how to respond):

```python
try:
    result = await fetch_external_data(url)
except TimeoutError:
    log.warning("external_fetch_timeout", url=url, timeout_seconds=TIMEOUT)
    raise
except Exception as exc:
    log.error("external_fetch_failed", url=url, error=type(exc).__name__)
    raise
```

### 7.6 Domain Model Pattern for Services

Services **return domain models** (frozen dataclasses), not Pydantic response models.
Conversion to response models happens at the **router boundary**.

**Example (correct):**

```python
# app/services/jail_service.py — returns domain model
async def get_jail(socket_path: str, name: str) -> DomainJailDetail:
    ...
    return DomainJailDetail(name=name, ...)


# app/routers/jails.py — converts at boundary
@router.get("/{name}")
async def get_jail(...) -> JailDetailResponse:
    domain = await jail_service.get_jail(socket_path, name)
    return jail_mappers.map_domain_jail_detail_to_response(domain)
```

**When adding a new service:**

1. Define the domain model in `app/models/{domain}_domain.py` (frozen dataclass).
2. Add a mapper in `app/mappers/{domain}_mappers.py`: `map_domain_X_to_response(domain: DomainX) -> XResponse`.
3. The service returns the domain model type.
4. The router calls the mapper before returning.

**Reference:** `ban_service.py` + `ban_mappers.py` are the canonical example. See `Docs/DOMAIN_MODELS.md`.

---

## 8. Error Handling

- Define **custom exception classes** for domain errors (e.g., `JailNotFoundError`, `BanFailedError`).
- Catch specific exceptions — never bare `except:` or `except Exception:` without re-raising.
- Map domain exceptions to HTTP status codes via FastAPI **exception handlers** registered on the app.
- Always log errors with context before raising.

```python
class JailNotFoundError(Exception):
    def __init__(self, name: str) -> None:
        self.name: str = name
        super().__init__(f"Jail '{name}' not found")


# In main.py
@app.exception_handler(JailNotFoundError)
async def jail_not_found_handler(request: Request, exc: JailNotFoundError) -> JSONResponse:
    return JSONResponse(status_code=404, content={"detail": f"Jail '{exc.name}' not found"})
```

### Routers and Exception Propagation

- **Routers must NOT construct `HTTPException` for domain errors** — let domain exceptions propagate.
- Routers should never have helper functions like `_bad_gateway()`, `_not_found()`, `_conflict()` etc. that convert domain exceptions to `HTTPException`.
- All domain exception types must have corresponding handlers registered in `main.py` via `app.add_exception_handler()`.
- Register exception handlers from most specific to least specific for readability. Registration order does not affect dispatch: FastAPI (via Starlette) selects the handler by walking the raised exception's class hierarchy, so the most specific registered class wins.

```python
# ❌ BAD — routers constructing HTTPException for domain exceptions
@router.get("/{name}")
async def get_jail(name: str, socket_path: Fail2BanSocketDep) -> JailDetailResponse:
    try:
        return await jail_service.get_jail(socket_path, name)
    except JailNotFoundError:
        raise HTTPException(status_code=404, detail=f"Jail not found: {name!r}") from None


# ✅ GOOD — domain exception propagates to global handler
@router.get("/{name}")
async def get_jail(name: str, socket_path: Fail2BanSocketDep) -> JailDetailResponse:
    return await jail_service.get_jail(socket_path, name)
```

All domain exceptions raised by services propagate to handlers in `main.py`, ensuring:

1. Consistent error response format across the entire API.
2. No duplicated exception-to-HTTP-status mapping logic.
3. Easy to audit all error codes — they are all in one place.

### Error Message Hygiene

HTTP responses must never leak sensitive internal details that aid attackers or expose infrastructure:

- **Never include system paths** in HTTP error messages (e.g., `/var/run/fail2ban/fail2ban.sock`, `/etc/fail2ban/`).
- **Never include raw exception messages** that expose internal parsing or implementation logic.
- **Log full details server-side only** — exception handlers must log `error=str(exc)` with full exception context, but return generic user-friendly messages in the HTTP response.

```python
# ❌ BAD — leaks socket path and internal details to the client
async def _fail2ban_connection_handler(request: Request, exc: Fail2BanConnectionError) -> JSONResponse:
    return JSONResponse(
        status_code=502,
        content={"detail": f"Cannot reach fail2ban: {exc}"},  # exc includes socket path!
    )


# ✅ GOOD — generic message in response, full details in server logs
async def _fail2ban_connection_handler(request: Request, exc: Fail2BanConnectionError) -> JSONResponse:
    log.warning(
        "fail2ban_connection_error",
        path=request.url.path,
        method=request.method,
        error=str(exc),  # Full details logged server-side
    )
    return JSONResponse(
        status_code=502,
        content={"detail": "Cannot reach the fail2ban service. Check the server status page."},
    )
```

### Exception Taxonomy and HTTP Mapping

BanGUI uses a **standardized exception taxonomy** that maps domain errors to HTTP status codes consistently across all services. This allows exception handlers to process exceptions by category rather than by individual type, reducing code duplication and ensuring consistent client-facing error responses.

#### Exception Categories

All domain exceptions inherit from one of the five base categories defined in `app.exceptions`:

| Base Exception | HTTP Status | Meaning | Example |
|---|---|---|---|
| `NotFoundError` | 404 | Requested domain entity not found | `JailNotFoundError`, `FilterNotFoundError` |
| `BadRequestError` | 400 | Invalid input, validation failure, or invalid identifier | `ConfigValidationError`, `JailNameError` |
| `ConflictError` | 409 | State conflict or resource constraint violation | `JailAlreadyActiveError`, `FilterAlreadyExistsError` |
| `OperationError` | 500 | Domain operation failure (write, update, delete) | `ConfigWriteError`, `ConfigFileWriteError` |
| `ServiceUnavailableError` | 503 | Infrastructure or external service unreachable | `Fail2BanConnectionError`, `ConfigDirError` |

#### Service Exception Mapping

Every service-specific exception inherits from exactly one category.
This allows `main.py` to register just **5 exception handlers** instead of 25+:

```python
# In app/exceptions.py — define each exception once with its category
class JailNotFoundError(NotFoundError):
    def __init__(self, name: str) -> None:
        self.name = name
        super().__init__(f"Jail not found: {name!r}")


class JailAlreadyActiveError(ConflictError):
    def __init__(self, name: str) -> None:
        self.name = name
        super().__init__(f"Jail is already active: {name!r}")


# In app/main.py — register category handlers
app.add_exception_handler(NotFoundError, _not_found_handler)
app.add_exception_handler(BadRequestError, _bad_request_handler)
app.add_exception_handler(ConflictError, _conflict_handler)
app.add_exception_handler(OperationError, _domain_error_handler)
app.add_exception_handler(ServiceUnavailableError, _service_unavailable_handler)
```

#### Service Exception Reference

When writing a new service, determine which category each exception belongs to:

- **Not found**: Always `NotFoundError` (e.g., jail, filter, action, config file not found)
- **Invalid input**: Always `BadRequestError` (e.g., validation errors, invalid names, regex compile failure)
- **State conflicts**: Always `ConflictError` (e.g., already exists, already active, read-only resource)
- **Operation failures**: Always `OperationError` (e.g., write failed, update failed, command failed)
- **Infrastructure**: Always `ServiceUnavailableError` (e.g., config dir missing, socket unreachable, fail2ban protocol error)

#### Client Expectations

Clients should expect the following HTTP status codes and response format for all domain errors:

```text
HTTP 400 Bad Request
{ "detail": "Jail name contains invalid characters" }

HTTP 404 Not Found
{ "detail": "Jail not found: 'sshd'" }

HTTP 409 Conflict
{ "detail": "Jail is already active: 'sshd'" }

HTTP 500 Internal Server Error
{ "detail": "Failed to write configuration: permission denied" }

HTTP 503 Service Unavailable
{ "detail": "Cannot reach the fail2ban service. Check the server status page." }
```

The `detail` field always contains the exception's message (from `str(exc)`). Sensitive details (socket paths, file paths, internal error messages) are never included — they are logged server-side only.

- **Network I/O**: `TimeoutError`, `aiohttp.ClientError`, `asyncio.TimeoutError`
- **File I/O**: `OSError` (includes `IOError`, `FileNotFoundError`, `PermissionError`)
- **JSON parsing**: `json.JSONDecodeError`, `ValueError`
- **Database errors**: `aiosqlite.Error` and derivatives (re-exported from `sqlite3`; they are not `OSError` subclasses, so catch them explicitly)
- **Third-party libraries**: Specific exception classes (e.g., `geoip2.errors.GeoIP2Error`)

**When catching service-critical exceptions**:

1. Catch the specific exception types for the operation.
2. Log with the exception type and relevant context.
3. Return a safe fallback (empty dict, None, etc.) or re-raise if the service cannot function.

**When truly unavoidable broad catches are needed** (e.g., retrying transient network failures):

1. Place specific catches first.
2. Add one final `except Exception` **after** specific cases, with `error_type="unexpected"` logged to flag surprises.
3. Document why broad catching is necessary (e.g., "tests use mock objects that may raise arbitrary exceptions").

**Example:**

```python
async def lookup_batch(ips: list[str], http_session: aiohttp.ClientSession) -> dict[str, GeoInfo]:
    """Resolve multiple IPs, returning empty map on failure."""
    try:
        result = await http_session.post(url, json=payload, timeout=timeout)
    except (TimeoutError, aiohttp.ClientError) as exc:
        # Expected network failures — log and return empty result
        log.warning("geo_batch_http_failed", error=type(exc).__name__)
        return {}
    except Exception as exc:
        # Unexpected — log as error for investigation
        log.error("geo_batch_unexpected_error", error=type(exc).__name__)
        return {}
```

---

## 9. Testing

- **Every** new feature or bug fix must include tests.
- Tests live in `tests/` mirroring the `app/` structure.
- Use `pytest` with `pytest-asyncio` for async tests.
- Use `httpx.AsyncClient` to test FastAPI endpoints (not `TestClient` which is sync).
- Mock external dependencies (fail2ban socket, aiohttp calls) — tests must never touch real infrastructure.
- Aim for **>80 % line coverage** — critical paths (auth, banning, scheduling) must be 100 %.
- Test names follow `test___` pattern.

```python
from collections.abc import AsyncIterator

import pytest
from httpx import ASGITransport, AsyncClient

from app.main import create_app


@pytest.fixture
async def client() -> AsyncIterator[AsyncClient]:
    # The fixture yields, so its return type is an async iterator of the client.
    app = create_app()
    transport: ASGITransport = ASGITransport(app=app)
    async with AsyncClient(transport=transport, base_url="http://test") as ac:
        yield ac


@pytest.mark.asyncio
async def test_list_jails_returns_200(client: AsyncClient) -> None:
    response = await client.get("/api/v1/jails/")
    assert response.status_code == 200
    data: dict[str, object] = response.json()
    assert "jails" in data
```

See [API_VERSIONING.md](API_VERSIONING.md) for the full versioning strategy, deprecation policy, and instructions for adding versioned endpoints.

---

## 9.1 Background Tasks and Scheduler Architecture

BanGUI uses **APScheduler 4.x** (async mode) to manage background jobs that execute on a schedule without user interaction. This section documents how to write and register background tasks.

### Task Location and Structure

All background tasks live in `backend/app/tasks/` as separate modules. Each task:

- Exports a `register(app: FastAPI) -> None` or `async def register(app: FastAPI) -> None` function.
- Opens its own database connection using `app.db.open_db()` or the `task_db()` helper.
- Closes connections when work completes (use the async context manager pattern).
- Runs independently of the FastAPI request/response cycle.
### Example Task

```python
# backend/app/tasks/my_task.py
import structlog
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from fastapi import FastAPI

log = structlog.get_logger()


async def my_background_job(app: FastAPI) -> None:
    """Do important work on a schedule."""
    log.info("my_background_job_starting")
    try:
        # No request scope here, so the task opens its own connection.
        db = await app.state.db_factory.open_db(app.state.settings.database_path)
        try:
            # Do work...
            pass
        finally:
            await db.close()
    except Exception:
        # Top-level catch-all: log.exception() captures the full traceback.
        log.exception("my_background_job_unexpected_error")


def register(app: FastAPI) -> None:
    """Register the job with the scheduler."""
    scheduler: AsyncIOScheduler = app.state.scheduler
    scheduler.add_job(
        my_background_job,
        args=(app,),
        trigger="interval",
        seconds=60,
        id="my_task",
        name="My Background Job",
    )
```

### Accessing Shared Resources in Tasks

Since tasks do not have access to `Depends(get_db)` (no request scope), they must:

1. **Open their own DB connection** via `app.state.db_factory.open_db(path)`.
2. **Access app-level state** — `app.state.http_session`, `app.state.geo_cache`, `app.state.settings`, etc.
3. **Use structlog** for all logging (never `print()`).

### Single-Worker Requirement

**The scheduler is bound to a single asyncio event loop and cannot be shared across multiple worker processes.** BanGUI enforces single-worker mode to prevent duplicate task execution.

- **Deployment constraint:** Set `BANGUI_WORKERS=1` (default).
- **Startup validation:** `startup_shared_resources()` raises `RuntimeError` if `BANGUI_WORKERS > 1`.
- See [Architekture.md § 9.2](Architekture.md) for full details.

### Timeout Protection for Background Tasks

**All background tasks must wrap their async work with timeout protection.**

If a task hangs (API unreachable, network partition, database lock), it never completes: its lock is never released, duplicate work can start, and resources are eventually exhausted. Timeouts prevent this.
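As a minimal illustration of the failure mode and its fix, the sketch below bounds a deliberately hanging coroutine with the standard library's `asyncio.wait_for()`. The names `bounded`, `hanging_job`, and `main` are hypothetical and exist only for this example; this is not the project's own helper, which may log and behave differently.

```python
import asyncio
from collections.abc import Awaitable
from typing import TypeVar

T = TypeVar("T")


async def bounded(work: Awaitable[T], timeout_seconds: float) -> T:
    """Await *work*, cancelling it if it exceeds *timeout_seconds*."""
    return await asyncio.wait_for(work, timeout=timeout_seconds)


async def hanging_job() -> None:
    """Stand-in for a task stuck on an unreachable API."""
    await asyncio.sleep(3600)


async def main() -> str:
    try:
        await bounded(hanging_job(), timeout_seconds=0.05)
    except asyncio.TimeoutError:  # alias of the builtin TimeoutError on Python 3.11+
        return "timed_out"
    return "completed"
```

Because `asyncio.wait_for()` cancels the awaited work when the deadline passes, any `finally` blocks inside the job still run, which gives the task a chance to release locks and close connections.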
**Rule:** Every task function must use `run_with_timeout()` from `app.tasks.timeout_utils` to enforce a timeout on its async work.

```python
from app.tasks.timeout_utils import run_with_timeout


async def _run_import_with_resources(settings: Settings, http_session: ClientSession) -> None:
    """Imports blocklists with timeout protection."""

    async def _do_import() -> None:
        # ... your async work ...
        result = await blocklist_service.import_all(...)
        log.info("import_finished", total=result.total_imported)

    # Wrap with timeout: abort after 300 seconds
    await run_with_timeout("blocklist_import", _do_import(), timeout_seconds=300)
```

**Why this pattern:**

1. `run_with_timeout()` enforces strict time limits using `asyncio.wait_for()`.
2. If the timeout is exceeded, `TimeoutError` is raised and logged with elapsed time.
3. If a task approaches its timeout (>80% of time budget), a warning is logged for observability.
4. Failures are logged at `warning` level (not `error`) — timeouts are expected sometimes, but worth investigating.

**Timeout Values by Task:**

| Task | Timeout | Rationale |
|------|---------|-----------|
| `blocklist_import` | 300s (5 min) | Downloads, validates, applies external lists. Network delays expected. |
| `health_check` | 10s | Socket probe to fail2ban. Should complete quickly or fail2ban is unresponsive. |
| `geo_cache_flush` | 60s | Writes dirty cache entries to DB. Handles contention gracefully. |
| `session_cleanup` | 30s | Deletes expired sessions. DB contention unlikely but possible. |
| `rate_limiter_cleanup` | 5s | In-memory cleanup, no I/O. Should always be instant. |
| `geo_cache_cleanup` | 60s | Deletes stale geo entries from DB. May scan large table. |
| `geo_re_resolve` | 120s | Retries failed IP lookups with backoff. API rate-limit delays expected. |
| `history_sync` | 60s | Syncs records from fail2ban DB to archive. May read/write many rows. |
| `scheduler_lock_heartbeat` | 5s | Updates lock timestamp. Must be quick or lock is lost. |

**Timeout Events Are Logged:**

On timeout:

```
task_timeout task_name=blocklist_import timeout_seconds=300 elapsed_seconds=300.45
```

On approaching timeout (>80% of budget used):

```
task_approaching_timeout task_name=blocklist_import timeout_seconds=300 elapsed_seconds=298.5 usage_percent=99.5
```

The logs include `elapsed_seconds` for observability — if you see tasks consistently near timeout, the value may need adjustment.

**Testing Timeout Behavior:**

Tests for timeout scenarios are in `backend/tests/test_tasks/test_timeout_utils.py`:

- Verify timeout is raised and logged.
- Verify approaching-timeout warning is logged.
- Verify task exceptions (not timeout) propagate correctly.

Add timeout tests to your task test file:

```python
from unittest.mock import patch

import pytest


# Method of a test class in your task's test module
@pytest.mark.asyncio
async def test_task_timeout_is_logged(self) -> None:
    """A timeout must be logged and must raise TimeoutError."""
    with patch("app.tasks.my_task.log") as mock_log:
        with pytest.raises(TimeoutError):
            await my_task._run_with_resources(settings)  # exceeds timeout

    timeout_calls = [
        c for c in mock_log.warning.call_args_list
        if c[0][0] == "task_timeout"
    ]
    assert len(timeout_calls) == 1
```

---

### Task Idempotency

**Background tasks must be idempotent** — retrying after a crash must produce the same result as running once.

If a task crashes or times out mid-execution, the scheduler may retry.
Without idempotency, retries cause duplicate work:

- **blocklist_import**: banned IPs appear twice → database corruption
- **geo_cache_flush**: entries written twice → cache inconsistency
- Any multi-step operation: partial state remains

**Pattern: Content-Hash Idempotency for Blocklist Imports**

Track imports by source + content hash to detect retries:

```python
import hashlib

from app.repositories import import_run_repo


# Abbreviated sketch: `...` marks elided parameters and arguments.
async def import_source(source, db, ...):
    # Download content
    status, content = await downloader.download(url)

    # Compute hash for idempotency detection
    content_hash = hashlib.sha256(content.encode()).hexdigest()

    # Check if this exact import already completed
    existing_run = await import_run_repo.get_by_source_and_hash(
        db, source.id, content_hash
    )
    if existing_run and existing_run.status == "completed":
        # Already done — skip banning, optionally re-warm cache
        log.info("blocklist_import_already_completed", ...)
        return ImportSourceResult(ips_imported=existing_run.imported_count, ...)

    # First run: create pending record
    if not existing_run:
        run_id = await import_run_repo.create_pending(
            db, source.id, content_hash
        )
    else:
        run_id = existing_run.id  # Retry case

    # Do work (ban IPs, etc.)
    imported, errors = await ban_executor.ban_ips(...)

    # Mark as completed or failed (atomically)
    if errors:
        await import_run_repo.mark_failed(db, run_id, str(errors))
    else:
        await import_run_repo.mark_completed(db, run_id, imported, skipped)
```

**Key points:**

1. **Operation ID must be deterministic** — Use content hash, not timestamp
   - Same content = same operation ID → retry safe
   - Different content = different operation ID → new import run
2. **Check before doing work** — Query `import_runs` table before banning
   - If completed: skip banning (already done)
   - If pending: retry was interrupted, try again
   - If failed: retry to recover
3. **Atomic state updates** — Mark as completed AFTER all work succeeds
   - All-or-nothing: either import succeeded + logged, or failed + retryable
4. **Test idempotency** — Verify retrying same content doesn't duplicate bans

```python
# First import: ban 2 IPs
result1 = await import_source(source, content, db)
assert result1.ips_imported == 2

# Second import (same content): skip bans
result2 = await import_source(source, content, db)
assert result2.ips_imported == 2
assert ban_ip.call_count == 2  # One call per IP from the first run; the retry added none
```

---

| Tool | Purpose |
|---|---|
| **Ruff** | Linter and formatter (replaces black, isort, flake8). |
| **mypy** or **pyright** | Static type checking in strict mode. |
| **pre-commit** | Run ruff + type checker before every commit. |

- Line length: **120 characters** max.
- Strings: use **double quotes** (`"`).
- Imports: sorted by ruff — stdlib → third-party → local, one import per line.
- No unused imports, no unused variables, no `# type: ignore` without explanation.
- Docstrings in **Google style** on every public function, class, and module.

---

## 11. fail2ban Response Utilities

All services that interact with the fail2ban daemon must use the canonical response parsing utilities from `app.utils.fail2ban_response`. This ensures consistent error handling, type safety, and makes it easy to fix bugs in response handling across the entire codebase.

### Available Functions

**`ok(response: object) -> object`**

Extracts the payload from a fail2ban ``(return_code, data)`` response tuple.

- Raises `ValueError` if return code ≠ 0 or response shape is invalid.
- Use this on every response from `Fail2BanClient.send()`.

**`to_dict(pairs: object) -> dict[str, object]`**

Converts a list of ``(key, value)`` pairs (fail2ban's native response format) to a Python dict.

- Silently ignores malformed entries and non-list/tuple inputs.
- Always returns a dict (empty if input is invalid).

**`ensure_list(value: object | None) -> list[str]`**

Coerces fail2ban response values (which may be `None`, a single string, or a list) to a normalized list of strings.

- Handles all three cases consistently.
- Returns empty list for `None` or empty strings.

**`is_not_found_error(exc: Exception) -> bool`**

Checks if an exception indicates a jail does not exist.

- Checks for multiple error message patterns (case-insensitive).
- Use this to distinguish "jail not found" errors from other failures.

### Example Usage

```python
from app.utils.fail2ban_response import ok, to_dict, ensure_list, is_not_found_error
from app.utils.fail2ban_client import Fail2BanClient

client = Fail2BanClient(socket_path="/var/run/fail2ban/fail2ban.sock")

try:
    # Get jail status
    response = await client.send(["status", "sshd", "short"])
    status_dict = to_dict(ok(response))  # Extract payload and convert to dict

    # Get list of banned IPs
    ban_response = await client.send(["get", "sshd", "banip"])
    banned_ips = ensure_list(ok(ban_response))  # Normalize to list of strings
except ValueError as exc:
    if is_not_found_error(exc):
        raise JailNotFoundError("sshd") from exc
    raise
```

### Why This Matters

Before this utility module, every service implemented its own copy of these functions, leading to:

- Code duplication across 7+ service files.
- Subtle inconsistencies in error handling.
- Difficult maintenance — every bug fix required touching multiple files.

Now, all services import from a single authoritative source, making response handling consistent, maintainable, and type-safe.

---

## 12. Configuration & Secrets

- All configuration lives in **environment variables** loaded through **pydantic-settings**.
- Secrets (master password hash, session key) are **never** committed to the repository.
- Provide a `.env.example` with all keys and placeholder values.
- Validate config at startup — fail fast with a clear error if a required value is missing.
```python
from pydantic_settings import BaseSettings
from pydantic import Field


class Settings(BaseSettings):
    database_path: str = Field("bangui.db", description="Path to SQLite database")
    fail2ban_socket: str = Field("/var/run/fail2ban/fail2ban.sock", description="fail2ban socket path")
    session_secret: str = Field(..., description="Secret key for session signing")
    log_level: str = Field("info", description="Logging level")

    model_config = {"env_prefix": "BANGUI_", "env_file": ".env"}
```

### Session Secret Configuration

The `session_secret` is the HMAC key used to sign all session tokens. It must be at least 32 characters (256 bits) to provide sufficient cryptographic strength for HMAC-SHA256.

**Minimum Length:** 32 characters

**Why 32 characters?** Session tokens are signed using HMAC-SHA256. A secret shorter than 32 bytes (256 bits) significantly weakens the signature, potentially allowing attackers to forge valid tokens. The constraint is enforced at startup — the application will fail to start if `session_secret` is shorter than 32 characters.

**Generation:** Generate a secure secret using Python:

```bash
python -c "import secrets; print(secrets.token_hex(32))"
```

This produces a 64-character hexadecimal string (256 bits) suitable for production use.

**Environment Variable:**

```bash
BANGUI_SESSION_SECRET="your-32-character-minimum-secret-here"
```

**Never** commit the actual secret to the repository. Provide a `.env.example` with a placeholder:

```bash
# .env.example
BANGUI_SESSION_SECRET="set-this-to-a-32-character-minimum-secret"
```

### Session Secret Rotation

**Problem:** If a session secret leaks, all active sessions become compromised and an attacker can forge new tokens. Rotating the secret invalidates forged tokens but may require all users to log out if rotation is done all at once.

**Solution:** BanGUI supports gradual secret rotation without forcing logout. During rotation:

1. All new tokens are signed with the current secret
2. Old tokens signed with the previous secret are still accepted
3. Tokens using the previous secret are transparently validated and logged
4. Once all old tokens expire naturally, disable the rotation by unsetting the previous secret

**Rotation Strategy (Step-by-Step):**

#### 1. Generate a New Secret

Before rotation, generate a fresh secret:

```bash
python -c "import secrets; print(secrets.token_hex(32))"
```

#### 2. Start Rotation (Without Stopping the Service)

Update your configuration **simultaneously** on all deployment replicas:

```bash
# .env (or ConfigMap in Kubernetes)
BANGUI_SESSION_SECRET=""  # Current (new) secret
BANGUI_SESSION_SECRET_PREVIOUS=""  # Previous (old) secret
```

Or in Kubernetes:

```yaml
env:
  - name: BANGUI_SESSION_SECRET
    valueFrom:
      secretKeyRef:
        name: bangui-secrets
        key: current-secret
  - name: BANGUI_SESSION_SECRET_PREVIOUS
    valueFrom:
      secretKeyRef:
        name: bangui-secrets
        key: previous-secret
```

**Key Point:** All replicas must know both secrets to accept old tokens.

#### 3. Monitor Token Rotation

Tokens signed with the previous secret are automatically validated and logged:

```
event=session_token_rotated_in_place session_id=42 old_secret_fragment=abc123 new_secret_fragment=def456
```

These logs let you track how many sessions are still using old tokens.

#### 4. Wait for Old Tokens to Expire

Monitor the application logs and wait until:

- No new `session_token_rotated_in_place` events appear (all old tokens have been used or expired)
- Session duration (default: 480 minutes) + grace period has elapsed since the previous secret was enabled
- Example: If sessions last 480 minutes, wait at least 8 hours from enabling the previous secret

#### 5. Complete Rotation

Once all old tokens have expired, remove the previous secret:

```bash
# .env
BANGUI_SESSION_SECRET=""
# BANGUI_SESSION_SECRET_PREVIOUS is now unset or empty
```

**Important:** Keep the previous secret configured for at least `session_duration_minutes` (default 480 minutes / 8 hours) to avoid rejecting tokens that are still valid.

**Metrics & Logging:**

The auth service logs rotation events for observability:

- `session_token_rotated_in_place` — Logged when a token signed with the previous secret is validated during the rotation window
- `session_token_re_signed_after_rotation` — Logged in `unwrap_session_token_with_rotation()` when the previous secret validates a token
- `old_secret_fragment` / `new_secret_fragment` — First 6 characters of the SHA256 hash of each secret (for non-sensitive correlation without logging actual secrets)
- `session_id` — Database ID of the rotated session

**Example Rotation Sequence:**

```
Time     Config                     Event Logged
────────────────────────────────────────────────────────────────
T=0      current=old                (normal operation)
         previous=

T=5m     current=new                session_token_rotated_in_place
         previous=old               (user session S1 validated with old secret)

T=30m    current=new                (no more old tokens, all new tokens use current)
         previous=old               → Still keep old secret set

T=500m   Enough time passed         (old session S1 has expired)
         (480 min session + grace)

T=510m   current=new                (rotation complete)
         previous=
```

**Avoiding Common Mistakes:**

❌ **Don't:** Rotate the secret and immediately unset the previous one → Old tokens will be rejected, forcing logout

✓ **Do:** Keep the previous secret for at least `session_duration_minutes`

❌ **Don't:** Rotate without updating all replicas → Some replicas reject old tokens, others accept them → Inconsistent behavior

✓ **Do:** Deploy config to all replicas simultaneously (via ConfigMap, Helm, or orchestrator)

❌ **Don't:** Use the same secret for development and production → Leaked dev secret can compromise prod

✓ **Do:** Generate
unique secrets per environment

### Session Cookie Security

The `session_cookie_secure` configuration controls the `Secure` flag on the session cookie. This flag prevents browsers from sending the session cookie over unencrypted HTTP.

**Default:** `true` — Production deployments are secure by default. Cookies are only sent over HTTPS.

**Local Development:** Set `BANGUI_SESSION_COOKIE_SECURE=false` in your compose file or `.env` to allow cookies over HTTP (required for `localhost:8000`).

```yaml
# Docker/compose.debug.yml
environment:
  BANGUI_SESSION_COOKIE_SECURE: "false"  # Allow HTTP during local development
```

**Important:** If `Secure=true` is set, browsers will reject the session cookie when the backend is served over HTTP. Ensure your nginx/reverse proxy terminates TLS and passes `X-Forwarded-Proto: https` so FastAPI knows the connection is secure.

### CSRF Protection Middleware

State-mutating endpoints (POST, PUT, DELETE, PATCH) authenticated via session cookies are protected by the `CsrfMiddleware`, which enforces a custom header check.

**How It Works:**

1. For every request using a mutating HTTP method, the middleware checks:
   - Is this request authenticated via session cookie (not Bearer token)?
   - If yes, require the custom header `X-BanGUI-Request: 1`.
   - If missing or incorrect, return `403 Forbidden`.
2. **Bearer token requests** (via `Authorization: Bearer` header) bypass the check because tokens are not CSRF-vulnerable — they are never automatically sent on cross-origin requests.
3. **Safe HTTP methods** (GET, HEAD, OPTIONS) bypass the check.
4. **Cross-site protection:** Cross-site JavaScript (`fetch()` calls from other origins) cannot set custom headers without CORS preflight, which the backend rejects for non-allowed origins. This provides defense-in-depth against subdomain attacks and XSS injection.
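The decision rules above can be sketched as a small pure function. This is an illustration only, not the code in `CsrfMiddleware`: the function name `csrf_check_passes`, its parameters, and the choice to let requests with neither a cookie nor a Bearer token pass through to the auth layer are assumptions made for the example.

```python
MUTATING_METHODS: frozenset[str] = frozenset({"POST", "PUT", "DELETE", "PATCH"})
CSRF_HEADER: str = "x-bangui-request"  # header names are case-insensitive


def csrf_check_passes(method: str, headers: dict[str, str], has_session_cookie: bool) -> bool:
    """Return True if the request may proceed, False if it should get a 403."""
    normalized: dict[str, str] = {key.lower(): value for key, value in headers.items()}

    # Safe methods never require the header.
    if method.upper() not in MUTATING_METHODS:
        return True

    # Bearer-token requests bypass the check.
    if normalized.get("authorization", "").lower().startswith("bearer "):
        return True

    # Assumption: requests without a session cookie are left to the auth layer.
    if not has_session_cookie:
        return True

    # Cookie-authenticated mutation: the custom header must be exactly "1".
    return normalized.get(CSRF_HEADER) == "1"
```

Keeping the decision in a pure function like this makes the four cases easy to unit test without starting the ASGI app.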
**Implementation Location:** - Middleware: `backend/app/middleware/csrf.py` - Registered in: `backend/app/main.py` via `app.add_middleware(CsrfMiddleware)` **Example:** ```python # ✓ Cookie-authenticated POST with CSRF header — allowed POST /api/bans Cookie: bangui_session=... X-BanGUI-Request: 1 # ✗ Cookie-authenticated POST without CSRF header — rejected with 403 POST /api/bans Cookie: bangui_session=... (no X-BanGUI-Request header) # ✓ Bearer token authentication without CSRF header — allowed POST /api/bans Authorization: Bearer (no X-BanGUI-Request header needed) # ✓ Safe GET method without CSRF header — allowed GET /api/jails Cookie: bangui_session=... (no X-BanGUI-Request header needed) ``` ### Setup Guard Route Policy BanGUI requires a one-time setup wizard to be completed before the application is usable. The `SetupRedirectMiddleware` enforces this by redirecting unauthenticated API requests to `/api/setup` until setup is complete. **How It Works:** 1. **Explicit Allowlist:** The middleware maintains two allowlists: - `_EXACT_ALLOWED`: Exact paths that bypass the guard (e.g., `/api/setup`, `/api/health`, `/api/docs`) - `_PREFIX_ALLOWED`: Route prefixes that bypass the guard (e.g., `/api/setup/` for nested routes like `/api/setup/timezone`) 2. **Path Matching Strategy:** The middleware uses **exact matching for exact paths** and **prefix matching with trailing slashes for nested routes**. This prevents fragile prefix-based allowlists (e.g., using `startswith("/api/setup")` would accidentally allow `/api/setup-debug`). 3. **When Setup is Complete:** Once setup completes, the middleware becomes a no-op and all routes are accessible normally. 
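
The matching strategy above can be illustrated with a short sketch. The constant values below are hypothetical stand-ins for `_EXACT_ALLOWED` and `_PREFIX_ALLOWED`, and the function name is an assumption; the real definitions live in `backend/app/main.py`.

```python
# Hypothetical values mirroring the allowlists described above.
EXACT_ALLOWED: frozenset[str] = frozenset({"/api/setup", "/api/health", "/api/docs"})
PREFIX_ALLOWED: tuple[str, ...] = ("/api/setup/",)


def bypasses_setup_guard(path: str) -> bool:
    """Return True when the path is reachable before setup completes."""
    # Exact paths must match character for character.
    if path in EXACT_ALLOWED:
        return True
    # Prefixes end with "/" so "/api/setup-debug" cannot slip through.
    return path.startswith(PREFIX_ALLOWED)


# A bare prefix without the trailing slash is too permissive:
naive_match: bool = "/api/setup-debug".startswith("/api/setup")
```

Here `naive_match` is `True`, which is exactly the accidental allowance the trailing-slash rule avoids, while `bypasses_setup_guard("/api/setup-debug")` returns `False`.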
**Allowlisted Paths:** - `/api/setup` — Setup status check and initialization endpoint - `/api/setup/timezone` — Timezone configuration (reaches via `/api/setup/` prefix) - `/api/health` — Health check endpoint (used by monitoring and load balancers) - `/api/docs` — Swagger UI documentation - `/api/redoc` — ReDoc documentation - `/api/openapi.json` — OpenAPI schema (required by docs frontends) **Adding New Setup Routes:** When adding new routes to the setup flow: 1. If the route is an exact path (e.g., `/api/setup/validate`), add it to `_EXACT_ALLOWED` 2. If the route is nested under `/api/setup/` (e.g., `/api/setup/validate/config`), ensure `/api/setup/` is in `_PREFIX_ALLOWED` (it already is) 3. Never use prefix matching without a trailing slash — it leads to security issues with future route additions **Implementation Location:** - Middleware: `backend/app/main.py` — `SetupRedirectMiddleware` class - Configuration: Lines 584–601 in `backend/app/main.py` — `_EXACT_ALLOWED` and `_PREFIX_ALLOWED` constants - Guard logic: Lines 638–648 in `backend/app/main.py` — `dispatch()` method **Example:** ```python # If setup is incomplete: GET /api/jails → 307 Temporary Redirect to /api/setup # Allowlisted paths are always accessible: GET /api/setup → 200 OK (setup status) POST /api/setup → 201 Created (run setup) GET /api/setup/timezone → 200 OK (get timezone) GET /api/health → 200 OK (health check) GET /api/docs → 200 OK (documentation) # If setup is complete, all routes are accessible: GET /api/jails → 200 OK (jail list) ``` ### fail2ban_start_command Configuration The `fail2ban_start_command` setting specifies the shell command used to start the fail2ban daemon during recovery operations (e.g., after a rollback). **Format & Parsing:** - The command is split into arguments using `shlex.split()`, which respects shell quoting rules. - Paths with spaces must be quoted. Example: `"/opt/my tools/fail2ban-client" start`. 
- The command is **not** executed through a shell — no shell variables or globbing are interpreted.

**Validation:**

- The command is validated at startup using `shlex.split()`. Mismatched quotes will raise a `ValueError` with the problematic command in the error message.

**Environment Variables:**

```bash
BANGUI_FAIL2BAN_START_COMMAND="fail2ban-client start"              # Default
BANGUI_FAIL2BAN_START_COMMAND="systemctl start fail2ban"           # systemd
BANGUI_FAIL2BAN_START_COMMAND='"/opt/my tools/fail2ban" start'     # Quoted path
```

**Common Pitfall:** Using `.split()` instead of `shlex.split()` would break commands with spaces in paths. Always use quoted strings for paths that contain whitespace.

### Trusted Proxy Configuration (Reverse Proxy Deployments)

When BanGUI is deployed behind a reverse proxy (nginx, HAProxy, etc.), the proxy forwards the original client IP via HTTP headers (`X-Forwarded-For`, `X-Real-IP`). To extract the correct client IP for rate limiting and logging, you must configure which proxies are trusted.

**Why This Is Needed:**

Rate limiting (`POST /api/auth/login`) relies on accurate client IP detection to prevent brute-force attacks. Without proper proxy configuration:

- Rate limits are applied per **proxy IP** (always the same) instead of per **client IP**, so every client behind the proxy shares one bucket: a single attacker can exhaust the limit and lock out legitimate users, and individual clients cannot be throttled separately.
- Logging shows proxy IPs instead of actual attacker IPs.

**Trusted Proxies Configuration:**

```bash
BANGUI_TRUSTED_PROXIES="10.0.0.0/8,172.16.0.0/12,192.168.0.0/16"
```

Accepted formats:

- **Single IP:** `BANGUI_TRUSTED_PROXIES="192.168.1.1"`
- **CIDR range:** `BANGUI_TRUSTED_PROXIES="10.0.0.0/8"` (matches any IP in 10.0.0.0 to 10.255.255.255)
- **Multiple entries (comma-separated):** `BANGUI_TRUSTED_PROXIES="192.168.1.1,10.0.0.0/8"`
- **Whitespace is stripped:** `BANGUI_TRUSTED_PROXIES="192.168.1.1 , 10.0.0.0/8"` is valid
- **IPv6 supported:** `BANGUI_TRUSTED_PROXIES="2001:db8::/32"`

**Default:** Empty list (no proxies trusted).
Proxy headers are ignored, and only the direct connection IP is used.

**Validation:**

The application validates all entries at startup:

- Each entry must be a valid IP address or CIDR range.
- Invalid entries (e.g., `"not-an-ip"`, `"10.0.0.0/33"`) will cause a `ValidationError` at startup.
- The application will not start if any entry is invalid.

**How It Works:**

1. When a request arrives, the middleware checks the immediate connection source (e.g., `client.host`).
2. If the immediate connection is **not** in the `trusted_proxies` list, it is used directly as the client IP (proxy headers are ignored).
3. If the immediate connection **is** trusted, the middleware extracts the original client IP from headers in this order:
   - `X-Forwarded-For` (leftmost IP in the chain, if present)
   - `X-Real-IP` (fallback)
   - Immediate connection IP (if no forwarded headers found)

**Example Docker Compose Configuration:**

```yaml
version: '3.8'
services:
  nginx:
    image: nginx:latest
    ports:
      - "80:80"
    volumes:
      - ./nginx.conf:/etc/nginx/nginx.conf:ro
    networks:
      - bangui-net

  backend:
    image: bangui:latest
    environment:
      BANGUI_TRUSTED_PROXIES: "10.0.0.0/8"   # Trust Docker internal network
      BANGUI_SESSION_COOKIE_SECURE: "false"  # This example serves plain HTTP on port 80; keep the default "true" when nginx terminates TLS
    networks:
      - bangui-net

networks:
  bangui-net:
    driver: bridge
```

**Example nginx Configuration:**

```nginx
upstream bangui_backend {
    server backend:8000;
}

server {
    listen 80;
    server_name bangui.example.com;

    location /api/ {
        proxy_pass http://bangui_backend;

        # Forward the original client IP
        proxy_set_header X-Forwarded-For $remote_addr;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-Proto $scheme;  # Required for FastAPI to recognize the original protocol
        proxy_set_header Host $host;
    }
}
```

**Important Security Notes:**

- **Only trust IPs you control.** Never include untrusted networks or `0.0.0.0/0`. An attacker who can send requests from a trusted IP can forge `X-Forwarded-For` headers and bypass rate limits.
- **Validate proxy IPs carefully.** Use CIDR ranges that match your infrastructure (e.g., `10.0.0.0/8` for Docker, `172.31.0.0/16` for specific Docker networks). - **Use HTTPS in production.** Ensure your nginx terminates TLS (uses HTTPS) and passes `X-Forwarded-Proto: https` so FastAPI's `Secure` cookie flag works correctly. - **Beware of Header Spoofing.** `X-Forwarded-For` can contain multiple IPs (client, proxy1, proxy2). The leftmost IP is used as the original client. If an untrusted proxy is between the client and your BanGUI instance, attackers can still spoof headers. Always filter at the network level — only allow traffic from trusted proxies. ### IP Geolocation Resolution BanGUI resolves IP addresses to country codes and network organization information for ban analytics and geomapping. The geolocation system implements a **primary + fallback** resolution strategy to balance security and availability: 1. **Primary Resolver (MaxMind GeoLite2-Country):** All IP lookups first attempt resolution using a local MaxMind GeoLite2-Country MMDB database file (if available). The MMDB is downloaded offline and mounted into the container — no IP data is sent over the network. 2. **Fallback Resolver (ip-api.com HTTP):** If the MMDB is unavailable or returns no result, the system can fall back to the ip-api.com HTTP API. **This fallback must be explicitly enabled** and only sends unresolved IPs over HTTP. HTTP is disabled by default for security (to avoid sending IP addresses in cleartext). **Download & Configure MaxMind GeoLite2:** The MaxMind GeoLite2-Country MMDB requires a free account and license key. To set up the database: 1. **Create a free MaxMind account** at https://www.maxmind.com/en/geolite2/signup and download your license key. 2. **Download the GeoLite2-Country MMDB** using the provided script or manually from the MaxMind downloads page. 3. **Mount the MMDB into the BanGUI container** at a known path (e.g., `/data/GeoLite2-Country.mmdb`). 4. 
**Set `BANGUI_GEOIP_DB_PATH`** to the mounted path in your environment. Example Docker Compose configuration: ```yaml services: bangui: volumes: - ./GeoLite2-Country.mmdb:/data/GeoLite2-Country.mmdb:ro environment: BANGUI_GEOIP_DB_PATH: /data/GeoLite2-Country.mmdb ``` **Fallback to HTTP (Not Recommended):** If the MMDB cannot be mounted (e.g., in restricted environments), you can enable the HTTP fallback: ```yaml services: bangui: environment: BANGUI_GEOIP_ALLOW_HTTP_FALLBACK: "true" ``` **⚠️ Security Warning:** Enabling HTTP fallback causes unresolved IP addresses to be sent **unencrypted** to ip-api.com. This is a privacy and GDPR/CCPA concern. Only enable this if the MMDB absolutely cannot be provisioned, and understand the implications. **Data Structure:** The `GeoInfo` returned by the resolution system includes: - `country_code` (str | None): ISO 3166-1 alpha-2 country code (e.g., `"US"`, `"DE"`). - `country_name` (str | None): Human-readable country name (e.g., `"United States"`). - `asn` (str | None): Autonomous System Number (e.g., `"AS3320"`). Only populated when using the HTTP API; local MMDB lookups return `None`. - `org` (str | None): Organization name associated with the ASN. Only populated when using the HTTP API; local MMDB lookups return `None`. **Environment Variables:** ```bash BANGUI_GEOIP_DB_PATH=/data/GeoLite2-Country.mmdb # Path to MaxMind MMDB (primary) BANGUI_GEOIP_ALLOW_HTTP_FALLBACK="false" # Default: false (MMDB-only) BANGUI_GEOIP_ALLOW_HTTP_FALLBACK="true" # Enable HTTP fallback (not recommended) ``` **Caching & Performance:** - Resolved IPs are cached in-memory and persisted to SQLite for fast subsequent lookups. - Failed lookups are cached for 5 minutes to avoid hammering external APIs. - The background `geo_cache_flush` task (runs every 60 seconds) persists newly resolved entries to the database. - The background `geo_re_resolve` task (configurable schedule) periodically re-resolves stale entries to keep data fresh. 
- The background `geo_cache_cleanup` task (runs nightly) removes entries not referenced in the configured retention period (default: 90 days) to prevent unbounded database growth and maintain query performance. **Retention & Cleanup:** The `geo_cache` table tracks the last time each IP was referenced via a `last_seen` timestamp. Over time, as unique IPs accumulate, the table can grow very large, degrading query performance on every geo lookup. To manage this: - The `geo_cache_cleanup` background task runs once per day (default: midnight UTC). - It removes all entries where `last_seen` is older than the configured retention period (default: 90 days). - If a purged IP is encountered again after cleanup, it will be re-resolved from the MaxMind database or ip-api.com (if configured). - The retention period is controlled by the constant `GEO_CACHE_RETENTION_DAYS` in `backend/app/tasks/geo_cache_cleanup.py`. ### API Documentation Configuration The `enable_docs` setting controls whether FastAPI serves interactive API documentation at `/api/docs` (Swagger UI) and `/api/redoc` (ReDoc). **Default:** `false` — API documentation is disabled by default to prevent information disclosure in production. **When to Enable:** - Set `BANGUI_ENABLE_DOCS=true` in development and debugging environments only. - Never enable in production. Exposed API documentation reveals all endpoints, request/response schemas, and allows direct API invocation from the browser. **Environment Variables:** ```bash BANGUI_ENABLE_DOCS="true" # Enable docs in development BANGUI_ENABLE_DOCS="false" # Disable docs (default) # Unset # Defaults to false (production) ``` **Debug Compose File:** The `Docker/compose.debug.yml` sets `BANGUI_ENABLE_DOCS: "true"` for local development. Production compose files (`Docker/compose.prod.yml`) leave this unset, defaulting to `false`. 
**Middleware Allowlist:** The `SetupRedirectMiddleware` in `main.py` includes `/api/docs`, `/api/redoc`, and `/api/openapi.json` in its `_EXACT_ALLOWED` paths so documentation can be accessed before setup completes (if enabled).

### Log Path Validation & Allowlisting

Authenticated users can instruct fail2ban to monitor additional log files through the API endpoint `POST /api/config/jails/{name}/logpath`. To prevent path-traversal attacks and unauthorized reads of sensitive system files, all requested log paths must resolve to locations within a configurable allowlist of safe directories.

**Allowed Directories:**

- Configured via the `BANGUI_ALLOWED_LOG_DIRS` environment variable (comma-separated list).
- Defaults to: `["/var/log", "/config/log"]`.

**Path Validation Rules:**

1. The requested path is resolved to its canonical form using `Path(log_path).resolve()`, which:
   - Expands relative paths to absolute paths.
   - Resolves symbolic links to their real targets.
   - Normalizes `.` and `..` components.
2. The resolved path is checked using `Path.is_relative_to()` against each allowed directory.
3. If the resolved path is not relative to any allowed directory, a `ValueError` is raised with a descriptive error message.

**Implementation:**

- Validation occurs in the Pydantic model `AddLogPathRequest` using a `@field_validator`.
- The validator runs at request time, before the service layer is invoked.
- Symlinks that escape allowed directories are rejected (see [symlink bypass tests](../../backend/tests/test_models.py)).

**Important:** Use `is_relative_to()`, not `startswith()` or string prefix matching. A string prefix check is bypassable: `/var/log_evil/file.log` starts with `/var/log` even though it lies outside that directory.
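
The validation rules can be sketched as a standalone function. This is a minimal illustration under stated assumptions, not the `AddLogPathRequest` validator itself: the function name and the module-level default tuple are hypothetical.

```python
from pathlib import Path

# Hypothetical default mirroring BANGUI_ALLOWED_LOG_DIRS.
ALLOWED_LOG_DIRS: tuple[Path, ...] = (Path("/var/log"), Path("/config/log"))


def validate_log_path(
    log_path: str,
    allowed_dirs: tuple[Path, ...] = ALLOWED_LOG_DIRS,
) -> Path:
    """Return the canonical path, or raise ValueError if it escapes the allowlist."""
    # resolve() makes the path absolute, follows symlinks, and collapses "..".
    resolved: Path = Path(log_path).resolve()
    for allowed in allowed_dirs:
        # Compare path components, not raw string prefixes.
        if resolved.is_relative_to(allowed.resolve()):
            return resolved
    raise ValueError(f"Log path {log_path!r} is outside the allowed directories")
```

With the defaults, `/var/log/app/access.log` is accepted, while `/var/log_evil/file.log` and `/var/log/../../etc/passwd` both raise `ValueError`.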
**Environment Variables:** ```bash BANGUI_ALLOWED_LOG_DIRS="/var/log,/config/log" # Default BANGUI_ALLOWED_LOG_DIRS="/var/log,/config/log,/home/app/logs" # Custom directory ``` ### Log Target Validation (fail2ban) The `log_target` field on the global config endpoint (`PUT /api/config/global`) is critical for security because fail2ban runs as root. Users can only set log targets to: 1. **Special values:** `STDOUT`, `STDERR`, `SYSLOG` (case-insensitive) 2. **File paths:** Must resolve to one of the configured allowed directories (same allowlist as log paths) **Why This Matters:** - fail2ban creates/opens files with root privileges. Without validation, an attacker could write to arbitrary system paths (e.g., `/etc/cron.d/malicious_script`). - Validation occurs at **both** the Pydantic model layer (`GlobalConfigUpdate.validate_log_target()`) **and** the service layer (`update_global_config()`) for defense in depth. - This prevents both HTTP and non-HTTP attack vectors. **Implementation:** ```python # Model layer: Automatic validation via @field_validator update = GlobalConfigUpdate(log_target="/etc/passwd") # Raises ValidationError → HTTP 422 # Service layer: Defense in depth await config_service.update_global_config(socket_path, update) # Validates again before sending to fail2ban ``` ### Login Rate Limiting The login endpoint (`POST /api/auth/login`) is protected against brute-force attacks using an in-memory exponential backoff rate limiter. **Design:** - Uses a `dict[str, deque[float]]` keyed by client IP, storing failed login timestamps within a time window. - Old failures outside the time window are automatically pruned during validation checks. - Expired IP entries are cleaned up to prevent unbounded memory growth. 
**Rate Limit Rules:** - **Exponential backoff:** Each failed login attempt incurs a progressively longer delay before the next attempt is allowed: - 1st failure: 1 × 2¹ = 2 seconds - 2nd failure: 1 × 2² = 4 seconds - 3rd failure: 1 × 2³ = 8 seconds - 4th+ failures: capped at 10 seconds (max) - Failed attempts that arrive during the backoff period return **HTTP 429 Too Many Requests** with a `Retry-After` header indicating the remaining wait time. - Each failed login is also accompanied by bcrypt password hashing (~100ms), providing additional computational resistance. - The backoff counter resets after the rate-limit window (60 seconds by default) expires with no new failures. **IP Extraction (Proxy Safety):** - When behind nginx, the rate limiter reads the real client IP from `X-Forwarded-For` or `X-Real-IP` headers. - Only trusts these headers when the immediate connection source is in a configured trusted proxy list. - Prevents attackers from spoofing these headers to bypass rate limits. - Falls back to the direct connection IP when proxy headers cannot be trusted. **Process-Local Limitation:** - The rate limiter is process-local (in-memory). In multi-worker deployments (e.g., Gunicorn with 4 workers), each worker maintains its own rate limit counter. - This is acceptable because the single-worker constraint is enforced elsewhere. See [TASK-002/003 notes](Instructions.md) for details. **Implementation:** - Rate limiter: `app.utils.rate_limiter.RateLimiter` - IP extraction: `app.utils.client_ip.get_client_ip()` - Dependency: `LoginRateLimiterDep` in `app.dependencies` ### Global Rate Limiting In addition to login-specific rate limiting, all API endpoints are protected by global per-IP rate limiting to prevent resource exhaustion, CPU spikes, and network bandwidth attacks from malicious or misconfigured clients. **Design:** - Uses a `dict[str, deque[float]]` keyed by client IP, storing request timestamps within a time window. 
- Implements a sliding-window algorithm: when an IP exceeds the limit, subsequent requests are blocked until the oldest request timestamp in the window expires. - Applied globally via middleware that runs on every request. - Respects the same IP extraction logic (trusted proxies) as login rate limiting. **Rate Limit Rules:** - **Default limit:** 200 requests per 60 seconds per IP. - Blocked requests return **HTTP 429 Too Many Requests** with a `Retry-After` header indicating the estimated seconds until the IP can retry. - The `Retry-After` value is dynamically calculated based on when the oldest request in the window will expire. - Different endpoints can be configured with different limits by adjusting the global rate limiter settings or using per-endpoint decorators (future enhancement). **IP Extraction (Proxy Safety):** - Same as login rate limiting: reads real client IP from `X-Forwarded-For` or `X-Real-IP` headers when the immediate connection is from a trusted proxy. - Falls back to direct connection IP when headers cannot be trusted. **Process-Local Limitation:** - The global rate limiter is process-local (in-memory), like the login rate limiter. - In single-worker deployments (enforced elsewhere), this is not a constraint. - Each worker in a multi-worker setup maintains independent counters, which is acceptable under the single-worker enforcement model. **Memory Management:** - Old request timestamps outside the rate-limit window are automatically pruned during validation checks. - A scheduled background task (`rate_limiter_cleanup` in `app.tasks.rate_limiter_cleanup`) runs every 30 minutes to remove dormant IPs from memory, preventing unbounded growth. 
**Implementation:** - Rate limiter: `app.utils.rate_limiter.GlobalRateLimiter` - Middleware: `app.middleware.rate_limit.RateLimitMiddleware` - IP extraction: `app.utils.client_ip.get_client_ip()` - Cleanup task: `app.tasks.rate_limiter_cleanup` (registered in `app.startup`) - Initialized in: `app.main.create_app()` and the lifespan handler --- ## 12. Authentication Endpoints #### Browser SPA (Cookie-Based) The **primary** authentication flow for the frontend is **cookie-based** and protects the session token from JavaScript access: 1. **Login (`POST /api/auth/login`)** - Accepts `LoginRequest` (password field) - Returns `LoginResponse` containing **only** `expires_at` (ISO 8601 UTC timestamp) - **Crucially:** The session token is **not** included in the JSON response body - Instead, the token is set as an **HttpOnly** `SameSite=Lax` cookie named `bangui_session` - Frontend automatically includes this cookie in all requests via `credentials: "include"` 2. **Why not return token in response body?** - Third-party JavaScript (analytics, ads, XSS injections) can intercept `fetch()` response bodies - If the token were in the response, malicious code could extract and store it in `localStorage` - An attacker could then use it via the `Authorization: Bearer ` header, bypassing the HttpOnly cookie protection - By returning **only** the expiry timestamp, we ensure the token stays exclusively in the HttpOnly cookie 3. **Session Validation (`GET /api/auth/session`)** - Frontend calls this on app mount to verify the session is still valid on the server - Works with both cookie and Bearer token authentication - Returns `{"valid": true}` if the session exists and is not expired - Returns **401 Unauthorized** if the session is invalid or expired 4. 
**Logout (`POST /api/auth/logout`)**
   - Revokes the session in the database
   - Clears the `bangui_session` cookie via `Set-Cookie` header
   - Works with both cookie and Bearer token authentication
   - Idempotent — calling without a session returns 200 without error

#### Programmatic API Clients (Bearer Token)

For non-browser clients (CLI tools, batch scripts, automation) that cannot use cookies, use the **Bearer token authentication path** by sending:

```http
Authorization: Bearer
```

The token is the signed value of the `bangui_session` cookie. Currently, clients obtain it by parsing the cookie from a login response and then send that value as the Bearer token. A dedicated `POST /api/auth/token` endpoint may be added in a future implementation.

**Note:** Bearer token authentication is not recommended for browser-based clients because:

- Tokens must be stored somewhere (localStorage, sessionStorage, or request body)
- All storage mechanisms are accessible to JavaScript and thus vulnerable to XSS
- HttpOnly cookies provide better protection

---

## 13. Password Hashing

The master password is hashed using **bcrypt** with an auto-generated salt. All password validation uses the models in `app.models.auth` and `app.models.setup`.

### The 72-Byte Bcrypt Limitation

**Important:** bcrypt silently truncates all input at **72 bytes** before hashing. This means:

- A user who sets a 100-character password is actually authenticated by only the first 72 bytes
- Extra characters beyond 72 bytes provide **zero additional security**
- An attacker only needs to match the first 72 bytes, so the effective search space is smaller than the user expects

**Solution:** Both password fields enforce a **maximum length of 72**:

- `LoginRequest.password` — max 72 characters (enforced via Pydantic `Field(max_length=72)`)
- `SetupRequest.master_password` — max 72 characters (enforced via Pydantic `Field(max_length=72)`)

Note that `max_length` counts characters, not bytes, so the 72-byte guarantee holds only while the submitted value is ASCII.

**Validation flow:**

1.
Frontend → hashes password with SHA256 using `SubtleCrypto` before transmission 2. Backend receives SHA256 hash, validates length (≤ 72 bytes) 3. Backend → hashes with bcrypt using `run_blocking(bcrypt.hashpw)` to avoid event loop stall 4. Hash stored in SQLite `settings` table **If a password exceeds 72 bytes:** - Pydantic raises `ValidationError` with error code `string_too_long` - The router returns **HTTP 422 Unprocessable Entity** - The frontend should inform the user to choose a shorter password **Implementation:** - Models: `app.models.auth.LoginRequest`, `app.models.setup.SetupRequest` - Service layer: `app.services.auth_service._check_password()`, `app.services.setup_service.run_setup()` --- ## 15. File I/O Conventions All file write operations to critical configuration files must be **atomic** to prevent corruption if the process is killed mid-write. ### Atomic File Writes Configuration files (e.g., fail2ban jail configs in `jail.d/`) are essential for system operation. A truncated or corrupt config file can break fail2ban's ability to reload and may disable active protection. **Rule: Always use write-to-temp + atomic rename** Never use `Path.write_text()` or `file.write()` directly for critical files. Instead: 1. Create a temporary file in the **same directory** as the target (crucial for atomic `os.replace()`). 2. Write content to the temp file. 3. Atomically rename the temp file to replace the target. 4. Clean up the temp file if an error occurs. 
**Implementation Pattern:**

```python
import contextlib
import os
import tempfile
from pathlib import Path

# `content` (the new file body) and `ConfigWriteError` are assumed to be
# defined elsewhere in the application.
target = Path("/path/to/config/file.conf")
tmp_name: str | None = None
try:
    # Create temp file in target's directory (same filesystem = atomic)
    with tempfile.NamedTemporaryFile(
        mode="w",
        encoding="utf-8",
        dir=target.parent,
        delete=False,
        suffix=".tmp",
    ) as tmp:
        # Record the name first so cleanup still works if the write fails
        tmp_name = tmp.name
        tmp.write(content)
    # Atomic rename (single syscall on POSIX systems)
    os.replace(tmp_name, target)
except OSError as exc:
    # Clean up temp file on error
    with contextlib.suppress(OSError):
        if tmp_name is not None:
            os.unlink(tmp_name)
    raise ConfigWriteError(f"Cannot write config: {exc}") from exc
```

**Why this matters:**

- `Path.write_text()` overwrites in place. If the process dies mid-write, the file is left truncated or partially written.
- `os.replace()` is atomic on POSIX systems (single rename syscall) **only if source and target are on the same filesystem**.
- Creating the temp file in `target.parent` ensures atomicity.
- On Linux containers, this prevents config corruption and service degradation.

**Atomic write helper:**

A shared `atomic_write(path: Path, content: str)` helper is available in `app/services/config_file_helpers.py`.
This is the preferred way to perform atomic writes — it handles all the temp file and cleanup logic: ```python from app.services.config_file_helpers import atomic_write atomic_write(path, updated_content) # Atomic write, auto-cleanup on error ``` **Files requiring atomic writes:** - All config files under `jail.d/` (created/modified by `_write_conf_file`, `_create_conf_file`, `set_jail_config_enabled`, and `write_jail_config_file`) - Any critical state files that fail2ban relies on **Examples in the codebase:** - `app/services/config_file_helpers.py`: `_write_conf_file`, `_create_conf_file`, `atomic_write` - `app/services/raw_config_io_service.py`: `set_jail_config_enabled`, `write_jail_config_file` - `app/services/jail_config_service.py`: `_write_local_file_sync`, `_restore_local_file_sync` --- ## 16. Git & Workflow - **Branch naming:** `feature/`, `fix/`, `chore/`. - **Commit messages:** imperative tense, max 72 chars first line (`Add jail reload endpoint`, `Fix ban history query`). - Every merge request must pass: ruff, type checker, all tests. - Do not merge with failing CI. - Keep pull requests small and focused — one feature or fix per PR. --- ## 17. Coding Principles These principles are **non-negotiable**. Every backend contributor must internalise and apply them daily. ### 17.1 Clean Code - Write code that **reads like well-written prose** — a new developer should understand intent without asking. - **Meaningful names** — variables, functions, and classes must reveal their purpose. Avoid abbreviations (`cnt`, `mgr`, `tmp`) unless universally understood. - **Small functions** — each function does exactly one thing. If you need a comment to explain a block inside a function, extract it into its own function. - **No magic numbers or strings** — use named constants. - **Boy Scout Rule** — leave every file cleaner than you found it. - **Avoid deep nesting** — prefer early returns (guard clauses) to keep the happy path at the top indentation level. 
```python # Good — guard clause, clear name, one job async def get_active_ban(ip: str, jail: str) -> Ban: ban: Ban | None = await repo.find_ban(ip=ip, jail=jail) if ban is None: raise BanNotFoundError(ip=ip, jail=jail) if ban.is_expired(): raise BanExpiredError(ip=ip, jail=jail) return ban # Bad — nested, vague name async def check(ip, j): b = await repo.find_ban(ip=ip, jail=j) if b: if not b.is_expired(): return b else: raise Exception("expired") else: raise Exception("not found") ``` ### 17.2 Separation of Concerns (SoC) - Each module, class, and function must have a **single, well-defined responsibility**. - **Routers** → HTTP layer only (parse requests, return responses). - **Services** → business logic and orchestration. - **Repositories** → data access and persistence. - **Models** → data shapes and validation. - **Tasks** → scheduled background jobs. - Never mix layers — a router must not execute SQL, and a repository must not raise `HTTPException`. ### 17.3 Single Responsibility Principle (SRP) - A class or module should have **one and only one reason to change**. - If a service handles both ban management *and* email notifications, split it into `BanService` and `NotificationService`. ### 17.4 Don't Repeat Yourself (DRY) - Extract shared logic into utility functions, base classes, or dependency providers. - If the same block of code appears in more than one place, **refactor it** into a single source of truth. - But don't over-abstract — premature DRY that couples unrelated features is worse than a little duplication (see **Rule of Three**: refactor when something appears a third time). ### 17.5 KISS — Keep It Simple, Stupid - Choose the simplest solution that works correctly. - Avoid clever tricks, premature optimisation, and over-engineering. - If a standard library function does the job, prefer it over a custom implementation. ### 17.6 YAGNI — You Aren't Gonna Need It - Do **not** build features, abstractions, or config options "just in case". 
- Implement what is required **now**. Extend later when a real need emerges. ### 17.7 Dependency Inversion Principle (DIP) - High-level modules (services) must not depend on low-level modules (repositories) directly. Both should depend on **abstractions** (protocols / interfaces). - Use FastAPI's `Depends()` to inject implementations — this makes swapping and testing trivial. ```python from typing import Protocol class BanRepository(Protocol): async def find_ban(self, ip: str, jail: str) -> Ban | None: ... async def save_ban(self, ban: Ban) -> None: ... class SqliteBanRepository: """Concrete implementation — depends on aiosqlite.""" async def find_ban(self, ip: str, jail: str) -> Ban | None: ... async def save_ban(self, ban: Ban) -> None: ... ``` #### 17.7.1 Repository Module Pattern — Module-as-Protocol Structural Compatibility BanGUI uses **module-level functions** for repository implementations, not classes. Each repository module (e.g., `session_repo.py`, `blocklist_repo.py`) exports async functions that match the signatures defined in the Protocol interface in `protocols.py`. This is a **structural typing pattern** — mypy accepts the module as a valid Protocol implementation because the function signatures match, *even though* the module is not explicitly annotated as implementing the Protocol. This approach works correctly with FastAPI's dependency injection via `cast()`: ```python # In app/repositories/session_repo.py async def create_session(db: aiosqlite.Connection, token: str, created_at: str, expires_at: str) -> Session: """Insert a new session row.""" ... # In app/repositories/protocols.py class SessionRepository(Protocol): async def create_session( self, db: aiosqlite.Connection, token: str, created_at: str, expires_at: str, ) -> Session: ...
# In app/dependencies.py
async def get_session_repo() -> SessionRepository:
    """Provide the concrete session repository implementation."""
    from app.repositories import session_repo
    return session_repo  # ← mypy accepts this because the module has matching functions
```

**Why this pattern is used:**

- **Simplicity** — no boilerplate class/instance wrapping.
- **Compatibility** — Python's **structural typing** (PEP 544) means the module automatically satisfies the Protocol interface if function signatures match.
- **Testability** — the Dependency Inversion Principle still applies; services depend on the Protocol, not the module directly, so tests can mock the Protocol.

**Risks and mitigations:**

- **Silent breakage if function signatures change** — If a parameter is added or removed from a module function, the module no longer satisfies the Protocol, but mypy does not flag this wherever the provider returns the module through `cast()`, because `cast()` bypasses the structural check. To prevent this, **Protocol signatures in `protocols.py` are the source of truth**. Always check that module functions match the Protocol definitions before merging changes. The CI/CD pipeline validates this compatibility at build time.

**How the validation works (CI check):**

- Before each deployment, run `mypy --strict` to ensure all dependency providers return values compatible with their Protocol types.
- The `cast()` calls in `dependencies.py` are a documented signal that structural compatibility is being verified externally, not via explicit class inheritance.
- Automated tests in `backend/tests/test_repositories/test_protocol_compliance.py` verify that each repository module implements all protocol methods, preventing silent protocol drift.

#### 17.7.1.1 Repository Protocol Coverage Checklist

All public repository functions must be defined in a corresponding Protocol. To add a new repository:

1. **Create the repository module** — `backend/app/repositories/my_repo.py` with async functions.
2. **Define the Protocol** — Add a `MyRepository(Protocol)` class in `backend/app/repositories/protocols.py` with methods matching every public function signature.
3. **Add imports** — If the Protocol uses custom return types, import them in `protocols.py`.
4. **Run compliance tests** — Execute `pytest backend/tests/test_repositories/test_protocol_compliance.py` to verify coverage.
5. **Verify type safety** — Run `mypy --strict backend/app/repositories/protocols.py` to ensure all types are correct.

**Current repository protocol coverage** (all 7 repositories fully covered):

- `SessionRepository` — 4 methods
- `SettingsRepository` — 4 methods
- `BlocklistRepository` — 6 methods
- `ImportLogRepository` — 4 methods
- `GeoCacheRepository` — 13 methods
- `HistoryArchiveRepository` — 5 methods
- `Fail2BanDbRepository` — 8 methods

#### 17.7.2 Session Token Hashing — One-Way Protection Against Database Exposure

Session tokens must be protected against database exposure. **Session tokens are stored as one-way SHA256 hashes in the database**, so that if the database file is compromised (volume mount misconfiguration, backup leak, etc.), the stored values cannot be used directly to hijack sessions.

**Implementation pattern:**

```python
import hashlib
from typing import TYPE_CHECKING

# Session is constructed at runtime below, so it cannot live under TYPE_CHECKING.
from app.models.auth import Session

if TYPE_CHECKING:
    import aiosqlite

def _hash_token(token: str) -> str:
    """Return the SHA256 hash of a session token."""
    return hashlib.sha256(token.encode()).hexdigest()

async def create_session(
    db: "aiosqlite.Connection",
    token: str,
    created_at: str,
    expires_at: str,
) -> Session:
    """Insert a new session row with the token hash."""
    token_hash = _hash_token(token)
    cursor = await db.execute(
        "INSERT INTO sessions (token_hash, created_at, expires_at) VALUES (?, ?, ?)",
        (token_hash, created_at, expires_at),
    )
    await db.commit()
    # Return the Session with the ORIGINAL token (not the hash)
    # so the service layer can sign and return it to the client.
    return Session(
        id=int(cursor.lastrowid) if cursor.lastrowid else 0,
        token=token,  # ← raw token, not the hash
        created_at=created_at,
        expires_at=expires_at,
    )

async def get_session(
    db: "aiosqlite.Connection", token: str
) -> Session | None:
    """Look up a session by token hash."""
    token_hash = _hash_token(token)
    async with db.execute(
        "SELECT id, token_hash, created_at, expires_at FROM sessions WHERE token_hash = ?",
        (token_hash,),
    ) as cursor:
        row = await cursor.fetchone()
    if row is None:
        return None
    # Return the Session with the INCOMING token (the one the client sent).
    return Session(
        id=int(row[0]),
        token=token,  # ← the raw token passed in
        created_at=str(row[2]),
        expires_at=str(row[3]),
    )
```

**Key points:**

1. **Hash on write** — When inserting a session, hash the token before storage.
2. **Hash on read** — When validating a session, hash the incoming token before the database lookup.
3. **Never store raw tokens** — The `token_hash` column contains only hashes; raw tokens are never persisted.
4. **Return raw tokens to the service layer** — The `Session` model's `token` field contains the raw token (for signing and response), not the hash.
5. **Database schema** — Use `token_hash TEXT NOT NULL UNIQUE` instead of `token TEXT NOT NULL UNIQUE`, and create an index on `token_hash`.
6. **Migration strategy** — When upgrading from plaintext to hashed tokens, drop the old table and recreate it. This invalidates all existing sessions, which is acceptable because those tokens were stored in plaintext and must be treated as exposed.

**Why one-way hashing is safe:**

- If an attacker obtains a token hash from the database, they cannot reverse the SHA256 hash to recover the original token.
- The attacker cannot use the hash directly in a client request — they would need the original token to pass the hash check.
- This forces the attacker to either compromise the client (where they'd also get the raw token) or perform a brute-force attack against the hash space (infeasible for random 128-bit tokens).
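
As a rough illustration of key points 5 and 6, and of why a leaked hash is useless as a credential, the sketch below uses the standard-library `sqlite3` module instead of `aiosqlite` so that it runs standalone. The column layout, the helper names (`hash_token`, `recreate_sessions_table`, `session_exists`, `demo`), and the timestamp strings are assumptions made for illustration, not the project's actual schema or API.

```python
import hashlib
import secrets
import sqlite3


def hash_token(token: str) -> str:
    """Return the SHA256 hex digest of a raw session token."""
    return hashlib.sha256(token.encode()).hexdigest()


def recreate_sessions_table(conn: sqlite3.Connection) -> None:
    """Drop the old table and create one that only stores token hashes."""
    conn.execute("DROP TABLE IF EXISTS sessions")
    conn.execute(
        "CREATE TABLE sessions ("
        " id INTEGER PRIMARY KEY AUTOINCREMENT,"
        " token_hash TEXT NOT NULL UNIQUE,"  # SQLite backs UNIQUE with an automatic index
        " created_at TEXT NOT NULL,"
        " expires_at TEXT NOT NULL)"
    )
    conn.commit()


def session_exists(conn: sqlite3.Connection, presented_token: str) -> bool:
    """Hash whatever the client presented, then look the hash up."""
    row = conn.execute(
        "SELECT 1 FROM sessions WHERE token_hash = ?",
        (hash_token(presented_token),),
    ).fetchone()
    return row is not None


def demo() -> tuple[bool, bool, bool]:
    """Return (raw token accepted, leaked hash accepted, raw token persisted)."""
    conn = sqlite3.connect(":memory:")
    recreate_sessions_table(conn)
    raw_token: str = secrets.token_hex(16)
    conn.execute(
        "INSERT INTO sessions (token_hash, created_at, expires_at) VALUES (?, ?, ?)",
        (hash_token(raw_token), "2025-01-01T00:00:00+00:00", "2025-01-02T00:00:00+00:00"),
    )
    conn.commit()
    # Simulate an attacker who read the database file and replays the stored value.
    leaked_hash: str = conn.execute("SELECT token_hash FROM sessions").fetchone()[0]
    result = (
        session_exists(conn, raw_token),
        session_exists(conn, leaked_hash),
        leaked_hash == raw_token,
    )
    conn.close()
    return result
```

Under these assumptions, `demo()` accepts the raw token, rejects the replayed hash (it is hashed a second time before the lookup), and confirms that the raw token never reached the table.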

**Never use symmetric encryption** — symmetric encryption requires a key stored in the database or environment, which merely shifts the exposure risk. A one-way hash is the correct choice for protecting tokens.

#### 17.7.2a Session Token Signing Format — HMAC-SHA256 Integrity Protection

**All session tokens sent to clients are signed using HMAC-SHA256.** The signed token format is:

```
<token>.<signature>
```

where:

- `<token>` is a 16-byte (128-bit) random hex string generated by `secrets.token_hex(16)`.
- `.` is the separator (defined in `app.utils.constants.SESSION_TOKEN_SIGNATURE_SEPARATOR`).
- `<signature>` is the HMAC-SHA256 hex digest of `<token>` using the configured `session_secret`.

**Example (illustrative values, not a real signature):** `a1b2c3d4e5f60718293a4b5c6d7e8f90.f7e6d5c4b3a291807f6e5d4c3b2a19080f1e2d3c4b5a69788796a5b4c3d2e1f0`

**Signing and verification pattern:**

```python
import hashlib
import hmac

def _session_token_signature(token: str, secret: str) -> str:
    """Return the HMAC-SHA256 signature for a session token."""
    return hmac.new(secret.encode(), token.encode(), hashlib.sha256).hexdigest()

def sign_session_token(token: str, secret: str) -> str:
    """Return a signed session token string for the client."""
    return f"{token}.{_session_token_signature(token, secret)}"

def unwrap_session_token(token: str, secret: str) -> str:
    """Verify and return the raw token from a signed session token.

    Raises ValueError if the token lacks a signature or signature is invalid.
    """
    if "." not in token:
        raise ValueError("Invalid session token.")
    raw_token, signature = token.rsplit(".", 1)
    expected_signature = _session_token_signature(raw_token, secret)
    if not hmac.compare_digest(expected_signature, signature):
        raise ValueError("Invalid session token.")
    return raw_token
```

**Key points:**

1. **All tokens must be signed** — Tokens without a signature (no separator) are rejected immediately.
2. **Signature is mandatory** — The `unwrap_session_token()` function raises `ValueError` if the separator is absent.
3. **Use HMAC-SHA256** — Always use `hmac.compare_digest()` for signature verification to prevent timing attacks.
4. **Sign on login** — `login()` creates a raw token, stores it (hashed) in the database, then returns the signed token to the client.
5. **Verify on every request** — The `validate_session()` service verifies the signature by calling `unwrap_session_token()` with the `session_secret`, then looks up the raw token in the database.
6. **Session invalidation** — When upgrading from plaintext to signed tokens (TASK-022), all existing sessions must be invalidated because raw tokens will no longer be stored in plaintext.

**Why HMAC signing is necessary:**

- **Prevents token forgery** — An attacker cannot create a valid token without knowing the `session_secret`.
- **Works alongside hashed storage** — Even if the database is compromised (plaintext before hashing), the attacker gets only the raw token, not a signed token. A raw token without a valid signature is rejected by `unwrap_session_token()`.
- **Timing attack resistance** — `hmac.compare_digest()` compares signatures in constant time, preventing attackers from using timing differences to guess valid signatures.

#### 17.7.3 Session Cache Pluggability — Process-Local vs. Shared Backends

Session validation is expensive (SQLite lookup + password verification). To improve performance, **validated session tokens are cached** using the `SessionCache` interface (`app.utils.session_cache`). The default implementation, `InMemorySessionCache`, stores cached sessions in process-local memory.

**Current implementation (single-worker):**

```python
from app.utils.session_cache import SessionCache, InMemorySessionCache, NoOpSessionCache

class SessionCache(Protocol):
    """Interface for session token validation cache backends."""
    def get(self, token: str) -> Session | None: ...
    def set(self, token: str, session: Session, ttl_seconds: float) -> None: ...
    def invalidate(self, token: str) -> None: ...
    def clear(self) -> None: ...

# Default in-memory implementation — PROCESS-LOCAL
class InMemorySessionCache:
    def __init__(self) -> None:
        self._entries: dict[str, tuple[Session, float]] = {}
```

**Single-worker constraint:** `InMemorySessionCache` is **process-local** — each worker process has its own dict. In single-worker mode (enforced by TASK-002), this is safe and improves performance. In multi-worker deployments:

- A logout by worker A clears the session from A's cache, but worker B still has it, so the session stays valid on worker B and the logout does not take effect there.
- Enabling/disabling the cache requires restarting all workers to take effect.

**Multi-worker solution:** To support multiple workers (future enhancement), implement a shared backend behind the same `SessionCache` Protocol:

```python
# Example Redis implementation (not yet in codebase)
# Assumption: redis-py >= 4.2, whose asyncio client replaces the former aioredis package.
import redis.asyncio as aioredis

from app.models.auth import Session

class RedisSessionCache:
    """Session cache backed by Redis."""

    def __init__(self, redis_url: str) -> None:
        self.client = aioredis.from_url(redis_url)

    async def get(self, token: str) -> Session | None:
        data = await self.client.get(f"session:{token}")
        return Session.model_validate_json(data) if data else None

    async def set(self, token: str, session: Session, ttl_seconds: float) -> None:
        await self.client.setex(
            f"session:{token}", int(ttl_seconds), session.model_dump_json()
        )

    async def invalidate(self, token: str) -> None:
        await self.client.delete(f"session:{token}")

    async def clear(self) -> None:
        await self.client.flushdb()
```

To adopt a Redis backend:

1. Create `RedisSessionCache` in `app.utils.session_cache`.
2. Update `app.utils.runtime_state.set_runtime_settings()` to instantiate `RedisSessionCache` when `REDIS_URL` env var is set.
3. Update `app.config.Settings` to accept optional `REDIS_URL`.
4. Tests continue to use `InMemorySessionCache` (no Redis dependency in dev).

**Implementation rules:**

- All cache methods must be `async` (even if the backend is sync).
- Never log session tokens or session data.
- TTL must be respected — expired entries must be removed on access.
- See `app/utils/session_cache.py` for the full Protocol definition and current implementations.

### 17.8 Composition over Inheritance

- Favour **composing** small, focused objects over deep inheritance hierarchies.
- Use mixins or protocols only when a clear "is-a" relationship exists; otherwise, pass collaborators as constructor arguments.

### 17.9 Fail Fast

- Validate inputs as early as possible — at the API boundary with Pydantic, at service entry with assertions or domain checks.
- Raise specific exceptions immediately rather than letting bad data propagate silently.

### 17.10 Law of Demeter (Principle of Least Knowledge)

- A function should only call methods on:
  1. Its own object (`self`).
  2. Objects passed as parameters.
  3. Objects it creates.
- Avoid long accessor chains like `request.state.db.cursor().execute(...)` — wrap them in a meaningful method.

### 17.11 Defensive Programming

- Never trust external input — validate and sanitise everything that crosses a boundary (HTTP request, file, socket, environment variable).
- Handle edge cases explicitly: empty lists, `None` values, negative numbers, empty strings.
- Use type narrowing and exhaustive pattern matching (`match` / `case`) to eliminate impossible states.

### 17.12 SSRF Prevention (Server-Side Request Forgery)

When user-supplied URLs are fetched by the backend, validate them before making any HTTP requests:

1. **Use Pydantic's `AnyHttpUrl` type** to restrict schemes to `http://` and `https://` only.
   - Rejects `file://`, `ftp://`, `gopher://`, and other non-http schemes at the model boundary.
2. **Validate resolved IP addresses** before fetching:
   - Parse the hostname and resolve it via DNS (using `socket.getaddrinfo()`).
   - Use `ipaddress.ip_address().is_private` to reject private/reserved ranges:
     - RFC 1918: `10.0.0.0/8`, `172.16.0.0/12`, `192.168.0.0/16`
     - Loopback: `127.0.0.0/8`, `::1/128`
     - Link-local: `169.254.0.0/16`, `fe80::/10`
     - IPv6 site-local, multicast, and reserved ranges.
   - Raise `ValueError` if validation fails; let the router convert it to HTTP 400.
3. **Guard against DNS rebinding**:
   - Validate DNS at URL creation/validation time (performed during request deserialization).
   - For additional safety, re-validate the connection IP at HTTP client time (e.g., custom `aiohttp.TCPConnector` can inspect the resolved address during connect).
4. **Example implementation** (see `backend/app/utils/ip_utils.py`):
   - `is_private_ip(ip_str: str) -> bool`: Checks if IP is private/reserved/loopback/link-local.
   - `async validate_blocklist_url(url: AnyHttpUrl) -> None`: Async DNS resolution + private IP check.
   - Service layer calls `await validate_blocklist_url(url)` before persisting; router catches `ValueError` and returns 400.

## 18. Quick Reference — Do / Don't

| Do | Don't |
|---|---|
| Type every function, variable, return | Leave types implicit |
| Use `async def` for I/O | Use sync functions for I/O |
| Validate with Pydantic at the boundary | Pass raw dicts through the codebase |
| Log with structlog + context keys (INFO/WARNING/ERROR/DEBUG) | Use `print()` or format strings in logs |
| Use `log.exception()` in catch-all handlers (captures traceback) | Use `log.error()` for exceptions; let exceptions get lost |
| Write tests for every feature | Ship untested code |
| Use `aiohttp` for HTTP calls | Use `requests` |
| Handle errors with custom exceptions | Use bare `except:` |
| Keep routers thin, logic in services | Put business logic in routers |
| Use `datetime.datetime.now(datetime.UTC)` | Use naive datetimes |
| Run ruff + mypy before committing | Push code that doesn't pass linting |
| Keep GET endpoints read-only (no `db.commit()`) | Call `db.commit()` / INSERT inside GET handlers |
| Batch DB writes; issue one `db.commit()` after the loop | Commit inside a loop (1 fsync per row) |
| Use `executemany()` for bulk inserts | Call `execute()` + `commit()` per row in a loop |
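
The last two rows of the table can be sketched as follows. This is a minimal example that uses the standard-library `sqlite3` module so it runs on its own; `aiosqlite` offers `executemany()` and `commit()` as awaitables with the same shape. The `ban_events` table, its columns, and the function names are hypothetical and chosen only for illustration.

```python
import sqlite3


def insert_bans_batched(
    conn: sqlite3.Connection, rows: list[tuple[str, str, int]]
) -> int:
    """Insert all rows with one executemany() call and a single commit."""
    conn.executemany(
        "INSERT INTO ban_events (ip, jail, banned_at) VALUES (?, ?, ?)",
        rows,
    )
    conn.commit()  # one commit for the whole batch, not one per row
    count: int = conn.execute("SELECT COUNT(*) FROM ban_events").fetchone()[0]
    return count


def demo_batch() -> int:
    """Create a throwaway in-memory table and insert three rows in one batch."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE ban_events ("
        " ip TEXT NOT NULL, jail TEXT NOT NULL, banned_at INTEGER NOT NULL)"
    )
    rows: list[tuple[str, str, int]] = [
        ("203.0.113.5", "sshd", 1_700_000_000),
        ("203.0.113.6", "sshd", 1_700_000_060),
        ("198.51.100.7", "nginx", 1_700_000_120),
    ]
    total = insert_bans_batched(conn, rows)
    conn.close()
    return total
```

With `aiosqlite`, the equivalent calls would be `await db.executemany(...)` followed by a single `await db.commit()`.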