- Updated config.py to support environment-based configuration - Added test_config.py with full test coverage - Updated Backend-Development.md with configuration documentation - Removed outdated tasks from Tasks.md Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
1108 lines
48 KiB
Markdown
1108 lines
48 KiB
Markdown
# Backend Development — Rules & Guidelines
|
||
|
||
Rules and conventions every backend developer must follow. Read this before writing your first line of code.
|
||
|
||
---
|
||
|
||
## 1. Language & Typing
|
||
|
||
- **Python 3.12+** is the minimum version.
|
||
- **Every** function, method, and variable must have explicit type annotations — no exceptions.
|
||
- Use `str`, `int`, `float`, `bool`, `None` for primitives.
|
||
- Use `list[T]`, `dict[K, V]`, `set[T]`, `tuple[T, ...]` (lowercase, built-in generics) — never `typing.List`, `typing.Dict`, etc.
|
||
- Use `T | None` instead of `Optional[T]`.
|
||
- Use `TypeAlias`, `TypeVar`, `Protocol`, and `NewType` when they improve clarity.
|
||
- Return types are **mandatory** — including `-> None`.
|
||
- Never use `Any` unless there is no other option and a comment explains why.
|
||
- Run `mypy --strict` (or `pyright` in strict mode) — the codebase must pass with zero errors.
|
||
|
||
```python
|
||
# Good
|
||
def get_jail_by_name(name: str) -> Jail | None:
|
||
...
|
||
|
||
# Bad — missing types
|
||
def get_jail_by_name(name):
|
||
...
|
||
```
|
||
|
||
---
|
||
|
||
## 2. Core Libraries
|
||
|
||
| Purpose | Library | Notes |
|
||
|---|---|---|
|
||
| Web framework | **FastAPI** | Async endpoints only. |
|
||
| Data validation & settings | **Pydantic v2** | All request/response bodies and config models. |
|
||
| Async HTTP client | **aiohttp** (`ClientSession`) | For external calls (blocklists, IP lookups). |
|
||
| Scheduling | **APScheduler 4.x** (async) | Blocklist imports, periodic health checks. |
|
||
| Structured logging | **structlog** | Every log call must use structlog — never `print()` or `logging` directly. |
|
||
| Database | **aiosqlite** | Async SQLite access for the application database. |
|
||
| Testing | **pytest** + **pytest-asyncio** + **httpx** (`AsyncClient`) | Every feature needs tests. |
|
||
| Mocking | **unittest.mock** / **pytest-mock** | Isolate external dependencies. |
|
||
| Date & time | **datetime** (stdlib) — always timezone-aware | Use `datetime.datetime.now(datetime.UTC)`. Never naive datetimes. |
|
||
| IP / Network | **ipaddress** (stdlib) | Validate and normalise IPs and CIDR ranges. |
|
||
| Environment / config | **pydantic-settings** | Load `.env` and environment variables into typed models. |
|
||
| fail2ban integration | **fail2ban client** (bundled) | Use the local copy at [`./fail2ban-master`](../fail2ban-master). Import from [`./fail2ban-master/fail2ban/client`](../fail2ban-master/fail2ban/client) to communicate with the fail2ban socket. Do **not** install fail2ban as a pip package. |
|
||
|
||
### fail2ban Client Usage
|
||
|
||
The repository ships with a vendored copy of fail2ban located at `./fail2ban-master`.
|
||
All communication with the fail2ban daemon must go through the client classes found in `./fail2ban-master/fail2ban/client`.
|
||
Add the project root to `sys.path` (or configure it in `pyproject.toml` as a path dependency) so that `from fail2ban.client ...` resolves to the bundled copy.
|
||
|
||
```python
|
||
import sys
|
||
from pathlib import Path
|
||
|
||
# Ensure the bundled fail2ban is importable
|
||
sys.path.insert(0, str(Path(__file__).resolve().parents[2] / "fail2ban-master"))
|
||
|
||
from fail2ban.client.csocket import CSSocket # noqa: E402
|
||
```
|
||
|
||
### Libraries you must NOT use
|
||
|
||
- `requests` — use `aiohttp` (async).
|
||
- `flask` — we use FastAPI.
|
||
- `celery` — we use APScheduler.
|
||
- `print()` for logging — use `structlog`.
|
||
- `json.loads` / `json.dumps` on Pydantic models — use `.model_dump()` / `.model_validate()`.
|
||
|
||
### Timestamp Handling
|
||
|
||
Timestamp consistency is critical for accurate ban history queries across the dashboard and history endpoints. Follow these rules:
|
||
|
||
**Rule 1: Use consistent UTC timestamps**
|
||
- All timestamps in the database are stored as Unix epochs (seconds since 1970-01-01 UTC).
|
||
- fail2ban stores timestamps using `time.time()`, which is always UTC epoch seconds.
|
||
- When querying fail2ban's SQLite database by timestamp, use `app.utils.time_utils.since_unix()` (not manual datetime calculations).
|
||
|
||
**Rule 2: Time-range windows include a 60-second slack**
|
||
- The `since_unix()` function includes a 60-second slack window (`TIME_RANGE_SLACK_SECONDS` in `app.utils.constants`).
|
||
- This slack accommodates:
|
||
- Clock drift between the local system and fail2ban.
|
||
- Test seeding delays when timestamps are manually set to exact boundaries.
|
||
- The slack ensures that dashboard and history queries return consistent row counts for the same time range.
|
||
|
||
**Rule 3: Never duplicate timestamp calculation logic**
|
||
- All services that query by time range must import and use `since_unix()`.
|
||
- Do not recalculate timestamps locally using `datetime` or `time` modules in service code.
|
||
- If you need a timestamp for a time range, use `since_unix()`.
|
||
|
||
**Example:**
|
||
```python
|
||
from app.utils.time_utils import since_unix
|
||
|
||
# Get all bans from the last 24 hours (with 60-second slack)
|
||
since_ts: int = since_unix("24h")
|
||
rows = await db.execute(
|
||
"SELECT * FROM bans WHERE timeofban >= ?",
|
||
(since_ts,)
|
||
)
|
||
```
|
||
|
||
---
|
||
|
||
## 3. Project Structure
|
||
|
||
```
|
||
backend/
|
||
├── app/
|
||
│ ├── __init__.py
|
||
│ ├── main.py # FastAPI app factory, lifespan
|
||
│ ├── config.py # Pydantic settings
|
||
│ ├── dependencies.py # FastAPI dependency providers
|
||
│ ├── models/ # Pydantic schemas (request, response, domain)
|
||
│ ├── routers/ # FastAPI routers grouped by feature
|
||
│ ├── services/ # Business logic — one service per domain
|
||
│ ├── repositories/ # Database access layer
|
||
│ ├── tasks/ # APScheduler jobs
|
||
│ └── utils/ # Helpers, constants, shared types
|
||
├── tests/
|
||
│ ├── conftest.py
|
||
│ ├── test_routers/
|
||
│ ├── test_services/
|
||
│ └── test_repositories/
|
||
├── pyproject.toml
|
||
└── .env.example
|
||
```
|
||
|
||
- **Routers** receive requests, validate input via Pydantic, and delegate to **services**.
|
||
- **Services** contain business logic and call **repositories** or external clients.
|
||
- **Repositories** handle raw database queries — nothing else.
|
||
- Never put business logic inside routers or repositories.
|
||
|
||
---
|
||
|
||
## 4. FastAPI Conventions
|
||
|
||
- Use **async def** for every endpoint — no sync endpoints.
|
||
- Every endpoint must declare explicit **response models** (`response_model=...`).
|
||
- Use **Pydantic models** for request bodies and query parameters — never raw dicts.
|
||
- Use **Depends()** for dependency injection (database sessions, services, auth).
|
||
- Group endpoints into routers by feature domain (`routers/jails.py`, `routers/bans.py`, …).
|
||
- Use appropriate HTTP status codes: `201` for creation, `204` for deletion with no body, `404` for not found, etc.
|
||
- Protected endpoints should return `401 Unauthorized` or `403 Forbidden` when the session is invalid or expired; the frontend treats these responses as a session-expiry event and redirects the user to `/login`.
|
||
- Use **HTTPException** or custom exception handlers — never return error dicts manually.
|
||
- **GET endpoints are read-only — never call `db.commit()` or execute INSERT/UPDATE/DELETE inside a GET handler.** If a GET path produces side-effects (e.g., caching resolved data), that write belongs in a background task, a scheduled flush, or a separate POST endpoint. Users and HTTP caches assume GET is idempotent and non-mutating.
|
||
|
||
```python
|
||
# Good — pass db=None on GET so geo_service never commits
|
||
result = await geo_service.lookup_batch(ips, http_session, db=None)
|
||
|
||
# Bad — triggers INSERT + COMMIT per IP inside a GET handler
|
||
result = await geo_service.lookup_batch(ips, http_session, db=app_db)
|
||
```
|
||
|
||
```python
|
||
from fastapi import APIRouter, Depends, HTTPException, status
|
||
from app.models.jail import JailResponse, JailListResponse
|
||
from app.services.jail_service import JailService
|
||
|
||
router: APIRouter = APIRouter(prefix="/api/jails", tags=["Jails"])
|
||
|
||
@router.get("/", response_model=JailListResponse)
|
||
async def list_jails(service: JailService = Depends()) -> JailListResponse:
|
||
jails: list[JailResponse] = await service.get_all_jails()
|
||
return JailListResponse(jails=jails)
|
||
```
|
||
|
||
---
|
||
|
||
## 5. Pydantic Models
|
||
|
||
- Every model inherits from `pydantic.BaseModel`.
|
||
- Use `model_config = ConfigDict(strict=True)` where appropriate.
|
||
- Field names use **snake_case** in Python, export as **camelCase** to the frontend via alias generators if needed.
|
||
- Validate at the boundary — once data enters a Pydantic model it is trusted.
|
||
- Use `Field(...)` with descriptions for every field to keep auto-generated docs useful.
|
||
- Separate **request models**, **response models**, and **domain (internal) models** — do not reuse one model for all three.
|
||
|
||
```python
|
||
from pydantic import BaseModel, Field
|
||
from datetime import datetime
|
||
|
||
class BanResponse(BaseModel):
|
||
ip: str = Field(..., description="Banned IP address")
|
||
jail: str = Field(..., description="Jail that issued the ban")
|
||
banned_at: datetime = Field(..., description="UTC timestamp of the ban")
|
||
expires_at: datetime | None = Field(None, description="UTC expiry, None if permanent")
|
||
ban_count: int = Field(..., ge=1, description="Number of times this IP was banned")
|
||
```
|
||
|
||
### Using `Literal` Types for Constrained Strings
|
||
|
||
When a field should only accept a small set of predefined values, use `Literal` to enforce this at the type level:
|
||
|
||
```python
|
||
from typing import Literal
|
||
from pydantic import BaseModel, Field
|
||
|
||
LogLevel = Literal["CRITICAL", "ERROR", "WARNING", "NOTICE", "INFO", "DEBUG"]
|
||
|
||
class GlobalConfigUpdate(BaseModel):
|
||
log_level: LogLevel | None = Field(
|
||
default=None,
|
||
description="Log level: CRITICAL, ERROR, WARNING, NOTICE, INFO, or DEBUG.",
|
||
)
|
||
```
|
||
|
||
This provides:
|
||
- **Type safety** — IDEs and type checkers enforce valid values.
|
||
- **API documentation** — OpenAPI docs automatically list all allowed values.
|
||
- **Validation** — Pydantic rejects invalid values and provides a clear error message.
|
||
|
||
### Custom Field Validators
|
||
|
||
For fields that require complex validation (e.g., file paths that must be within allowed directories), use `@field_validator`:
|
||
|
||
```python
|
||
from pydantic import field_validator
|
||
from app.utils.path_utils import validate_log_path
|
||
|
||
class AddLogPathRequest(BaseModel):
|
||
log_path: str = Field(..., description="Absolute path to the log file to monitor.")
|
||
|
||
@field_validator("log_path", mode="after")
|
||
@classmethod
|
||
def validate_log_path_field(cls, value: str) -> str:
|
||
"""Validate that the log path is within allowed directories."""
|
||
return validate_log_path(value)
|
||
```
|
||
|
||
**Path Validation Helper:**
|
||
|
||
For query parameters and other contexts where Pydantic validators cannot be used directly, use the `validate_log_path()` helper from `app.utils.path_utils`:
|
||
|
||
```python
|
||
from fastapi import HTTPException, status
|
||
from app.utils.path_utils import validate_log_path
|
||
|
||
@router.delete("/{name}/logpath")
|
||
async def delete_log_path(
|
||
name: str,
|
||
log_path: str = Query(...),
|
||
) -> None:
|
||
try:
|
||
validate_log_path(log_path)
|
||
except ValueError as e:
|
||
raise HTTPException(
|
||
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
|
||
detail=str(e),
|
||
) from e
|
||
# ... rest of handler
|
||
```
|
||
|
||
**Key points:**
|
||
- Use `mode="after"` in model validators to validate after Pydantic's basic type coercion.
|
||
- Raise `ValueError` if validation fails; Pydantic converts it to an HTTP 400 response.
|
||
- For query parameters that cannot use Pydantic validators, use the `validate_log_path()` helper and raise HTTP 422.
|
||
- **Never use string prefix matching** for path validation (e.g., `path.startswith("/var/log")`). The helper uses `Path.relative_to()` to prevent bypasses like `/var/log_evil/file.log`.
|
||
- Symlinks are resolved before validating to prevent symlink-based escapes.
|
||
|
||
---
|
||
|
||
## 6. Async Rules
|
||
|
||
- **Never** call blocking / synchronous I/O in an async function — no `time.sleep()`, no synchronous file reads, no `requests.get()`.
|
||
- Use `aiohttp.ClientSession` for HTTP calls, `aiosqlite` for database access.
|
||
- Use `asyncio.TaskGroup` (Python 3.11+) when you need to run independent coroutines concurrently.
|
||
- Long-running startup/shutdown logic goes into the **FastAPI lifespan** context manager.
|
||
- **Never call `db.commit()` inside a loop.** With aiosqlite, every commit serialises through a background thread and forces an `fsync`. N rows × 1 commit = N fsyncs. Accumulate all writes in the loop, then issue a single `db.commit()` once after the loop ends. The difference between 5,000 commits and 1 commit can be seconds vs milliseconds.
|
||
|
||
```python
|
||
# Good — one commit for the whole batch
|
||
for ip, info in results.items():
|
||
await db.execute(INSERT_SQL, (ip, info.country_code, ...))
|
||
await db.commit() # ← single fsync
|
||
|
||
# Bad — one fsync per row
|
||
for ip, info in results.items():
|
||
await db.execute(INSERT_SQL, (ip, info.country_code, ...))
|
||
await db.commit() # ← fsync on every iteration
|
||
```
|
||
- **Prefer `executemany()` over calling `execute()` in a loop** when inserting or updating multiple rows with the same SQL template. aiosqlite passes the entire batch to SQLite in one call, reducing Python↔thread overhead on top of the single-commit saving.
|
||
|
||
```python
|
||
# Good
|
||
await db.executemany(INSERT_SQL, [(ip, cc, cn, asn, org) for ip, info in results.items()])
|
||
await db.commit()
|
||
```
|
||
- Shared resources (DB connections, HTTP sessions) are created once during startup and closed during shutdown — never inside request handlers.
|
||
|
||
```python
|
||
from contextlib import asynccontextmanager
|
||
from collections.abc import AsyncGenerator
|
||
from fastapi import FastAPI
|
||
import aiohttp
|
||
import aiosqlite
|
||
|
||
@asynccontextmanager
|
||
async def lifespan(app: FastAPI) -> AsyncGenerator[None]:
|
||
# Startup
|
||
app.state.http_session = aiohttp.ClientSession()
|
||
app.state.db = await aiosqlite.connect("bangui.db")
|
||
yield
|
||
# Shutdown
|
||
await app.state.http_session.close()
|
||
await app.state.db.close()
|
||
```
|
||
|
||
---
|
||
|
||
## 6.1 Database Query Conventions
|
||
|
||
### LIKE Queries and Wildcard Escaping
|
||
|
||
SQLite's `LIKE` operator treats `%` (any sequence of characters) and `_` (any single character) as wildcards. When querying with user-supplied filters that may contain these characters, you must escape them to prevent unintended matches.
|
||
|
||
**The Problem:**
|
||
```python
|
||
# Bad — ip_filter="10.0.0_" matches "10.0.0.1", "10.0.0.2", etc.
|
||
ip_filter = "10.0.0_"
|
||
await db.execute(
|
||
"SELECT * FROM bans WHERE ip LIKE ?",
|
||
(f"{ip_filter}%",) # ← wildcard characters not escaped
|
||
)
|
||
```
|
||
|
||
**The Solution:**
|
||
|
||
Use the `escape_like()` helper from `app.utils.fail2ban_db_utils`:
|
||
|
||
```python
|
||
from app.utils.fail2ban_db_utils import escape_like
|
||
|
||
# Good — wildcard characters are escaped
|
||
ip_filter = "10.0.0_"
|
||
await db.execute(
|
||
"SELECT * FROM bans WHERE ip LIKE ? ESCAPE '\\'",
|
||
(f"{escape_like(ip_filter)}%",) # ← underscores escaped to literal
|
||
)
|
||
```
|
||
|
||
**How `escape_like()` works:**
|
||
|
||
The function escapes backslashes first, then `%` and `_` signs:
|
||
```python
|
||
def escape_like(s: str) -> str:
|
||
return s.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
|
||
```
|
||
|
||
**Key rules:**
|
||
1. **Backslash escapes first** — to prevent double-escaping when the input contains backslashes.
|
||
2. **Add `ESCAPE '\\'` to the SQL** — tells SQLite which character to use for escaping.
|
||
3. **Dots are not wildcards** — they do not need escaping; normal IP addresses pass through unchanged.
|
||
|
||
**Test example:**
|
||
```python
|
||
assert escape_like("10.0.0_") == "10.0.0\\_"
|
||
assert escape_like("10.0.0%test") == "10.0.0\\%test"
|
||
assert escape_like("10.0.0.1") == "10.0.0.1" # Unchanged
|
||
```
|
||
|
||
---
|
||
|
||
## 7. Logging
|
||
|
||
- Use **structlog** for every log message.
|
||
- Bind contextual key-value pairs — never format strings manually.
|
||
- Log levels: `debug` for development detail, `info` for operational events, `warning` for recoverable issues, `error` for failures, `critical` for fatal problems.
|
||
- **Never log sensitive data** (passwords, tokens, session tokens, raw credentials, private keys).
|
||
- For session correlation without leaking token material, use a one-way hash fragment: `hashlib.sha256(token.encode()).hexdigest()[:12]`.
|
||
- Use numeric database IDs for entity correlation instead of raw identifiers: `session_id=session.id` instead of `token=session.token`.
|
||
|
||
```python
|
||
import structlog
|
||
import hashlib
|
||
|
||
log: structlog.stdlib.BoundLogger = structlog.get_logger()
|
||
|
||
async def ban_ip(ip: str, jail: str) -> None:
|
||
log.info("banning_ip", ip=ip, jail=jail)
|
||
try:
|
||
await _execute_ban(ip, jail)
|
||
log.info("ip_banned", ip=ip, jail=jail)
|
||
except BanError as exc:
|
||
log.error("ban_failed", ip=ip, jail=jail, error=str(exc))
|
||
raise
|
||
|
||
async def logout_session(db: aiosqlite.Connection, token: str) -> None:
|
||
# Use a one-way hash for token correlation in logs
|
||
token_hash = hashlib.sha256(token.encode()).hexdigest()[:12]
|
||
await session_repo.delete_session(db, token)
|
||
log.info("session_terminated", token_hash=token_hash)
|
||
```
|
||
|
||
---
|
||
|
||
## 8. Error Handling
|
||
|
||
- Define **custom exception classes** for domain errors (e.g., `JailNotFoundError`, `BanFailedError`).
|
||
- Catch specific exceptions — never bare `except:` or `except Exception:` without re-raising.
|
||
- Map domain exceptions to HTTP status codes via FastAPI **exception handlers** registered on the app.
|
||
- Always log errors with context before raising.
|
||
|
||
```python
|
||
class JailNotFoundError(Exception):
|
||
def __init__(self, name: str) -> None:
|
||
self.name: str = name
|
||
super().__init__(f"Jail '{name}' not found")
|
||
|
||
# In main.py
|
||
@app.exception_handler(JailNotFoundError)
|
||
async def jail_not_found_handler(request: Request, exc: JailNotFoundError) -> JSONResponse:
|
||
return JSONResponse(status_code=404, content={"detail": f"Jail '{exc.name}' not found"})
|
||
```
|
||
|
||
### Routers and Exception Propagation
|
||
|
||
- **Routers must NOT construct `HTTPException` for domain errors** — let domain exceptions propagate.
|
||
- Routers should never have helper functions like `_bad_gateway()`, `_not_found()`, `_conflict()` etc. that convert domain exceptions to `HTTPException`.
|
||
- All domain exception types must have corresponding handlers registered in `main.py` via `app.add_exception_handler()`.
|
||
- Exception handlers are registered in order from most specific to least specific — FastAPI evaluates them in registration order.
|
||
|
||
```python
|
||
# ❌ BAD — routers constructing HTTPException for domain exceptions
|
||
@router.get("/{name}")
|
||
async def get_jail(name: str, socket_path: Fail2BanSocketDep) -> JailDetailResponse:
|
||
try:
|
||
return await jail_service.get_jail(socket_path, name)
|
||
except JailNotFoundError:
|
||
raise HTTPException(status_code=404, detail=f"Jail not found: {name!r}") from None
|
||
|
||
# ✅ GOOD — domain exception propagates to global handler
|
||
@router.get("/{name}")
|
||
async def get_jail(name: str, socket_path: Fail2BanSocketDep) -> JailDetailResponse:
|
||
return await jail_service.get_jail(socket_path, name)
|
||
```
|
||
|
||
All domain exceptions raised by services propagate to handlers in `main.py`, ensuring:
|
||
1. Consistent error response format across the entire API.
|
||
2. No duplicated exception-to-HTTP-status mapping logic.
|
||
3. Easy to audit all error codes — they are all in one place.
|
||
|
||
---
|
||
|
||
## 9. Testing
|
||
|
||
- **Every** new feature or bug fix must include tests.
|
||
- Tests live in `tests/` mirroring the `app/` structure.
|
||
- Use `pytest` with `pytest-asyncio` for async tests.
|
||
- Use `httpx.AsyncClient` to test FastAPI endpoints (not `TestClient` which is sync).
|
||
- Mock external dependencies (fail2ban socket, aiohttp calls) — tests must never touch real infrastructure.
|
||
- Aim for **>80 % line coverage** — critical paths (auth, banning, scheduling) must be 100 %.
|
||
- Test names follow `test_<unit>_<scenario>_<expected>` pattern.
|
||
|
||
```python
|
||
import pytest
|
||
from httpx import AsyncClient, ASGITransport
|
||
from app.main import create_app
|
||
|
||
@pytest.fixture
|
||
async def client() -> AsyncClient:
|
||
app = create_app()
|
||
transport: ASGITransport = ASGITransport(app=app)
|
||
async with AsyncClient(transport=transport, base_url="http://test") as ac:
|
||
yield ac
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_list_jails_returns_200(client: AsyncClient) -> None:
|
||
response = await client.get("/api/jails/")
|
||
assert response.status_code == 200
|
||
data: dict = response.json()
|
||
assert "jails" in data
|
||
```
|
||
|
||
---
|
||
|
||
## 9.1 Background Tasks and Scheduler Architecture
|
||
|
||
BanGUI uses **APScheduler 4.x** (async mode) to manage background jobs that execute on a schedule without user interaction. This section documents how to write and register background tasks.
|
||
|
||
### Task Location and Structure
|
||
|
||
All background tasks live in `backend/app/tasks/` as separate modules. Each task:
|
||
- Exports a `register(app: FastAPI) -> None` or `async def register(app: FastAPI) -> None` function.
|
||
- Opens its own database connection using `app.db.open_db()` or the `task_db()` helper.
|
||
- Closes connections when work completes (use the async context manager pattern).
|
||
- Runs independently of the FastAPI request/response cycle.
|
||
|
||
### Example Task
|
||
|
||
```python
|
||
# backend/app/tasks/my_task.py
|
||
import structlog
|
||
from fastapi import FastAPI
|
||
from apscheduler.schedulers.asyncio import AsyncIOScheduler
|
||
|
||
log = structlog.get_logger()
|
||
|
||
async def my_background_job(app: FastAPI) -> None:
|
||
"""Do important work on a schedule."""
|
||
log.info("my_background_job_started")
|
||
try:
|
||
db = await app.db.open_db(app.state.settings.database_path)
|
||
try:
|
||
# Do work...
|
||
pass
|
||
finally:
|
||
await db.close()
|
||
except Exception:
|
||
log.error("my_background_job_failed", exc_info=True)
|
||
|
||
def register(app: FastAPI) -> None:
|
||
"""Register the job with the scheduler."""
|
||
scheduler: AsyncIOScheduler = app.state.scheduler
|
||
scheduler.add_job(
|
||
my_background_job,
|
||
args=(app,),
|
||
trigger="interval",
|
||
seconds=60,
|
||
id="my_task",
|
||
name="My Background Job",
|
||
)
|
||
```
|
||
|
||
### Accessing Shared Resources in Tasks
|
||
|
||
Since tasks do not have access to `Depends(get_db)` (no request scope), they must:
|
||
1. **Open their own DB connection** via `app.state.db_factory.open_db(path)`.
|
||
2. **Access app-level state** — `app.state.http_session`, `app.state.geo_cache`, `app.state.settings`, etc.
|
||
3. **Use structlog** for all logging (never `print()`).
|
||
|
||
### Single-Worker Requirement
|
||
|
||
**The scheduler is bound to a single asyncio event loop and cannot be shared across multiple worker processes.** BanGUI enforces single-worker mode to prevent duplicate task execution.
|
||
|
||
- **Deployment constraint:** Set `BANGUI_WORKERS=1` (default).
|
||
- **Startup validation:** `startup_shared_resources()` raises `RuntimeError` if `BANGUI_WORKERS > 1`.
|
||
- See [Architekture.md § 9.2](Architekture.md) for full details.
|
||
|
||
---
|
||
|
||
## 10. Code Style & Tooling
|
||
|
||
| Tool | Purpose |
|
||
|---|---|
|
||
| **Ruff** | Linter and formatter (replaces black, isort, flake8). |
|
||
| **mypy** or **pyright** | Static type checking in strict mode. |
|
||
| **pre-commit** | Run ruff + type checker before every commit. |
|
||
|
||
- Line length: **120 characters** max.
|
||
- Strings: use **double quotes** (`"`).
|
||
- Imports: sorted by ruff — stdlib → third-party → local, one import per line.
|
||
- No unused imports, no unused variables, no `# type: ignore` without explanation.
|
||
- Docstrings in **Google style** on every public function, class, and module.
|
||
|
||
---
|
||
|
||
## 11. fail2ban Response Utilities
|
||
|
||
All services that interact with the fail2ban daemon must use the canonical response parsing utilities from `app.utils.fail2ban_response`. This ensures consistent error handling, type safety, and makes it easy to fix bugs in response handling across the entire codebase.
|
||
|
||
### Available Functions
|
||
|
||
**`ok(response: object) -> object`**
|
||
Extracts the payload from a fail2ban ``(return_code, data)`` response tuple.
|
||
- Raises `ValueError` if return code ≠ 0 or response shape is invalid.
|
||
- Use this on every response from `Fail2BanClient.send()`.
|
||
|
||
**`to_dict(pairs: object) -> dict[str, object]`**
|
||
Converts a list of ``(key, value)`` pairs (fail2ban's native response format) to a Python dict.
|
||
- Silently ignores malformed entries and non-list/tuple inputs.
|
||
- Always returns a dict (empty if input is invalid).
|
||
|
||
**`ensure_list(value: object | None) -> list[str]`**
|
||
Coerces fail2ban response values (which may be `None`, a single string, or a list) to a normalized list of strings.
|
||
- Handles all three cases consistently.
|
||
- Returns empty list for `None` or empty strings.
|
||
|
||
**`is_not_found_error(exc: Exception) -> bool`**
|
||
Checks if an exception indicates a jail does not exist.
|
||
- Checks for multiple error message patterns (case-insensitive).
|
||
- Use this to distinguish "jail not found" errors from other failures.
|
||
|
||
### Example Usage
|
||
|
||
```python
|
||
from app.utils.fail2ban_response import ok, to_dict, ensure_list, is_not_found_error
|
||
from app.utils.fail2ban_client import Fail2BanClient
|
||
|
||
client = Fail2BanClient(socket_path="/var/run/fail2ban/fail2ban.sock")
|
||
|
||
try:
|
||
# Get jail status
|
||
response = await client.send(["status", "sshd", "short"])
|
||
status_dict = to_dict(ok(response)) # Extract payload and convert to dict
|
||
|
||
# Get list of banned IPs
|
||
ban_response = await client.send(["get", "sshd", "banip"])
|
||
banned_ips = ensure_list(ok(ban_response)) # Normalize to list of strings
|
||
|
||
except ValueError as exc:
|
||
if is_not_found_error(exc):
|
||
raise JailNotFoundError("sshd") from exc
|
||
raise
|
||
```
|
||
|
||
### Why This Matters
|
||
|
||
Before this utility module, every service implemented its own copy of these functions, leading to:
|
||
- Code duplication across 7+ service files.
|
||
- Subtle inconsistencies in error handling.
|
||
- Difficult maintenance — every bug fix required touching multiple files.
|
||
|
||
Now, all services import from a single authoritative source, making response handling consistent, maintainable, and type-safe.
|
||
|
||
---
|
||
|
||
## 12. Configuration & Secrets
|
||
|
||
- All configuration lives in **environment variables** loaded through **pydantic-settings**.
|
||
- Secrets (master password hash, session key) are **never** committed to the repository.
|
||
- Provide a `.env.example` with all keys and placeholder values.
|
||
- Validate config at startup — fail fast with a clear error if a required value is missing.
|
||
|
||
```python
|
||
from pydantic_settings import BaseSettings
|
||
from pydantic import Field
|
||
|
||
class Settings(BaseSettings):
|
||
database_path: str = Field("bangui.db", description="Path to SQLite database")
|
||
fail2ban_socket: str = Field("/var/run/fail2ban/fail2ban.sock", description="fail2ban socket path")
|
||
session_secret: str = Field(..., description="Secret key for session signing")
|
||
log_level: str = Field("info", description="Logging level")
|
||
|
||
model_config = {"env_prefix": "BANGUI_", "env_file": ".env"}
|
||
```
|
||
|
||
### Session Secret Configuration
|
||
|
||
The `session_secret` is the HMAC key used to sign all session tokens. It must be at least 32 characters (256 bits) to provide sufficient cryptographic strength for HMAC-SHA256.
|
||
|
||
**Minimum Length:** 32 characters
|
||
|
||
**Why 32 characters?** Session tokens are signed using HMAC-SHA256. A secret shorter than 32 bytes (256 bits) significantly weakens the signature, potentially allowing attackers to forge valid tokens. The constraint is enforced at startup — the application will fail to start if `session_secret` is shorter than 32 characters.
|
||
|
||
**Generation:** Generate a secure secret using Python:
|
||
|
||
```bash
|
||
python -c "import secrets; print(secrets.token_hex(32))"
|
||
```
|
||
|
||
This produces a 64-character hexadecimal string (256 bits) suitable for production use.
|
||
|
||
**Environment Variable:**
|
||
|
||
```bash
|
||
BANGUI_SESSION_SECRET="your-32-character-minimum-secret-here"
|
||
```
|
||
|
||
**Never** commit the actual secret to the repository. Provide a `.env.example` with a placeholder:
|
||
|
||
```bash
|
||
# .env.example
|
||
BANGUI_SESSION_SECRET="set-this-to-a-32-character-minimum-secret"
|
||
```
|
||
|
||
### Session Cookie Security
|
||
|
||
The `session_cookie_secure` configuration controls the `Secure` flag on the session cookie. This flag prevents browsers from sending the session cookie over unencrypted HTTP.
|
||
|
||
**Default:** `true` — Production deployments are secure by default. Cookies are only sent over HTTPS.
|
||
|
||
**Local Development:** Set `BANGUI_SESSION_COOKIE_SECURE=false` in your compose file or `.env` to allow cookies over HTTP (required for `localhost:8000`).
|
||
|
||
```yaml
|
||
# Docker/compose.debug.yml
|
||
environment:
|
||
BANGUI_SESSION_COOKIE_SECURE: "false" # Allow HTTP during local development
|
||
```
|
||
|
||
**Important:** If `Secure=true` is set, browsers will reject the session cookie when the backend is served over HTTP. Ensure your nginx/reverse proxy terminates TLS and passes `X-Forwarded-Proto: https` so FastAPI knows the connection is secure.
|
||
|
||
### fail2ban_start_command Configuration
|
||
|
||
The `fail2ban_start_command` setting specifies the shell command used to start the fail2ban daemon during recovery operations (e.g., after a rollback).
|
||
|
||
**Format & Parsing:**
|
||
- The command is split into arguments using `shlex.split()`, which respects shell quoting rules.
|
||
- Paths with spaces must be quoted. Example: `"/opt/my tools/fail2ban-client" start`.
|
||
- The command is **not** executed through a shell — no shell variables or globbing are interpreted.
|
||
|
||
**Validation:**
|
||
- The command is validated at startup using `shlex.split()`. Mismatched quotes will raise a `ValueError` with the problematic command in the error message.
|
||
|
||
**Environment Variables:**
|
||
```bash
|
||
BANGUI_FAIL2BAN_START_COMMAND="fail2ban-client start" # Default
|
||
BANGUI_FAIL2BAN_START_COMMAND="systemctl start fail2ban" # systemd
|
||
BANGUI_FAIL2BAN_START_COMMAND='"/opt/my tools/fail2ban" start' # Quoted path
|
||
```
|
||
|
||
**Common Pitfall:**
|
||
Using `.split()` instead of `shlex.split()` would break commands with spaces in paths. Always use quoted strings for paths that contain whitespace.
|
||
|
||
### Log Path Validation & Allowlisting
|
||
|
||
Authenticated users can instruct fail2ban to monitor additional log files through the API endpoint `POST /api/config/jails/{name}/logpath`. To prevent path-traversal attacks and unauthorized reads of sensitive system files, all requested log paths must resolve to locations within a configurable allowlist of safe directories.
|
||
|
||
**Allowed Directories:**
|
||
- Configured via the `BANGUI_ALLOWED_LOG_DIRS` environment variable (comma-separated list).
|
||
- Defaults to: `["/var/log", "/config/log"]`.
|
||
|
||
**Path Validation Rules:**
|
||
1. The requested path is resolved to its canonical form using `Path(log_path).resolve()`, which:
|
||
- Expands relative paths to absolute paths.
|
||
- Resolves symbolic links to their real targets.
|
||
- Normalizes `.` and `..` components.
|
||
2. The resolved path is checked using `Path.is_relative_to()` against each allowed directory prefix.
|
||
3. If the resolved path is not relative to any allowed directory, a `ValueError` is raised with a descriptive error message.
|
||
|
||
**Implementation:**
|
||
- Validation occurs in the Pydantic model `AddLogPathRequest` using a `@field_validator`.
|
||
- The validator runs at request time, before the service layer is invoked.
|
||
- Symlinks that escape allowed directories are rejected (see [symlink bypass tests](../../backend/tests/test_models.py)).
|
||
|
||
**Important:** Use `is_relative_to()`, not `startswith()` or string prefix matching. The latter is bypassable with paths like `/var/log_evil/file.log`.
|
||
|
||
**Environment Variables:**
|
||
```bash
|
||
BANGUI_ALLOWED_LOG_DIRS="/var/log,/config/log" # Default
|
||
BANGUI_ALLOWED_LOG_DIRS="/var/log,/config/log,/home/app/logs" # Custom directory
|
||
```
|
||
|
||
### Login Rate Limiting
|
||
|
||
The login endpoint (`POST /api/auth/login`) is protected against brute-force attacks using an in-memory rate limiter.
|
||
|
||
**Design:**
|
||
- Uses a `dict[str, deque[float]]` keyed by client IP, storing login attempt timestamps within a time window.
|
||
- Attempts outside the window are automatically removed during validation checks.
|
||
- Expired IP entries are cleaned up to prevent unbounded memory growth.
|
||
|
||
**Rate Limit Rules:**
|
||
- **5 attempts per 60 seconds** per IP address.
|
||
- Requests exceeding the limit return **HTTP 429 Too Many Requests** with a `Retry-After` header.
|
||
- Each failed login triggers a 10-second server-side delay (`asyncio.sleep`) to further slow attacks, on top of bcrypt hashing (~100ms).
|
||
|
||
**IP Extraction (Proxy Safety):**
|
||
- When behind nginx, the rate limiter reads the real client IP from `X-Forwarded-For` or `X-Real-IP` headers.
|
||
- Only trusts these headers when the immediate connection source is in a configured trusted proxy list.
|
||
- Prevents attackers from spoofing these headers to bypass rate limits.
|
||
- Falls back to the direct connection IP when proxy headers cannot be trusted.
|
||
|
||
**Process-Local Limitation:**
|
||
- The rate limiter is process-local (in-memory). In multi-worker deployments (e.g., Gunicorn with 4 workers), each worker maintains its own rate limit counter.
|
||
- This is acceptable because the single-worker constraint is enforced elsewhere. See [TASK-002/003 notes](Instructions.md) for details.
|
||
|
||
**Implementation:**
|
||
- Rate limiter: `app.utils.rate_limiter.RateLimiter`
|
||
- IP extraction: `app.utils.client_ip.get_client_ip()`
|
||
- Dependency: `LoginRateLimiterDep` in `app.dependencies`
|
||
|
||
---
|
||
|
||
## 13. File I/O Conventions
|
||
|
||
All file write operations to critical configuration files must be **atomic** to prevent corruption if the process is killed mid-write.
|
||
|
||
### Atomic File Writes
|
||
|
||
Configuration files (e.g., fail2ban jail configs in `jail.d/`) are essential for system operation. A truncated or corrupt config file can break fail2ban's ability to reload and may disable active protection.
|
||
|
||
**Rule: Always use write-to-temp + atomic rename**
|
||
|
||
Never use `Path.write_text()` or `file.write()` directly for critical files. Instead:
|
||
|
||
1. Create a temporary file in the **same directory** as the target (crucial for atomic `os.replace()`).
|
||
2. Write content to the temp file.
|
||
3. Atomically rename the temp file to replace the target.
|
||
4. Clean up the temp file if an error occurs.
|
||
|
||
**Implementation Pattern:**
|
||
|
||
```python
|
||
import os
|
||
import tempfile
|
||
from pathlib import Path
|
||
|
||
target = Path("/path/to/config/file.conf")
|
||
|
||
tmp_name: str | None = None
|
||
try:
|
||
# Create temp file in target's directory (same filesystem = atomic)
|
||
with tempfile.NamedTemporaryFile(
|
||
mode="w",
|
||
encoding="utf-8",
|
||
dir=target.parent,
|
||
delete=False,
|
||
suffix=".tmp",
|
||
) as tmp:
|
||
tmp.write(content)
|
||
tmp_name = tmp.name
|
||
# Atomic rename (single syscall on POSIX systems)
|
||
os.replace(tmp_name, target)
|
||
except OSError as exc:
|
||
# Clean up temp file on error
|
||
with contextlib.suppress(OSError):
|
||
if tmp_name is not None:
|
||
os.unlink(tmp_name)
|
||
raise ConfigWriteError(f"Cannot write config: {exc}") from exc
|
||
```
|
||
|
||
**Why this matters:**
|
||
|
||
- `Path.write_text()` overwrites in place. If the process dies mid-write, the file is left truncated or partially written.
|
||
- `os.replace()` is atomic on POSIX systems (single rename syscall) **only if source and target are on the same filesystem**.
|
||
- Creating the temp file in `target.parent` ensures atomicity.
|
||
- On Linux containers, this prevents config corruption and service degradation.
|
||
|
||
**Files requiring atomic writes:**
|
||
|
||
- All config files under `jail.d/` (created/modified by `_write_conf_file` and `_create_conf_file`)
|
||
- Any critical state files that fail2ban relies on
|
||
|
||
**Examples in the codebase:**
|
||
|
||
- `app/services/config_file_helpers.py`: `_write_conf_file`, `_create_conf_file`
|
||
- `app/services/jail_config_service.py`: `_write_local_file_sync`, `_restore_local_file_sync`
|
||
|
||
---
|
||
|
||
## 14. Git & Workflow
|
||
|
||
- **Branch naming:** `feature/<short-description>`, `fix/<short-description>`, `chore/<short-description>`.
|
||
- **Commit messages:** imperative tense, max 72 chars first line (`Add jail reload endpoint`, `Fix ban history query`).
|
||
- Every merge request must pass: ruff, type checker, all tests.
|
||
- Do not merge with failing CI.
|
||
- Keep pull requests small and focused — one feature or fix per PR.
|
||
|
||
---
|
||
|
||
## 15. Coding Principles
|
||
|
||
These principles are **non-negotiable**. Every backend contributor must internalise and apply them daily.
|
||
|
||
### 15.1 Clean Code
|
||
|
||
- Write code that **reads like well-written prose** — a new developer should understand intent without asking.
|
||
- **Meaningful names** — variables, functions, and classes must reveal their purpose. Avoid abbreviations (`cnt`, `mgr`, `tmp`) unless universally understood.
|
||
- **Small functions** — each function does exactly one thing. If you need a comment to explain a block inside a function, extract it into its own function.
|
||
- **No magic numbers or strings** — use named constants.
|
||
- **Boy Scout Rule** — leave every file cleaner than you found it.
|
||
- **Avoid deep nesting** — prefer early returns (guard clauses) to keep the happy path at the top indentation level.
|
||
|
||
```python
|
||
# Good — guard clause, clear name, one job
|
||
async def get_active_ban(ip: str, jail: str) -> Ban:
|
||
ban: Ban | None = await repo.find_ban(ip=ip, jail=jail)
|
||
if ban is None:
|
||
raise BanNotFoundError(ip=ip, jail=jail)
|
||
if ban.is_expired():
|
||
raise BanExpiredError(ip=ip, jail=jail)
|
||
return ban
|
||
|
||
# Bad — nested, vague name
|
||
async def check(ip, j):
|
||
b = await repo.find_ban(ip=ip, jail=j)
|
||
if b:
|
||
if not b.is_expired():
|
||
return b
|
||
else:
|
||
raise Exception("expired")
|
||
else:
|
||
raise Exception("not found")
|
||
```
|
||
|
||
### 15.2 Separation of Concerns (SoC)
|
||
|
||
- Each module, class, and function must have a **single, well-defined responsibility**.
|
||
- **Routers** → HTTP layer only (parse requests, return responses).
|
||
- **Services** → business logic and orchestration.
|
||
- **Repositories** → data access and persistence.
|
||
- **Models** → data shapes and validation.
|
||
- **Tasks** → scheduled background jobs.
|
||
- Never mix layers — a router must not execute SQL, and a repository must not raise `HTTPException`.
|
||
|
||
### 15.3 Single Responsibility Principle (SRP)
|
||
|
||
- A class or module should have **one and only one reason to change**.
|
||
- If a service handles both ban management *and* email notifications, split it into `BanService` and `NotificationService`.
|
||
|
||
### 15.4 Don't Repeat Yourself (DRY)
|
||
|
||
- Extract shared logic into utility functions, base classes, or dependency providers.
|
||
- If the same block of code appears in more than one place, **refactor it** into a single source of truth.
|
||
- But don't over-abstract — premature DRY that couples unrelated features is worse than a little duplication (see **Rule of Three**: refactor when something appears a third time).
|
||
|
||
### 15.5 KISS — Keep It Simple, Stupid
|
||
|
||
- Choose the simplest solution that works correctly.
|
||
- Avoid clever tricks, premature optimisation, and over-engineering.
|
||
- If a standard library function does the job, prefer it over a custom implementation.
|
||
|
||
### 15.6 YAGNI — You Aren't Gonna Need It
|
||
|
||
- Do **not** build features, abstractions, or config options "just in case".
|
||
- Implement what is required **now**. Extend later when a real need emerges.
|
||
|
||
### 15.7 Dependency Inversion Principle (DIP)
|
||
|
||
- High-level modules (services) must not depend on low-level modules (repositories) directly. Both should depend on **abstractions** (protocols / interfaces).
|
||
- Use FastAPI's `Depends()` to inject implementations — this makes swapping and testing trivial.
|
||
|
||
```python
|
||
from typing import Protocol
|
||
|
||
class BanRepository(Protocol):
|
||
async def find_ban(self, ip: str, jail: str) -> Ban | None: ...
|
||
async def save_ban(self, ban: Ban) -> None: ...
|
||
|
||
class SqliteBanRepository:
|
||
"""Concrete implementation — depends on aiosqlite."""
|
||
async def find_ban(self, ip: str, jail: str) -> Ban | None: ...
|
||
async def save_ban(self, ban: Ban) -> None: ...
|
||
```
|
||
|
||
#### 13.7.1 Repository Module Pattern — Module-as-Protocol Structural Compatibility
|
||
|
||
BanGUI uses **module-level functions** for repository implementations, not classes. Each repository module (e.g., `session_repo.py`, `blocklist_repo.py`) exports async functions that match the signatures defined in the Protocol interface in `protocols.py`. This is a **structural typing pattern** — mypy accepts the module as a valid Protocol implementation because the function signatures match, *even though* the module is not explicitly annotated as implementing the Protocol.
|
||
|
||
This approach works correctly with FastAPI's dependency injection via `cast()`:
|
||
|
||
```python
|
||
# In app/repositories/session_repo.py
|
||
async def create_session(db: aiosqlite.Connection, token: str, created_at: str, expires_at: str) -> Session:
|
||
"""Insert a new session row."""
|
||
...
|
||
|
||
# In app/repositories/protocols.py
|
||
class SessionRepository(Protocol):
|
||
async def create_session(
|
||
self,
|
||
db: aiosqlite.Connection,
|
||
token: str,
|
||
created_at: str,
|
||
expires_at: str,
|
||
) -> Session:
|
||
...
|
||
|
||
# In app/dependencies.py
|
||
async def get_session_repo() -> SessionRepository:
|
||
"""Provide the concrete session repository implementation."""
|
||
from app.repositories import session_repo
|
||
return session_repo # ← mypy accepts this because the module has matching functions
|
||
```
|
||
|
||
**Why this pattern is used:**
|
||
- **Simplicity** — no boilerplate class/instance wrapping.
|
||
- **Compatibility** — Python's **structural typing** (PEP 544) means the module automatically satisfies the Protocol interface if function signatures match.
|
||
- **Testability** — the same DIP principle applies; services depend on the Protocol, not the module directly, so tests can mock the Protocol.
|
||
|
||
**Risks and mitigations:**
|
||
- **Silent breakage if function signatures change** — If a parameter is added or removed from a module function, the module no longer satisfies the Protocol, but mypy does not flag this as an error because the module is loosely coupled. To prevent this, **Protocol signatures in `protocols.py` are the source of truth**. Always check that module functions match the Protocol definitions before merging changes. The CI/CD pipeline validates this compatibility at build time.
|
||
|
||
**How the validation works (CI check):**
|
||
- Before each deployment, run `mypy --strict` to ensure all dependency providers return values compatible with their Protocol types.
|
||
- The `cast()` calls in `dependencies.py` are a documented signal that structural compatibility is being verified externally, not via explicit class inheritance.
|
||
|
||
#### 13.7.2 Session Cache Pluggability — Process-Local vs. Shared Backends
|
||
|
||
Session validation is expensive (SQLite lookup + password verification). To improve performance, **validated session tokens are cached** using the `SessionCache` interface (`app.utils.session_cache`). The default implementation, `InMemorySessionCache`, stores cached sessions in process-local memory.
|
||
|
||
**Current implementation (single-worker):**
|
||
|
||
```python
|
||
from app.utils.session_cache import SessionCache, InMemorySessionCache, NoOpSessionCache
|
||
|
||
class SessionCache(Protocol):
|
||
"""Interface for session token validation cache backends."""
|
||
def get(self, token: str) -> Session | None: ...
|
||
def set(self, token: str, session: Session, ttl_seconds: float) -> None: ...
|
||
def invalidate(self, token: str) -> None: ...
|
||
def clear(self) -> None: ...
|
||
|
||
# Default in-memory implementation — PROCESS-LOCAL
|
||
class InMemorySessionCache:
|
||
def __init__(self) -> None:
|
||
self._entries: dict[str, tuple[Session, float]] = {}
|
||
```
|
||
|
||
**Single-worker constraint:**
|
||
|
||
`InMemorySessionCache` is **process-local** — each worker process has its own dict. In single-worker mode (enforced by TASK-002), this is safe and improves performance. In multi-worker deployments:
|
||
- A logout by worker A clears the session from A's cache, but worker B still has it → logout doesn't work.
|
||
- Enabling/disabling the cache requires restarting all workers to take effect.
|
||
|
||
**Multi-worker solution:**
|
||
|
||
To support multiple workers (future enhancement), implement a shared backend behind the same `SessionCache` Protocol:
|
||
|
||
```python
|
||
# Example Redis implementation (not yet in codebase)
|
||
class RedisSessionCache:
|
||
"""Session cache backed by Redis."""
|
||
def __init__(self, redis_url: str) -> None:
|
||
self.client = aioredis.from_url(redis_url)
|
||
|
||
async def get(self, token: str) -> Session | None:
|
||
data = await self.client.get(f"session:{token}")
|
||
return Session.model_validate_json(data) if data else None
|
||
|
||
async def set(self, token: str, session: Session, ttl_seconds: float) -> None:
|
||
await self.client.setex(
|
||
f"session:{token}",
|
||
int(ttl_seconds),
|
||
session.model_dump_json()
|
||
)
|
||
|
||
async def invalidate(self, token: str) -> None:
|
||
await self.client.delete(f"session:{token}")
|
||
|
||
async def clear(self) -> None:
|
||
await self.client.flushdb()
|
||
```
|
||
|
||
To adopt a Redis backend:
|
||
1. Create `RedisSessionCache` in `app.utils.session_cache`.
|
||
2. Update `app.utils.runtime_state.set_runtime_settings()` to instantiate `RedisSessionCache` when `REDIS_URL` env var is set.
|
||
3. Update `app.config.Settings` to accept optional `REDIS_URL`.
|
||
4. Tests continue to use `InMemorySessionCache` (no Redis dependency in dev).
|
||
|
||
**Implementation rules:**
|
||
- All cache methods must be `async` (even if the backend is sync).
|
||
- Never log session tokens or session data.
|
||
- TTL must be respected — expired entries must be removed on access.
|
||
- See `app/utils/session_cache.py` for the full Protocol definition and current implementations.
|
||
|
||
### 15.8 Composition over Inheritance
|
||
|
||
- Favour **composing** small, focused objects over deep inheritance hierarchies.
|
||
- Use mixins or protocols only when a clear "is-a" relationship exists; otherwise, pass collaborators as constructor arguments.
|
||
|
||
### 15.9 Fail Fast
|
||
|
||
- Validate inputs as early as possible — at the API boundary with Pydantic, at service entry with assertions or domain checks.
|
||
- Raise specific exceptions immediately rather than letting bad data propagate silently.
|
||
|
||
### 15.10 Law of Demeter (Principle of Least Knowledge)
|
||
|
||
- A function should only call methods on:
|
||
1. Its own object (`self`).
|
||
2. Objects passed as parameters.
|
||
3. Objects it creates.
|
||
- Avoid long accessor chains like `request.state.db.cursor().execute(...)` — wrap them in a meaningful method.
|
||
|
||
### 15.11 Defensive Programming
|
||
|
||
- Never trust external input — validate and sanitise everything that crosses a boundary (HTTP request, file, socket, environment variable).
|
||
- Handle edge cases explicitly: empty lists, `None` values, negative numbers, empty strings.
|
||
- Use type narrowing and exhaustive pattern matching (`match` / `case`) to eliminate impossible states.
|
||
|
||
### 15.12 SSRF Prevention (Server-Side Request Forgery)
|
||
|
||
When user-supplied URLs are fetched by the backend, validate them before making any HTTP requests:
|
||
|
||
1. **Use Pydantic's `AnyHttpUrl` type** to restrict schemes to `http://` and `https://` only.
|
||
- Rejects `file://`, `ftp://`, `gopher://`, and other non-http schemes at the model boundary.
|
||
|
||
2. **Validate resolved IP addresses** before fetching:
|
||
- Parse the hostname and resolve it via DNS (using `socket.getaddrinfo()`).
|
||
- Use `ipaddress.ip_address().is_private` to reject private/reserved ranges:
|
||
- RFC 1918: `10.0.0.0/8`, `172.16.0.0/12`, `192.168.0.0/16`
|
||
- Loopback: `127.0.0.0/8`, `::1/128`
|
||
- Link-local: `169.254.0.0/16`, `fe80::/10`
|
||
- IPv6 site-local, multicast, and reserved ranges.
|
||
- Raise `ValueError` if validation fails; let the router convert it to HTTP 400.
|
||
|
||
3. **Guard against DNS rebinding**:
|
||
- Validate DNS at URL creation/validation time (performed during request deserialization).
|
||
- For additional safety, re-validate the connection IP at HTTP client time (e.g., custom `aiohttp.TCPConnector` can inspect the resolved address during connect).
|
||
|
||
4. **Example implementation** (see `backend/app/utils/ip_utils.py`):
|
||
- `is_private_ip(ip_str: str) → bool`: Checks if IP is private/reserved/loopback/link-local.
|
||
- `async validate_blocklist_url(url: AnyHttpUrl) → None`: Async DNS resolution + private IP check.
|
||
- Service layer calls `await validate_blocklist_url(url)` before persisting; router catches `ValueError` and returns 400.
|
||
|
||
---
|
||
|
||
## 16. Quick Reference — Do / Don't
|
||
|
||
| Do | Don't |
|
||
|---|---|
|
||
| Type every function, variable, return | Leave types implicit |
|
||
| Use `async def` for I/O | Use sync functions for I/O |
|
||
| Validate with Pydantic at the boundary | Pass raw dicts through the codebase |
|
||
| Log with structlog + context keys | Use `print()` or format strings in logs |
|
||
| Write tests for every feature | Ship untested code |
|
||
| Use `aiohttp` for HTTP calls | Use `requests` |
|
||
| Handle errors with custom exceptions | Use bare `except:` |
|
||
| Keep routers thin, logic in services | Put business logic in routers |
|
||
| Use `datetime.now(datetime.UTC)` | Use naive datetimes |
|
||
| Run ruff + mypy before committing | Push code that doesn't pass linting |
|
||
| Keep GET endpoints read-only (no `db.commit()`) | Call `db.commit()` / INSERT inside GET handlers |
|
||
| Batch DB writes; issue one `db.commit()` after the loop | Commit inside a loop (1 fsync per row) |
|
||
| Use `executemany()` for bulk inserts | Call `execute()` + `commit()` per row in a loop | |