Compare commits
5 Commits
a30b92471a
...
ac4fd967aa
| Author | SHA1 | Date | |
|---|---|---|---|
| ac4fd967aa | |||
| 9f05da2d4d | |||
| 876af46955 | |||
| 0d4a2a3311 | |||
| f555b1b0a2 |
@@ -259,6 +259,17 @@ A view for exploring historical ban data stored in the fail2ban database.
|
|||||||
- Select any IP to see its full ban timeline: every ban event, which jail triggered it, when it started, and how long it lasted.
|
- Select any IP to see its full ban timeline: every ban event, which jail triggered it, when it started, and how long it lasted.
|
||||||
- Merged view showing total failures and matched log lines aggregated across all bans for that IP.
|
- Merged view showing total failures and matched log lines aggregated across all bans for that IP.
|
||||||
|
|
||||||
|
### Persistent Historical Archive
|
||||||
|
|
||||||
|
- BanGUI stores a separate long-term historical ban archive in its own application database, independent from fail2ban's database retention settings.
|
||||||
|
- On each configured sync cycle (default every 5 minutes), BanGUI reads latest entries from fail2ban `bans` table and appends any new events to BanGUI history storage.
|
||||||
|
- Supports both `ban` and `unban` events; audit record includes: `timestamp`, `ip`, `jail`, `action`, `duration`, `origin` (manual, auto, blocklist, etc.), `failures`, `matches`, and optional `country` / `ASN` enrichment.
|
||||||
|
- Includes incremental import logic with dedupe: using unique constraint on (ip, jail, action, timeofban) to prevent duplication across sync cycles.
|
||||||
|
- Provides backfill mode for initial startup: import last N days (configurable, default 7 days) of existing fail2ban history into BanGUI to avoid dark gaps after restart.
|
||||||
|
- Includes configurable archive purge policy in BanGUI (default 365 days), separate from fail2ban `dbpurgeage`, to keep app storage bounded while preserving audit data.
|
||||||
|
- Expose API endpoints for querying persistent history, with filters for timeframe, jail, origin, IP, and current ban status.
|
||||||
|
- On fail2ban connectivity failure, BanGUI continues serving historical data; next successful sync resumes ingestion without data loss.
|
||||||
|
|
||||||
---
|
---
|
||||||
|
|
||||||
## 8. External Blocklist Importer
|
## 8. External Blocklist Importer
|
||||||
|
|||||||
@@ -8,6 +8,48 @@ Reference: `Docs/Refactoring.md` for full analysis of each issue.
|
|||||||
|
|
||||||
## Open Issues
|
## Open Issues
|
||||||
|
|
||||||
|
1. Ban history durability and fail2ban DB size management
|
||||||
|
- status: completed
|
||||||
|
- description: BanGUI currently reads fail2ban history directly, but fail2ban's `dbpurgeage` may erase old history and can cause DB growth issues with long retention. Implement a BanGUI-native persistent archive and keep fail2ban DB short-lived.
|
||||||
|
- acceptance criteria:
|
||||||
|
- BanGUI can configure and fetch fail2ban `dbpurgeage` and `dbfile` from server API.
|
||||||
|
- Introduce periodic sync job that reads new fail2ban ban/unban events and writes them to BanGUI app DB.
|
||||||
|
- Use dedupe logic to avoid duplicate entries (unique constraint by `ip,jail,action,timestamp`).
|
||||||
|
- Add persistence policy settings (default 365 days) in UI and server config.
|
||||||
|
- Add backfill workflow on startup for last 7 days if archive empty.
|
||||||
|
- Existing history API endpoints must support both a `source` filter (`fail2ban`, `archive`) and time range.
|
||||||
|
- implementation notes:
|
||||||
|
- Add repository methods `archive_ban_event`, `get_archived_history(...)`, `purge_archived_history(age_seconds)`.
|
||||||
|
- Add periodic task in `backend/app/tasks/history_sync.py` triggered by scheduler.
|
||||||
|
- Extend `Backend/app/routers/history.py` to include endpoint `/api/history/archive`.
|
||||||
|
|
||||||
|
2. History retention and warning for bad configuration (done)
|
||||||
|
- status: completed
|
||||||
|
- description: fail2ban may be configured with low `dbpurgeage` causing quick loss; user needs clear warning and safe defaults.
|
||||||
|
- acceptance criteria:
|
||||||
|
- On server settings load, if `dbpurgeage` < 86400, expose warning state in API.
|
||||||
|
- UI displays warning banner: "Current fail2ban purge age is under 24h; detailed history may be lost.".
|
||||||
|
- Allow user to increase `dbpurgeage` through server settings panel; sync to fail2ban using `set dbpurgeage`.
|
||||||
|
- Add tests for server service response and UI warning logic.
|
||||||
|
|
||||||
|
3. History access from existing BanGUI features
|
||||||
|
- status: completed
|
||||||
|
- description: Doors for dashboard and map data should use archived history to avoid data gaps.
|
||||||
|
- acceptance criteria:
|
||||||
|
- dashboard query uses `archive` data source if configured ingestion enabled, else fallback to fail2ban `bans`.
|
||||||
|
- world map grouping includes archived data and can aggregate `count` with timeframe filters.
|
||||||
|
- API and UI unit tests verify data source fallback.
|
||||||
|
|
||||||
|
4. Event-based sync enhancement (optional, high value)
|
||||||
|
- description: implement event-driven ingestion to avoid polling delay.
|
||||||
|
- acceptance criteria:
|
||||||
|
- Add fail2ban hook or systemd journal watcher to capture ban/unban events in real time.
|
||||||
|
- Recorded events store to BanGUI archive in transaction-safe manner.
|
||||||
|
- Add validation for event integrity and order.
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -75,6 +75,20 @@ CREATE TABLE IF NOT EXISTS geo_cache (
|
|||||||
);
|
);
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
_CREATE_HISTORY_ARCHIVE: str = """
|
||||||
|
CREATE TABLE IF NOT EXISTS history_archive (
|
||||||
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||||
|
jail TEXT NOT NULL,
|
||||||
|
ip TEXT NOT NULL,
|
||||||
|
timeofban INTEGER NOT NULL,
|
||||||
|
bancount INTEGER NOT NULL,
|
||||||
|
data TEXT NOT NULL,
|
||||||
|
action TEXT NOT NULL CHECK(action IN ('ban', 'unban')),
|
||||||
|
created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ', 'now')),
|
||||||
|
UNIQUE(ip, jail, action, timeofban)
|
||||||
|
);
|
||||||
|
"""
|
||||||
|
|
||||||
# Ordered list of DDL statements to execute on initialisation.
|
# Ordered list of DDL statements to execute on initialisation.
|
||||||
_SCHEMA_STATEMENTS: list[str] = [
|
_SCHEMA_STATEMENTS: list[str] = [
|
||||||
_CREATE_SETTINGS,
|
_CREATE_SETTINGS,
|
||||||
@@ -83,6 +97,7 @@ _SCHEMA_STATEMENTS: list[str] = [
|
|||||||
_CREATE_BLOCKLIST_SOURCES,
|
_CREATE_BLOCKLIST_SOURCES,
|
||||||
_CREATE_IMPORT_LOG,
|
_CREATE_IMPORT_LOG,
|
||||||
_CREATE_GEO_CACHE,
|
_CREATE_GEO_CACHE,
|
||||||
|
_CREATE_HISTORY_ARCHIVE,
|
||||||
]
|
]
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -48,7 +48,7 @@ from app.routers import (
|
|||||||
server,
|
server,
|
||||||
setup,
|
setup,
|
||||||
)
|
)
|
||||||
from app.tasks import blocklist_import, geo_cache_flush, geo_re_resolve, health_check
|
from app.tasks import blocklist_import, geo_cache_flush, geo_re_resolve, health_check, history_sync
|
||||||
from app.utils.fail2ban_client import Fail2BanConnectionError, Fail2BanProtocolError
|
from app.utils.fail2ban_client import Fail2BanConnectionError, Fail2BanProtocolError
|
||||||
from app.utils.jail_config import ensure_jail_configs
|
from app.utils.jail_config import ensure_jail_configs
|
||||||
|
|
||||||
@@ -183,6 +183,9 @@ async def _lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
|
|||||||
# --- Periodic re-resolve of NULL-country geo entries ---
|
# --- Periodic re-resolve of NULL-country geo entries ---
|
||||||
geo_re_resolve.register(app)
|
geo_re_resolve.register(app)
|
||||||
|
|
||||||
|
# --- Periodic history sync from fail2ban into BanGUI archive ---
|
||||||
|
history_sync.register(app)
|
||||||
|
|
||||||
log.info("bangui_started")
|
log.info("bangui_started")
|
||||||
|
|
||||||
try:
|
try:
|
||||||
|
|||||||
@@ -56,3 +56,7 @@ class ServerSettingsResponse(BaseModel):
|
|||||||
model_config = ConfigDict(strict=True)
|
model_config = ConfigDict(strict=True)
|
||||||
|
|
||||||
settings: ServerSettings
|
settings: ServerSettings
|
||||||
|
warnings: dict[str, bool] = Field(
|
||||||
|
default_factory=dict,
|
||||||
|
description="Warnings highlighting potentially unsafe settings.",
|
||||||
|
)
|
||||||
|
|||||||
148
backend/app/repositories/history_archive_repo.py
Normal file
148
backend/app/repositories/history_archive_repo.py
Normal file
@@ -0,0 +1,148 @@
|
|||||||
|
"""Ban history archive repository.
|
||||||
|
|
||||||
|
Provides persistence APIs for the BanGUI archival history table in the
|
||||||
|
application database.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import datetime
|
||||||
|
from typing import TYPE_CHECKING
|
||||||
|
|
||||||
|
from app.models.ban import BLOCKLIST_JAIL, BanOrigin
|
||||||
|
|
||||||
|
if TYPE_CHECKING:
|
||||||
|
import aiosqlite
|
||||||
|
|
||||||
|
|
||||||
|
async def archive_ban_event(
|
||||||
|
db: aiosqlite.Connection,
|
||||||
|
jail: str,
|
||||||
|
ip: str,
|
||||||
|
timeofban: int,
|
||||||
|
bancount: int,
|
||||||
|
data: str,
|
||||||
|
action: str = "ban",
|
||||||
|
) -> bool:
|
||||||
|
"""Insert a new archived ban/unban event, ignoring duplicates."""
|
||||||
|
async with db.execute(
|
||||||
|
"""INSERT OR IGNORE INTO history_archive
|
||||||
|
(jail, ip, timeofban, bancount, data, action)
|
||||||
|
VALUES (?, ?, ?, ?, ?, ?)""",
|
||||||
|
(jail, ip, timeofban, bancount, data, action),
|
||||||
|
) as cursor:
|
||||||
|
inserted = cursor.rowcount == 1
|
||||||
|
await db.commit()
|
||||||
|
return inserted
|
||||||
|
|
||||||
|
|
||||||
|
async def get_archived_history(
|
||||||
|
db: aiosqlite.Connection,
|
||||||
|
since: int | None = None,
|
||||||
|
jail: str | None = None,
|
||||||
|
ip_filter: str | None = None,
|
||||||
|
origin: BanOrigin | None = None,
|
||||||
|
action: str | None = None,
|
||||||
|
page: int = 1,
|
||||||
|
page_size: int = 100,
|
||||||
|
) -> tuple[list[dict], int]:
|
||||||
|
"""Return a paginated archived history result set."""
|
||||||
|
wheres: list[str] = []
|
||||||
|
params: list[object] = []
|
||||||
|
|
||||||
|
if since is not None:
|
||||||
|
wheres.append("timeofban >= ?")
|
||||||
|
params.append(since)
|
||||||
|
|
||||||
|
if jail is not None:
|
||||||
|
wheres.append("jail = ?")
|
||||||
|
params.append(jail)
|
||||||
|
|
||||||
|
if ip_filter is not None:
|
||||||
|
wheres.append("ip LIKE ?")
|
||||||
|
params.append(f"{ip_filter}%")
|
||||||
|
|
||||||
|
if origin == "blocklist":
|
||||||
|
wheres.append("jail = ?")
|
||||||
|
params.append(BLOCKLIST_JAIL)
|
||||||
|
elif origin == "selfblock":
|
||||||
|
wheres.append("jail != ?")
|
||||||
|
params.append(BLOCKLIST_JAIL)
|
||||||
|
|
||||||
|
if action is not None:
|
||||||
|
wheres.append("action = ?")
|
||||||
|
params.append(action)
|
||||||
|
|
||||||
|
where_sql = "WHERE " + " AND ".join(wheres) if wheres else ""
|
||||||
|
offset = (page - 1) * page_size
|
||||||
|
|
||||||
|
async with db.execute(f"SELECT COUNT(*) FROM history_archive {where_sql}", params) as cur:
|
||||||
|
row = await cur.fetchone()
|
||||||
|
total = int(row[0]) if row is not None and row[0] is not None else 0
|
||||||
|
|
||||||
|
async with db.execute(
|
||||||
|
"SELECT jail, ip, timeofban, bancount, data, action "
|
||||||
|
"FROM history_archive "
|
||||||
|
f"{where_sql} "
|
||||||
|
"ORDER BY timeofban DESC LIMIT ? OFFSET ?",
|
||||||
|
[*params, page_size, offset],
|
||||||
|
) as cur:
|
||||||
|
rows = await cur.fetchall()
|
||||||
|
|
||||||
|
records = [
|
||||||
|
{
|
||||||
|
"jail": str(r[0]),
|
||||||
|
"ip": str(r[1]),
|
||||||
|
"timeofban": int(r[2]),
|
||||||
|
"bancount": int(r[3]),
|
||||||
|
"data": str(r[4]),
|
||||||
|
"action": str(r[5]),
|
||||||
|
}
|
||||||
|
for r in rows
|
||||||
|
]
|
||||||
|
|
||||||
|
return records, total
|
||||||
|
|
||||||
|
|
||||||
|
async def get_all_archived_history(
|
||||||
|
db: aiosqlite.Connection,
|
||||||
|
since: int | None = None,
|
||||||
|
jail: str | None = None,
|
||||||
|
ip_filter: str | None = None,
|
||||||
|
origin: BanOrigin | None = None,
|
||||||
|
action: str | None = None,
|
||||||
|
) -> list[dict]:
|
||||||
|
"""Return all archived history rows for the given filters."""
|
||||||
|
page: int = 1
|
||||||
|
page_size: int = 500
|
||||||
|
all_rows: list[dict] = []
|
||||||
|
|
||||||
|
while True:
|
||||||
|
rows, total = await get_archived_history(
|
||||||
|
db=db,
|
||||||
|
since=since,
|
||||||
|
jail=jail,
|
||||||
|
ip_filter=ip_filter,
|
||||||
|
origin=origin,
|
||||||
|
action=action,
|
||||||
|
page=page,
|
||||||
|
page_size=page_size,
|
||||||
|
)
|
||||||
|
all_rows.extend(rows)
|
||||||
|
if len(rows) < page_size:
|
||||||
|
break
|
||||||
|
page += 1
|
||||||
|
|
||||||
|
return all_rows
|
||||||
|
|
||||||
|
|
||||||
|
async def purge_archived_history(db: aiosqlite.Connection, age_seconds: int) -> int:
|
||||||
|
"""Purge archived entries older than *age_seconds*; return rows deleted."""
|
||||||
|
threshold = int(datetime.datetime.now(datetime.UTC).timestamp()) - age_seconds
|
||||||
|
async with db.execute(
|
||||||
|
"DELETE FROM history_archive WHERE timeofban < ?",
|
||||||
|
(threshold,),
|
||||||
|
) as cursor:
|
||||||
|
deleted = cursor.rowcount
|
||||||
|
await db.commit()
|
||||||
|
return deleted
|
||||||
@@ -83,6 +83,7 @@ async def get_dashboard_bans(
|
|||||||
request: Request,
|
request: Request,
|
||||||
_auth: AuthDep,
|
_auth: AuthDep,
|
||||||
range: TimeRange = Query(default=_DEFAULT_RANGE, description="Time-range preset."),
|
range: TimeRange = Query(default=_DEFAULT_RANGE, description="Time-range preset."),
|
||||||
|
source: str = Query(default="fail2ban", description="Data source: 'fail2ban' or 'archive'."),
|
||||||
page: int = Query(default=1, ge=1, description="1-based page number."),
|
page: int = Query(default=1, ge=1, description="1-based page number."),
|
||||||
page_size: int = Query(default=_DEFAULT_PAGE_SIZE, ge=1, le=500, description="Items per page."),
|
page_size: int = Query(default=_DEFAULT_PAGE_SIZE, ge=1, le=500, description="Items per page."),
|
||||||
origin: BanOrigin | None = Query(
|
origin: BanOrigin | None = Query(
|
||||||
@@ -117,10 +118,11 @@ async def get_dashboard_bans(
|
|||||||
return await ban_service.list_bans(
|
return await ban_service.list_bans(
|
||||||
socket_path,
|
socket_path,
|
||||||
range,
|
range,
|
||||||
|
source=source,
|
||||||
page=page,
|
page=page,
|
||||||
page_size=page_size,
|
page_size=page_size,
|
||||||
http_session=http_session,
|
http_session=http_session,
|
||||||
app_db=None,
|
app_db=request.app.state.db,
|
||||||
geo_batch_lookup=geo_service.lookup_batch,
|
geo_batch_lookup=geo_service.lookup_batch,
|
||||||
origin=origin,
|
origin=origin,
|
||||||
)
|
)
|
||||||
@@ -135,6 +137,7 @@ async def get_bans_by_country(
|
|||||||
request: Request,
|
request: Request,
|
||||||
_auth: AuthDep,
|
_auth: AuthDep,
|
||||||
range: TimeRange = Query(default=_DEFAULT_RANGE, description="Time-range preset."),
|
range: TimeRange = Query(default=_DEFAULT_RANGE, description="Time-range preset."),
|
||||||
|
source: str = Query(default="fail2ban", description="Data source: 'fail2ban' or 'archive'."),
|
||||||
origin: BanOrigin | None = Query(
|
origin: BanOrigin | None = Query(
|
||||||
default=None,
|
default=None,
|
||||||
description="Filter by ban origin: 'blocklist' or 'selfblock'. Omit for all.",
|
description="Filter by ban origin: 'blocklist' or 'selfblock'. Omit for all.",
|
||||||
@@ -164,10 +167,11 @@ async def get_bans_by_country(
|
|||||||
return await ban_service.bans_by_country(
|
return await ban_service.bans_by_country(
|
||||||
socket_path,
|
socket_path,
|
||||||
range,
|
range,
|
||||||
|
source=source,
|
||||||
http_session=http_session,
|
http_session=http_session,
|
||||||
geo_cache_lookup=geo_service.lookup_cached_only,
|
geo_cache_lookup=geo_service.lookup_cached_only,
|
||||||
geo_batch_lookup=geo_service.lookup_batch,
|
geo_batch_lookup=geo_service.lookup_batch,
|
||||||
app_db=None,
|
app_db=request.app.state.db,
|
||||||
origin=origin,
|
origin=origin,
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -181,6 +185,7 @@ async def get_ban_trend(
|
|||||||
request: Request,
|
request: Request,
|
||||||
_auth: AuthDep,
|
_auth: AuthDep,
|
||||||
range: TimeRange = Query(default=_DEFAULT_RANGE, description="Time-range preset."),
|
range: TimeRange = Query(default=_DEFAULT_RANGE, description="Time-range preset."),
|
||||||
|
source: str = Query(default="fail2ban", description="Data source: 'fail2ban' or 'archive'."),
|
||||||
origin: BanOrigin | None = Query(
|
origin: BanOrigin | None = Query(
|
||||||
default=None,
|
default=None,
|
||||||
description="Filter by ban origin: 'blocklist' or 'selfblock'. Omit for all.",
|
description="Filter by ban origin: 'blocklist' or 'selfblock'. Omit for all.",
|
||||||
@@ -212,7 +217,13 @@ async def get_ban_trend(
|
|||||||
"""
|
"""
|
||||||
socket_path: str = request.app.state.settings.fail2ban_socket
|
socket_path: str = request.app.state.settings.fail2ban_socket
|
||||||
|
|
||||||
return await ban_service.ban_trend(socket_path, range, origin=origin)
|
return await ban_service.ban_trend(
|
||||||
|
socket_path,
|
||||||
|
range,
|
||||||
|
source=source,
|
||||||
|
app_db=request.app.state.db,
|
||||||
|
origin=origin,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
@router.get(
|
@router.get(
|
||||||
@@ -224,6 +235,7 @@ async def get_bans_by_jail(
|
|||||||
request: Request,
|
request: Request,
|
||||||
_auth: AuthDep,
|
_auth: AuthDep,
|
||||||
range: TimeRange = Query(default=_DEFAULT_RANGE, description="Time-range preset."),
|
range: TimeRange = Query(default=_DEFAULT_RANGE, description="Time-range preset."),
|
||||||
|
source: str = Query(default="fail2ban", description="Data source: 'fail2ban' or 'archive'."),
|
||||||
origin: BanOrigin | None = Query(
|
origin: BanOrigin | None = Query(
|
||||||
default=None,
|
default=None,
|
||||||
description="Filter by ban origin: 'blocklist' or 'selfblock'. Omit for all.",
|
description="Filter by ban origin: 'blocklist' or 'selfblock'. Omit for all.",
|
||||||
@@ -248,4 +260,10 @@ async def get_bans_by_jail(
|
|||||||
"""
|
"""
|
||||||
socket_path: str = request.app.state.settings.fail2ban_socket
|
socket_path: str = request.app.state.settings.fail2ban_socket
|
||||||
|
|
||||||
return await ban_service.bans_by_jail(socket_path, range, origin=origin)
|
return await ban_service.bans_by_jail(
|
||||||
|
socket_path,
|
||||||
|
range,
|
||||||
|
source=source,
|
||||||
|
app_db=request.app.state.db,
|
||||||
|
origin=origin,
|
||||||
|
)
|
||||||
|
|||||||
@@ -56,6 +56,10 @@ async def get_history(
|
|||||||
default=None,
|
default=None,
|
||||||
description="Filter by ban origin: 'blocklist' or 'selfblock'. Omit for all.",
|
description="Filter by ban origin: 'blocklist' or 'selfblock'. Omit for all.",
|
||||||
),
|
),
|
||||||
|
source: str = Query(
|
||||||
|
default="fail2ban",
|
||||||
|
description="Data source: 'fail2ban' or 'archive'.",
|
||||||
|
),
|
||||||
page: int = Query(default=1, ge=1, description="1-based page number."),
|
page: int = Query(default=1, ge=1, description="1-based page number."),
|
||||||
page_size: int = Query(
|
page_size: int = Query(
|
||||||
default=_DEFAULT_PAGE_SIZE,
|
default=_DEFAULT_PAGE_SIZE,
|
||||||
@@ -94,9 +98,47 @@ async def get_history(
|
|||||||
jail=jail,
|
jail=jail,
|
||||||
ip_filter=ip,
|
ip_filter=ip,
|
||||||
origin=origin,
|
origin=origin,
|
||||||
|
source=source,
|
||||||
page=page,
|
page=page,
|
||||||
page_size=page_size,
|
page_size=page_size,
|
||||||
geo_enricher=_enricher,
|
geo_enricher=_enricher,
|
||||||
|
db=request.app.state.db,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@router.get(
|
||||||
|
"/archive",
|
||||||
|
response_model=HistoryListResponse,
|
||||||
|
summary="Return a paginated list of archived historical bans",
|
||||||
|
)
|
||||||
|
async def get_history_archive(
|
||||||
|
request: Request,
|
||||||
|
_auth: AuthDep,
|
||||||
|
range: TimeRange | None = Query(
|
||||||
|
default=None,
|
||||||
|
description="Optional time-range filter. Omit for all-time.",
|
||||||
|
),
|
||||||
|
jail: str | None = Query(default=None, description="Restrict results to this jail name."),
|
||||||
|
ip: str | None = Query(default=None, description="Restrict results to IPs matching this prefix."),
|
||||||
|
page: int = Query(default=1, ge=1, description="1-based page number."),
|
||||||
|
page_size: int = Query(default=_DEFAULT_PAGE_SIZE, ge=1, le=500, description="Items per page (max 500)."),
|
||||||
|
) -> HistoryListResponse:
|
||||||
|
socket_path: str = request.app.state.settings.fail2ban_socket
|
||||||
|
http_session: aiohttp.ClientSession = request.app.state.http_session
|
||||||
|
|
||||||
|
async def _enricher(addr: str) -> geo_service.GeoInfo | None:
|
||||||
|
return await geo_service.lookup(addr, http_session)
|
||||||
|
|
||||||
|
return await history_service.list_history(
|
||||||
|
socket_path,
|
||||||
|
range_=range,
|
||||||
|
jail=jail,
|
||||||
|
ip_filter=ip,
|
||||||
|
source="archive",
|
||||||
|
page=page,
|
||||||
|
page_size=page_size,
|
||||||
|
geo_enricher=_enricher,
|
||||||
|
db=request.app.state.db,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -112,6 +112,7 @@ async def list_bans(
|
|||||||
socket_path: str,
|
socket_path: str,
|
||||||
range_: TimeRange,
|
range_: TimeRange,
|
||||||
*,
|
*,
|
||||||
|
source: str = "fail2ban",
|
||||||
page: int = 1,
|
page: int = 1,
|
||||||
page_size: int = _DEFAULT_PAGE_SIZE,
|
page_size: int = _DEFAULT_PAGE_SIZE,
|
||||||
http_session: aiohttp.ClientSession | None = None,
|
http_session: aiohttp.ClientSession | None = None,
|
||||||
@@ -160,8 +161,25 @@ async def list_bans(
|
|||||||
since: int = _since_unix(range_)
|
since: int = _since_unix(range_)
|
||||||
effective_page_size: int = min(page_size, _MAX_PAGE_SIZE)
|
effective_page_size: int = min(page_size, _MAX_PAGE_SIZE)
|
||||||
offset: int = (page - 1) * effective_page_size
|
offset: int = (page - 1) * effective_page_size
|
||||||
origin_clause, origin_params = _origin_sql_filter(origin)
|
|
||||||
|
|
||||||
|
if source not in ("fail2ban", "archive"):
|
||||||
|
raise ValueError(f"Unsupported source: {source!r}")
|
||||||
|
|
||||||
|
if source == "archive":
|
||||||
|
if app_db is None:
|
||||||
|
raise ValueError("app_db must be provided when source is 'archive'")
|
||||||
|
|
||||||
|
from app.repositories.history_archive_repo import get_archived_history
|
||||||
|
|
||||||
|
rows, total = await get_archived_history(
|
||||||
|
db=app_db,
|
||||||
|
since=since,
|
||||||
|
origin=origin,
|
||||||
|
action="ban",
|
||||||
|
page=page,
|
||||||
|
page_size=effective_page_size,
|
||||||
|
)
|
||||||
|
else:
|
||||||
db_path: str = await get_fail2ban_db_path(socket_path)
|
db_path: str = await get_fail2ban_db_path(socket_path)
|
||||||
log.info(
|
log.info(
|
||||||
"ban_service_list_bans",
|
"ban_service_list_bans",
|
||||||
@@ -192,11 +210,19 @@ async def list_bans(
|
|||||||
|
|
||||||
items: list[DashboardBanItem] = []
|
items: list[DashboardBanItem] = []
|
||||||
for row in rows:
|
for row in rows:
|
||||||
jail: str = row.jail
|
if source == "archive":
|
||||||
ip: str = row.ip
|
jail = str(row["jail"])
|
||||||
banned_at: str = ts_to_iso(row.timeofban)
|
ip = str(row["ip"])
|
||||||
ban_count: int = row.bancount
|
banned_at = ts_to_iso(int(row["timeofban"]))
|
||||||
|
ban_count = int(row["bancount"])
|
||||||
|
matches, _ = parse_data_json(row["data"])
|
||||||
|
else:
|
||||||
|
jail = row.jail
|
||||||
|
ip = row.ip
|
||||||
|
banned_at = ts_to_iso(row.timeofban)
|
||||||
|
ban_count = row.bancount
|
||||||
matches, _ = parse_data_json(row.data)
|
matches, _ = parse_data_json(row.data)
|
||||||
|
|
||||||
service: str | None = matches[0] if matches else None
|
service: str | None = matches[0] if matches else None
|
||||||
|
|
||||||
country_code: str | None = None
|
country_code: str | None = None
|
||||||
@@ -256,6 +282,8 @@ _MAX_COMPANION_BANS: int = 200
|
|||||||
async def bans_by_country(
|
async def bans_by_country(
|
||||||
socket_path: str,
|
socket_path: str,
|
||||||
range_: TimeRange,
|
range_: TimeRange,
|
||||||
|
*,
|
||||||
|
source: str = "fail2ban",
|
||||||
http_session: aiohttp.ClientSession | None = None,
|
http_session: aiohttp.ClientSession | None = None,
|
||||||
geo_cache_lookup: GeoCacheLookup | None = None,
|
geo_cache_lookup: GeoCacheLookup | None = None,
|
||||||
geo_batch_lookup: GeoBatchLookup | None = None,
|
geo_batch_lookup: GeoBatchLookup | None = None,
|
||||||
@@ -300,6 +328,45 @@ async def bans_by_country(
|
|||||||
"""
|
"""
|
||||||
|
|
||||||
since: int = _since_unix(range_)
|
since: int = _since_unix(range_)
|
||||||
|
|
||||||
|
if source not in ("fail2ban", "archive"):
|
||||||
|
raise ValueError(f"Unsupported source: {source!r}")
|
||||||
|
|
||||||
|
if source == "archive":
|
||||||
|
if app_db is None:
|
||||||
|
raise ValueError("app_db must be provided when source is 'archive'")
|
||||||
|
|
||||||
|
from app.repositories.history_archive_repo import (
|
||||||
|
get_all_archived_history,
|
||||||
|
get_archived_history,
|
||||||
|
)
|
||||||
|
|
||||||
|
all_rows = await get_all_archived_history(
|
||||||
|
db=app_db,
|
||||||
|
since=since,
|
||||||
|
origin=origin,
|
||||||
|
action="ban",
|
||||||
|
)
|
||||||
|
|
||||||
|
total = len(all_rows)
|
||||||
|
|
||||||
|
# companion rows for the table should be most recent
|
||||||
|
companion_rows, _ = await get_archived_history(
|
||||||
|
db=app_db,
|
||||||
|
since=since,
|
||||||
|
origin=origin,
|
||||||
|
action="ban",
|
||||||
|
page=1,
|
||||||
|
page_size=_MAX_COMPANION_BANS,
|
||||||
|
)
|
||||||
|
|
||||||
|
agg_rows = {}
|
||||||
|
for row in all_rows:
|
||||||
|
ip = str(row["ip"])
|
||||||
|
agg_rows[ip] = agg_rows.get(ip, 0) + 1
|
||||||
|
|
||||||
|
unique_ips = list(agg_rows.keys())
|
||||||
|
else:
|
||||||
origin_clause, origin_params = _origin_sql_filter(origin)
|
origin_clause, origin_params = _origin_sql_filter(origin)
|
||||||
db_path: str = await get_fail2ban_db_path(socket_path)
|
db_path: str = await get_fail2ban_db_path(socket_path)
|
||||||
log.info(
|
log.info(
|
||||||
@@ -334,7 +401,7 @@ async def bans_by_country(
|
|||||||
offset=0,
|
offset=0,
|
||||||
)
|
)
|
||||||
|
|
||||||
unique_ips: list[str] = [r.ip for r in agg_rows]
|
unique_ips = [r.ip for r in agg_rows]
|
||||||
geo_map: dict[str, GeoInfo] = {}
|
geo_map: dict[str, GeoInfo] = {}
|
||||||
|
|
||||||
if http_session is not None and unique_ips and geo_cache_lookup is not None:
|
if http_session is not None and unique_ips and geo_cache_lookup is not None:
|
||||||
@@ -371,12 +438,28 @@ async def bans_by_country(
|
|||||||
countries: dict[str, int] = {}
|
countries: dict[str, int] = {}
|
||||||
country_names: dict[str, str] = {}
|
country_names: dict[str, str] = {}
|
||||||
|
|
||||||
for agg_row in agg_rows:
|
if source == "archive":
|
||||||
ip: str = agg_row.ip
|
agg_items = [
|
||||||
|
{
|
||||||
|
"ip": ip,
|
||||||
|
"event_count": count,
|
||||||
|
}
|
||||||
|
for ip, count in agg_rows.items()
|
||||||
|
]
|
||||||
|
else:
|
||||||
|
agg_items = agg_rows
|
||||||
|
|
||||||
|
for agg_row in agg_items:
|
||||||
|
if source == "archive":
|
||||||
|
ip = agg_row["ip"]
|
||||||
|
event_count = agg_row["event_count"]
|
||||||
|
else:
|
||||||
|
ip = agg_row.ip
|
||||||
|
event_count = agg_row.event_count
|
||||||
|
|
||||||
geo = geo_map.get(ip)
|
geo = geo_map.get(ip)
|
||||||
cc: str | None = geo.country_code if geo else None
|
cc: str | None = geo.country_code if geo else None
|
||||||
cn: str | None = geo.country_name if geo else None
|
cn: str | None = geo.country_name if geo else None
|
||||||
event_count: int = agg_row.event_count
|
|
||||||
|
|
||||||
if cc:
|
if cc:
|
||||||
countries[cc] = countries.get(cc, 0) + event_count
|
countries[cc] = countries.get(cc, 0) + event_count
|
||||||
@@ -386,26 +469,38 @@ async def bans_by_country(
|
|||||||
# Build companion table from recent rows (geo already cached from batch step).
|
# Build companion table from recent rows (geo already cached from batch step).
|
||||||
bans: list[DashboardBanItem] = []
|
bans: list[DashboardBanItem] = []
|
||||||
for companion_row in companion_rows:
|
for companion_row in companion_rows:
|
||||||
|
if source == "archive":
|
||||||
|
ip = companion_row["ip"]
|
||||||
|
jail = companion_row["jail"]
|
||||||
|
banned_at = ts_to_iso(int(companion_row["timeofban"]))
|
||||||
|
ban_count = int(companion_row["bancount"])
|
||||||
|
service = None
|
||||||
|
else:
|
||||||
ip = companion_row.ip
|
ip = companion_row.ip
|
||||||
|
jail = companion_row.jail
|
||||||
|
banned_at = ts_to_iso(companion_row.timeofban)
|
||||||
|
ban_count = companion_row.bancount
|
||||||
|
matches, _ = parse_data_json(companion_row.data)
|
||||||
|
service = matches[0] if matches else None
|
||||||
|
|
||||||
geo = geo_map.get(ip)
|
geo = geo_map.get(ip)
|
||||||
cc = geo.country_code if geo else None
|
cc = geo.country_code if geo else None
|
||||||
cn = geo.country_name if geo else None
|
cn = geo.country_name if geo else None
|
||||||
asn: str | None = geo.asn if geo else None
|
asn: str | None = geo.asn if geo else None
|
||||||
org: str | None = geo.org if geo else None
|
org: str | None = geo.org if geo else None
|
||||||
matches, _ = parse_data_json(companion_row.data)
|
|
||||||
|
|
||||||
bans.append(
|
bans.append(
|
||||||
DashboardBanItem(
|
DashboardBanItem(
|
||||||
ip=ip,
|
ip=ip,
|
||||||
jail=companion_row.jail,
|
jail=jail,
|
||||||
banned_at=ts_to_iso(companion_row.timeofban),
|
banned_at=banned_at,
|
||||||
service=matches[0] if matches else None,
|
service=service,
|
||||||
country_code=cc,
|
country_code=cc,
|
||||||
country_name=cn,
|
country_name=cn,
|
||||||
asn=asn,
|
asn=asn,
|
||||||
org=org,
|
org=org,
|
||||||
ban_count=companion_row.bancount,
|
ban_count=ban_count,
|
||||||
origin=_derive_origin(companion_row.jail),
|
origin=_derive_origin(jail),
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -426,6 +521,8 @@ async def ban_trend(
|
|||||||
socket_path: str,
|
socket_path: str,
|
||||||
range_: TimeRange,
|
range_: TimeRange,
|
||||||
*,
|
*,
|
||||||
|
source: str = "fail2ban",
|
||||||
|
app_db: aiosqlite.Connection | None = None,
|
||||||
origin: BanOrigin | None = None,
|
origin: BanOrigin | None = None,
|
||||||
) -> BanTrendResponse:
|
) -> BanTrendResponse:
|
||||||
"""Return ban counts aggregated into equal-width time buckets.
|
"""Return ban counts aggregated into equal-width time buckets.
|
||||||
@@ -457,8 +554,40 @@ async def ban_trend(
|
|||||||
since: int = _since_unix(range_)
|
since: int = _since_unix(range_)
|
||||||
bucket_secs: int = BUCKET_SECONDS[range_]
|
bucket_secs: int = BUCKET_SECONDS[range_]
|
||||||
num_buckets: int = bucket_count(range_)
|
num_buckets: int = bucket_count(range_)
|
||||||
origin_clause, origin_params = _origin_sql_filter(origin)
|
|
||||||
|
|
||||||
|
if source not in ("fail2ban", "archive"):
|
||||||
|
raise ValueError(f"Unsupported source: {source!r}")
|
||||||
|
|
||||||
|
if source == "archive":
|
||||||
|
if app_db is None:
|
||||||
|
raise ValueError("app_db must be provided when source is 'archive'")
|
||||||
|
|
||||||
|
from app.repositories.history_archive_repo import get_all_archived_history
|
||||||
|
|
||||||
|
all_rows = await get_all_archived_history(
|
||||||
|
db=app_db,
|
||||||
|
since=since,
|
||||||
|
origin=origin,
|
||||||
|
action="ban",
|
||||||
|
)
|
||||||
|
|
||||||
|
counts: list[int] = [0] * num_buckets
|
||||||
|
for row in all_rows:
|
||||||
|
timeofban = int(row["timeofban"])
|
||||||
|
bucket_index = int((timeofban - since) / bucket_secs)
|
||||||
|
if 0 <= bucket_index < num_buckets:
|
||||||
|
counts[bucket_index] += 1
|
||||||
|
|
||||||
|
log.info(
|
||||||
|
"ban_service_ban_trend",
|
||||||
|
source=source,
|
||||||
|
since=since,
|
||||||
|
range=range_,
|
||||||
|
origin=origin,
|
||||||
|
bucket_secs=bucket_secs,
|
||||||
|
num_buckets=num_buckets,
|
||||||
|
)
|
||||||
|
else:
|
||||||
db_path: str = await get_fail2ban_db_path(socket_path)
|
db_path: str = await get_fail2ban_db_path(socket_path)
|
||||||
log.info(
|
log.info(
|
||||||
"ban_service_ban_trend",
|
"ban_service_ban_trend",
|
||||||
@@ -501,6 +630,8 @@ async def bans_by_jail(
|
|||||||
socket_path: str,
|
socket_path: str,
|
||||||
range_: TimeRange,
|
range_: TimeRange,
|
||||||
*,
|
*,
|
||||||
|
source: str = "fail2ban",
|
||||||
|
app_db: aiosqlite.Connection | None = None,
|
||||||
origin: BanOrigin | None = None,
|
origin: BanOrigin | None = None,
|
||||||
) -> BansByJailResponse:
|
) -> BansByJailResponse:
|
||||||
"""Return ban counts aggregated per jail for the selected time window.
|
"""Return ban counts aggregated per jail for the selected time window.
|
||||||
@@ -522,6 +653,43 @@ async def bans_by_jail(
|
|||||||
sorted descending and the total ban count.
|
sorted descending and the total ban count.
|
||||||
"""
|
"""
|
||||||
since: int = _since_unix(range_)
|
since: int = _since_unix(range_)
|
||||||
|
|
||||||
|
if source not in ("fail2ban", "archive"):
|
||||||
|
raise ValueError(f"Unsupported source: {source!r}")
|
||||||
|
|
||||||
|
if source == "archive":
|
||||||
|
if app_db is None:
|
||||||
|
raise ValueError("app_db must be provided when source is 'archive'")
|
||||||
|
|
||||||
|
from app.repositories.history_archive_repo import get_all_archived_history
|
||||||
|
|
||||||
|
all_rows = await get_all_archived_history(
|
||||||
|
db=app_db,
|
||||||
|
since=since,
|
||||||
|
origin=origin,
|
||||||
|
action="ban",
|
||||||
|
)
|
||||||
|
|
||||||
|
jail_counter: dict[str, int] = {}
|
||||||
|
for row in all_rows:
|
||||||
|
jail_name = str(row["jail"])
|
||||||
|
jail_counter[jail_name] = jail_counter.get(jail_name, 0) + 1
|
||||||
|
|
||||||
|
total = sum(jail_counter.values())
|
||||||
|
jail_counts = [
|
||||||
|
JailBanCountModel(jail=jail_name, count=count)
|
||||||
|
for jail_name, count in sorted(jail_counter.items(), key=lambda x: x[1], reverse=True)
|
||||||
|
]
|
||||||
|
|
||||||
|
log.debug(
|
||||||
|
"ban_service_bans_by_jail",
|
||||||
|
source=source,
|
||||||
|
since=since,
|
||||||
|
since_iso=ts_to_iso(since),
|
||||||
|
range=range_,
|
||||||
|
origin=origin,
|
||||||
|
)
|
||||||
|
else:
|
||||||
origin_clause, origin_params = _origin_sql_filter(origin)
|
origin_clause, origin_params = _origin_sql_filter(origin)
|
||||||
|
|
||||||
db_path: str = await get_fail2ban_db_path(socket_path)
|
db_path: str = await get_fail2ban_db_path(socket_path)
|
||||||
|
|||||||
@@ -351,8 +351,8 @@ async def update_jail_config(
|
|||||||
await _set("datepattern", update.date_pattern)
|
await _set("datepattern", update.date_pattern)
|
||||||
if update.dns_mode is not None:
|
if update.dns_mode is not None:
|
||||||
await _set("usedns", update.dns_mode)
|
await _set("usedns", update.dns_mode)
|
||||||
if update.backend is not None:
|
# backend is managed by fail2ban and cannot be changed at runtime by API.
|
||||||
await _set("backend", update.backend)
|
# This field is therefore ignored during updates.
|
||||||
if update.log_encoding is not None:
|
if update.log_encoding is not None:
|
||||||
await _set("logencoding", update.log_encoding)
|
await _set("logencoding", update.log_encoding)
|
||||||
if update.prefregex is not None:
|
if update.prefregex is not None:
|
||||||
|
|||||||
@@ -16,6 +16,8 @@ from typing import TYPE_CHECKING
|
|||||||
import structlog
|
import structlog
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
|
import aiosqlite
|
||||||
|
|
||||||
from app.models.geo import GeoEnricher
|
from app.models.geo import GeoEnricher
|
||||||
|
|
||||||
from app.models.ban import TIME_RANGE_SECONDS, BanOrigin, TimeRange
|
from app.models.ban import TIME_RANGE_SECONDS, BanOrigin, TimeRange
|
||||||
@@ -63,9 +65,11 @@ async def list_history(
|
|||||||
jail: str | None = None,
|
jail: str | None = None,
|
||||||
ip_filter: str | None = None,
|
ip_filter: str | None = None,
|
||||||
origin: BanOrigin | None = None,
|
origin: BanOrigin | None = None,
|
||||||
|
source: str = "fail2ban",
|
||||||
page: int = 1,
|
page: int = 1,
|
||||||
page_size: int = _DEFAULT_PAGE_SIZE,
|
page_size: int = _DEFAULT_PAGE_SIZE,
|
||||||
geo_enricher: GeoEnricher | None = None,
|
geo_enricher: GeoEnricher | None = None,
|
||||||
|
db: aiosqlite.Connection | None = None,
|
||||||
) -> HistoryListResponse:
|
) -> HistoryListResponse:
|
||||||
"""Return a paginated list of historical ban records with optional filters.
|
"""Return a paginated list of historical ban records with optional filters.
|
||||||
|
|
||||||
@@ -104,6 +108,63 @@ async def list_history(
|
|||||||
page=page,
|
page=page,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
items: list[HistoryBanItem] = []
|
||||||
|
total: int
|
||||||
|
|
||||||
|
if source == "archive":
|
||||||
|
if db is None:
|
||||||
|
raise ValueError("db must be provided when source is 'archive'")
|
||||||
|
|
||||||
|
from app.repositories.history_archive_repo import get_archived_history
|
||||||
|
|
||||||
|
archived_rows, total = await get_archived_history(
|
||||||
|
db=db,
|
||||||
|
since=since,
|
||||||
|
jail=jail,
|
||||||
|
ip_filter=ip_filter,
|
||||||
|
page=page,
|
||||||
|
page_size=effective_page_size,
|
||||||
|
)
|
||||||
|
|
||||||
|
for row in archived_rows:
|
||||||
|
jail_name = row["jail"]
|
||||||
|
ip = row["ip"]
|
||||||
|
banned_at = ts_to_iso(int(row["timeofban"]))
|
||||||
|
ban_count = int(row["bancount"])
|
||||||
|
matches, failures = parse_data_json(row["data"])
|
||||||
|
# archive records may include actions; we treat all as history
|
||||||
|
|
||||||
|
country_code = None
|
||||||
|
country_name = None
|
||||||
|
asn = None
|
||||||
|
org = None
|
||||||
|
|
||||||
|
if geo_enricher is not None:
|
||||||
|
try:
|
||||||
|
geo = await geo_enricher(ip)
|
||||||
|
if geo is not None:
|
||||||
|
country_code = geo.country_code
|
||||||
|
country_name = geo.country_name
|
||||||
|
asn = geo.asn
|
||||||
|
org = geo.org
|
||||||
|
except Exception: # noqa: BLE001
|
||||||
|
log.warning("history_service_geo_lookup_failed", ip=ip)
|
||||||
|
|
||||||
|
items.append(
|
||||||
|
HistoryBanItem(
|
||||||
|
ip=ip,
|
||||||
|
jail=jail_name,
|
||||||
|
banned_at=banned_at,
|
||||||
|
ban_count=ban_count,
|
||||||
|
failures=failures,
|
||||||
|
matches=matches,
|
||||||
|
country_code=country_code,
|
||||||
|
country_name=country_name,
|
||||||
|
asn=asn,
|
||||||
|
org=org,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
else:
|
||||||
rows, total = await fail2ban_db_repo.get_history_page(
|
rows, total = await fail2ban_db_repo.get_history_page(
|
||||||
db_path=db_path,
|
db_path=db_path,
|
||||||
since=since,
|
since=since,
|
||||||
@@ -114,7 +175,6 @@ async def list_history(
|
|||||||
page_size=effective_page_size,
|
page_size=effective_page_size,
|
||||||
)
|
)
|
||||||
|
|
||||||
items: list[HistoryBanItem] = []
|
|
||||||
for row in rows:
|
for row in rows:
|
||||||
jail_name: str = row.jail
|
jail_name: str = row.jail
|
||||||
ip: str = row.ip
|
ip: str = row.ip
|
||||||
|
|||||||
@@ -160,8 +160,12 @@ async def get_settings(socket_path: str) -> ServerSettingsResponse:
|
|||||||
db_max_matches=db_max_matches,
|
db_max_matches=db_max_matches,
|
||||||
)
|
)
|
||||||
|
|
||||||
log.info("server_settings_fetched")
|
warnings: dict[str, bool] = {
|
||||||
return ServerSettingsResponse(settings=settings)
|
"db_purge_age_too_low": db_purge_age < 86400,
|
||||||
|
}
|
||||||
|
|
||||||
|
log.info("server_settings_fetched", db_purge_age=db_purge_age, warnings=warnings)
|
||||||
|
return ServerSettingsResponse(settings=settings, warnings=warnings)
|
||||||
|
|
||||||
|
|
||||||
async def update_settings(socket_path: str, update: ServerSettingsUpdate) -> None:
|
async def update_settings(socket_path: str, update: ServerSettingsUpdate) -> None:
|
||||||
|
|||||||
109
backend/app/tasks/history_sync.py
Normal file
109
backend/app/tasks/history_sync.py
Normal file
@@ -0,0 +1,109 @@
|
|||||||
|
"""History sync background task.
|
||||||
|
|
||||||
|
Periodically copies new records from the fail2ban sqlite database into the
|
||||||
|
BanGUI application archive table to prevent gaps when fail2ban purges old rows.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import datetime
|
||||||
|
from typing import TYPE_CHECKING
|
||||||
|
|
||||||
|
import structlog
|
||||||
|
|
||||||
|
from app.repositories import fail2ban_db_repo
|
||||||
|
from app.utils.fail2ban_db_utils import get_fail2ban_db_path
|
||||||
|
|
||||||
|
if TYPE_CHECKING: # pragma: no cover
|
||||||
|
from fastapi import FastAPI
|
||||||
|
|
||||||
|
log: structlog.stdlib.BoundLogger = structlog.get_logger()
|
||||||
|
|
||||||
|
#: Stable APScheduler job id.
|
||||||
|
JOB_ID: str = "history_sync"
|
||||||
|
|
||||||
|
#: Interval in seconds between sync runs.
|
||||||
|
HISTORY_SYNC_INTERVAL: int = 300
|
||||||
|
|
||||||
|
#: Backfill window when archive is empty (seconds).
|
||||||
|
BACKFILL_WINDOW: int = 7 * 86400
|
||||||
|
|
||||||
|
|
||||||
|
async def _get_last_archive_ts(db) -> int | None:
|
||||||
|
async with db.execute("SELECT MAX(timeofban) FROM history_archive") as cur:
|
||||||
|
row = await cur.fetchone()
|
||||||
|
if row is None or row[0] is None:
|
||||||
|
return None
|
||||||
|
return int(row[0])
|
||||||
|
|
||||||
|
|
||||||
|
async def _run_sync(app: FastAPI) -> None:
|
||||||
|
db = app.state.db
|
||||||
|
socket_path: str = app.state.settings.fail2ban_socket
|
||||||
|
|
||||||
|
try:
|
||||||
|
last_ts = await _get_last_archive_ts(db)
|
||||||
|
now_ts = int(datetime.datetime.now(datetime.UTC).timestamp())
|
||||||
|
|
||||||
|
if last_ts is None:
|
||||||
|
last_ts = now_ts - BACKFILL_WINDOW
|
||||||
|
log.info("history_sync_backfill", window_seconds=BACKFILL_WINDOW)
|
||||||
|
|
||||||
|
per_page = 500
|
||||||
|
next_since = last_ts
|
||||||
|
total_synced = 0
|
||||||
|
|
||||||
|
while True:
|
||||||
|
fail2ban_db_path = await get_fail2ban_db_path(socket_path)
|
||||||
|
rows, total = await fail2ban_db_repo.get_history_page(
|
||||||
|
db_path=fail2ban_db_path,
|
||||||
|
since=next_since,
|
||||||
|
page=1,
|
||||||
|
page_size=per_page,
|
||||||
|
)
|
||||||
|
|
||||||
|
if not rows:
|
||||||
|
break
|
||||||
|
|
||||||
|
from app.repositories.history_archive_repo import archive_ban_event
|
||||||
|
|
||||||
|
for row in rows:
|
||||||
|
await archive_ban_event(
|
||||||
|
db=db,
|
||||||
|
jail=row.jail,
|
||||||
|
ip=row.ip,
|
||||||
|
timeofban=row.timeofban,
|
||||||
|
bancount=row.bancount,
|
||||||
|
data=row.data,
|
||||||
|
action="ban",
|
||||||
|
)
|
||||||
|
total_synced += 1
|
||||||
|
|
||||||
|
# Continue where we left off by max timeofban + 1.
|
||||||
|
max_time = max(row.timeofban for row in rows)
|
||||||
|
next_since = max_time + 1
|
||||||
|
|
||||||
|
if len(rows) < per_page:
|
||||||
|
break
|
||||||
|
|
||||||
|
log.info("history_sync_complete", synced=total_synced)
|
||||||
|
|
||||||
|
except Exception:
|
||||||
|
log.exception("history_sync_failed")
|
||||||
|
|
||||||
|
|
||||||
|
def register(app: FastAPI) -> None:
|
||||||
|
"""Register the history sync periodic job.
|
||||||
|
|
||||||
|
Should be called after scheduler startup, from the lifespan handler.
|
||||||
|
"""
|
||||||
|
app.state.scheduler.add_job(
|
||||||
|
_run_sync,
|
||||||
|
trigger="interval",
|
||||||
|
seconds=HISTORY_SYNC_INTERVAL,
|
||||||
|
kwargs={"app": app},
|
||||||
|
id=JOB_ID,
|
||||||
|
replace_existing=True,
|
||||||
|
next_run_time=datetime.datetime.now(tz=datetime.UTC),
|
||||||
|
)
|
||||||
|
log.info("history_sync_scheduled", interval_seconds=HISTORY_SYNC_INTERVAL)
|
||||||
60
backend/tests/test_repositories/test_history_archive_repo.py
Normal file
60
backend/tests/test_repositories/test_history_archive_repo.py
Normal file
@@ -0,0 +1,60 @@
|
|||||||
|
"""Tests for history_archive_repo."""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import time
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
import aiosqlite
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from app.db import init_db
|
||||||
|
from app.repositories.history_archive_repo import archive_ban_event, get_archived_history, purge_archived_history
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
async def app_db(tmp_path: Path) -> str:
|
||||||
|
path = str(tmp_path / "app.db")
|
||||||
|
async with aiosqlite.connect(path) as db:
|
||||||
|
db.row_factory = aiosqlite.Row
|
||||||
|
await init_db(db)
|
||||||
|
return path
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_archive_ban_event_deduplication(app_db: str) -> None:
|
||||||
|
async with aiosqlite.connect(app_db) as db:
|
||||||
|
# first insert should add
|
||||||
|
inserted = await archive_ban_event(db, "sshd", "1.1.1.1", 1000, 1, "{}", "ban")
|
||||||
|
assert inserted
|
||||||
|
|
||||||
|
# duplicate event is ignored
|
||||||
|
inserted = await archive_ban_event(db, "sshd", "1.1.1.1", 1000, 1, "{}", "ban")
|
||||||
|
assert not inserted
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_get_archived_history_filtering_and_pagination(app_db: str) -> None:
|
||||||
|
async with aiosqlite.connect(app_db) as db:
|
||||||
|
await archive_ban_event(db, "sshd", "1.1.1.1", 1000, 1, "{}", "ban")
|
||||||
|
await archive_ban_event(db, "nginx", "2.2.2.2", 2000, 1, "{}", "ban")
|
||||||
|
|
||||||
|
rows, total = await get_archived_history(db, jail="sshd")
|
||||||
|
assert total == 1
|
||||||
|
assert rows[0]["ip"] == "1.1.1.1"
|
||||||
|
|
||||||
|
rows, total = await get_archived_history(db, page=1, page_size=1)
|
||||||
|
assert total == 2
|
||||||
|
assert len(rows) == 1
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_purge_archived_history(app_db: str) -> None:
|
||||||
|
now = int(time.time())
|
||||||
|
async with aiosqlite.connect(app_db) as db:
|
||||||
|
await archive_ban_event(db, "sshd", "1.1.1.1", now - 3000, 1, "{}", "ban")
|
||||||
|
await archive_ban_event(db, "sshd", "1.1.1.2", now - 1000, 1, "{}", "ban")
|
||||||
|
deleted = await purge_archived_history(db, age_seconds=2000)
|
||||||
|
assert deleted == 1
|
||||||
|
rows, total = await get_archived_history(db)
|
||||||
|
assert total == 1
|
||||||
@@ -290,6 +290,17 @@ class TestDashboardBans:
|
|||||||
called_range = mock_list.call_args[0][1]
|
called_range = mock_list.call_args[0][1]
|
||||||
assert called_range == "7d"
|
assert called_range == "7d"
|
||||||
|
|
||||||
|
async def test_accepts_source_param(
|
||||||
|
self, dashboard_client: AsyncClient
|
||||||
|
) -> None:
|
||||||
|
"""The ``source`` query parameter is forwarded to ban_service."""
|
||||||
|
mock_list = AsyncMock(return_value=_make_ban_list_response())
|
||||||
|
with patch("app.routers.dashboard.ban_service.list_bans", new=mock_list):
|
||||||
|
await dashboard_client.get("/api/dashboard/bans?source=archive")
|
||||||
|
|
||||||
|
called_source = mock_list.call_args[1]["source"]
|
||||||
|
assert called_source == "archive"
|
||||||
|
|
||||||
async def test_empty_ban_list_returns_zero_total(
|
async def test_empty_ban_list_returns_zero_total(
|
||||||
self, dashboard_client: AsyncClient
|
self, dashboard_client: AsyncClient
|
||||||
) -> None:
|
) -> None:
|
||||||
@@ -492,6 +503,16 @@ class TestDashboardBansOriginField:
|
|||||||
origins = {ban["origin"] for ban in bans}
|
origins = {ban["origin"] for ban in bans}
|
||||||
assert origins == {"blocklist", "selfblock"}
|
assert origins == {"blocklist", "selfblock"}
|
||||||
|
|
||||||
|
async def test_bans_by_country_source_param_forwarded(
|
||||||
|
self, dashboard_client: AsyncClient
|
||||||
|
) -> None:
|
||||||
|
"""The ``source`` query parameter is forwarded to bans_by_country."""
|
||||||
|
mock_fn = AsyncMock(return_value=_make_bans_by_country_response())
|
||||||
|
with patch("app.routers.dashboard.ban_service.bans_by_country", new=mock_fn):
|
||||||
|
await dashboard_client.get("/api/dashboard/bans/by-country?source=archive")
|
||||||
|
|
||||||
|
assert mock_fn.call_args[1]["source"] == "archive"
|
||||||
|
|
||||||
async def test_blocklist_origin_serialised_correctly(
|
async def test_blocklist_origin_serialised_correctly(
|
||||||
self, dashboard_client: AsyncClient
|
self, dashboard_client: AsyncClient
|
||||||
) -> None:
|
) -> None:
|
||||||
|
|||||||
@@ -225,6 +225,32 @@ class TestHistoryList:
|
|||||||
_args, kwargs = mock_fn.call_args
|
_args, kwargs = mock_fn.call_args
|
||||||
assert kwargs.get("origin") == "blocklist"
|
assert kwargs.get("origin") == "blocklist"
|
||||||
|
|
||||||
|
async def test_forwards_source_filter(self, history_client: AsyncClient) -> None:
|
||||||
|
"""The ``source`` query parameter is forwarded to the service."""
|
||||||
|
mock_fn = AsyncMock(return_value=_make_history_list(n=0))
|
||||||
|
with patch(
|
||||||
|
"app.routers.history.history_service.list_history",
|
||||||
|
new=mock_fn,
|
||||||
|
):
|
||||||
|
await history_client.get("/api/history?source=archive")
|
||||||
|
|
||||||
|
_args, kwargs = mock_fn.call_args
|
||||||
|
assert kwargs.get("source") == "archive"
|
||||||
|
|
||||||
|
async def test_archive_route_forces_source_archive(
|
||||||
|
self, history_client: AsyncClient
|
||||||
|
) -> None:
|
||||||
|
"""GET /api/history/archive should call list_history with source='archive'."""
|
||||||
|
mock_fn = AsyncMock(return_value=_make_history_list(n=0))
|
||||||
|
with patch(
|
||||||
|
"app.routers.history.history_service.list_history",
|
||||||
|
new=mock_fn,
|
||||||
|
):
|
||||||
|
await history_client.get("/api/history/archive")
|
||||||
|
|
||||||
|
_args, kwargs = mock_fn.call_args
|
||||||
|
assert kwargs.get("source") == "archive"
|
||||||
|
|
||||||
async def test_empty_result(self, history_client: AsyncClient) -> None:
|
async def test_empty_result(self, history_client: AsyncClient) -> None:
|
||||||
"""An empty history returns items=[] and total=0."""
|
"""An empty history returns items=[] and total=0."""
|
||||||
with patch(
|
with patch(
|
||||||
|
|||||||
@@ -68,7 +68,8 @@ def _make_settings() -> ServerSettingsResponse:
|
|||||||
db_path="/var/lib/fail2ban/fail2ban.sqlite3",
|
db_path="/var/lib/fail2ban/fail2ban.sqlite3",
|
||||||
db_purge_age=86400,
|
db_purge_age=86400,
|
||||||
db_max_matches=10,
|
db_max_matches=10,
|
||||||
)
|
),
|
||||||
|
warnings={"db_purge_age_too_low": False},
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
@@ -93,6 +94,7 @@ class TestGetServerSettings:
|
|||||||
data = resp.json()
|
data = resp.json()
|
||||||
assert data["settings"]["log_level"] == "INFO"
|
assert data["settings"]["log_level"] == "INFO"
|
||||||
assert data["settings"]["db_purge_age"] == 86400
|
assert data["settings"]["db_purge_age"] == 86400
|
||||||
|
assert data["warnings"]["db_purge_age_too_low"] is False
|
||||||
|
|
||||||
async def test_401_when_unauthenticated(self, server_client: AsyncClient) -> None:
|
async def test_401_when_unauthenticated(self, server_client: AsyncClient) -> None:
|
||||||
"""GET /api/server/settings returns 401 without session."""
|
"""GET /api/server/settings returns 401 without session."""
|
||||||
|
|||||||
@@ -11,6 +11,7 @@ from unittest.mock import AsyncMock, patch
|
|||||||
import aiosqlite
|
import aiosqlite
|
||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
|
from app.db import init_db
|
||||||
from app.services import ban_service
|
from app.services import ban_service
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
@@ -143,6 +144,29 @@ async def empty_f2b_db_path(tmp_path: Path) -> str:
|
|||||||
return path
|
return path
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
async def app_db_with_archive(tmp_path: Path) -> aiosqlite.Connection:
|
||||||
|
"""Return an app database connection pre-populated with archived ban rows."""
|
||||||
|
db_path = str(tmp_path / "app_archive.db")
|
||||||
|
db = await aiosqlite.connect(db_path)
|
||||||
|
db.row_factory = aiosqlite.Row
|
||||||
|
await init_db(db)
|
||||||
|
|
||||||
|
await db.execute(
|
||||||
|
"INSERT INTO history_archive (jail, ip, timeofban, bancount, data, action) VALUES (?, ?, ?, ?, ?, ?)",
|
||||||
|
("sshd", "1.2.3.4", _ONE_HOUR_AGO, 1, '{"matches": ["fail"], "failures": 1}', "ban"),
|
||||||
|
)
|
||||||
|
await db.execute(
|
||||||
|
"INSERT INTO history_archive (jail, ip, timeofban, bancount, data, action) VALUES (?, ?, ?, ?, ?, ?)",
|
||||||
|
("nginx", "5.6.7.8", _ONE_HOUR_AGO, 1, '{"matches": ["fail"], "failures": 2}', "ban"),
|
||||||
|
)
|
||||||
|
await db.commit()
|
||||||
|
|
||||||
|
yield db
|
||||||
|
|
||||||
|
await db.close()
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
# list_bans — happy path
|
# list_bans — happy path
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
@@ -233,6 +257,20 @@ class TestListBansHappyPath:
|
|||||||
|
|
||||||
assert result.total == 3
|
assert result.total == 3
|
||||||
|
|
||||||
|
async def test_source_archive_reads_from_archive(
|
||||||
|
self, app_db_with_archive: aiosqlite.Connection
|
||||||
|
) -> None:
|
||||||
|
"""Using source='archive' reads from the BanGUI archive table."""
|
||||||
|
result = await ban_service.list_bans(
|
||||||
|
"/fake/sock",
|
||||||
|
"24h",
|
||||||
|
source="archive",
|
||||||
|
app_db=app_db_with_archive,
|
||||||
|
)
|
||||||
|
|
||||||
|
assert result.total == 2
|
||||||
|
assert {item.ip for item in result.items} == {"1.2.3.4", "5.6.7.8"}
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
# list_bans — geo enrichment
|
# list_bans — geo enrichment
|
||||||
@@ -616,6 +654,20 @@ class TestOriginFilter:
|
|||||||
|
|
||||||
assert result.total == 3
|
assert result.total == 3
|
||||||
|
|
||||||
|
async def test_bans_by_country_source_archive_reads_archive(
|
||||||
|
self, app_db_with_archive: aiosqlite.Connection
|
||||||
|
) -> None:
|
||||||
|
"""``bans_by_country`` accepts source='archive' and reads archived rows."""
|
||||||
|
result = await ban_service.bans_by_country(
|
||||||
|
"/fake/sock",
|
||||||
|
"24h",
|
||||||
|
source="archive",
|
||||||
|
app_db=app_db_with_archive,
|
||||||
|
)
|
||||||
|
|
||||||
|
assert result.total == 2
|
||||||
|
assert len(result.bans) == 2
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
# bans_by_country — background geo resolution (Task 3)
|
# bans_by_country — background geo resolution (Task 3)
|
||||||
@@ -802,6 +854,19 @@ class TestBanTrend:
|
|||||||
timestamps = [b.timestamp for b in result.buckets]
|
timestamps = [b.timestamp for b in result.buckets]
|
||||||
assert timestamps == sorted(timestamps)
|
assert timestamps == sorted(timestamps)
|
||||||
|
|
||||||
|
async def test_ban_trend_source_archive_reads_archive(
|
||||||
|
self, app_db_with_archive: aiosqlite.Connection
|
||||||
|
) -> None:
|
||||||
|
"""``ban_trend`` accepts source='archive' and uses archived rows."""
|
||||||
|
result = await ban_service.ban_trend(
|
||||||
|
"/fake/sock",
|
||||||
|
"24h",
|
||||||
|
source="archive",
|
||||||
|
app_db=app_db_with_archive,
|
||||||
|
)
|
||||||
|
|
||||||
|
assert sum(b.count for b in result.buckets) == 2
|
||||||
|
|
||||||
async def test_bans_counted_in_correct_bucket(self, tmp_path: Path) -> None:
|
async def test_bans_counted_in_correct_bucket(self, tmp_path: Path) -> None:
|
||||||
"""A ban at a known time appears in the expected bucket."""
|
"""A ban at a known time appears in the expected bucket."""
|
||||||
import time as _time
|
import time as _time
|
||||||
@@ -1018,6 +1083,20 @@ class TestBansByJail:
|
|||||||
assert result.total == 3
|
assert result.total == 3
|
||||||
assert len(result.jails) == 3
|
assert len(result.jails) == 3
|
||||||
|
|
||||||
|
async def test_bans_by_jail_source_archive_reads_archive(
|
||||||
|
self, app_db_with_archive: aiosqlite.Connection
|
||||||
|
) -> None:
|
||||||
|
"""``bans_by_jail`` accepts source='archive' and aggregates archived rows."""
|
||||||
|
result = await ban_service.bans_by_jail(
|
||||||
|
"/fake/sock",
|
||||||
|
"24h",
|
||||||
|
source="archive",
|
||||||
|
app_db=app_db_with_archive,
|
||||||
|
)
|
||||||
|
|
||||||
|
assert result.total == 2
|
||||||
|
assert any(j.jail == "sshd" for j in result.jails)
|
||||||
|
|
||||||
async def test_diagnostic_warning_when_zero_results_despite_data(
|
async def test_diagnostic_warning_when_zero_results_despite_data(
|
||||||
self, tmp_path: Path
|
self, tmp_path: Path
|
||||||
) -> None:
|
) -> None:
|
||||||
|
|||||||
@@ -11,6 +11,7 @@ from unittest.mock import AsyncMock, patch
|
|||||||
import aiosqlite
|
import aiosqlite
|
||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
|
from app.db import init_db
|
||||||
from app.services import history_service
|
from app.services import history_service
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
@@ -264,6 +265,31 @@ class TestListHistory:
|
|||||||
assert result.page == 1
|
assert result.page == 1
|
||||||
assert result.page_size == 2
|
assert result.page_size == 2
|
||||||
|
|
||||||
|
async def test_source_archive_reads_from_archive(self, f2b_db_path: str, tmp_path: Path) -> None:
|
||||||
|
"""Using source='archive' reads from the BanGUI archive table."""
|
||||||
|
app_db_path = str(tmp_path / "app_archive.db")
|
||||||
|
async with aiosqlite.connect(app_db_path) as db:
|
||||||
|
db.row_factory = aiosqlite.Row
|
||||||
|
await init_db(db)
|
||||||
|
await db.execute(
|
||||||
|
"INSERT INTO history_archive (jail, ip, timeofban, bancount, data, action) VALUES (?, ?, ?, ?, ?, ?)",
|
||||||
|
("sshd", "10.0.0.1", _ONE_HOUR_AGO, 1, '{"matches": [], "failures": 0}', "ban"),
|
||||||
|
)
|
||||||
|
await db.commit()
|
||||||
|
|
||||||
|
with patch(
|
||||||
|
"app.services.history_service.get_fail2ban_db_path",
|
||||||
|
new=AsyncMock(return_value=f2b_db_path),
|
||||||
|
):
|
||||||
|
result = await history_service.list_history(
|
||||||
|
"fake_socket",
|
||||||
|
source="archive",
|
||||||
|
db=db,
|
||||||
|
)
|
||||||
|
|
||||||
|
assert result.total == 1
|
||||||
|
assert result.items[0].ip == "10.0.0.1"
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
# get_ip_detail tests
|
# get_ip_detail tests
|
||||||
|
|||||||
@@ -63,6 +63,16 @@ class TestGetSettings:
|
|||||||
assert result.settings.log_target == "/var/log/fail2ban.log"
|
assert result.settings.log_target == "/var/log/fail2ban.log"
|
||||||
assert result.settings.db_purge_age == 86400
|
assert result.settings.db_purge_age == 86400
|
||||||
assert result.settings.db_max_matches == 10
|
assert result.settings.db_max_matches == 10
|
||||||
|
assert result.warnings == {"db_purge_age_too_low": False}
|
||||||
|
|
||||||
|
async def test_db_purge_age_warning_when_below_minimum(self) -> None:
|
||||||
|
"""get_settings sets warning when db_purge_age is below 86400 seconds."""
|
||||||
|
responses = {**_DEFAULT_RESPONSES, "get|dbpurgeage": (0, 3600)}
|
||||||
|
with _patch_client(responses):
|
||||||
|
result = await server_service.get_settings(_SOCKET)
|
||||||
|
|
||||||
|
assert result.settings.db_purge_age == 3600
|
||||||
|
assert result.warnings == {"db_purge_age_too_low": True}
|
||||||
|
|
||||||
async def test_db_path_parsed(self) -> None:
|
async def test_db_path_parsed(self) -> None:
|
||||||
"""get_settings returns the correct database file path."""
|
"""get_settings returns the correct database file path."""
|
||||||
|
|||||||
29
backend/tests/test_tasks/test_history_sync.py
Normal file
29
backend/tests/test_tasks/test_history_sync.py
Normal file
@@ -0,0 +1,29 @@
|
|||||||
|
"""Tests for history_sync task registration."""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from unittest.mock import MagicMock
|
||||||
|
|
||||||
|
from app.tasks import history_sync
|
||||||
|
|
||||||
|
|
||||||
|
class TestHistorySyncTask:
|
||||||
|
async def test_register_schedules_job(self) -> None:
|
||||||
|
fake_scheduler = MagicMock()
|
||||||
|
class FakeState:
|
||||||
|
pass
|
||||||
|
|
||||||
|
class FakeSettings:
|
||||||
|
fail2ban_socket = "/tmp/fake.sock"
|
||||||
|
|
||||||
|
app = type("FakeApp", (), {})()
|
||||||
|
app.state = FakeState()
|
||||||
|
app.state.scheduler = fake_scheduler
|
||||||
|
app.state.settings = FakeSettings()
|
||||||
|
|
||||||
|
history_sync.register(app)
|
||||||
|
|
||||||
|
fake_scheduler.add_job.assert_called_once()
|
||||||
|
called_args, called_kwargs = fake_scheduler.add_job.call_args
|
||||||
|
assert called_kwargs["id"] == history_sync.JOB_ID
|
||||||
|
assert called_kwargs["kwargs"]["app"] == app
|
||||||
Reference in New Issue
Block a user