BanGUI/Docs/Backend-Development.md
Lukas f9e283541b Add explicit database transaction isolation to multi-step operations
This commit addresses race conditions in multi-step database operations by:

1. Wrap write operations in BEGIN IMMEDIATE ... COMMIT transactions:
   - import_run_repo: create_pending, mark_completed, mark_failed
   - geo_cache_repo: all upsert_*_and_commit functions
   - geo_cache_repo: bulk_upsert_entries_and_neg_entries_and_commit

2. Handle concurrent write collisions gracefully:
   - import_run_repo.create_pending can now raise IntegrityError
   - blocklist_import_workflow catches IntegrityError and retries lookup
   - Logs 'blocklist_import_lost_race' event when another request wins the race

3. Add comprehensive documentation:
   - Backend-Development.md § 6.3 Database Transactions
   - Explains when to use BEGIN IMMEDIATE
   - Shows transaction pattern with try-except-rollback
   - Documents race condition error handling pattern

The solution leverages SQLite's UNIQUE constraint for data integrity while
handling the concurrent case gracefully in application logic. This is more
efficient than using BEGIN EXCLUSIVE which would serialize all writers.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
2026-04-30 22:04:15 +02:00


Backend Development — Rules & Guidelines

Rules and conventions every backend developer must follow. Read this before writing your first line of code.


1. Language & Typing

  • Python 3.12+ is the minimum version.
  • Every function, method, and variable must have explicit type annotations — no exceptions.
  • Use str, int, float, bool, None for primitives.
  • Use list[T], dict[K, V], set[T], tuple[T, ...] (lowercase, built-in generics) — never typing.List, typing.Dict, etc.
  • Use T | None instead of Optional[T].
  • Use TypeAlias, TypeVar, Protocol, and NewType when they improve clarity.
  • Return types are mandatory — including -> None.
  • Never use Any unless there is no other option and a comment explains why.
  • Run mypy --strict (or pyright in strict mode) — the codebase must pass with zero errors.
# Good
def get_jail_by_name(name: str) -> Jail | None:
    ...

# Bad — missing types
def get_jail_by_name(name):
    ...

2. Core Libraries

| Purpose | Library | Notes |
|---|---|---|
| Web framework | FastAPI | Async endpoints only. |
| Data validation & settings | Pydantic v2 | All request/response bodies and config models. |
| Async HTTP client | aiohttp (ClientSession) | For external calls (blocklists, IP lookups). |
| Scheduling | APScheduler 4.x (async) | Blocklist imports, periodic health checks. |
| Structured logging | structlog | Every log call must use structlog; never print() or logging directly. |
| Database | aiosqlite | Async SQLite access for the application database. |
| Testing | pytest + pytest-asyncio + httpx (AsyncClient) | Every feature needs tests. |
| Mocking | unittest.mock / pytest-mock | Isolate external dependencies. |
| Date & time | datetime (stdlib), always timezone-aware | Use datetime.datetime.now(datetime.UTC). Never naive datetimes. |
| IP / Network | ipaddress (stdlib) | Validate and normalise IPs and CIDR ranges. |
| Environment / config | pydantic-settings | Load .env and environment variables into typed models. |
| fail2ban integration | fail2ban client (bundled) | Use the local copy at ./fail2ban-master. Import from ./fail2ban-master/fail2ban/client to communicate with the fail2ban socket. Do not install fail2ban as a pip package. |

fail2ban Client Usage

The repository ships with a vendored copy of fail2ban located at ./fail2ban-master. All communication with the fail2ban daemon must go through the client classes found in ./fail2ban-master/fail2ban/client. Add the ./fail2ban-master directory to sys.path (or configure it in pyproject.toml as a path dependency) so that from fail2ban.client ... resolves to the bundled copy.

import sys
from pathlib import Path

# Ensure the bundled fail2ban is importable
sys.path.insert(0, str(Path(__file__).resolve().parents[2] / "fail2ban-master"))

from fail2ban.client.csocket import CSocket  # noqa: E402

Libraries you must NOT use

  • requests — use aiohttp (async).
  • flask — we use FastAPI.
  • celery — we use APScheduler.
  • print() for logging — use structlog.
  • json.loads / json.dumps on Pydantic models — use .model_dump() / .model_validate().

Timestamp Handling

Timestamp consistency is critical for accurate ban history queries across the dashboard and history endpoints. Follow these rules:

Rule 1: Use consistent UTC timestamps

  • All timestamps in the database are stored as Unix epochs (seconds since 1970-01-01 UTC).
  • fail2ban stores timestamps using time.time(), which is always UTC epoch seconds.
  • When querying fail2ban's SQLite database by timestamp, use app.utils.time_utils.since_unix() (not manual datetime calculations).

Rule 2: Time-range windows include a 60-second slack

  • The since_unix() function includes a 60-second slack window (TIME_RANGE_SLACK_SECONDS in app.utils.constants).
  • This slack accommodates:
    • Clock drift between the local system and fail2ban.
    • Test seeding delays when timestamps are manually set to exact boundaries.
  • The slack ensures that dashboard and history queries return consistent row counts for the same time range.

Rule 3: Never duplicate timestamp calculation logic

  • All services that query by time range must import and use since_unix().
  • Do not recalculate timestamps locally using datetime or time modules in service code.
  • If you need a timestamp for a time range, use since_unix().

Example:

from app.utils.time_utils import since_unix

# Get all bans from the last 24 hours (with 60-second slack)
since_ts: int = since_unix("24h")
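# aiosqlite's execute() yields a cursor; fetch the rows from it
async with db.execute(
    "SELECT * FROM bans WHERE timeofban >= ?",
    (since_ts,),
) as cursor:
    rows = await cursor.fetchall()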

Database Performance & Indexing

Large archive datasets can degrade query performance without proper indexing. The history_archive table supports multiple filter patterns:

Query Patterns (Indexed for Performance):

  1. MAX(timeofban): history_sync_task queries for the latest timestamp to know where to resume syncing from fail2ban. This is a covering index lookup.

  2. Jail filter with time ordering — Dashboard and API endpoints filter by jail and sort by timeofban DESC for pagination. This is accelerated by idx_history_archive_jail_timeofban.

  3. Time-range filter — Queries filter by timeofban >= since to fetch recent records. This uses the composite index idx_history_archive_timeofban_jail_action which includes timeofban as the leading column for efficient range scans.

  4. IP filter — Users can search by exact IP or IP prefix (using LIKE 'prefix%'). The idx_history_archive_ip index accelerates these searches.

  5. Action filter — Queries may filter by action ('ban' or 'unban'). The idx_history_archive_action index supports this.

  6. Purge old entries — Background tasks delete entries older than a threshold (timeofban < cutoff). This uses idx_history_archive_timeofban_jail_action.

Current Indexes (defined in backend/app/db.py Migration 5):

  • idx_history_archive_jail_timeofban(jail, timeofban DESC) — Composite index for jail-filtered queries.
  • idx_history_archive_timeofban_jail_action(timeofban DESC, jail, action) — Covering index for time-range queries and MAX lookups.
  • idx_history_archive_ip(ip) — Single-column index for IP searches.
  • idx_history_archive_action(action) — Single-column index for action filtering.

Benchmark Results:

Query benchmarks (see backend/tests/test_repositories/test_history_archive_indexing.py) verify that common operations complete within expected thresholds on datasets with 10,000+ records:

| Operation | Time Budget | Actual (with indexes) |
|---|---|---|
| MAX(timeofban) | <0.01s | ✓ Uses covering index |
| Count with jail filter | <0.10s | ✓ Covering index scan |
| List with jail + order | <0.05s | ✓ Index fully utilized |
| Time-range filter | <0.05s | ✓ Range scan on timeofban |
| Combined filters | <0.05s | ✓ Composite indexes used |

Adding New Indexes:

If you add new query patterns to history_archive_repo.py:

  1. Analyze the WHERE and ORDER BY clauses — Identify which columns are filtered and sorted.
  2. Check EXPLAIN QUERY PLAN in a local test:
    async with db.execute("EXPLAIN QUERY PLAN SELECT ...") as cur:
        rows = await cur.fetchall()
        for row in rows: print(row[3])  # Print the plan text
    
  3. If the plan shows a full table scan, add an index that matches the filter columns in order.
  4. Create a migration in backend/app/db.py following the pattern from Migration 5.
  5. Add a benchmark test to verify the new index improves query performance.
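Steps 2 and 3 can be tried without the application by using the stdlib `sqlite3` module against an in-memory database. This is only a sketch: the table schema below is an assumed minimal one (the real `history_archive` table may have more columns), and the index definition is copied from the list of current indexes above.

```python
import sqlite3

# Assumed minimal schema; the real history_archive table may differ.
SCHEMA: str = """
CREATE TABLE history_archive (
    id INTEGER PRIMARY KEY,
    jail TEXT NOT NULL,
    ip TEXT NOT NULL,
    action TEXT NOT NULL,
    timeofban INTEGER NOT NULL
);
"""

QUERY: str = (
    "SELECT * FROM history_archive WHERE jail = ? "
    "ORDER BY timeofban DESC LIMIT 20"
)


def plan_text(conn: sqlite3.Connection, sql: str, params: tuple[str, ...]) -> str:
    """Join the detail column (index 3) of EXPLAIN QUERY PLAN into one string."""
    rows: list[tuple[int, int, int, str]] = conn.execute(
        f"EXPLAIN QUERY PLAN {sql}", params
    ).fetchall()
    return " | ".join(str(row[3]) for row in rows)


conn: sqlite3.Connection = sqlite3.connect(":memory:")
conn.executescript(SCHEMA)

# Without an index the planner has to scan the whole table.
plan_before: str = plan_text(conn, QUERY, ("sshd",))

conn.execute(
    "CREATE INDEX idx_history_archive_jail_timeofban "
    "ON history_archive (jail, timeofban DESC)"
)

# With the composite index the plan names the index it searches.
plan_after: str = plan_text(conn, QUERY, ("sshd",))
```

Comparing `plan_before` and `plan_after` shows whether a candidate index is picked up before you write the migration.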

Index Tradeoffs:

  • Pros: Faster SELECT queries, reduced CPU during queries.
  • Cons: Slower INSERT/UPDATE/DELETE (indexes must be maintained), larger database file size.

For history_archive, the read-heavy workload justifies these indexes because:

  • Inserts are batched during sync (one batch per minute), not per-request.
  • Deletes happen once per day during purge.
  • SELECT queries run on every API request to the history endpoint.

3. Project Structure

backend/
├── app/
│   ├── __init__.py
│   ├── main.py              # FastAPI app factory, lifespan
│   ├── config.py            # Pydantic settings
│   ├── dependencies.py      # FastAPI dependency providers
│   ├── models/              # Pydantic schemas (request, response, domain)
│   ├── routers/             # FastAPI routers grouped by feature
│   ├── services/            # Business logic — one service per domain
│   ├── repositories/        # Database access layer
│   ├── tasks/               # APScheduler jobs
│   └── utils/               # Helpers, constants, shared types
├── tests/
│   ├── conftest.py
│   ├── test_routers/
│   ├── test_services/
│   └── test_repositories/
├── pyproject.toml
└── .env.example
  • Routers receive requests, validate input via Pydantic, and delegate to services.
  • Services contain business logic and call repositories or external clients.
  • Repositories handle raw database queries — nothing else.
  • Never put business logic inside routers or repositories.

Service Dependencies and Injection

Services should never import other services directly: doing so creates hidden coupling and makes testing harder. Instead:

  1. Define clear service interfaces using Protocol classes in app/services/protocols.py.
  2. Make dependencies explicit by passing them as function parameters with optional defaults.
  3. Use lazy imports for fallback singletons (not at module level).
  4. Inject services via FastAPI dependencies when called from routers.

Example: The history_service depends on Fail2BanMetadataService to resolve the fail2ban database path:

# Good — dependency passed as parameter
async def list_history(
    socket_path: str,
    fail2ban_metadata_service: Fail2BanMetadataService | None = None,
) -> HistoryListResponse:
    if fail2ban_metadata_service is None:
        # Lazy import fallback for backward compatibility
        from app.services.fail2ban_metadata_service import default_fail2ban_metadata_service
        fail2ban_metadata_service = default_fail2ban_metadata_service
    ...

Routers inject the service dependency explicitly:

from app.dependencies import Fail2BanMetadataServiceDep

@router.get("/api/history")
async def get_history(
    fail2ban_metadata_service: Fail2BanMetadataServiceDep,
) -> HistoryListResponse:
    return await history_service.list_history(
        socket_path,
        fail2ban_metadata_service=fail2ban_metadata_service,
    )

This pattern prevents circular imports, makes services testable, and allows easy mocking in tests.
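The testing benefit is easiest to see with a `Protocol` and a hand-written fake. The sketch below is an assumption-laden illustration: `MetadataService`, its `get_db_path` method, `FakeMetadataService`, and `resolve_history_source` are hypothetical names, and the real interfaces in app/services/protocols.py may look different.

```python
import asyncio
from typing import Protocol


class MetadataService(Protocol):
    """Hypothetical interface; the real Protocols live in app/services/protocols.py."""

    async def get_db_path(self, socket_path: str) -> str: ...


class FakeMetadataService:
    """Test double that satisfies MetadataService without touching fail2ban."""

    def __init__(self, db_path: str) -> None:
        self.db_path: str = db_path
        self.calls: list[str] = []

    async def get_db_path(self, socket_path: str) -> str:
        self.calls.append(socket_path)  # record the call for later assertions
        return self.db_path


async def resolve_history_source(socket_path: str, metadata: MetadataService) -> str:
    """Service function that knows only the interface, not the implementation."""
    return await metadata.get_db_path(socket_path)


fake: FakeMetadataService = FakeMetadataService("/tmp/fail2ban.sqlite3")
resolved: str = asyncio.run(resolve_history_source("/run/fail2ban.sock", fake))
```

No patching is needed: the fake is passed in exactly the way a router would pass the real service.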

Mutable Runtime State

All mutable runtime state (state that changes during the application's lifetime) must be stored in RuntimeState defined in app/utils/runtime_state.py. This centralizes state management, prevents accidental global mutable variables, and makes state management testable and synchronization-safe.

Allowed locations for mutable state:

  1. RuntimeState fields — Core application state (e.g., server_status, last_activation, pending_recovery, runtime_settings). Managed through dedicated functions (e.g., record_activation(), clear_pending_recovery()).
  2. Nested service state — Service-specific mutable state (e.g., JailServiceState for jail capability detection cache) is nested within RuntimeState as a field. Services receive their state via dependency injection.
  3. Controlled via dependencies — State is injected into services and routers using FastAPI Depends(). This ensures single-source-of-truth and testability.

Example — jail_service state management:

# Define service-specific state (in app/utils/runtime_state.py)
@dataclass
class JailServiceState:
    backend_cmd_supported: bool | None = None
    backend_cmd_lock: asyncio.Lock | None = None

# Nested in RuntimeState
@dataclass
class RuntimeState:
    jail_service_state: JailServiceState = field(default_factory=JailServiceState)
    ...

# Injected into services via dependency
async def list_jails(socket_path: str, state: JailServiceState) -> JailListResponse:
    backend_cmd_is_supported = await _check_backend_cmd_supported(client, name, state)
    ...

# Routers inject state through FastAPI dependencies
@router.get("/api/jails")
async def get_jails(state: JailServiceStateDep) -> JailListResponse:
    return await jail_service.list_jails(socket_path, state)

Why: Centralizing mutable state prevents race conditions, makes concurrency boundaries explicit, simplifies testing (each test gets a fresh state object), and prepares for multi-worker deployments (shared state would need to be extracted to Redis, database, or shared memory).


4. FastAPI Conventions

  • Use async def for every endpoint — no sync endpoints.

  • Every endpoint must declare explicit response models (response_model=...).

  • Use Pydantic models for request bodies and query parameters — never raw dicts.

  • Use Depends() for dependency injection (database sessions, services, auth).

  • Group endpoints into routers by feature domain (routers/jails.py, routers/bans.py, …).

  • Use appropriate HTTP status codes: 201 for creation, 204 for deletion with no body, 404 for not found, etc.

  • Protected endpoints should return 401 Unauthorized or 403 Forbidden when the session is invalid or expired; the frontend treats these responses as a session-expiry event and redirects the user to /login.

  • Use HTTPException or custom exception handlers — never return error dicts manually.

  • All successful responses must use a standardized Pydantic response model. List and collection endpoints should wrap data in items, total, and optional pagination metadata. Detail endpoints must expose a single domain object under a named field (for example jail, status, or settings). Command endpoints must use a CommandResponse-style wrapper with message and success.

  • GET endpoints are read-only — never call db.commit() or execute INSERT/UPDATE/DELETE inside a GET handler. If a GET path produces side-effects (e.g., caching resolved data), that write belongs in a background task, a scheduled flush, or a separate POST endpoint. Users and HTTP caches assume GET is idempotent and non-mutating.

    # Good — pass db=None on GET so geo_service never commits
    result = await geo_service.lookup_batch(ips, http_session, db=None)
    
    # Bad: triggers an INSERT + COMMIT on every GET request
    result = await geo_service.lookup_batch(ips, http_session, db=app_db)
    

OpenAPI Schema Synchronization

Critical: The OpenAPI schema is the single source of truth for frontend types. When you add, modify, or remove endpoints or response models:

  1. FastAPI automatically updates the schema based on your Pydantic models and endpoint definitions.
  2. The frontend regenerates types from the schema on every build: npm run generate:types.
  3. Ensure your Pydantic models are accurate — they are directly serialized into the schema and used to generate frontend types.
  4. Test type generation locally before committing:
    cd frontend
    npm run generate:types    # Generates src/types/generated.ts
    npm run build             # Build should succeed if types match
    
  5. The backend must be running for type generation to work (the tool fetches /api/openapi.json).
  6. Commit generated types alongside backend changes — they must always be in sync.

Never:

  • Manually edit src/types/generated.ts on the frontend — regenerate from the schema instead.
  • Commit backend changes without ensuring the frontend can regenerate types.
  • Assume the OpenAPI schema is correct. Verify that your Pydantic models' field descriptions and types are as intended.
from fastapi import APIRouter, Depends, HTTPException, status
from app.models.jail import JailResponse, JailListResponse
from app.services.jail_service import JailService

router: APIRouter = APIRouter(prefix="/api/jails", tags=["Jails"])

@router.get("/", response_model=JailListResponse)
async def list_jails(service: JailService = Depends()) -> JailListResponse:
    jails: list[JailResponse] = await service.get_all_jails()
    return JailListResponse(jails=jails)

Dependency Layering: Enforcing the Repository Boundary

The repository boundary separates database-aware code from application logic. This is enforced through dependency injection.

For a complete overview of BanGUI's DI pattern, including the composition root, service wiring, and lifecycle management, see Architekture.md § 2.3 Dependency Wiring and Service Composition.

| Layer | Responsibilities | Dependencies |
|---|---|---|
| Routers | Receive requests, validate input, return responses. | Service context dependencies (SessionServiceContextDep, BlocklistServiceContextDep), settings, auth. Never raw database connections. |
| Services | Contain business logic, orchestrate operations. | Other services, repositories. May receive aiosqlite.Connection for repository operations. |
| Repositories | Execute all SQL queries. All database knowledge lives here. | aiosqlite.Connection (from callers). |

Rule: Routers must NOT depend on DbDep (raw database connections).

Instead, routers should:

  1. Depend on service context dependencies like SessionServiceContextDep, BlocklistServiceContextDep, etc.
  2. These context dependencies combine the database connection and related repositories.
  3. Pass the context to services, which internally orchestrate database operations.

Service Context Dependencies Available:

  • SessionServiceContextDep — Contains db and session_repo for session operations.
  • BlocklistServiceContextDep — Contains db, blocklist_repo, import_log_repo, settings_repo.
  • SettingsServiceContextDep — Contains db and settings_repo.
  • BanServiceContextDep — Contains db and fail2ban_db_repo.
  • HistoryServiceContextDep — Contains db, fail2ban_db_repo, history_archive_repo.

Why:

  • Enforcement: Not exporting DbDep from the dependencies module makes it impossible for routers to accidentally bypass repositories.
  • Clarity: Service context dependencies explicitly declare which database operations a router needs.
  • Testability: Services and routers are easier to test when they depend on repositories (which can be mocked) rather than raw connections.

Example:

# ✅ GOOD — router depends on service context
@router.post("/login")
async def login(
    body: LoginRequest,
    response: Response,
    session_ctx: SessionServiceContextDep,  # Contains db + session_repo
    _auth: AuthDep,
) -> LoginResponse:
    return await auth_service.login(
        session_ctx.db,
        password=body.password,
        session_repo=session_ctx.session_repo,
        ...
    )

# ❌ BAD — router depends on raw db (DbDep is not exported for this reason)
@router.post("/login")
async def login(
    body: LoginRequest,
    db: DbDep,  # ← Cannot import DbDep in routers
    _auth: AuthDep,
) -> LoginResponse:
    return await auth_service.login(db, password=body.password, ...)

DEPRECATED: DbDep

  • The DbDep type alias is provided for backward compatibility only.
  • DO NOT use in new code. Use service context dependencies instead.
  • See backend/app/dependencies.py for available service contexts.

4.1 API Response Envelope Policy

All API responses must follow a consistent wrapper pattern. This standardization reduces frontend branching logic, prevents integration bugs, and makes the API easier to document and maintain.

Response Patterns

Pattern 1: Paginated Lists

Use PaginatedListResponse[T] for endpoints that return paginated collections:

from app.models.response import PaginatedListResponse

class JailListResponse(PaginatedListResponse[JailSummary]):
    """Response for ``GET /api/jails``."""
    pass

# Returns:
{
  "items": [...],      # T[]
  "total": 100,        # int: total items across all pages
  "page": 2,           # int: current page (1-based)
  "page_size": 20      # int: items per page
}

When to use: Endpoints that support pagination parameters (page, page_size, limit, offset).

Pattern 2: Non-Paginated Collections

Use CollectionResponse[T] for endpoints that return a complete collection without pagination:

from app.models.response import CollectionResponse

class JailConfigListResponse(CollectionResponse[JailConfig]):
    """Response for ``GET /api/config/jails``."""
    pass

# Returns:
{
  "items": [...],      # T[]
  "total": 42          # int: total items
}

When to use: Endpoints that return a complete collection (not paginated). The frontend can render all items without worrying about paging.

Pattern 3: Single-Item Detail Responses

Use domain-specific field names (not generic wrappers) for detail endpoints:

class JailDetailResponse(BaseModel):
    """Response for ``GET /api/jails/{name}``."""
    jail: Jail
    ignore_list: list[str]
    ignore_self: bool

# Returns:
{
  "jail": { ... },              # Jail object
  "ignore_list": [...],         # Additional context
  "ignore_self": true
}

When to use: Endpoints that fetch a single entity. Use the entity name as the field (jail, status, settings, etc.).

Field naming:

  • Primary entity uses its own name: jail, status, settings, etc.
  • Related or supplementary data uses descriptive names: ignore_list, warnings, metadata, etc.

Pattern 4: Command/Action Responses

Use CommandResponse for endpoints that execute commands:

from app.models.response import CommandResponse

class JailCommandResponse(CommandResponse):
    """Generic response for jail control commands."""
    jail: str  # Target identifier (optional)

# Returns:
{
  "message": "Jail 'sshd' started.",
  "success": true,
  "jail": "sshd"  # Optional: target identifier
}

When to use: POST/PUT/DELETE endpoints that perform operations (start jail, ban IP, update config, etc.).

Fields:

  • message: str — Human-readable result or error description.
  • success: bool — Operation succeeded (default: true). Use false for non-exception error handlers.
  • Optional domain-specific fields (jail, ip, etc.) to identify the affected resource.

Pattern 5: Aggregation Responses

Use domain-specific field names for aggregated data:

class BansByJailResponse(BaseModel):
    """Response for ``GET /api/dashboard/bans/by-jail``."""
    jails: list[JailBanCount]    # Aggregated per-jail data
    total: int                    # Total count across all jails

class BansByCountryResponse(BaseModel):
    """Response for ``GET /api/dashboard/bans/by-country``."""
    countries: dict[str, int]     # Country code → count
    country_names: dict[str, str] # Country code → name
    bans: list[DashboardBanItem]  # Full list for rendering companion table
    total: int                     # Total ban count

# Returns:
{
  "jails": [ { "jail": "sshd", "count": 42 }, ... ],
  "total": 500
}

When to use: Endpoints that return computed/aggregated data. Use field names that reflect the data (jails, countries, buckets, etc.).

Summary Table

| Pattern | Used for | Field Names | Example |
|---|---|---|---|
| PaginatedListResponse | Paginated collections | items, total, page, page_size | GET /api/dashboard/bans |
| CollectionResponse | Complete collections | items, total | GET /api/config/jails |
| Detail Response | Single entity + metadata | Entity name + descriptors | GET /api/jails/{name} |
| CommandResponse | Action results | message, success + optional identifiers | POST /api/jails/{name}/start |
| Aggregation Response | Computed data | Domain-specific names | GET /api/dashboard/bans/by-jail |

Rules

  1. Always wrap lists in items field — Consistency aids frontend parsing.

    • Good: { "items": [...], "total": 100 }
    • Bad: { "jails": [...], "total": 100 } (wrong for list endpoints; fine for aggregations)
  2. Aggregation responses are exceptions — They use domain-specific field names because the data represents computed results, not a simple list.

    • Allowed: { "countries": {...}, "jails": [...], "total": 100 }
  3. Every response with >1 item must include total — Enables frontend to understand scale.

  4. Paginated responses must include page and page_size — Enables the frontend to render pagination controls.

  5. No ad-hoc wrapper objects — Don't invent new response shapes. Use the patterns above.

Standardized Pagination Query Parameters

All paginated endpoints follow a consistent query parameter contract:

| Parameter | Type | Constraints | Default | Notes |
|---|---|---|---|---|
| page | int | ≥ 1 | 1 | 1-based page number (not 0-based offset). |
| page_size | int | 1 to 500 | 100 | Items per page. Clients may request smaller pages for UI reasons. |

Implementation:

from fastapi import Query
from app.utils.constants import DEFAULT_PAGE_SIZE

@router.get("/items")
async def get_items(
    page: int = Query(default=1, ge=1, description="1-based page number."),
    page_size: int = Query(
        default=DEFAULT_PAGE_SIZE,
        ge=1,
        le=500,
        description="Items per page (max 500).",
    ),
):
    # Compute offset for database query
    offset = (page - 1) * page_size
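    # aiosqlite has no fetch()/fetchval(); read rows through a cursor instead
    async with db.execute("SELECT * FROM items LIMIT ? OFFSET ?", (page_size, offset)) as cur:
        items = await cur.fetchall()
    async with db.execute("SELECT COUNT(*) FROM items") as cur:
        row = await cur.fetchone()
    total = row[0] if row is not None else 0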

    return PaginatedListResponse(
        items=items,
        total=total,
        page=page,
        page_size=page_size,
    )

Helper functions are available in app.utils.pagination:

from app.utils.pagination import get_offset, compute_total_pages

# Calculate database offset from page and page_size
offset = get_offset(page, page_size)  # Equivalent to (page - 1) * page_size

# Calculate total pages for rendering pagination UI (optional)
total_pages = compute_total_pages(total, page_size)

Rules:

  1. Use 1-based pages — Not 0-based offsets. Page 1 is always the first page.
  2. Always provide defaults — Use DEFAULT_PAGE_SIZE (100) and initial page 1.
  3. Cap maximum page_size at 500 — Prevents accidental DoS from enormous requests.
  4. Respond with PaginatedListResponse[T] — Must include items, total, page, page_size.
  5. Never include total_pages in responses — The frontend can calculate it as Math.ceil(total / page_size).
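The arithmetic behind rules 1 to 3 is small enough to sketch in full. The functions below are illustrative stand-ins with a `_sketch` suffix, not the code in `app.utils.pagination`. `MAX_PAGE_SIZE` is a hypothetical constant name for the documented cap of 500, and the input validation is an assumption about sensible behaviour.

```python
DEFAULT_PAGE_SIZE: int = 100
MAX_PAGE_SIZE: int = 500  # hypothetical name for the documented cap


def get_offset_sketch(page: int, page_size: int) -> int:
    """Translate a 1-based page number into a SQL OFFSET."""
    if page < 1:
        raise ValueError("page must be >= 1")
    if not 1 <= page_size <= MAX_PAGE_SIZE:
        raise ValueError(f"page_size must be between 1 and {MAX_PAGE_SIZE}")
    return (page - 1) * page_size


def compute_total_pages_sketch(total: int, page_size: int) -> int:
    """Ceiling division without floats; zero items yields zero pages."""
    if total < 0 or page_size < 1:
        raise ValueError("total must be >= 0 and page_size >= 1")
    return -(-total // page_size)
```

For example, 101 items at 20 per page need 6 pages, and page 3 starts at offset 40.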

4.2 Error Response Schema

All error responses use a consistent machine-readable format that enables frontend code to branch reliably on error conditions without string-parsing error detail text.

Error Response Format

Every non-2xx HTTP response body is a JSON object with this structure:

{
  "code": "jail_not_found",
  "detail": "Jail 'example' not found",
  "metadata": {
    "jail_name": "example"
  }
}

Fields:

  • code (string, required): Machine-readable error code for client-side branching. Examples: jail_not_found, rate_limit_exceeded, authentication_required.
  • detail (string, required): Human-readable error message. Safe for displaying to users.
  • metadata (object, optional): Structured context data relevant to the error. Only includes data safe for client consumption (no sensitive internal state). Examples: offending parameter names, resource identifiers, time windows.

Exception Hierarchy & Error Codes

All domain exceptions inherit from DomainError (defined in backend/app/exceptions.py) and are organized by HTTP status category:

| HTTP Status | Category Class | Error Codes | Use Case |
|---|---|---|---|
| 404 | NotFoundError | not_found, jail_not_found, filter_not_found, action_not_found, config_file_not_found, blocklist_source_not_found, history_not_found | Requested resource does not exist |
| 400 | BadRequestError | invalid_input, config_validation_failed, config_operation_failed, jail_name_invalid, filter_name_invalid, action_name_invalid, config_file_name_invalid, filter_invalid_regex | Invalid input, validation failure, malformed request |
| 409 | ConflictError | conflict, jail_operation_failed, jail_already_active, jail_already_inactive, jail_not_in_config, action_already_exists, filter_already_exists, config_file_exists | State conflict, resource already exists, invalid state transition |
| 500 | OperationError | operation_failed, config_write_failed, config_file_write_failed, server_operation_failed, fail2ban_protocol_error | Operation failure, write errors, unexpected failures |
| 503 | ServiceUnavailableError | service_unavailable, config_dir_unavailable, fail2ban_unreachable | Infrastructure/external service issues, temporary unavailability |
| 401 | AuthenticationError | authentication_required | Authentication or authorization failure, invalid/expired credentials |
| 429 | RateLimitError | rate_limit_exceeded | Rate limit exceeded, too many requests |

Implementing Error Handlers

Every exception category has a corresponding exception handler registered in backend/app/main.py. When a domain exception is raised:

  1. FastAPI's exception handling middleware catches it.
  2. The registered handler converts it to an ErrorResponse with the matching HTTP status code.
  3. The response is serialized as JSON with code, detail, and metadata fields.

Pattern for service code:

from app.exceptions import JailNotFoundError, ConfigValidationError

async def get_jail(name: str) -> Jail:
    """Raises JailNotFoundError if jail not found."""
    jail = await db.fetchone("SELECT * FROM jails WHERE name = ?", (name,))
    if jail is None:
        raise JailNotFoundError(name)  # HTTP 404, code='jail_not_found'
    return jail

async def apply_config(config: JailConfig) -> None:
    """Raises ConfigValidationError if invalid."""
    if not config.filter_name:
        raise ConfigValidationError("filter_name is required")  # HTTP 400, code='config_validation_failed'

Adding New Exception Types

  1. Choose the appropriate category based on the HTTP status (NotFoundError for 404, BadRequestError for 400, etc.).
  2. Create a subclass in backend/app/exceptions.py:
class MySpecificError(BadRequestError):
    """Raised when X happens."""
    
    error_code: str = "my_specific_error"
    
    def __init__(self, detail_msg: str, **context: str | int | float | bool | None) -> None:
        self.context = context
        super().__init__(detail_msg)
    
    def get_error_metadata(self) -> dict[str, str | int | float | bool | None]:
        """Return only safe, relevant metadata."""
        return {k: v for k, v in self.context.items() if k in ("offending_value", "constraint")}
  3. Use explicit error codes: don't derive them from the class name. This makes them self-documenting and prevents breakage on class renames.
  4. Implement get_error_metadata(): return only data safe for client consumption. Never leak internal state, file paths, or system details.
  5. Raise from service code, never from repositories or utils. Exceptions represent business logic violations, not infrastructure errors.

Exception Handler Hierarchy

All domain exceptions are automatically converted to ErrorResponse via handlers registered in backend/app/main.py. The handler registration order is critical:

# Handlers are registered from most specific to least specific:
1. Network errors (Fail2BanConnectionError, etc.) → HTTP 502
2. Auth/rate errors (AuthenticationError, RateLimitError) → HTTP 401/429
3. Category handlers (NotFoundError, BadRequestError, ConflictError, etc.) → HTTP 404/400/409/500/503
4. DomainError catch-all → HTTP 500  # ← Catches unregistered DomainError subclasses
5. HTTPException (FastAPI built-ins) → HTTP varies
6. ValueError (Pydantic validation) → HTTP 400
7. Exception catch-all → HTTP 500  # ← Absolute last resort

Important: The DomainError catch-all handler (step 4) is the safety net. If you add a new DomainError subclass without placing it in a category (e.g., class MyError(DomainError) instead of class MyError(BadRequestError)), it will still get the correct error_code and metadata via this handler instead of silently falling through to the generic exception handler.
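
The safety-net behaviour can be modelled in a few lines. This sketch assumes handler lookup walks the raised exception's class hierarchy, which is how Starlette's exception middleware resolves handlers. The class names, the HANDLERS table, and its labels are illustrative stand-ins, not the contents of main.py.

```python
class DomainError(Exception):
    """Stand-in for the real base class (assumption)."""


class NotFoundError(DomainError):
    """Stand-in category class."""


class JailNotFoundError(NotFoundError):
    """Properly categorised error."""


class UncategorizedError(DomainError):
    """Subclass that skipped the category layer."""


# Stand-in for the registrations in main.py: exception class -> handler label.
HANDLERS: dict[type[BaseException], str] = {
    NotFoundError: "category:404",
    DomainError: "domain_catch_all:500",
    Exception: "generic_catch_all:500",
}


def resolve_handler(exc: BaseException) -> str:
    """Return the handler registered for the closest class in the MRO."""
    for cls in type(exc).__mro__:
        if cls in HANDLERS:
            return HANDLERS[cls]
    raise LookupError("no handler registered")


categorised = resolve_handler(JailNotFoundError())
uncategorised = resolve_handler(UncategorizedError())
foreign = resolve_handler(KeyError("boom"))
```

The uncategorised subclass lands on the DomainError catch-all rather than the generic Exception handler, which is the behaviour the paragraph above describes.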

Critical caveat: Every new DomainError subclass must:

  • Define an error_code class attribute (e.g., error_code: str = "my_error")
  • Override get_error_metadata() if it needs to return context data
  • Inherit from the appropriate category (NotFoundError, BadRequestError, ConflictError, OperationError, or ServiceUnavailableError)

If you forget to implement error_code or get_error_metadata(), the fallback to parent class implementations will produce misleading error codes and empty metadata — check your tests!
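
One way to catch a forgotten error_code in tests is to walk the subclass tree and flag every class that inherits the attribute instead of defining it. The sketch below is a hedged illustration: the four classes are stand-ins, and missing_error_code() is a hypothetical helper, not an existing project utility.

```python
class DomainError(Exception):
    error_code: str = "domain_error"


class BadRequestError(DomainError):
    error_code: str = "bad_request"


class GoodError(BadRequestError):
    error_code: str = "good_error"


class ForgetfulError(BadRequestError):
    """Defines no error_code, so it silently reports 'bad_request'."""


def all_subclasses(cls: type) -> set[type]:
    """Collect direct and indirect subclasses of cls."""
    found: set[type] = set()
    for sub in cls.__subclasses__():
        found.add(sub)
        found |= all_subclasses(sub)
    return found


def missing_error_code(root: type) -> list[str]:
    """Names of subclasses that inherit error_code rather than define it."""
    return sorted(c.__name__ for c in all_subclasses(root) if "error_code" not in vars(c))


offenders = missing_error_code(DomainError)
```

A test can then assert that the returned list is empty for the real DomainError base class.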

What NOT to do:

  • Don't raise HTTPException from service code (it bypasses the ErrorResponse format).
  • Don't put sensitive information in metadata (database paths, SQL, internal IDs).
  • Don't derive error codes from class names using regex (fragile and non-self-documenting).
  • Don't create a DomainError subclass without a category (always inherit from one of the seven categories)

Frontend Error Parsing

The frontend ApiError class parses error responses automatically:

import { api, ApiError } from "src/api/client"; // assumes ApiError is exported from the same module

try {
  const jail = await api.get("/jails/example");
} catch (error) {
  if (error instanceof ApiError) {
    const code = error.errorResponse?.code;
    
    if (code === "jail_not_found") {
      // Handle not found
      console.log("Jail does not exist:", error.errorResponse?.metadata?.jail_name);
    } else if (code === "rate_limit_exceeded") {
      // Handle rate limit
      showRateLimitModal();
    } else if (code === "authentication_required") {
      // Handle auth — the frontend framework auto-redirects to /login
      redirectToLogin();
    }
  }
}

The errorResponse field contains the parsed error object with code, detail, and metadata fields, enabling reliable machine-readable branching.


5. Pydantic Models

Base Class

Every model in app/models/ must inherit from BanGuiBaseModel (defined in app/models/response.py), not from pydantic.BaseModel directly.

from datetime import datetime

from pydantic import Field

from app.models.response import BanGuiBaseModel

class BanResponse(BanGuiBaseModel):
    ip: str = Field(..., description="Banned IP address")
    jail: str = Field(..., description="Jail that issued the ban")
    banned_at: datetime = Field(..., description="UTC timestamp of the ban")
    expires_at: datetime | None = Field(None, description="UTC expiry, None if permanent")
    ban_count: int = Field(..., ge=1, description="Number of times this IP was banned")

BanGuiBaseModel sets strict=True and documents the naming policy. Do not override model_config on individual models unless you have a specific, documented reason.

API Field Naming Policy — snake_case everywhere

All API field names use snake_case in Python, in the JSON wire format, and in the corresponding TypeScript interfaces. There is no alias_generator that converts to camelCase.

  • Python field: active_jails → JSON key: "active_jails" → TypeScript property: active_jails
  • Do not add a camelCase alias_generator to individual models.
  • Do not mix field name conventions within a single API response.

This policy eliminates a whole class of frontend-backend contract bugs. If the naming policy ever needs to change (e.g. to emit camelCase), change BanGuiBaseModel once and all models update automatically.

Other Model Rules

  • Validate at the boundary — once data enters a Pydantic model it is trusted.
  • Use Field(...) with descriptions for every field to keep auto-generated docs useful.
  • Separate request models, response models, and domain (internal) models — do not reuse one model for all three.
  • Models are leaf nodes: Models in app/models/ must not import from application-layer modules (app.services, app.config, app.utils). Models may only import from:
    • Standard library and third-party packages (Pydantic, typing)
    • Other models in app/models/ (sibling models)
    • app.models.response (response envelopes)
  • Validation that requires app-level state (e.g., settings, allowed directories) must happen at the router or service layer, never in model validators.

Using Literal Types for Constrained Strings

When a field should only accept a small set of predefined values, use Literal to enforce this at the type level:

from typing import Literal
from pydantic import Field

from app.models.response import BanGuiBaseModel

LogLevel = Literal["CRITICAL", "ERROR", "WARNING", "NOTICE", "INFO", "DEBUG"]

class GlobalConfigUpdate(BanGuiBaseModel):
    log_level: LogLevel | None = Field(
        default=None,
        description="Log level: CRITICAL, ERROR, WARNING, NOTICE, INFO, or DEBUG.",
    )

This provides:

  • Type safety — IDEs and type checkers enforce valid values.
  • API documentation — OpenAPI docs automatically list all allowed values.
  • Validation — Pydantic rejects invalid values and provides a clear error message.

Field Validators and Validation Placement

Critical Constraint — No Import-Time Execution:

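Everything at the top level of a model module and inside a model's class body is evaluated when the module is imported: the import statements themselves and any expression passed directly to Field(default=...). Validators and default_factory callables run later, when instances are created, but the imports they rely on still execute at import time. For this reason: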

  • Validators must be pure functions with no side effects
  • NEVER import or call runtime-dependent functions: get_settings(), file I/O, database queries, network calls, etc.
  • NEVER import from app.config, app.utils, app.services, or app.routers in model files

Violating this constraint creates hidden circular dependencies that prevent the application from starting.

Example of What NOT to Do:

# ❌ WRONG: both marked lines run when the module is imported
from pydantic import BaseModel, Field
from app.config import get_settings  # ← app-layer import inside a model file

class ConfigModel(BaseModel):
    max_age: int = Field(default=get_settings().max_log_max_age_days)
    # ↑ get_settings() is called when Python imports this module!

Field validators in models should only contain logic that is stateless and does not depend on application configuration or state. Validators must not import from app.config, app.utils, or app.services.

For validation that depends on app-level state (e.g., file paths that must be within allowed directories), perform validation in the router or service layer:

# ✅ Good: Validation in router (has access to settings)
from fastapi import APIRouter
from app.config import get_settings
from app.utils.path_utils import validate_log_path

@router.post("/jails/{name}/logpath")
async def add_log_path(name: str, body: AddLogPathRequest) -> None:
    # Validate before using
    validate_log_path(body.log_path)
    await config_service.add_log_path(socket_path, name, body)

# ❌ Avoid: Importing from app layer in model validators
# Do NOT do this in app/models/config.py:
# from app.config import get_settings
# @field_validator("log_path")
# def validate_log_path_field(cls, value: str) -> str:
#     settings = get_settings()  # ← Models must not import from app layer
#     ...

Common Helper: For shared path validation logic, use the validate_log_path() helper from app.utils.path_utils in your router or service, not in model validators.

from fastapi import HTTPException, Query, status
from app.utils.path_utils import validate_log_path

@router.delete("/{name}/logpath")
async def delete_log_path(
    name: str,
    log_path: str = Query(...),
) -> None:
    try:
        validate_log_path(log_path)
    except ValueError as e:
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
            detail=str(e),
        ) from e
    # ... rest of handler

Key points:

  • Use mode="after" in model validators to validate after Pydantic's basic type coercion.
  • Raise ValueError if validation fails; Pydantic wraps it in a validation error, which the application returns as an HTTP 400 response.
  • For query parameters that cannot use Pydantic validators, use the validate_log_path() helper and raise HTTP 422.
  • Never use string prefix matching for path validation (e.g., path.startswith("/var/log")). The helper uses Path.relative_to() to prevent bypasses like /var/log_evil/file.log.
  • Symlinks are resolved before validating to prevent symlink-based escapes.
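
The difference between prefix matching and Path.relative_to() is easy to demonstrate. The sketch below is not the real validate_log_path() helper; is_within() is a hypothetical function that shows only the containment check, with /var/log assumed as the allowed directory.

```python
from pathlib import Path


def is_within(candidate: str, allowed_dir: str) -> bool:
    """Return True if candidate, with symlinks and '..' resolved, lies under allowed_dir."""
    resolved = Path(candidate).resolve()
    base = Path(allowed_dir).resolve()
    try:
        resolved.relative_to(base)
    except ValueError:
        # relative_to() raises when resolved is not inside base.
        return False
    return True


# String prefix matching accepts a sibling directory that merely shares the prefix.
prefix_check = "/var/log_evil/file.log".startswith("/var/log")

sibling_dir = is_within("/var/log_evil/file.log", "/var/log")
inside = is_within("/var/log/auth.log", "/var/log")
traversal = is_within("/var/log/../../etc/passwd", "/var/log")
```

Path.relative_to() compares whole path components, so log_evil is never treated as being inside log, and resolving first removes any ".." segments before the comparison.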

Model Type Usage by Layer

Pydantic models are mandatory for all external-facing data structures — anything that crosses layer boundaries or is serialized to HTTP responses. TypedDict may be used only for internal, layer-private data structures where they provide precise typing without runtime overhead.

Rules:

  1. Routers (HTTP boundary): All request and response types must be Pydantic models. FastAPI uses these for validation, serialization, and OpenAPI documentation.

    • Use Pydantic request models for request bodies and query parameters.
    • Use Pydantic response models in the response_model parameter.
    # Good — Pydantic models for router layer
    class JailStatsRequest(BaseModel):
        jail_name: str
    
    class JailStatsResponse(BaseModel):
        jail_name: str
        active_bans: int
    
    @router.post("/stats", response_model=JailStatsResponse)
    async def get_stats(req: JailStatsRequest) -> JailStatsResponse:
        ...
    
  2. Services (business logic): Return types should be Pydantic models if the result is:

    • Returned to a router (likely — they become API responses).
    • Used across multiple services (shared interfaces).
    • Exposed to external consumers (even indirectly).

    If a service returns a purely internal intermediate result used by a single caller, TypedDict is acceptable but should be rare.

    # Good — service returns Pydantic (may be used by multiple routers)
    async def get_jail_details(name: str) -> JailDetailResponse:
        ...
    
    # Acceptable — purely internal utility result
    def _parse_fail2ban_response(raw: str) -> ParsedResponse:
        """Internal helper—used only by this service."""
        ...
    
  3. Repositories (data access): Return types may use TypedDict because they represent raw database rows that:

    • Are layer-private (only called by their own service).
    • Do not cross HTTP boundaries directly.
    • Benefit from lightweight typing without runtime validation.
    # Good — TypedDict for raw repository rows
    class GeoRow(TypedDict):
        ip: str
        country_code: str | None
    
    async def load_all(db: aiosqlite.Connection) -> list[GeoRow]:
        ...
    

    If a repository result becomes part of a service's public interface (returned to routers or other services), convert it to a Pydantic model.

  4. Utilities and helpers: Internal helper results may use TypedDict if they are not part of a public module interface.

Migration path: Existing internal TypedDicts (e.g., GeoCacheRow, ImportLogRow) may remain as TypedDicts so long as they stay within their layer. If a type needs to cross layer boundaries (repo → service → router), convert it to a Pydantic model incrementally as you refactor that data flow.


6. Async Rules

  • Never call blocking / synchronous I/O in an async function — no time.sleep(), no synchronous file reads, no requests.get().

  • Use aiohttp.ClientSession for HTTP calls, aiosqlite for database access.

  • Use asyncio.TaskGroup (Python 3.11+) when you need to run independent coroutines concurrently.

  • Long-running startup/shutdown logic goes into the FastAPI lifespan context manager.

  • Never call db.commit() inside a loop. With aiosqlite, every commit serialises through a background thread and forces an fsync, so committing once per row costs N fsyncs for N rows. Accumulate all writes in the loop, then issue a single db.commit() once after the loop ends. The difference between 5,000 commits and 1 commit can be seconds versus milliseconds.

    # Good — one commit for the whole batch
    for ip, info in results.items():
        await db.execute(INSERT_SQL, (ip, info.country_code, ...))
    await db.commit()  # ← single fsync
    
    # Bad — one fsync per row
    for ip, info in results.items():
        await db.execute(INSERT_SQL, (ip, info.country_code, ...))
        await db.commit()  # ← fsync on every iteration
    
  • Prefer executemany() over calling execute() in a loop when inserting or updating multiple rows with the same SQL template. aiosqlite passes the entire batch to SQLite in one call, reducing Python↔thread overhead on top of the single-commit saving.

    # Good
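    # Attribute names other than country_code are illustrative.
    rows = [(ip, info.country_code, info.country_name, info.asn, info.org) for ip, info in results.items()]
    await db.executemany(INSERT_SQL, rows)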
    await db.commit()
    
  • Shared resources (DB connections, HTTP sessions) are created once during startup and closed during shutdown — never inside request handlers.

from contextlib import asynccontextmanager
from collections.abc import AsyncGenerator
from fastapi import FastAPI
import aiohttp
import aiosqlite

@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
    # Startup
    app.state.http_session = aiohttp.ClientSession()
    app.state.db = await aiosqlite.connect("bangui.db")
    yield
    # Shutdown
    await app.state.http_session.close()
    await app.state.db.close()

Fire-and-Forget Background Tasks

When you need to spawn a background task that runs independently without waiting for the result, use asyncio.create_task() with the logged_task() helper from app.utils.async_utils. This ensures exceptions in background tasks are always logged and never silently discarded.

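Why this matters: An exception raised inside a fire-and-forget task is stored on the Task object. Nothing surfaces it until the task is garbage-collected, at which point asyncio reports "Task exception was never retrieved" through the event loop's exception handler, outside the structured logs. Without explicit logging, background errors (network failures, database writes, API timeouts) are extremely hard to debug.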

Pattern:

import asyncio

from app.utils.async_utils import logged_task

# Bad — exceptions are silently discarded
asyncio.create_task(some_background_work())

# Good — exceptions are logged
asyncio.create_task(
    logged_task(some_background_work(), "task_name"),
    name="task_name",
)

The logged_task() wrapper:

  • Wraps your coroutine to catch any exception
  • Logs the exception with log.exception() (structlog automatically captures the traceback)
  • Adds task_name to the structured log context
  • Never re-raises — it's safe to use with asyncio.create_task()

Example:

import asyncio
from app.utils.async_utils import logged_task
import structlog

log = structlog.get_logger()

async def geo_lookup_batch(ips: list[str]) -> None:
    """Look up geolocation data for IPs asynchronously."""
    try:
        for ip in ips:
            # May timeout, fail network call, or fail DB write
            location = await lookup_ip_location(ip)
            await db.execute(INSERT_GEO_SQL, (ip, location))
        await db.commit()
    except Exception:
        # All exceptions are automatically logged by logged_task() wrapper
        raise

# In your request handler or service:
asyncio.create_task(
    logged_task(geo_lookup_batch(uncached_ips), "geo_cache_batch"),
    name="geo_cache_batch",
)

6.1 Database Query Conventions

LIKE Queries and Wildcard Escaping

SQLite's LIKE operator treats % (any sequence of characters) and _ (any single character) as wildcards. When querying with user-supplied filters that may contain these characters, you must escape them to prevent unintended matches.

The Problem:

# Bad — ip_filter="10.0.0_" matches "10.0.0.1", "10.0.0.2", etc.
ip_filter = "10.0.0_"
await db.execute(
    "SELECT * FROM bans WHERE ip LIKE ?",
    (f"{ip_filter}%",)  # ← wildcard characters not escaped
)

The Solution:

Use the escape_like() helper from app.utils.fail2ban_db_utils:

from app.utils.fail2ban_db_utils import escape_like

# Good — wildcard characters are escaped
ip_filter = "10.0.0_"
await db.execute(
    "SELECT * FROM bans WHERE ip LIKE ? ESCAPE '\\'",
    (f"{escape_like(ip_filter)}%",)  # ← underscores escaped to literal
)

How escape_like() works:

The function escapes backslashes first, then % and _ signs:

def escape_like(s: str) -> str:
    return s.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")

Key rules:

  1. Backslash escapes first — to prevent double-escaping when the input contains backslashes.
  2. Add ESCAPE '\\' to the SQL — tells SQLite which character to use for escaping.
  3. Dots are not wildcards — they do not need escaping; normal IP addresses pass through unchanged.

Test example:

assert escape_like("10.0.0_") == "10.0.0\\_"
assert escape_like("10.0.0%test") == "10.0.0\\%test"
assert escape_like("10.0.0.1") == "10.0.0.1"  # Unchanged
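
The effect on real query results can be checked against an in-memory database. The sketch below uses the standard sqlite3 module instead of aiosqlite so that it runs standalone; the bans table and its three rows are made up for the demonstration.

```python
import sqlite3


def escape_like(s: str) -> str:
    return s.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE bans (ip TEXT)")
conn.executemany(
    "INSERT INTO bans VALUES (?)",
    [("10.0.0.1",), ("10.0.0_1",), ("10.0.0.25",)],
)

ip_filter = "10.0.0_"

# Unescaped: the underscore acts as a single-character wildcard.
unescaped = [
    row[0]
    for row in conn.execute(
        "SELECT ip FROM bans WHERE ip LIKE ? ORDER BY ip",
        (f"{ip_filter}%",),
    )
]

# Escaped: the underscore only matches a literal underscore.
escaped = [
    row[0]
    for row in conn.execute(
        "SELECT ip FROM bans WHERE ip LIKE ? ESCAPE '\\' ORDER BY ip",
        (f"{escape_like(ip_filter)}%",),
    )
]
conn.close()
```

The unescaped query returns all three rows, while the escaped query returns only the row that actually contains the underscore.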

6.2 Database Migrations

The application database schema is versioned and migrated automatically on startup via app.db.init_db().

Migration Design Principles

Migrations must be atomic. All schema changes for a single version (DDL statements) and the schema_migrations record insert must be wrapped in a single BEGIN IMMEDIATE ... COMMIT transaction. This prevents partial migrations if a process crashes mid-migration.

If a crash occurs between migration steps, the next startup will:

  1. Detect the missing schema_migrations record.
  2. Re-apply the entire migration in a single transaction (all-or-nothing).
  3. Avoid data corruption or schema inconsistency.
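
The all-or-nothing behaviour relies on SQLite treating DDL as transactional. The sketch below illustrates that with the standard sqlite3 module; apply_migration() here is a simplified stand-in that takes pre-split statements, not the project's _apply_migration().

```python
import sqlite3


def apply_migration(conn: sqlite3.Connection, version: int, statements: list[str]) -> None:
    """Run all statements plus the version record inside one transaction."""
    conn.execute("BEGIN IMMEDIATE")
    try:
        for statement in statements:
            conn.execute(statement)
        conn.execute("INSERT INTO schema_migrations (version) VALUES (?)", (version,))
        conn.execute("COMMIT")
    except Exception:
        conn.execute("ROLLBACK")
        raise


# isolation_level=None turns off sqlite3's implicit transactions,
# so BEGIN and COMMIT are issued only where the code says so.
conn = sqlite3.connect(":memory:", isolation_level=None)
conn.execute("CREATE TABLE schema_migrations (version INTEGER PRIMARY KEY)")

try:
    apply_migration(
        conn,
        99,
        [
            "CREATE TABLE test_table (id INTEGER PRIMARY KEY)",
            "INSERT INTO nonexistent_table VALUES (1)",  # fails mid-migration
        ],
    )
except sqlite3.OperationalError:
    pass

tables = [
    row[0]
    for row in conn.execute("SELECT name FROM sqlite_master WHERE type='table' ORDER BY name")
]
versions = conn.execute("SELECT version FROM schema_migrations").fetchall()
```

After the failure neither test_table nor the version record exists, so the next startup sees the migration as not applied and can retry it from a clean state.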

Writing a New Migration

  1. Add the DDL statements to _MIGRATIONS dict in app/db.py:
_MIGRATIONS: dict[int, str] = {
    1: _CREATE_INITIAL_SCHEMA,
    2: """
-- Migration 2: Add new_column to users table.
ALTER TABLE users ADD COLUMN new_column TEXT DEFAULT 'default_value';
CREATE INDEX IF NOT EXISTS idx_users_new_column ON users(new_column);
""",
}
  2. Update _CURRENT_SCHEMA_VERSION to the new version number:
_CURRENT_SCHEMA_VERSION: int = 2  # was 1
  3. Ensure idempotency where possible:

    • Use CREATE TABLE IF NOT EXISTS and CREATE INDEX IF NOT EXISTS.
    • For ALTER TABLE ADD COLUMN, check if the column exists first using PRAGMA table_info() if re-applying the migration is a concern.
  4. Verify atomicity in tests:

from pathlib import Path

import pytest

async def test_migration_2_is_atomic(tmp_path: Path) -> None:
    """Verify migration 2 rolls back on failure."""
    db = await open_db(str(tmp_path / "test.db"))
    try:
        await db.execute("CREATE TABLE schema_migrations (version INTEGER PRIMARY KEY);")
        await db.commit()

        # Add a test migration that fails mid-way
        original = db_module._MIGRATIONS.copy()
        db_module._MIGRATIONS[99] = """
        CREATE TABLE test_table (id INTEGER PRIMARY KEY);
        INSERT INTO nonexistent_table VALUES (1);
        """

        try:
            with pytest.raises(Exception):
                await _apply_migration(db, 99)

            # Verify rollback: migration NOT recorded
            async with db.execute(
                "SELECT version FROM schema_migrations WHERE version = 99;"
            ) as cursor:
                assert await cursor.fetchone() is None

            # Verify rollback: table NOT created
            async with db.execute(
                "SELECT name FROM sqlite_master WHERE type='table' AND name='test_table';"
            ) as cursor:
                assert await cursor.fetchone() is None
        finally:
            db_module._MIGRATIONS = original
    finally:
        await db.close()

Common Pitfalls

  • Non-idempotent statements: ALTER TABLE ADD COLUMN fails on re-run because SQLite has no IF NOT EXISTS clause for columns. Check PRAGMA table_info() first if re-application is possible.
  • Comments containing semicolons — the migration parser strips comments correctly, but avoid unusual comment syntax.
  • String literals with semicolons — the parser handles these; no special escaping needed.
  • Multiple operations in one migration — keep migrations focused. Combine related DDL but split unrelated changes.
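
For the first pitfall, an explicit column check might look like the sketch below. It uses the standard sqlite3 module, and column_exists() and add_new_column() are hypothetical helper names; the users table and new_column mirror the migration example earlier in this section.

```python
import sqlite3


def column_exists(conn: sqlite3.Connection, table: str, column: str) -> bool:
    """Look the column up in PRAGMA table_info (index 1 of each row is the name)."""
    # PRAGMA arguments cannot be bound as parameters, so table must be a trusted identifier.
    rows = conn.execute(f"PRAGMA table_info({table})").fetchall()
    return any(row[1] == column for row in rows)


def add_new_column(conn: sqlite3.Connection) -> bool:
    """Add users.new_column unless it already exists; return True if it was added."""
    if column_exists(conn, "users", "new_column"):
        return False
    conn.execute("ALTER TABLE users ADD COLUMN new_column TEXT DEFAULT 'default_value'")
    return True


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY)")

first = add_new_column(conn)
second = add_new_column(conn)  # re-run is a no-op instead of an error
present = column_exists(conn, "users", "new_column")
```

The second call returns False rather than raising a duplicate-column error, which is what makes the step safe to re-apply.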

6.3 Database Transactions

Database transactions ensure atomicity for multi-step operations and prevent race conditions when concurrent requests interact with the database. BanGUI uses SQLite with WAL (Write-Ahead Logging) mode, which enables concurrent readers but serializes writers.

When to Use Explicit Transactions

Use BEGIN IMMEDIATE ... COMMIT for:

  1. Multi-step logical operations — Operations that should succeed or fail as a unit. Example:

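    # Bad: each step is committed on its own, so a concurrent request can
    # observe or change state between them
    await db.execute("INSERT INTO sessions ...")
    await db.commit()
    # ... a second, dependent write follows and commits separately ...
    
    # Good: run the dependent writes inside one BEGIN IMMEDIATE ... COMMIT block.
    # A single statement is already atomic and needs no explicit transaction.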
    
  2. Operations that combine multiple queries with shared state — When the operation outcome depends on reading and then writing based on that read:

    # Bad — race condition: another request checks between our read and write
    existing_run = await import_run_repo.get_by_source_and_hash(db, source_id, content_hash)
    if existing_run is None:
        run_id = await import_run_repo.create_pending(db, source_id, content_hash)
    
    # Good: insert inside one transaction and let the UNIQUE constraint detect the race
    try:
        await db.execute("BEGIN IMMEDIATE")
        cursor = await db.execute("INSERT INTO import_runs ...")
        await db.commit()
    except aiosqlite.IntegrityError:
        await db.rollback()  # end the failed transaction before querying again
        # Another request won the race; fetch the existing record
        existing = await import_run_repo.get_by_source_and_hash(...)
        ...
    
  3. Bulk operations that should be all-or-nothing — For example, upserting positive and negative geo cache entries:

    try:
        await db.execute("BEGIN IMMEDIATE")
        await bulk_upsert_entries(db, positive_rows)
        await bulk_upsert_neg_entries(db, negative_ips)
        await db.commit()
    except Exception:
        await db.rollback()
        raise
    

Do NOT use explicit transactions for:

  • Single SQL statements — SQLite guarantees atomic writes for individual statements. No explicit transaction needed.
  • Read-only queries — Queries do not modify data and do not need transaction boundaries.

Transaction Pattern

Always use this pattern for wrapped operations:

try:
    await db.execute("BEGIN IMMEDIATE")
    # ... perform all operations ...
    await db.commit()
except Exception:
    await db.rollback()
    raise
  • BEGIN IMMEDIATE — Acquires a write lock immediately, preventing other writers from entering the transaction window. This is critical for crash-safety and consistency.
  • COMMIT — Persists all changes.
  • ROLLBACK — Rolls back on any exception, ensuring the database is left in a consistent state.
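
The same pattern can be packaged as a context manager so that call sites cannot forget the rollback. This is a sketch using the standard sqlite3 module, not an existing project helper; immediate_transaction() and the geo table are hypothetical.

```python
import sqlite3
from collections.abc import Iterator
from contextlib import contextmanager


@contextmanager
def immediate_transaction(conn: sqlite3.Connection) -> Iterator[None]:
    """BEGIN IMMEDIATE on entry, COMMIT on success, ROLLBACK on any exception."""
    conn.execute("BEGIN IMMEDIATE")
    try:
        yield
    except Exception:
        conn.execute("ROLLBACK")
        raise
    else:
        conn.execute("COMMIT")


# isolation_level=None leaves transaction control entirely to the code below.
conn = sqlite3.connect(":memory:", isolation_level=None)
conn.execute("CREATE TABLE geo (ip TEXT PRIMARY KEY, country TEXT)")

with immediate_transaction(conn):
    conn.execute("INSERT INTO geo VALUES ('192.0.2.1', 'DE')")

try:
    with immediate_transaction(conn):
        conn.execute("INSERT INTO geo VALUES ('192.0.2.2', 'FR')")
        conn.execute("INSERT INTO geo VALUES ('192.0.2.1', 'XX')")  # duplicate key
except sqlite3.IntegrityError:
    pass

rows = conn.execute("SELECT ip, country FROM geo ORDER BY ip").fetchall()
```

The second block fails on its last statement, so its first insert is rolled back as well and only the row from the successful block remains.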

Handling Race Condition Errors

When a UNIQUE constraint violation occurs due to a race condition (two concurrent requests attempt the same insert), the database raises aiosqlite.IntegrityError. Handle this at the call site by retrying the lookup:

try:
    run_id = await import_run_repo.create_pending(db, source_id, content_hash)
except aiosqlite.IntegrityError:
    # Another concurrent request created it first
    existing = await import_run_repo.get_by_source_and_hash(db, source_id, content_hash)
    if existing is None:
        raise RuntimeError("Constraint error indicates row exists but lookup failed")
    run_id = existing.id
    log.info("lost_race", run_id=run_id)

This approach:

  1. Lets the database constraint prevent data corruption.
  2. Gracefully handles the concurrent case in application logic.
  3. Avoids unnecessary locking overhead for the common case (no concurrent writers).
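
The insert-then-fall-back flow can be exercised end to end with the standard sqlite3 module. The sketch below is an illustration under assumptions: create_or_get_run() is a hypothetical function, and the import_runs columns are guessed from the repository function arguments shown above.

```python
import sqlite3


def create_or_get_run(conn: sqlite3.Connection, source_id: int, content_hash: str) -> tuple[int, bool]:
    """Return (run_id, created); created is False when another writer got there first."""
    try:
        conn.execute("BEGIN IMMEDIATE")
        cursor = conn.execute(
            "INSERT INTO import_runs (source_id, content_hash) VALUES (?, ?)",
            (source_id, content_hash),
        )
        conn.execute("COMMIT")
    except sqlite3.IntegrityError:
        conn.execute("ROLLBACK")  # end the failed transaction before reading
        row = conn.execute(
            "SELECT id FROM import_runs WHERE source_id = ? AND content_hash = ?",
            (source_id, content_hash),
        ).fetchone()
        if row is None:
            raise RuntimeError("Constraint error indicates row exists but lookup failed")
        return int(row[0]), False
    run_id = cursor.lastrowid
    if run_id is None:
        raise RuntimeError("insert did not report a row id")
    return run_id, True


conn = sqlite3.connect(":memory:", isolation_level=None)
conn.execute(
    "CREATE TABLE import_runs ("
    "id INTEGER PRIMARY KEY, source_id INTEGER, content_hash TEXT, "
    "UNIQUE (source_id, content_hash))"
)

first = create_or_get_run(conn, 1, "abc123")
second = create_or_get_run(conn, 1, "abc123")  # simulates the request that lost the race
```

Both calls end up with the same run id, and only the first reports that it created the row.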

7. Structured Logging Policy

All logging in BanGUI services and tasks must use structlog for consistent, queryable event tracking. This policy defines when and how to log at each level.

7.1 Logging Levels and When to Use Them

INFO — User-facing operations and state changes

  • Use for significant operational events that the operator should know about.
  • Examples: service startup/completion, resource creation/deletion, state transitions, successful operations with business impact.
  • Never excessive — keep volume reasonable to maintain log clarity.
  • Include relevant context: resource IDs, counts, configuration changes.
log.info("jail_activated", jail=name)
log.info("blocklist_source_created", id=new_id, name=name, url=url)
log.info("session_cleanup_ran", deleted_count=count, cutoff_time=now_iso)

WARNING — Recoverable failures, degraded functionality, unexpected but handled conditions

  • Use for issues that are not fatal but indicate something is wrong or suboptimal.
  • Examples: missing optional config, fallback behavior triggered, non-critical API call failed, parsing errors, missing resources that have workarounds.
  • Include error details and context to enable investigation.
  • Do NOT use for expected error paths (e.g., wrong password in login attempt).
log.warning("jail_status_parse_error", jail=name, error=str(exc))
log.warning("geo_lookup_failed", ip=ip, error=type(exc).__name__)
log.warning("geoip_mmdb_not_found", path=mmdb_path)

ERROR — Fatal/unrecoverable failures within a request or task

  • Use for errors that prevent an operation from completing.
  • Examples: database write failed, critical resource is missing, state is corrupted.
  • Include the full error context to enable debugging.
  • Pair with exception handling — if you log an error, you've decided the caller should handle it or return a failure.
log.error("jail_activation_rollback_restore_failed", jail=name, error=str(exc))
log.error("fail2ban_probe_parse_error", error=str(exc))

EXCEPTION — Unhandled exceptions in background tasks and scheduled jobs

  • Use log.exception() (not log.error()) in catch-all exception handlers to automatically capture the full traceback.
  • This level is ONLY for surprises that should never happen in production.
  • Use in task callback exception handlers and top-level task runners.
try:
    result = await blocklist_service.import_all(...)
except Exception:
    log.exception("blocklist_import_unexpected_error")

DEBUG — Low-level details for development and troubleshooting

  • Use for details too verbose for normal operation (e.g., successful lookups, parsed files, state transitions, loop iterations).
  • Include data samples and validation results.
  • Safe to leave in the code — debug logs are not emitted by default in production.
log.debug("geo_lookup_success_mmdb", ip=ip, country=result.country_code)
log.debug("action_file_parsed", name=raw.name)
log.debug("backend_cmd_supported_detected")

7.2 Event Naming Convention

All log event names must follow a consistent pattern for queryability:

Pattern: {domain}_{entity_or_action}[_{result_or_detail}]

  • Domain — The service or feature area (e.g., jail, blocklist, geo, auth, ban).
  • Entity or Action — The noun or verb describing what happened (e.g., activated, created, failed, synced).
  • Result or Detail (optional) — Additional specificity for complex scenarios (e.g., _restore_failed, _non_200, _no_fallback).

Examples (organized by domain):

| Domain | INFO | WARNING | ERROR | DEBUG |
|---|---|---|---|---|
| jail | jail_activated, jail_deactivated, jail_reloaded | jail_status_parse_error, jail_rollback_failed | jail_activation_rollback_restore_failed | jail_file_parsed, backend_cmd_supported_detected |
| geo | geo_cache_loaded_from_db, geo_flush_dirty_complete | geo_lookup_failed, geo_persist_failed | - | geo_lookup_success_mmdb, geo_cache_cleanup_ran |
| blocklist | blocklist_import_starting, blocklist_source_created | blocklist_schedule_invalid, blocklist_preview_failed | - | blocklist_ban_failed |
| ban | active_bans_fetched, all_ips_unbanned | ban_service_geo_lookup_failed | ban_service_geo_lookup_unexpected_error | ban_entry_parse_error |
| auth | bangui_login_success, bangui_logout | bangui_login_wrong_password, bangui_login_no_hash | - | - |
| config | filter_created, action_updated | filter_read_error, action_d_not_found | - | filter_file_parsed |
| setup | bangui_setup_started, bangui_setup_completed | - | bangui_setup_failed | - |

Key rules:

  • Use snake_case (never camelCase or PascalCase).
  • Keep event names short but descriptive; aim for 2 to 4 words.
  • Use consistent terminology across the codebase (e.g., always _created, never _added or _new).
  • Prefix task/background job events with the job name (e.g., blocklist_import_starting, session_cleanup_ran).
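
The snake_case rule lends itself to a mechanical check, for example in a unit test over known event names. The sketch below is one possible approach; the regular expression and is_valid_event_name() are assumptions, and the check deliberately does not enforce the word-count guideline, since some existing names are longer.

```python
import re

# Lowercase domain segment followed by at least one more underscore-separated segment.
EVENT_NAME = re.compile(r"[a-z][a-z0-9]*(?:_[a-z0-9]+)+")


def is_valid_event_name(name: str) -> bool:
    """Return True for snake_case names with at least a domain and one further word."""
    return EVENT_NAME.fullmatch(name) is not None


results = {
    name: is_valid_event_name(name)
    for name in (
        "jail_activated",
        "geo_lookup_success_mmdb",
        "action_d_not_found",
        "jailActivated",
        "JailActivated",
        "jail",
    )
}
```

Names in camelCase or PascalCase are rejected, as is a bare domain with no entity or action.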

7.3 Structured Context and Key-Value Pairs

Always log with structured context — key-value pairs that make logs queryable and analyzable.

Essential patterns:

# Operation with count
log.info("active_bans_fetched", total=len(bans))

# Resource manipulation with ID
log.info("blocklist_source_deleted", id=source_id)

# State transition with reason or error
log.warning("geo_persist_failed", ip=ip, error=type(exc).__name__)

# Time-bounded operation
log.info("session_cleanup_ran", deleted_count=count, cutoff_time=now_iso)

# Config or feature change
log.info("blocklist_schedule_updated", frequency=config.frequency, hour=config.hour)

# Batch operation with metrics
log.info("blocklist_import_finished", total_imported=result.total_imported, 
         total_skipped=result.total_skipped, errors=result.errors_count)

What to include:

  • Resource IDs (jail names, IP addresses, source IDs).
  • Counts and metrics (rows processed, items synced, errors).
  • Configuration or decision points (enabled/disabled flags, thresholds).
  • Timestamps and durations for long-running operations (optional but useful).
  • Error types and short error messages (use type(exc).__name__ or str(exc) depending on context).

What to NEVER log:

  • Sensitive data: passwords, tokens, session IDs, API keys, hashes.
    • For session correlation without leaking token material, use a one-way hash fragment: hashlib.sha256(token.encode()).hexdigest()[:12].
    • Use numeric database IDs for entity correlation instead of raw identifiers: session_id=session.id instead of token=session.token.
  • Full exception tracebacks in INFO/WARNING (use log.exception() only in catch-all handlers).
  • Redundant system time (structlog adds timestamp automatically).
  • User PII in most cases (name, email, phone, etc.) — unless directly relevant to debugging and anonymized.
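
The hash-fragment technique for session correlation can be wrapped in a small helper. This is a sketch; token_fingerprint() is a hypothetical name, and the token value is a placeholder.

```python
import hashlib


def token_fingerprint(token: str) -> str:
    """Return a short one-way fragment that is safe to log for correlation."""
    return hashlib.sha256(token.encode()).hexdigest()[:12]


token = "example-session-token"
fingerprint = token_fingerprint(token)
same_again = token_fingerprint(token)
other = token_fingerprint("another-session-token")
```

The same token always yields the same 12-character fragment, so log lines from one session can be grouped without the token itself ever appearing in the logs.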

7.4 Background Tasks and Scheduled Jobs

Every background task (APScheduler job) must follow this pattern:

  1. On startup: Log {job_name}_scheduled with interval or cron expression.
  2. On execution: Log {job_name}_starting (INFO) and {job_name}_finished or {job_name}_ran (INFO) with metrics.
  3. On exception: Use log.exception("{job_name}_unexpected_error") in the top-level try/except.
# app/tasks/blocklist_import.py
async def _run_import_with_resources(settings: Settings, http_session: ClientSession) -> None:
    """APScheduler callback that imports all enabled blocklist sources."""
    log.info("blocklist_import_starting")
    try:
        result = await blocklist_service.import_all(...)
        log.info("blocklist_import_finished",
                 total_imported=result.total_imported,
                 total_skipped=result.total_skipped,
                 errors=result.errors_count)
    except Exception:
        log.exception("blocklist_import_unexpected_error")

# Register and log in the lifespan or register function:
log.info("blocklist_import_scheduled", interval_seconds=INTERVAL_SECONDS)

7.5 Service Functions and Methods

Service functions should log at entry/exit for significant operations, or when errors occur:

Entry logging (optional, use for complex or long-running operations):

async def import_all(...) -> ImportResult:
    log.debug("blocklist_import_starting", count=len(sources))
    try:
        ...
    except SomeError as exc:
        log.warning("blocklist_import_partial_failure", imported=count, error=str(exc))

Exit/success logging (log results with metrics):

async def get_all_jails(socket_path: str) -> list[JailResponse]:
    jails = await _fetch_jails(socket_path)
    log.info("jail_list_fetched", count=len(jails))
    return jails

Error handling (log with context, let caller decide how to respond):

try:
    result = await fetch_external_data(url)
except TimeoutError:
    log.warning("external_fetch_timeout", url=url, timeout_seconds=TIMEOUT)
    raise
except Exception as exc:
    log.error("external_fetch_failed", url=url, error=type(exc).__name__)
    raise

8. Error Handling

  • Define custom exception classes for domain errors (e.g., JailNotFoundError, BanFailedError).
  • Catch specific exceptions — never bare except: or except Exception: without re-raising.
  • Map domain exceptions to HTTP status codes via FastAPI exception handlers registered on the app.
  • Always log errors with context before raising.
class JailNotFoundError(Exception):
    def __init__(self, name: str) -> None:
        self.name: str = name
        super().__init__(f"Jail '{name}' not found")

# In main.py
@app.exception_handler(JailNotFoundError)
async def jail_not_found_handler(request: Request, exc: JailNotFoundError) -> JSONResponse:
    return JSONResponse(status_code=404, content={"detail": f"Jail '{exc.name}' not found"})

Routers and Exception Propagation

  • Routers must NOT construct HTTPException for domain errors — let domain exceptions propagate.
  • Routers should never have helper functions like _bad_gateway(), _not_found(), _conflict() etc. that convert domain exceptions to HTTPException.
  • All domain exception types must have corresponding handlers registered in main.py via app.add_exception_handler().
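  • Register handlers from most specific to least specific for readability. Registration order does not decide which handler runs: Starlette, which FastAPI builds on, walks the raised exception's class hierarchy (MRO) and uses the first class that has a handler, so the most specific registered class wins.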
# ❌ BAD — routers constructing HTTPException for domain exceptions
@router.get("/{name}")
async def get_jail(name: str, socket_path: Fail2BanSocketDep) -> JailDetailResponse:
    try:
        return await jail_service.get_jail(socket_path, name)
    except JailNotFoundError:
        raise HTTPException(status_code=404, detail=f"Jail not found: {name!r}") from None

# ✅ GOOD — domain exception propagates to global handler
@router.get("/{name}")
async def get_jail(name: str, socket_path: Fail2BanSocketDep) -> JailDetailResponse:
    return await jail_service.get_jail(socket_path, name)

All domain exceptions raised by services propagate to handlers in main.py, ensuring:

  1. Consistent error response format across the entire API.
  2. No duplicated exception-to-HTTP-status mapping logic.
  3. Easy to audit all error codes — they are all in one place.

Error Message Hygiene

HTTP responses must never leak sensitive internal details that aid attackers or expose infrastructure:

  • Never include system paths in HTTP error messages (e.g., /var/run/fail2ban/fail2ban.sock, /etc/fail2ban/).
  • Never include raw exception messages that expose internal parsing or implementation logic.
  • Log full details server-side only — exception handlers must log error=str(exc) with full exception context, but return generic user-friendly messages in the HTTP response.
# ❌ BAD — leaks socket path and internal details to the client
async def _fail2ban_connection_handler(request: Request, exc: Fail2BanConnectionError) -> JSONResponse:
    return JSONResponse(
        status_code=502,
        content={"detail": f"Cannot reach fail2ban: {exc}"},  # exc includes socket path!
    )

# ✅ GOOD — generic message in response, full details in server logs
async def _fail2ban_connection_handler(request: Request, exc: Fail2BanConnectionError) -> JSONResponse:
    log.warning(
        "fail2ban_connection_error",
        path=request.url.path,
        method=request.method,
        error=str(exc),  # Full details logged server-side
    )
    return JSONResponse(
        status_code=502,
        content={"detail": "Cannot reach the fail2ban service. Check the server status page."},
    )
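
The same rule can be sketched without FastAPI. This is only an illustration: it uses the standard logging module as a stand-in for structlog, and GENERIC_DETAIL and build_error_body are hypothetical names, not part of the BanGUI codebase. The point is that the response body never depends on str(exc).

```python
import logging

log = logging.getLogger("bangui.sketch")

# Hypothetical constant; the wording is taken from the handler example above.
GENERIC_DETAIL = "Cannot reach the fail2ban service. Check the server status page."


class Fail2BanConnectionError(Exception):
    """Stand-in for the project's exception; its message may contain the socket path."""


def build_error_body(exc: Exception, request_path: str) -> dict[str, str]:
    """Log the full exception server-side and return a client-safe body."""
    # Full details (including any paths in the message) go to the log only.
    log.warning("fail2ban_connection_error path=%s error=%s", request_path, exc)
    # The client sees a fixed message that does not depend on str(exc).
    return {"detail": GENERIC_DETAIL}


body = build_error_body(
    Fail2BanConnectionError("connect failed: /var/run/fail2ban/fail2ban.sock"),
    "/api/jails",
)
```

A test for a real handler can make the same check: assert that no path fragment from the exception message appears in the response body.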

Exception Taxonomy and HTTP Mapping

BanGUI uses a standardized exception taxonomy that maps domain errors to HTTP status codes consistently across all services. This allows exception handlers to be registered by category rather than by individual type, reducing code duplication and ensuring consistent client-facing error responses.

Exception Categories

All domain exceptions inherit from one of five base categories defined in app.exceptions:

| Base Exception | HTTP Status | Meaning | Example |
|---|---|---|---|
| NotFoundError | 404 | Requested domain entity not found | JailNotFoundError, FilterNotFoundError |
| BadRequestError | 400 | Invalid input, validation failure, or invalid identifier | ConfigValidationError, JailNameError |
| ConflictError | 409 | State conflict or resource constraint violation | JailAlreadyActiveError, FilterAlreadyExistsError |
| OperationError | 500 | Domain operation failure (write, update, delete) | ConfigWriteError, ConfigFileWriteError |
| ServiceUnavailableError | 503 | Infrastructure or external service unreachable | Fail2BanConnectionError, ConfigDirError |

Service Exception Mapping

Every service-specific exception inherits from exactly one category. This allows main.py to register just 5 exception handlers instead of 25+:

# In app/exceptions.py — define each exception once with its category
class JailNotFoundError(NotFoundError):
    def __init__(self, name: str) -> None:
        self.name = name
        super().__init__(f"Jail not found: {name!r}")

class JailAlreadyActiveError(ConflictError):
    def __init__(self, name: str) -> None:
        self.name = name
        super().__init__(f"Jail is already active: {name!r}")

# In app/main.py — register category handlers
app.add_exception_handler(NotFoundError, _not_found_handler)
app.add_exception_handler(BadRequestError, _bad_request_handler)
app.add_exception_handler(ConflictError, _conflict_handler)
app.add_exception_handler(OperationError, _domain_error_handler)
app.add_exception_handler(ServiceUnavailableError, _service_unavailable_handler)

Service Exception Reference

When writing a new service, determine which category each exception belongs to:

  • Not found: Always NotFoundError (e.g., jail, filter, action, config file not found)
  • Invalid input: Always BadRequestError (e.g., validation errors, invalid names, regex compile failure)
  • State conflicts: Always ConflictError (e.g., already exists, already active, readonly resource)
  • Operation failures: Always OperationError (e.g., write failed, update failed, command failed)
  • Infrastructure: Always ServiceUnavailableError (e.g., config dir missing, socket unreachable, fail2ban protocol error)
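
A minimal sketch of why category base classes keep the mapping small, assuming a lookup table keyed by category. DomainError, STATUS_BY_CATEGORY, and status_for are hypothetical names used only for illustration; the real handlers live in main.py and may be structured differently.

```python
class DomainError(Exception):
    """Hypothetical common root, used here only to keep the sketch short."""


class NotFoundError(DomainError): ...
class BadRequestError(DomainError): ...
class ConflictError(DomainError): ...
class OperationError(DomainError): ...
class ServiceUnavailableError(DomainError): ...


# One entry per category, mirroring the table of base exceptions.
STATUS_BY_CATEGORY: dict[type[Exception], int] = {
    NotFoundError: 404,
    BadRequestError: 400,
    ConflictError: 409,
    OperationError: 500,
    ServiceUnavailableError: 503,
}


class JailNotFoundError(NotFoundError):
    def __init__(self, name: str) -> None:
        self.name = name
        super().__init__(f"Jail not found: {name!r}")


def status_for(exc: Exception) -> int:
    """Return the status of the first category found in the exception's MRO."""
    for cls in type(exc).__mro__:
        if cls in STATUS_BY_CATEGORY:
            return STATUS_BY_CATEGORY[cls]
    return 500  # Unknown exception types fall back to a server error.
```

A new service exception only has to pick the right base class; no new table entry or handler is needed.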

Client Expectations

Clients should expect the following HTTP status codes and response format for all domain errors:

HTTP 400 Bad Request
{
  "detail": "Jail name contains invalid characters"
}

HTTP 404 Not Found
{
  "detail": "Jail not found: 'sshd'"
}

HTTP 409 Conflict
{
  "detail": "Jail is already active: 'sshd'"
}

HTTP 500 Internal Server Error
{
  "detail": "Failed to write configuration: permission denied"
}

HTTP 503 Service Unavailable
{
  "detail": "Cannot reach the fail2ban service. Check the server status page."
}

The detail field contains the exception's message (from str(exc)), so every domain exception message must be written to be safe for clients. Sensitive details (socket paths, file paths, internal error messages) are never included; they are logged server-side only.

  • Network I/O: TimeoutError, aiohttp.ClientError, asyncio.TimeoutError (an alias of TimeoutError since Python 3.11)
  • File I/O: OSError (includes IOError, FileNotFoundError, PermissionError)
  • JSON parsing: json.JSONDecodeError, ValueError
  • Database errors: aiosqlite.Error and derivatives (re-exports of the sqlite3 exception classes; they do not derive from OSError and must be caught explicitly)
  • Third-party libraries: Specific exception classes (e.g., geoip2.errors.GeoIP2Error)

When catching service-critical exceptions:

  1. Catch the specific exception types for the operation.
  2. Log with the exception type and relevant context.
  3. Return a safe fallback (empty dict, None, etc.) or re-raise if the service cannot function.

When truly unavoidable broad catches are needed (e.g., retrying transient network failures):

  1. Place specific catches first.
  2. Add one final except Exception after specific cases, with error_type="unexpected" logged to flag surprises.
  3. Document why broad catching is necessary (e.g., "tests use mock objects that may raise arbitrary exceptions").

Example:

async def lookup_batch(ips: list[str], http_session: aiohttp.ClientSession) -> dict[str, GeoInfo]:
    """Resolve multiple IPs, returning empty map on failure."""
    try:
        result = await http_session.post(url, json=payload, timeout=timeout)
    except (TimeoutError, aiohttp.ClientError) as exc:
        # Expected network failures — log and return empty result
        log.warning("geo_batch_http_failed", error=type(exc).__name__)
        return {}
    except Exception as exc:
        # Unexpected — log as error for investigation
        log.error("geo_batch_unexpected_error", error=type(exc).__name__)
        return {}

9. Testing

  • Every new feature or bug fix must include tests.
  • Tests live in tests/ mirroring the app/ structure.
  • Use pytest with pytest-asyncio for async tests.
  • Use httpx.AsyncClient to test FastAPI endpoints (not TestClient which is sync).
  • Mock external dependencies (fail2ban socket, aiohttp calls) — tests must never touch real infrastructure.
  • Aim for >80 % line coverage — critical paths (auth, banning, scheduling) must be 100 %.
  • Test names follow test_<unit>_<scenario>_<expected> pattern.
from collections.abc import AsyncIterator

import pytest
import pytest_asyncio
from httpx import ASGITransport
from httpx import AsyncClient

from app.main import create_app

# In pytest-asyncio's strict mode, async fixtures need the plugin's own decorator.
@pytest_asyncio.fixture
async def client() -> AsyncIterator[AsyncClient]:
    app = create_app()
    transport: ASGITransport = ASGITransport(app=app)
    async with AsyncClient(transport=transport, base_url="http://test") as ac:
        yield ac

@pytest.mark.asyncio
async def test_list_jails_returns_200(client: AsyncClient) -> None:
    response = await client.get("/api/jails/")
    assert response.status_code == 200
    data: dict[str, object] = response.json()
    assert "jails" in data

9.1 Background Tasks and Scheduler Architecture

BanGUI uses APScheduler 4.x (async mode) to manage background jobs that execute on a schedule without user interaction. This section documents how to write and register background tasks.

Task Location and Structure

All background tasks live in backend/app/tasks/ as separate modules. Each task:

  • Exports a register(app: FastAPI) -> None or async def register(app: FastAPI) -> None function.
  • Opens its own database connection using app.db.open_db() or the task_db() helper.
  • Closes connections when work completes (use the async context manager pattern).
  • Runs independently of the FastAPI request/response cycle.

Example Task

# backend/app/tasks/my_task.py
import structlog
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from fastapi import FastAPI

from app.db import open_db

log = structlog.get_logger()

async def my_background_job(app: FastAPI) -> None:
    """Do important work on a schedule."""
    log.info("my_background_job_starting")
    try:
        # Call the imported helper: the ``app`` parameter shadows the ``app`` package here.
        db = await open_db(app.state.settings.database_path)
        try:
            # Do work...
            pass
        finally:
            await db.close()
    except Exception:
        # Top-level catch for a scheduler callback; log.exception records the traceback.
        log.exception("my_background_job_unexpected_error")

def register(app: FastAPI) -> None:
    """Register the job with the scheduler."""
    scheduler: AsyncIOScheduler = app.state.scheduler
    scheduler.add_job(
        my_background_job,
        args=(app,),
        trigger="interval",
        seconds=60,
        id="my_task",
        name="My Background Job",
    )

Accessing Shared Resources in Tasks

Since tasks do not have access to Depends(get_db) (no request scope), they must:

  1. Open their own DB connection via app.state.db_factory.open_db(path).
  2. Access app-level state: app.state.http_session, app.state.geo_cache, app.state.settings, etc.
  3. Use structlog for all logging (never print()).

Single-Worker Requirement

The scheduler is bound to a single asyncio event loop and cannot be shared across multiple worker processes. BanGUI enforces single-worker mode to prevent duplicate task execution.

  • Deployment constraint: Set BANGUI_WORKERS=1 (default).
  • Startup validation: startup_shared_resources() raises RuntimeError if BANGUI_WORKERS > 1.
  • See Architekture.md § 9.2 for full details.
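
The startup validation can be sketched as follows. This is not the code of startup_shared_resources(); validate_worker_count is a hypothetical helper that only illustrates the check described above, reading BANGUI_WORKERS from a mapping so it can be tested without touching the real environment.

```python
import os
from collections.abc import Mapping


def validate_worker_count(env: Mapping[str, str] = os.environ) -> int:
    """Return the configured worker count, refusing anything above one."""
    # Default to a single worker when the variable is not set.
    workers = int(env.get("BANGUI_WORKERS", "1"))
    if workers > 1:
        raise RuntimeError(
            f"BANGUI_WORKERS={workers} is not supported: the scheduler is bound "
            "to one event loop, so multiple workers would run every job twice."
        )
    return workers
```

Failing at startup is deliberate: a misconfigured deployment stops immediately instead of silently running duplicate jobs.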

Timeout Protection for Background Tasks

All background tasks must wrap their async work with timeout protection. If a task hangs (API unreachable, network partition, database lock), it never completes, its lock is never released, duplicate work starts, and resources are eventually exhausted. Timeouts prevent this.

Rule: Every task function must use run_with_timeout() from app.tasks.timeout_utils to enforce a timeout on its async work.

from app.tasks.timeout_utils import run_with_timeout

async def _run_import_with_resources(settings: Settings, http_session: ClientSession) -> None:
    """Imports blocklists with timeout protection."""
    async def _do_import() -> None:
        # ... your async work ...
        result = await blocklist_service.import_all(...)
        log.info("import_finished", total=result.total_imported)
    
    # Wrap with timeout: abort after 300 seconds
    await run_with_timeout("blocklist_import", _do_import(), timeout_seconds=300)

Why this pattern:

  1. run_with_timeout() enforces strict time limits using asyncio.wait_for().
  2. If timeout is exceeded, TimeoutError is raised and logged with elapsed time.
  3. If task approaches timeout (>80% of time budget), a warning is logged for observability.
  4. Failures are logged at warning level (not error) — timeouts are expected sometimes, but worth investigating.
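
The behaviour listed above can be approximated in a few lines. The sketch below is not the code in app.tasks.timeout_utils; run_with_timeout_sketch is a hypothetical stand-in that uses the standard logging module instead of structlog and assumes the 80% warning threshold described above.

```python
import asyncio
import logging
import time
from collections.abc import Awaitable
from typing import TypeVar

T = TypeVar("T")
log = logging.getLogger("bangui.sketch")

WARN_FRACTION = 0.8  # Warn when more than 80% of the budget was used.


async def run_with_timeout_sketch(
    task_name: str, work: Awaitable[T], timeout_seconds: float
) -> T:
    """Await ``work`` with a hard time limit and log how much budget it used."""
    started = time.monotonic()
    try:
        result = await asyncio.wait_for(work, timeout=timeout_seconds)
    except asyncio.TimeoutError:
        elapsed = time.monotonic() - started
        # Timeouts are logged at warning level and then re-raised to the caller.
        log.warning(
            "task_timeout task_name=%s timeout_seconds=%s elapsed_seconds=%.2f",
            task_name, timeout_seconds, elapsed,
        )
        raise
    elapsed = time.monotonic() - started
    if elapsed > WARN_FRACTION * timeout_seconds:
        log.warning(
            "task_approaching_timeout task_name=%s timeout_seconds=%s elapsed_seconds=%.2f",
            task_name, timeout_seconds, elapsed,
        )
    return result
```

Exceptions raised by the work itself are not caught here, so they propagate unchanged to the caller.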

Timeout Values by Task:

| Task | Timeout | Rationale |
|---|---|---|
| blocklist_import | 300s (5 min) | Downloads, validates, applies external lists. Network delays expected. |
| health_check | 10s | Socket probe to fail2ban. Should complete quickly or fail2ban is unresponsive. |
| geo_cache_flush | 60s | Writes dirty cache entries to DB. Handles contention gracefully. |
| session_cleanup | 30s | Deletes expired sessions. DB contention unlikely but possible. |
| rate_limiter_cleanup | 5s | In-memory cleanup, no I/O. Should always be instant. |
| geo_cache_cleanup | 60s | Deletes stale geo entries from DB. May scan large table. |
| geo_re_resolve | 120s | Retries failed IP lookups with backoff. API rate-limit delays expected. |
| history_sync | 60s | Syncs records from fail2ban DB to archive. May read/write many rows. |
| scheduler_lock_heartbeat | 5s | Updates lock timestamp. Must be quick or lock is lost. |

Timeout Events Are Logged:

On timeout:

task_timeout task_name=blocklist_import timeout_seconds=300 elapsed_seconds=300.45

On approaching timeout (>80% of budget used):

task_approaching_timeout task_name=blocklist_import timeout_seconds=300 elapsed_seconds=298.5 usage_percent=99.5

The logs include elapsed_seconds for observability — if you see tasks consistently near timeout, the value may need adjustment.

Testing Timeout Behavior:

Tests for timeout scenarios are in backend/tests/test_tasks/test_timeout_utils.py:

  • Verify timeout is raised and logged.
  • Verify approaching-timeout warning is logged.
  • Verify task exceptions (not timeout) propagate correctly.

Add timeout tests to your task test file:

@pytest.mark.asyncio
async def test_task_timeout_is_logged(self) -> None:
    """Task must be logged and raise TimeoutError on timeout."""
    with patch("app.tasks.my_task.log") as mock_log:
        with pytest.raises(TimeoutError):
            await my_task._run_with_resources(settings)  # exceeds timeout
    
    timeout_calls = [
        c for c in mock_log.warning.call_args_list
        if c[0][0] == "task_timeout"
    ]
    assert len(timeout_calls) == 1

Task Idempotency

Background tasks must be idempotent — retrying after a crash must produce the same result as running once.

If a task crashes or times out mid-execution, the scheduler may retry. Without idempotency, retries cause duplicate work:

  • blocklist_import: banned IPs appear twice → database corruption
  • geo_cache_flush: entries written twice → cache inconsistency
  • Any multi-step operation: partial state remains

Pattern: Content-Hash Idempotency for Blocklist Imports

Track imports by source + content hash to detect retries:

import hashlib

from app.repositories import import_run_repo

async def import_source(source, db, ...):
    # Download content
    status, content = await downloader.download(url)
    
    # Compute hash for idempotency detection
    content_hash = hashlib.sha256(content.encode()).hexdigest()
    
    # Check if this exact import already completed
    existing_run = await import_run_repo.get_by_source_and_hash(
        db, source.id, content_hash
    )
    
    if existing_run and existing_run.status == "completed":
        # Already done — skip banning, optionally re-warm cache
        log.info("blocklist_import_already_completed", ...)
        return ImportSourceResult(ips_imported=existing_run.imported_count, ...)
    
    # First run: create pending record
    if not existing_run:
        run_id = await import_run_repo.create_pending(
            db, source.id, content_hash
        )
    else:
        run_id = existing_run.id  # Retry case
    
    # Do work (ban IPs, etc.)
    imported, errors = await ban_executor.ban_ips(...)
    
    # Mark as completed or failed (atomically)
    if errors:
        await import_run_repo.mark_failed(db, run_id, str(errors))
    else:
        await import_run_repo.mark_completed(db, run_id, imported, skipped)

Key points:

  1. Operation ID must be deterministic — Use content hash, not timestamp

    • Same content = same operation ID → retry safe
    • Different content = different operation ID → new import run
  2. Check before doing work — Query import_runs table before banning

    • If completed: skip banning (already done)
    • If pending: retry was interrupted, try again
    • If failed: retry to recover
  3. Atomic state updates — Mark as completed AFTER all work succeeds

    • All-or-nothing: either import succeeded + logged, or failed + retryable
  4. Test idempotency — Verify retrying same content doesn't duplicate bans

    # First import: ban 2 IPs
    result1 = await import_source(source, content, db)
    assert result1.ips_imported == 2
    
    # Second import (same content): skip bans
    result2 = await import_source(source, content, db)
    assert result2.ips_imported == 2
    assert ban_ip.call_count == 2  # Two calls from the first import (one per IP); the retry adds none
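
The check-before-work step can also be tried in isolation. The sketch below is an assumption-laden illustration, not the import_run_repo implementation: it uses the synchronous sqlite3 module with an in-memory database, a hypothetical import_runs schema with a UNIQUE (source_id, content_hash) constraint, and an explicit BEGIN IMMEDIATE transaction around the lookup and insert.

```python
import hashlib
import sqlite3


def open_runs_db() -> sqlite3.Connection:
    """Create an in-memory database with a hypothetical import_runs table."""
    # isolation_level=None disables implicit transactions; we manage them explicitly.
    db = sqlite3.connect(":memory:", isolation_level=None)
    db.execute(
        "CREATE TABLE import_runs ("
        " id INTEGER PRIMARY KEY,"
        " source_id INTEGER NOT NULL,"
        " content_hash TEXT NOT NULL,"
        " status TEXT NOT NULL,"
        " UNIQUE (source_id, content_hash))"
    )
    return db


def claim_run(db: sqlite3.Connection, source_id: int, content: str) -> tuple[int, bool]:
    """Return (run_id, should_do_work) for this source and content."""
    content_hash = hashlib.sha256(content.encode()).hexdigest()
    db.execute("BEGIN IMMEDIATE")  # Take the write lock before reading.
    try:
        row = db.execute(
            "SELECT id, status FROM import_runs WHERE source_id = ? AND content_hash = ?",
            (source_id, content_hash),
        ).fetchone()
        if row is None:
            cur = db.execute(
                "INSERT INTO import_runs (source_id, content_hash, status)"
                " VALUES (?, ?, 'pending')",
                (source_id, content_hash),
            )
            run_id, should_work = int(cur.lastrowid or 0), True
        else:
            # Pending or failed runs are retried; completed runs are skipped.
            run_id, should_work = int(row[0]), row[1] != "completed"
        db.execute("COMMIT")
    except sqlite3.Error:
        db.execute("ROLLBACK")
        raise
    return run_id, should_work


def mark_completed(db: sqlite3.Connection, run_id: int) -> None:
    """Record that all work for the run succeeded."""
    db.execute("UPDATE import_runs SET status = 'completed' WHERE id = ?", (run_id,))
```

Because the hash is derived from the content, a retry with identical content finds the existing row, while changed content creates a new run.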
    

| Tool | Purpose |
|---|---|
| Ruff | Linter and formatter (replaces black, isort, flake8). |
| mypy or pyright | Static type checking in strict mode. |
| pre-commit | Run ruff + type checker before every commit. |

  • Line length: 120 characters max.
  • Strings: use double quotes (").
  • Imports: sorted by ruff — stdlib → third-party → local, one import per line.
  • No unused imports, no unused variables, no # type: ignore without explanation.
  • Docstrings in Google style on every public function, class, and module.

11. fail2ban Response Utilities

All services that interact with the fail2ban daemon must use the canonical response parsing utilities from app.utils.fail2ban_response. This ensures consistent error handling, type safety, and makes it easy to fix bugs in response handling across the entire codebase.

Available Functions

ok(response: object) -> object
Extracts the payload from a fail2ban (return_code, data) response tuple.

  • Raises ValueError if return code ≠ 0 or response shape is invalid.
  • Use this on every response from Fail2BanClient.send().

to_dict(pairs: object) -> dict[str, object]
Converts a list of (key, value) pairs (fail2ban's native response format) to a Python dict.

  • Silently ignores malformed entries and non-list/tuple inputs.
  • Always returns a dict (empty if input is invalid).

ensure_list(value: object | None) -> list[str]
Coerces fail2ban response values (which may be None, a single string, or a list) to a normalized list of strings.

  • Handles all three cases consistently.
  • Returns empty list for None or empty strings.

is_not_found_error(exc: Exception) -> bool
Checks if an exception indicates a jail does not exist.

  • Checks for multiple error message patterns (case-insensitive).
  • Use this to distinguish "jail not found" errors from other failures.

Example Usage

from app.utils.fail2ban_response import ok, to_dict, ensure_list, is_not_found_error
from app.utils.fail2ban_client import Fail2BanClient

client = Fail2BanClient(socket_path="/var/run/fail2ban/fail2ban.sock")

try:
    # Get jail status
    response = await client.send(["status", "sshd", "short"])
    status_dict = to_dict(ok(response))  # Extract payload and convert to dict
    
    # Get list of banned IPs
    ban_response = await client.send(["get", "sshd", "banip"])
    banned_ips = ensure_list(ok(ban_response))  # Normalize to list of strings
    
except ValueError as exc:
    if is_not_found_error(exc):
        raise JailNotFoundError("sshd") from exc
    raise

Why This Matters

Before this utility module, every service implemented its own copy of these functions, leading to:

  • Code duplication across 7+ service files.
  • Subtle inconsistencies in error handling.
  • Difficult maintenance — every bug fix required touching multiple files.

Now, all services import from a single authoritative source, making response handling consistent, maintainable, and type-safe.


12. Configuration & Secrets

  • All configuration lives in environment variables loaded through pydantic-settings.
  • Secrets (master password hash, session key) are never committed to the repository.
  • Provide a .env.example with all keys and placeholder values.
  • Validate config at startup — fail fast with a clear error if a required value is missing.
from pydantic_settings import BaseSettings
from pydantic import Field

class Settings(BaseSettings):
    database_path: str = Field("bangui.db", description="Path to SQLite database")
    fail2ban_socket: str = Field("/var/run/fail2ban/fail2ban.sock", description="fail2ban socket path")
    session_secret: str = Field(..., description="Secret key for session signing")
    log_level: str = Field("info", description="Logging level")

    model_config = {"env_prefix": "BANGUI_", "env_file": ".env"}

Session Secret Configuration

The session_secret is the HMAC key used to sign all session tokens. It must be at least 32 characters (256 bits) to provide sufficient cryptographic strength for HMAC-SHA256.

Minimum Length: 32 characters

Why 32 characters? Session tokens are signed using HMAC-SHA256. A secret shorter than 32 bytes (256 bits) significantly weakens the signature, potentially allowing attackers to forge valid tokens. The constraint is enforced at startup — the application will fail to start if session_secret is shorter than 32 characters.

Generation: Generate a secure secret using Python:

python -c "import secrets; print(secrets.token_hex(32))"

This produces a 64-character hexadecimal string (256 bits) suitable for production use.
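
A short sketch of the length check and of HMAC-SHA256 signing with the standard library. The function names and the plain "payload plus hex signature" scheme are hypothetical; BanGUI's actual token format is not shown in this section.

```python
import hashlib
import hmac
import secrets

MIN_SECRET_LENGTH = 32


def validate_session_secret(secret: str) -> str:
    """Reject secrets that are too short; the message never echoes the secret."""
    if len(secret) < MIN_SECRET_LENGTH:
        raise ValueError(
            f"session_secret must be at least {MIN_SECRET_LENGTH} characters"
        )
    return secret


def sign(secret: str, payload: str) -> str:
    """Return the hex HMAC-SHA256 signature of the payload."""
    return hmac.new(secret.encode(), payload.encode(), hashlib.sha256).hexdigest()


def verify(secret: str, payload: str, signature: str) -> bool:
    """Compare signatures in constant time to avoid timing leaks."""
    return hmac.compare_digest(sign(secret, payload), signature)


# Generate a secret the same way as the shell one-liner above.
example_secret = validate_session_secret(secrets.token_hex(32))
example_signature = sign(example_secret, "session-id-123")
```

Using hmac.compare_digest rather than == is the usual choice when comparing signatures.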

Environment Variable:

BANGUI_SESSION_SECRET="your-32-character-minimum-secret-here"

Never commit the actual secret to the repository. Provide a .env.example with a placeholder:

# .env.example
BANGUI_SESSION_SECRET="set-this-to-a-32-character-minimum-secret"

The session_cookie_secure configuration controls the Secure flag on the session cookie. This flag prevents browsers from sending the session cookie over unencrypted HTTP.

Default: true — Production deployments are secure by default. Cookies are only sent over HTTPS.

Local Development: Set BANGUI_SESSION_COOKIE_SECURE=false in your compose file or .env to allow cookies over HTTP (required for localhost:8000).

# Docker/compose.debug.yml
environment:
  BANGUI_SESSION_COOKIE_SECURE: "false"  # Allow HTTP during local development

Important: If Secure=true is set, browsers will reject the session cookie when the backend is served over HTTP. Ensure your nginx/reverse proxy terminates TLS and passes X-Forwarded-Proto: https so FastAPI knows the connection is secure.

CSRF Protection Middleware

State-mutating endpoints (POST, PUT, DELETE, PATCH) authenticated via session cookies are protected by the CsrfMiddleware, which enforces a custom header check.

How It Works:

  1. For every request using a mutating HTTP method, the middleware checks:

    • Is this request authenticated via session cookie (not Bearer token)?
    • If yes, require the custom header X-BanGUI-Request: 1.
    • If missing or incorrect, return 403 Forbidden.
  2. Bearer token requests (via Authorization: Bearer header) bypass the check because tokens are not CSRF-vulnerable — they are never automatically sent on cross-origin requests.

  3. Safe HTTP methods (GET, HEAD, OPTIONS) bypass the check.

  4. Cross-site protection: Cross-site JavaScript (fetch() calls from other origins) cannot set custom headers without a CORS preflight, which the backend rejects for non-allowed origins. This adds defense-in-depth against forged requests from other origins, including sibling subdomains. It does not stop script injected into BanGUI's own origin (XSS), which can set the header itself.
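
The decision rules above fit in one pure function. This sketch is not the code in backend/app/middleware/csrf.py; csrf_allows is a hypothetical helper, and letting requests without a session cookie pass (so that authentication can reject them elsewhere) is an assumption.

```python
from collections.abc import Mapping

SAFE_METHODS = frozenset({"GET", "HEAD", "OPTIONS"})
CSRF_HEADER = "x-bangui-request"
SESSION_COOKIE = "bangui_session"


def csrf_allows(
    method: str, headers: Mapping[str, str], cookies: Mapping[str, str]
) -> bool:
    """Return True if the request passes the custom-header CSRF check."""
    if method.upper() in SAFE_METHODS:
        return True  # Safe methods bypass the check.
    # HTTP header names are case-insensitive, so normalize before lookup.
    lowered = {name.lower(): value for name, value in headers.items()}
    if lowered.get("authorization", "").startswith("Bearer "):
        return True  # Bearer tokens are not sent automatically by browsers.
    if SESSION_COOKIE not in cookies:
        return True  # Assumption: not cookie-authenticated, handled by auth.
    return lowered.get(CSRF_HEADER) == "1"
```

Keeping the rule in a pure function makes each branch easy to cover with unit tests before wiring it into middleware.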

Implementation Location:

  • Middleware: backend/app/middleware/csrf.py
  • Registered in: backend/app/main.py via app.add_middleware(CsrfMiddleware)

Example:

# ✓ Cookie-authenticated POST with CSRF header — allowed
POST /api/bans
Cookie: bangui_session=...
X-BanGUI-Request: 1

# ✗ Cookie-authenticated POST without CSRF header — rejected with 403
POST /api/bans
Cookie: bangui_session=...
(no X-BanGUI-Request header)

# ✓ Bearer token authentication without CSRF header — allowed
POST /api/bans
Authorization: Bearer <token>
(no X-BanGUI-Request header needed)

# ✓ Safe GET method without CSRF header — allowed
GET /api/jails
Cookie: bangui_session=...
(no X-BanGUI-Request header needed)

Setup Guard Route Policy

BanGUI requires a one-time setup wizard to be completed before the application is usable. The SetupRedirectMiddleware enforces this by redirecting unauthenticated API requests to /api/setup until setup is complete.

How It Works:

  1. Explicit Allowlist: The middleware maintains two allowlists:

    • _EXACT_ALLOWED: Exact paths that bypass the guard (e.g., /api/setup, /api/health, /api/docs)
    • _PREFIX_ALLOWED: Route prefixes that bypass the guard (e.g., /api/setup/ for nested routes like /api/setup/timezone)
  2. Path Matching Strategy: The middleware uses exact matching for exact paths and prefix matching with trailing slashes for nested routes. This prevents fragile prefix-based allowlists (e.g., using startswith("/api/setup") would accidentally allow /api/setup-debug).

  3. When Setup is Complete: Once setup completes, the middleware becomes a no-op and all routes are accessible normally.

Allowlisted Paths:

  • /api/setup — Setup status check and initialization endpoint
  • /api/setup/timezone — Timezone configuration (reached via the /api/setup/ prefix)
  • /api/health — Health check endpoint (used by monitoring and load balancers)
  • /api/docs — Swagger UI documentation
  • /api/redoc — ReDoc documentation
  • /api/openapi.json — OpenAPI schema (required by docs frontends)
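
The matching strategy can be sketched as below. The two constants reuse the names from this section, but their contents are reconstructed from the list above and is_allowlisted is a hypothetical helper, so treat this as an illustration rather than the middleware's code.

```python
_EXACT_ALLOWED = frozenset(
    {"/api/setup", "/api/health", "/api/docs", "/api/redoc", "/api/openapi.json"}
)
# Every prefix ends with a slash so that "/api/setup-debug" cannot match.
_PREFIX_ALLOWED = ("/api/setup/",)


def is_allowlisted(path: str) -> bool:
    """Return True if the path bypasses the setup guard."""
    # str.startswith accepts a tuple and checks each prefix in turn.
    return path in _EXACT_ALLOWED or path.startswith(_PREFIX_ALLOWED)
```

With a bare startswith("/api/setup") check, the path /api/setup-debug would have been allowed, which is exactly the case the trailing slash rules out.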

Adding New Setup Routes:

When adding new routes to the setup flow:

  1. If the route is an exact path (e.g., /api/setup/validate), add it to _EXACT_ALLOWED
  2. If the route is nested under /api/setup/ (e.g., /api/setup/validate/config), ensure /api/setup/ is in _PREFIX_ALLOWED (it already is)
  3. Never use prefix matching without a trailing slash — it leads to security issues with future route additions

Implementation Location:

  • Middleware: SetupRedirectMiddleware class in backend/app/main.py
  • Configuration: _EXACT_ALLOWED and _PREFIX_ALLOWED constants, lines 584-601 in backend/app/main.py
  • Guard logic: dispatch() method, lines 638-648 in backend/app/main.py

Example:

# If setup is incomplete:
GET /api/jails
→ 307 Temporary Redirect to /api/setup

# Allowlisted paths are always accessible:
GET /api/setup           → 200 OK (setup status)
POST /api/setup          → 201 Created (run setup)
GET /api/setup/timezone  → 200 OK (get timezone)
GET /api/health          → 200 OK (health check)
GET /api/docs            → 200 OK (documentation)

# If setup is complete, all routes are accessible:
GET /api/jails           → 200 OK (jail list)

fail2ban_start_command Configuration

The fail2ban_start_command setting specifies the command line used to start the fail2ban daemon during recovery operations (e.g., after a rollback).

Format & Parsing:

  • The command is split into arguments using shlex.split(), which respects shell quoting rules.
  • Paths with spaces must be quoted. Example: "/opt/my tools/fail2ban-client" start.
  • The command is not executed through a shell — no shell variables or globbing are interpreted.

Validation:

  • The command is validated at startup using shlex.split(). Mismatched quotes will raise a ValueError with the problematic command in the error message.

Environment Variables:

BANGUI_FAIL2BAN_START_COMMAND="fail2ban-client start"           # Default
BANGUI_FAIL2BAN_START_COMMAND="systemctl start fail2ban"        # systemd
BANGUI_FAIL2BAN_START_COMMAND='"/opt/my tools/fail2ban" start'  # Quoted path

Common Pitfall: Using .split() instead of shlex.split() would break commands with spaces in paths. Always use quoted strings for paths that contain whitespace.
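
The difference between the two splitting functions is easy to see in a small sketch. parse_start_command is a hypothetical helper, and rejecting an empty command is an assumption that goes beyond the validation described above.

```python
import shlex


def parse_start_command(command: str) -> list[str]:
    """Split the configured command into an argument list without using a shell."""
    try:
        argv = shlex.split(command)
    except ValueError as exc:
        # shlex raises ValueError for unbalanced quotes.
        raise ValueError(f"Invalid fail2ban_start_command {command!r}: {exc}") from exc
    if not argv:
        # Assumption: an empty command is treated as a configuration error.
        raise ValueError("fail2ban_start_command must not be empty")
    return argv


quoted = '"/opt/my tools/fail2ban" start'
good_argv = parse_start_command(quoted)  # Quotes are honoured.
naive_argv = quoted.split()              # Quotes are ignored, the path is broken.
```

The resulting list can be passed directly to a subprocess call that takes an argument vector, so no shell is involved.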

Trusted Proxy Configuration (Reverse Proxy Deployments)

When BanGUI is deployed behind a reverse proxy (nginx, HAProxy, etc.), the proxy forwards the original client IP via HTTP headers (X-Forwarded-For, X-Real-IP). To extract the correct client IP for rate limiting and logging, you must configure which proxies are trusted.

Why This Is Needed:

Rate limiting (POST /api/auth/login) relies on accurate client IP detection to prevent brute-force attacks. Without proper proxy configuration:

  • Rate limits are applied per proxy IP (always the same) instead of per client IP, so all clients share one limit: a single attacker can exhaust it and lock out legitimate users, and individual attackers cannot be throttled separately.
  • Logging shows proxy IPs instead of actual attacker IPs.

Trusted Proxies Configuration:

BANGUI_TRUSTED_PROXIES="10.0.0.0/8,172.16.0.0/12,192.168.0.0/16"

Accepted formats:

  • Single IP: BANGUI_TRUSTED_PROXIES="192.168.1.1"
  • CIDR range: BANGUI_TRUSTED_PROXIES="10.0.0.0/8" (matches any IP in 10.0.0.0 to 10.255.255.255)
  • Multiple entries (comma-separated): BANGUI_TRUSTED_PROXIES="192.168.1.1,10.0.0.0/8"
  • Whitespace is stripped: BANGUI_TRUSTED_PROXIES="192.168.1.1 , 10.0.0.0/8" is valid
  • IPv6 supported: BANGUI_TRUSTED_PROXIES="2001:db8::/32"

Default: Empty list (no proxies trusted). Proxy headers are ignored, and only the direct connection IP is used.

Validation:

The application validates all entries at startup:

  • Each entry must be a valid IP address or CIDR range.
  • Invalid entries (e.g., "not-an-ip", "10.0.0.0/33") will cause a ValidationError at startup.
  • The application will not start if any entry is invalid.

How It Works:

  1. When a request arrives, the middleware checks the immediate connection source (e.g., client.host).
  2. If the immediate connection is not in the trusted_proxies list, it is used directly as the client IP (proxy headers are ignored).
  3. If the immediate connection is trusted, the middleware extracts the original client IP from headers in this order:
    • X-Forwarded-For (leftmost IP in the chain, if present)
    • X-Real-IP (fallback)
    • Immediate connection IP (if no forwarded headers found)

Example Docker Compose Configuration:

version: '3.8'
services:
  nginx:
    image: nginx:latest
    ports:
      - "80:80"
    volumes:
      - ./nginx.conf:/etc/nginx/nginx.conf:ro
    networks:
      - bangui-net

  backend:
    image: bangui:latest
    environment:
      BANGUI_TRUSTED_PROXIES: "10.0.0.0/8"  # Trust Docker internal network
      BANGUI_SESSION_COOKIE_SECURE: "false"  # This example serves plain HTTP on port 80; use "true" once nginx terminates TLS
    networks:
      - bangui-net

networks:
  bangui-net:
    driver: bridge

Example nginx Configuration:

upstream bangui_backend {
  server backend:8000;
}

server {
  listen 80;
  server_name bangui.example.com;

  location /api/ {
    proxy_pass http://bangui_backend;
    
    # Forward the original client IP
    proxy_set_header X-Forwarded-For $remote_addr;
    proxy_set_header X-Real-IP $remote_addr;
    proxy_set_header X-Forwarded-Proto $scheme;
    
    # Required for FastAPI to recognize the original protocol
    proxy_set_header Host $host;
  }
}

Important Security Notes:

  • Only trust IPs you control. Never include untrusted networks or 0.0.0.0/0. An attacker with network access to a trusted IP can forge X-Forwarded-For headers and bypass rate limits.
  • Validate proxy IPs carefully. Use CIDR ranges that match your infrastructure (e.g., 10.0.0.0/8 for Docker, 172.31.0.0/16 for specific Docker networks).
  • Use HTTPS in production. Ensure your nginx terminates TLS (uses HTTPS) and passes X-Forwarded-Proto: https so FastAPI's Secure cookie flag works correctly.
  • Beware of Header Spoofing. X-Forwarded-For can contain multiple IPs (client, proxy1, proxy2). The leftmost IP is used as the original client. If an untrusted proxy is between the client and your BanGUI instance, attackers can still spoof headers. Always filter at the network level — only allow traffic from trusted proxies.

IP Geolocation Resolution

BanGUI resolves IP addresses to country codes and network organization information for ban analytics and geomapping. The geolocation system implements a primary + fallback resolution strategy to balance security and availability:

  1. Primary Resolver (MaxMind GeoLite2-Country): All IP lookups first attempt resolution using a local MaxMind GeoLite2-Country MMDB database file (if available). The MMDB is downloaded offline and mounted into the container — no IP data is sent over the network.
  2. Fallback Resolver (ip-api.com HTTP): If the MMDB is unavailable or returns no result, the system can fall back to the ip-api.com HTTP API. This fallback must be explicitly enabled and only sends unresolved IPs over HTTP. HTTP is disabled by default for security (to avoid sending IP addresses in cleartext).

Download & Configure MaxMind GeoLite2:

The MaxMind GeoLite2-Country MMDB requires a free account and license key. To set up the database:

  1. Create a free MaxMind account at https://www.maxmind.com/en/geolite2/signup and download your license key.
  2. Download the GeoLite2-Country MMDB using the provided script or manually from the MaxMind downloads page.
  3. Mount the MMDB into the BanGUI container at a known path (e.g., /data/GeoLite2-Country.mmdb).
  4. Set BANGUI_GEOIP_DB_PATH to the mounted path in your environment.

Example Docker Compose configuration:

services:
  bangui:
    volumes:
      - ./GeoLite2-Country.mmdb:/data/GeoLite2-Country.mmdb:ro
    environment:
      BANGUI_GEOIP_DB_PATH: /data/GeoLite2-Country.mmdb

Fallback to HTTP (Not Recommended):

If the MMDB cannot be mounted (e.g., in restricted environments), you can enable the HTTP fallback:

services:
  bangui:
    environment:
      BANGUI_GEOIP_ALLOW_HTTP_FALLBACK: "true"

⚠️ Security Warning: Enabling HTTP fallback causes unresolved IP addresses to be sent unencrypted to ip-api.com. This is a privacy and GDPR/CCPA concern. Only enable this if the MMDB absolutely cannot be provisioned, and understand the implications.

Data Structure:

The GeoInfo returned by the resolution system includes:

  • country_code (str | None): ISO 3166-1 alpha-2 country code (e.g., "US", "DE").
  • country_name (str | None): Human-readable country name (e.g., "United States").
  • asn (str | None): Autonomous System Number (e.g., "AS3320"). Only populated when using the HTTP API; local MMDB lookups return None.
  • org (str | None): Organization name associated with the ASN. Only populated when using the HTTP API; local MMDB lookups return None.

Environment Variables:

BANGUI_GEOIP_DB_PATH=/data/GeoLite2-Country.mmdb           # Path to MaxMind MMDB (primary)
BANGUI_GEOIP_ALLOW_HTTP_FALLBACK="false"                   # Default: false (MMDB-only)
BANGUI_GEOIP_ALLOW_HTTP_FALLBACK="true"                    # Enable HTTP fallback (not recommended)

Caching & Performance:

  • Resolved IPs are cached in-memory and persisted to SQLite for fast subsequent lookups.
  • Failed lookups are cached for 5 minutes to avoid hammering external APIs.
  • The background geo_cache_flush task (runs every 60 seconds) persists newly resolved entries to the database.
  • The background geo_re_resolve task (configurable schedule) periodically re-resolves stale entries to keep data fresh.
  • The background geo_cache_cleanup task (runs nightly) removes entries not referenced in the configured retention period (default: 90 days) to prevent unbounded database growth and maintain query performance.

Retention & Cleanup:

The geo_cache table tracks the last time each IP was referenced via a last_seen timestamp. Over time, as unique IPs accumulate, the table can grow very large, degrading query performance on every geo lookup. To manage this:

  • The geo_cache_cleanup background task runs once per day (default: midnight UTC).
  • It removes all entries where last_seen is older than the configured retention period (default: 90 days).
  • If a purged IP is encountered again after cleanup, it will be re-resolved from the MaxMind database or ip-api.com (if configured).
  • The retention period is controlled by the constant GEO_CACHE_RETENTION_DAYS in backend/app/tasks/geo_cache_cleanup.py.
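
The cleanup step can be illustrated with a single DELETE against last_seen. The sketch below uses the synchronous sqlite3 module so it runs standalone (the project itself uses aiosqlite), and it assumes last_seen is stored as an ISO 8601 UTC string; purge_stale_geo_entries and the two-column table used to exercise it are hypothetical.

```python
import sqlite3
from datetime import datetime, timedelta

GEO_CACHE_RETENTION_DAYS = 90  # mirrors the documented default


def purge_stale_geo_entries(
    db: sqlite3.Connection,
    now: datetime,
    retention_days: int = GEO_CACHE_RETENTION_DAYS,
) -> int:
    """Delete entries whose last_seen is older than the retention period.

    Returns the number of rows removed. ISO 8601 strings with the same
    UTC offset compare correctly as plain text.
    """
    cutoff = (now - timedelta(days=retention_days)).isoformat()
    cursor = db.execute("DELETE FROM geo_cache WHERE last_seen < ?", (cutoff,))
    db.commit()
    return cursor.rowcount
```

Passing now in explicitly keeps the function deterministic in tests; a scheduled task would supply the current UTC time.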

API Documentation Configuration

The enable_docs setting controls whether FastAPI serves interactive API documentation at /api/docs (Swagger UI) and /api/redoc (ReDoc).

Default: false — API documentation is disabled by default to prevent information disclosure in production.

When to Enable:

  • Set BANGUI_ENABLE_DOCS=true in development and debugging environments only.
  • Never enable in production. Exposed API documentation reveals all endpoints, request/response schemas, and allows direct API invocation from the browser.

Environment Variables:

BANGUI_ENABLE_DOCS="true"   # Enable docs in development
BANGUI_ENABLE_DOCS="false"  # Disable docs (default)
# Unset                     # Defaults to false (production)

Debug Compose File: The Docker/compose.debug.yml sets BANGUI_ENABLE_DOCS: "true" for local development. Production compose files (Docker/compose.prod.yml) leave this unset, defaulting to false.

Middleware Allowlist: The SetupRedirectMiddleware in main.py includes /api/docs, /api/redoc, and /api/openapi.json in its _ALWAYS_ALLOWED paths so documentation can be accessed before setup completes (if enabled).

Log Path Validation & Allowlisting

Authenticated users can instruct fail2ban to monitor additional log files through the API endpoint POST /api/config/jails/{name}/logpath. To prevent path-traversal attacks and unauthorized reads of sensitive system files, all requested log paths must resolve to locations within a configurable allowlist of safe directories.

Allowed Directories:

  • Configured via the BANGUI_ALLOWED_LOG_DIRS environment variable (comma-separated list).
  • Defaults to: ["/var/log", "/config/log"].

Path Validation Rules:

  1. The requested path is resolved to its canonical form using Path(log_path).resolve(), which:
    • Expands relative paths to absolute paths.
    • Resolves symbolic links to their real targets.
    • Normalizes . and .. components.
  2. The resolved path is checked using Path.is_relative_to() against each allowed directory prefix.
  3. If the resolved path is not relative to any allowed directory, a ValueError is raised with a descriptive error message.

Implementation:

  • Validation occurs in the Pydantic model AddLogPathRequest using a @field_validator.
  • The validator runs at request time, before the service layer is invoked.
  • Symlinks that escape allowed directories are rejected (see symlink bypass tests).

Important: Use is_relative_to(), not startswith() or string prefix matching. The latter is bypassable with paths like /var/log_evil/file.log.
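
The difference between the two checks is easy to demonstrate. The helper below is a minimal sketch of the documented rules (resolve, then is_relative_to); is_allowed_log_path is a hypothetical name, not the validator in AddLogPathRequest.

```python
from pathlib import Path


def is_allowed_log_path(log_path: str, allowed_dirs: list[str]) -> bool:
    """Return True if the resolved path lies inside an allowed directory."""
    # resolve() makes the path absolute, follows symlinks, and collapses "..".
    resolved = Path(log_path).resolve()
    return any(
        resolved.is_relative_to(Path(directory).resolve())
        for directory in allowed_dirs
    )


allowed = ["/var/log", "/config/log"]

# A string prefix check wrongly accepts a sibling directory...
prefix_check = "/var/log_evil/file.log".startswith("/var/log")
# ...while the component-wise check rejects it.
component_check = is_allowed_log_path("/var/log_evil/file.log", allowed)
```

Here prefix_check is True and component_check is False, which is exactly the bypass the rule above guards against.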

Environment Variables:

BANGUI_ALLOWED_LOG_DIRS="/var/log,/config/log"                    # Default
BANGUI_ALLOWED_LOG_DIRS="/var/log,/config/log,/home/app/logs"     # Custom directory

Log Target Validation (fail2ban)

The log_target field on the global config endpoint (PUT /api/config/global) is critical for security because fail2ban runs as root. Users can only set log targets to:

  1. Special values: STDOUT, STDERR, SYSLOG (case-insensitive)
  2. File paths: Must resolve to one of the configured allowed directories (same allowlist as log paths)
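
As a rough illustration of these two rules, the check could be written as follows. This is a sketch under assumptions, not the body of GlobalConfigUpdate.validate_log_target(): the function name, the SPECIAL_LOG_TARGETS constant, and the choice to return the value unchanged are all hypothetical.

```python
from pathlib import Path

SPECIAL_LOG_TARGETS = frozenset({"STDOUT", "STDERR", "SYSLOG"})


def validate_log_target(value: str, allowed_dirs: list[str]) -> str:
    """Accept a special target or a file path inside an allowed directory."""
    # Special values are matched case-insensitively.
    if value.upper() in SPECIAL_LOG_TARGETS:
        return value
    # Anything else is treated as a file path and must resolve into the allowlist.
    resolved = Path(value).resolve()
    for directory in allowed_dirs:
        if resolved.is_relative_to(Path(directory).resolve()):
            return value
    raise ValueError(f"log_target {value!r} is outside the allowed directories")
```

In a Pydantic model, a ValueError raised from a field validator is reported as a validation error, which matches the HTTP 422 behaviour described in this section.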

Why This Matters:

  • fail2ban creates/opens files with root privileges. Without validation, an attacker could write to arbitrary system paths (e.g., /etc/cron.d/malicious_script).
  • Validation occurs at both the Pydantic model layer (GlobalConfigUpdate.validate_log_target()) and the service layer (update_global_config()) for defense in depth.
  • This prevents both HTTP and non-HTTP attack vectors.

Implementation:

# Model layer: Automatic validation via @field_validator
update = GlobalConfigUpdate(log_target="/etc/passwd")  # Raises ValidationError → HTTP 422

# Service layer: Defense in depth
await config_service.update_global_config(socket_path, update)  # Validates again before sending to fail2ban

Login Rate Limiting

The login endpoint (POST /api/auth/login) is protected against brute-force attacks using an in-memory exponential backoff rate limiter.

Design:

  • Uses a dict[str, deque[float]] keyed by client IP, storing failed login timestamps within a time window.
  • Old failures outside the time window are automatically pruned during validation checks.
  • Expired IP entries are cleaned up to prevent unbounded memory growth.

Rate Limit Rules:

  • Exponential backoff: Each failed login attempt incurs a progressively longer delay before the next attempt is allowed:
    • 1st failure: 1 × 2¹ = 2 seconds
    • 2nd failure: 1 × 2² = 4 seconds
    • 3rd failure: 1 × 2³ = 8 seconds
    • 4th+ failures: capped at 10 seconds (max)
  • Failed attempts that arrive during the backoff period return HTTP 429 Too Many Requests with a Retry-After header indicating the remaining wait time.
  • Each failed login is also accompanied by bcrypt password hashing (~100ms), providing additional computational resistance.
  • The backoff counter resets after the rate-limit window (60 seconds by default) expires with no new failures.
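
The schedule above is base × 2^n capped at the maximum. A minimal sketch of that arithmetic follows; the constant and function names are hypothetical, and the values are taken from the list above.

```python
BASE_DELAY_SECONDS = 1.0
MAX_DELAY_SECONDS = 10.0


def backoff_delay(failure_count: int) -> float:
    """Return the wait in seconds required after `failure_count` failures."""
    if failure_count <= 0:
        return 0.0  # no failures recorded, no delay
    # Double the delay per failure, never exceeding the cap.
    return min(BASE_DELAY_SECONDS * 2**failure_count, MAX_DELAY_SECONDS)


delays = [backoff_delay(n) for n in range(1, 6)]
```

For one to five consecutive failures this yields 2, 4, 8, 10, and 10 seconds.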

IP Extraction (Proxy Safety):

  • When behind nginx, the rate limiter reads the real client IP from X-Forwarded-For or X-Real-IP headers.
  • Only trusts these headers when the immediate connection source is in a configured trusted proxy list.
  • Prevents attackers from spoofing these headers to bypass rate limits.
  • Falls back to the direct connection IP when proxy headers cannot be trusted.

Process-Local Limitation:

  • The rate limiter is process-local (in-memory). In multi-worker deployments (e.g., Gunicorn with 4 workers), each worker maintains its own rate limit counter.
  • This is acceptable because the single-worker constraint is enforced elsewhere. See TASK-002/003 notes for details.

Implementation:

  • Rate limiter: app.utils.rate_limiter.RateLimiter
  • IP extraction: app.utils.client_ip.get_client_ip()
  • Dependency: LoginRateLimiterDep in app.dependencies

Global Rate Limiting

In addition to login-specific rate limiting, all API endpoints are protected by global per-IP rate limiting to prevent resource exhaustion, CPU spikes, and network bandwidth attacks from malicious or misconfigured clients.

Design:

  • Uses a dict[str, deque[float]] keyed by client IP, storing request timestamps within a time window.
  • Implements a sliding-window algorithm: when an IP exceeds the limit, subsequent requests are blocked until the oldest request timestamp in the window expires.
  • Applied globally via middleware that runs on every request.
  • Respects the same IP extraction logic (trusted proxies) as login rate limiting.

Rate Limit Rules:

  • Default limit: 200 requests per 60 seconds per IP.
  • Blocked requests return HTTP 429 Too Many Requests with a Retry-After header indicating the estimated seconds until the IP can retry.
  • The Retry-After value is dynamically calculated based on when the oldest request in the window will expire.
  • Different endpoints can be configured with different limits by adjusting the global rate limiter settings or using per-endpoint decorators (future enhancement).

IP Extraction (Proxy Safety):

  • Same as login rate limiting: reads real client IP from X-Forwarded-For or X-Real-IP headers when the immediate connection is from a trusted proxy.
  • Falls back to direct connection IP when headers cannot be trusted.

Process-Local Limitation:

  • The global rate limiter is process-local (in-memory), like the login rate limiter.
  • In single-worker deployments (enforced elsewhere), this is not a constraint.
  • Each worker in a multi-worker setup maintains independent counters, which is acceptable under the single-worker enforcement model.

Memory Management:

  • Old request timestamps outside the rate-limit window are automatically pruned during validation checks.
  • A scheduled background task (rate_limiter_cleanup in app.tasks.rate_limiter_cleanup) runs every 30 minutes to remove dormant IPs from memory, preventing unbounded growth.

Implementation:

  • Rate limiter: app.utils.rate_limiter.GlobalRateLimiter
  • Middleware: app.middleware.rate_limit.RateLimitMiddleware
  • IP extraction: app.utils.client_ip.get_client_ip()
  • Cleanup task: app.tasks.rate_limiter_cleanup (registered in app.startup)
  • Initialized in: app.main.create_app() and the lifespan handler

12. Authentication Endpoints

The primary authentication flow for the frontend is cookie-based and protects the session token from JavaScript access:

  1. Login (POST /api/auth/login)

    • Accepts LoginRequest (password field)
    • Returns LoginResponse containing only expires_at (ISO 8601 UTC timestamp)
    • Crucially: The session token is not included in the JSON response body
    • Instead, the token is set as an HttpOnly SameSite=Lax cookie named bangui_session
    • Frontend automatically includes this cookie in all requests via credentials: "include"
  2. Why not return the token in the response body?

    • Third-party JavaScript (analytics, ads, XSS injections) can intercept fetch() response bodies
    • If the token were in the response, malicious code could extract and store it in localStorage
    • An attacker could then use it via the Authorization: Bearer <token> header, bypassing the HttpOnly cookie protection
    • By returning only the expiry timestamp, we ensure the token stays exclusively in the HttpOnly cookie
  3. Session Validation (GET /api/auth/session)

    • Frontend calls this on app mount to verify the session is still valid on the server
    • Works with both cookie and Bearer token authentication
    • Returns {"valid": true} if the session exists and is not expired
    • Returns 401 Unauthorized if the session is invalid or expired
  4. Logout (POST /api/auth/logout)

    • Revokes the session in the database
    • Clears the bangui_session cookie via Set-Cookie header
    • Works with both cookie and Bearer token authentication
    • Idempotent — calling without a session returns 200 without error

Programmatic API Clients (Bearer Token)

For non-browser clients (CLI tools, batch scripts, automation) that cannot use cookies, use the Bearer token authentication path by sending:

Authorization: Bearer <token>

The token is the signed session token value. At present, non-browser clients obtain it by parsing the bangui_session cookie from a login response and sending that value as the Bearer token. A dedicated POST /api/auth/token endpoint may be added in a future implementation.

Note: Bearer token authentication is not recommended for browser-based clients because:

  • Tokens must be stored somewhere (localStorage, sessionStorage, or request body)
  • All storage mechanisms are accessible to JavaScript and thus vulnerable to XSS
  • HttpOnly cookies provide better protection

13. Password Hashing

The master password is hashed using bcrypt with an auto-generated salt. All password validation uses the models in app.models.auth and app.models.setup.

The 72-Byte Bcrypt Limitation

Important: bcrypt silently truncates all input at 72 bytes before hashing. This means:

  • A user who sets a 100-character password is actually authenticated by only the first 72 bytes
  • Extra characters beyond 72 bytes provide zero additional security
  • The effective search space is capped at the first 72 bytes, so a longer password is no harder to brute-force than its 72-byte prefix

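Solution: Both password fields enforce a maximum length of 72. Pydantic's max_length counts characters rather than bytes; the two coincide when the value is ASCII, as is the case for an encoded SHA256 digest such as the one the backend receives (see the validation flow below):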

  • LoginRequest.password — max 72 characters (enforced via Pydantic Field(max_length=72))
  • SetupRequest.master_password — max 72 characters (enforced via Pydantic Field(max_length=72))

Validation flow:

  1. Frontend → hashes password with SHA256 using SubtleCrypto before transmission
  2. Backend receives SHA256 hash, validates length (≤ 72 bytes)
  3. Backend → hashes with bcrypt using run_blocking(bcrypt.hashpw) to avoid event loop stall
  4. Hash stored in SQLite settings table
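
The reason pre-hashing keeps every input under the limit can be shown in a few lines. The sketch below stands in for the browser's SubtleCrypto step using hashlib, and it assumes the digest is transmitted hex-encoded (the encoding is not specified above); both function names are hypothetical.

```python
import hashlib


def client_side_digest(password: str) -> str:
    """Return the SHA256 hex digest, standing in for the frontend hashing step."""
    return hashlib.sha256(password.encode("utf-8")).hexdigest()


def fits_bcrypt_limit(value: str, limit_bytes: int = 72) -> bool:
    """Check the UTF-8 byte length, which is what bcrypt actually sees."""
    return len(value.encode("utf-8")) <= limit_bytes


long_password = "x" * 100
digest = client_side_digest(long_password)
```

A hex SHA256 digest is always 64 ASCII characters, so it fits within 72 bytes regardless of the original password length, whereas a raw 40-character password made of two-byte characters would not.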

If a password exceeds 72 bytes:

  • Pydantic raises ValidationError with error code string_too_long
  • The router returns HTTP 422 Unprocessable Entity
  • The frontend should inform the user to choose a shorter password

Implementation:

  • Models: app.models.auth.LoginRequest, app.models.setup.SetupRequest
  • Service layer: app.services.auth_service._check_password(), app.services.setup_service.run_setup()

15. File I/O Conventions

All file write operations to critical configuration files must be atomic to prevent corruption if the process is killed mid-write.

Atomic File Writes

Configuration files (e.g., fail2ban jail configs in jail.d/) are essential for system operation. A truncated or corrupt config file can break fail2ban's ability to reload and may disable active protection.

Rule: Always use write-to-temp + atomic rename

Never use Path.write_text() or file.write() directly for critical files. Instead:

  1. Create a temporary file in the same directory as the target (crucial for atomic os.replace()).
  2. Write content to the temp file.
  3. Atomically rename the temp file to replace the target.
  4. Clean up the temp file if an error occurs.

Implementation Pattern:

import contextlib
import os
import tempfile
from pathlib import Path

target = Path("/path/to/config/file.conf")

tmp_name: str | None = None
try:
    # Create temp file in target's directory (same filesystem = atomic)
    with tempfile.NamedTemporaryFile(
        mode="w",
        encoding="utf-8",
        dir=target.parent,
        delete=False,
        suffix=".tmp",
    ) as tmp:
        tmp.write(content)
        tmp_name = tmp.name
    # Atomic rename (single syscall on POSIX systems)
    os.replace(tmp_name, target)
except OSError as exc:
    # Clean up temp file on error
    with contextlib.suppress(OSError):
        if tmp_name is not None:
            os.unlink(tmp_name)
    raise ConfigWriteError(f"Cannot write config: {exc}") from exc

Why this matters:

  • Path.write_text() overwrites in place. If the process dies mid-write, the file is left truncated or partially written.
  • os.replace() is atomic on POSIX systems (single rename syscall) only if source and target are on the same filesystem.
  • Creating the temp file in target.parent ensures atomicity.
  • On Linux containers, this prevents config corruption and service degradation.

Atomic write helper:

A shared atomic_write(path: Path, content: str) helper is available in app/services/config_file_helpers.py. This is the preferred way to perform atomic writes — it handles all the temp file and cleanup logic:

from app.services.config_file_helpers import atomic_write

atomic_write(path, updated_content)  # Atomic write, auto-cleanup on error

Files requiring atomic writes:

  • All config files under jail.d/ (created/modified by _write_conf_file, _create_conf_file, set_jail_config_enabled, and write_jail_config_file)
  • Any critical state files that fail2ban relies on

Examples in the codebase:

  • app/services/config_file_helpers.py: _write_conf_file, _create_conf_file, atomic_write
  • app/services/raw_config_io_service.py: set_jail_config_enabled, write_jail_config_file
  • app/services/jail_config_service.py: _write_local_file_sync, _restore_local_file_sync

16. Git & Workflow

  • Branch naming: feature/<short-description>, fix/<short-description>, chore/<short-description>.
  • Commit messages: imperative tense, max 72 chars first line (Add jail reload endpoint, Fix ban history query).
  • Every merge request must pass: ruff, type checker, all tests.
  • Do not merge with failing CI.
  • Keep pull requests small and focused — one feature or fix per PR.

17. Coding Principles

These principles are non-negotiable. Every backend contributor must internalise and apply them daily.

17.1 Clean Code

  • Write code that reads like well-written prose — a new developer should understand intent without asking.
  • Meaningful names — variables, functions, and classes must reveal their purpose. Avoid abbreviations (cnt, mgr, tmp) unless universally understood.
  • Small functions — each function does exactly one thing. If you need a comment to explain a block inside a function, extract it into its own function.
  • No magic numbers or strings — use named constants.
  • Boy Scout Rule — leave every file cleaner than you found it.
  • Avoid deep nesting — prefer early returns (guard clauses) to keep the happy path at the top indentation level.
# Good — guard clause, clear name, one job
async def get_active_ban(ip: str, jail: str) -> Ban:
    ban: Ban | None = await repo.find_ban(ip=ip, jail=jail)
    if ban is None:
        raise BanNotFoundError(ip=ip, jail=jail)
    if ban.is_expired():
        raise BanExpiredError(ip=ip, jail=jail)
    return ban

# Bad — nested, vague name
async def check(ip, j):
    b = await repo.find_ban(ip=ip, jail=j)
    if b:
        if not b.is_expired():
            return b
        else:
            raise Exception("expired")
    else:
        raise Exception("not found")

17.2 Separation of Concerns (SoC)

  • Each module, class, and function must have a single, well-defined responsibility.
  • Routers → HTTP layer only (parse requests, return responses).
  • Services → business logic and orchestration.
  • Repositories → data access and persistence.
  • Models → data shapes and validation.
  • Tasks → scheduled background jobs.
  • Never mix layers — a router must not execute SQL, and a repository must not raise HTTPException.

17.3 Single Responsibility Principle (SRP)

  • A class or module should have one and only one reason to change.
  • If a service handles both ban management and email notifications, split it into BanService and NotificationService.

17.4 Don't Repeat Yourself (DRY)

  • Extract shared logic into utility functions, base classes, or dependency providers.
  • If the same block of code appears in several places, refactor it into a single source of truth.
  • But don't over-abstract: premature DRY that couples unrelated features is worse than a little duplication. As a guide, follow the Rule of Three and refactor when something appears a third time.

17.5 KISS — Keep It Simple, Stupid

  • Choose the simplest solution that works correctly.
  • Avoid clever tricks, premature optimisation, and over-engineering.
  • If a standard library function does the job, prefer it over a custom implementation.

17.6 YAGNI — You Aren't Gonna Need It

  • Do not build features, abstractions, or config options "just in case".
  • Implement what is required now. Extend later when a real need emerges.

17.7 Dependency Inversion Principle (DIP)

  • High-level modules (services) must not depend on low-level modules (repositories) directly. Both should depend on abstractions (protocols / interfaces).
  • Use FastAPI's Depends() to inject implementations — this makes swapping and testing trivial.
from typing import Protocol

class BanRepository(Protocol):
    async def find_ban(self, ip: str, jail: str) -> Ban | None: ...
    async def save_ban(self, ban: Ban) -> None: ...

class SqliteBanRepository:
    """Concrete implementation — depends on aiosqlite."""
    async def find_ban(self, ip: str, jail: str) -> Ban | None: ...
    async def save_ban(self, ban: Ban) -> None: ...

13.7.1 Repository Module Pattern — Module-as-Protocol Structural Compatibility

BanGUI uses module-level functions for repository implementations, not classes. Each repository module (e.g., session_repo.py, blocklist_repo.py) exports async functions that match the signatures defined in the Protocol interface in protocols.py. This is a structural typing pattern — mypy accepts the module as a valid Protocol implementation because the function signatures match, even though the module is not explicitly annotated as implementing the Protocol.

This approach works correctly with FastAPI's dependency injection via cast():

# In app/repositories/session_repo.py
async def create_session(db: aiosqlite.Connection, token: str, created_at: str, expires_at: str) -> Session:
    """Insert a new session row."""
    ...

# In app/repositories/protocols.py
class SessionRepository(Protocol):
    async def create_session(
        self,
        db: aiosqlite.Connection,
        token: str,
        created_at: str,
        expires_at: str,
    ) -> Session:
        ...

# In app/dependencies.py
async def get_session_repo() -> SessionRepository:
    """Provide the concrete session repository implementation."""
    from app.repositories import session_repo
    return session_repo  # ← mypy accepts this because the module has matching functions

Why this pattern is used:

  • Simplicity — no boilerplate class/instance wrapping.
  • Compatibility — Python's structural typing (PEP 544) means the module automatically satisfies the Protocol interface if function signatures match.
  • Testability — the same DIP principle applies; services depend on the Protocol, not the module directly, so tests can mock the Protocol.

Risks and mitigations:

  • Silent breakage if function signatures change: If a parameter is added or removed from a module function, the module no longer satisfies the Protocol. Where a dependency provider wraps the module in cast(), mypy does not flag this, because cast() bypasses the structural check. To prevent this, Protocol signatures in protocols.py are the source of truth. Always check that module functions match the Protocol definitions before merging changes. The CI/CD pipeline validates this compatibility at build time.

How the validation works (CI check):

  • Before each deployment, run mypy --strict to ensure all dependency providers return values compatible with their Protocol types.
  • The cast() calls in dependencies.py are a documented signal that structural compatibility is being verified externally, not via explicit class inheritance.
  • Automated tests in backend/tests/test_repositories/test_protocol_compliance.py verify that each repository module implements all protocol methods, preventing silent protocol drift.
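
A compliance check of this kind could be built on inspect, as in the sketch below. It is not the content of test_protocol_compliance.py: protocol_mismatches, the two-method SessionRepository, and the use of SimpleNamespace as a stand-in for an imported repository module are all hypothetical, and the check compares parameter names only, not annotations.

```python
import inspect
from types import SimpleNamespace
from typing import Protocol


class SessionRepository(Protocol):
    """Hypothetical two-method Protocol used only for this illustration."""

    async def create_session(self, db: object, token: str) -> None: ...
    async def delete_session(self, db: object, token: str) -> None: ...


def protocol_mismatches(module: object, protocol: type) -> list[str]:
    """Return protocol methods the module lacks or declares differently."""
    mismatches: list[str] = []
    for name, proto_fn in inspect.getmembers(protocol, inspect.isfunction):
        if name.startswith("_"):
            continue  # skip dunder/private helpers added by typing.Protocol
        impl = getattr(module, name, None)
        if not callable(impl):
            mismatches.append(name)
            continue
        expected = list(inspect.signature(proto_fn).parameters)[1:]  # drop "self"
        actual = list(inspect.signature(impl).parameters)
        if expected != actual:
            mismatches.append(name)
    return sorted(mismatches)


async def create_session(db: object, token: str) -> None: ...
async def delete_session(db: object, token_hash: str) -> None: ...  # renamed parameter


# SimpleNamespace stands in for an imported repository module.
drifted_repo = SimpleNamespace(
    create_session=create_session,
    delete_session=delete_session,
)
```

Calling protocol_mismatches(drifted_repo, SessionRepository) reports delete_session, because its second parameter no longer matches the Protocol.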

13.7.1.1 Repository Protocol Coverage Checklist

All public repository functions must be defined in a corresponding Protocol. To add a new repository:

  1. Create the repository module backend/app/repositories/my_repo.py with async functions.
  2. Define the Protocol — Add a MyRepository(Protocol) class in backend/app/repositories/protocols.py with methods matching every public function signature.
  3. Add imports — If the Protocol uses custom return types, import them in protocols.py.
  4. Run compliance tests — Execute pytest backend/tests/test_repositories/test_protocol_compliance.py to verify coverage.
  5. Verify type safety — Run mypy --strict backend/app/repositories/protocols.py to ensure all types are correct.

Current repository protocol coverage (all 7 repositories fully covered):

  • SessionRepository — 4 methods
  • SettingsRepository — 4 methods
  • BlocklistRepository — 6 methods
  • ImportLogRepository — 4 methods
  • GeoCacheRepository — 13 methods
  • HistoryArchiveRepository — 5 methods
  • Fail2BanDbRepository — 8 methods

13.7.2 Session Token Hashing — One-Way Protection Against Database Exposure

Session tokens must be protected against database exposure. Session tokens are stored as one-way SHA256 hashes in the database to ensure that if the database file is compromised (volume mount misconfiguration, backup leak, etc.), the session tokens themselves cannot be directly used to hijack sessions.

Implementation pattern:

import hashlib
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    import aiosqlite

from app.models.auth import Session

def _hash_token(token: str) -> str:
    """Return the SHA256 hash of a session token."""
    return hashlib.sha256(token.encode()).hexdigest()

async def create_session(
    db: "aiosqlite.Connection",
    token: str,
    created_at: str,
    expires_at: str,
) -> Session:
    """Insert a new session row with the token hash."""
    token_hash = _hash_token(token)
    cursor = await db.execute(
        "INSERT INTO sessions (token_hash, created_at, expires_at) VALUES (?, ?, ?)",
        (token_hash, created_at, expires_at),
    )
    await db.commit()
    # Return the Session with the ORIGINAL token (not the hash)
    # so the service layer can sign and return it to the client.
    return Session(
        id=int(cursor.lastrowid) if cursor.lastrowid else 0,
        token=token,  # ← raw token, not the hash
        created_at=created_at,
        expires_at=expires_at,
    )

async def get_session(
    db: "aiosqlite.Connection",
    token: str
) -> Session | None:
    """Look up a session by token hash."""
    token_hash = _hash_token(token)
    async with db.execute(
        "SELECT id, token_hash, created_at, expires_at FROM sessions WHERE token_hash = ?",
        (token_hash,),
    ) as cursor:
        row = await cursor.fetchone()
    
    if row is None:
        return None
    
    # Return the Session with the INCOMING token (the one the client sent).
    return Session(
        id=int(row[0]),
        token=token,  # ← the raw token passed in
        created_at=str(row[2]),
        expires_at=str(row[3]),
    )

Key points:

  1. Hash on write — When inserting a session, hash the token before storage.
  2. Hash on read — When validating a session, hash the incoming token before the database lookup.
  3. Never store raw tokens — The token_hash column contains only hashes; raw tokens are never persisted.
  4. Return raw tokens to the service layer — The Session model's token field contains the raw token (for signing and response), not the hash.
  5. Database schema — Use token_hash TEXT NOT NULL UNIQUE instead of token TEXT NOT NULL UNIQUE, and create an index on token_hash.
  6. Migration strategy: When upgrading from plaintext to hashed tokens, drop the old table and recreate it. This invalidates all existing sessions, which is acceptable because those tokens were stored in plaintext and must be treated as exposed.

Why one-way hashing is safe:

  • If an attacker obtains a token hash from the database, they cannot reverse the SHA256 hash to recover the original token.
  • The attacker cannot use the hash directly in a client request — they would need the original token to pass the hash check.
  • This forces the attacker to either compromise the client (where they'd also get the raw token) or perform a brute-force attack against the hash space (infeasible for random 128-bit tokens).

Never use symmetric encryption — symmetric encryption stores a key in the database or environment, which merely shifts the exposure risk. A one-way hash is the correct choice for protecting tokens.

13.7.2a Session Token Signing Format — HMAC-SHA256 Integrity Protection

All session tokens sent to clients are signed using HMAC-SHA256. The signed token format is:

<raw_token>.<signature>

where:

  • <raw_token> is a 32-character hex string encoding 16 random bytes (128 bits), generated by secrets.token_hex(16).
  • . is the separator (defined in app.utils.constants.SESSION_TOKEN_SIGNATURE_SEPARATOR).
  • <signature> is the HMAC-SHA256 hex digest of <raw_token> using the configured session_secret.

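Example (illustrative hex values showing the shape only, not a real signature; the raw token is 32 hex characters and the signature 64): a1b2c3d4e5f60718293a4b5c6d7e8f90.f7e6d5c4b3a29180f7e6d5c4b3a29180f7e6d5c4b3a29180f7e6d5c4b3a29180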

Signing and verification pattern:

import hashlib
import hmac

def _session_token_signature(token: str, secret: str) -> str:
    """Return the HMAC-SHA256 signature for a session token."""
    return hmac.new(secret.encode(), token.encode(), hashlib.sha256).hexdigest()

def sign_session_token(token: str, secret: str) -> str:
    """Return a signed session token string for the client."""
    return f"{token}.{_session_token_signature(token, secret)}"

def unwrap_session_token(token: str, secret: str) -> str:
    """Verify and return the raw token from a signed session token.
    
    Raises ValueError if the token lacks a signature or signature is invalid.
    """
    if "." not in token:
        raise ValueError("Invalid session token.")
    
    raw_token, signature = token.rsplit(".", 1)
    expected_signature = _session_token_signature(raw_token, secret)
    # Compare as bytes: compare_digest() raises TypeError for non-ASCII str input.
    if not hmac.compare_digest(expected_signature.encode(), signature.encode()):
        raise ValueError("Invalid session token.")
    return raw_token

Key points:

  1. All tokens must be signed — Tokens without a signature (no separator) are rejected immediately.
  2. Reject on any failure — unwrap_session_token() raises ValueError when the separator is absent or the signature does not match.
  3. Compare in constant time — Always use hmac.compare_digest() for signature verification to prevent timing attacks.
  4. Sign on login — login() creates a raw token, stores it (hashed) in the database, then returns the signed token to the client.
  5. Verify on every request — The validate_session() service verifies the signature by calling unwrap_session_token() with the session_secret, then looks up the raw token in the database.
  6. Session invalidation — When upgrading from plaintext to signed tokens (TASK-022), all existing sessions must be invalidated because previously issued tokens carry no signature and raw tokens are no longer stored in plaintext.

Why HMAC signing is necessary:

  • Prevents token forgery — An attacker cannot create a valid token without knowing the session_secret.
  • Works alongside hashed storage — If a database that still held plaintext tokens (from before hashing was introduced) is compromised, the attacker obtains only raw tokens, not signed ones. A raw token without a valid signature is rejected by unwrap_session_token().
  • Timing attack resistance — hmac.compare_digest() compares signatures in constant time, preventing attackers from using timing differences to guess valid signatures.
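A short usage sketch of the signing pattern, showing which inputs are accepted and which are rejected. The helper bodies are condensed copies so the block runs on its own; the names `sign`, `unwrap`, `is_rejected`, the `SEPARATOR` value, and the secret are assumptions made for illustration.

```python
import hashlib
import hmac
import secrets

SEPARATOR: str = "."  # assumed value of SESSION_TOKEN_SIGNATURE_SEPARATOR


def sign(token: str, secret: str) -> str:
    """Append the HMAC-SHA256 hex digest of the token."""
    digest: str = hmac.new(secret.encode(), token.encode(), hashlib.sha256).hexdigest()
    return f"{token}{SEPARATOR}{digest}"


def unwrap(signed: str, secret: str) -> str:
    """Return the raw token, or raise ValueError if the signature is missing or wrong."""
    raw, sep, signature = signed.rpartition(SEPARATOR)
    expected: str = hmac.new(secret.encode(), raw.encode(), hashlib.sha256).hexdigest()
    # Compare bytes in constant time; an empty sep means no separator was present.
    if not sep or not hmac.compare_digest(expected.encode(), signature.encode()):
        raise ValueError("Invalid session token.")
    return raw


def is_rejected(candidate: str, secret: str) -> bool:
    """Report whether unwrap() refuses the candidate."""
    try:
        unwrap(candidate, secret)
    except ValueError:
        return True
    return False


secret: str = "example-session-secret"  # placeholder, not a real secret
raw: str = secrets.token_hex(16)
signed: str = sign(raw, secret)

round_trip: str = unwrap(signed, secret)
# Flip the last signature character to simulate tampering.
tampered: str = signed[:-1] + ("0" if signed[-1] != "0" else "1")

unsigned_rejected: bool = is_rejected(raw, secret)
tampered_rejected: bool = is_rejected(tampered, secret)
wrong_secret_rejected: bool = is_rejected(signed, "another-secret")
```

All three rejection flags end up `True`, and `round_trip` equals the original raw token.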

13.7.3 Session Cache Pluggability — Process-Local vs. Shared Backends

Session validation is expensive (SQLite lookup + password verification). To improve performance, validated session tokens are cached using the SessionCache interface (app.utils.session_cache). The default implementation, InMemorySessionCache, stores cached sessions in process-local memory.

Current implementation (single-worker):

# Excerpt from app/utils/session_cache.py, which also provides NoOpSessionCache.
from typing import Protocol

class SessionCache(Protocol):
    """Interface for session token validation cache backends."""
    def get(self, token: str) -> Session | None: ...
    def set(self, token: str, session: Session, ttl_seconds: float) -> None: ...
    def invalidate(self, token: str) -> None: ...
    def clear(self) -> None: ...

# Default in-memory implementation — PROCESS-LOCAL
class InMemorySessionCache:
    def __init__(self) -> None:
        self._entries: dict[str, tuple[Session, float]] = {}

Single-worker constraint:

InMemorySessionCache is process-local — each worker process has its own dict. In single-worker mode (enforced by TASK-002), this is safe and improves performance. In multi-worker deployments:

  • A logout by worker A clears the session from A's cache, but worker B still has it → logout doesn't work.
  • Enabling/disabling the cache requires restarting all workers to take effect.
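The stale-logout problem can be reproduced with two plain dictionaries standing in for the per-process caches. This is only a toy model; the variable names and the token value are placeholders.

```python
# Each worker process owns an independent cache (modelled here as two dicts).
worker_a_cache: dict[str, str] = {}
worker_b_cache: dict[str, str] = {}

token: str = "example-token"  # placeholder value

# Both workers validated the session once and cached the result.
worker_a_cache[token] = "admin"
worker_b_cache[token] = "admin"

# The logout request happens to be handled by worker A only.
worker_a_cache.pop(token, None)

# Worker B never hears about the logout and keeps serving the cached session.
gone_on_a: bool = token not in worker_a_cache
still_cached_on_b: bool = token in worker_b_cache
```

Until B's entry expires, any request routed to worker B is still treated as authenticated.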

Multi-worker solution:

To support multiple workers (future enhancement), implement a shared backend behind the same SessionCache Protocol:

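# Example Redis implementation (not yet in codebase).
# Assumes redis-py >= 4.2, which ships the asyncio client formerly published as aioredis.
import redis.asyncio as aioredis

class RedisSessionCache:
    """Session cache backed by Redis."""
    def __init__(self, redis_url: str) -> None:
        self.client = aioredis.from_url(redis_url)

    async def get(self, token: str) -> Session | None:
        data = await self.client.get(f"session:{token}")
        return Session.model_validate_json(data) if data else None

    async def set(self, token: str, session: Session, ttl_seconds: float) -> None:
        # SETEX rejects a TTL of 0, so round sub-second TTLs up to one second.
        await self.client.setex(
            f"session:{token}",
            max(1, int(ttl_seconds)),
            session.model_dump_json(),
        )

    async def invalidate(self, token: str) -> None:
        await self.client.delete(f"session:{token}")

    async def clear(self) -> None:
        # Delete only session keys; FLUSHDB would wipe unrelated data in the same database.
        async for key in self.client.scan_iter(match="session:*"):
            await self.client.delete(key)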

To adopt a Redis backend:

  1. Create RedisSessionCache in app.utils.session_cache.
  2. Update app.utils.runtime_state.set_runtime_settings() to instantiate RedisSessionCache when REDIS_URL env var is set.
  3. Update app.config.Settings to accept optional REDIS_URL.
  4. Tests continue to use InMemorySessionCache (no Redis dependency in dev).

Implementation rules:

  • All cache methods must be async (even if the backend is sync).
  • Never log session tokens or session data.
  • TTL must be respected — expired entries must be removed on access.
  • See app/utils/session_cache.py for the full Protocol definition and current implementations.

17.8 Composition over Inheritance

  • Favour composing small, focused objects over deep inheritance hierarchies.
  • Use mixins or protocols only when a clear "is-a" relationship exists; otherwise, pass collaborators as constructor arguments.

17.9 Fail Fast

  • Validate inputs as early as possible — at the API boundary with Pydantic, at service entry with assertions or domain checks.
  • Raise specific exceptions immediately rather than letting bad data propagate silently.

17.10 Law of Demeter (Principle of Least Knowledge)

  • A function should only call methods on:
    1. Its own object (self).
    2. Objects passed as parameters.
    3. Objects it creates.
  • Avoid long accessor chains like request.state.db.cursor().execute(...) — wrap them in a meaningful method.
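As an illustration of wrapping an accessor chain, the sketch below hides the connection and cursor details behind one intention-revealing method. `BanStore`, `count_bans`, `ban_summary`, and the `bans` table are hypothetical names, and the standard-library sqlite3 module is used only to keep the example self-contained.

```python
import sqlite3


class BanStore:
    """Hypothetical wrapper that owns all knowledge of the database handle."""

    def __init__(self, db: sqlite3.Connection) -> None:
        self._db: sqlite3.Connection = db

    def count_bans(self) -> int:
        """Return the number of rows in the bans table."""
        row = self._db.execute("SELECT COUNT(*) FROM bans").fetchone()
        return int(row[0])


def ban_summary(store: BanStore) -> str:
    """The caller talks only to its parameter, never to the connection behind it."""
    return f"{store.count_bans()} active bans"


db: sqlite3.Connection = sqlite3.connect(":memory:")
db.execute("CREATE TABLE bans (ip TEXT NOT NULL)")
db.executemany("INSERT INTO bans (ip) VALUES (?)", [("203.0.113.5",), ("198.51.100.7",)])
db.commit()

summary: str = ban_summary(BanStore(db))
```

If the storage layer later changes, only `BanStore` needs to be touched; `ban_summary` stays as it is.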

17.11 Defensive Programming

  • Never trust external input — validate and sanitise everything that crosses a boundary (HTTP request, file, socket, environment variable).
  • Handle edge cases explicitly: empty lists, None values, negative numbers, empty strings.
  • Use type narrowing and exhaustive pattern matching (match / case) to eliminate impossible states.

17.12 SSRF Prevention (Server-Side Request Forgery)

When user-supplied URLs are fetched by the backend, validate them before making any HTTP requests:

  1. Use Pydantic's AnyHttpUrl type to restrict schemes to http:// and https:// only.

    • Rejects file://, ftp://, gopher://, and other non-http schemes at the model boundary.
  2. Validate resolved IP addresses before fetching:

    • Parse the hostname and resolve it via DNS (using socket.getaddrinfo()).
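    • Use the ipaddress.ip_address() properties (is_private, is_loopback, is_link_local, is_multicast, is_reserved, is_unspecified) to reject private/reserved ranges; is_private alone does not cover multicast: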
      • RFC 1918: 10.0.0.0/8, 172.16.0.0/12, 192.168.0.0/16
      • Loopback: 127.0.0.0/8, ::1/128
      • Link-local: 169.254.0.0/16, fe80::/10
      • IPv6 site-local, multicast, and reserved ranges.
    • Raise ValueError if validation fails; let the router convert it to HTTP 400.
  3. Guard against DNS rebinding:

    • Validate DNS at URL creation/validation time (performed during request deserialization).
    • For additional safety, re-validate the connection IP at HTTP client time (e.g., custom aiohttp.TCPConnector can inspect the resolved address during connect).
  4. Example implementation (see backend/app/utils/ip_utils.py):

    • is_private_ip(ip_str: str) → bool: Checks if IP is private/reserved/loopback/link-local.
    • async validate_blocklist_url(url: AnyHttpUrl) → None: Async DNS resolution + private IP check.
    • Service layer calls await validate_blocklist_url(url) before persisting; router catches ValueError and returns 400.
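The checks described above could be sketched roughly as follows. This is not the code in backend/app/utils/ip_utils.py: the body of `is_private_ip` is an assumption about how such a helper might work, and `validate_resolved_host` is a hypothetical name that takes a host and port rather than an AnyHttpUrl.

```python
import asyncio
import ipaddress
import socket


def is_private_ip(ip_str: str) -> bool:
    """Return True for addresses the backend should never fetch from.

    Raises ValueError if ip_str is not a valid IPv4/IPv6 address.
    """
    ip = ipaddress.ip_address(ip_str)
    return (
        ip.is_private
        or ip.is_loopback
        or ip.is_link_local
        or ip.is_multicast
        or ip.is_reserved
        or ip.is_unspecified
    )


async def validate_resolved_host(host: str, port: int) -> None:
    """Resolve host and raise ValueError if any resolved address is blocked."""
    loop = asyncio.get_running_loop()
    try:
        infos = await loop.getaddrinfo(host, port, type=socket.SOCK_STREAM)
    except OSError as exc:  # socket.gaierror is a subclass of OSError
        raise ValueError(f"cannot resolve host: {host}") from exc
    if not infos:
        raise ValueError(f"host has no addresses: {host}")
    # Check every address: a hostname may resolve to both public and private IPs.
    for _family, _type, _proto, _canonname, sockaddr in infos:
        if is_private_ip(str(sockaddr[0])):
            raise ValueError(f"host resolves to a blocked address: {host}")
```

Checking every returned address matters because an attacker-controlled name can publish a public and a private record side by side. This check alone does not close the DNS rebinding window; the connect-time re-validation mentioned in step 3 is still needed.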

18. Quick Reference — Do / Don't

| Do | Don't |
| --- | --- |
| Type every function, variable, return | Leave types implicit |
| Use async def for I/O | Use sync functions for I/O |
| Validate with Pydantic at the boundary | Pass raw dicts through the codebase |
| Log with structlog + context keys (INFO/WARNING/ERROR/DEBUG) | Use print() or format strings in logs |
| Use log.exception() in catch-all handlers (captures traceback) | Use log.error() for exceptions; let exceptions get lost |
| Write tests for every feature | Ship untested code |
| Use aiohttp for HTTP calls | Use requests |
| Handle errors with custom exceptions | Use bare except: |
| Keep routers thin, logic in services | Put business logic in routers |
| Use datetime.now(UTC) (with from datetime import UTC, datetime) | Use naive datetimes |
| Run ruff + mypy before committing | Push code that doesn't pass linting |
| Keep GET endpoints read-only (no db.commit()) | Call db.commit() / INSERT inside GET handlers |
| Batch DB writes; issue one db.commit() after the loop | Commit inside a loop (1 fsync per row) |
| Use executemany() for bulk inserts | Call execute() + commit() per row in a loop |
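The last two rows can be illustrated with a small sketch. It uses the standard-library sqlite3 module and an in-memory database purely so it runs on its own; the `bans` table and the sample rows (documentation-range IP addresses) are made up for the example.

```python
import sqlite3

# Sample data: (ip, jail) pairs using documentation-range addresses.
rows: list[tuple[str, str]] = [
    ("203.0.113.5", "sshd"),
    ("198.51.100.7", "nginx"),
    ("192.0.2.9", "sshd"),
]

db: sqlite3.Connection = sqlite3.connect(":memory:")
db.execute("CREATE TABLE bans (ip TEXT NOT NULL, jail TEXT NOT NULL)")

try:
    # One statement for the whole batch, followed by a single commit.
    db.executemany("INSERT INTO bans (ip, jail) VALUES (?, ?)", rows)
    db.commit()
except sqlite3.Error:
    # Leave the table untouched if any row fails.
    db.rollback()
    raise

inserted: int = db.execute("SELECT COUNT(*) FROM bans").fetchone()[0]
```

Committing once after the batch means the rows share one transaction, so the database syncs to disk once instead of once per row.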