"""Seed 10 000 synthetic bans into the fail2ban dev database. Usage:: cd backend python tests/scripts/seed_10k_bans.py [--db-path /path/to/fail2ban.sqlite3] This script inserts 10 000 synthetic ban rows spread over the last 365 days into the fail2ban SQLite database and pre-resolves all synthetic IPs into the BanGUI geo_cache. Run it once to get realistic dashboard and map load times in the browser without requiring a live fail2ban instance with active traffic. .. warning:: This script **writes** to the fail2ban database. Only use it against the development database (``Docker/fail2ban-dev-config/fail2ban.sqlite3`` or equivalent). Never run it against a production database. """ from __future__ import annotations import argparse import logging import random import sqlite3 import sys import time from pathlib import Path log = logging.getLogger(__name__) # --------------------------------------------------------------------------- # Default paths # --------------------------------------------------------------------------- _DEFAULT_F2B_DB: str = str( Path(__file__).resolve().parents[3] / "Docker" / "fail2ban-dev-config" / "fail2ban.sqlite3" ) _DEFAULT_APP_DB: str = str( Path(__file__).resolve().parents[2] / "bangui.db" ) # --------------------------------------------------------------------------- # Constants # --------------------------------------------------------------------------- _BAN_COUNT: int = 10_000 _YEAR_SECONDS: int = 365 * 24 * 3600 _JAIL_POOL: list[str] = ["sshd", "nginx", "blocklist-import", "postfix", "dovecot"] _COUNTRY_POOL: list[tuple[str, str]] = [ ("DE", "Germany"), ("US", "United States"), ("CN", "China"), ("RU", "Russia"), ("FR", "France"), ("BR", "Brazil"), ("IN", "India"), ("GB", "United Kingdom"), ("NL", "Netherlands"), ("CA", "Canada"), ] # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _random_ip() -> str: """Return a random dotted-decimal IPv4 string in public ranges.""" return ".".join(str(random.randint(1, 254)) for _ in range(4)) def _seed_bans(f2b_db_path: str) -> list[str]: """Insert 10 000 synthetic ban rows into the fail2ban SQLite database. Uses the synchronous ``sqlite3`` module because fail2ban itself uses synchronous writes and the schema is straightforward. Args: f2b_db_path: Filesystem path to the fail2ban SQLite database. Returns: List of all IP addresses inserted. """ now = int(time.time()) ips: list[str] = [_random_ip() for _ in range(_BAN_COUNT)] rows = [ ( random.choice(_JAIL_POOL), ip, now - random.randint(0, _YEAR_SECONDS), 3600, random.randint(1, 10), None, ) for ip in ips ] with sqlite3.connect(f2b_db_path) as con: # Ensure the bans table exists (for dev environments where fail2ban # may not have created it yet). con.execute( "CREATE TABLE IF NOT EXISTS bans (" "jail TEXT NOT NULL, " "ip TEXT, " "timeofban INTEGER NOT NULL, " "bantime INTEGER NOT NULL DEFAULT 3600, " "bancount INTEGER NOT NULL DEFAULT 1, " "data JSON" ")" ) con.executemany( "INSERT INTO bans (jail, ip, timeofban, bantime, bancount, data) " "VALUES (?, ?, ?, ?, ?, ?)", rows, ) con.commit() log.info("Inserted %d ban rows into %s", _BAN_COUNT, f2b_db_path) return ips def _seed_geo_cache(app_db_path: str, ips: list[str]) -> None: """Pre-populate the BanGUI geo_cache table for all inserted IPs. Assigns synthetic country data cycling through :data:`_COUNTRY_POOL` so the world map shows a realistic distribution of countries without making any real HTTP requests. Args: app_db_path: Filesystem path to the BanGUI application database. ips: List of IP addresses to pre-cache. """ country_cycle = _COUNTRY_POOL * (len(ips) // len(_COUNTRY_POOL) + 1) rows = [ (ip, cc, cn, f"AS{1000 + i % 500}", f"Synthetic ISP {i % 50}") for i, (ip, (cc, cn)) in enumerate(zip(ips, country_cycle, strict=False)) ] with sqlite3.connect(app_db_path) as con: con.execute( "CREATE TABLE IF NOT EXISTS geo_cache (" "ip TEXT PRIMARY KEY, " "country_code TEXT, " "country_name TEXT, " "asn TEXT, " "org TEXT, " "cached_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ', 'now'))" ")" ) con.executemany( """ INSERT INTO geo_cache (ip, country_code, country_name, asn, org) VALUES (?, ?, ?, ?, ?) ON CONFLICT(ip) DO UPDATE SET country_code = excluded.country_code, country_name = excluded.country_name, asn = excluded.asn, org = excluded.org """, rows, ) con.commit() log.info("Pre-cached geo data for %d IPs in %s", len(ips), app_db_path) # --------------------------------------------------------------------------- # Entry point # --------------------------------------------------------------------------- def main() -> None: """Parse CLI arguments and run the seed operation.""" logging.basicConfig(level=logging.INFO, format="%(levelname)s %(message)s") parser = argparse.ArgumentParser( description="Seed 10 000 synthetic bans for performance testing." ) parser.add_argument( "--f2b-db", default=_DEFAULT_F2B_DB, help=f"Path to the fail2ban SQLite database (default: {_DEFAULT_F2B_DB})", ) parser.add_argument( "--app-db", default=_DEFAULT_APP_DB, help=f"Path to the BanGUI application database (default: {_DEFAULT_APP_DB})", ) args = parser.parse_args() f2b_path = Path(args.f2b_db) app_path = Path(args.app_db) if not f2b_path.parent.exists(): log.error("fail2ban DB directory does not exist: %s", f2b_path.parent) sys.exit(1) if not app_path.parent.exists(): log.error("App DB directory does not exist: %s", app_path.parent) sys.exit(1) log.info("Seeding %d bans into: %s", _BAN_COUNT, f2b_path) ips = _seed_bans(str(f2b_path)) log.info("Pre-caching geo data into: %s", app_path) _seed_geo_cache(str(app_path), ips) log.info("Done. Restart the BanGUI backend to load the new geo cache entries.") if __name__ == "__main__": main()