BanGUI/Docs/Backend-Development.md


Backend Development — Rules & Guidelines

Rules and conventions every backend developer must follow. Read this before writing your first line of code.


1. Language & Typing

  • Python 3.12 is the minimum supported version.
  • Every function, method, and variable must have explicit type annotations — no exceptions.
  • Use str, int, float, bool, None for primitives.
  • Use list[T], dict[K, V], set[T], tuple[T, ...] (lowercase, built-in generics) — never typing.List, typing.Dict, etc.
  • Use T | None instead of Optional[T].
  • Use TypeAlias, TypeVar, Protocol, and NewType when they improve clarity.
  • Return types are mandatory — including -> None.
  • Never use Any unless there is no other option and a comment explains why.
  • Run mypy --strict (or pyright in strict mode) — the codebase must pass with zero errors.
# Good
def get_jail_by_name(name: str) -> Jail | None:
    ...

# Bad — missing types
def get_jail_by_name(name):
    ...

2. Core Libraries

| Purpose | Library | Notes |
|---|---|---|
| Web framework | FastAPI | Async endpoints only. |
| Data validation & settings | Pydantic v2 | All request/response bodies and config models. |
| Async HTTP client | aiohttp (ClientSession) | For external calls (blocklists, IP lookups). |
| Scheduling | APScheduler 4.x (async) | Blocklist imports, periodic health checks. |
| Structured logging | structlog | Every log call must use structlog — never print() or logging directly. |
| Database | aiosqlite | Async SQLite access for the application database. |
| Testing | pytest + pytest-asyncio + httpx (AsyncClient) | Every feature needs tests. |
| Mocking | unittest.mock / pytest-mock | Isolate external dependencies. |
| Date & time | datetime (stdlib) — always timezone-aware | Use datetime.datetime.now(datetime.UTC). Never naive datetimes. |
| IP / Network | ipaddress (stdlib) | Validate and normalise IPs and CIDR ranges. |
| Environment / config | pydantic-settings | Load .env and environment variables into typed models. |
| fail2ban integration | fail2ban client (bundled) | Use the local copy at ./fail2ban-master. Import from ./fail2ban-master/fail2ban/client to communicate with the fail2ban socket. Do not install fail2ban as a pip package. |
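As an illustration of the ipaddress row, here is a minimal sketch of normalising user-supplied IPs and CIDR ranges with the standard library. The helper name normalise_target is an assumption made for this example, not an existing project function.

```python
import ipaddress


def normalise_target(raw: str) -> str:
    """Return the canonical form of an IP address or CIDR range.

    Raises ValueError when the input is neither.
    """
    text: str = raw.strip()
    if "/" in text:
        # strict=False masks host bits, so "10.0.0.5/24" becomes "10.0.0.0/24".
        return str(ipaddress.ip_network(text, strict=False))
    return str(ipaddress.ip_address(text))
```

Comparing and storing only the canonical form avoids treating two spellings of the same IPv6 address as different bans.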

fail2ban Client Usage

The repository ships with a vendored copy of fail2ban located at ./fail2ban-master. All communication with the fail2ban daemon must go through the client classes found in ./fail2ban-master/fail2ban/client. Add the ./fail2ban-master directory to sys.path (or configure it in pyproject.toml as a path dependency) so that from fail2ban.client ... resolves to the bundled copy.

import sys
from pathlib import Path

# Ensure the bundled fail2ban is importable
sys.path.insert(0, str(Path(__file__).resolve().parents[2] / "fail2ban-master"))

from fail2ban.client.csocket import CSocket  # noqa: E402

Libraries you must NOT use

  • requests — use aiohttp (async).
  • flask — we use FastAPI.
  • celery — we use APScheduler.
  • print() for logging — use structlog.
  • json.loads / json.dumps on Pydantic models — use .model_dump() / .model_validate().

Timestamp Handling

Timestamp consistency is critical for accurate ban history queries across the dashboard and history endpoints. Follow these rules:

Rule 1: Use consistent UTC timestamps

  • All timestamps in the database are stored as Unix epochs (seconds since 1970-01-01 UTC).
  • fail2ban stores timestamps using time.time(), which is always UTC epoch seconds.
  • When querying fail2ban's SQLite database by timestamp, use app.utils.time_utils.since_unix() (not manual datetime calculations).

Rule 2: Time-range windows include a 60-second slack

  • The since_unix() function includes a 60-second slack window (TIME_RANGE_SLACK_SECONDS in app.utils.constants).
  • This slack accommodates:
    • Clock drift between the local system and fail2ban.
    • Test seeding delays when timestamps are manually set to exact boundaries.
  • The slack ensures that dashboard and history queries return consistent row counts for the same time range.

Rule 3: Never duplicate timestamp calculation logic

  • All services that query by time range must import and use since_unix().
  • Do not recalculate timestamps locally using datetime or time modules in service code.
  • If you need a timestamp for a time range, use since_unix().

Example:

from app.utils.time_utils import since_unix

# Get all bans from the last 24 hours (with 60-second slack)
since_ts: int = since_unix("24h")
cursor = await db.execute(
    "SELECT * FROM bans WHERE timeofban >= ?",
    (since_ts,),
)
rows = await cursor.fetchall()  # execute() returns a cursor, not the rows

3. Project Structure

backend/
├── app/
│   ├── __init__.py
│   ├── main.py              # FastAPI app factory, lifespan
│   ├── config.py            # Pydantic settings
│   ├── dependencies.py      # FastAPI dependency providers
│   ├── models/              # Pydantic schemas (request, response, domain)
│   ├── routers/             # FastAPI routers grouped by feature
│   ├── services/            # Business logic — one service per domain
│   ├── repositories/        # Database access layer
│   ├── tasks/               # APScheduler jobs
│   └── utils/               # Helpers, constants, shared types
├── tests/
│   ├── conftest.py
│   ├── test_routers/
│   ├── test_services/
│   └── test_repositories/
├── pyproject.toml
└── .env.example
  • Routers receive requests, validate input via Pydantic, and delegate to services.
  • Services contain business logic and call repositories or external clients.
  • Repositories handle raw database queries — nothing else.
  • Never put business logic inside routers or repositories.

Service Dependencies and Injection

Services should never import other services directly: doing so creates hidden coupling and makes testing harder. Instead:

  1. Define clear service interfaces using Protocol classes in app/services/protocols.py.
  2. Make dependencies explicit by passing them as function parameters with optional defaults.
  3. Use lazy imports for fallback singletons (not at module level).
  4. Inject services via FastAPI dependencies when called from routers.

Example: The history_service depends on Fail2BanMetadataService to resolve the fail2ban database path:

# Good — dependency passed as parameter
async def list_history(
    socket_path: str,
    fail2ban_metadata_service: Fail2BanMetadataService | None = None,
) -> HistoryListResponse:
    if fail2ban_metadata_service is None:
        # Lazy import fallback for backward compatibility
        from app.services.fail2ban_metadata_service import default_fail2ban_metadata_service
        fail2ban_metadata_service = default_fail2ban_metadata_service
    ...

Routers inject the service dependency explicitly:

from app.dependencies import Fail2BanMetadataServiceDep

@router.get("/api/history")
async def get_history(
    fail2ban_metadata_service: Fail2BanMetadataServiceDep,
) -> HistoryListResponse:
    return await history_service.list_history(
        socket_path,
        fail2ban_metadata_service=fail2ban_metadata_service,
    )

This pattern prevents circular imports, makes services testable, and allows easy mocking in tests.
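The testing benefit can be sketched as follows. The Protocol, the fake, and the service function here are simplified stand-ins invented for this example (including the get_db_path method name); the real interfaces live in app/services/protocols.py.

```python
import asyncio
from typing import Protocol


class MetadataService(Protocol):
    """Hypothetical interface, simplified for illustration."""

    async def get_db_path(self, socket_path: str) -> str: ...


class FakeMetadataService:
    """Test double that satisfies MetadataService without touching fail2ban."""

    async def get_db_path(self, socket_path: str) -> str:
        return "/tmp/fake-fail2ban.sqlite3"


async def resolve_history_source(socket_path: str, metadata_service: MetadataService) -> str:
    """Stand-in for a service function that needs the fail2ban database path."""
    db_path: str = await metadata_service.get_db_path(socket_path)
    return f"history from {db_path}"
```

Because the dependency is a parameter, a test can pass FakeMetadataService() directly and needs no patching of module-level imports.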

Mutable Runtime State

All mutable runtime state (state that changes during the application's lifetime) must be stored in RuntimeState defined in app/utils/runtime_state.py. This centralizes state management, prevents accidental global mutable variables, and makes state management testable and synchronization-safe.

Allowed locations for mutable state:

  1. RuntimeState fields — Core application state (e.g., server_status, last_activation, pending_recovery, runtime_settings). Managed through dedicated functions (e.g., record_activation(), clear_pending_recovery()).
  2. Nested service state — Service-specific mutable state (e.g., JailServiceState for jail capability detection cache) is nested within RuntimeState as a field. Services receive their state via dependency injection.
  3. Controlled via dependencies — State is injected into services and routers using FastAPI Depends(). This ensures single-source-of-truth and testability.

Example — jail_service state management:

# Define service-specific state (in app/utils/runtime_state.py)
import asyncio
from dataclasses import dataclass, field

@dataclass
class JailServiceState:
    backend_cmd_supported: bool | None = None
    backend_cmd_lock: asyncio.Lock | None = None

# Nested in RuntimeState
@dataclass
class RuntimeState:
    jail_service_state: JailServiceState = field(default_factory=JailServiceState)
    ...

# Injected into services via dependency
async def list_jails(socket_path: str, state: JailServiceState) -> JailListResponse:
    backend_cmd_is_supported = await _check_backend_cmd_supported(client, name, state)
    ...

# Routers inject state through FastAPI dependencies
@router.get("/api/jails")
async def get_jails(state: JailServiceStateDep) -> JailListResponse:
    return await jail_service.list_jails(socket_path, state)

Why: Centralizing mutable state reduces the risk of race conditions, makes concurrency boundaries explicit, simplifies testing (each test gets a fresh state object), and prepares for multi-worker deployments (where shared state would need to move to Redis, a database, or shared memory).


4. FastAPI Conventions

  • Use async def for every endpoint — no sync endpoints.

  • Every endpoint must declare explicit response models (response_model=...).

  • Use Pydantic models for request bodies and query parameters — never raw dicts.

  • Use Depends() for dependency injection (database sessions, services, auth).

  • Group endpoints into routers by feature domain (routers/jails.py, routers/bans.py, …).

  • Use appropriate HTTP status codes: 201 for creation, 204 for deletion with no body, 404 for not found, etc.

  • Protected endpoints should return 401 Unauthorized or 403 Forbidden when the session is invalid or expired; the frontend treats these responses as a session-expiry event and redirects the user to /login.

  • Use HTTPException or custom exception handlers — never return error dicts manually.

  • All successful responses must use a standardized Pydantic response model. List and collection endpoints should wrap data in items, total, and optional pagination metadata. Detail endpoints must expose a single domain object under a named field (for example jail, status, or settings). Command endpoints must use a CommandResponse-style wrapper with message and success.

  • GET endpoints are read-only — never call db.commit() or execute INSERT/UPDATE/DELETE inside a GET handler. If a GET path produces side-effects (e.g., caching resolved data), that write belongs in a background task, a scheduled flush, or a separate POST endpoint. Users and HTTP caches assume GET is idempotent and non-mutating.

    # Good — pass db=None on GET so geo_service never commits
    result = await geo_service.lookup_batch(ips, http_session, db=None)
    
    # Bad — triggers INSERT + COMMIT per IP inside a GET handler
    result = await geo_service.lookup_batch(ips, http_session, db=app_db)
    
from fastapi import APIRouter, Depends, HTTPException, status
from app.models.jail import JailResponse, JailListResponse
from app.services.jail_service import JailService

router: APIRouter = APIRouter(prefix="/api/jails", tags=["Jails"])

@router.get("/", response_model=JailListResponse)
async def list_jails(service: JailService = Depends()) -> JailListResponse:
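    jails: list[JailResponse] = await service.get_all_jails()
    # List endpoints wrap results in items/total (see 4.1), not a domain-named field.
    return JailListResponse(items=jails, total=len(jails))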

Dependency Layering: Enforcing the Repository Boundary

The repository boundary separates database-aware code from application logic. This is enforced through dependency injection:

| Layer | Responsibilities | Dependencies |
|---|---|---|
| Routers | Receive requests, validate input, return responses. | Service context dependencies (SessionServiceContextDep, BlocklistServiceContextDep), settings, auth. Never raw database connections. |
| Services | Contain business logic, orchestrate operations. | Other services, repositories. May receive aiosqlite.Connection for repository operations. |
| Repositories | Execute all SQL queries. All database knowledge lives here. | aiosqlite.Connection (from callers). |

Rule: Routers must NOT depend on DbDep (raw database connections).

Instead, routers should:

  1. Depend on service context dependencies like SessionServiceContextDep, BlocklistServiceContextDep, etc.
  2. These context dependencies combine the database connection and related repositories.
  3. Pass the context to services, which internally orchestrate database operations.

Service Context Dependencies Available:

  • SessionServiceContextDep — Contains db and session_repo for session operations.
  • BlocklistServiceContextDep — Contains db, blocklist_repo, import_log_repo, settings_repo.
  • SettingsServiceContextDep — Contains db and settings_repo.
  • BanServiceContextDep — Contains db and fail2ban_db_repo.
  • HistoryServiceContextDep — Contains db, fail2ban_db_repo, history_archive_repo.

Why:

  • Enforcement: Keeping DbDep out of routers (it survives only as a deprecated alias, see below) stops routers from accidentally bypassing repositories.
  • Clarity: Service context dependencies explicitly declare which database operations a router needs.
  • Testability: Services and routers are easier to test when they depend on repositories (which can be mocked) rather than raw connections.

Example:

# ✅ GOOD — router depends on service context
@router.post("/login")
async def login(
    body: LoginRequest,
    response: Response,
    session_ctx: SessionServiceContextDep,  # Contains db + session_repo
    _auth: AuthDep,
) -> LoginResponse:
    return await auth_service.login(
        session_ctx.db,
        password=body.password,
        session_repo=session_ctx.session_repo,
        ...
    )

# ❌ BAD — router depends on raw db (DbDep is not exported for this reason)
@router.post("/login")
async def login(
    body: LoginRequest,
    db: DbDep,  # ← Cannot import DbDep in routers
    _auth: AuthDep,
) -> LoginResponse:
    return await auth_service.login(db, password=body.password, ...)

DEPRECATED: DbDep

  • The DbDep type alias is provided for backward compatibility only.
  • DO NOT use in new code. Use service context dependencies instead.
  • See backend/app/dependencies.py for available service contexts.

4.1 API Response Envelope Policy

All API responses must follow a consistent wrapper pattern. This standardization reduces frontend branching logic, prevents integration bugs, and makes the API easier to document and maintain.

Response Patterns

Pattern 1: Paginated Lists

Use PaginatedListResponse[T] for endpoints that return paginated collections:

from app.models.response import PaginatedListResponse

class JailListResponse(PaginatedListResponse[JailSummary]):
    """Response for ``GET /api/jails``."""
    pass

# Returns:
{
  "items": [...],      # T[]
  "total": 100,        # int: total items across all pages
  "page": 2,           # int: current page (1-based)
  "page_size": 20      # int: items per page
}

When to use: Endpoints that support pagination parameters (page, page_size, limit, offset).

Pattern 2: Non-Paginated Collections

Use CollectionResponse[T] for endpoints that return a complete collection without pagination:

from app.models.response import CollectionResponse

class JailConfigListResponse(CollectionResponse[JailConfig]):
    """Response for ``GET /api/config/jails``."""
    pass

# Returns:
{
  "items": [...],      # T[]
  "total": 42          # int: total items
}

When to use: Endpoints that return a complete collection (not paginated). The frontend can render all items without worrying about paging.

Pattern 3: Single-Item Detail Responses

Use domain-specific field names (not generic wrappers) for detail endpoints:

class JailDetailResponse(BaseModel):
    """Response for ``GET /api/jails/{name}``."""
    jail: Jail
    ignore_list: list[str]
    ignore_self: bool

# Returns:
{
  "jail": { ... },              # Jail object
  "ignore_list": [...],         # Additional context
  "ignore_self": true
}

When to use: Endpoints that fetch a single entity. Use the entity name as the field (jail, status, settings, etc.).

Field naming:

  • Primary entity uses its own name: jail, status, settings, etc.
  • Related or supplementary data uses descriptive names: ignore_list, warnings, metadata, etc.

Pattern 4: Command/Action Responses

Use CommandResponse for endpoints that execute commands:

from app.models.response import CommandResponse

class JailCommandResponse(CommandResponse):
    """Generic response for jail control commands."""
    jail: str  # Target identifier (optional)

# Returns:
{
  "message": "Jail 'sshd' started.",
  "success": true,
  "jail": "sshd"  # Optional: target identifier
}

When to use: POST/PUT/DELETE endpoints that perform operations (start jail, ban IP, update config, etc.).

Fields:

  • message: str — Human-readable result or error description.
  • success: bool — Operation succeeded (default: true). Use false for non-exception error handlers.
  • Optional domain-specific fields (jail, ip, etc.) to identify the affected resource.

Pattern 5: Aggregation Responses

Use domain-specific field names for aggregated data:

class BansByJailResponse(BaseModel):
    """Response for ``GET /api/dashboard/bans/by-jail``."""
    jails: list[JailBanCount]    # Aggregated per-jail data
    total: int                    # Total count across all jails

class BansByCountryResponse(BaseModel):
    """Response for ``GET /api/dashboard/bans/by-country``."""
    countries: dict[str, int]     # Country code → count
    country_names: dict[str, str] # Country code → name
    bans: list[DashboardBanItem]  # Full list for rendering companion table
    total: int                     # Total ban count

# Returns:
{
  "jails": [ { "jail": "sshd", "count": 42 }, ... ],
  "total": 500
}

When to use: Endpoints that return computed/aggregated data. Use field names that reflect the data (jails, countries, buckets, etc.).

Summary Table

| Pattern | Used for | Field Names | Example |
|---|---|---|---|
| PaginatedListResponse | Paginated collections | items, total, page, page_size | GET /api/dashboard/bans |
| CollectionResponse | Complete collections | items, total | GET /api/config/jails |
| Detail Response | Single entity + metadata | Entity name + descriptors | GET /api/jails/{name} |
| CommandResponse | Action results | message, success + optional identifiers | POST /api/jails/{name}/start |
| Aggregation Response | Computed data | Domain-specific names | GET /api/dashboard/bans/by-jail |

Rules

  1. Always wrap lists in items field — Consistency aids frontend parsing.

    • Good: { "items": [...], "total": 100 }
    • Bad: { "jails": [...], "total": 100 } (wrong for list endpoints; acceptable only for aggregations)
  2. Aggregation responses are exceptions — They use domain-specific field names because the data represents computed results, not a simple list.

    • { "countries": {...}, "jails": [...], "total": 100 }
  3. Every response with >1 item must include total — Enables frontend to understand scale.

  4. Paginated responses must include page and page_size — Enables the frontend to render pagination controls.

  5. No ad-hoc wrapper objects — Don't invent new response shapes. Use the patterns above.
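As a rough illustration of rules 1, 3, and 4, the sketch below builds the paginated envelope from an in-memory list. PageEnvelope is a plain TypedDict used only so the example runs without dependencies; in the application the shape would be a PaginatedListResponse[T] Pydantic model. The paginate helper is hypothetical.

```python
from typing import TypedDict


class PageEnvelope(TypedDict):
    """Plain-dict stand-in for PaginatedListResponse[T] (illustration only)."""

    items: list[str]
    total: int
    page: int
    page_size: int


def paginate(rows: list[str], page: int, page_size: int) -> PageEnvelope:
    """Slice rows for a 1-based page and report the total across all pages."""
    if page < 1 or page_size < 1:
        raise ValueError("page and page_size must be >= 1")
    start: int = (page - 1) * page_size
    return PageEnvelope(
        items=rows[start : start + page_size],
        total=len(rows),
        page=page,
        page_size=page_size,
    )
```

Note that total always counts every row, not just the rows on the current page, which is what lets the frontend compute the number of pages.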


5. Pydantic Models

  • Every model inherits from pydantic.BaseModel.
  • Use model_config = ConfigDict(strict=True) where appropriate.
  • Field names use snake_case in Python, export as camelCase to the frontend via alias generators if needed.
  • Validate at the boundary — once data enters a Pydantic model it is trusted.
  • Use Field(...) with descriptions for every field to keep auto-generated docs useful.
  • Separate request models, response models, and domain (internal) models — do not reuse one model for all three.
from pydantic import BaseModel, Field
from datetime import datetime

class BanResponse(BaseModel):
    ip: str = Field(..., description="Banned IP address")
    jail: str = Field(..., description="Jail that issued the ban")
    banned_at: datetime = Field(..., description="UTC timestamp of the ban")
    expires_at: datetime | None = Field(None, description="UTC expiry, None if permanent")
    ban_count: int = Field(..., ge=1, description="Number of times this IP was banned")

Using Literal Types for Constrained Strings

When a field should only accept a small set of predefined values, use Literal to enforce this at the type level:

from typing import Literal
from pydantic import BaseModel, Field

LogLevel = Literal["CRITICAL", "ERROR", "WARNING", "NOTICE", "INFO", "DEBUG"]

class GlobalConfigUpdate(BaseModel):
    log_level: LogLevel | None = Field(
        default=None,
        description="Log level: CRITICAL, ERROR, WARNING, NOTICE, INFO, or DEBUG.",
    )

This provides:

  • Type safety — IDEs and type checkers enforce valid values.
  • API documentation — OpenAPI docs automatically list all allowed values.
  • Validation — Pydantic rejects invalid values and provides a clear error message.

Custom Field Validators

For fields that require complex validation (e.g., file paths that must be within allowed directories), use @field_validator:

from pydantic import BaseModel, Field, field_validator
from app.utils.path_utils import validate_log_path

class AddLogPathRequest(BaseModel):
    log_path: str = Field(..., description="Absolute path to the log file to monitor.")

    @field_validator("log_path", mode="after")
    @classmethod
    def validate_log_path_field(cls, value: str) -> str:
        """Validate that the log path is within allowed directories."""
        return validate_log_path(value)

Path Validation Helper:

For query parameters and other contexts where Pydantic validators cannot be used directly, use the validate_log_path() helper from app.utils.path_utils:

from fastapi import HTTPException, Query, status
from app.utils.path_utils import validate_log_path

@router.delete("/{name}/logpath")
async def delete_log_path(
    name: str,
    log_path: str = Query(...),
) -> None:
    try:
        validate_log_path(log_path)
    except ValueError as e:
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
            detail=str(e),
        ) from e
    # ... rest of handler

Key points:

  • Use mode="after" in field validators to validate after Pydantic's basic type coercion.
  • Raise ValueError if validation fails; Pydantic wraps it in a ValidationError, which FastAPI returns as an HTTP 422 response for request models.
  • For query parameters that cannot use Pydantic validators, use the validate_log_path() helper and raise HTTP 422.
  • Never use string prefix matching for path validation (e.g., path.startswith("/var/log")). The helper uses Path.relative_to() to prevent bypasses like /var/log_evil/file.log.
  • Symlinks are resolved before validating to prevent symlink-based escapes.
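The last two points can be sketched with pathlib alone. This is not the project's validate_log_path(): the function name, the allowed_dirs parameter, and the error message are assumptions made for illustration.

```python
from pathlib import Path


def validate_log_path_sketch(raw: str, allowed_dirs: list[Path]) -> str:
    """Return the resolved path if it lies inside an allowed directory.

    Raises ValueError otherwise.
    """
    candidate: Path = Path(raw).resolve()  # follows symlinks and collapses ".."
    for base in allowed_dirs:
        try:
            # relative_to() compares whole path components, so a sibling
            # directory such as "log_evil" does not match the base "log".
            candidate.relative_to(base.resolve())
        except ValueError:
            continue  # not under this base; try the next one
        return str(candidate)
    raise ValueError(f"log path {raw!r} is outside the allowed directories")
```

A plain startswith() check would accept the sibling directory because it compares characters rather than path components.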

Model Type Usage by Layer

Pydantic models are mandatory for all external-facing data structures — anything that crosses layer boundaries or is serialized to HTTP responses. TypedDict may be used only for internal, layer-private data structures where they provide precise typing without runtime overhead.

Rules:

  1. Routers (HTTP boundary): All request and response types must be Pydantic models. FastAPI uses these for validation, serialization, and OpenAPI documentation.

    • Use Pydantic request models for request bodies and query parameters.
    • Use Pydantic response models in the response_model parameter.
    # Good — Pydantic models for router layer
    class JailStatsRequest(BaseModel):
        jail_name: str
    
    class JailStatsResponse(BaseModel):
        jail_name: str
        active_bans: int
    
    @router.post("/stats", response_model=JailStatsResponse)
    async def get_stats(req: JailStatsRequest) -> JailStatsResponse:
        ...
    
  2. Services (business logic): Return types should be Pydantic models if the result is:

    • Returned to a router (likely — they become API responses).
    • Used across multiple services (shared interfaces).
    • Exposed to external consumers (even indirectly).

    If a service returns a purely internal intermediate result used by a single caller, TypedDict is acceptable but should be rare.

    # Good — service returns Pydantic (may be used by multiple routers)
    async def get_jail_details(name: str) -> JailDetailResponse:
        ...
    
    # Acceptable — purely internal utility result
    def _parse_fail2ban_response(raw: str) -> ParsedResponse:
        """Internal helper—used only by this service."""
        ...
    
  3. Repositories (data access): Return types may use TypedDict because they represent raw database rows that:

    • Are layer-private (only called by their own service).
    • Do not cross HTTP boundaries directly.
    • Benefit from lightweight typing without runtime validation.
    # Good — TypedDict for raw repository rows
    class GeoRow(TypedDict):
        ip: str
        country_code: str | None
    
    async def load_all(db: aiosqlite.Connection) -> list[GeoRow]:
        ...
    

    If a repository result becomes part of a service's public interface (returned to routers or other services), convert it to a Pydantic model.

  4. Utilities and helpers: Internal helper results may use TypedDict if they are not part of a public module interface.

Migration path: Existing internal TypedDicts (e.g., GeoCacheRow, ImportLogRow) may remain as TypedDicts so long as they stay within their layer. If a type needs to cross layer boundaries (repo → service → router), convert it to a Pydantic model incrementally as you refactor that data flow.


6. Async Rules

  • Never call blocking / synchronous I/O in an async function — no time.sleep(), no synchronous file reads, no requests.get().

  • Use aiohttp.ClientSession for HTTP calls, aiosqlite for database access.

  • Use asyncio.TaskGroup (Python 3.11+) when you need to run independent coroutines concurrently.

  • Long-running startup/shutdown logic goes into the FastAPI lifespan context manager.

  • Never call db.commit() inside a loop. With aiosqlite, every commit serialises through a background thread and forces an fsync. N rows × 1 commit = N fsyncs. Accumulate all writes in the loop, then issue a single db.commit() once after the loop ends. The difference between 5,000 commits and 1 commit can be seconds vs milliseconds.

    # Good — one commit for the whole batch
    for ip, info in results.items():
        await db.execute(INSERT_SQL, (ip, info.country_code, ...))
    await db.commit()  # ← single fsync
    
    # Bad — one fsync per row
    for ip, info in results.items():
        await db.execute(INSERT_SQL, (ip, info.country_code, ...))
        await db.commit()  # ← fsync on every iteration
    
  • Prefer executemany() over calling execute() in a loop when inserting or updating multiple rows with the same SQL template. aiosqlite passes the entire batch to SQLite in one call, reducing Python↔thread overhead on top of the single-commit saving.

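    # Good (attribute names other than country_code are illustrative)
    rows = [(ip, info.country_code, info.country_name, info.asn, info.org) for ip, info in results.items()]
    await db.executemany(INSERT_SQL, rows)
    await db.commit()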
    
  • Shared resources (DB connections, HTTP sessions) are created once during startup and closed during shutdown — never inside request handlers.

from contextlib import asynccontextmanager
from collections.abc import AsyncGenerator
from fastapi import FastAPI
import aiohttp
import aiosqlite

@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
    # Startup
    app.state.http_session = aiohttp.ClientSession()
    app.state.db = await aiosqlite.connect("bangui.db")
    yield
    # Shutdown
    await app.state.http_session.close()
    await app.state.db.close()

Fire-and-Forget Background Tasks

When you need to spawn a background task that runs independently without waiting for the result, use asyncio.create_task() with the logged_task() helper from app.utils.async_utils. This ensures exceptions in background tasks are always logged and never silently discarded.

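Why this matters: asyncio reports an unhandled exception in a fire-and-forget task only when the task object is garbage-collected, as a "Task exception was never retrieved" message passed to the event loop's exception handler. By default that message goes through the standard logging module rather than our structlog calls, and it can arrive long after the failure. Without explicit logging, background errors (network failures, database writes, API timeouts) are effectively invisible in structured logs and extremely hard to debug.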

Pattern:

import asyncio

from app.utils.async_utils import logged_task

# Bad — exceptions are silently discarded
asyncio.create_task(some_background_work())

# Good — exceptions are logged
asyncio.create_task(
    logged_task(some_background_work(), "task_name"),
    name="task_name",
)

The logged_task() wrapper:

  • Wraps your coroutine to catch any exception
  • Logs the exception with log.exception() (structlog automatically captures the traceback)
  • Adds task_name to the structured log context
  • Never re-raises — it's safe to use with asyncio.create_task()

Example:

import asyncio
from app.utils.async_utils import logged_task
import structlog

log = structlog.get_logger()

async def geo_lookup_batch(ips: list[str]) -> None:
    """Look up geolocation data for IPs asynchronously."""
    # No try/except needed here: any timeout, network failure, or failed
    # DB write propagates to the logged_task() wrapper, which logs it.
    for ip in ips:
        location = await lookup_ip_location(ip)
        await db.execute(INSERT_GEO_SQL, (ip, location))
    await db.commit()

# In your request handler or service:
asyncio.create_task(
    logged_task(geo_lookup_batch(uncached_ips), "geo_cache_batch"),
    name="geo_cache_batch",
)

6.1 Database Query Conventions

LIKE Queries and Wildcard Escaping

SQLite's LIKE operator treats % (any sequence of characters) and _ (any single character) as wildcards. When querying with user-supplied filters that may contain these characters, you must escape them to prevent unintended matches.

The Problem:

# Bad — ip_filter="10.0.0_" matches "10.0.0.1", "10.0.0.2", etc.
ip_filter = "10.0.0_"
await db.execute(
    "SELECT * FROM bans WHERE ip LIKE ?",
    (f"{ip_filter}%",)  # ← wildcard characters not escaped
)

The Solution:

Use the escape_like() helper from app.utils.fail2ban_db_utils:

from app.utils.fail2ban_db_utils import escape_like

# Good — wildcard characters are escaped
ip_filter = "10.0.0_"
await db.execute(
    "SELECT * FROM bans WHERE ip LIKE ? ESCAPE '\\'",
    (f"{escape_like(ip_filter)}%",)  # ← underscores escaped to literal
)

How escape_like() works:

The function escapes backslashes first, then % and _ signs:

def escape_like(s: str) -> str:
    return s.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")

Key rules:

  1. Backslash escapes first — to prevent double-escaping when the input contains backslashes.
  2. Add ESCAPE '\\' to the SQL — tells SQLite which character to use for escaping.
  3. Dots are not wildcards — they do not need escaping; normal IP addresses pass through unchanged.

Test example:

assert escape_like("10.0.0_") == "10.0.0\\_"
assert escape_like("10.0.0%test") == "10.0.0\\%test"
assert escape_like("10.0.0.1") == "10.0.0.1"  # Unchanged
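The effect on actual query results can be checked against an in-memory database. This sketch uses the synchronous sqlite3 module purely so it runs standalone (application code must keep using aiosqlite), and find_by_prefix plus the sample rows are invented for the demonstration.

```python
import sqlite3


def escape_like(s: str) -> str:
    return s.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")


def find_by_prefix(conn: sqlite3.Connection, prefix: str, escape: bool) -> list[str]:
    """Return values starting with prefix, optionally treating wildcards literally."""
    pattern: str = (escape_like(prefix) if escape else prefix) + "%"
    rows = conn.execute(
        "SELECT ip FROM bans WHERE ip LIKE ? ESCAPE '\\' ORDER BY ip",
        (pattern,),
    ).fetchall()
    return [row[0] for row in rows]


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE bans (ip TEXT)")
# The third row contains a literal underscore so the two modes differ.
conn.executemany(
    "INSERT INTO bans VALUES (?)",
    [("10.0.0.1",), ("10.0.0.2",), ("10.0.0_x",)],
)
```

With escape=False the filter "10.0.0_" matches all three rows; with escape=True it matches only the row that really contains the underscore.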

6.2 Database Migrations

The application database schema is versioned and migrated automatically on startup via app.db.init_db().

Migration Design Principles

Migrations must be atomic. All schema changes for a single version (DDL statements) and the schema_migrations record insert must be wrapped in a single BEGIN IMMEDIATE ... COMMIT transaction. This prevents partial migrations if a process crashes mid-migration.

If a crash occurs between migration steps, the next startup will:

  1. Detect the missing schema_migrations record.
  2. Re-apply the entire migration in a single transaction (all-or-nothing).
  3. Avoid data corruption or schema inconsistency.
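The all-or-nothing behaviour can be sketched with the standard sqlite3 module. This is an illustration under stated assumptions, not the code in app/db.py: it is synchronous, it takes the migration as a list of statements instead of one SQL string, and the function names are hypothetical.

```python
import sqlite3


def apply_migration_sketch(
    conn: sqlite3.Connection, version: int, statements: list[str]
) -> None:
    """Apply all statements and record the version in one transaction.

    conn must be opened with isolation_level=None so that this function,
    not the sqlite3 module, controls BEGIN and COMMIT.
    """
    conn.execute("BEGIN IMMEDIATE")
    try:
        for statement in statements:
            conn.execute(statement)
        conn.execute("INSERT INTO schema_migrations (version) VALUES (?)", (version,))
    except Exception:
        # Schema changes are transactional in SQLite, so this undoes DDL too.
        conn.execute("ROLLBACK")
        raise
    conn.execute("COMMIT")


def table_exists(conn: sqlite3.Connection, name: str) -> bool:
    """Report whether a table with this name is present in the schema."""
    row = conn.execute(
        "SELECT 1 FROM sqlite_master WHERE type = 'table' AND name = ?", (name,)
    ).fetchone()
    return row is not None
```

Statements are executed one at a time because sqlite3's executescript() may commit a pending transaction before running, which would defeat the explicit BEGIN IMMEDIATE.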

Writing a New Migration

  1. Add the DDL statements to _MIGRATIONS dict in app/db.py:
_MIGRATIONS: dict[int, str] = {
    1: _CREATE_INITIAL_SCHEMA,
    2: """
-- Migration 2: Add new_column to users table.
ALTER TABLE users ADD COLUMN new_column TEXT DEFAULT 'default_value';
CREATE INDEX IF NOT EXISTS idx_users_new_column ON users(new_column);
""",
}
  2. Update _CURRENT_SCHEMA_VERSION to the new version number:
_CURRENT_SCHEMA_VERSION: int = 2  # was 1
  3. Ensure idempotency where possible:

    • Use CREATE TABLE IF NOT EXISTS and CREATE INDEX IF NOT EXISTS.
    • For ALTER TABLE ADD COLUMN, check if the column exists first using PRAGMA table_info() if re-applying the migration is a concern.
  4. Verify atomicity in tests:

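from pathlib import Path

import pytest

# Assumes open_db and _apply_migration live in app/db.py next to _MIGRATIONS.
import app.db as db_module
from app.db import _apply_migration, open_db


async def test_migration_2_is_atomic(tmp_path: Path) -> None: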
    """Verify migration 2 rolls back on failure."""
    db = await open_db(str(tmp_path / "test.db"))
    try:
        await db.execute("CREATE TABLE schema_migrations (version INTEGER PRIMARY KEY);")
        await db.commit()

        # Add a test migration that fails mid-way
        original = db_module._MIGRATIONS.copy()
        db_module._MIGRATIONS[99] = """
        CREATE TABLE test_table (id INTEGER PRIMARY KEY);
        INSERT INTO nonexistent_table VALUES (1);
        """

        try:
            with pytest.raises(Exception):
                await _apply_migration(db, 99)

            # Verify rollback: migration NOT recorded
            async with db.execute(
                "SELECT version FROM schema_migrations WHERE version = 99;"
            ) as cursor:
                assert await cursor.fetchone() is None

            # Verify rollback: table NOT created
            async with db.execute(
                "SELECT name FROM sqlite_master WHERE type='table' AND name='test_table';"
            ) as cursor:
                assert await cursor.fetchone() is None
        finally:
            db_module._MIGRATIONS = original
    finally:
        await db.close()

Common Pitfalls

  • Non-idempotent statements: ALTER TABLE ADD COLUMN has no IF NOT EXISTS form in SQLite and fails on re-run if the column already exists. Use an explicit check if needed.
  • Comments containing semicolons — the migration parser strips comments correctly, but avoid unusual comment syntax.
  • String literals with semicolons — the parser handles these; no special escaping needed.
  • Multiple operations in one migration — keep migrations focused. Combine related DDL but split unrelated changes.
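The explicit check for ALTER TABLE ADD COLUMN can be sketched like this. The helper names column_exists and add_column_if_missing are hypothetical, and the example uses the synchronous stdlib sqlite3 module instead of aiosqlite to stay self-contained.

```python
import sqlite3


def column_exists(conn: sqlite3.Connection, table: str, column: str) -> bool:
    # PRAGMA table_info returns one row per column; index 1 holds the column name.
    # The table name is interpolated, so it must be a trusted identifier.
    rows = conn.execute(f"PRAGMA table_info({table})").fetchall()
    return any(row[1] == column for row in rows)


def add_column_if_missing(
    conn: sqlite3.Connection, table: str, column: str, definition: str
) -> bool:
    """Add the column unless it already exists. Returns True if it was added."""
    if column_exists(conn, table, column):
        return False
    conn.execute(f"ALTER TABLE {table} ADD COLUMN {column} {definition}")
    return True


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY)")
first_run = add_column_if_missing(conn, "users", "new_column", "TEXT DEFAULT 'default_value'")
second_run = add_column_if_missing(conn, "users", "new_column", "TEXT DEFAULT 'default_value'")
has_column = column_exists(conn, "users", "new_column")
conn.close()
```

A check like this cannot be expressed inside a plain SQL migration string, so it would have to run as Python code around the migration.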

7. Structured Logging Policy

All logging in BanGUI services and tasks must use structlog for consistent, queryable event tracking. This policy defines when and how to log at each level.

7.1 Logging Levels and When to Use Them

INFO — User-facing operations and state changes

  • Use for significant operational events that the operator should know about.
  • Examples: service startup/completion, resource creation/deletion, state transitions, successful operations with business impact.
  • Never excessive — keep volume reasonable to maintain log clarity.
  • Include relevant context: resource IDs, counts, configuration changes.
log.info("jail_activated", jail=name)
log.info("blocklist_source_created", id=new_id, name=name, url=url)
log.info("session_cleanup_ran", deleted_count=count, cutoff_time=now_iso)

WARNING — Recoverable failures, degraded functionality, unexpected but handled conditions

  • Use for issues that are not fatal but indicate something is wrong or suboptimal.
  • Examples: missing optional config, fallback behavior triggered, non-critical API call failed, parsing errors, missing resources that have workarounds.
  • Include error details and context to enable investigation.
  • Do NOT use for expected error paths (e.g., wrong password in login attempt).
log.warning("jail_status_parse_error", jail=name, error=str(exc))
log.warning("geo_lookup_failed", ip=ip, error=type(exc).__name__)
log.warning("geoip_mmdb_not_found", path=mmdb_path)

ERROR — Fatal/unrecoverable failures within a request or task

  • Use for errors that prevent an operation from completing.
  • Examples: database write failed, critical resource is missing, state is corrupted.
  • Include the full error context to enable debugging.
  • Pair with exception handling — if you log an error, you've decided the caller should handle it or return a failure.
log.error("jail_activation_rollback_restore_failed", jail=name, error=str(exc))
log.error("fail2ban_probe_parse_error", error=str(exc))

EXCEPTION — Unhandled exceptions in background tasks and scheduled jobs

  • Use log.exception() (not log.error()) in catch-all exception handlers to automatically capture the full traceback.
  • This level is ONLY for surprises that should never happen in production.
  • Use in task callback exception handlers and top-level task runners.
try:
    result = await blocklist_service.import_all(...)
except Exception:
    log.exception("blocklist_import_unexpected_error")

DEBUG — Low-level details for development and troubleshooting

  • Use for details too verbose for normal operation (e.g., successful lookups, parsed files, state transitions, loop iterations).
  • Include data samples and validation results.
  • Safe to leave in the code — debug logs are not emitted by default in production.
log.debug("geo_lookup_success_mmdb", ip=ip, country=result.country_code)
log.debug("action_file_parsed", name=raw.name)
log.debug("backend_cmd_supported_detected")

7.2 Event Naming Convention

All log event names must follow a consistent pattern for queryability:

Pattern: {domain}_{entity_or_action}[_{result_or_detail}]

  • Domain — The service or feature area (e.g., jail, blocklist, geo, auth, ban).
  • Entity or Action — The noun or verb describing what happened (e.g., activated, created, failed, synced).
  • Result or Detail (optional) — Additional specificity for complex scenarios (e.g., _restore_failed, _non_200, _no_fallback).

Examples (organized by domain):

| Domain | INFO | WARNING | ERROR | DEBUG |
| --- | --- | --- | --- | --- |
| jail | jail_activated, jail_deactivated, jail_reloaded | jail_status_parse_error, jail_rollback_failed | jail_activation_rollback_restore_failed | jail_file_parsed, backend_cmd_supported_detected |
| geo | geo_cache_loaded_from_db, geo_flush_dirty_complete | geo_lookup_failed, geo_persist_failed | - | geo_lookup_success_mmdb, geo_cache_cleanup_ran |
| blocklist | blocklist_import_starting, blocklist_source_created | blocklist_schedule_invalid, blocklist_preview_failed | - | blocklist_ban_failed |
| ban | active_bans_fetched, all_ips_unbanned | ban_service_geo_lookup_failed | ban_service_geo_lookup_unexpected_error | ban_entry_parse_error |
| auth | bangui_login_success, bangui_logout | bangui_login_wrong_password, bangui_login_no_hash | - | - |
| config | filter_created, action_updated | filter_read_error, action_d_not_found | - | filter_file_parsed |
| setup | bangui_setup_started, bangui_setup_completed | - | bangui_setup_failed | - |

Key rules:

  • Use snake_case (never camelCase or PascalCase).
  • Keep event names short but descriptive; aim for 2–4 words.
  • Use consistent terminology across the codebase (e.g., always _created, never _added or _new).
  • Prefix task/background job events with the job name (e.g., blocklist_import_starting, session_cleanup_ran).
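The snake_case rule is easy to check mechanically, for example in a unit test that scans event names. The sketch below is one possible check; is_valid_event_name is a hypothetical helper, and requiring at least two words (domain plus action) is an assumption derived from the pattern above.

```python
import re

# Lowercase words joined by underscores, at least two words (domain + action).
_EVENT_NAME = re.compile(r"[a-z][a-z0-9]*(?:_[a-z0-9]+)+")


def is_valid_event_name(name: str) -> bool:
    """Return True if the name is snake_case with a domain prefix."""
    return _EVENT_NAME.fullmatch(name) is not None


valid = [is_valid_event_name(n) for n in ("jail_activated", "action_d_not_found")]
invalid = [is_valid_event_name(n) for n in ("jailActivated", "Jail_Activated", "jail")]
```

The check does not enforce terminology (for example _created versus _added); that still needs review.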

7.3 Structured Context and Key-Value Pairs

Always log with structured context — key-value pairs that make logs queryable and analyzable.

Essential patterns:

# Operation with count
log.info("active_bans_fetched", total=len(bans))

# Resource manipulation with ID
log.info("blocklist_source_deleted", id=source_id)

# State transition with reason or error
log.warning("geo_persist_failed", ip=ip, error=type(exc).__name__)

# Time-bounded operation
log.info("session_cleanup_ran", deleted_count=count, cutoff_time=now_iso)

# Config or feature change
log.info("blocklist_schedule_updated", frequency=config.frequency, hour=config.hour)

# Batch operation with metrics
log.info("blocklist_import_finished", total_imported=result.total_imported, 
         total_skipped=result.total_skipped, errors=result.errors_count)

What to include:

  • Resource IDs (jail names, IP addresses, source IDs).
  • Counts and metrics (rows processed, items synced, errors).
  • Configuration or decision points (enabled/disabled flags, thresholds).
  • Timestamps and durations for long-running operations (optional but useful).
  • Error types and short error messages (use type(exc).__name__ or str(exc) depending on context).

What to NEVER log:

  • Sensitive data: passwords, tokens, session IDs, API keys, hashes.
    • For session correlation without leaking token material, use a one-way hash fragment: hashlib.sha256(token.encode()).hexdigest()[:12].
    • Use numeric database IDs for entity correlation instead of raw identifiers: session_id=session.id instead of token=session.token.
  • Full exception tracebacks in INFO/WARNING (use log.exception() only in catch-all handlers).
  • Redundant system time (structlog adds timestamp automatically).
  • User PII in most cases (name, email, phone, etc.) — unless directly relevant to debugging and anonymized.
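The hash-fragment approach and a simple redaction safety net can be sketched as follows. The function redact_sensitive has the shape of a structlog processor (logger, method name, event dict), but the key list and both function names are assumptions for this example, not existing BanGUI code.

```python
import hashlib
from collections.abc import MutableMapping

# Hypothetical set of keys that must never reach the log output.
SENSITIVE_KEYS = frozenset({"password", "token", "api_key", "session_secret"})


def token_fingerprint(token: str) -> str:
    """Return a short one-way fingerprint for correlating log lines."""
    return hashlib.sha256(token.encode()).hexdigest()[:12]


def redact_sensitive(
    logger: object, method_name: str, event_dict: MutableMapping[str, object]
) -> MutableMapping[str, object]:
    """Replace the values of sensitive keys before the event is rendered."""
    for key in SENSITIVE_KEYS.intersection(event_dict):
        event_dict[key] = "[REDACTED]"
    return event_dict


event = redact_sensitive(None, "info", {"event": "bangui_login_success", "token": "s3cret"})
fingerprint = token_fingerprint("abc")
```

A processor like this is only a backstop. The primary rule remains not to pass sensitive values to the logger in the first place.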

7.4 Background Tasks and Scheduled Jobs

Every background task (APScheduler job) must follow this pattern:

  1. On startup: Log {job_name}_scheduled with interval or cron expression.
  2. On execution: Log {job_name}_starting (INFO) and {job_name}_finished or {job_name}_ran (INFO) with metrics.
  3. On exception: Use log.exception("{job_name}_unexpected_error") in the top-level try/except.
# app/tasks/blocklist_import.py
async def _run_import_with_resources(settings: Settings, http_session: ClientSession) -> None:
    """APScheduler callback that imports all enabled blocklist sources."""
    log.info("blocklist_import_starting")
    try:
        result = await blocklist_service.import_all(...)
        log.info("blocklist_import_finished",
                 total_imported=result.total_imported,
                 total_skipped=result.total_skipped,
                 errors=result.errors_count)
    except Exception:
        log.exception("blocklist_import_unexpected_error")

# Register and log in the lifespan or register function:
log.info("blocklist_import_scheduled", interval_seconds=INTERVAL_SECONDS)

7.5 Service Functions and Methods

Service functions should log at entry/exit for significant operations, or when errors occur:

Entry logging (optional, use for complex or long-running operations):

async def import_all(...) -> ImportResult:
    log.debug("blocklist_import_starting", count=len(sources))
    try:
        ...
    except SomeError as exc:
        log.warning("blocklist_import_partial_failure", imported=count, error=str(exc))

Exit/success logging (log results with metrics):

async def get_all_jails(socket_path: str) -> list[JailResponse]:
    jails = await _fetch_jails(socket_path)
    log.info("jail_list_fetched", count=len(jails))
    return jails

Error handling (log with context, let caller decide how to respond):

try:
    result = await fetch_external_data(url)
except TimeoutError:
    log.warning("external_fetch_timeout", url=url, timeout_seconds=TIMEOUT)
    raise
except Exception as exc:
    log.error("external_fetch_failed", url=url, error=type(exc).__name__)
    raise

8. Error Handling

  • Define custom exception classes for domain errors (e.g., JailNotFoundError, BanFailedError).
  • Catch specific exceptions — never bare except: or except Exception: without re-raising.
  • Map domain exceptions to HTTP status codes via FastAPI exception handlers registered on the app.
  • Always log errors with context before raising.
class JailNotFoundError(Exception):
    def __init__(self, name: str) -> None:
        self.name: str = name
        super().__init__(f"Jail '{name}' not found")

# In main.py
@app.exception_handler(JailNotFoundError)
async def jail_not_found_handler(request: Request, exc: JailNotFoundError) -> JSONResponse:
    return JSONResponse(status_code=404, content={"detail": f"Jail '{exc.name}' not found"})

Routers and Exception Propagation

  • Routers must NOT construct HTTPException for domain errors — let domain exceptions propagate.
  • Routers should never have helper functions like _bad_gateway(), _not_found(), _conflict() etc. that convert domain exceptions to HTTPException.
  • All domain exception types must have corresponding handlers registered in main.py via app.add_exception_handler().
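  • Handler lookup walks the exception class's MRO, so the handler registered for the most specific class wins regardless of registration order. Registering handlers from most specific to least specific is still the convention here, for readability.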
# ❌ BAD — routers constructing HTTPException for domain exceptions
@router.get("/{name}")
async def get_jail(name: str, socket_path: Fail2BanSocketDep) -> JailDetailResponse:
    try:
        return await jail_service.get_jail(socket_path, name)
    except JailNotFoundError:
        raise HTTPException(status_code=404, detail=f"Jail not found: {name!r}") from None

# ✅ GOOD — domain exception propagates to global handler
@router.get("/{name}")
async def get_jail(name: str, socket_path: Fail2BanSocketDep) -> JailDetailResponse:
    return await jail_service.get_jail(socket_path, name)

All domain exceptions raised by services propagate to handlers in main.py, ensuring:

  1. Consistent error response format across the entire API.
  2. No duplicated exception-to-HTTP-status mapping logic.
  3. Easy to audit all error codes — they are all in one place.

Error Message Hygiene

HTTP responses must never leak sensitive internal details that aid attackers or expose infrastructure:

  • Never include system paths in HTTP error messages (e.g., /var/run/fail2ban/fail2ban.sock, /etc/fail2ban/).
  • Never include raw exception messages that expose internal parsing or implementation logic.
  • Log full details server-side only — exception handlers must log error=str(exc) with full exception context, but return generic user-friendly messages in the HTTP response.
# ❌ BAD — leaks socket path and internal details to the client
async def _fail2ban_connection_handler(request: Request, exc: Fail2BanConnectionError) -> JSONResponse:
    return JSONResponse(
        status_code=502,
        content={"detail": f"Cannot reach fail2ban: {exc}"},  # exc includes socket path!
    )

# ✅ GOOD — generic message in response, full details in server logs
async def _fail2ban_connection_handler(request: Request, exc: Fail2BanConnectionError) -> JSONResponse:
    log.warning(
        "fail2ban_connection_error",
        path=request.url.path,
        method=request.method,
        error=str(exc),  # Full details logged server-side
    )
    return JSONResponse(
        status_code=502,
        content={"detail": "Cannot reach the fail2ban service. Check the server status page."},
    )

Exception Taxonomy and HTTP Mapping

BanGUI uses a standardized exception taxonomy that maps domain errors to HTTP status codes consistently across all services. This allows routers to handle exceptions by category rather than by individual type, reducing code duplication and ensuring consistent client-facing error responses.

Exception Categories

All domain exceptions inherit from one of five base categories defined in app.exceptions:

| Base Exception | HTTP Status | Meaning | Example |
| --- | --- | --- | --- |
| NotFoundError | 404 | Requested domain entity not found | JailNotFoundError, FilterNotFoundError |
| BadRequestError | 400 | Invalid input, validation failure, or invalid identifier | ConfigValidationError, JailNameError |
| ConflictError | 409 | State conflict or resource constraint violation | JailAlreadyActiveError, FilterAlreadyExistsError |
| OperationError | 500 | Domain operation failure (write, update, delete) | ConfigWriteError, ConfigFileWriteError |
| ServiceUnavailableError | 503 | Infrastructure or external service unreachable | Fail2BanConnectionError, ConfigDirError |

Service Exception Mapping

Every service-specific exception inherits from exactly one category. This allows main.py to register just 5 exception handlers instead of 25+:

# In app/exceptions.py — define each exception once with its category
class JailNotFoundError(NotFoundError):
    def __init__(self, name: str) -> None:
        self.name = name
        super().__init__(f"Jail not found: {name!r}")

class JailAlreadyActiveError(ConflictError):
    def __init__(self, name: str) -> None:
        self.name = name
        super().__init__(f"Jail is already active: {name!r}")

# In app/main.py — register category handlers
app.add_exception_handler(NotFoundError, _not_found_handler)
app.add_exception_handler(BadRequestError, _bad_request_handler)
app.add_exception_handler(ConflictError, _conflict_handler)
app.add_exception_handler(OperationError, _domain_error_handler)
app.add_exception_handler(ServiceUnavailableError, _service_unavailable_handler)
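The reason a handful of category handlers is enough can be illustrated with plain Python: a lookup that walks the exception's class hierarchy finds the category for any subclass. The sketch below is standalone and simplified. The class bodies and the status_for helper are assumptions for illustration and do not reproduce app/exceptions.py.

```python
class NotFoundError(Exception):
    """Category base: requested entity does not exist."""


class ConflictError(Exception):
    """Category base: state conflict."""


class JailNotFoundError(NotFoundError):
    def __init__(self, name: str) -> None:
        self.name = name
        super().__init__(f"Jail not found: {name!r}")


class JailAlreadyActiveError(ConflictError):
    def __init__(self, name: str) -> None:
        self.name = name
        super().__init__(f"Jail is already active: {name!r}")


# One entry per category instead of one per concrete exception.
STATUS_BY_CATEGORY: dict[type[Exception], int] = {
    NotFoundError: 404,
    ConflictError: 409,
}


def status_for(exc: Exception) -> int:
    """Walk the MRO from most specific to least specific class."""
    for cls in type(exc).__mro__:
        if cls in STATUS_BY_CATEGORY:
            return STATUS_BY_CATEGORY[cls]
    return 500  # no category matched


not_found_status = status_for(JailNotFoundError("sshd"))
conflict_status = status_for(JailAlreadyActiveError("sshd"))
unknown_status = status_for(RuntimeError("boom"))
```

Adding a new service exception then only requires choosing its base class; no handler registration changes.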

Service Exception Reference

When writing a new service, determine which category each exception belongs to:

  • Not found: Always NotFoundError (e.g., jail, filter, action, config file not found)
  • Invalid input: Always BadRequestError (e.g., validation errors, invalid names, regex compile failure)
  • State conflicts: Always ConflictError (e.g., already exists, already active, readonly resource)
  • Operation failures: Always OperationError (e.g., write failed, update failed, command failed)
  • Infrastructure: Always ServiceUnavailableError (e.g., config dir missing, socket unreachable, fail2ban protocol error)

Client Expectations

Clients should expect the following HTTP status codes and response format for all domain errors:

HTTP 400 Bad Request
{
  "detail": "Jail name contains invalid characters"
}

HTTP 404 Not Found
{
  "detail": "Jail not found: 'sshd'"
}

HTTP 409 Conflict
{
  "detail": "Jail is already active: 'sshd'"
}

HTTP 500 Internal Server Error
{
  "detail": "Failed to write configuration: permission denied"
}

HTTP 503 Service Unavailable
{
  "detail": "Cannot reach the fail2ban service. Check the server status page."
}

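The detail field contains a client-safe message. For most domain exceptions this is the exception's own message (from str(exc)). Where that message could carry sensitive details (socket paths, file paths, internal error text), the handler returns a generic message instead and logs the full details server-side only.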

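Catch the exception types that match the operation being performed:

  • Network I/O: TimeoutError, aiohttp.ClientError, asyncio.TimeoutError (an alias of TimeoutError on Python 3.11+)
  • File I/O: OSError (includes IOError, FileNotFoundError, PermissionError)
  • JSON parsing: json.JSONDecodeError, ValueError
  • Database errors: aiosqlite.Error and its subclasses (these are sqlite3 exceptions and do not derive from OSError, so catch them explicitly)
  • Third-party libraries: Specific exception classes (e.g., geoip2.errors.GeoIP2Error)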

When catching service-critical exceptions:

  1. Catch the specific exception types for the operation.
  2. Log with the exception type and relevant context.
  3. Return a safe fallback (empty dict, None, etc.) or re-raise if the service cannot function.

When truly unavoidable broad catches are needed (e.g., retrying transient network failures):

  1. Place specific catches first.
  2. Add one final except Exception after specific cases, with error_type="unexpected" logged to flag surprises.
  3. Document why broad catching is necessary (e.g., "tests use mock objects that may raise arbitrary exceptions").

Example:

async def lookup_batch(ips: list[str], http_session: aiohttp.ClientSession) -> dict[str, GeoInfo]:
    """Resolve multiple IPs, returning empty map on failure."""
    try:
        result = await http_session.post(url, json=payload, timeout=timeout)
    except (TimeoutError, aiohttp.ClientError) as exc:
        # Expected network failures — log and return empty result
        log.warning("geo_batch_http_failed", error=type(exc).__name__)
        return {}
    except Exception as exc:
        # Unexpected — log as error for investigation
        log.error("geo_batch_unexpected_error", error=type(exc).__name__)
        return {}

9. Testing

  • Every new feature or bug fix must include tests.
  • Tests live in tests/ mirroring the app/ structure.
  • Use pytest with pytest-asyncio for async tests.
  • Use httpx.AsyncClient to test FastAPI endpoints (not TestClient which is sync).
  • Mock external dependencies (fail2ban socket, aiohttp calls) — tests must never touch real infrastructure.
  • Aim for >80 % line coverage — critical paths (auth, banning, scheduling) must be 100 %.
  • Test names follow test_<unit>_<scenario>_<expected> pattern.
from collections.abc import AsyncIterator

import pytest
import pytest_asyncio
from httpx import ASGITransport, AsyncClient

from app.main import create_app


# In pytest-asyncio's strict mode, async fixtures need pytest_asyncio.fixture.
@pytest_asyncio.fixture
async def client() -> AsyncIterator[AsyncClient]:
    app = create_app()
    transport: ASGITransport = ASGITransport(app=app)
    async with AsyncClient(transport=transport, base_url="http://test") as ac:
        yield ac


@pytest.mark.asyncio
async def test_list_jails_returns_200(client: AsyncClient) -> None:
    response = await client.get("/api/jails/")
    assert response.status_code == 200
    data: dict[str, object] = response.json()
    assert "jails" in data

9.1 Background Tasks and Scheduler Architecture

BanGUI uses APScheduler 4.x (async mode) to manage background jobs that execute on a schedule without user interaction. This section documents how to write and register background tasks.

Task Location and Structure

All background tasks live in backend/app/tasks/ as separate modules. Each task:

  • Exports a register(app: FastAPI) -> None or async def register(app: FastAPI) -> None function.
  • Opens its own database connection using app.db.open_db() or the task_db() helper.
  • Closes connections when work completes (use the async context manager pattern).
  • Runs independently of the FastAPI request/response cycle.

Example Task

# backend/app/tasks/my_task.py
import structlog
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from fastapi import FastAPI

from app.db import open_db

log = structlog.get_logger()

async def my_background_job(app: FastAPI) -> None:
    """Do important work on a schedule."""
    log.info("my_background_job_starting")
    try:
        # open_db comes from the app.db module; the FastAPI instance has no .db attribute.
        db = await open_db(app.state.settings.database_path)
        try:
            # Do work...
            pass
        finally:
            await db.close()
        log.info("my_background_job_finished")
    except Exception:
        # Catch-all in a top-level task runner: log.exception() captures the traceback.
        log.exception("my_background_job_unexpected_error")

def register(app: FastAPI) -> None:
    """Register the job with the scheduler."""
    scheduler: AsyncIOScheduler = app.state.scheduler
    scheduler.add_job(
        my_background_job,
        args=(app,),
        trigger="interval",
        seconds=60,
        id="my_task",
        name="My Background Job",
    )

Accessing Shared Resources in Tasks

Since tasks do not have access to Depends(get_db) (no request scope), they must:

  1. Open their own DB connection via app.state.db_factory.open_db(path).
  2. Access app-level state: app.state.http_session, app.state.geo_cache, app.state.settings, etc.
  3. Use structlog for all logging (never print()).

Single-Worker Requirement

The scheduler is bound to a single asyncio event loop and cannot be shared across multiple worker processes. BanGUI enforces single-worker mode to prevent duplicate task execution.

  • Deployment constraint: Set BANGUI_WORKERS=1 (default).
  • Startup validation: startup_shared_resources() raises RuntimeError if BANGUI_WORKERS > 1.
  • See Architekture.md § 9.2 for full details.

10. Code Style & Tooling

| Tool | Purpose |
| --- | --- |
| Ruff | Linter and formatter (replaces black, isort, flake8). |
| mypy or pyright | Static type checking in strict mode. |
| pre-commit | Run ruff + type checker before every commit. |

  • Line length: 120 characters max.
  • Strings: use double quotes (").
  • Imports: sorted by ruff — stdlib → third-party → local, one import per line.
  • No unused imports, no unused variables, no # type: ignore without explanation.
  • Docstrings in Google style on every public function, class, and module.

11. fail2ban Response Utilities

All services that interact with the fail2ban daemon must use the canonical response parsing utilities from app.utils.fail2ban_response. This ensures consistent error handling, type safety, and makes it easy to fix bugs in response handling across the entire codebase.

Available Functions

ok(response: object) -> object Extracts the payload from a fail2ban (return_code, data) response tuple.

  • Raises ValueError if return code ≠ 0 or response shape is invalid.
  • Use this on every response from Fail2BanClient.send().

to_dict(pairs: object) -> dict[str, object] Converts a list of (key, value) pairs (fail2ban's native response format) to a Python dict.

  • Silently ignores malformed entries and non-list/tuple inputs.
  • Always returns a dict (empty if input is invalid).

ensure_list(value: object | None) -> list[str] Coerces fail2ban response values (which may be None, a single string, or a list) to a normalized list of strings.

  • Handles all three cases consistently.
  • Returns empty list for None or empty strings.

is_not_found_error(exc: Exception) -> bool Checks if an exception indicates a jail does not exist.

  • Checks for multiple error message patterns (case-insensitive).
  • Use this to distinguish "jail not found" errors from other failures.

Example Usage

from app.utils.fail2ban_response import ok, to_dict, ensure_list, is_not_found_error
from app.utils.fail2ban_client import Fail2BanClient

client = Fail2BanClient(socket_path="/var/run/fail2ban/fail2ban.sock")

try:
    # Get jail status
    response = await client.send(["status", "sshd", "short"])
    status_dict = to_dict(ok(response))  # Extract payload and convert to dict
    
    # Get list of banned IPs
    ban_response = await client.send(["get", "sshd", "banip"])
    banned_ips = ensure_list(ok(ban_response))  # Normalize to list of strings
    
except ValueError as exc:
    if is_not_found_error(exc):
        raise JailNotFoundError("sshd") from exc
    raise

Why This Matters

Before this utility module, every service implemented its own copy of these functions, leading to:

  • Code duplication across 7+ service files.
  • Subtle inconsistencies in error handling.
  • Difficult maintenance — every bug fix required touching multiple files.

Now, all services import from a single authoritative source, making response handling consistent, maintainable, and type-safe.


12. Configuration & Secrets

  • All configuration lives in environment variables loaded through pydantic-settings.
  • Secrets (master password hash, session key) are never committed to the repository.
  • Provide a .env.example with all keys and placeholder values.
  • Validate config at startup — fail fast with a clear error if a required value is missing.
from pydantic_settings import BaseSettings
from pydantic import Field

class Settings(BaseSettings):
    database_path: str = Field("bangui.db", description="Path to SQLite database")
    fail2ban_socket: str = Field("/var/run/fail2ban/fail2ban.sock", description="fail2ban socket path")
    session_secret: str = Field(..., description="Secret key for session signing")
    log_level: str = Field("info", description="Logging level")

    model_config = {"env_prefix": "BANGUI_", "env_file": ".env"}

Session Secret Configuration

The session_secret is the HMAC key used to sign all session tokens. It must be at least 32 characters (256 bits) to provide sufficient cryptographic strength for HMAC-SHA256.

Minimum Length: 32 characters

Why 32 characters? Session tokens are signed using HMAC-SHA256. A secret shorter than 32 bytes (256 bits) significantly weakens the signature, potentially allowing attackers to forge valid tokens. The constraint is enforced at startup — the application will fail to start if session_secret is shorter than 32 characters.

Generation: Generate a secure secret using Python:

python -c "import secrets; print(secrets.token_hex(32))"

This produces a 64-character hexadecimal string (256 bits) suitable for production use.
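The length check and the way an HMAC-SHA256 key is used can be sketched with the standard library. The function names and the payload format below are assumptions for illustration; BanGUI's actual token layout is not described here.

```python
import hashlib
import hmac
import secrets

MIN_SECRET_LENGTH = 32


def validate_session_secret(secret: str) -> str:
    """Fail fast if the secret is too short to be used as an HMAC key."""
    if len(secret) < MIN_SECRET_LENGTH:
        raise ValueError(
            f"session_secret must be at least {MIN_SECRET_LENGTH} characters long"
        )
    return secret


def sign(payload: str, secret: str) -> str:
    """Return the hex HMAC-SHA256 signature of the payload."""
    return hmac.new(secret.encode(), payload.encode(), hashlib.sha256).hexdigest()


def verify(payload: str, signature: str, secret: str) -> bool:
    # compare_digest avoids leaking information through comparison timing.
    return hmac.compare_digest(sign(payload, secret), signature)


secret = validate_session_secret(secrets.token_hex(32))  # 64 hex characters
signature = sign("session:42", secret)
genuine = verify("session:42", signature, secret)
tampered = verify("session:43", signature, secret)
```

Anyone who knows the secret can produce valid signatures, which is why it must come from a cryptographically secure generator and never appear in logs or the repository.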

Environment Variable:

BANGUI_SESSION_SECRET="your-32-character-minimum-secret-here"

Never commit the actual secret to the repository. Provide a .env.example with a placeholder:

# .env.example
BANGUI_SESSION_SECRET="set-this-to-a-32-character-minimum-secret"

The session_cookie_secure configuration controls the Secure flag on the session cookie. This flag prevents browsers from sending the session cookie over unencrypted HTTP.

Default: true — Production deployments are secure by default. Cookies are only sent over HTTPS.

Local Development: Set BANGUI_SESSION_COOKIE_SECURE=false in your compose file or .env to allow cookies over HTTP (required for localhost:8000).

# Docker/compose.debug.yml
environment:
  BANGUI_SESSION_COOKIE_SECURE: "false"  # Allow HTTP during local development

Important: If Secure=true is set, browsers will reject the session cookie when the backend is served over HTTP. Ensure your nginx/reverse proxy terminates TLS and passes X-Forwarded-Proto: https so FastAPI knows the connection is secure.

CSRF Protection Middleware

State-mutating endpoints (POST, PUT, DELETE, PATCH) authenticated via session cookies are protected by the CsrfMiddleware, which enforces a custom header check.

How It Works:

  1. For every request using a mutating HTTP method, the middleware checks:

    • Is this request authenticated via session cookie (not Bearer token)?
    • If yes, require the custom header X-BanGUI-Request: 1.
    • If missing or incorrect, return 403 Forbidden.
  2. Bearer token requests (via Authorization: Bearer header) bypass the check because tokens are not CSRF-vulnerable — they are never automatically sent on cross-origin requests.

  3. Safe HTTP methods (GET, HEAD, OPTIONS) bypass the check.

  4. Cross-site protection: Cross-site JavaScript (fetch() calls from other origins) cannot set custom headers without CORS preflight, which the backend rejects for non-allowed origins. This provides defense-in-depth against subdomain attacks and XSS injection.
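The decision logic described in these steps can be sketched as a pure function. This is a simplified model, not the code in backend/app/middleware/csrf.py: the function name is hypothetical, and treating any Authorization: Bearer header as a bypass is an assumption based on the description above.

```python
from collections.abc import Mapping

MUTATING_METHODS = frozenset({"POST", "PUT", "DELETE", "PATCH"})
SESSION_COOKIE = "bangui_session"
CSRF_HEADER = "x-bangui-request"


def is_request_allowed(
    method: str, headers: Mapping[str, str], cookies: Mapping[str, str]
) -> bool:
    """Return False when the request should be rejected with 403."""
    if method.upper() not in MUTATING_METHODS:
        return True  # safe methods bypass the check
    # HTTP header names are case-insensitive.
    normalized = {name.lower(): value for name, value in headers.items()}
    if normalized.get("authorization", "").lower().startswith("bearer "):
        return True  # bearer tokens are not sent automatically by browsers
    if SESSION_COOKIE not in cookies:
        return True  # not cookie-authenticated; authentication is enforced elsewhere
    return normalized.get(CSRF_HEADER) == "1"


session = {"bangui_session": "abc"}
with_header = is_request_allowed("POST", {"X-BanGUI-Request": "1"}, session)
without_header = is_request_allowed("POST", {}, session)
bearer = is_request_allowed("POST", {"Authorization": "Bearer token"}, {})
safe_get = is_request_allowed("GET", {}, session)
```

Keeping the decision in a pure function like this makes it straightforward to unit-test every branch without starting the application.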

Implementation Location:

  • Middleware: backend/app/middleware/csrf.py
  • Registered in: backend/app/main.py via app.add_middleware(CsrfMiddleware)

Example:

# ✓ Cookie-authenticated POST with CSRF header — allowed
POST /api/bans
Cookie: bangui_session=...
X-BanGUI-Request: 1

# ✗ Cookie-authenticated POST without CSRF header — rejected with 403
POST /api/bans
Cookie: bangui_session=...
(no X-BanGUI-Request header)

# ✓ Bearer token authentication without CSRF header — allowed
POST /api/bans
Authorization: Bearer <token>
(no X-BanGUI-Request header needed)

# ✓ Safe GET method without CSRF header — allowed
GET /api/jails
Cookie: bangui_session=...
(no X-BanGUI-Request header needed)

fail2ban_start_command Configuration

The fail2ban_start_command setting specifies the shell command used to start the fail2ban daemon during recovery operations (e.g., after a rollback).

Format & Parsing:

  • The command is split into arguments using shlex.split(), which respects shell quoting rules.
  • Paths with spaces must be quoted. Example: "/opt/my tools/fail2ban-client" start.
  • The command is not executed through a shell — no shell variables or globbing are interpreted.

Validation:

  • The command is validated at startup using shlex.split(). Mismatched quotes will raise a ValueError with the problematic command in the error message.

Environment Variables:

BANGUI_FAIL2BAN_START_COMMAND="fail2ban-client start"           # Default
BANGUI_FAIL2BAN_START_COMMAND="systemctl start fail2ban"        # systemd
BANGUI_FAIL2BAN_START_COMMAND='"/opt/my tools/fail2ban" start'  # Quoted path

Common Pitfall: Using .split() instead of shlex.split() would break commands with spaces in paths. Always use quoted strings for paths that contain whitespace.
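The difference between the two splitting approaches, and the startup validation, can be shown in a few lines. The wrapper parse_start_command and its empty-command check are assumptions for this sketch; only shlex.split() itself is taken from the text above.

```python
import shlex


def parse_start_command(command: str) -> list[str]:
    """Split the command into argv, reporting the offending value on failure."""
    try:
        argv = shlex.split(command)
    except ValueError as exc:
        # shlex raises ValueError for mismatched quotes.
        raise ValueError(f"Invalid fail2ban_start_command {command!r}: {exc}") from exc
    if not argv:
        raise ValueError("fail2ban_start_command must not be empty")
    return argv


command = '"/opt/my tools/fail2ban" start'
quoted_aware = parse_start_command(command)  # path stays one argument
naive = command.split()                      # path is broken at the space

try:
    parse_start_command('"/opt/my tools/fail2ban start')
    unbalanced_rejected = False
except ValueError:
    unbalanced_rejected = True
```

The resulting list can be passed directly to a subprocess call without a shell, which matches the rule that no shell variables or globbing are interpreted.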

IP Geolocation Resolution

BanGUI resolves IP addresses to country codes and network organization information for ban analytics and geomapping. The geolocation system implements a primary + fallback resolution strategy to balance security and availability:

  1. Primary Resolver (MaxMind GeoLite2-Country): All IP lookups first attempt resolution using a local MaxMind GeoLite2-Country MMDB database file (if available). The MMDB is downloaded offline and mounted into the container — no IP data is sent over the network.
  2. Fallback Resolver (ip-api.com HTTP): If the MMDB is unavailable or returns no result, the system can fall back to the ip-api.com HTTP API. This fallback must be explicitly enabled and only sends unresolved IPs over HTTP. HTTP is disabled by default for security (to avoid sending IP addresses in cleartext).

Download & Configure MaxMind GeoLite2:

The MaxMind GeoLite2-Country MMDB requires a free account and license key. To set up the database:

  1. Create a free MaxMind account at https://www.maxmind.com/en/geolite2/signup and download your license key.
  2. Download the GeoLite2-Country MMDB using the provided script or manually from the MaxMind downloads page.
  3. Mount the MMDB into the BanGUI container at a known path (e.g., /data/GeoLite2-Country.mmdb).
  4. Set BANGUI_GEOIP_DB_PATH to the mounted path in your environment.

Example Docker Compose configuration:

services:
  bangui:
    volumes:
      - ./GeoLite2-Country.mmdb:/data/GeoLite2-Country.mmdb:ro
    environment:
      BANGUI_GEOIP_DB_PATH: /data/GeoLite2-Country.mmdb

Fallback to HTTP (Not Recommended):

If the MMDB cannot be mounted (e.g., in restricted environments), you can enable the HTTP fallback:

services:
  bangui:
    environment:
      BANGUI_GEOIP_ALLOW_HTTP_FALLBACK: "true"

⚠️ Security Warning: Enabling HTTP fallback causes unresolved IP addresses to be sent unencrypted to ip-api.com. This is a privacy and GDPR/CCPA concern. Only enable this if the MMDB absolutely cannot be provisioned, and understand the implications.

Data Structure:

The GeoInfo returned by the resolution system includes:

  • country_code (str | None): ISO 3166-1 alpha-2 country code (e.g., "US", "DE").
  • country_name (str | None): Human-readable country name (e.g., "United States").
  • asn (str | None): Autonomous System Number (e.g., "AS3320"). Only populated when using the HTTP API; local MMDB lookups return None.
  • org (str | None): Organization name associated with the ASN. Only populated when using the HTTP API; local MMDB lookups return None.

Environment Variables:

BANGUI_GEOIP_DB_PATH=/data/GeoLite2-Country.mmdb           # Path to MaxMind MMDB (primary)
BANGUI_GEOIP_ALLOW_HTTP_FALLBACK="false"                   # Default: false (MMDB-only)
BANGUI_GEOIP_ALLOW_HTTP_FALLBACK="true"                    # Enable HTTP fallback (not recommended)

Caching & Performance:

  • Resolved IPs are cached in-memory and persisted to SQLite for fast subsequent lookups.
  • Failed lookups are cached for 5 minutes to avoid hammering external APIs.
  • The background geo_cache_flush task (runs every 60 seconds) persists newly resolved entries to the database.
  • The background geo_re_resolve task (configurable schedule) periodically re-resolves stale entries to keep data fresh.
  • The background geo_cache_cleanup task (runs nightly) removes entries not referenced in the configured retention period (default: 90 days) to prevent unbounded database growth and maintain query performance.

Retention & Cleanup:

The geo_cache table tracks the last time each IP was referenced via a last_seen timestamp. Over time, as unique IPs accumulate, the table can grow very large, degrading query performance on every geo lookup. To manage this:

  • The geo_cache_cleanup background task runs once per day (default: midnight UTC).
  • It removes all entries where last_seen is older than the configured retention period (default: 90 days).
  • If a purged IP is encountered again after cleanup, it will be re-resolved from the MaxMind database or ip-api.com (if configured).
  • The retention period is controlled by the constant GEO_CACHE_RETENTION_DAYS in backend/app/tasks/geo_cache_cleanup.py.
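A minimal sketch of the purge step, assuming `last_seen` is stored as an ISO 8601 UTC string so that string comparison matches chronological order. The two-column schema and the function name are hypothetical, and the stdlib `sqlite3` module is used here only to keep the example self-contained (the application itself uses aiosqlite).

```python
import sqlite3
from datetime import datetime, timedelta, timezone

GEO_CACHE_RETENTION_DAYS: int = 90


def purge_stale_geo_entries(
    db: sqlite3.Connection,
    now: datetime,
    retention_days: int = GEO_CACHE_RETENTION_DAYS,
) -> int:
    """Delete rows whose last_seen is older than the retention period."""
    cutoff: str = (now - timedelta(days=retention_days)).isoformat()
    cursor = db.execute("DELETE FROM geo_cache WHERE last_seen < ?", (cutoff,))
    db.commit()
    return cursor.rowcount


# Hypothetical schema, reduced to the columns this sketch needs.
db = sqlite3.connect(":memory:")
db.execute("CREATE TABLE geo_cache (ip TEXT PRIMARY KEY, last_seen TEXT NOT NULL)")
now = datetime(2025, 6, 1, tzinfo=timezone.utc)
db.executemany(
    "INSERT INTO geo_cache (ip, last_seen) VALUES (?, ?)",
    [
        ("203.0.113.7", (now - timedelta(days=120)).isoformat()),  # stale
        ("198.51.100.1", (now - timedelta(days=5)).isoformat()),   # recent
    ],
)
removed: int = purge_stale_geo_entries(db, now)
print(removed)
```

Injecting `now` rather than reading the clock inside the function keeps the cutoff deterministic in tests.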

API Documentation Configuration

The enable_docs setting controls whether FastAPI serves interactive API documentation at /api/docs (Swagger UI) and /api/redoc (ReDoc).

Default: false — API documentation is disabled by default to prevent information disclosure in production.

When to Enable:

  • Set BANGUI_ENABLE_DOCS=true in development and debugging environments only.
  • Never enable in production. Exposed API documentation reveals all endpoints, request/response schemas, and allows direct API invocation from the browser.

Environment Variables:

BANGUI_ENABLE_DOCS="true"   # Enable docs in development
BANGUI_ENABLE_DOCS="false"  # Disable docs (default)
# Unset                     # Defaults to false (production)

Debug Compose File: The Docker/compose.debug.yml sets BANGUI_ENABLE_DOCS: "true" for local development. Production compose files (Docker/compose.prod.yml) leave this unset, defaulting to false.

Middleware Allowlist: The SetupRedirectMiddleware in main.py includes /api/docs, /api/redoc, and /api/openapi.json in its _ALWAYS_ALLOWED paths so documentation can be accessed before setup completes (if enabled).

Log Path Validation & Allowlisting

Authenticated users can instruct fail2ban to monitor additional log files through the API endpoint POST /api/config/jails/{name}/logpath. To prevent path-traversal attacks and unauthorized reads of sensitive system files, all requested log paths must resolve to locations within a configurable allowlist of safe directories.

Allowed Directories:

  • Configured via the BANGUI_ALLOWED_LOG_DIRS environment variable (comma-separated list).
  • Defaults to: ["/var/log", "/config/log"].

Path Validation Rules:

  1. The requested path is resolved to its canonical form using Path(log_path).resolve(), which:
    • Expands relative paths to absolute paths.
    • Resolves symbolic links to their real targets.
    • Normalizes . and .. components.
  2. The resolved path is checked using Path.is_relative_to() against each allowed directory prefix.
  3. If the resolved path is not relative to any allowed directory, a ValueError is raised with a descriptive error message.

Implementation:

  • Validation occurs in the Pydantic model AddLogPathRequest using a @field_validator.
  • The validator runs at request time, before the service layer is invoked.
  • Symlinks that escape allowed directories are rejected (see symlink bypass tests).

Important: Use is_relative_to(), not startswith() or string prefix matching. The latter is bypassable with paths like /var/log_evil/file.log.
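The validation rules can be sketched as a plain function. This is an illustration only: the real check lives in a Pydantic `@field_validator` on `AddLogPathRequest`, and the function name and the choice to resolve the allowed directories as well are assumptions.

```python
from pathlib import Path

DEFAULT_ALLOWED_LOG_DIRS: tuple[str, ...] = ("/var/log", "/config/log")


def validate_log_path(
    log_path: str,
    allowed_dirs: tuple[str, ...] = DEFAULT_ALLOWED_LOG_DIRS,
) -> Path:
    """Return the canonical path, or raise ValueError if it escapes the allowlist."""
    resolved: Path = Path(log_path).resolve()
    for allowed in allowed_dirs:
        # Resolve the allowed directory too, so both sides are canonical.
        if resolved.is_relative_to(Path(allowed).resolve()):
            return resolved
    raise ValueError(
        f"Log path {log_path!r} is outside the allowed directories: "
        f"{', '.join(allowed_dirs)}"
    )


for candidate in (
    "/var/log/nginx/access.log",
    "/var/log/../../etc/passwd",
    "/var/log_evil/file.log",
):
    try:
        print("accepted:", validate_log_path(candidate))
    except ValueError as exc:
        print("rejected:", exc)
```

`is_relative_to()` compares whole path components, which is why `/var/log_evil` does not match `/var/log` even though it shares the string prefix.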

Environment Variables:

BANGUI_ALLOWED_LOG_DIRS="/var/log,/config/log"                    # Default
BANGUI_ALLOWED_LOG_DIRS="/var/log,/config/log,/home/app/logs"     # Custom directory

Log Target Validation (fail2ban)

The log_target field on the global config endpoint (PUT /api/config/global) is critical for security because fail2ban runs as root. Users can only set log targets to:

  1. Special values: STDOUT, STDERR, SYSLOG (case-insensitive)
  2. File paths: Must resolve to one of the configured allowed directories (same allowlist as log paths)

Why This Matters:

  • fail2ban creates/opens files with root privileges. Without validation, an attacker could write to arbitrary system paths (e.g., /etc/cron.d/malicious_script).
  • Validation occurs at both the Pydantic model layer (GlobalConfigUpdate.validate_log_target()) and the service layer (update_global_config()) for defense in depth.
  • This prevents both HTTP and non-HTTP attack vectors.

Implementation:

# Model layer: Automatic validation via @field_validator
update = GlobalConfigUpdate(log_target="/etc/passwd")  # Raises ValidationError → HTTP 422

# Service layer: Defense in depth
await config_service.update_global_config(socket_path, update)  # Validates again before sending to fail2ban
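A minimal sketch of the rule both layers enforce, written as a standalone function. The name `validate_log_target`, the upper-case normalisation of special values, and the default directory tuple are assumptions for illustration, not the actual `GlobalConfigUpdate` validator.

```python
from pathlib import Path

SPECIAL_LOG_TARGETS: frozenset[str] = frozenset({"STDOUT", "STDERR", "SYSLOG"})


def validate_log_target(
    log_target: str,
    allowed_dirs: tuple[str, ...] = ("/var/log", "/config/log"),
) -> str:
    """Accept a special value or a file path inside an allowed directory."""
    special: str = log_target.strip().upper()
    if special in SPECIAL_LOG_TARGETS:
        return special  # Normalising to upper case is an assumption.
    resolved: Path = Path(log_target).resolve()
    if any(resolved.is_relative_to(Path(d).resolve()) for d in allowed_dirs):
        return str(resolved)
    raise ValueError(
        f"log_target {log_target!r} is neither a special value nor an allowed path."
    )


print(validate_log_target("stdout"))
try:
    validate_log_target("/etc/cron.d/malicious_script")
except ValueError as exc:
    print("rejected:", exc)
```

Keeping the check in one shared function is one way to let the model layer and the service layer apply identical rules.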

Login Rate Limiting

The login endpoint (POST /api/auth/login) is protected against brute-force attacks using an in-memory rate limiter.

Design:

  • Uses a dict[str, deque[float]] keyed by client IP, storing login attempt timestamps within a time window.
  • Attempts outside the window are automatically removed during validation checks.
  • Expired IP entries are cleaned up to prevent unbounded memory growth.

Rate Limit Rules:

  • 5 attempts per 60 seconds per IP address.
  • Requests exceeding the limit return HTTP 429 Too Many Requests with a Retry-After header.
  • Each failed login triggers a 10-second server-side delay (asyncio.sleep) to further slow attacks, on top of bcrypt hashing (~100ms).

IP Extraction (Proxy Safety):

  • When behind nginx, the rate limiter reads the real client IP from X-Forwarded-For or X-Real-IP headers.
  • Only trusts these headers when the immediate connection source is in a configured trusted proxy list.
  • Prevents attackers from spoofing these headers to bypass rate limits.
  • Falls back to the direct connection IP when proxy headers cannot be trusted.
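A sketch of the trust decision, assuming lower-cased header names and a single trusted proxy hop. The signature below is hypothetical and may differ from the real `app.utils.client_ip.get_client_ip()`. With one trusted hop, the right-most `X-Forwarded-For` entry is the one added by the proxy; anything to its left was supplied by the client.

```python
import ipaddress


def get_client_ip(
    peer_ip: str,
    headers: dict[str, str],
    trusted_proxies: list[str],
) -> str:
    """Return the client IP, honouring proxy headers only from trusted peers."""
    peer = ipaddress.ip_address(peer_ip)
    if not any(peer in ipaddress.ip_network(net) for net in trusted_proxies):
        return peer_ip  # Untrusted source: ignore the headers entirely.

    forwarded: str = headers.get("x-forwarded-for", "")
    # Take the right-most entry; entries further left can be spoofed.
    candidate: str = (
        forwarded.split(",")[-1].strip() or headers.get("x-real-ip", "").strip()
    )
    try:
        return str(ipaddress.ip_address(candidate))
    except ValueError:
        return peer_ip  # Missing or malformed header: fall back to the peer.


trusted: list[str] = ["10.0.0.0/8"]
print(get_client_ip("10.0.0.2", {"x-forwarded-for": "203.0.113.7"}, trusted))
print(get_client_ip("198.51.100.9", {"x-forwarded-for": "203.0.113.7"}, trusted))
```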

Process-Local Limitation:

  • The rate limiter is process-local (in-memory). In multi-worker deployments (e.g., Gunicorn with 4 workers), each worker maintains its own rate limit counter.
  • This is acceptable because the single-worker constraint is enforced elsewhere. See TASK-002/003 notes for details.

Implementation:

  • Rate limiter: app.utils.rate_limiter.RateLimiter
  • IP extraction: app.utils.client_ip.get_client_ip()
  • Dependency: LoginRateLimiterDep in app.dependencies

12. Authentication Endpoints

The primary authentication flow for the frontend is cookie-based and protects the session token from JavaScript access:

  1. Login (POST /api/auth/login)

    • Accepts LoginRequest (password field)
    • Returns LoginResponse containing only expires_at (ISO 8601 UTC timestamp)
    • Crucially: The session token is not included in the JSON response body
    • Instead, the token is set as an HttpOnly SameSite=Lax cookie named bangui_session
    • Frontend automatically includes this cookie in all requests via credentials: "include"
  2. Why not return token in response body?

    • Third-party JavaScript (analytics, ads, XSS injections) can intercept fetch() response bodies
    • If the token were in the response, malicious code could extract and store it in localStorage
    • An attacker could then use it via the Authorization: Bearer <token> header, bypassing the HttpOnly cookie protection
    • By returning only the expiry timestamp, we ensure the token stays exclusively in the HttpOnly cookie
  3. Session Validation (GET /api/auth/session)

    • Frontend calls this on app mount to verify the session is still valid on the server
    • Works with both cookie and Bearer token authentication
    • Returns {"valid": true} if the session exists and is not expired
    • Returns 401 Unauthorized if the session is invalid or expired
  4. Logout (POST /api/auth/logout)

    • Revokes the session in the database
    • Clears the bangui_session cookie via Set-Cookie header
    • Works with both cookie and Bearer token authentication
    • Idempotent — calling without a session returns 200 without error
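To make the cookie attributes concrete, here is a stdlib-only sketch that builds the `Set-Cookie` value for the session. The application itself would normally set the cookie through the web framework's response object; the helper name, the `Path=/` attribute, and the `Max-Age` parameter are assumptions for illustration.

```python
from http.cookies import Morsel, SimpleCookie


def build_session_cookie_header(signed_token: str, max_age_seconds: int) -> str:
    """Return a Set-Cookie header value for the bangui_session cookie."""
    cookie: SimpleCookie = SimpleCookie()
    cookie["bangui_session"] = signed_token
    morsel: Morsel = cookie["bangui_session"]
    morsel["httponly"] = True     # Not readable from JavaScript.
    morsel["samesite"] = "Lax"    # Withheld on most cross-site requests.
    morsel["path"] = "/"          # Assumed scope for this sketch.
    morsel["max-age"] = max_age_seconds
    return morsel.OutputString()


print(build_session_cookie_header("rawtoken.signature", 3600))
```

The JSON body of the login response carries only `expires_at`, so the value in this header is the only place the token appears.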

Programmatic API Clients (Bearer Token)

For non-browser clients (CLI tools, batch scripts, automation) that cannot use cookies, use the Bearer token authentication path by sending:

Authorization: Bearer <token>

Currently, these clients obtain the signed token by parsing the bangui_session cookie from a login response and sending that value as the Bearer token. A dedicated POST /api/auth/token endpoint may be added in a future implementation.
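A small sketch of that extraction step for a script-based client, using only the standard library. The helper name is hypothetical, and the sample header is made up for the demonstration.

```python
from http.cookies import SimpleCookie


def bearer_header_from_set_cookie(set_cookie_header: str) -> dict[str, str]:
    """Turn a login response's Set-Cookie value into an Authorization header."""
    cookie: SimpleCookie = SimpleCookie()
    cookie.load(set_cookie_header)
    morsel = cookie.get("bangui_session")
    if morsel is None or not morsel.value:
        raise ValueError("Login response did not set a bangui_session cookie.")
    return {"Authorization": f"Bearer {morsel.value}"}


sample: str = "bangui_session=rawtoken.signature; HttpOnly; Path=/; SameSite=Lax"
print(bearer_header_from_set_cookie(sample))
```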

Note: Bearer token authentication is not recommended for browser-based clients because:

  • Tokens must be stored somewhere (localStorage, sessionStorage, or request body)
  • All storage mechanisms are accessible to JavaScript and thus vulnerable to XSS
  • HttpOnly cookies provide better protection

13. Password Hashing

The master password is hashed using bcrypt with an auto-generated salt. All password validation uses the models in app.models.auth and app.models.setup.

The 72-Byte Bcrypt Limitation

Important: bcrypt silently truncates all input at 72 bytes before hashing. This means:

  • A user who sets a 100-character password is actually authenticated by only the first 72 bytes
  • Extra characters beyond 72 bytes provide zero additional security
  • Any two passwords that share the same first 72 bytes are interchangeable at login, so the effective search space is smaller than a long password suggests

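Solution: Both password fields enforce a maximum length of 72. Pydantic's max_length counts characters rather than bytes; the two are equal here because the value the backend receives is an ASCII SHA256 digest (see the validation flow below):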

  • LoginRequest.password — max 72 characters (enforced via Pydantic Field(max_length=72))
  • SetupRequest.master_password — max 72 characters (enforced via Pydantic Field(max_length=72))

Validation flow:

  1. Frontend → hashes password with SHA256 using SubtleCrypto before transmission
  2. Backend receives SHA256 hash, validates length (≤ 72 bytes)
  3. Backend → hashes with bcrypt using run_blocking(bcrypt.hashpw) to avoid event loop stall
  4. Hash stored in SQLite settings table
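The relationship between the client-side digest and the 72-byte limit can be checked with a few lines. This sketch assumes the frontend sends the digest hex-encoded; the function name is hypothetical.

```python
import hashlib

BCRYPT_MAX_BYTES: int = 72


def client_side_digest(password: str) -> str:
    """Mimic the frontend step: SHA-256 of the password, hex-encoded (assumed)."""
    return hashlib.sha256(password.encode("utf-8")).hexdigest()


long_password: str = "\u00e9" * 100  # 100 characters, 2 bytes each in UTF-8
digest: str = client_side_digest(long_password)

print(len(long_password.encode("utf-8")))  # byte length of the raw password
print(len(digest.encode("ascii")))         # byte length of what bcrypt receives
```

Under that assumption the digest always fits within the limit, so the full password contributes to the stored hash regardless of its original length.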

If a password exceeds 72 bytes:

  • Pydantic raises ValidationError with error code string_too_long
  • The router returns HTTP 422 Unprocessable Entity
  • The frontend should inform the user to choose a shorter password

Implementation:

  • Models: app.models.auth.LoginRequest, app.models.setup.SetupRequest
  • Service layer: app.services.auth_service._check_password(), app.services.setup_service.run_setup()

15. File I/O Conventions

All file write operations to critical configuration files must be atomic to prevent corruption if the process is killed mid-write.

Atomic File Writes

Configuration files (e.g., fail2ban jail configs in jail.d/) are essential for system operation. A truncated or corrupt config file can break fail2ban's ability to reload and may disable active protection.

Rule: Always use write-to-temp + atomic rename

Never use Path.write_text() or file.write() directly for critical files. Instead:

  1. Create a temporary file in the same directory as the target (crucial for atomic os.replace()).
  2. Write content to the temp file.
  3. Atomically rename the temp file to replace the target.
  4. Clean up the temp file if an error occurs.

Implementation Pattern:

import contextlib
import os
import tempfile
from pathlib import Path

target = Path("/path/to/config/file.conf")

tmp_name: str | None = None
try:
    # Create temp file in target's directory (same filesystem = atomic)
    with tempfile.NamedTemporaryFile(
        mode="w",
        encoding="utf-8",
        dir=target.parent,
        delete=False,
        suffix=".tmp",
    ) as tmp:
        tmp.write(content)
        tmp_name = tmp.name
    # Atomic rename (single syscall on POSIX systems)
    os.replace(tmp_name, target)
except OSError as exc:
    # Clean up temp file on error
    with contextlib.suppress(OSError):
        if tmp_name is not None:
            os.unlink(tmp_name)
    raise ConfigWriteError(f"Cannot write config: {exc}") from exc

Why this matters:

  • Path.write_text() overwrites in place. If the process dies mid-write, the file is left truncated or partially written.
  • os.replace() is atomic on POSIX systems (single rename syscall), but it only works when source and target are on the same filesystem; across filesystems it fails with an OSError instead of replacing the file.
  • Creating the temp file in target.parent ensures atomicity.
  • On Linux containers, this prevents config corruption and service degradation.

Atomic write helper:

A shared atomic_write(path: Path, content: str) helper is available in app/services/config_file_helpers.py. This is the preferred way to perform atomic writes — it handles all the temp file and cleanup logic:

from app.services.config_file_helpers import atomic_write

atomic_write(path, updated_content)  # Atomic write, auto-cleanup on error

Files requiring atomic writes:

  • All config files under jail.d/ (created/modified by _write_conf_file, _create_conf_file, set_jail_config_enabled, and write_jail_config_file)
  • Any critical state files that fail2ban relies on

Examples in the codebase:

  • app/services/config_file_helpers.py: _write_conf_file, _create_conf_file, atomic_write
  • app/services/raw_config_io_service.py: set_jail_config_enabled, write_jail_config_file
  • app/services/jail_config_service.py: _write_local_file_sync, _restore_local_file_sync

16. Git & Workflow

  • Branch naming: feature/<short-description>, fix/<short-description>, chore/<short-description>.
  • Commit messages: imperative tense, max 72 chars first line (Add jail reload endpoint, Fix ban history query).
  • Every merge request must pass: ruff, type checker, all tests.
  • Do not merge with failing CI.
  • Keep pull requests small and focused — one feature or fix per PR.

17. Coding Principles

These principles are non-negotiable. Every backend contributor must internalise and apply them daily.

17.1 Clean Code

  • Write code that reads like well-written prose — a new developer should understand intent without asking.
  • Meaningful names — variables, functions, and classes must reveal their purpose. Avoid abbreviations (cnt, mgr, tmp) unless universally understood.
  • Small functions — each function does exactly one thing. If you need a comment to explain a block inside a function, extract it into its own function.
  • No magic numbers or strings — use named constants.
  • Boy Scout Rule — leave every file cleaner than you found it.
  • Avoid deep nesting — prefer early returns (guard clauses) to keep the happy path at the top indentation level.
# Good — guard clause, clear name, one job
async def get_active_ban(ip: str, jail: str) -> Ban:
    ban: Ban | None = await repo.find_ban(ip=ip, jail=jail)
    if ban is None:
        raise BanNotFoundError(ip=ip, jail=jail)
    if ban.is_expired():
        raise BanExpiredError(ip=ip, jail=jail)
    return ban

# Bad — nested, vague name
async def check(ip, j):
    b = await repo.find_ban(ip=ip, jail=j)
    if b:
        if not b.is_expired():
            return b
        else:
            raise Exception("expired")
    else:
        raise Exception("not found")

17.2 Separation of Concerns (SoC)

  • Each module, class, and function must have a single, well-defined responsibility.
  • Routers → HTTP layer only (parse requests, return responses).
  • Services → business logic and orchestration.
  • Repositories → data access and persistence.
  • Models → data shapes and validation.
  • Tasks → scheduled background jobs.
  • Never mix layers — a router must not execute SQL, and a repository must not raise HTTPException.

17.3 Single Responsibility Principle (SRP)

  • A class or module should have one and only one reason to change.
  • If a service handles both ban management and email notifications, split it into BanService and NotificationService.

17.4 Don't Repeat Yourself (DRY)

  • Extract shared logic into utility functions, base classes, or dependency providers.
  • If the same block of code appears in more than one place, refactor it into a single source of truth.
  • But don't over-abstract — premature DRY that couples unrelated features is worse than a little duplication (see Rule of Three: refactor when something appears a third time).

17.5 KISS — Keep It Simple, Stupid

  • Choose the simplest solution that works correctly.
  • Avoid clever tricks, premature optimisation, and over-engineering.
  • If a standard library function does the job, prefer it over a custom implementation.

17.6 YAGNI — You Aren't Gonna Need It

  • Do not build features, abstractions, or config options "just in case".
  • Implement what is required now. Extend later when a real need emerges.

17.7 Dependency Inversion Principle (DIP)

  • High-level modules (services) must not depend on low-level modules (repositories) directly. Both should depend on abstractions (protocols / interfaces).
  • Use FastAPI's Depends() to inject implementations — this makes swapping and testing trivial.
from typing import Protocol

class BanRepository(Protocol):
    async def find_ban(self, ip: str, jail: str) -> Ban | None: ...
    async def save_ban(self, ban: Ban) -> None: ...

class SqliteBanRepository:
    """Concrete implementation — depends on aiosqlite."""
    async def find_ban(self, ip: str, jail: str) -> Ban | None: ...
    async def save_ban(self, ban: Ban) -> None: ...

13.7.1 Repository Module Pattern — Module-as-Protocol Structural Compatibility

BanGUI uses module-level functions for repository implementations, not classes. Each repository module (e.g., session_repo.py, blocklist_repo.py) exports async functions that match the signatures defined in the Protocol interface in protocols.py. This is a structural typing pattern — mypy accepts the module as a valid Protocol implementation because the function signatures match, even though the module is not explicitly annotated as implementing the Protocol.

This approach works correctly with FastAPI's dependency injection via cast():

# In app/repositories/session_repo.py
async def create_session(db: aiosqlite.Connection, token: str, created_at: str, expires_at: str) -> Session:
    """Insert a new session row."""
    ...

# In app/repositories/protocols.py
class SessionRepository(Protocol):
    async def create_session(
        self,
        db: aiosqlite.Connection,
        token: str,
        created_at: str,
        expires_at: str,
    ) -> Session:
        ...

# In app/dependencies.py
from typing import cast

async def get_session_repo() -> SessionRepository:
    """Provide the concrete session repository implementation."""
    from app.repositories import session_repo
    # cast() tells the type checker to treat the module as a SessionRepository;
    # it performs no runtime or structural check of its own.
    return cast(SessionRepository, session_repo)

Why this pattern is used:

  • Simplicity — no boilerplate class/instance wrapping.
  • Compatibility — Python's structural typing (PEP 544) means the module automatically satisfies the Protocol interface if function signatures match.
  • Testability — the same DIP principle applies; services depend on the Protocol, not the module directly, so tests can mock the Protocol.

Risks and mitigations:

  • Silent breakage if function signatures change: If a parameter is added to or removed from a module function, the module no longer satisfies the Protocol, but mypy may not flag this when the provider returns the module through cast(), because cast() bypasses the structural check. To prevent this, treat the Protocol signatures in protocols.py as the source of truth and check that module functions match them before merging changes. The CI/CD pipeline validates this compatibility at build time.

How the validation works (CI check):

  • Before each deployment, run mypy --strict to ensure all dependency providers return values compatible with their Protocol types.
  • The cast() calls in dependencies.py are a documented signal that structural compatibility is being verified externally, not via explicit class inheritance.
  • Automated tests in backend/tests/test_repositories/test_protocol_compliance.py verify that each repository module implements all protocol methods, preventing silent protocol drift.
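A compliance check of this kind could look roughly like the sketch below. It is not the content of test_protocol_compliance.py: the reduced two-method Protocol, the helper names, and the name-only parameter comparison are assumptions.

```python
import inspect
import types
from typing import Protocol


class SessionRepository(Protocol):
    """Reduced, hypothetical Protocol for this sketch."""

    async def create_session(self, db: object, token: str) -> None: ...
    async def delete_session(self, db: object, token: str) -> None: ...


def protocol_method_names(protocol: type) -> set[str]:
    """Collect the public methods declared directly on the Protocol class."""
    return {
        name
        for name, member in vars(protocol).items()
        if inspect.isfunction(member) and not name.startswith("_")
    }


def find_protocol_violations(module: types.ModuleType, protocol: type) -> list[str]:
    """Compare a repository module's functions against the Protocol methods."""
    problems: list[str] = []
    for name in sorted(protocol_method_names(protocol)):
        func = getattr(module, name, None)
        if not inspect.iscoroutinefunction(func):
            problems.append(f"{name}: missing or not async")
            continue
        # Drop the leading 'self' from the Protocol method before comparing.
        expected: list[str] = list(
            inspect.signature(getattr(protocol, name)).parameters
        )[1:]
        actual: list[str] = list(inspect.signature(func).parameters)
        if expected != actual:
            problems.append(f"{name}: parameters {actual} != {expected}")
    return problems
```

A test would import each real repository module, call `find_protocol_violations()` against its Protocol, and assert that the returned list is empty.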

13.7.1.1 Repository Protocol Coverage Checklist

All public repository functions must be defined in a corresponding Protocol. To add a new repository:

  1. Create the repository module: add backend/app/repositories/my_repo.py with async functions.
  2. Define the Protocol — Add a MyRepository(Protocol) class in backend/app/repositories/protocols.py with methods matching every public function signature.
  3. Add imports — If the Protocol uses custom return types, import them in protocols.py.
  4. Run compliance tests — Execute pytest backend/tests/test_repositories/test_protocol_compliance.py to verify coverage.
  5. Verify type safety — Run mypy --strict backend/app/repositories/protocols.py to ensure all types are correct.

Current repository protocol coverage (all 7 repositories fully covered):

  • SessionRepository — 4 methods
  • SettingsRepository — 4 methods
  • BlocklistRepository — 6 methods
  • ImportLogRepository — 4 methods
  • GeoCacheRepository — 13 methods
  • HistoryArchiveRepository — 5 methods
  • Fail2BanDbRepository — 8 methods

13.7.2 Session Token Hashing — One-Way Protection Against Database Exposure

Session tokens must be protected against database exposure. Session tokens are stored as one-way SHA256 hashes in the database to ensure that if the database file is compromised (volume mount misconfiguration, backup leak, etc.), the session tokens themselves cannot be directly used to hijack sessions.

Implementation pattern:

import hashlib
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    import aiosqlite

from app.models.auth import Session

def _hash_token(token: str) -> str:
    """Return the SHA256 hash of a session token."""
    return hashlib.sha256(token.encode()).hexdigest()

async def create_session(
    db: "aiosqlite.Connection",
    token: str,
    created_at: str,
    expires_at: str,
) -> Session:
    """Insert a new session row with the token hash."""
    token_hash = _hash_token(token)
    cursor = await db.execute(
        "INSERT INTO sessions (token_hash, created_at, expires_at) VALUES (?, ?, ?)",
        (token_hash, created_at, expires_at),
    )
    await db.commit()
    # Return the Session with the ORIGINAL token (not the hash)
    # so the service layer can sign and return it to the client.
    return Session(
        id=int(cursor.lastrowid) if cursor.lastrowid else 0,
        token=token,  # ← raw token, not the hash
        created_at=created_at,
        expires_at=expires_at,
    )

async def get_session(
    db: "aiosqlite.Connection",
    token: str
) -> Session | None:
    """Look up a session by token hash."""
    token_hash = _hash_token(token)
    async with db.execute(
        "SELECT id, token_hash, created_at, expires_at FROM sessions WHERE token_hash = ?",
        (token_hash,),
    ) as cursor:
        row = await cursor.fetchone()
    
    if row is None:
        return None
    
    # Return the Session with the INCOMING token (the one the client sent).
    return Session(
        id=int(row[0]),
        token=token,  # ← the raw token passed in
        created_at=str(row[2]),
        expires_at=str(row[3]),
    )

Key points:

  1. Hash on write — When inserting a session, hash the token before storage.
  2. Hash on read — When validating a session, hash the incoming token before the database lookup.
  3. Never store raw tokens — The token_hash column contains only hashes; raw tokens are never persisted.
  4. Return raw tokens to the service layer — The Session model's token field contains the raw token (for signing and response), not the hash.
  5. Database schema — Use token_hash TEXT NOT NULL UNIQUE instead of token TEXT NOT NULL UNIQUE, and create an index on token_hash.
  6. Migration strategy — When upgrading from plaintext to hashed tokens, drop the old table and recreate it. This invalidates all existing sessions, which is acceptable because the database was exposed in plaintext.

Why one-way hashing is safe:

  • If an attacker obtains a token hash from the database, they cannot reverse the SHA256 hash to recover the original token.
  • The attacker cannot use the hash directly in a client request — they would need the original token to pass the hash check.
  • This forces the attacker to either compromise the client (where they'd also get the raw token) or perform a brute-force attack against the hash space (infeasible for random 128-bit tokens).

Never use symmetric encryption — symmetric encryption stores a key in the database or environment, which merely shifts the exposure risk. A one-way hash is the correct choice for protecting tokens.

13.7.2a Session Token Signing Format — HMAC-SHA256 Integrity Protection

All session tokens sent to clients are signed using HMAC-SHA256. The signed token format is:

<raw_token>.<signature>

where:

  • <raw_token> is a 32-character hex string encoding 16 random bytes (128 bits), generated by secrets.token_hex(16).
  • . is the separator (defined in app.utils.constants.SESSION_TOKEN_SIGNATURE_SEPARATOR).
  • <signature> is the HMAC-SHA256 hex digest of <raw_token> using the configured session_secret.

Example (shape only, not a real token): 0123456789abcdef0123456789abcdef.<64 hexadecimal characters>

Signing and verification pattern:

import hashlib
import hmac

def _session_token_signature(token: str, secret: str) -> str:
    """Return the HMAC-SHA256 signature for a session token."""
    return hmac.new(secret.encode(), token.encode(), hashlib.sha256).hexdigest()

def sign_session_token(token: str, secret: str) -> str:
    """Return a signed session token string for the client."""
    return f"{token}.{_session_token_signature(token, secret)}"

def unwrap_session_token(token: str, secret: str) -> str:
    """Verify and return the raw token from a signed session token.
    
    Raises ValueError if the token lacks a signature or signature is invalid.
    """
    if "." not in token:
        raise ValueError("Invalid session token.")
    
    raw_token, signature = token.rsplit(".", 1)
    expected_signature = _session_token_signature(raw_token, secret)
    if not hmac.compare_digest(expected_signature, signature):
        raise ValueError("Invalid session token.")
    return raw_token

Key points:

  1. All tokens must be signed — Tokens without a signature (no separator) are rejected immediately.
  2. Signature is mandatory — The unwrap_session_token() function raises ValueError if the separator is absent.
  3. Use HMAC-SHA256 — Always use hmac.compare_digest() for signature verification to prevent timing attacks.
  4. Sign on login: login() creates a raw token, stores it (hashed) in the database, then returns the signed token to the client.
  5. Verify on every request — The validate_session() service verifies the signature by calling unwrap_session_token() with the session_secret, then looks up the raw token in the database.
  6. Session invalidation: When upgrading from unsigned to signed tokens (TASK-022), all existing sessions must be invalidated, because tokens issued without a signature are rejected by unwrap_session_token().
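A short round-trip sketch shows the behaviour these points describe. The two functions are condensed, renamed variants of the ones above (hypothetical names `sign` and `unwrap`), and the secret is a demo value. Unlike the pattern above, this variant compares the digests as bytes, so a non-ASCII signature is rejected with ValueError.

```python
import hashlib
import hmac
import secrets


def sign(raw_token: str, secret: str) -> str:
    """Append the HMAC-SHA256 hex signature to a raw token."""
    signature: str = hmac.new(
        secret.encode(), raw_token.encode(), hashlib.sha256
    ).hexdigest()
    return f"{raw_token}.{signature}"


def unwrap(signed_token: str, secret: str) -> str:
    """Return the raw token, or raise ValueError if the signature is bad."""
    raw_token, separator, signature = signed_token.rpartition(".")
    expected: str = hmac.new(
        secret.encode(), raw_token.encode(), hashlib.sha256
    ).hexdigest()
    # Compare as bytes in constant time; a missing separator is rejected.
    if not separator or not hmac.compare_digest(
        expected.encode(), signature.encode()
    ):
        raise ValueError("Invalid session token.")
    return raw_token


raw: str = secrets.token_hex(16)
signed: str = sign(raw, "demo-secret")
print(len(raw), len(signed))

for candidate, secret in (
    (signed, "demo-secret"),   # valid
    (raw, "demo-secret"),      # unsigned raw token
    (signed, "other-secret"),  # signed with a different secret
):
    try:
        print("accepted:", unwrap(candidate, secret) == raw)
    except ValueError as exc:
        print("rejected:", exc)
```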

Why HMAC signing is necessary:

  • Prevents token forgery — An attacker cannot create a valid token without knowing the session_secret.
  • Works alongside hashed storage — Even if the database is compromised (plaintext before hashing), the attacker gets only the raw token, not a signed token. A raw token without a valid signature is rejected by unwrap_session_token().
  • Timing attack resistance: hmac.compare_digest() compares signatures in constant time, preventing attackers from using timing differences to guess valid signatures.

13.7.3 Session Cache Pluggability — Process-Local vs. Shared Backends

Session validation is expensive (SQLite lookup + signature verification). To improve performance, validated session tokens are cached using the SessionCache interface (app.utils.session_cache). The default implementation, InMemorySessionCache, stores cached sessions in process-local memory.

Current implementation (single-worker):

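# Abridged excerpt from app/utils/session_cache.py, which exports
# SessionCache, InMemorySessionCache, and NoOpSessionCache.
from typing import Protocol

from app.models.auth import Session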

class SessionCache(Protocol):
    """Interface for session token validation cache backends."""
    def get(self, token: str) -> Session | None: ...
    def set(self, token: str, session: Session, ttl_seconds: float) -> None: ...
    def invalidate(self, token: str) -> None: ...
    def clear(self) -> None: ...

# Default in-memory implementation — PROCESS-LOCAL
class InMemorySessionCache:
    def __init__(self) -> None:
        self._entries: dict[str, tuple[Session, float]] = {}

Single-worker constraint:

InMemorySessionCache is process-local — each worker process has its own dict. In single-worker mode (enforced by TASK-002), this is safe and improves performance. In multi-worker deployments:

  • A logout by worker A clears the session from A's cache, but worker B still has it → logout doesn't work.
  • Enabling/disabling the cache requires restarting all workers to take effect.

Multi-worker solution:

To support multiple workers (future enhancement), implement a shared backend behind the same SessionCache Protocol:

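# Example Redis implementation (not yet in codebase)
# Assumes the redis-py asyncio client; Session is the model from app.models.auth.
from redis import asyncio as aioredis
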
class RedisSessionCache:
    """Session cache backed by Redis."""
    def __init__(self, redis_url: str) -> None:
        self.client = aioredis.from_url(redis_url)
    
    async def get(self, token: str) -> Session | None:
        data = await self.client.get(f"session:{token}")
        return Session.model_validate_json(data) if data else None
    
    async def set(self, token: str, session: Session, ttl_seconds: float) -> None:
        await self.client.setex(
            f"session:{token}",
            int(ttl_seconds),
            session.model_dump_json()
        )
    
    async def invalidate(self, token: str) -> None:
        await self.client.delete(f"session:{token}")
    
    async def clear(self) -> None:
        await self.client.flushdb()

To adopt a Redis backend:

  1. Create RedisSessionCache in app.utils.session_cache.
  2. Update app.utils.runtime_state.set_runtime_settings() to instantiate RedisSessionCache when REDIS_URL env var is set.
  3. Update app.config.Settings to accept optional REDIS_URL.
  4. Tests continue to use InMemorySessionCache (no Redis dependency in dev).

Implementation rules:

  • All cache methods must be async (even if the backend is sync).
  • Never log session tokens or session data.
  • TTL must be respected — expired entries must be removed on access.
  • See app/utils/session_cache.py for the full Protocol definition and current implementations.

17.8 Composition over Inheritance

  • Favour composing small, focused objects over deep inheritance hierarchies.
  • Use mixins or protocols only when a clear "is-a" relationship exists; otherwise, pass collaborators as constructor arguments.
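As a short illustration of passing collaborators through the constructor, the sketch below uses hypothetical names (`BanNotifier`, `RecordingNotifier`, `BanService`) that do not come from the codebase.

```python
from typing import Protocol


class BanNotifier(Protocol):
    """What the service needs from its collaborator, and nothing more."""

    def notify(self, ip: str) -> None: ...


class RecordingNotifier:
    """Test-friendly collaborator that records the IPs it was told about."""

    def __init__(self) -> None:
        self.sent: list[str] = []

    def notify(self, ip: str) -> None:
        self.sent.append(ip)


class BanService:
    """Composes a notifier instead of inheriting notification behaviour."""

    def __init__(self, notifier: BanNotifier) -> None:
        self._notifier = notifier

    def ban(self, ip: str) -> None:
        self._notifier.notify(ip)


notifier: RecordingNotifier = RecordingNotifier()
BanService(notifier).ban("203.0.113.7")
```

Swapping the notifier for another implementation requires no change to `BanService`, which is the main benefit over a base-class hierarchy.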

17.9 Fail Fast

  • Validate inputs as early as possible — at the API boundary with Pydantic, at service entry with assertions or domain checks.
  • Raise specific exceptions immediately rather than letting bad data propagate silently.
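A service-entry check might look like the sketch below. `ban_ip` and `InvalidBanRequestError` are hypothetical names chosen for illustration; the point is that bad input is rejected with a specific exception before any work is done.

```python
import ipaddress


class InvalidBanRequestError(ValueError):
    """Raised when a ban request fails domain validation."""


def ban_ip(ip: str, duration_seconds: int) -> str:
    """Validate the arguments first, then perform the (simulated) ban."""
    try:
        address = ipaddress.ip_address(ip)
    except ValueError as exc:
        raise InvalidBanRequestError(f"not an IP address: {ip!r}") from exc
    if duration_seconds <= 0:
        raise InvalidBanRequestError("duration_seconds must be positive")
    # Only validated data reaches this point.
    return f"banned {address} for {duration_seconds}s"
```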

17.10 Law of Demeter (Principle of Least Knowledge)

  • A function should only call methods on:
    1. Its own object (self).
    2. Objects passed as parameters.
    3. Objects it creates.
  • Avoid long accessor chains like request.state.db.cursor().execute(...) — wrap them in a meaningful method.
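One way to wrap such a chain is a small repository object. The sketch below is an assumption-laden example: `JailRepository`, `jail_summary`, and the `jails` table are hypothetical, and the stdlib `sqlite3` module stands in for aiosqlite so the snippet runs on its own.

```python
import sqlite3


class JailRepository:
    """Hides cursor handling behind one meaningful method."""

    def __init__(self, connection: sqlite3.Connection) -> None:
        self._connection = connection

    def count_jails(self) -> int:
        row = self._connection.execute("SELECT COUNT(*) FROM jails").fetchone()
        return int(row[0])


def jail_summary(repository: JailRepository) -> str:
    """Talks only to its parameter; it knows nothing about cursors or SQL."""
    return f"{repository.count_jails()} jail(s) configured"


connection: sqlite3.Connection = sqlite3.connect(":memory:")
connection.execute("CREATE TABLE jails (name TEXT PRIMARY KEY)")
connection.execute("INSERT INTO jails (name) VALUES ('sshd')")
summary: str = jail_summary(JailRepository(connection))
```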

17.11 Defensive Programming

  • Never trust external input — validate and sanitise everything that crosses a boundary (HTTP request, file, socket, environment variable).
  • Handle edge cases explicitly: empty lists, None values, negative numbers, empty strings.
  • Use type narrowing and exhaustive pattern matching (match / case) to eliminate impossible states.

17.12 SSRF Prevention (Server-Side Request Forgery)

When user-supplied URLs are fetched by the backend, validate them before making any HTTP requests:

  1. Use Pydantic's AnyHttpUrl type to restrict schemes to http:// and https:// only.

    • Rejects file://, ftp://, gopher://, and other non-http schemes at the model boundary.
  2. Validate resolved IP addresses before fetching:

    • Parse the hostname and resolve it via DNS (using socket.getaddrinfo()).
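    • Use the properties of ipaddress.ip_address() to reject non-public addresses. is_private alone does not cover multicast, so also check is_loopback, is_link_local, is_multicast, is_reserved, and is_unspecified. Ranges to reject: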
      • RFC 1918: 10.0.0.0/8, 172.16.0.0/12, 192.168.0.0/16
      • Loopback: 127.0.0.0/8, ::1/128
      • Link-local: 169.254.0.0/16, fe80::/10
      • IPv6 site-local, multicast, and reserved ranges.
    • Raise ValueError if validation fails; let the router convert it to HTTP 400.
  3. Guard against DNS rebinding:

    • Validate DNS at URL creation/validation time (performed during request deserialization).
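    • For additional safety, re-validate the IP address that is actually connected to at HTTP client time (e.g., a custom aiohttp.TCPConnector can inspect the resolved address during connect), because a second DNS lookup may return a different address than the one validated earlier.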
  4. Example implementation (see backend/app/utils/ip_utils.py):

    • is_private_ip(ip_str: str) → bool: Checks if IP is private/reserved/loopback/link-local.
    • async validate_blocklist_url(url: AnyHttpUrl) → None: Async DNS resolution + private IP check.
    • Service layer calls await validate_blocklist_url(url) before persisting; router catches ValueError and returns 400.
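The validation steps above could be sketched as follows. This is not the code in backend/app/utils/ip_utils.py: `is_private_ip` only mirrors the signature named above, `validate_url_host` is a hypothetical simplification that takes a plain `str` instead of `AnyHttpUrl`, and the default port of 443 is an assumption.

```python
import asyncio
import ipaddress
import socket
from urllib.parse import urlsplit


def is_private_ip(ip_str: str) -> bool:
    """Return True for any address that should never be fetched by the backend."""
    ip = ipaddress.ip_address(ip_str)
    return (
        ip.is_private
        or ip.is_loopback
        or ip.is_link_local
        or ip.is_multicast
        or ip.is_reserved
        or ip.is_unspecified
    )


async def validate_url_host(url: str) -> None:
    """Resolve the URL's host and raise ValueError if any address is non-public."""
    parts = urlsplit(url)
    if parts.scheme not in {"http", "https"} or parts.hostname is None:
        raise ValueError("only http(s) URLs with a host are allowed")
    loop = asyncio.get_running_loop()
    try:
        # Non-blocking DNS resolution; numeric hosts resolve without a lookup.
        infos = await loop.getaddrinfo(
            parts.hostname, parts.port or 443, type=socket.SOCK_STREAM
        )
    except socket.gaierror as exc:
        raise ValueError(f"cannot resolve host {parts.hostname!r}") from exc
    for info in infos:
        address = str(info[4][0])
        if is_private_ip(address):
            raise ValueError(f"host resolves to a non-public address: {address}")
```

Every resolved address is checked, not just the first, since a hostname can map to both public and private records.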

18. Quick Reference — Do / Don't

| Do | Don't |
|---|---|
| Type every function, variable, return | Leave types implicit |
| Use async def for I/O | Use sync functions for I/O |
| Validate with Pydantic at the boundary | Pass raw dicts through the codebase |
| Log with structlog + context keys (INFO/WARNING/ERROR/DEBUG) | Use print() or format strings in logs |
| Use log.exception() in catch-all handlers (captures traceback) | Use log.error() for exceptions; let exceptions get lost |
| Write tests for every feature | Ship untested code |
| Use aiohttp for HTTP calls | Use requests |
| Handle errors with custom exceptions | Use bare except: |
| Keep routers thin, logic in services | Put business logic in routers |
| Use datetime.datetime.now(datetime.UTC) | Use naive datetimes |
| Run ruff + mypy before committing | Push code that doesn't pass linting |
| Keep GET endpoints read-only (no db.commit()) | Call db.commit() / INSERT inside GET handlers |
| Batch DB writes; issue one db.commit() after the loop | Commit inside a loop (1 fsync per row) |
| Use executemany() for bulk inserts | Call execute() + commit() per row in a loop |
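The last two rows can be illustrated with a batched insert. The sketch uses the stdlib `sqlite3` module and a hypothetical `bans` table so that it runs on its own; with aiosqlite the same `executemany()` and `commit()` calls are awaited.

```python
import sqlite3

# Hypothetical rows to import in one batch.
rows: list[tuple[str, str]] = [
    ("203.0.113.1", "sshd"),
    ("203.0.113.2", "sshd"),
    ("203.0.113.3", "nginx"),
]

connection: sqlite3.Connection = sqlite3.connect(":memory:")
connection.execute("CREATE TABLE bans (ip TEXT NOT NULL, jail TEXT NOT NULL)")

# One statement for all rows, then a single commit for the whole batch.
connection.executemany("INSERT INTO bans (ip, jail) VALUES (?, ?)", rows)
connection.commit()

count: int = connection.execute("SELECT COUNT(*) FROM bans").fetchone()[0]
```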