# Backend Development — Rules & Guidelines

Rules and conventions every backend developer must follow. Read this before writing your first line of code.

---

## 1. Language & Typing

- **Python 3.12+** is the minimum version.
- **Every** function, method, and variable must have explicit type annotations — no exceptions.
- Use `str`, `int`, `float`, `bool`, `None` for primitives.
- Use `list[T]`, `dict[K, V]`, `set[T]`, `tuple[T, ...]` (lowercase, built-in generics) — never `typing.List`, `typing.Dict`, etc.
- Use `T | None` instead of `Optional[T]`.
- Use `TypeAlias`, `TypeVar`, `Protocol`, and `NewType` when they improve clarity.
- Return types are **mandatory** — including `-> None`.
- Never use `Any` unless there is no other option and a comment explains why.
- Run `mypy --strict` (or `pyright` in strict mode) — the codebase must pass with zero errors.

```python
# Good
def get_jail_by_name(name: str) -> Jail | None:
    ...

# Bad — missing types
def get_jail_by_name(name):
    ...
```
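
The typing rules above mention `Protocol` and `NewType` without showing them. The following is a minimal sketch of how they might be combined; `JailName`, `BanStore`, `InMemoryBanStore`, and `total_bans` are hypothetical names used only for illustration and are not part of the codebase.

```python
from typing import NewType, Protocol

# Hypothetical: a distinct type for jail names so plain strings are not mixed in by accident.
JailName = NewType("JailName", str)


class BanStore(Protocol):
    """Structural interface: any object with a matching count_bans() satisfies it."""

    def count_bans(self, jail: JailName) -> int: ...


class InMemoryBanStore:
    """Illustrative implementation; it does not need to inherit from BanStore."""

    def __init__(self, bans: dict[JailName, list[str]]) -> None:
        self._bans: dict[JailName, list[str]] = bans

    def count_bans(self, jail: JailName) -> int:
        return len(self._bans.get(jail, []))


def total_bans(store: BanStore, jails: list[JailName]) -> int:
    """Sum the ban counts of the given jails."""
    return sum(store.count_bans(jail) for jail in jails)
```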

---

## 2. Core Libraries

| Purpose | Library | Notes |
|---|---|---|
| Web framework | **FastAPI** | Async endpoints only. |
| Data validation & settings | **Pydantic v2** | All request/response bodies and config models. |
| Async HTTP client | **aiohttp** (`ClientSession`) | For external calls (blocklists, IP lookups). |
| Scheduling | **APScheduler 4.x** (async) | Blocklist imports, periodic health checks. |
| Structured logging | **structlog** | Every log call must use structlog — never `print()` or `logging` directly. |
| Database | **aiosqlite** | Async SQLite access for the application database. |
| Testing | **pytest** + **pytest-asyncio** + **httpx** (`AsyncClient`) | Every feature needs tests. |
| Mocking | **unittest.mock** / **pytest-mock** | Isolate external dependencies. |
| Date & time | **datetime** (stdlib) — always timezone-aware | Use `datetime.datetime.now(datetime.UTC)`. Never naive datetimes. |
| IP / Network | **ipaddress** (stdlib) | Validate and normalise IPs and CIDR ranges. |
| Environment / config | **pydantic-settings** | Load `.env` and environment variables into typed models. |
| fail2ban integration | **fail2ban client** (bundled) | Use the local copy at [`./fail2ban-master`](../fail2ban-master). Import from [`./fail2ban-master/fail2ban/client`](../fail2ban-master/fail2ban/client) to communicate with the fail2ban socket. Do **not** install fail2ban as a pip package. |

### fail2ban Client Usage

The repository ships with a vendored copy of fail2ban located at `./fail2ban-master`.
All communication with the fail2ban daemon must go through the client classes found in `./fail2ban-master/fail2ban/client`.
Add the project root to `sys.path` (or configure it in `pyproject.toml` as a path dependency) so that `from fail2ban.client ...` resolves to the bundled copy.

```python
import sys
from pathlib import Path

# Ensure the bundled fail2ban is importable
sys.path.insert(0, str(Path(__file__).resolve().parents[2] / "fail2ban-master"))

# The socket client class in fail2ban/client/csocket.py is named CSocket
from fail2ban.client.csocket import CSocket  # noqa: E402
```

### Libraries you must NOT use

- `requests` — use `aiohttp` (async).
- `flask` — we use FastAPI.
- `celery` — we use APScheduler.
- `print()` for logging — use `structlog`.
- `json.loads` / `json.dumps` on Pydantic models — use `.model_dump()` / `.model_validate()`.

### Timestamp Handling

Timestamp consistency is critical for accurate ban history queries across the dashboard and history endpoints. Follow these rules:

**Rule 1: Use consistent UTC timestamps**
- All timestamps in the database are stored as Unix epochs (seconds since 1970-01-01 UTC).
- fail2ban stores timestamps using `time.time()`, which is always UTC epoch seconds.
- When querying fail2ban's SQLite database by timestamp, use `app.utils.time_utils.since_unix()` (not manual datetime calculations).

**Rule 2: Time-range windows include a 60-second slack**
- The `since_unix()` function includes a 60-second slack window (`TIME_RANGE_SLACK_SECONDS` in `app.utils.constants`).
- This slack accommodates:
  - Clock drift between the local system and fail2ban.
  - Test seeding delays when timestamps are manually set to exact boundaries.
- The slack ensures that dashboard and history queries return consistent row counts for the same time range.

**Rule 3: Never duplicate timestamp calculation logic**
- All services that query by time range must import and use `since_unix()`.
- Do not recalculate timestamps locally using `datetime` or `time` modules in service code.
- If you need a timestamp for a time range, use `since_unix()`.

**Example:**
```python
from app.utils.time_utils import since_unix

# Get all bans from the last 24 hours (with 60-second slack)
since_ts: int = since_unix("24h")
rows = await db.execute(
    "SELECT * FROM bans WHERE timeofban >= ?",
    (since_ts,)
)
```

---

## 3. Project Structure

```
backend/
├── app/
│   ├── __init__.py
│   ├── main.py              # FastAPI app factory, lifespan
│   ├── config.py            # Pydantic settings
│   ├── dependencies.py      # FastAPI dependency providers
│   ├── models/              # Pydantic schemas (request, response, domain)
│   ├── routers/             # FastAPI routers grouped by feature
│   ├── services/            # Business logic — one service per domain
│   ├── repositories/        # Database access layer
│   ├── tasks/               # APScheduler jobs
│   └── utils/               # Helpers, constants, shared types
├── tests/
│   ├── conftest.py
│   ├── test_routers/
│   ├── test_services/
│   └── test_repositories/
├── pyproject.toml
└── .env.example
```

- **Routers** receive requests, validate input via Pydantic, and delegate to **services**.
- **Services** contain business logic and call **repositories** or external clients.
- **Repositories** handle raw database queries — nothing else.
- Never put business logic inside routers or repositories.

---

## 4. FastAPI Conventions

- Use **async def** for every endpoint — no sync endpoints.
- Every endpoint must declare explicit **response models** (`response_model=...`).
- Use **Pydantic models** for request bodies and query parameters — never raw dicts.
- Use **Depends()** for dependency injection (database sessions, services, auth).
- Group endpoints into routers by feature domain (`routers/jails.py`, `routers/bans.py`, …).
- Use appropriate HTTP status codes: `201` for creation, `204` for deletion with no body, `404` for not found, etc.
- Protected endpoints should return `401 Unauthorized` or `403 Forbidden` when the session is invalid or expired; the frontend treats these responses as a session-expiry event and redirects the user to `/login`.
- Use **HTTPException** or custom exception handlers — never return error dicts manually.
- **GET endpoints are read-only — never call `db.commit()` or execute INSERT/UPDATE/DELETE inside a GET handler.** If a GET path produces side-effects (e.g., caching resolved data), that write belongs in a background task, a scheduled flush, or a separate POST endpoint. Users and HTTP caches assume GET is idempotent and non-mutating.

```python
# Good — pass db=None on GET so geo_service never commits
result = await geo_service.lookup_batch(ips, http_session, db=None)

# Bad — triggers INSERT + COMMIT per IP inside a GET handler
result = await geo_service.lookup_batch(ips, http_session, db=app_db)
```

```python
from fastapi import APIRouter, Depends, HTTPException, status
from app.models.jail import JailResponse, JailListResponse
from app.services.jail_service import JailService

router: APIRouter = APIRouter(prefix="/api/jails", tags=["Jails"])

@router.get("/", response_model=JailListResponse)
async def list_jails(service: JailService = Depends()) -> JailListResponse:
    jails: list[JailResponse] = await service.get_all_jails()
    return JailListResponse(jails=jails)
```

---

## 5. Pydantic Models

- Every model inherits from `pydantic.BaseModel`.
- Use `model_config = ConfigDict(strict=True)` where appropriate.
- Field names use **snake_case** in Python, export as **camelCase** to the frontend via alias generators if needed.
- Validate at the boundary — once data enters a Pydantic model it is trusted.
- Use `Field(...)` with descriptions for every field to keep auto-generated docs useful.
- Separate **request models**, **response models**, and **domain (internal) models** — do not reuse one model for all three.

```python
from pydantic import BaseModel, Field
from datetime import datetime

class BanResponse(BaseModel):
    ip: str = Field(..., description="Banned IP address")
    jail: str = Field(..., description="Jail that issued the ban")
    banned_at: datetime = Field(..., description="UTC timestamp of the ban")
    expires_at: datetime | None = Field(None, description="UTC expiry, None if permanent")
    ban_count: int = Field(..., ge=1, description="Number of times this IP was banned")
```

### Using `Literal` Types for Constrained Strings

When a field should only accept a small set of predefined values, use `Literal` to enforce this at the type level:

```python
from typing import Literal
from pydantic import BaseModel, Field

LogLevel = Literal["CRITICAL", "ERROR", "WARNING", "NOTICE", "INFO", "DEBUG"]

class GlobalConfigUpdate(BaseModel):
    log_level: LogLevel | None = Field(
        default=None,
        description="Log level: CRITICAL, ERROR, WARNING, NOTICE, INFO, or DEBUG.",
    )
```

This provides:
- **Type safety** — IDEs and type checkers enforce valid values.
- **API documentation** — OpenAPI docs automatically list all allowed values.
- **Validation** — Pydantic rejects invalid values and provides a clear error message.

### Custom Field Validators

For fields that require complex validation (e.g., file paths that must be within allowed directories), use `@field_validator`:

```python
from pydantic import BaseModel, Field, field_validator
from app.utils.path_utils import validate_log_path

class AddLogPathRequest(BaseModel):
    log_path: str = Field(..., description="Absolute path to the log file to monitor.")

    @field_validator("log_path", mode="after")
    @classmethod
    def validate_log_path_field(cls, value: str) -> str:
        """Validate that the log path is within allowed directories."""
        return validate_log_path(value)
```

**Path Validation Helper:**

For query parameters and other contexts where Pydantic validators cannot be used directly, use the `validate_log_path()` helper from `app.utils.path_utils`:

```python
from fastapi import HTTPException, Query, status
from app.utils.path_utils import validate_log_path

@router.delete("/{name}/logpath")
async def delete_log_path(
    name: str,
    log_path: str = Query(...),
) -> None:
    try:
        validate_log_path(log_path)
    except ValueError as e:
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
            detail=str(e),
        ) from e
    # ... rest of handler
```

**Key points:**
- Use `mode="after"` in field validators so that validation runs after Pydantic's basic type coercion.
- Raise `ValueError` if validation fails; Pydantic wraps it in a `ValidationError`, which FastAPI returns as an HTTP 422 response for request bodies.
- For query parameters that cannot use Pydantic validators, use the `validate_log_path()` helper and raise HTTP 422.
- **Never use string prefix matching** for path validation (e.g., `path.startswith("/var/log")`). The helper uses `Path.relative_to()` to prevent bypasses like `/var/log_evil/file.log`.
- Symlinks are resolved before validating to prevent symlink-based escapes.
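
The key points above can be sketched as follows. This is an illustrative stand-in for `validate_log_path()`, not the project's code: the `ALLOWED_LOG_DIRS` constant and the `allowed` parameter are assumptions, while the use of symlink resolution followed by `Path.relative_to()` follows the description above.

```python
from pathlib import Path

# Assumed allowlist; the real helper's configuration is not shown in this document.
ALLOWED_LOG_DIRS: tuple[Path, ...] = (Path("/var/log"),)


def validate_log_path(raw: str, allowed: tuple[Path, ...] = ALLOWED_LOG_DIRS) -> str:
    """Return the resolved path if it lies inside an allowed directory, else raise ValueError."""
    candidate: Path = Path(raw)
    if not candidate.is_absolute():
        raise ValueError(f"Log path must be absolute: {raw!r}")
    # resolve() follows symlinks and collapses ".." before the containment check.
    resolved: Path = candidate.resolve()
    for base in allowed:
        try:
            # relative_to() compares whole path components, so "/var/log_evil"
            # is not treated as being inside "/var/log".
            resolved.relative_to(base.resolve())
        except ValueError:
            continue
        return str(resolved)
    raise ValueError(f"Log path is outside the allowed directories: {raw!r}")
```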

---

## 6. Async Rules

- **Never** call blocking / synchronous I/O in an async function — no `time.sleep()`, no synchronous file reads, no `requests.get()`.
- Use `aiohttp.ClientSession` for HTTP calls, `aiosqlite` for database access.
- Use `asyncio.TaskGroup` (Python 3.11+) when you need to run independent coroutines concurrently.
- Long-running startup/shutdown logic goes into the **FastAPI lifespan** context manager.
- **Never call `db.commit()` inside a loop.** With aiosqlite, every commit serialises through a background thread and forces an `fsync`. N rows × 1 commit = N fsyncs. Accumulate all writes in the loop, then issue a single `db.commit()` once after the loop ends. The difference between 5,000 commits and 1 commit can be seconds vs milliseconds.

```python
# Good — one commit for the whole batch
for ip, info in results.items():
    await db.execute(INSERT_SQL, (ip, info.country_code, ...))
await db.commit()   # ← single fsync

# Bad — one fsync per row
for ip, info in results.items():
    await db.execute(INSERT_SQL, (ip, info.country_code, ...))
    await db.commit()   # ← fsync on every iteration
```
- **Prefer `executemany()` over calling `execute()` in a loop** when inserting or updating multiple rows with the same SQL template. aiosqlite passes the entire batch to SQLite in one call, reducing Python↔thread overhead on top of the single-commit saving.

```python
# Good (attribute names other than country_code are illustrative)
rows = [
    (ip, info.country_code, info.country_name, info.asn, info.org)
    for ip, info in results.items()
]
await db.executemany(INSERT_SQL, rows)
await db.commit()
```
- Shared resources (DB connections, HTTP sessions) are created once during startup and closed during shutdown — never inside request handlers.

```python
from contextlib import asynccontextmanager
from collections.abc import AsyncGenerator
from fastapi import FastAPI
import aiohttp
import aiosqlite

@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncGenerator[None]:
    # Startup
    app.state.http_session = aiohttp.ClientSession()
    app.state.db = await aiosqlite.connect("bangui.db")
    yield
    # Shutdown
    await app.state.http_session.close()
    await app.state.db.close()
```

---

## 6.1 Database Query Conventions

### LIKE Queries and Wildcard Escaping

SQLite's `LIKE` operator treats `%` (any sequence of characters) and `_` (any single character) as wildcards. When querying with user-supplied filters that may contain these characters, you must escape them to prevent unintended matches.

**The Problem:**
```python
# Bad — ip_filter="10.0.0_" matches "10.0.0.1", "10.0.0.2", etc.
ip_filter = "10.0.0_"
await db.execute(
    "SELECT * FROM bans WHERE ip LIKE ?",
    (f"{ip_filter}%",)  # ← wildcard characters not escaped
)
```

**The Solution:**

Use the `escape_like()` helper from `app.utils.fail2ban_db_utils`:

```python
from app.utils.fail2ban_db_utils import escape_like

# Good — wildcard characters are escaped
ip_filter = "10.0.0_"
await db.execute(
    "SELECT * FROM bans WHERE ip LIKE ? ESCAPE '\\'",
    (f"{escape_like(ip_filter)}%",)  # ← underscores escaped to literal
)
```

**How `escape_like()` works:**

The function escapes backslashes first, then `%` and `_` signs:
```python
def escape_like(s: str) -> str:
    return s.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
```

**Key rules:**
1. **Backslash escapes first** — to prevent double-escaping when the input contains backslashes.
2. **Add `ESCAPE '\\'` to the SQL** — tells SQLite which character to use for escaping.
3. **Dots are not wildcards** — they do not need escaping; normal IP addresses pass through unchanged.

**Test example:**
```python
assert escape_like("10.0.0_") == "10.0.0\\_"
assert escape_like("10.0.0%test") == "10.0.0\\%test"
assert escape_like("10.0.0.1") == "10.0.0.1"  # Unchanged
```
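
The effect of escaping can also be checked end to end against an in-memory SQLite database. The sketch below uses the synchronous stdlib `sqlite3` module purely for demonstration (application code should keep using `aiosqlite`); the `find_by_prefix` helper and the sample rows are made up for the example, and `escape_like()` is copied from above.

```python
import sqlite3


def escape_like(s: str) -> str:
    return s.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")


def find_by_prefix(conn: sqlite3.Connection, prefix: str, escape: bool) -> list[str]:
    """Return IPs starting with `prefix`, optionally escaping LIKE wildcards in it."""
    pattern: str = (escape_like(prefix) if escape else prefix) + "%"
    sql: str = "SELECT ip FROM bans WHERE ip LIKE ? ESCAPE '\\' ORDER BY ip"
    return [row[0] for row in conn.execute(sql, (pattern,))]


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE bans (ip TEXT)")
conn.executemany("INSERT INTO bans VALUES (?)", [("10.0.0.1",), ("10.0.0_x",)])

# Without escaping, "_" acts as a single-character wildcard and matches the dot too.
unescaped: list[str] = find_by_prefix(conn, "10.0.0_", escape=False)
# With escaping, only the row containing a literal underscore matches.
escaped: list[str] = find_by_prefix(conn, "10.0.0_", escape=True)
```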

---

## 6.2 Database Migrations

The application database schema is versioned and migrated automatically on startup via `app.db.init_db()`.

### Migration Design Principles

**Migrations must be atomic.** All schema changes for a single version (DDL statements) and the `schema_migrations` record insert must be wrapped in a single `BEGIN IMMEDIATE ... COMMIT` transaction. This prevents partial migrations if a process crashes mid-migration.

If a crash occurs between migration steps, the next startup will:
1. Detect the missing `schema_migrations` record.
2. Re-apply the entire migration in a single transaction (all-or-nothing).
3. Avoid data corruption or schema inconsistency.
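
The all-or-nothing behaviour described above can be sketched with the stdlib `sqlite3` module. This is only an illustration of the transaction shape, not the project's `_apply_migration()`: the `apply_migration` name and its list-of-statements signature are assumptions, and the connection is assumed to be opened with `isolation_level=None` so that the explicit `BEGIN IMMEDIATE` is the only transaction in play.

```python
import sqlite3


def apply_migration(conn: sqlite3.Connection, version: int, statements: list[str]) -> None:
    """Run all statements plus the version record in one transaction, or none of them."""
    # Assumes conn was created with isolation_level=None (no implicit transactions).
    conn.execute("BEGIN IMMEDIATE")
    try:
        for statement in statements:
            conn.execute(statement)
        conn.execute("INSERT INTO schema_migrations (version) VALUES (?)", (version,))
        conn.execute("COMMIT")
    except sqlite3.Error:
        # SQLite DDL is transactional, so tables created above are rolled back too.
        conn.execute("ROLLBACK")
        raise
```

Because the version row is written inside the same transaction as the DDL, a failure at any step leaves neither the schema change nor the `schema_migrations` record behind.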

### Writing a New Migration

1. **Add the DDL statements** to `_MIGRATIONS` dict in `app/db.py`:

```python
_MIGRATIONS: dict[int, str] = {
    1: _CREATE_INITIAL_SCHEMA,
    2: """
        -- Migration 2: Add new_column to users table.
        ALTER TABLE users ADD COLUMN new_column TEXT DEFAULT 'default_value';
        CREATE INDEX idx_users_new_column ON users(new_column);
    """,
}
```

2. **Update `_CURRENT_SCHEMA_VERSION`** to the new version number:

```python
_CURRENT_SCHEMA_VERSION: int = 2  # was 1
```

3. **Ensure idempotency where possible:**
   - Use `CREATE TABLE IF NOT EXISTS` and `CREATE INDEX IF NOT EXISTS`.
   - For `ALTER TABLE ADD COLUMN`, check if the column exists first using `PRAGMA table_info()` if re-applying the migration is a concern.

4. **Verify atomicity in tests:**

```python
from pathlib import Path

import pytest

# Assumes the migration helpers live in app/db.py
from app import db as db_module
from app.db import _apply_migration, open_db


async def test_migration_2_is_atomic(tmp_path: Path) -> None:
    """Verify migration 2 rolls back on failure."""
    db = await open_db(str(tmp_path / "test.db"))
    try:
        await db.execute("CREATE TABLE schema_migrations (version INTEGER PRIMARY KEY);")
        await db.commit()

        # Add a test migration that fails mid-way
        original = db_module._MIGRATIONS.copy()
        db_module._MIGRATIONS[99] = """
            CREATE TABLE test_table (id INTEGER PRIMARY KEY);
            INSERT INTO nonexistent_table VALUES (1);
        """

        try:
            with pytest.raises(Exception):
                await _apply_migration(db, 99)

            # Verify rollback: migration NOT recorded
            async with db.execute(
                "SELECT version FROM schema_migrations WHERE version = 99;"
            ) as cursor:
                assert await cursor.fetchone() is None

            # Verify rollback: table NOT created
            async with db.execute(
                "SELECT name FROM sqlite_master WHERE type='table' AND name='test_table';"
            ) as cursor:
                assert await cursor.fetchone() is None
        finally:
            db_module._MIGRATIONS = original
    finally:
        await db.close()
```

### Common Pitfalls

- **Non-idempotent statements** — SQLite has no `ADD COLUMN IF NOT EXISTS`, so `ALTER TABLE ... ADD COLUMN` fails with a duplicate-column error on re-run. Use an explicit `PRAGMA table_info()` check if needed.
- **Comments containing semicolons** — the migration parser strips comments correctly, but avoid unusual comment syntax.
- **String literals with semicolons** — the parser handles these; no special escaping needed.
- **Multiple operations in one migration** — keep migrations focused. Combine related DDL but split unrelated changes.
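
For the first pitfall, an explicit column check might look like the sketch below. The helper names `column_exists` and `add_column_if_missing` are hypothetical, and the example uses the stdlib `sqlite3` module for brevity. Table and column names are interpolated into the SQL, so they must come from trusted constants in the migration code, never from user input.

```python
import sqlite3


def column_exists(conn: sqlite3.Connection, table: str, column: str) -> bool:
    """Check PRAGMA table_info() for a column; row[1] holds the column name."""
    rows = conn.execute(f"PRAGMA table_info({table})").fetchall()
    return any(row[1] == column for row in rows)


def add_column_if_missing(conn: sqlite3.Connection, table: str, column: str, ddl_type: str) -> bool:
    """Add the column unless it already exists. Returns True if it was added."""
    if column_exists(conn, table, column):
        return False
    conn.execute(f"ALTER TABLE {table} ADD COLUMN {column} {ddl_type}")
    return True
```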

---

## 7. Logging

- Use **structlog** for every log message.
- Bind contextual key-value pairs — never format strings manually.
- Log levels: `debug` for development detail, `info` for operational events, `warning` for recoverable issues, `error` for failures, `critical` for fatal problems.
- **Never log sensitive data** (passwords, tokens, session tokens, raw credentials, private keys).
- For session correlation without leaking token material, use a one-way hash fragment: `hashlib.sha256(token.encode()).hexdigest()[:12]`.
- Use numeric database IDs for entity correlation instead of raw identifiers: `session_id=session.id` instead of `token=session.token`.

```python
import hashlib

import aiosqlite
import structlog

log: structlog.stdlib.BoundLogger = structlog.get_logger()

async def ban_ip(ip: str, jail: str) -> None:
    log.info("banning_ip", ip=ip, jail=jail)
    try:
        await _execute_ban(ip, jail)
        log.info("ip_banned", ip=ip, jail=jail)
    except BanError as exc:
        log.error("ban_failed", ip=ip, jail=jail, error=str(exc))
        raise

async def logout_session(db: aiosqlite.Connection, token: str) -> None:
    # Use a one-way hash for token correlation in logs
    token_hash: str = hashlib.sha256(token.encode()).hexdigest()[:12]
    await session_repo.delete_session(db, token)
    log.info("session_terminated", token_hash=token_hash)
```

---

## 8. Error Handling

- Define **custom exception classes** for domain errors (e.g., `JailNotFoundError`, `BanFailedError`).
- Catch specific exceptions — never bare `except:` or `except Exception:` without re-raising.
- Map domain exceptions to HTTP status codes via FastAPI **exception handlers** registered on the app.
- Always log errors with context before raising.

```python
from fastapi import Request
from fastapi.responses import JSONResponse

class JailNotFoundError(Exception):
    def __init__(self, name: str) -> None:
        self.name: str = name
        super().__init__(f"Jail '{name}' not found")

# In main.py
@app.exception_handler(JailNotFoundError)
async def jail_not_found_handler(request: Request, exc: JailNotFoundError) -> JSONResponse:
    return JSONResponse(status_code=404, content={"detail": f"Jail '{exc.name}' not found"})
```

### Routers and Exception Propagation

- **Routers must NOT construct `HTTPException` for domain errors** — let domain exceptions propagate.
- Routers should never have helper functions like `_bad_gateway()`, `_not_found()`, `_conflict()` etc. that convert domain exceptions to `HTTPException`.
- All domain exception types must have corresponding handlers registered in `main.py` via `app.add_exception_handler()`.
- Register exception handlers from most specific to least specific for readability. Registration order does not decide which handler runs: Starlette looks up the handler by walking the raised exception's class hierarchy (MRO), so a handler for a subclass takes precedence over a handler for its base class.

```python
# ❌ BAD — routers constructing HTTPException for domain exceptions
@router.get("/{name}")
async def get_jail(name: str, socket_path: Fail2BanSocketDep) -> JailDetailResponse:
    try:
        return await jail_service.get_jail(socket_path, name)
    except JailNotFoundError:
        raise HTTPException(status_code=404, detail=f"Jail not found: {name!r}") from None

# ✅ GOOD — domain exception propagates to global handler
@router.get("/{name}")
async def get_jail(name: str, socket_path: Fail2BanSocketDep) -> JailDetailResponse:
    return await jail_service.get_jail(socket_path, name)
```

All domain exceptions raised by services propagate to handlers in `main.py`, ensuring:
1. Consistent error response format across the entire API.
2. No duplicated exception-to-HTTP-status mapping logic.
3. Easy to audit all error codes — they are all in one place.

---

## 9. Testing

- **Every** new feature or bug fix must include tests.
- Tests live in `tests/` mirroring the `app/` structure.
- Use `pytest` with `pytest-asyncio` for async tests.
- Use `httpx.AsyncClient` to test FastAPI endpoints (not `TestClient` which is sync).
- Mock external dependencies (fail2ban socket, aiohttp calls) — tests must never touch real infrastructure.
- Aim for **>80 % line coverage** — critical paths (auth, banning, scheduling) must be 100 %.
- Test names follow `test_<unit>_<scenario>_<expected>` pattern.

```python
from collections.abc import AsyncIterator

import pytest
import pytest_asyncio
from httpx import AsyncClient, ASGITransport
from app.main import create_app

# pytest_asyncio.fixture works in both strict and auto mode
@pytest_asyncio.fixture
async def client() -> AsyncIterator[AsyncClient]:
    app = create_app()
    transport: ASGITransport = ASGITransport(app=app)
    async with AsyncClient(transport=transport, base_url="http://test") as ac:
        yield ac

@pytest.mark.asyncio
async def test_list_jails_returns_200(client: AsyncClient) -> None:
    response = await client.get("/api/jails/")
    assert response.status_code == 200
    data: dict[str, object] = response.json()
    assert "jails" in data
```

---

## 9.1 Background Tasks and Scheduler Architecture

BanGUI uses **APScheduler 4.x** (async mode) to manage background jobs that execute on a schedule without user interaction. This section documents how to write and register background tasks.

### Task Location and Structure

All background tasks live in `backend/app/tasks/` as separate modules. Each task:
- Exports a `register(app: FastAPI) -> None` or `async def register(app: FastAPI) -> None` function.
- Opens its own database connection using `app.db.open_db()` or the `task_db()` helper.
- Closes connections when work completes (use the async context manager pattern).
- Runs independently of the FastAPI request/response cycle.

### Example Task

```python
# backend/app/tasks/my_task.py
import structlog
from fastapi import FastAPI
from apscheduler.schedulers.asyncio import AsyncIOScheduler

# open_db comes from the app.db module; the `app` parameter below is the FastAPI instance
from app.db import open_db

log = structlog.get_logger()

async def my_background_job(app: FastAPI) -> None:
    """Do important work on a schedule."""
    log.info("my_background_job_started")
    try:
        db = await open_db(app.state.settings.database_path)
        try:
            # Do work...
            pass
        finally:
            await db.close()
    except Exception:
        log.error("my_background_job_failed", exc_info=True)

def register(app: FastAPI) -> None:
    """Register the job with the scheduler."""
    scheduler: AsyncIOScheduler = app.state.scheduler
    scheduler.add_job(
        my_background_job,
        args=(app,),
        trigger="interval",
        seconds=60,
        id="my_task",
        name="My Background Job",
    )
```

### Accessing Shared Resources in Tasks

Since tasks do not have access to `Depends(get_db)` (no request scope), they must:
1. **Open their own DB connection** via `app.state.db_factory.open_db(path)`.
2. **Access app-level state** — `app.state.http_session`, `app.state.geo_cache`, `app.state.settings`, etc.
3. **Use structlog** for all logging (never `print()`).

### Single-Worker Requirement

**The scheduler is bound to a single asyncio event loop and cannot be shared across multiple worker processes.** BanGUI enforces single-worker mode to prevent duplicate task execution.

- **Deployment constraint:** Set `BANGUI_WORKERS=1` (default).
- **Startup validation:** `startup_shared_resources()` raises `RuntimeError` if `BANGUI_WORKERS > 1`.
- See [Architekture.md § 9.2](Architekture.md) for full details.

---

## 10. Code Style & Tooling

| Tool | Purpose |
|---|---|
| **Ruff** | Linter and formatter (replaces black, isort, flake8). |
| **mypy** or **pyright** | Static type checking in strict mode. |
| **pre-commit** | Run ruff + type checker before every commit. |

- Line length: **120 characters** max.
- Strings: use **double quotes** (`"`).
- Imports: sorted by ruff — stdlib → third-party → local, one import per line.
- No unused imports, no unused variables, no `# type: ignore` without explanation.
- Docstrings in **Google style** on every public function, class, and module.

---

## 11. fail2ban Response Utilities

All services that interact with the fail2ban daemon must use the canonical response parsing utilities from `app.utils.fail2ban_response`. This ensures consistent error handling, type safety, and makes it easy to fix bugs in response handling across the entire codebase.

### Available Functions

**`ok(response: object) -> object`**
Extracts the payload from a fail2ban `(return_code, data)` response tuple.
- Raises `ValueError` if return code ≠ 0 or response shape is invalid.
- Use this on every response from `Fail2BanClient.send()`.

**`to_dict(pairs: object) -> dict[str, object]`**
Converts a list of `(key, value)` pairs (fail2ban's native response format) to a Python dict.
- Silently ignores malformed entries and non-list/tuple inputs.
- Always returns a dict (empty if input is invalid).

**`ensure_list(value: object | None) -> list[str]`**
Coerces fail2ban response values (which may be `None`, a single string, or a list) to a normalized list of strings.
- Handles all three cases consistently.
- Returns empty list for `None` or empty strings.

**`is_not_found_error(exc: Exception) -> bool`**
Checks if an exception indicates a jail does not exist.
- Checks for multiple error message patterns (case-insensitive).
- Use this to distinguish "jail not found" errors from other failures.

### Example Usage

```python
from app.utils.fail2ban_response import ok, to_dict, ensure_list, is_not_found_error
from app.utils.fail2ban_client import Fail2BanClient

client = Fail2BanClient(socket_path="/var/run/fail2ban/fail2ban.sock")

try:
    # Get jail status
    response = await client.send(["status", "sshd", "short"])
    status_dict = to_dict(ok(response))  # Extract payload and convert to dict

    # Get list of banned IPs
    ban_response = await client.send(["get", "sshd", "banip"])
    banned_ips = ensure_list(ok(ban_response))  # Normalize to list of strings

except ValueError as exc:
    if is_not_found_error(exc):
        raise JailNotFoundError("sshd") from exc
    raise
```

### Why This Matters

Before this utility module, every service implemented its own copy of these functions, leading to:
- Code duplication across 7+ service files.
- Subtle inconsistencies in error handling.
- Difficult maintenance — every bug fix required touching multiple files.

Now, all services import from a single authoritative source, making response handling consistent, maintainable, and type-safe.

---
## 12. Configuration & Secrets
|
||
|
||
- All configuration lives in **environment variables** loaded through **pydantic-settings**.
|
||
- Secrets (master password hash, session key) are **never** committed to the repository.
|
||
- Provide a `.env.example` with all keys and placeholder values.
|
||
- Validate config at startup — fail fast with a clear error if a required value is missing.
|
||
|
||
```python
from pydantic_settings import BaseSettings
from pydantic import Field

class Settings(BaseSettings):
    database_path: str = Field("bangui.db", description="Path to SQLite database")
    fail2ban_socket: str = Field("/var/run/fail2ban/fail2ban.sock", description="fail2ban socket path")
    session_secret: str = Field(..., description="Secret key for session signing")
    log_level: str = Field("info", description="Logging level")

    model_config = {"env_prefix": "BANGUI_", "env_file": ".env"}
```
|
||
|
||
### Session Secret Configuration
|
||
|
||
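The `session_secret` is the HMAC key used to sign all session tokens. It must be at least 32 characters long. Note that 32 characters amount to 256 bits of key strength only if every byte is random; a 32-character hex string carries 128 bits of entropy, which is why the recommended generation command produces 64 hex characters.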
|
||
|
||
**Minimum Length:** 32 characters
|
||
|
||
**Why 32 characters?** Session tokens are signed using HMAC-SHA256. A secret shorter than 32 bytes (256 bits) significantly weakens the signature, potentially allowing attackers to forge valid tokens. The constraint is enforced at startup — the application will fail to start if `session_secret` is shorter than 32 characters.
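
A startup check along these lines would enforce the constraint. This is a minimal sketch: `validate_session_secret` and `MIN_SESSION_SECRET_LENGTH` are hypothetical names, and the project's real check may live in a pydantic validator on `Settings` instead.

```python
MIN_SESSION_SECRET_LENGTH = 32  # Assumed constant name; the value comes from the rule above


def validate_session_secret(secret: str) -> str:
    """Return the secret unchanged, or raise if it is too short to use."""
    if len(secret) < MIN_SESSION_SECRET_LENGTH:
        raise ValueError(
            f"session_secret must be at least {MIN_SESSION_SECRET_LENGTH} characters, "
            f"got {len(secret)}"
        )
    return secret
```

Raising at import or startup time, rather than on first use, keeps the failure visible in the deployment logs.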
|
||
|
||
**Generation:** Generate a secure secret using Python:
|
||
|
||
```bash
|
||
python -c "import secrets; print(secrets.token_hex(32))"
|
||
```
|
||
|
||
This produces a 64-character hexadecimal string (256 bits) suitable for production use.
|
||
|
||
**Environment Variable:**
|
||
|
||
```bash
|
||
BANGUI_SESSION_SECRET="your-32-character-minimum-secret-here"
|
||
```
|
||
|
||
**Never** commit the actual secret to the repository. Provide a `.env.example` with a placeholder:
|
||
|
||
```bash
|
||
# .env.example
|
||
BANGUI_SESSION_SECRET="set-this-to-a-32-character-minimum-secret"
|
||
```
|
||
|
||
### Session Cookie Security
|
||
|
||
The `session_cookie_secure` configuration controls the `Secure` flag on the session cookie. This flag prevents browsers from sending the session cookie over unencrypted HTTP.
|
||
|
||
**Default:** `true` — Production deployments are secure by default. Cookies are only sent over HTTPS.
|
||
|
||
**Local Development:** Set `BANGUI_SESSION_COOKIE_SECURE=false` in your compose file or `.env` to allow cookies over HTTP (required for `localhost:8000`).
|
||
|
||
```yaml
|
||
# Docker/compose.debug.yml
|
||
environment:
|
||
BANGUI_SESSION_COOKIE_SECURE: "false" # Allow HTTP during local development
|
||
```
|
||
|
||
**Important:** If `Secure=true` is set, browsers will reject the session cookie when the backend is served over HTTP. Ensure your nginx/reverse proxy terminates TLS and passes `X-Forwarded-Proto: https` so FastAPI knows the connection is secure.
|
||
|
||
### CSRF Protection Middleware
|
||
|
||
State-mutating endpoints (POST, PUT, DELETE, PATCH) authenticated via session cookies are protected by the `CsrfMiddleware`, which enforces a custom header check.
|
||
|
||
**How It Works:**
|
||
|
||
1. For every request using a mutating HTTP method, the middleware checks:
   - Is this request authenticated via session cookie (not Bearer token)?
   - If yes, require the custom header `X-BanGUI-Request: 1`.
   - If missing or incorrect, return `403 Forbidden`.
|
||
|
||
2. **Bearer token requests** (via `Authorization: Bearer` header) bypass the check because tokens are not CSRF-vulnerable — they are never automatically sent on cross-origin requests.
|
||
|
||
3. **Safe HTTP methods** (GET, HEAD, OPTIONS) bypass the check.
|
||
|
||
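4. **Cross-site protection:** Cross-site JavaScript (`fetch()` calls from other origins) cannot set custom headers without a CORS preflight, which the backend rejects for non-allowed origins. This adds defense in depth against forged requests from other origins, including sibling subdomains. It does not protect against XSS on the application's own origin, because same-origin script can set the header itself.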
|
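
The decision rules above can be sketched as a pure function. This is an illustration under stated assumptions, not the code in `backend/app/middleware/csrf.py`: the function name, the `has_session_cookie` flag, and the choice to let requests without a session cookie pass are all hypothetical.

```python
from collections.abc import Mapping

SAFE_METHODS = frozenset({"GET", "HEAD", "OPTIONS"})
CSRF_HEADER = "x-bangui-request"


def csrf_check_passes(method: str, headers: Mapping[str, str], has_session_cookie: bool) -> bool:
    """Return True if the request may proceed, False if it should receive 403."""
    if method.upper() in SAFE_METHODS:
        return True  # Safe methods bypass the check
    normalized = {name.lower(): value for name, value in headers.items()}
    if normalized.get("authorization", "").startswith("Bearer "):
        return True  # Bearer tokens are not attached automatically by browsers
    if not has_session_cookie:
        return True  # Assumption: nothing cookie-authenticated to protect
    return normalized.get(CSRF_HEADER) == "1"
```

Keeping the decision separate from the ASGI plumbing makes each rule easy to unit test.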
||
|
||
**Implementation Location:**
|
||
- Middleware: `backend/app/middleware/csrf.py`
|
||
- Registered in: `backend/app/main.py` via `app.add_middleware(CsrfMiddleware)`
|
||
|
||
**Example:**
|
||
```text
|
||
# ✓ Cookie-authenticated POST with CSRF header — allowed
|
||
POST /api/bans
|
||
Cookie: bangui_session=...
|
||
X-BanGUI-Request: 1
|
||
|
||
# ✗ Cookie-authenticated POST without CSRF header — rejected with 403
|
||
POST /api/bans
|
||
Cookie: bangui_session=...
|
||
(no X-BanGUI-Request header)
|
||
|
||
# ✓ Bearer token authentication without CSRF header — allowed
|
||
POST /api/bans
|
||
Authorization: Bearer <token>
|
||
(no X-BanGUI-Request header needed)
|
||
|
||
# ✓ Safe GET method without CSRF header — allowed
|
||
GET /api/jails
|
||
Cookie: bangui_session=...
|
||
(no X-BanGUI-Request header needed)
|
||
```
|
||
|
||
### fail2ban_start_command Configuration
|
||
|
||
The `fail2ban_start_command` setting specifies the shell command used to start the fail2ban daemon during recovery operations (e.g., after a rollback).
|
||
|
||
**Format & Parsing:**
|
||
- The command is split into arguments using `shlex.split()`, which respects shell quoting rules.
|
||
- Paths with spaces must be quoted. Example: `"/opt/my tools/fail2ban-client" start`.
|
||
- The command is **not** executed through a shell — no shell variables or globbing are interpreted.
|
||
|
||
**Validation:**
|
||
- The command is validated at startup using `shlex.split()`. Mismatched quotes will raise a `ValueError` with the problematic command in the error message.
|
||
|
||
**Environment Variables:**
|
||
```bash
|
||
BANGUI_FAIL2BAN_START_COMMAND="fail2ban-client start" # Default
|
||
BANGUI_FAIL2BAN_START_COMMAND="systemctl start fail2ban" # systemd
|
||
BANGUI_FAIL2BAN_START_COMMAND='"/opt/my tools/fail2ban" start' # Quoted path
|
||
```
|
||
|
||
**Common Pitfall:**
|
||
Using `.split()` instead of `shlex.split()` would break commands with spaces in paths. Always use quoted strings for paths that contain whitespace.
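
The difference is easy to see with the standard library alone. The command string below is the quoted-path example from this section:

```python
import shlex

command = '"/opt/my tools/fail2ban-client" start'

# shlex.split honours the quotes and keeps the path as one argument
print(shlex.split(command))  # ['/opt/my tools/fail2ban-client', 'start']

# str.split breaks the quoted path at the space and keeps the quote characters
print(command.split())  # ['"/opt/my', 'tools/fail2ban-client"', 'start']

# Unbalanced quotes are rejected with ValueError
try:
    shlex.split('"/opt/my tools/fail2ban-client start')
except ValueError as exc:
    print(f"invalid command: {exc}")
```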
|
||
|
||
### API Documentation Configuration
|
||
|
||
The `enable_docs` setting controls whether FastAPI serves interactive API documentation at `/api/docs` (Swagger UI) and `/api/redoc` (ReDoc).
|
||
|
||
**Default:** `false` — API documentation is disabled by default to prevent information disclosure in production.
|
||
|
||
**When to Enable:**
|
||
- Set `BANGUI_ENABLE_DOCS=true` in development and debugging environments only.
|
||
- Never enable in production. Exposed API documentation reveals all endpoints, request/response schemas, and allows direct API invocation from the browser.
|
||
|
||
**Environment Variables:**
|
||
```bash
|
||
BANGUI_ENABLE_DOCS="true" # Enable docs in development
|
||
BANGUI_ENABLE_DOCS="false" # Disable docs (default)
|
||
# Unset # Defaults to false (production)
|
||
```
|
||
|
||
**Debug Compose File:**
|
||
The `Docker/compose.debug.yml` sets `BANGUI_ENABLE_DOCS: "true"` for local development. Production compose files (`Docker/compose.prod.yml`) leave this unset, defaulting to `false`.
|
||
|
||
**Middleware Allowlist:**
|
||
The `SetupRedirectMiddleware` in `main.py` includes `/api/docs`, `/api/redoc`, and `/api/openapi.json` in its `_ALWAYS_ALLOWED` paths so documentation can be accessed before setup completes (if enabled).
|
||
|
||
### Log Path Validation & Allowlisting
|
||
|
||
Authenticated users can instruct fail2ban to monitor additional log files through the API endpoint `POST /api/config/jails/{name}/logpath`. To prevent path-traversal attacks and unauthorized reads of sensitive system files, all requested log paths must resolve to locations within a configurable allowlist of safe directories.
|
||
|
||
**Allowed Directories:**
|
||
- Configured via the `BANGUI_ALLOWED_LOG_DIRS` environment variable (comma-separated list).
|
||
- Defaults to: `["/var/log", "/config/log"]`.
|
||
|
||
**Path Validation Rules:**
|
||
1. The requested path is resolved to its canonical form using `Path(log_path).resolve()`, which:
   - Expands relative paths to absolute paths.
   - Resolves symbolic links to their real targets.
   - Normalizes `.` and `..` components.
|
||
2. The resolved path is checked using `Path.is_relative_to()` against each allowed directory prefix.
|
||
3. If the resolved path is not relative to any allowed directory, a `ValueError` is raised with a descriptive error message.
|
||
|
||
**Implementation:**
|
||
- Validation occurs in the Pydantic model `AddLogPathRequest` using a `@field_validator`.
|
||
- The validator runs at request time, before the service layer is invoked.
|
||
- Symlinks that escape allowed directories are rejected (see [symlink bypass tests](../../backend/tests/test_models.py)).
|
||
|
||
**Important:** Use `is_relative_to()`, not `startswith()` or string prefix matching. The latter is bypassable with paths like `/var/log_evil/file.log`.
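
The validation rules can be sketched with `pathlib` alone. This is an illustration, not the `AddLogPathRequest` validator itself; `is_allowed_log_path` is a hypothetical name, and resolving the allowed directories as well is an assumption made so that both sides of the comparison are canonical.

```python
from pathlib import Path


def is_allowed_log_path(log_path: str, allowed_dirs: list[str]) -> bool:
    """Return True if the resolved path lies inside one of the allowed directories."""
    resolved = Path(log_path).resolve()  # Absolute, symlinks followed, ".." collapsed
    return any(
        resolved.is_relative_to(Path(allowed).resolve()) for allowed in allowed_dirs
    )


allowed = ["/var/log", "/config/log"]
print(is_allowed_log_path("/var/log/auth.log", allowed))
print(is_allowed_log_path("/var/log_evil/file.log", allowed))  # Shares a string prefix only
print(is_allowed_log_path("/var/log/../../etc/passwd", allowed))
```

`is_relative_to()` compares whole path components, which is why `/var/log_evil` does not count as being inside `/var/log`.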
|
||
|
||
**Environment Variables:**
|
||
```bash
|
||
BANGUI_ALLOWED_LOG_DIRS="/var/log,/config/log" # Default
|
||
BANGUI_ALLOWED_LOG_DIRS="/var/log,/config/log,/home/app/logs" # Custom directory
|
||
```
|
||
|
||
### Log Target Validation (fail2ban)
|
||
|
||
The `log_target` field on the global config endpoint (`PUT /api/config/global`) is critical for security because fail2ban runs as root. Users can only set log targets to:
|
||
|
||
1. **Special values:** `STDOUT`, `STDERR`, `SYSLOG` (case-insensitive)
|
||
2. **File paths:** Must resolve to one of the configured allowed directories (same allowlist as log paths)
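
The two accepted forms can be checked in a few lines. This is a sketch only: `check_log_target` is a hypothetical name, and normalizing special values to upper case is an assumption rather than documented behaviour.

```python
from pathlib import Path

SPECIAL_LOG_TARGETS = frozenset({"STDOUT", "STDERR", "SYSLOG"})


def check_log_target(log_target: str, allowed_dirs: list[str]) -> str:
    """Return a normalized log target, or raise ValueError if it is not permitted."""
    if log_target.upper() in SPECIAL_LOG_TARGETS:
        return log_target.upper()  # Special values are matched case-insensitively
    resolved = Path(log_target).resolve()
    for allowed in allowed_dirs:
        if resolved.is_relative_to(Path(allowed).resolve()):
            return str(resolved)
    raise ValueError(f"log_target {log_target!r} is outside the allowed directories")
```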
|
||
|
||
**Why This Matters:**
|
||
- fail2ban creates/opens files with root privileges. Without validation, an attacker could write to arbitrary system paths (e.g., `/etc/cron.d/malicious_script`).
|
||
- Validation occurs at **both** the Pydantic model layer (`GlobalConfigUpdate.validate_log_target()`) **and** the service layer (`update_global_config()`) for defense in depth.
|
||
- This prevents both HTTP and non-HTTP attack vectors.
|
||
|
||
**Implementation:**
|
||
```python
|
||
# Model layer: Automatic validation via @field_validator
|
||
update = GlobalConfigUpdate(log_target="/etc/passwd") # Raises ValidationError → HTTP 422
|
||
|
||
# Service layer: Defense in depth
|
||
await config_service.update_global_config(socket_path, update) # Validates again before sending to fail2ban
|
||
```
|
||
|
||
### Login Rate Limiting
|
||
|
||
The login endpoint (`POST /api/auth/login`) is protected against brute-force attacks using an in-memory rate limiter.
|
||
|
||
**Design:**
|
||
- Uses a `dict[str, deque[float]]` keyed by client IP, storing login attempt timestamps within a time window.
|
||
- Attempts outside the window are automatically removed during validation checks.
|
||
- Expired IP entries are cleaned up to prevent unbounded memory growth.
|
||
|
||
**Rate Limit Rules:**
|
||
- **5 attempts per 60 seconds** per IP address.
|
||
- Requests exceeding the limit return **HTTP 429 Too Many Requests** with a `Retry-After` header.
|
||
- Each failed login triggers a 10-second server-side delay (`asyncio.sleep`) to further slow attacks, on top of bcrypt hashing (~100ms).
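
The sliding-window design can be sketched as below. This is not the code in `app.utils.rate_limiter`; the class and method names are hypothetical, and the caller passes the current time explicitly (in production this would typically be `time.monotonic()`) so the behaviour is easy to test.

```python
from collections import deque


class SlidingWindowLimiter:
    """Allow at most ``max_attempts`` per ``window_seconds`` for each key."""

    def __init__(self, max_attempts: int = 5, window_seconds: float = 60.0) -> None:
        self._max_attempts = max_attempts
        self._window_seconds = window_seconds
        self._attempts: dict[str, deque[float]] = {}

    def allow(self, client_ip: str, now: float) -> bool:
        """Record an attempt at time ``now`` and report whether it is within the limit."""
        attempts = self._attempts.setdefault(client_ip, deque())
        # Drop timestamps that have fallen out of the window
        while attempts and now - attempts[0] >= self._window_seconds:
            attempts.popleft()
        if len(attempts) >= self._max_attempts:
            return False  # Caller responds with HTTP 429
        attempts.append(now)
        return True

    def purge(self, now: float) -> None:
        """Remove IPs whose attempts have all expired, bounding memory use."""
        expired = [
            ip for ip, attempts in self._attempts.items()
            if not attempts or now - attempts[-1] >= self._window_seconds
        ]
        for ip in expired:
            del self._attempts[ip]
```

A `deque` suits this because timestamps arrive in order, so expiry only ever removes from the left end.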
|
||
|
||
**IP Extraction (Proxy Safety):**
|
||
- When behind nginx, the rate limiter reads the real client IP from `X-Forwarded-For` or `X-Real-IP` headers.
|
||
- Only trusts these headers when the immediate connection source is in a configured trusted proxy list.
|
||
- Prevents attackers from spoofing these headers to bypass rate limits.
|
||
- Falls back to the direct connection IP when proxy headers cannot be trusted.
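
The trust rule can be sketched with the standard `ipaddress` module. This is not the implementation of `app.utils.client_ip.get_client_ip()`; the function name and parameters are hypothetical. It assumes a single trusted proxy that appends to `X-Forwarded-For`, so the last entry is the address that proxy actually saw; deployments with a longer proxy chain need a different selection rule.

```python
import ipaddress


def resolve_client_ip(peer_ip: str, headers: dict[str, str], trusted_proxies: list[str]) -> str:
    """Return the client IP, honouring proxy headers only from trusted peers."""
    peer = ipaddress.ip_address(peer_ip)
    trusted = any(peer in ipaddress.ip_network(net) for net in trusted_proxies)
    if not trusted:
        return peer_ip  # Direct or untrusted connection: ignore the headers
    normalized = {name.lower(): value for name, value in headers.items()}
    forwarded = normalized.get("x-forwarded-for", "")
    candidates = [part.strip() for part in forwarded.split(",") if part.strip()]
    # Assumption: one trusted proxy, so the rightmost entry was added by it
    candidate = candidates[-1] if candidates else normalized.get("x-real-ip", "").strip()
    try:
        return str(ipaddress.ip_address(candidate))
    except ValueError:
        return peer_ip  # Missing or malformed header value
```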
|
||
|
||
**Process-Local Limitation:**
|
||
- The rate limiter is process-local (in-memory). In multi-worker deployments (e.g., Gunicorn with 4 workers), each worker maintains its own rate limit counter.
|
||
- This is acceptable because the single-worker constraint is enforced elsewhere. See [TASK-002/003 notes](Instructions.md) for details.
|
||
|
||
**Implementation:**
|
||
- Rate limiter: `app.utils.rate_limiter.RateLimiter`
|
||
- IP extraction: `app.utils.client_ip.get_client_ip()`
|
||
- Dependency: `LoginRateLimiterDep` in `app.dependencies`
|
||
|
||
---
|
||
|
||
## 13. File I/O Conventions
|
||
|
||
All file write operations to critical configuration files must be **atomic** to prevent corruption if the process is killed mid-write.
|
||
|
||
### Atomic File Writes
|
||
|
||
Configuration files (e.g., fail2ban jail configs in `jail.d/`) are essential for system operation. A truncated or corrupt config file can break fail2ban's ability to reload and may disable active protection.
|
||
|
||
**Rule: Always use write-to-temp + atomic rename**
|
||
|
||
Never use `Path.write_text()` or `file.write()` directly for critical files. Instead:
|
||
|
||
1. Create a temporary file in the **same directory** as the target (crucial for atomic `os.replace()`).
|
||
2. Write content to the temp file.
|
||
3. Atomically rename the temp file to replace the target.
|
||
4. Clean up the temp file if an error occurs.
|
||
|
||
**Implementation Pattern:**
|
||
|
||
```python
import contextlib
import os
import tempfile
from pathlib import Path

target = Path("/path/to/config/file.conf")

tmp_name: str | None = None
try:
    # Create temp file in target's directory (same filesystem = atomic)
    with tempfile.NamedTemporaryFile(
        mode="w",
        encoding="utf-8",
        dir=target.parent,
        delete=False,
        suffix=".tmp",
    ) as tmp:
        tmp_name = tmp.name  # Record the name first so cleanup works if the write fails
        tmp.write(content)
    # Atomic rename (single syscall on POSIX systems)
    os.replace(tmp_name, target)
except OSError as exc:
    # Clean up temp file on error
    with contextlib.suppress(OSError):
        if tmp_name is not None:
            os.unlink(tmp_name)
    raise ConfigWriteError(f"Cannot write config: {exc}") from exc
```
|
||
|
||
**Why this matters:**
|
||
|
||
- `Path.write_text()` overwrites in place. If the process dies mid-write, the file is left truncated or partially written.
|
||
- `os.replace()` is atomic on POSIX systems (single rename syscall) **only if source and target are on the same filesystem**.
|
||
- Creating the temp file in `target.parent` ensures atomicity.
|
||
- On Linux containers, this prevents config corruption and service degradation.
|
||
|
||
**Atomic write helper:**
|
||
|
||
A shared `atomic_write(path: Path, content: str)` helper is available in `app/services/config_file_helpers.py`. This is the preferred way to perform atomic writes — it handles all the temp file and cleanup logic:
|
||
|
||
```python
|
||
from app.services.config_file_helpers import atomic_write
|
||
|
||
atomic_write(path, updated_content) # Atomic write, auto-cleanup on error
|
||
```
|
||
|
||
**Files requiring atomic writes:**
|
||
|
||
- All config files under `jail.d/` (created/modified by `_write_conf_file`, `_create_conf_file`, `set_jail_config_enabled`, and `write_jail_config_file`)
|
||
- Any critical state files that fail2ban relies on
|
||
|
||
**Examples in the codebase:**
|
||
|
||
- `app/services/config_file_helpers.py`: `_write_conf_file`, `_create_conf_file`, `atomic_write`
|
||
- `app/services/raw_config_io_service.py`: `set_jail_config_enabled`, `write_jail_config_file`
|
||
- `app/services/jail_config_service.py`: `_write_local_file_sync`, `_restore_local_file_sync`
|
||
|
||
---
|
||
|
||
## 14. Git & Workflow
|
||
|
||
- **Branch naming:** `feature/<short-description>`, `fix/<short-description>`, `chore/<short-description>`.
|
||
- **Commit messages:** imperative tense, max 72 chars first line (`Add jail reload endpoint`, `Fix ban history query`).
|
||
- Every merge request must pass: ruff, type checker, all tests.
|
||
- Do not merge with failing CI.
|
||
- Keep pull requests small and focused — one feature or fix per PR.
|
||
|
||
---
|
||
|
||
## 15. Coding Principles
|
||
|
||
These principles are **non-negotiable**. Every backend contributor must internalise and apply them daily.
|
||
|
||
### 15.1 Clean Code
|
||
|
||
- Write code that **reads like well-written prose** — a new developer should understand intent without asking.
|
||
- **Meaningful names** — variables, functions, and classes must reveal their purpose. Avoid abbreviations (`cnt`, `mgr`, `tmp`) unless universally understood.
|
||
- **Small functions** — each function does exactly one thing. If you need a comment to explain a block inside a function, extract it into its own function.
|
||
- **No magic numbers or strings** — use named constants.
|
||
- **Boy Scout Rule** — leave every file cleaner than you found it.
|
||
- **Avoid deep nesting** — prefer early returns (guard clauses) to keep the happy path at the top indentation level.
|
||
|
||
```python
# Good: guard clause, clear name, one job
async def get_active_ban(ip: str, jail: str) -> Ban:
    ban: Ban | None = await repo.find_ban(ip=ip, jail=jail)
    if ban is None:
        raise BanNotFoundError(ip=ip, jail=jail)
    if ban.is_expired():
        raise BanExpiredError(ip=ip, jail=jail)
    return ban

# Bad: nested, vague name
async def check(ip, j):
    b = await repo.find_ban(ip=ip, jail=j)
    if b:
        if not b.is_expired():
            return b
        else:
            raise Exception("expired")
    else:
        raise Exception("not found")
```
|
||
|
||
### 15.2 Separation of Concerns (SoC)
|
||
|
||
- Each module, class, and function must have a **single, well-defined responsibility**.
|
||
- **Routers** → HTTP layer only (parse requests, return responses).
|
||
- **Services** → business logic and orchestration.
|
||
- **Repositories** → data access and persistence.
|
||
- **Models** → data shapes and validation.
|
||
- **Tasks** → scheduled background jobs.
|
||
- Never mix layers — a router must not execute SQL, and a repository must not raise `HTTPException`.
|
||
|
||
### 15.3 Single Responsibility Principle (SRP)
|
||
|
||
- A class or module should have **one and only one reason to change**.
|
||
- If a service handles both ban management *and* email notifications, split it into `BanService` and `NotificationService`.
|
||
|
||
### 15.4 Don't Repeat Yourself (DRY)
|
||
|
||
- Extract shared logic into utility functions, base classes, or dependency providers.
|
||
- If the same block of code appears in more than one place, **refactor it** into a single source of truth.
|
||
- But don't over-abstract — premature DRY that couples unrelated features is worse than a little duplication (see **Rule of Three**: refactor when something appears a third time).
|
||
|
||
### 15.5 KISS — Keep It Simple, Stupid
|
||
|
||
- Choose the simplest solution that works correctly.
|
||
- Avoid clever tricks, premature optimisation, and over-engineering.
|
||
- If a standard library function does the job, prefer it over a custom implementation.
|
||
|
||
### 15.6 YAGNI — You Aren't Gonna Need It
|
||
|
||
- Do **not** build features, abstractions, or config options "just in case".
|
||
- Implement what is required **now**. Extend later when a real need emerges.
|
||
|
||
### 15.7 Dependency Inversion Principle (DIP)
|
||
|
||
- High-level modules (services) must not depend on low-level modules (repositories) directly. Both should depend on **abstractions** (protocols / interfaces).
|
||
- Use FastAPI's `Depends()` to inject implementations — this makes swapping and testing trivial.
|
||
|
||
```python
from typing import Protocol

class BanRepository(Protocol):
    async def find_ban(self, ip: str, jail: str) -> Ban | None: ...
    async def save_ban(self, ban: Ban) -> None: ...

class SqliteBanRepository:
    """Concrete implementation that depends on aiosqlite."""
    async def find_ban(self, ip: str, jail: str) -> Ban | None: ...
    async def save_ban(self, ban: Ban) -> None: ...
```
|
||
|
||
#### 15.7.1 Repository Module Pattern: Module-as-Protocol Structural Compatibility
|
||
|
||
BanGUI uses **module-level functions** for repository implementations, not classes. Each repository module (e.g., `session_repo.py`, `blocklist_repo.py`) exports async functions that match the signatures defined in the Protocol interface in `protocols.py`. This is a **structural typing pattern** — mypy accepts the module as a valid Protocol implementation because the function signatures match, *even though* the module is not explicitly annotated as implementing the Protocol.
|
||
|
||
This approach works correctly with FastAPI's dependency injection via `cast()`:
|
||
|
||
```python
# In app/repositories/session_repo.py
async def create_session(db: aiosqlite.Connection, token: str, created_at: str, expires_at: str) -> Session:
    """Insert a new session row."""
    ...

# In app/repositories/protocols.py
class SessionRepository(Protocol):
    async def create_session(
        self,
        db: aiosqlite.Connection,
        token: str,
        created_at: str,
        expires_at: str,
    ) -> Session:
        ...

# In app/dependencies.py
async def get_session_repo() -> SessionRepository:
    """Provide the concrete session repository implementation."""
    from app.repositories import session_repo
    return session_repo  # ← mypy accepts this because the module has matching functions
```
|
||
|
||
**Why this pattern is used:**
|
||
- **Simplicity** — no boilerplate class/instance wrapping.
|
||
- **Compatibility** — Python's **structural typing** (PEP 544) means the module automatically satisfies the Protocol interface if function signatures match.
|
||
- **Testability** — the same DIP principle applies; services depend on the Protocol, not the module directly, so tests can mock the Protocol.
|
||
|
||
**Risks and mitigations:**
|
||
- **Silent breakage if function signatures change:** If a parameter is added to or removed from a module function, the module no longer satisfies the Protocol. When a dependency provider returns the module through `cast()`, mypy does not report the mismatch, because `cast()` bypasses the structural check. To prevent this, **Protocol signatures in `protocols.py` are the source of truth**. Always check that module functions match the Protocol definitions before merging changes. The CI/CD pipeline validates this compatibility at build time.
|
||
|
||
**How the validation works (CI check):**
|
||
- Before each deployment, run `mypy --strict` to ensure all dependency providers return values compatible with their Protocol types.
|
||
- The `cast()` calls in `dependencies.py` are a documented signal that structural compatibility is being verified externally, not via explicit class inheritance.
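
Signature drift can also be caught at test time with `inspect`. The sketch below is one possible approach, not a check that exists in the codebase: `missing_or_mismatched`, `ExampleRepository`, and the stand-in module are hypothetical, and the comparison covers parameter names only, not annotations or return types.

```python
import inspect
from types import SimpleNamespace
from typing import Protocol


def missing_or_mismatched(module: object, protocol: type) -> list[str]:
    """List Protocol methods that the module lacks or defines with different parameters."""
    problems: list[str] = []
    for name, expected in inspect.getmembers(protocol, inspect.isfunction):
        if name.startswith("_"):
            continue  # Skip dunder and private helpers
        actual = getattr(module, name, None)
        if actual is None:
            problems.append(f"{name}: missing")
            continue
        # Drop ``self`` from the Protocol method before comparing parameter names
        expected_params = list(inspect.signature(expected).parameters)[1:]
        actual_params = list(inspect.signature(actual).parameters)
        if expected_params != actual_params:
            problems.append(f"{name}: expected {expected_params}, got {actual_params}")
    return problems


class ExampleRepository(Protocol):  # Hypothetical Protocol for demonstration
    async def get_session(self, db: object, token: str) -> object: ...


async def get_session(db: object) -> object:  # Drifted: ``token`` parameter removed
    return None


drifted_module = SimpleNamespace(get_session=get_session)  # Stands in for a repository module
print(missing_or_mismatched(drifted_module, ExampleRepository))
# ["get_session: expected ['db', 'token'], got ['db']"]
```

A test could assert that the returned list is empty for each repository module and its Protocol.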
|
||
|
||
#### 15.7.2 Session Token Hashing: One-Way Protection Against Database Exposure
|
||
|
||
Session tokens must be protected against database exposure. **Session tokens are stored as one-way SHA256 hashes in the database** to ensure that if the database file is compromised (volume mount misconfiguration, backup leak, etc.), the session tokens themselves cannot be directly used to hijack sessions.
|
||
|
||
**Implementation pattern:**
|
||
|
||
```python
import hashlib
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    import aiosqlite

from app.models.auth import Session

def _hash_token(token: str) -> str:
    """Return the SHA256 hash of a session token."""
    return hashlib.sha256(token.encode()).hexdigest()

async def create_session(
    db: "aiosqlite.Connection",
    token: str,
    created_at: str,
    expires_at: str,
) -> Session:
    """Insert a new session row with the token hash."""
    token_hash = _hash_token(token)
    cursor = await db.execute(
        "INSERT INTO sessions (token_hash, created_at, expires_at) VALUES (?, ?, ?)",
        (token_hash, created_at, expires_at),
    )
    await db.commit()
    # Return the Session with the ORIGINAL token (not the hash)
    # so the service layer can sign and return it to the client.
    return Session(
        id=int(cursor.lastrowid) if cursor.lastrowid else 0,
        token=token,  # ← raw token, not the hash
        created_at=created_at,
        expires_at=expires_at,
    )

async def get_session(
    db: "aiosqlite.Connection",
    token: str
) -> Session | None:
    """Look up a session by token hash."""
    token_hash = _hash_token(token)
    async with db.execute(
        "SELECT id, token_hash, created_at, expires_at FROM sessions WHERE token_hash = ?",
        (token_hash,),
    ) as cursor:
        row = await cursor.fetchone()

    if row is None:
        return None

    # Return the Session with the INCOMING token (the one the client sent).
    return Session(
        id=int(row[0]),
        token=token,  # ← the raw token passed in
        created_at=str(row[2]),
        expires_at=str(row[3]),
    )
```
|
||
|
||
**Key points:**
|
||
|
||
1. **Hash on write** — When inserting a session, hash the token before storage.
|
||
2. **Hash on read** — When validating a session, hash the incoming token before the database lookup.
|
||
3. **Never store raw tokens** — The `token_hash` column contains only hashes; raw tokens are never persisted.
|
||
4. **Return raw tokens to the service layer** — The `Session` model's `token` field contains the raw token (for signing and response), not the hash.
|
||
5. **Database schema** — Use `token_hash TEXT NOT NULL UNIQUE` instead of `token TEXT NOT NULL UNIQUE`, and create an index on `token_hash`.
|
||
6. **Migration strategy:** When upgrading from plaintext to hashed tokens, drop the old table and recreate it. This invalidates all existing sessions, which is acceptable because the old tokens were stored in plaintext and must be treated as exposed.
|
||
|
||
**Why one-way hashing is safe:**
|
||
- If an attacker obtains a token hash from the database, they cannot reverse the SHA256 hash to recover the original token.
|
||
- The attacker cannot use the hash directly in a client request — they would need the original token to pass the hash check.
|
||
- This forces the attacker to either compromise the client (where they'd also get the raw token) or perform a brute-force attack against the hash space (infeasible for random 128-bit tokens).
|
||
|
||
**Never use symmetric encryption** — symmetric encryption stores a key in the database or environment, which merely shifts the exposure risk. A one-way hash is the correct choice for protecting tokens.
|
||
|
||
#### 15.7.2a Session Token Signing Format: HMAC-SHA256 Integrity Protection
|
||
|
||
**All session tokens sent to clients are signed using HMAC-SHA256.** The signed token format is:
|
||
|
||
```
|
||
<raw_token>.<signature>
|
||
```
|
||
|
||
where:
|
||
- `<raw_token>` is 16 random bytes (128 bits) encoded as 32 hex characters, generated by `secrets.token_hex(16)`.
|
||
- `.` is the separator (defined in `app.utils.constants.SESSION_TOKEN_SIGNATURE_SEPARATOR`).
|
||
- `<signature>` is the HMAC-SHA256 hex digest of `<raw_token>` using the configured `session_secret`.
|
||
|
||
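**Example shape:** 32 hex characters, the separator, then 64 hex characters (the length of a SHA-256 hex digest), for example `0123456789abcdef0123456789abcdef.<64-hex-signature>`.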
|
||
|
||
**Signing and verification pattern:**
|
||
|
||
```python
import hashlib
import hmac

def _session_token_signature(token: str, secret: str) -> str:
    """Return the HMAC-SHA256 signature for a session token."""
    return hmac.new(secret.encode(), token.encode(), hashlib.sha256).hexdigest()

def sign_session_token(token: str, secret: str) -> str:
    """Return a signed session token string for the client."""
    return f"{token}.{_session_token_signature(token, secret)}"

def unwrap_session_token(token: str, secret: str) -> str:
    """Verify and return the raw token from a signed session token.

    Raises ValueError if the token lacks a signature or signature is invalid.
    """
    if "." not in token:
        raise ValueError("Invalid session token.")

    raw_token, signature = token.rsplit(".", 1)
    expected_signature = _session_token_signature(raw_token, secret)
    # Compare as bytes: compare_digest raises TypeError for non-ASCII str input
    if not hmac.compare_digest(expected_signature.encode(), signature.encode()):
        raise ValueError("Invalid session token.")
    return raw_token
```
|
||
|
||
**Key points:**
|
||
|
||
1. **All tokens must be signed** — Tokens without a signature (no separator) are rejected immediately.
|
||
2. **Signature is mandatory** — The `unwrap_session_token()` function raises `ValueError` if the separator is absent.
|
||
3. **Compare in constant time:** Always use `hmac.compare_digest()` for signature verification to prevent timing attacks.
|
||
4. **Sign on login** — `login()` creates a raw token, stores it (hashed) in the database, then returns the signed token to the client.
|
||
5. **Verify on every request** — The `validate_session()` service verifies the signature by calling `unwrap_session_token()` with the `session_secret`, then looks up the raw token in the database.
|
||
6. **Session invalidation:** When upgrading from unsigned to signed tokens (TASK-022), all existing sessions must be invalidated, because tokens issued without a signature no longer pass `unwrap_session_token()`.
|
||
|
||
**Why HMAC signing is necessary:**
|
||
|
||
- **Prevents token forgery** — An attacker cannot create a valid token without knowing the `session_secret`.
|
||
- **Works alongside hashed storage:** If a database that still holds plaintext tokens (from before hashing was introduced) is compromised, the attacker obtains only raw tokens, not signed ones. A raw token without a valid signature is rejected by `unwrap_session_token()`.
|
||
- **Timing attack resistance** — `hmac.compare_digest()` compares signatures in constant time, preventing attackers from using timing differences to guess valid signatures.
|
||
|
||
#### 15.7.3 Session Cache Pluggability: Process-Local vs. Shared Backends
|
||
|
||
Session validation is expensive (SQLite lookup + signature verification). To improve performance, **validated session tokens are cached** using the `SessionCache` interface (`app.utils.session_cache`). The default implementation, `InMemorySessionCache`, stores cached sessions in process-local memory.
|
||
|
||
**Current implementation (single-worker):**
|
||
|
||
```python
# Excerpt from app/utils/session_cache.py (which also defines NoOpSessionCache)
from typing import Protocol

from app.models.auth import Session

class SessionCache(Protocol):
    """Interface for session token validation cache backends."""
    def get(self, token: str) -> Session | None: ...
    def set(self, token: str, session: Session, ttl_seconds: float) -> None: ...
    def invalidate(self, token: str) -> None: ...
    def clear(self) -> None: ...

# Default in-memory implementation (PROCESS-LOCAL)
class InMemorySessionCache:
    def __init__(self) -> None:
        self._entries: dict[str, tuple[Session, float]] = {}
```
|
||
|
||
**Single-worker constraint:**
|
||
|
||
`InMemorySessionCache` is **process-local** — each worker process has its own dict. In single-worker mode (enforced by TASK-002), this is safe and improves performance. In multi-worker deployments:
|
||
- A logout by worker A clears the session from A's cache, but worker B still has it → logout doesn't work.
|
||
- Enabling/disabling the cache requires restarting all workers to take effect.
|
||
|
||
**Multi-worker solution:**
|
||
|
||
To support multiple workers (future enhancement), implement a shared backend behind the same `SessionCache` Protocol:
|
||
|
||
```python
# Example Redis implementation (not yet in codebase)
from redis import asyncio as aioredis  # Assumption: redis-py's asyncio client

from app.models.auth import Session

class RedisSessionCache:
    """Session cache backed by Redis."""
    def __init__(self, redis_url: str) -> None:
        self.client = aioredis.from_url(redis_url)

    async def get(self, token: str) -> Session | None:
        data = await self.client.get(f"session:{token}")
        return Session.model_validate_json(data) if data else None

    async def set(self, token: str, session: Session, ttl_seconds: float) -> None:
        await self.client.setex(
            f"session:{token}",
            int(ttl_seconds),
            session.model_dump_json()
        )

    async def invalidate(self, token: str) -> None:
        await self.client.delete(f"session:{token}")

    async def clear(self) -> None:
        # Note: this flushes the whole Redis database, not only session keys
        await self.client.flushdb()
```
|
||
|
||
To adopt a Redis backend:
1. Create `RedisSessionCache` in `app.utils.session_cache`.
2. Update `app.utils.runtime_state.set_runtime_settings()` to instantiate `RedisSessionCache` when the `REDIS_URL` environment variable is set.
3. Update `app.config.Settings` to accept an optional `REDIS_URL`.
4. Tests continue to use `InMemorySessionCache` (no Redis dependency in dev).

**Implementation rules:**
- All cache methods must be `async` (even if the backend is sync).
- Never log session tokens or session data.
- TTL must be respected — expired entries must be removed on access.
- See `app/utils/session_cache.py` for the full Protocol definition and current implementations.

### 15.8 Composition over Inheritance

- Favour **composing** small, focused objects over deep inheritance hierarchies.
- Use mixins or protocols only when a clear "is-a" relationship exists; otherwise, pass collaborators as constructor arguments.

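
As a small sketch of passing a collaborator through the constructor: all names here (`Notifier`, `RecordingNotifier`, `BanService`) are hypothetical and exist only for illustration.

```python
from typing import Protocol


class Notifier(Protocol):
    """Anything that can deliver a message."""

    def send(self, message: str) -> None: ...


class RecordingNotifier:
    """Test double that records messages instead of sending them."""

    def __init__(self) -> None:
        self.sent: list[str] = []

    def send(self, message: str) -> None:
        self.sent.append(message)


class BanService:
    """Receives its collaborator as a constructor argument instead of inheriting from it."""

    def __init__(self, notifier: Notifier) -> None:
        self._notifier = notifier

    def ban(self, ip: str) -> None:
        self._notifier.send(f"banned {ip}")
```

Because `BanService` depends only on the `Notifier` protocol, a test can hand it a `RecordingNotifier` without touching a class hierarchy.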
### 15.9 Fail Fast

- Validate inputs as early as possible — at the API boundary with Pydantic, at service entry with assertions or domain checks.
- Raise specific exceptions immediately rather than letting bad data propagate silently.

### 15.10 Law of Demeter (Principle of Least Knowledge)

- A function should only call methods on:
  1. Its own object (`self`).
  2. Objects passed as parameters.
  3. Objects it creates.
- Avoid long accessor chains like `request.state.db.cursor().execute(...)` — wrap them in a meaningful method.

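
One way to wrap such a chain is a small repository object, sketched here with the standard-library `sqlite3` module to stay self-contained. The `BanRepository` class, the `bans` table, and `ban_summary` are hypothetical.

```python
import sqlite3


class BanRepository:
    """Hides cursor handling behind one meaningful method."""

    def __init__(self, db: sqlite3.Connection) -> None:
        self._db = db

    def count_bans(self) -> int:
        row = self._db.execute("SELECT COUNT(*) FROM bans").fetchone()
        return int(row[0])


def ban_summary(repo: BanRepository) -> str:
    # The caller talks only to the object it was given, not to its internals.
    return f"{repo.count_bans()} active bans"
```

The database details stay inside `BanRepository`, so `ban_summary` keeps working if the storage layer changes.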
### 15.11 Defensive Programming

- Never trust external input — validate and sanitise everything that crosses a boundary (HTTP request, file, socket, environment variable).
- Handle edge cases explicitly: empty lists, `None` values, negative numbers, empty strings.
- Use type narrowing and exhaustive pattern matching (`match` / `case`) to eliminate impossible states.

### 15.12 SSRF Prevention (Server-Side Request Forgery)

When user-supplied URLs are fetched by the backend, validate them before making any HTTP requests:

1. **Use Pydantic's `AnyHttpUrl` type** to restrict schemes to `http://` and `https://` only.
   - Rejects `file://`, `ftp://`, `gopher://`, and other non-http schemes at the model boundary.

2. **Validate resolved IP addresses** before fetching:
   - Parse the hostname and resolve it via DNS (using `socket.getaddrinfo()`).
   - Use `ipaddress.ip_address().is_private` to reject private/reserved ranges; multicast addresses are not covered by `is_private` and need an additional `is_multicast` check:
     - RFC 1918: `10.0.0.0/8`, `172.16.0.0/12`, `192.168.0.0/16`
     - Loopback: `127.0.0.0/8`, `::1/128`
     - Link-local: `169.254.0.0/16`, `fe80::/10`
     - IPv6 site-local, multicast, and reserved ranges.
   - Raise `ValueError` if validation fails; let the router convert it to HTTP 400.

3. **Guard against DNS rebinding**:
   - Validate DNS at URL creation/validation time (performed during request deserialization).
   - For additional safety, re-validate the connection IP at HTTP client time (e.g., a custom `aiohttp.TCPConnector` can inspect the resolved address during connect).

4. **Example implementation** (see `backend/app/utils/ip_utils.py`):
   - `is_private_ip(ip_str: str) -> bool`: Checks if IP is private/reserved/loopback/link-local.
   - `async validate_blocklist_url(url: AnyHttpUrl) -> None`: Async DNS resolution + private IP check.
   - Service layer calls `await validate_blocklist_url(url)` before persisting; router catches `ValueError` and returns 400.

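
Steps 1 and 2 can be sketched with the standard library alone. This is an illustration, not the code in `backend/app/utils/ip_utils.py`: the names `is_blocked_ip` and `resolve_and_check` are hypothetical, the URL is taken as a plain string rather than `AnyHttpUrl`, and the exact set of `ipaddress` properties checked is an assumption.

```python
import asyncio
import ipaddress
import socket
from urllib.parse import urlsplit


def is_blocked_ip(ip_str: str) -> bool:
    """Return True for addresses the backend should never fetch from."""
    ip = ipaddress.ip_address(ip_str)
    return (
        ip.is_private
        or ip.is_loopback
        or ip.is_link_local
        or ip.is_multicast
        or ip.is_reserved
        or ip.is_unspecified
    )


async def resolve_and_check(url: str) -> list[str]:
    """Resolve the URL's host and raise ValueError if any address is blocked."""
    parts = urlsplit(url)
    if parts.scheme not in {"http", "https"} or not parts.hostname:
        raise ValueError("only http(s) URLs with a hostname are allowed")
    default_port = 443 if parts.scheme == "https" else 80
    loop = asyncio.get_running_loop()
    # Non-blocking wrapper around socket.getaddrinfo().
    infos = await loop.getaddrinfo(
        parts.hostname, parts.port or default_port, type=socket.SOCK_STREAM
    )
    addresses = sorted({str(info[4][0]) for info in infos})
    for address in addresses:
        if is_blocked_ip(address):
            raise ValueError(f"URL resolves to a blocked address: {address}")
    return addresses
```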
---

## 16. Quick Reference — Do / Don't

| Do | Don't |
|---|---|
| Type every function, variable, return | Leave types implicit |
| Use `async def` for I/O | Use sync functions for I/O |
| Validate with Pydantic at the boundary | Pass raw dicts through the codebase |
| Log with structlog + context keys | Use `print()` or format strings in logs |
| Write tests for every feature | Ship untested code |
| Use `aiohttp` for HTTP calls | Use `requests` |
| Handle errors with custom exceptions | Use bare `except:` |
| Keep routers thin, logic in services | Put business logic in routers |
| Use `datetime.now(UTC)` (with `from datetime import UTC, datetime`) | Use naive datetimes |
| Run ruff + mypy before committing | Push code that doesn't pass linting |
| Keep GET endpoints read-only (no `db.commit()`) | Call `db.commit()` / INSERT inside GET handlers |
| Batch DB writes; issue one `db.commit()` after the loop | Commit inside a loop (1 fsync per row) |
| Use `executemany()` for bulk inserts | Call `execute()` + `commit()` per row in a loop |
