- Change _fail2ban_connection_handler() to return generic message instead of leaking socket path in HTTP 502 response body - Change _fail2ban_protocol_handler() to return generic message instead of leaking raw exception details in HTTP 502 response body - Full error details are still logged server-side (error=str(exc)) for debugging - Update Backend-Development.md with error message hygiene section explaining the pattern: generic user-friendly messages in HTTP responses, full details in server logs only Fixes TASK-029: Fail2BanConnectionError leaks socket path in HTTP error responses Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
65 KiB
Backend Development — Rules & Guidelines
Rules and conventions every backend developer must follow. Read this before writing your first line of code.
1. Language & Typing
- Python 3.12+ is the minimum version.
- Every function, method, and variable must have explicit type annotations — no exceptions.
- Use
str,int,float,bool,Nonefor primitives. - Use
list[T],dict[K, V],set[T],tuple[T, ...](lowercase, built-in generics) — nevertyping.List,typing.Dict, etc. - Use
T | Noneinstead ofOptional[T]. - Use
TypeAlias,TypeVar,Protocol, andNewTypewhen they improve clarity. - Return types are mandatory — including
-> None. - Never use
Anyunless there is no other option and a comment explains why. - Run
mypy --strict(orpyrightin strict mode) — the codebase must pass with zero errors.
# Good
def get_jail_by_name(name: str) -> Jail | None:
...
# Bad — missing types
def get_jail_by_name(name):
...
2. Core Libraries
| Purpose | Library | Notes |
|---|---|---|
| Web framework | FastAPI | Async endpoints only. |
| Data validation & settings | Pydantic v2 | All request/response bodies and config models. |
| Async HTTP client | aiohttp (ClientSession) |
For external calls (blocklists, IP lookups). |
| Scheduling | APScheduler 4.x (async) | Blocklist imports, periodic health checks. |
| Structured logging | structlog | Every log call must use structlog — never print() or logging directly. |
| Database | aiosqlite | Async SQLite access for the application database. |
| Testing | pytest + pytest-asyncio + httpx (AsyncClient) |
Every feature needs tests. |
| Mocking | unittest.mock / pytest-mock | Isolate external dependencies. |
| Date & time | datetime (stdlib) — always timezone-aware | Use datetime.datetime.now(datetime.UTC). Never naive datetimes. |
| IP / Network | ipaddress (stdlib) | Validate and normalise IPs and CIDR ranges. |
| Environment / config | pydantic-settings | Load .env and environment variables into typed models. |
| fail2ban integration | fail2ban client (bundled) | Use the local copy at ./fail2ban-master. Import from ./fail2ban-master/fail2ban/client to communicate with the fail2ban socket. Do not install fail2ban as a pip package. |
fail2ban Client Usage
The repository ships with a vendored copy of fail2ban located at ./fail2ban-master.
All communication with the fail2ban daemon must go through the client classes found in ./fail2ban-master/fail2ban/client.
Add the project root to sys.path (or configure it in pyproject.toml as a path dependency) so that from fail2ban.client ... resolves to the bundled copy.
import sys
from pathlib import Path
# Ensure the bundled fail2ban is importable
sys.path.insert(0, str(Path(__file__).resolve().parents[2] / "fail2ban-master"))
from fail2ban.client.csocket import CSSocket # noqa: E402
Libraries you must NOT use
requests— useaiohttp(async).flask— we use FastAPI.celery— we use APScheduler.print()for logging — usestructlog.json.loads/json.dumpson Pydantic models — use.model_dump()/.model_validate().
Timestamp Handling
Timestamp consistency is critical for accurate ban history queries across the dashboard and history endpoints. Follow these rules:
Rule 1: Use consistent UTC timestamps
- All timestamps in the database are stored as Unix epochs (seconds since 1970-01-01 UTC).
- fail2ban stores timestamps using
time.time(), which is always UTC epoch seconds. - When querying fail2ban's SQLite database by timestamp, use
app.utils.time_utils.since_unix()(not manual datetime calculations).
Rule 2: Time-range windows include a 60-second slack
- The
since_unix()function includes a 60-second slack window (TIME_RANGE_SLACK_SECONDSinapp.utils.constants). - This slack accommodates:
- Clock drift between the local system and fail2ban.
- Test seeding delays when timestamps are manually set to exact boundaries.
- The slack ensures that dashboard and history queries return consistent row counts for the same time range.
Rule 3: Never duplicate timestamp calculation logic
- All services that query by time range must import and use
since_unix(). - Do not recalculate timestamps locally using
datetimeortimemodules in service code. - If you need a timestamp for a time range, use
since_unix().
Example:
from app.utils.time_utils import since_unix
# Get all bans from the last 24 hours (with 60-second slack)
since_ts: int = since_unix("24h")
rows = await db.execute(
"SELECT * FROM bans WHERE timeofban >= ?",
(since_ts,)
)
3. Project Structure
backend/
├── app/
│ ├── __init__.py
│ ├── main.py # FastAPI app factory, lifespan
│ ├── config.py # Pydantic settings
│ ├── dependencies.py # FastAPI dependency providers
│ ├── models/ # Pydantic schemas (request, response, domain)
│ ├── routers/ # FastAPI routers grouped by feature
│ ├── services/ # Business logic — one service per domain
│ ├── repositories/ # Database access layer
│ ├── tasks/ # APScheduler jobs
│ └── utils/ # Helpers, constants, shared types
├── tests/
│ ├── conftest.py
│ ├── test_routers/
│ ├── test_services/
│ └── test_repositories/
├── pyproject.toml
└── .env.example
- Routers receive requests, validate input via Pydantic, and delegate to services.
- Services contain business logic and call repositories or external clients.
- Repositories handle raw database queries — nothing else.
- Never put business logic inside routers or repositories.
4. FastAPI Conventions
-
Use async def for every endpoint — no sync endpoints.
-
Every endpoint must declare explicit response models (
response_model=...). -
Use Pydantic models for request bodies and query parameters — never raw dicts.
-
Use Depends() for dependency injection (database sessions, services, auth).
-
Group endpoints into routers by feature domain (
routers/jails.py,routers/bans.py, …). -
Use appropriate HTTP status codes:
201for creation,204for deletion with no body,404for not found, etc. -
Protected endpoints should return
401 Unauthorizedor403 Forbiddenwhen the session is invalid or expired; the frontend treats these responses as a session-expiry event and redirects the user to/login. -
Use HTTPException or custom exception handlers — never return error dicts manually.
-
GET endpoints are read-only — never call
db.commit()or execute INSERT/UPDATE/DELETE inside a GET handler. If a GET path produces side-effects (e.g., caching resolved data), that write belongs in a background task, a scheduled flush, or a separate POST endpoint. Users and HTTP caches assume GET is idempotent and non-mutating.# Good — pass db=None on GET so geo_service never commits result = await geo_service.lookup_batch(ips, http_session, db=None) # Bad — triggers INSERT + COMMIT per IP inside a GET handler result = await geo_service.lookup_batch(ips, http_session, db=app_db)
from fastapi import APIRouter, Depends, HTTPException, status
from app.models.jail import JailResponse, JailListResponse
from app.services.jail_service import JailService
router: APIRouter = APIRouter(prefix="/api/jails", tags=["Jails"])
@router.get("/", response_model=JailListResponse)
async def list_jails(service: JailService = Depends()) -> JailListResponse:
jails: list[JailResponse] = await service.get_all_jails()
return JailListResponse(jails=jails)
5. Pydantic Models
- Every model inherits from
pydantic.BaseModel. - Use
model_config = ConfigDict(strict=True)where appropriate. - Field names use snake_case in Python, export as camelCase to the frontend via alias generators if needed.
- Validate at the boundary — once data enters a Pydantic model it is trusted.
- Use
Field(...)with descriptions for every field to keep auto-generated docs useful. - Separate request models, response models, and domain (internal) models — do not reuse one model for all three.
from pydantic import BaseModel, Field
from datetime import datetime
class BanResponse(BaseModel):
ip: str = Field(..., description="Banned IP address")
jail: str = Field(..., description="Jail that issued the ban")
banned_at: datetime = Field(..., description="UTC timestamp of the ban")
expires_at: datetime | None = Field(None, description="UTC expiry, None if permanent")
ban_count: int = Field(..., ge=1, description="Number of times this IP was banned")
Using Literal Types for Constrained Strings
When a field should only accept a small set of predefined values, use Literal to enforce this at the type level:
from typing import Literal
from pydantic import BaseModel, Field
LogLevel = Literal["CRITICAL", "ERROR", "WARNING", "NOTICE", "INFO", "DEBUG"]
class GlobalConfigUpdate(BaseModel):
log_level: LogLevel | None = Field(
default=None,
description="Log level: CRITICAL, ERROR, WARNING, NOTICE, INFO, or DEBUG.",
)
This provides:
- Type safety — IDEs and type checkers enforce valid values.
- API documentation — OpenAPI docs automatically list all allowed values.
- Validation — Pydantic rejects invalid values and provides a clear error message.
Custom Field Validators
For fields that require complex validation (e.g., file paths that must be within allowed directories), use @field_validator:
from pydantic import field_validator
from app.utils.path_utils import validate_log_path
class AddLogPathRequest(BaseModel):
log_path: str = Field(..., description="Absolute path to the log file to monitor.")
@field_validator("log_path", mode="after")
@classmethod
def validate_log_path_field(cls, value: str) -> str:
"""Validate that the log path is within allowed directories."""
return validate_log_path(value)
Path Validation Helper:
For query parameters and other contexts where Pydantic validators cannot be used directly, use the validate_log_path() helper from app.utils.path_utils:
from fastapi import HTTPException, status
from app.utils.path_utils import validate_log_path
@router.delete("/{name}/logpath")
async def delete_log_path(
name: str,
log_path: str = Query(...),
) -> None:
try:
validate_log_path(log_path)
except ValueError as e:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail=str(e),
) from e
# ... rest of handler
Key points:
- Use
mode="after"in model validators to validate after Pydantic's basic type coercion. - Raise
ValueErrorif validation fails; Pydantic converts it to an HTTP 400 response. - For query parameters that cannot use Pydantic validators, use the
validate_log_path()helper and raise HTTP 422. - Never use string prefix matching for path validation (e.g.,
path.startswith("/var/log")). The helper usesPath.relative_to()to prevent bypasses like/var/log_evil/file.log. - Symlinks are resolved before validating to prevent symlink-based escapes.
6. Async Rules
-
Never call blocking / synchronous I/O in an async function — no
time.sleep(), no synchronous file reads, norequests.get(). -
Use
aiohttp.ClientSessionfor HTTP calls,aiosqlitefor database access. -
Use
asyncio.TaskGroup(Python 3.11+) when you need to run independent coroutines concurrently. -
Long-running startup/shutdown logic goes into the FastAPI lifespan context manager.
-
Never call
db.commit()inside a loop. With aiosqlite, every commit serialises through a background thread and forces anfsync. N rows × 1 commit = N fsyncs. Accumulate all writes in the loop, then issue a singledb.commit()once after the loop ends. The difference between 5,000 commits and 1 commit can be seconds vs milliseconds.# Good — one commit for the whole batch for ip, info in results.items(): await db.execute(INSERT_SQL, (ip, info.country_code, ...)) await db.commit() # ← single fsync # Bad — one fsync per row for ip, info in results.items(): await db.execute(INSERT_SQL, (ip, info.country_code, ...)) await db.commit() # ← fsync on every iteration -
Prefer
executemany()over callingexecute()in a loop when inserting or updating multiple rows with the same SQL template. aiosqlite passes the entire batch to SQLite in one call, reducing Python↔thread overhead on top of the single-commit saving.# Good await db.executemany(INSERT_SQL, [(ip, cc, cn, asn, org) for ip, info in results.items()]) await db.commit() -
Shared resources (DB connections, HTTP sessions) are created once during startup and closed during shutdown — never inside request handlers.
from contextlib import asynccontextmanager
from collections.abc import AsyncGenerator
from fastapi import FastAPI
import aiohttp
import aiosqlite
@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncGenerator[None]:
# Startup
app.state.http_session = aiohttp.ClientSession()
app.state.db = await aiosqlite.connect("bangui.db")
yield
# Shutdown
await app.state.http_session.close()
await app.state.db.close()
Fire-and-Forget Background Tasks
When you need to spawn a background task that runs independently without waiting for the result, use asyncio.create_task() with the logged_task() helper from app.utils.async_utils. This ensures exceptions in background tasks are always logged and never silently discarded.
Why this matters: In Python 3.11+, unhandled exceptions in fire-and-forget tasks become silent RuntimeWarnings. Without logging, background errors (network failures, database writes, API timeouts) become invisible in structured logs and are extremely hard to debug.
Pattern:
from app.utils.async_utils import logged_task
# Bad — exceptions are silently discarded
asyncio.create_task(some_background_work())
# Good — exceptions are logged
asyncio.create_task(
logged_task(some_background_work(), "task_name"),
name="task_name",
)
The logged_task() wrapper:
- Wraps your coroutine to catch any exception
- Logs the exception with
log.exception()(structlog automatically captures the traceback) - Adds
task_nameto the structured log context - Never re-raises — it's safe to use with
asyncio.create_task()
Example:
import asyncio
from app.utils.async_utils import logged_task
import structlog
log = structlog.get_logger()
async def geo_lookup_batch(ips: list[str]) -> None:
"""Look up geolocation data for IPs asynchronously."""
try:
for ip in ips:
# May timeout, fail network call, or fail DB write
location = await lookup_ip_location(ip)
await db.execute(INSERT_GEO_SQL, (ip, location))
await db.commit()
except Exception:
# All exceptions are automatically logged by logged_task() wrapper
raise
# In your request handler or service:
asyncio.create_task(
logged_task(geo_lookup_batch(uncached_ips), "geo_cache_batch"),
name="geo_cache_batch",
)
6.1 Database Query Conventions
LIKE Queries and Wildcard Escaping
SQLite's LIKE operator treats % (any sequence of characters) and _ (any single character) as wildcards. When querying with user-supplied filters that may contain these characters, you must escape them to prevent unintended matches.
The Problem:
# Bad — ip_filter="10.0.0_" matches "10.0.0.1", "10.0.0.2", etc.
ip_filter = "10.0.0_"
await db.execute(
"SELECT * FROM bans WHERE ip LIKE ?",
(f"{ip_filter}%",) # ← wildcard characters not escaped
)
The Solution:
Use the escape_like() helper from app.utils.fail2ban_db_utils:
from app.utils.fail2ban_db_utils import escape_like
# Good — wildcard characters are escaped
ip_filter = "10.0.0_"
await db.execute(
"SELECT * FROM bans WHERE ip LIKE ? ESCAPE '\\'",
(f"{escape_like(ip_filter)}%",) # ← underscores escaped to literal
)
How escape_like() works:
The function escapes backslashes first, then % and _ signs:
def escape_like(s: str) -> str:
return s.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
Key rules:
- Backslash escapes first — to prevent double-escaping when the input contains backslashes.
- Add
ESCAPE '\\'to the SQL — tells SQLite which character to use for escaping. - Dots are not wildcards — they do not need escaping; normal IP addresses pass through unchanged.
Test example:
assert escape_like("10.0.0_") == "10.0.0\\_"
assert escape_like("10.0.0%test") == "10.0.0\\%test"
assert escape_like("10.0.0.1") == "10.0.0.1" # Unchanged
6.2 Database Migrations
The application database schema is versioned and migrated automatically on startup via app.db.init_db().
Migration Design Principles
Migrations must be atomic. All schema changes for a single version (DDL statements) and the schema_migrations record insert must be wrapped in a single BEGIN IMMEDIATE ... COMMIT transaction. This prevents partial migrations if a process crashes mid-migration.
If a crash occurs between migration steps, the next startup will:
- Detect the missing
schema_migrationsrecord. - Re-apply the entire migration in a single transaction (all-or-nothing).
- Avoid data corruption or schema inconsistency.
Writing a New Migration
- Add the DDL statements to
_MIGRATIONSdict inapp/db.py:
_MIGRATIONS: dict[int, str] = {
1: _CREATE_INITIAL_SCHEMA,
2: """
-- Migration 2: Add new_column to users table.
ALTER TABLE users ADD COLUMN new_column TEXT DEFAULT 'default_value';
CREATE INDEX idx_users_new_column ON users(new_column);
""",
}
- Update
_CURRENT_SCHEMA_VERSIONto the new version number:
_CURRENT_SCHEMA_VERSION: int = 2 # was 1
-
Ensure idempotency where possible:
- Use
CREATE TABLE IF NOT EXISTSandCREATE INDEX IF NOT EXISTS. - For
ALTER TABLE ADD COLUMN, check if the column exists first usingPRAGMA table_info()if re-applying the migration is a concern.
- Use
-
Verify atomicity in tests:
async def test_migration_2_is_atomic(tmp_path: Path) -> None:
"""Verify migration 2 rolls back on failure."""
db = await open_db(str(tmp_path / "test.db"))
try:
await db.execute("CREATE TABLE schema_migrations (version INTEGER PRIMARY KEY);")
await db.commit()
# Add a test migration that fails mid-way
original = db_module._MIGRATIONS.copy()
db_module._MIGRATIONS[99] = """
CREATE TABLE test_table (id INTEGER PRIMARY KEY);
INSERT INTO nonexistent_table VALUES (1);
"""
try:
with pytest.raises(Exception):
await _apply_migration(db, 99)
# Verify rollback: migration NOT recorded
async with db.execute(
"SELECT version FROM schema_migrations WHERE version = 99;"
) as cursor:
assert await cursor.fetchone() is None
# Verify rollback: table NOT created
async with db.execute(
"SELECT name FROM sqlite_master WHERE type='table' AND name='test_table';"
) as cursor:
assert await cursor.fetchone() is None
finally:
db_module._MIGRATIONS = original
finally:
await db.close()
Common Pitfalls
- Non-idempotent statements —
ALTER TABLE ADD COLUMNwithoutIF NOT EXISTSwill fail on re-run. Use explicit checks if needed. - Comments containing semicolons — the migration parser strips comments correctly, but avoid unusual comment syntax.
- String literals with semicolons — the parser handles these; no special escaping needed.
- Multiple operations in one migration — keep migrations focused. Combine related DDL but split unrelated changes.
7. Logging
- Use structlog for every log message.
- Bind contextual key-value pairs — never format strings manually.
- Log levels:
debugfor development detail,infofor operational events,warningfor recoverable issues,errorfor failures,criticalfor fatal problems. - Never log sensitive data (passwords, tokens, session tokens, raw credentials, private keys).
- For session correlation without leaking token material, use a one-way hash fragment:
hashlib.sha256(token.encode()).hexdigest()[:12]. - Use numeric database IDs for entity correlation instead of raw identifiers:
session_id=session.idinstead oftoken=session.token.
- For session correlation without leaking token material, use a one-way hash fragment:
import structlog
import hashlib
log: structlog.stdlib.BoundLogger = structlog.get_logger()
async def ban_ip(ip: str, jail: str) -> None:
log.info("banning_ip", ip=ip, jail=jail)
try:
await _execute_ban(ip, jail)
log.info("ip_banned", ip=ip, jail=jail)
except BanError as exc:
log.error("ban_failed", ip=ip, jail=jail, error=str(exc))
raise
async def logout_session(db: aiosqlite.Connection, token: str) -> None:
# Use a one-way hash for token correlation in logs
token_hash = hashlib.sha256(token.encode()).hexdigest()[:12]
await session_repo.delete_session(db, token)
log.info("session_terminated", token_hash=token_hash)
8. Error Handling
- Define custom exception classes for domain errors (e.g.,
JailNotFoundError,BanFailedError). - Catch specific exceptions — never bare
except:orexcept Exception:without re-raising. - Map domain exceptions to HTTP status codes via FastAPI exception handlers registered on the app.
- Always log errors with context before raising.
class JailNotFoundError(Exception):
def __init__(self, name: str) -> None:
self.name: str = name
super().__init__(f"Jail '{name}' not found")
# In main.py
@app.exception_handler(JailNotFoundError)
async def jail_not_found_handler(request: Request, exc: JailNotFoundError) -> JSONResponse:
return JSONResponse(status_code=404, content={"detail": f"Jail '{exc.name}' not found"})
Routers and Exception Propagation
- Routers must NOT construct
HTTPExceptionfor domain errors — let domain exceptions propagate. - Routers should never have helper functions like
_bad_gateway(),_not_found(),_conflict()etc. that convert domain exceptions toHTTPException. - All domain exception types must have corresponding handlers registered in
main.pyviaapp.add_exception_handler(). - Exception handlers are registered in order from most specific to least specific — FastAPI evaluates them in registration order.
# ❌ BAD — routers constructing HTTPException for domain exceptions
@router.get("/{name}")
async def get_jail(name: str, socket_path: Fail2BanSocketDep) -> JailDetailResponse:
try:
return await jail_service.get_jail(socket_path, name)
except JailNotFoundError:
raise HTTPException(status_code=404, detail=f"Jail not found: {name!r}") from None
# ✅ GOOD — domain exception propagates to global handler
@router.get("/{name}")
async def get_jail(name: str, socket_path: Fail2BanSocketDep) -> JailDetailResponse:
return await jail_service.get_jail(socket_path, name)
All domain exceptions raised by services propagate to handlers in main.py, ensuring:
- Consistent error response format across the entire API.
- No duplicated exception-to-HTTP-status mapping logic.
- Easy to audit all error codes — they are all in one place.
Error Message Hygiene
HTTP responses must never leak sensitive internal details that aid attackers or expose infrastructure:
- Never include system paths in HTTP error messages (e.g.,
/var/run/fail2ban/fail2ban.sock,/etc/fail2ban/). - Never include raw exception messages that expose internal parsing or implementation logic.
- Log full details server-side only — exception handlers must log
error=str(exc)with full exception context, but return generic user-friendly messages in the HTTP response.
# ❌ BAD — leaks socket path and internal details to the client
async def _fail2ban_connection_handler(request: Request, exc: Fail2BanConnectionError) -> JSONResponse:
return JSONResponse(
status_code=502,
content={"detail": f"Cannot reach fail2ban: {exc}"}, # exc includes socket path!
)
# ✅ GOOD — generic message in response, full details in server logs
async def _fail2ban_connection_handler(request: Request, exc: Fail2BanConnectionError) -> JSONResponse:
log.warning(
"fail2ban_connection_error",
path=request.url.path,
method=request.method,
error=str(exc), # Full details logged server-side
)
return JSONResponse(
status_code=502,
content={"detail": "Cannot reach the fail2ban service. Check the server status page."},
)
9. Testing
- Every new feature or bug fix must include tests.
- Tests live in
tests/mirroring theapp/structure. - Use
pytestwithpytest-asynciofor async tests. - Use
httpx.AsyncClientto test FastAPI endpoints (notTestClientwhich is sync). - Mock external dependencies (fail2ban socket, aiohttp calls) — tests must never touch real infrastructure.
- Aim for >80 % line coverage — critical paths (auth, banning, scheduling) must be 100 %.
- Test names follow
test_<unit>_<scenario>_<expected>pattern.
import pytest
from httpx import AsyncClient, ASGITransport
from app.main import create_app
@pytest.fixture
async def client() -> AsyncClient:
app = create_app()
transport: ASGITransport = ASGITransport(app=app)
async with AsyncClient(transport=transport, base_url="http://test") as ac:
yield ac
@pytest.mark.asyncio
async def test_list_jails_returns_200(client: AsyncClient) -> None:
response = await client.get("/api/jails/")
assert response.status_code == 200
data: dict = response.json()
assert "jails" in data
9.1 Background Tasks and Scheduler Architecture
BanGUI uses APScheduler 4.x (async mode) to manage background jobs that execute on a schedule without user interaction. This section documents how to write and register background tasks.
Task Location and Structure
All background tasks live in backend/app/tasks/ as separate modules. Each task:
- Exports a
register(app: FastAPI) -> Noneorasync def register(app: FastAPI) -> Nonefunction. - Opens its own database connection using
app.db.open_db()or thetask_db()helper. - Closes connections when work completes (use the async context manager pattern).
- Runs independently of the FastAPI request/response cycle.
Example Task
# backend/app/tasks/my_task.py
import structlog
from fastapi import FastAPI
from apscheduler.schedulers.asyncio import AsyncIOScheduler
log = structlog.get_logger()
async def my_background_job(app: FastAPI) -> None:
"""Do important work on a schedule."""
log.info("my_background_job_started")
try:
db = await app.db.open_db(app.state.settings.database_path)
try:
# Do work...
pass
finally:
await db.close()
except Exception:
log.error("my_background_job_failed", exc_info=True)
def register(app: FastAPI) -> None:
"""Register the job with the scheduler."""
scheduler: AsyncIOScheduler = app.state.scheduler
scheduler.add_job(
my_background_job,
args=(app,),
trigger="interval",
seconds=60,
id="my_task",
name="My Background Job",
)
Accessing Shared Resources in Tasks
Since tasks do not have access to Depends(get_db) (no request scope), they must:
- Open their own DB connection via
app.state.db_factory.open_db(path). - Access app-level state —
app.state.http_session,app.state.geo_cache,app.state.settings, etc. - Use structlog for all logging (never
print()).
Single-Worker Requirement
The scheduler is bound to a single asyncio event loop and cannot be shared across multiple worker processes. BanGUI enforces single-worker mode to prevent duplicate task execution.
- Deployment constraint: Set
BANGUI_WORKERS=1(default). - Startup validation:
startup_shared_resources()raisesRuntimeErrorifBANGUI_WORKERS > 1. - See Architekture.md § 9.2 for full details.
10. Code Style & Tooling
| Tool | Purpose |
|---|---|
| Ruff | Linter and formatter (replaces black, isort, flake8). |
| mypy or pyright | Static type checking in strict mode. |
| pre-commit | Run ruff + type checker before every commit. |
- Line length: 120 characters max.
- Strings: use double quotes (
"). - Imports: sorted by ruff — stdlib → third-party → local, one import per line.
- No unused imports, no unused variables, no
# type: ignorewithout explanation. - Docstrings in Google style on every public function, class, and module.
11. fail2ban Response Utilities
All services that interact with the fail2ban daemon must use the canonical response parsing utilities from app.utils.fail2ban_response. This ensures consistent error handling, type safety, and makes it easy to fix bugs in response handling across the entire codebase.
Available Functions
ok(response: object) -> object
Extracts the payload from a fail2ban (return_code, data) response tuple.
- Raises
ValueErrorif return code ≠ 0 or response shape is invalid. - Use this on every response from
Fail2BanClient.send().
to_dict(pairs: object) -> dict[str, object]
Converts a list of (key, value) pairs (fail2ban's native response format) to a Python dict.
- Silently ignores malformed entries and non-list/tuple inputs.
- Always returns a dict (empty if input is invalid).
ensure_list(value: object | None) -> list[str]
Coerces fail2ban response values (which may be None, a single string, or a list) to a normalized list of strings.
- Handles all three cases consistently.
- Returns empty list for
Noneor empty strings.
is_not_found_error(exc: Exception) -> bool
Checks if an exception indicates a jail does not exist.
- Checks for multiple error message patterns (case-insensitive).
- Use this to distinguish "jail not found" errors from other failures.
Example Usage
from app.utils.fail2ban_response import ok, to_dict, ensure_list, is_not_found_error
from app.utils.fail2ban_client import Fail2BanClient
client = Fail2BanClient(socket_path="/var/run/fail2ban/fail2ban.sock")
try:
# Get jail status
response = await client.send(["status", "sshd", "short"])
status_dict = to_dict(ok(response)) # Extract payload and convert to dict
# Get list of banned IPs
ban_response = await client.send(["get", "sshd", "banip"])
banned_ips = ensure_list(ok(ban_response)) # Normalize to list of strings
except ValueError as exc:
if is_not_found_error(exc):
raise JailNotFoundError("sshd") from exc
raise
Why This Matters
Before this utility module, every service implemented its own copy of these functions, leading to:
- Code duplication across 7+ service files.
- Subtle inconsistencies in error handling.
- Difficult maintenance — every bug fix required touching multiple files.
Now, all services import from a single authoritative source, making response handling consistent, maintainable, and type-safe.
12. Configuration & Secrets
- All configuration lives in environment variables loaded through pydantic-settings.
- Secrets (master password hash, session key) are never committed to the repository.
- Provide a
.env.examplewith all keys and placeholder values. - Validate config at startup — fail fast with a clear error if a required value is missing.
from pydantic_settings import BaseSettings
from pydantic import Field
class Settings(BaseSettings):
database_path: str = Field("bangui.db", description="Path to SQLite database")
fail2ban_socket: str = Field("/var/run/fail2ban/fail2ban.sock", description="fail2ban socket path")
session_secret: str = Field(..., description="Secret key for session signing")
log_level: str = Field("info", description="Logging level")
model_config = {"env_prefix": "BANGUI_", "env_file": ".env"}
Session Secret Configuration
The session_secret is the HMAC key used to sign all session tokens. It must be at least 32 characters (256 bits) to provide sufficient cryptographic strength for HMAC-SHA256.
Minimum Length: 32 characters
Why 32 characters? Session tokens are signed using HMAC-SHA256. A secret shorter than 32 bytes (256 bits) significantly weakens the signature, potentially allowing attackers to forge valid tokens. The constraint is enforced at startup — the application will fail to start if session_secret is shorter than 32 characters.
Generation: Generate a secure secret using Python:
python -c "import secrets; print(secrets.token_hex(32))"
This produces a 64-character hexadecimal string (256 bits) suitable for production use.
Environment Variable:
BANGUI_SESSION_SECRET="your-32-character-minimum-secret-here"
Never commit the actual secret to the repository. Provide a .env.example with a placeholder:
# .env.example
BANGUI_SESSION_SECRET="set-this-to-a-32-character-minimum-secret"
Session Cookie Security
The session_cookie_secure configuration controls the Secure flag on the session cookie. This flag prevents browsers from sending the session cookie over unencrypted HTTP.
Default: true — Production deployments are secure by default. Cookies are only sent over HTTPS.
Local Development: Set BANGUI_SESSION_COOKIE_SECURE=false in your compose file or .env to allow cookies over HTTP (required for localhost:8000).
# Docker/compose.debug.yml
environment:
BANGUI_SESSION_COOKIE_SECURE: "false" # Allow HTTP during local development
Important: If Secure=true is set, browsers will reject the session cookie when the backend is served over HTTP. Ensure your nginx/reverse proxy terminates TLS and passes X-Forwarded-Proto: https so FastAPI knows the connection is secure.
CSRF Protection Middleware
State-mutating endpoints (POST, PUT, DELETE, PATCH) authenticated via session cookies are protected by the CsrfMiddleware, which enforces a custom header check.
How It Works:
-
For every request using a mutating HTTP method, the middleware checks:
- Is this request authenticated via session cookie (not Bearer token)?
- If yes, require the custom header
X-BanGUI-Request: 1. - If missing or incorrect, return
403 Forbidden.
-
Bearer token requests (via
Authorization: Bearerheader) bypass the check because tokens are not CSRF-vulnerable — they are never automatically sent on cross-origin requests. -
Safe HTTP methods (GET, HEAD, OPTIONS) bypass the check.
-
Cross-site protection: Cross-site JavaScript (
fetch()calls from other origins) cannot set custom headers without CORS preflight, which the backend rejects for non-allowed origins. This provides defense-in-depth against subdomain attacks and XSS injection.
Implementation Location:
- Middleware:
backend/app/middleware/csrf.py - Registered in:
backend/app/main.pyviaapp.add_middleware(CsrfMiddleware)
Example:
# ✓ Cookie-authenticated POST with CSRF header — allowed
POST /api/bans
Cookie: bangui_session=...
X-BanGUI-Request: 1
# ✗ Cookie-authenticated POST without CSRF header — rejected with 403
POST /api/bans
Cookie: bangui_session=...
(no X-BanGUI-Request header)
# ✓ Bearer token authentication without CSRF header — allowed
POST /api/bans
Authorization: Bearer <token>
(no X-BanGUI-Request header needed)
# ✓ Safe GET method without CSRF header — allowed
GET /api/jails
Cookie: bangui_session=...
(no X-BanGUI-Request header needed)
fail2ban_start_command Configuration
The fail2ban_start_command setting specifies the shell command used to start the fail2ban daemon during recovery operations (e.g., after a rollback).
Format & Parsing:
- The command is split into arguments using
shlex.split(), which respects shell quoting rules. - Paths with spaces must be quoted. Example:
"/opt/my tools/fail2ban-client" start. - The command is not executed through a shell — no shell variables or globbing are interpreted.
Validation:
- The command is validated at startup using
shlex.split(). Mismatched quotes will raise aValueErrorwith the problematic command in the error message.
Environment Variables:
BANGUI_FAIL2BAN_START_COMMAND="fail2ban-client start" # Default
BANGUI_FAIL2BAN_START_COMMAND="systemctl start fail2ban" # systemd
BANGUI_FAIL2BAN_START_COMMAND='"/opt/my tools/fail2ban" start' # Quoted path
Common Pitfall:
Using .split() instead of shlex.split() would break commands with spaces in paths. Always use quoted strings for paths that contain whitespace.
API Documentation Configuration
The enable_docs setting controls whether FastAPI serves interactive API documentation at /api/docs (Swagger UI) and /api/redoc (ReDoc).
Default: false — API documentation is disabled by default to prevent information disclosure in production.
When to Enable:
- Set
BANGUI_ENABLE_DOCS=truein development and debugging environments only. - Never enable in production. Exposed API documentation reveals all endpoints, request/response schemas, and allows direct API invocation from the browser.
Environment Variables:
BANGUI_ENABLE_DOCS="true" # Enable docs in development
BANGUI_ENABLE_DOCS="false" # Disable docs (default)
# Unset # Defaults to false (production)
Debug Compose File:
The Docker/compose.debug.yml sets BANGUI_ENABLE_DOCS: "true" for local development. Production compose files (Docker/compose.prod.yml) leave this unset, defaulting to false.
Middleware Allowlist:
The SetupRedirectMiddleware in main.py includes /api/docs, /api/redoc, and /api/openapi.json in its _ALWAYS_ALLOWED paths so documentation can be accessed before setup completes (if enabled).
Log Path Validation & Allowlisting
Authenticated users can instruct fail2ban to monitor additional log files through the API endpoint POST /api/config/jails/{name}/logpath. To prevent path-traversal attacks and unauthorized reads of sensitive system files, all requested log paths must resolve to locations within a configurable allowlist of safe directories.
Allowed Directories:
- Configured via the
BANGUI_ALLOWED_LOG_DIRSenvironment variable (comma-separated list). - Defaults to:
["/var/log", "/config/log"].
Path Validation Rules:
- The requested path is resolved to its canonical form using
Path(log_path).resolve(), which:- Expands relative paths to absolute paths.
- Resolves symbolic links to their real targets.
- Normalizes
.and..components.
- The resolved path is checked using
Path.is_relative_to()against each allowed directory prefix. - If the resolved path is not relative to any allowed directory, a
ValueErroris raised with a descriptive error message.
Implementation:
- Validation occurs in the Pydantic model
AddLogPathRequestusing a@field_validator. - The validator runs at request time, before the service layer is invoked.
- Symlinks that escape allowed directories are rejected (see symlink bypass tests).
Important: Use is_relative_to(), not startswith() or string prefix matching. The latter is bypassable with paths like /var/log_evil/file.log.
Environment Variables:
BANGUI_ALLOWED_LOG_DIRS="/var/log,/config/log" # Default
BANGUI_ALLOWED_LOG_DIRS="/var/log,/config/log,/home/app/logs" # Custom directory
Log Target Validation (fail2ban)
The log_target field on the global config endpoint (PUT /api/config/global) is critical for security because fail2ban runs as root. Users can only set log targets to:
- Special values:
STDOUT,STDERR,SYSLOG(case-insensitive) - File paths: Must resolve to one of the configured allowed directories (same allowlist as log paths)
Why This Matters:
- fail2ban creates/opens files with root privileges. Without validation, an attacker could write to arbitrary system paths (e.g.,
/etc/cron.d/malicious_script). - Validation occurs at both the Pydantic model layer (
GlobalConfigUpdate.validate_log_target()) and the service layer (update_global_config()) for defense in depth. - This prevents both HTTP and non-HTTP attack vectors.
Implementation:
# Model layer: Automatic validation via @field_validator
update = GlobalConfigUpdate(log_target="/etc/passwd") # Raises ValidationError → HTTP 422
# Service layer: Defense in depth
await config_service.update_global_config(socket_path, update) # Validates again before sending to fail2ban
Login Rate Limiting
The login endpoint (POST /api/auth/login) is protected against brute-force attacks using an in-memory rate limiter.
Design:
- Uses a
dict[str, deque[float]]keyed by client IP, storing login attempt timestamps within a time window. - Attempts outside the window are automatically removed during validation checks.
- Expired IP entries are cleaned up to prevent unbounded memory growth.
Rate Limit Rules:
- 5 attempts per 60 seconds per IP address.
- Requests exceeding the limit return HTTP 429 Too Many Requests with a
Retry-Afterheader. - Each failed login triggers a 10-second server-side delay (
asyncio.sleep) to further slow attacks, on top of bcrypt hashing (~100ms).
IP Extraction (Proxy Safety):
- When behind nginx, the rate limiter reads the real client IP from
X-Forwarded-FororX-Real-IPheaders. - Only trusts these headers when the immediate connection source is in a configured trusted proxy list.
- Prevents attackers from spoofing these headers to bypass rate limits.
- Falls back to the direct connection IP when proxy headers cannot be trusted.
Process-Local Limitation:
- The rate limiter is process-local (in-memory). In multi-worker deployments (e.g., Gunicorn with 4 workers), each worker maintains its own rate limit counter.
- This is acceptable because the single-worker constraint is enforced elsewhere. See TASK-002/003 notes for details.
Implementation:
- Rate limiter:
app.utils.rate_limiter.RateLimiter - IP extraction:
app.utils.client_ip.get_client_ip() - Dependency:
LoginRateLimiterDepinapp.dependencies
13. File I/O Conventions
All file write operations to critical configuration files must be atomic to prevent corruption if the process is killed mid-write.
Atomic File Writes
Configuration files (e.g., fail2ban jail configs in jail.d/) are essential for system operation. A truncated or corrupt config file can break fail2ban's ability to reload and may disable active protection.
Rule: Always use write-to-temp + atomic rename
Never use Path.write_text() or file.write() directly for critical files. Instead:
- Create a temporary file in the same directory as the target (crucial for atomic
os.replace()). - Write content to the temp file.
- Atomically rename the temp file to replace the target.
- Clean up the temp file if an error occurs.
Implementation Pattern:
import os
import tempfile
from pathlib import Path
target = Path("/path/to/config/file.conf")
tmp_name: str | None = None
try:
# Create temp file in target's directory (same filesystem = atomic)
with tempfile.NamedTemporaryFile(
mode="w",
encoding="utf-8",
dir=target.parent,
delete=False,
suffix=".tmp",
) as tmp:
tmp.write(content)
tmp_name = tmp.name
# Atomic rename (single syscall on POSIX systems)
os.replace(tmp_name, target)
except OSError as exc:
# Clean up temp file on error
with contextlib.suppress(OSError):
if tmp_name is not None:
os.unlink(tmp_name)
raise ConfigWriteError(f"Cannot write config: {exc}") from exc
Why this matters:
Path.write_text()overwrites in place. If the process dies mid-write, the file is left truncated or partially written.os.replace()is atomic on POSIX systems (single rename syscall) only if source and target are on the same filesystem.- Creating the temp file in
target.parentensures atomicity. - On Linux containers, this prevents config corruption and service degradation.
Atomic write helper:
A shared atomic_write(path: Path, content: str) helper is available in app/services/config_file_helpers.py. This is the preferred way to perform atomic writes — it handles all the temp file and cleanup logic:
from app.services.config_file_helpers import atomic_write
atomic_write(path, updated_content) # Atomic write, auto-cleanup on error
Files requiring atomic writes:
- All config files under
jail.d/(created/modified by_write_conf_file,_create_conf_file,set_jail_config_enabled, andwrite_jail_config_file) - Any critical state files that fail2ban relies on
Examples in the codebase:
app/services/config_file_helpers.py:_write_conf_file,_create_conf_file,atomic_writeapp/services/raw_config_io_service.py:set_jail_config_enabled,write_jail_config_fileapp/services/jail_config_service.py:_write_local_file_sync,_restore_local_file_sync
14. Git & Workflow
- Branch naming:
feature/<short-description>,fix/<short-description>,chore/<short-description>. - Commit messages: imperative tense, max 72 chars first line (
Add jail reload endpoint,Fix ban history query). - Every merge request must pass: ruff, type checker, all tests.
- Do not merge with failing CI.
- Keep pull requests small and focused — one feature or fix per PR.
15. Coding Principles
These principles are non-negotiable. Every backend contributor must internalise and apply them daily.
15.1 Clean Code
- Write code that reads like well-written prose — a new developer should understand intent without asking.
- Meaningful names — variables, functions, and classes must reveal their purpose. Avoid abbreviations (
cnt,mgr,tmp) unless universally understood. - Small functions — each function does exactly one thing. If you need a comment to explain a block inside a function, extract it into its own function.
- No magic numbers or strings — use named constants.
- Boy Scout Rule — leave every file cleaner than you found it.
- Avoid deep nesting — prefer early returns (guard clauses) to keep the happy path at the top indentation level.
# Good — guard clause, clear name, one job
async def get_active_ban(ip: str, jail: str) -> Ban:
ban: Ban | None = await repo.find_ban(ip=ip, jail=jail)
if ban is None:
raise BanNotFoundError(ip=ip, jail=jail)
if ban.is_expired():
raise BanExpiredError(ip=ip, jail=jail)
return ban
# Bad — nested, vague name
async def check(ip, j):
b = await repo.find_ban(ip=ip, jail=j)
if b:
if not b.is_expired():
return b
else:
raise Exception("expired")
else:
raise Exception("not found")
15.2 Separation of Concerns (SoC)
- Each module, class, and function must have a single, well-defined responsibility.
- Routers → HTTP layer only (parse requests, return responses).
- Services → business logic and orchestration.
- Repositories → data access and persistence.
- Models → data shapes and validation.
- Tasks → scheduled background jobs.
- Never mix layers — a router must not execute SQL, and a repository must not raise
HTTPException.
15.3 Single Responsibility Principle (SRP)
- A class or module should have one and only one reason to change.
- If a service handles both ban management and email notifications, split it into
BanServiceandNotificationService.
15.4 Don't Repeat Yourself (DRY)
- Extract shared logic into utility functions, base classes, or dependency providers.
- If the same block of code appears in more than one place, refactor it into a single source of truth.
- But don't over-abstract — premature DRY that couples unrelated features is worse than a little duplication (see Rule of Three: refactor when something appears a third time).
15.5 KISS — Keep It Simple, Stupid
- Choose the simplest solution that works correctly.
- Avoid clever tricks, premature optimisation, and over-engineering.
- If a standard library function does the job, prefer it over a custom implementation.
15.6 YAGNI — You Aren't Gonna Need It
- Do not build features, abstractions, or config options "just in case".
- Implement what is required now. Extend later when a real need emerges.
15.7 Dependency Inversion Principle (DIP)
- High-level modules (services) must not depend on low-level modules (repositories) directly. Both should depend on abstractions (protocols / interfaces).
- Use FastAPI's
Depends()to inject implementations — this makes swapping and testing trivial.
from typing import Protocol
class BanRepository(Protocol):
async def find_ban(self, ip: str, jail: str) -> Ban | None: ...
async def save_ban(self, ban: Ban) -> None: ...
class SqliteBanRepository:
"""Concrete implementation — depends on aiosqlite."""
async def find_ban(self, ip: str, jail: str) -> Ban | None: ...
async def save_ban(self, ban: Ban) -> None: ...
13.7.1 Repository Module Pattern — Module-as-Protocol Structural Compatibility
BanGUI uses module-level functions for repository implementations, not classes. Each repository module (e.g., session_repo.py, blocklist_repo.py) exports async functions that match the signatures defined in the Protocol interface in protocols.py. This is a structural typing pattern — mypy accepts the module as a valid Protocol implementation because the function signatures match, even though the module is not explicitly annotated as implementing the Protocol.
This approach works correctly with FastAPI's dependency injection via cast():
# In app/repositories/session_repo.py
async def create_session(db: aiosqlite.Connection, token: str, created_at: str, expires_at: str) -> Session:
"""Insert a new session row."""
...
# In app/repositories/protocols.py
class SessionRepository(Protocol):
async def create_session(
self,
db: aiosqlite.Connection,
token: str,
created_at: str,
expires_at: str,
) -> Session:
...
# In app/dependencies.py
async def get_session_repo() -> SessionRepository:
"""Provide the concrete session repository implementation."""
from app.repositories import session_repo
return session_repo # ← mypy accepts this because the module has matching functions
Why this pattern is used:
- Simplicity — no boilerplate class/instance wrapping.
- Compatibility — Python's structural typing (PEP 544) means the module automatically satisfies the Protocol interface if function signatures match.
- Testability — the same DIP principle applies; services depend on the Protocol, not the module directly, so tests can mock the Protocol.
Risks and mitigations:
- Silent breakage if function signatures change — If a parameter is added or removed from a module function, the module no longer satisfies the Protocol, but mypy does not flag this as an error because the module is loosely coupled. To prevent this, Protocol signatures in
protocols.pyare the source of truth. Always check that module functions match the Protocol definitions before merging changes. The CI/CD pipeline validates this compatibility at build time.
How the validation works (CI check):
- Before each deployment, run
mypy --strictto ensure all dependency providers return values compatible with their Protocol types. - The
cast()calls independencies.pyare a documented signal that structural compatibility is being verified externally, not via explicit class inheritance.
13.7.2 Session Token Hashing — One-Way Protection Against Database Exposure
Session tokens must be protected against database exposure. Session tokens are stored as one-way SHA256 hashes in the database to ensure that if the database file is compromised (volume mount misconfiguration, backup leak, etc.), the session tokens themselves cannot be directly used to hijack sessions.
Implementation pattern:
import hashlib
from typing import TYPE_CHECKING
if TYPE_CHECKING:
import aiosqlite
from app.models.auth import Session
def _hash_token(token: str) -> str:
"""Return the SHA256 hash of a session token."""
return hashlib.sha256(token.encode()).hexdigest()
async def create_session(
db: "aiosqlite.Connection",
token: str,
created_at: str,
expires_at: str,
) -> Session:
"""Insert a new session row with the token hash."""
token_hash = _hash_token(token)
cursor = await db.execute(
"INSERT INTO sessions (token_hash, created_at, expires_at) VALUES (?, ?, ?)",
(token_hash, created_at, expires_at),
)
await db.commit()
# Return the Session with the ORIGINAL token (not the hash)
# so the service layer can sign and return it to the client.
return Session(
id=int(cursor.lastrowid) if cursor.lastrowid else 0,
token=token, # ← raw token, not the hash
created_at=created_at,
expires_at=expires_at,
)
async def get_session(
db: "aiosqlite.Connection",
token: str
) -> Session | None:
"""Look up a session by token hash."""
token_hash = _hash_token(token)
async with db.execute(
"SELECT id, token_hash, created_at, expires_at FROM sessions WHERE token_hash = ?",
(token_hash,),
) as cursor:
row = await cursor.fetchone()
if row is None:
return None
# Return the Session with the INCOMING token (the one the client sent).
return Session(
id=int(row[0]),
token=token, # ← the raw token passed in
created_at=str(row[2]),
expires_at=str(row[3]),
)
Key points:
- Hash on write — When inserting a session, hash the token before storage.
- Hash on read — When validating a session, hash the incoming token before the database lookup.
- Never store raw tokens — The
token_hashcolumn contains only hashes; raw tokens are never persisted. - Return raw tokens to the service layer — The
Sessionmodel'stokenfield contains the raw token (for signing and response), not the hash. - Database schema — Use
token_hash TEXT NOT NULL UNIQUEinstead oftoken TEXT NOT NULL UNIQUE, and create an index ontoken_hash. - Migration strategy — When upgrading from plaintext to hashed tokens, drop the old table and recreate it. This invalidates all existing sessions, which is acceptable because the database was exposed in plaintext.
Why one-way hashing is safe:
- If an attacker obtains a token hash from the database, they cannot reverse the SHA256 hash to recover the original token.
- The attacker cannot use the hash directly in a client request — they would need the original token to pass the hash check.
- This forces the attacker to either compromise the client (where they'd also get the raw token) or perform a brute-force attack against the hash space (infeasible for random 128-bit tokens).
Never use symmetric encryption — symmetric encryption stores a key in the database or environment, which merely shifts the exposure risk. A one-way hash is the correct choice for protecting tokens.
13.7.2a Session Token Signing Format — HMAC-SHA256 Integrity Protection
All session tokens sent to clients are signed using HMAC-SHA256. The signed token format is:
<raw_token>.<signature>
where:
<raw_token>is a 16-byte (128-bit) random hex string generated bysecrets.token_hex(16)..is the separator (defined inapp.utils.constants.SESSION_TOKEN_SIGNATURE_SEPARATOR).<signature>is the HMAC-SHA256 hex digest of<raw_token>using the configuredsession_secret.
Example: a1b2c3d4e5f6g7h8i9j0k1l2m3n4o5p6.f7e6d5c4b3a2918f7e6d5c4b3a29180
Signing and verification pattern:
import hashlib
import hmac
def _session_token_signature(token: str, secret: str) -> str:
"""Return the HMAC-SHA256 signature for a session token."""
return hmac.new(secret.encode(), token.encode(), hashlib.sha256).hexdigest()
def sign_session_token(token: str, secret: str) -> str:
"""Return a signed session token string for the client."""
return f"{token}.{_session_token_signature(token, secret)}"
def unwrap_session_token(token: str, secret: str) -> str:
"""Verify and return the raw token from a signed session token.
Raises ValueError if the token lacks a signature or signature is invalid.
"""
if "." not in token:
raise ValueError("Invalid session token.")
raw_token, signature = token.rsplit(".", 1)
expected_signature = _session_token_signature(raw_token, secret)
if not hmac.compare_digest(expected_signature, signature):
raise ValueError("Invalid session token.")
return raw_token
Key points:
- All tokens must be signed — Tokens without a signature (no separator) are rejected immediately.
- Signature is mandatory — The
unwrap_session_token()function raisesValueErrorif the separator is absent. - Use HMAC-SHA256 — Always use
hmac.compare_digest()for signature verification to prevent timing attacks. - Sign on login —
login()creates a raw token, stores it (hashed) in the database, then returns the signed token to the client. - Verify on every request — The
validate_session()service verifies the signature by callingunwrap_session_token()with thesession_secret, then looks up the raw token in the database. - Session invalidation — When upgrading from plaintext to signed tokens (TASK-022), all existing sessions must be invalidated because raw tokens will no longer be stored unencrypted.
Why HMAC signing is necessary:
- Prevents token forgery — An attacker cannot create a valid token without knowing the
session_secret. - Works alongside hashed storage — Even if the database is compromised (plaintext before hashing), the attacker gets only the raw token, not a signed token. A raw token without a valid signature is rejected by
unwrap_session_token(). - Timing attack resistance —
hmac.compare_digest()compares signatures in constant time, preventing attackers from using timing differences to guess valid signatures.
13.7.3 Session Cache Pluggability — Process-Local vs. Shared Backends
Session validation is expensive (SQLite lookup + password verification). To improve performance, validated session tokens are cached using the SessionCache interface (app.utils.session_cache). The default implementation, InMemorySessionCache, stores cached sessions in process-local memory.
Current implementation (single-worker):
from app.utils.session_cache import SessionCache, InMemorySessionCache, NoOpSessionCache
class SessionCache(Protocol):
"""Interface for session token validation cache backends."""
def get(self, token: str) -> Session | None: ...
def set(self, token: str, session: Session, ttl_seconds: float) -> None: ...
def invalidate(self, token: str) -> None: ...
def clear(self) -> None: ...
# Default in-memory implementation — PROCESS-LOCAL
class InMemorySessionCache:
def __init__(self) -> None:
self._entries: dict[str, tuple[Session, float]] = {}
Single-worker constraint:
InMemorySessionCache is process-local — each worker process has its own dict. In single-worker mode (enforced by TASK-002), this is safe and improves performance. In multi-worker deployments:
- A logout by worker A clears the session from A's cache, but worker B still has it → logout doesn't work.
- Enabling/disabling the cache requires restarting all workers to take effect.
Multi-worker solution:
To support multiple workers (future enhancement), implement a shared backend behind the same SessionCache Protocol:
# Example Redis implementation (not yet in codebase)
class RedisSessionCache:
"""Session cache backed by Redis."""
def __init__(self, redis_url: str) -> None:
self.client = aioredis.from_url(redis_url)
async def get(self, token: str) -> Session | None:
data = await self.client.get(f"session:{token}")
return Session.model_validate_json(data) if data else None
async def set(self, token: str, session: Session, ttl_seconds: float) -> None:
await self.client.setex(
f"session:{token}",
int(ttl_seconds),
session.model_dump_json()
)
async def invalidate(self, token: str) -> None:
await self.client.delete(f"session:{token}")
async def clear(self) -> None:
await self.client.flushdb()
To adopt a Redis backend:
- Create
RedisSessionCacheinapp.utils.session_cache. - Update
app.utils.runtime_state.set_runtime_settings()to instantiateRedisSessionCachewhenREDIS_URLenv var is set. - Update
app.config.Settingsto accept optionalREDIS_URL. - Tests continue to use
InMemorySessionCache(no Redis dependency in dev).
Implementation rules:
- All cache methods must be
async(even if the backend is sync). - Never log session tokens or session data.
- TTL must be respected — expired entries must be removed on access.
- See
app/utils/session_cache.pyfor the full Protocol definition and current implementations.
15.8 Composition over Inheritance
- Favour composing small, focused objects over deep inheritance hierarchies.
- Use mixins or protocols only when a clear "is-a" relationship exists; otherwise, pass collaborators as constructor arguments.
15.9 Fail Fast
- Validate inputs as early as possible — at the API boundary with Pydantic, at service entry with assertions or domain checks.
- Raise specific exceptions immediately rather than letting bad data propagate silently.
15.10 Law of Demeter (Principle of Least Knowledge)
- A function should only call methods on:
- Its own object (
self). - Objects passed as parameters.
- Objects it creates.
- Its own object (
- Avoid long accessor chains like
request.state.db.cursor().execute(...)— wrap them in a meaningful method.
15.11 Defensive Programming
- Never trust external input — validate and sanitise everything that crosses a boundary (HTTP request, file, socket, environment variable).
- Handle edge cases explicitly: empty lists,
Nonevalues, negative numbers, empty strings. - Use type narrowing and exhaustive pattern matching (
match/case) to eliminate impossible states.
15.12 SSRF Prevention (Server-Side Request Forgery)
When user-supplied URLs are fetched by the backend, validate them before making any HTTP requests:
-
Use Pydantic's
AnyHttpUrltype to restrict schemes tohttp://andhttps://only.- Rejects
file://,ftp://,gopher://, and other non-http schemes at the model boundary.
- Rejects
-
Validate resolved IP addresses before fetching:
- Parse the hostname and resolve it via DNS (using
socket.getaddrinfo()). - Use
ipaddress.ip_address().is_privateto reject private/reserved ranges:- RFC 1918:
10.0.0.0/8,172.16.0.0/12,192.168.0.0/16 - Loopback:
127.0.0.0/8,::1/128 - Link-local:
169.254.0.0/16,fe80::/10 - IPv6 site-local, multicast, and reserved ranges.
- RFC 1918:
- Raise
ValueErrorif validation fails; let the router convert it to HTTP 400.
- Parse the hostname and resolve it via DNS (using
-
Guard against DNS rebinding:
- Validate DNS at URL creation/validation time (performed during request deserialization).
- For additional safety, re-validate the connection IP at HTTP client time (e.g., custom
aiohttp.TCPConnectorcan inspect the resolved address during connect).
-
Example implementation (see
backend/app/utils/ip_utils.py):is_private_ip(ip_str: str) → bool: Checks if IP is private/reserved/loopback/link-local.async validate_blocklist_url(url: AnyHttpUrl) → None: Async DNS resolution + private IP check.- Service layer calls
await validate_blocklist_url(url)before persisting; router catchesValueErrorand returns 400.
16. Quick Reference — Do / Don't
| Do | Don't |
|---|---|
| Type every function, variable, return | Leave types implicit |
Use async def for I/O |
Use sync functions for I/O |
| Validate with Pydantic at the boundary | Pass raw dicts through the codebase |
| Log with structlog + context keys | Use print() or format strings in logs |
| Write tests for every feature | Ship untested code |
Use aiohttp for HTTP calls |
Use requests |
| Handle errors with custom exceptions | Use bare except: |
| Keep routers thin, logic in services | Put business logic in routers |
Use datetime.now(datetime.UTC) |
Use naive datetimes |
| Run ruff + mypy before committing | Push code that doesn't pass linting |
Keep GET endpoints read-only (no db.commit()) |
Call db.commit() / INSERT inside GET handlers |
Batch DB writes; issue one db.commit() after the loop |
Commit inside a loop (1 fsync per row) |
Use executemany() for bulk inserts |
Call execute() + commit() per row in a loop |