feat: Stage 1 — backend and frontend scaffolding

Backend (tasks 1.1, 1.5–1.8):
- pyproject.toml with FastAPI, Pydantic v2, aiosqlite, APScheduler 3.x,
  structlog, bcrypt; ruff + mypy strict configured
- Pydantic Settings (BANGUI_ prefix env vars, fail-fast validation)
- SQLite schema: settings, sessions, blocklist_sources, import_log;
  WAL mode + foreign keys; idempotent init_db()
- FastAPI app factory with lifespan (DB, aiohttp session, scheduler),
  CORS, unhandled-exception handler, GET /api/health
- Fail2BanClient: async Unix-socket wrapper using run_in_executor,
  custom error types, async context manager
- Utility modules: ip_utils, time_utils, constants
- 47 tests; ruff 0 errors; mypy --strict 0 errors

Frontend (tasks 1.2–1.4):
- Vite + React 18 + TypeScript strict; Fluent UI v9; ESLint + Prettier
- Custom brand theme (#0F6CBD, WCAG AA contrast) with light/dark variants
- Typed fetch API client (ApiError, get/post/put/del) + endpoints constants
- tsc --noEmit 0 errors
This commit is contained in:
2026-02-28 21:15:01 +01:00
parent 460d877339
commit 7392c930d6
59 changed files with 7601 additions and 17 deletions

View File

@@ -0,0 +1 @@
"""Pydantic request/response/domain models package."""

View File

@@ -0,0 +1,46 @@
"""Authentication Pydantic models.
Request, response, and domain models used by the auth router and service.
"""
from pydantic import BaseModel, ConfigDict, Field
class LoginRequest(BaseModel):
"""Payload for ``POST /api/auth/login``."""
model_config = ConfigDict(strict=True)
password: str = Field(..., description="Master password to authenticate with.")
class LoginResponse(BaseModel):
"""Successful login response.
The session token is also set as an ``HttpOnly`` cookie by the router.
This model documents the JSON body for API-first consumers.
"""
model_config = ConfigDict(strict=True)
token: str = Field(..., description="Session token for use in subsequent requests.")
expires_at: str = Field(..., description="ISO 8601 UTC expiry timestamp.")
class LogoutResponse(BaseModel):
"""Response body for ``POST /api/auth/logout``."""
model_config = ConfigDict(strict=True)
message: str = Field(default="Logged out successfully.")
class Session(BaseModel):
"""Internal domain model representing a persisted session record."""
model_config = ConfigDict(strict=True)
id: int = Field(..., description="Auto-incremented row ID.")
token: str = Field(..., description="Opaque session token.")
created_at: str = Field(..., description="ISO 8601 UTC creation timestamp.")
expires_at: str = Field(..., description="ISO 8601 UTC expiry timestamp.")

91
backend/app/models/ban.py Normal file
View File

@@ -0,0 +1,91 @@
"""Ban management Pydantic models.
Request, response, and domain models used by the ban router and service.
"""
from pydantic import BaseModel, ConfigDict, Field
class BanRequest(BaseModel):
"""Payload for ``POST /api/bans`` (ban an IP)."""
model_config = ConfigDict(strict=True)
ip: str = Field(..., description="IP address to ban.")
jail: str = Field(..., description="Jail in which to apply the ban.")
class UnbanRequest(BaseModel):
"""Payload for ``DELETE /api/bans`` (unban an IP)."""
model_config = ConfigDict(strict=True)
ip: str = Field(..., description="IP address to unban.")
jail: str | None = Field(
default=None,
description="Jail to remove the ban from. ``null`` means all jails.",
)
unban_all: bool = Field(
default=False,
description="When ``true`` the IP is unbanned from every jail.",
)
class Ban(BaseModel):
"""Domain model representing a single active or historical ban record."""
model_config = ConfigDict(strict=True)
ip: str = Field(..., description="Banned IP address.")
jail: str = Field(..., description="Jail that issued the ban.")
banned_at: str = Field(..., description="ISO 8601 UTC timestamp of the ban.")
expires_at: str | None = Field(
default=None,
description="ISO 8601 UTC expiry timestamp, or ``null`` if permanent.",
)
ban_count: int = Field(..., ge=1, description="Number of times this IP was banned.")
country: str | None = Field(
default=None,
description="ISO 3166-1 alpha-2 country code resolved from the IP.",
)
class BanResponse(BaseModel):
"""Response containing a single ban record."""
model_config = ConfigDict(strict=True)
ban: Ban
class BanListResponse(BaseModel):
"""Paginated list of ban records."""
model_config = ConfigDict(strict=True)
bans: list[Ban] = Field(default_factory=list)
total: int = Field(..., ge=0, description="Total number of matching records.")
class ActiveBan(BaseModel):
"""A currently active ban entry returned by ``GET /api/bans/active``."""
model_config = ConfigDict(strict=True)
ip: str = Field(..., description="Banned IP address.")
jail: str = Field(..., description="Jail holding the ban.")
banned_at: str = Field(..., description="ISO 8601 UTC start of the ban.")
expires_at: str | None = Field(
default=None,
description="ISO 8601 UTC expiry, or ``null`` if permanent.",
)
ban_count: int = Field(..., ge=1, description="Running ban count for this IP.")
class ActiveBanListResponse(BaseModel):
"""List of all currently active bans across all jails."""
model_config = ConfigDict(strict=True)
bans: list[ActiveBan] = Field(default_factory=list)
total: int = Field(..., ge=0)

View File

@@ -0,0 +1,84 @@
"""Blocklist source and import log Pydantic models."""
from pydantic import BaseModel, ConfigDict, Field
class BlocklistSource(BaseModel):
"""Domain model for a blocklist source definition."""
model_config = ConfigDict(strict=True)
id: int
name: str
url: str
enabled: bool
created_at: str
updated_at: str
class BlocklistSourceCreate(BaseModel):
"""Payload for ``POST /api/blocklists``."""
model_config = ConfigDict(strict=True)
name: str = Field(..., min_length=1, description="Human-readable source name.")
url: str = Field(..., description="URL of the blocklist file.")
enabled: bool = Field(default=True)
class BlocklistSourceUpdate(BaseModel):
"""Payload for ``PUT /api/blocklists/{id}``."""
model_config = ConfigDict(strict=True)
name: str | None = Field(default=None, min_length=1)
url: str | None = Field(default=None)
enabled: bool | None = Field(default=None)
class ImportLogEntry(BaseModel):
"""A single blocklist import run record."""
model_config = ConfigDict(strict=True)
id: int
source_id: int | None
source_url: str
timestamp: str
ips_imported: int
ips_skipped: int
errors: str | None
class BlocklistListResponse(BaseModel):
"""Response for ``GET /api/blocklists``."""
model_config = ConfigDict(strict=True)
sources: list[BlocklistSource] = Field(default_factory=list)
class ImportLogListResponse(BaseModel):
"""Response for ``GET /api/blocklists/log``."""
model_config = ConfigDict(strict=True)
entries: list[ImportLogEntry] = Field(default_factory=list)
total: int = Field(..., ge=0)
class BlocklistSchedule(BaseModel):
"""Current import schedule and next run information."""
model_config = ConfigDict(strict=True)
hour: int = Field(..., ge=0, le=23, description="UTC hour for the daily import.")
next_run_at: str | None = Field(default=None, description="ISO 8601 UTC timestamp of the next scheduled import.")
class BlocklistScheduleUpdate(BaseModel):
"""Payload for ``PUT /api/blocklists/schedule``."""
model_config = ConfigDict(strict=True)
hour: int = Field(..., ge=0, le=23)

View File

@@ -0,0 +1,57 @@
"""Configuration view/edit Pydantic models.
Request, response, and domain models for the config router and service.
"""
from pydantic import BaseModel, ConfigDict, Field
class JailConfigUpdate(BaseModel):
"""Payload for ``PUT /api/config/jails/{name}``."""
model_config = ConfigDict(strict=True)
ban_time: int | None = Field(default=None, description="Ban duration in seconds. -1 for permanent.")
max_retry: int | None = Field(default=None, ge=1)
find_time: int | None = Field(default=None, ge=1)
fail_regex: list[str] | None = Field(default=None, description="Failure detection regex patterns.")
ignore_regex: list[str] | None = Field(default=None)
date_pattern: str | None = Field(default=None)
dns_mode: str | None = Field(default=None, description="DNS lookup mode: raw | warn | no.")
enabled: bool | None = Field(default=None)
class RegexTestRequest(BaseModel):
"""Payload for ``POST /api/config/regex-test``."""
model_config = ConfigDict(strict=True)
log_line: str = Field(..., description="Sample log line to test against.")
fail_regex: str = Field(..., description="Regex pattern to match.")
class RegexTestResponse(BaseModel):
"""Result of a regex test."""
model_config = ConfigDict(strict=True)
matched: bool = Field(..., description="Whether the pattern matched the log line.")
groups: list[str] = Field(
default_factory=list,
description="Named groups captured by a successful match.",
)
error: str | None = Field(
default=None,
description="Compilation error message if the regex is invalid.",
)
class GlobalConfigResponse(BaseModel):
"""Response for ``GET /api/config/global``."""
model_config = ConfigDict(strict=True)
log_level: str
log_target: str
db_purge_age: int = Field(..., description="Seconds after which ban records are purged from the fail2ban DB.")
db_max_matches: int = Field(..., description="Maximum stored log-line matches per ban record.")

View File

@@ -0,0 +1,45 @@
"""Ban history Pydantic models."""
from pydantic import BaseModel, ConfigDict, Field
class HistoryEntry(BaseModel):
"""A single historical ban record from the fail2ban database."""
model_config = ConfigDict(strict=True)
ip: str
jail: str
banned_at: str = Field(..., description="ISO 8601 UTC timestamp of the ban.")
released_at: str | None = Field(default=None, description="ISO 8601 UTC timestamp when the ban expired.")
ban_count: int = Field(..., ge=1, description="Total number of times this IP was banned.")
country: str | None = None
matched_lines: list[str] = Field(default_factory=list)
class IpTimeline(BaseModel):
"""Per-IP ban history timeline."""
model_config = ConfigDict(strict=True)
ip: str
total_bans: int = Field(..., ge=0)
total_failures: int = Field(..., ge=0)
events: list[HistoryEntry] = Field(default_factory=list)
class HistoryListResponse(BaseModel):
"""Paginated response for ``GET /api/history``."""
model_config = ConfigDict(strict=True)
entries: list[HistoryEntry] = Field(default_factory=list)
total: int = Field(..., ge=0)
class IpHistoryResponse(BaseModel):
"""Response for ``GET /api/history/{ip}``."""
model_config = ConfigDict(strict=True)
timeline: IpTimeline

View File

@@ -0,0 +1,89 @@
"""Jail management Pydantic models.
Request, response, and domain models used by the jails router and service.
"""
from pydantic import BaseModel, ConfigDict, Field
class JailStatus(BaseModel):
"""Runtime metrics for a single jail."""
model_config = ConfigDict(strict=True)
currently_banned: int = Field(..., ge=0)
total_banned: int = Field(..., ge=0)
currently_failed: int = Field(..., ge=0)
total_failed: int = Field(..., ge=0)
class Jail(BaseModel):
"""Domain model for a single fail2ban jail with its full configuration."""
model_config = ConfigDict(strict=True)
name: str = Field(..., description="Jail name as configured in fail2ban.")
enabled: bool = Field(..., description="Whether the jail is currently active.")
running: bool = Field(..., description="Whether the jail backend is running.")
idle: bool = Field(default=False, description="Whether the jail is in idle mode.")
backend: str = Field(..., description="Log monitoring backend (e.g. polling, systemd).")
log_paths: list[str] = Field(default_factory=list, description="Monitored log files.")
fail_regex: list[str] = Field(default_factory=list, description="Failure detection regex patterns.")
ignore_regex: list[str] = Field(default_factory=list, description="Regex patterns that bypass the ban logic.")
ignore_ips: list[str] = Field(default_factory=list, description="IP addresses or CIDRs on the ignore list.")
date_pattern: str | None = Field(default=None, description="Custom date pattern for log parsing.")
log_encoding: str = Field(default="UTF-8", description="Log file encoding.")
find_time: int = Field(..., description="Time window (seconds) for counting failures.")
ban_time: int = Field(..., description="Duration (seconds) of a ban. -1 means permanent.")
max_retry: int = Field(..., description="Number of failures before a ban is issued.")
status: JailStatus | None = Field(default=None, description="Runtime counters.")
class JailSummary(BaseModel):
"""Lightweight jail entry for the overview list."""
model_config = ConfigDict(strict=True)
name: str
enabled: bool
running: bool
idle: bool
backend: str
find_time: int
ban_time: int
max_retry: int
status: JailStatus | None = None
class JailListResponse(BaseModel):
"""Response for ``GET /api/jails``."""
model_config = ConfigDict(strict=True)
jails: list[JailSummary] = Field(default_factory=list)
total: int = Field(..., ge=0)
class JailDetailResponse(BaseModel):
"""Response for ``GET /api/jails/{name}``."""
model_config = ConfigDict(strict=True)
jail: Jail
class JailCommandResponse(BaseModel):
"""Generic response for jail control commands (start, stop, reload, idle)."""
model_config = ConfigDict(strict=True)
message: str
jail: str
class IgnoreIpRequest(BaseModel):
"""Payload for adding an IP or network to a jail's ignore list."""
model_config = ConfigDict(strict=True)
ip: str = Field(..., description="IP address or CIDR network to ignore.")

View File

@@ -0,0 +1,58 @@
"""Server status and health-check Pydantic models.
Used by the dashboard router, health service, and server settings router.
"""
from pydantic import BaseModel, ConfigDict, Field
class ServerStatus(BaseModel):
"""Cached fail2ban server health snapshot."""
model_config = ConfigDict(strict=True)
online: bool = Field(..., description="Whether fail2ban is reachable via its socket.")
version: str | None = Field(default=None, description="fail2ban version string.")
active_jails: int = Field(default=0, ge=0, description="Number of currently active jails.")
total_bans: int = Field(default=0, ge=0, description="Aggregated current ban count across all jails.")
total_failures: int = Field(default=0, ge=0, description="Aggregated current failure count across all jails.")
class ServerStatusResponse(BaseModel):
"""Response for ``GET /api/dashboard/status``."""
model_config = ConfigDict(strict=True)
status: ServerStatus
class ServerSettings(BaseModel):
"""Domain model for fail2ban server-level settings."""
model_config = ConfigDict(strict=True)
log_level: str = Field(..., description="fail2ban daemon log level.")
log_target: str = Field(..., description="Log destination: STDOUT, STDERR, SYSLOG, or a file path.")
syslog_socket: str | None = Field(default=None)
db_path: str = Field(..., description="Path to the fail2ban ban history database.")
db_purge_age: int = Field(..., description="Seconds before old records are purged.")
db_max_matches: int = Field(..., description="Maximum stored matches per ban record.")
class ServerSettingsUpdate(BaseModel):
"""Payload for ``PUT /api/server/settings``."""
model_config = ConfigDict(strict=True)
log_level: str | None = Field(default=None)
log_target: str | None = Field(default=None)
db_purge_age: int | None = Field(default=None, ge=0)
db_max_matches: int | None = Field(default=None, ge=0)
class ServerSettingsResponse(BaseModel):
"""Response for ``GET /api/server/settings``."""
model_config = ConfigDict(strict=True)
settings: ServerSettings

View File

@@ -0,0 +1,56 @@
"""Setup wizard Pydantic models.
Request, response, and domain models for the first-run configuration wizard.
"""
from pydantic import BaseModel, ConfigDict, Field
class SetupRequest(BaseModel):
"""Payload for ``POST /api/setup``."""
model_config = ConfigDict(strict=True)
master_password: str = Field(
...,
min_length=8,
description="Master password that protects the BanGUI interface.",
)
database_path: str = Field(
default="bangui.db",
description="Filesystem path to the BanGUI SQLite application database.",
)
fail2ban_socket: str = Field(
default="/var/run/fail2ban/fail2ban.sock",
description="Path to the fail2ban Unix domain socket.",
)
timezone: str = Field(
default="UTC",
description="IANA timezone name used when displaying timestamps.",
)
session_duration_minutes: int = Field(
default=60,
ge=1,
description="Number of minutes a user session remains valid.",
)
class SetupResponse(BaseModel):
"""Response returned after a successful initial setup."""
model_config = ConfigDict(strict=True)
message: str = Field(
default="Setup completed successfully. Please log in.",
)
class SetupStatusResponse(BaseModel):
"""Response indicating whether setup has been completed."""
model_config = ConfigDict(strict=True)
completed: bool = Field(
...,
description="``True`` if the initial setup has already been performed.",
)