- Create PaginationMetadata model with computed derived fields (total_pages, has_next_page, has_prev_page)
- Update PaginatedListResponse to embed pagination metadata in a separate 'pagination' object
- Add create_pagination_metadata() factory function in utils/pagination.py for consistent computation
- Update all paginated service functions to use new structure:
  - history_service.list_history()
  - blocklist_service.list_import_logs()
  - jail_service.get_jail_banned_ips()
  - ban_mappers.map_domain_dashboard_ban_list_to_response()
- Update response model docstrings with new structure examples
- Update Backend-Development.md documentation with new pagination patterns
- Update test fixtures to work with new response structure
Response shape changes from:
{"items": [...], "total": 100, "page": 1, "page_size": 50}
To:
{"items": [...], "pagination": {"page": 1, "page_size": 50, "total": 100, "total_pages": 2, "has_next_page": true, "has_prev_page": false}}
Benefits:
- Frontend receives all pagination state needed for UI controls
- No need for frontend to calculate total_pages or page navigation logic
- Consolidated pagination metadata reduces field sprawl
- OpenAPI schema automatically reflects changes
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
598 lines
18 KiB
Python
598 lines
18 KiB
Python
"""Blocklist service.
|
|
|
|
Manages blocklist source CRUD, URL preview, IP import (download → validate →
|
|
ban via fail2ban), and schedule persistence.
|
|
|
|
All ban operations target a dedicated fail2ban jail (default:
|
|
``"blocklist-import"``) so blocklist-origin bans are tracked separately from
|
|
regular bans. If that jail does not exist or fail2ban is unreachable, the
|
|
error is recorded in the import log and processing continues.
|
|
|
|
Schedule configuration is stored as JSON in the application settings table
|
|
under the key ``"blocklist_schedule"``.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
from typing import TYPE_CHECKING
|
|
|
|
import aiohttp
|
|
import structlog
|
|
|
|
from app.models.blocklist import (
|
|
BlocklistSource,
|
|
ImportLogEntry,
|
|
ImportLogListResponse,
|
|
ImportRunResult,
|
|
ImportSourceResult,
|
|
PreviewResponse,
|
|
ScheduleConfig,
|
|
ScheduleFrequency,
|
|
ScheduleInfo,
|
|
)
|
|
from app.repositories import blocklist_repo, import_log_repo, settings_repo
|
|
from app.services.blocklist_downloader import BlocklistDownloader
|
|
from app.services.blocklist_import_workflow import BlocklistImportWorkflow
|
|
from app.services.blocklist_parser import BlocklistParser
|
|
from app.utils.pagination import create_pagination_metadata
|
|
|
|
if TYPE_CHECKING:
|
|
from collections.abc import Awaitable, Callable
|
|
|
|
import aiosqlite
|
|
from apscheduler.schedulers.asyncio import AsyncIOScheduler
|
|
|
|
from app.config import Settings
|
|
from app.services.geo_cache import GeoCache
|
|
|
|
#: Module-wide structured logger obtained from structlog.
log: structlog.stdlib.BoundLogger = structlog.get_logger()

#: Key in the application settings table under which the schedule is stored.
_SCHEDULE_SETTINGS_KEY: str = "blocklist_schedule"

#: Name of the fail2ban jail that receives blocklist-origin bans.
BLOCKLIST_JAIL: str = "blocklist-import"

#: Default cap on the number of sample entries returned by a preview.
_PREVIEW_LINES: int = 20

#: Byte cap intended for preview downloads (64 KiB).
#: NOTE(review): no function visible in this module references this constant;
#: confirm whether the downloader is expected to enforce it.
_PREVIEW_MAX_BYTES: int = 64 * 1024
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Source CRUD helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _row_to_source(row: dict[str, object]) -> BlocklistSource:
    """Build a :class:`BlocklistSource` from a repository row.

    Args:
        row: Mapping passed to ``BlocklistSource.model_validate``; it is
            expected to carry the ``blocklist_sources`` columns.

    Returns:
        The :class:`~app.models.blocklist.BlocklistSource` produced by
        model validation.
    """
    source = BlocklistSource.model_validate(row)
    return source
|
|
|
|
|
|
async def list_sources(db: aiosqlite.Connection) -> list[BlocklistSource]:
    """Fetch every blocklist source known to the repository.

    Args:
        db: Active application database connection.

    Returns:
        One :class:`~app.models.blocklist.BlocklistSource` per row returned
        by ``blocklist_repo.list_sources``, in the repository's order.
    """
    records = await blocklist_repo.list_sources(db)
    # Convert each raw row through the shared row -> model helper.
    return list(map(_row_to_source, records))
|
|
|
|
|
|
async def get_source(
    db: aiosqlite.Connection,
    source_id: int,
) -> BlocklistSource | None:
    """Look up one blocklist source by primary key.

    Args:
        db: Active application database connection.
        source_id: Primary key of the desired source.

    Returns:
        The matching :class:`~app.models.blocklist.BlocklistSource`, or
        ``None`` when the repository returns no row.
    """
    record = await blocklist_repo.get_source(db, source_id)
    if record is None:
        return None
    return _row_to_source(record)
|
|
|
|
|
|
async def create_source(
    db: aiosqlite.Connection,
    name: str,
    url: str,
    *,
    enabled: bool = True,
) -> BlocklistSource:
    """Create a new blocklist source and return the persisted record.

    The URL is first passed to ``validate_blocklist_url`` (defined in
    ``app.utils.ip_utils``, not shown here). Per this module's documentation
    that check requires an http/https URL resolving to a public IP address,
    and the application's HTTP connector re-validates at connection time to
    guard against DNS rebinding.

    Args:
        db: Active application database connection.
        name: Human-readable display name.
        url: URL of the blocklist text file.
        enabled: Whether the source is active. Defaults to ``True``.

    Returns:
        The newly created :class:`~app.models.blocklist.BlocklistSource`,
        re-read from the repository after insertion.

    Raises:
        ValueError: If the URL fails validation (raised by
            ``validate_blocklist_url`` according to the original docs).
        RuntimeError: If the freshly inserted row cannot be read back.
    """
    # Imported lazily, as in the original code.
    from app.utils.ip_utils import validate_blocklist_url

    await validate_blocklist_url(url)

    new_id = await blocklist_repo.create_source(db, name, url, enabled=enabled)
    source = await get_source(db, new_id)
    if source is None:
        # Explicit check instead of ``assert``: assertions are stripped under
        # ``python -O``, which would let ``None`` escape a non-optional return.
        raise RuntimeError(
            f"blocklist source {new_id} could not be read back after creation"
        )
    log.info("blocklist_source_created", id=new_id, name=name, url=url)
    return source
|
|
|
|
|
|
async def update_source(
    db: aiosqlite.Connection,
    source_id: int,
    *,
    name: str | None = None,
    url: str | None = None,
    enabled: bool | None = None,
) -> BlocklistSource | None:
    """Apply field changes to an existing blocklist source.

    A new ``url`` is passed to ``validate_blocklist_url`` (from
    ``app.utils.ip_utils``, not shown here) before anything is written.

    Args:
        db: Active application database connection.
        source_id: Primary key of the source to modify.
        name: New display name, or ``None`` to leave unchanged.
        url: New URL, or ``None`` to leave unchanged (validated if given).
        enabled: New enabled state, or ``None`` to leave unchanged.

    Returns:
        The source as re-read after the update, or ``None`` when the
        repository reports that nothing was updated.

    Raises:
        ValueError: If the URL fails validation (raised by
            ``validate_blocklist_url`` according to the original docs).
    """
    if url is not None:
        from app.utils.ip_utils import validate_blocklist_url

        await validate_blocklist_url(url)

    changes = {"name": name, "url": url, "enabled": enabled}
    was_updated = await blocklist_repo.update_source(db, source_id, **changes)
    if was_updated:
        refreshed = await get_source(db, source_id)
        log.info("blocklist_source_updated", id=source_id)
        return refreshed
    return None
|
|
|
|
|
|
async def delete_source(db: aiosqlite.Connection, source_id: int) -> bool:
    """Remove a blocklist source by primary key.

    Args:
        db: Active application database connection.
        source_id: Primary key of the source to delete.

    Returns:
        The repository's result: truthy when a source was deleted, falsy
        otherwise. A log line is emitted only on deletion.
    """
    was_deleted = await blocklist_repo.delete_source(db, source_id)
    if not was_deleted:
        return was_deleted
    log.info("blocklist_source_deleted", id=source_id)
    return was_deleted
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Preview
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
async def preview_source(
    url: str,
    http_session: aiohttp.ClientSession,
    *,
    sample_lines: int = _PREVIEW_LINES,
) -> PreviewResponse:
    """Download a blocklist URL and return a parsed preview of its content.

    The download runs through :class:`BlocklistDownloader` with a 10 second
    total timeout. The payload is then handed to
    ``BlocklistParser.parse_with_stats``.

    Args:
        url: URL to download.
        http_session: Shared :class:`aiohttp.ClientSession`.
        sample_lines: Forwarded to the parser as ``sample_lines``; per the
            original docs, the maximum number of lines in the preview.

    Returns:
        :class:`~app.models.blocklist.PreviewResponse` built from the parsed
        entries and the parser's ``total_lines`` / ``valid_count`` /
        ``skipped_count`` statistics.

    Raises:
        ValueError: If the download times out, fails with an aiohttp client
            error or a ``ValueError``, or returns a non-200 status.
    """
    downloader = BlocklistDownloader(http_session)
    parser = BlocklistParser()

    try:
        status, raw = await downloader.download(url, _aiohttp_timeout(10))
    except (TimeoutError, aiohttp.ClientError, ValueError) as exc:
        log.warning("blocklist_preview_failed", url=url, error=type(exc).__name__)
        # Timeouts usually carry no message; never surface an empty
        # ValueError to the caller.
        message = str(exc) or f"{type(exc).__name__} while fetching {url}"
        raise ValueError(message) from exc

    if status != 200:
        # Same log fields and message as before, without raising inside the
        # ``try`` only to catch and re-wrap the same error.
        log.warning("blocklist_preview_failed", url=url, error="ValueError")
        raise ValueError(f"HTTP {status} from {url}")

    entries, stats = parser.parse_with_stats(raw, sample_lines=sample_lines)
    return PreviewResponse(
        entries=entries,
        total_lines=stats["total_lines"],
        valid_count=stats["valid_count"],
        skipped_count=stats["skipped_count"],
    )
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Import
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
async def import_source(
    source: BlocklistSource,
    http_session: aiohttp.ClientSession,
    socket_path: str,
    db: aiosqlite.Connection,
    *,
    ban_ip: Callable[[str, str, str], Awaitable[None]],
    geo_is_cached: Callable[[str], bool] | None = None,
    geo_cache: GeoCache | None = None,
) -> ImportSourceResult:
    """Run the import workflow for a single blocklist source.

    Builds a :class:`BlocklistImportWorkflow` wired with ``http_session``,
    ``ban_ip`` and this module's ``_log_result`` callback, then delegates to
    its ``import_source`` method. The download, IP validation, banning and
    geo-cache handling live in that workflow class and are not visible here.

    Args:
        source: The :class:`~app.models.blocklist.BlocklistSource` to import.
        http_session: Shared :class:`aiohttp.ClientSession`.
        socket_path: Path to the fail2ban Unix socket (forwarded).
        db: Application database connection (forwarded).
        ban_ip: Async callable taking three strings, handed to the workflow.
            NOTE(review): the argument order is defined by the workflow;
            confirm it there.
        geo_is_cached: Optional predicate forwarded to the workflow.
        geo_cache: Optional :class:`GeoCache` forwarded to the workflow.

    Returns:
        The :class:`~app.models.blocklist.ImportSourceResult` produced by
        the workflow.
    """
    geo_options = {"geo_is_cached": geo_is_cached, "geo_cache": geo_cache}
    workflow = BlocklistImportWorkflow(http_session, ban_ip, _log_result)
    outcome = await workflow.import_source(source, socket_path, db, **geo_options)
    return outcome
|
|
|
|
|
|
async def import_all(
    db: aiosqlite.Connection,
    http_session: aiohttp.ClientSession,
    socket_path: str,
    *,
    ban_ip: Callable[[str, str, str], Awaitable[None]],
    geo_is_cached: Callable[[str], bool] | None = None,
    geo_cache: GeoCache | None = None,
) -> ImportRunResult:
    """Import every enabled blocklist source, one after another.

    Rows come from ``blocklist_repo.list_enabled_sources``. Each is converted
    to a model and passed to :func:`import_source`; the per-source results
    are then aggregated.

    Args:
        db: Application database connection.
        http_session: Shared :class:`aiohttp.ClientSession`.
        socket_path: fail2ban socket path.
        ban_ip: Async ban callable forwarded to :func:`import_source`.
        geo_is_cached: Optional predicate forwarded to :func:`import_source`.
        geo_cache: Optional geo cache forwarded to :func:`import_source`.

    Returns:
        :class:`~app.models.blocklist.ImportRunResult` holding the per-source
        results, the summed imported/skipped counters, and the number of
        sources whose result carries an error.
    """
    enabled_rows = await blocklist_repo.list_enabled_sources(db)

    # Sources are processed sequentially, in repository order.
    results: list[ImportSourceResult] = [
        await import_source(
            _row_to_source(row),
            http_session,
            socket_path,
            db,
            ban_ip=ban_ip,
            geo_is_cached=geo_is_cached,
            geo_cache=geo_cache,
        )
        for row in enabled_rows
    ]

    total_imported = sum(item.ips_imported for item in results)
    total_skipped = sum(item.ips_skipped for item in results)
    errors_count = sum(1 for item in results if item.error is not None)

    log.info(
        "blocklist_import_all_complete",
        sources=len(enabled_rows),
        total_imported=total_imported,
        total_skipped=total_skipped,
        errors=errors_count,
    )
    return ImportRunResult(
        results=results,
        total_imported=total_imported,
        total_skipped=total_skipped,
        errors_count=errors_count,
    )
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Schedule
|
|
# ---------------------------------------------------------------------------
|
|
|
|
#: Fallback schedule built from :class:`ScheduleConfig`'s own field defaults.
_DEFAULT_SCHEDULE: ScheduleConfig = ScheduleConfig()

#: Fixed APScheduler job id used to look up, remove and register the
#: blocklist import job.
JOB_ID: str = "blocklist_import"
|
|
|
|
|
|
def _get_job_next_run_at(scheduler: AsyncIOScheduler) -> str | None:
    """Return the import job's next run time as an ISO 8601 string.

    Args:
        scheduler: Scheduler that may hold the job registered under
            :data:`JOB_ID`.

    Returns:
        ``next_run_time.isoformat()`` for the job, or ``None`` when the job
        is not registered or has no next run time.
    """
    job = scheduler.get_job(JOB_ID)
    if job is None:
        return None
    # APScheduler 3.x jobs that are still pending (added before the scheduler
    # starts) do not have ``next_run_time`` set, so direct attribute access
    # raises AttributeError. Treat that the same as "no next run".
    next_run_time = getattr(job, "next_run_time", None)
    if next_run_time is None:
        return None
    return next_run_time.isoformat()
|
|
|
|
|
|
def schedule_blocklist_job(
    scheduler: AsyncIOScheduler,
    settings: Settings,
    http_session: aiohttp.ClientSession,
    config: ScheduleConfig,
    run_import_callback: Callable[[Settings, aiohttp.ClientSession], Awaitable[None]],
) -> None:
    """Register the blocklist import job, replacing any existing one.

    Trigger selection by ``config.frequency``:

    * ``hourly``: ``interval`` trigger every ``config.interval_hours`` hours.
    * ``weekly``: ``cron`` trigger on ``config.day_of_week`` at
      ``config.hour``:``config.minute``.
    * anything else: ``cron`` trigger at ``config.hour``:``config.minute``.

    Args:
        scheduler: Scheduler that owns the job.
        settings: Passed to the callback as the ``settings`` keyword argument.
        http_session: Passed to the callback as the ``http_session`` keyword
            argument.
        config: Schedule configuration that determines the trigger.
        run_import_callback: Callable registered as the job function.
    """
    # Drop a previously registered job so the id can be reused.
    existing_job = scheduler.get_job(JOB_ID)
    if existing_job:
        scheduler.remove_job(JOB_ID)

    frequency = config.frequency
    if frequency == ScheduleFrequency.hourly:
        trigger_type = "interval"
        trigger_kwargs = {"hours": config.interval_hours}
    else:
        # Weekly and the default case both use cron; weekly adds a weekday.
        trigger_type = "cron"
        trigger_kwargs = {"hour": config.hour, "minute": config.minute}
        if frequency == ScheduleFrequency.weekly:
            trigger_kwargs = {"day_of_week": config.day_of_week, **trigger_kwargs}

    job_kwargs: dict[str, object] = {
        "settings": settings,
        "http_session": http_session,
    }
    scheduler.add_job(
        run_import_callback,
        trigger=trigger_type,
        id=JOB_ID,
        kwargs=job_kwargs,
        **trigger_kwargs,
    )
    log.info(
        "blocklist_import_scheduled",
        frequency=config.frequency,
        trigger=trigger_type,
        trigger_kwargs=trigger_kwargs,
    )
|
|
|
|
|
|
async def get_schedule(db: aiosqlite.Connection) -> ScheduleConfig:
    """Read the import schedule config from the settings table.

    Falls back to a copy of the module default when nothing is stored or
    when the stored value cannot be decoded or validated.

    Args:
        db: Active application database connection.

    Returns:
        The stored :class:`~app.models.blocklist.ScheduleConfig`, or a fresh
        copy of the default configuration.
    """
    raw = await settings_repo.get_setting(db, _SCHEDULE_SETTINGS_KEY)
    if raw is None:
        # Return a copy so callers cannot mutate the shared module default.
        return _DEFAULT_SCHEDULE.model_copy(deep=True)
    try:
        return ScheduleConfig.model_validate(json.loads(raw))
    except ValueError as exc:
        # json.JSONDecodeError subclasses ValueError, so a single clause
        # covers both malformed JSON and a failed model validation.
        log.warning("blocklist_schedule_invalid", raw=raw, error=type(exc).__name__)
        return _DEFAULT_SCHEDULE.model_copy(deep=True)
|
|
|
|
|
|
async def set_schedule(
    db: aiosqlite.Connection,
    config: ScheduleConfig,
) -> ScheduleConfig:
    """Store a schedule configuration in the settings table.

    The config is serialised with ``model_dump_json`` and written under the
    ``"blocklist_schedule"`` settings key.

    Args:
        db: Active application database connection.
        config: The :class:`~app.models.blocklist.ScheduleConfig` to store.

    Returns:
        The same ``config`` object that was passed in.
    """
    serialized = config.model_dump_json()
    await settings_repo.set_setting(db, _SCHEDULE_SETTINGS_KEY, serialized)
    log.info("blocklist_schedule_updated", frequency=config.frequency, hour=config.hour)
    return config
|
|
|
|
|
|
async def get_schedule_info(
    db: aiosqlite.Connection,
    next_run_at: str | None,
) -> ScheduleInfo:
    """Combine the schedule config with metadata about the latest import.

    Args:
        db: Active application database connection.
        next_run_at: ISO 8601 string of the next scheduled run, or ``None``
            if not yet scheduled (supplied by the caller).

    Returns:
        :class:`~app.models.blocklist.ScheduleInfo` holding the config, the
        given ``next_run_at``, and the timestamp and error flag of the last
        import log entry. Both are ``None`` when there is no log entry.
    """
    config = await get_schedule(db)
    last_log = await import_log_repo.get_last_log(db)

    last_run_at = None
    last_run_errors: bool | None = None
    if last_log:
        last_run_at = last_log["timestamp"]
        # Any non-null ``errors`` value marks the last run as failed.
        last_run_errors = last_log["errors"] is not None

    return ScheduleInfo(
        config=config,
        next_run_at=next_run_at,
        last_run_at=last_run_at,
        last_run_errors=last_run_errors,
    )
|
|
|
|
|
|
async def get_schedule_info_with_runtime(
    db: aiosqlite.Connection,
    scheduler: AsyncIOScheduler,
) -> ScheduleInfo:
    """Return schedule info using the scheduler's next run time.

    Args:
        db: Active application database connection.
        scheduler: Scheduler queried for the import job's next run time.

    Returns:
        The :class:`~app.models.blocklist.ScheduleInfo` from
        :func:`get_schedule_info`.
    """
    return await get_schedule_info(db, _get_job_next_run_at(scheduler))
|
|
|
|
|
|
async def update_schedule(
    db: aiosqlite.Connection,
    scheduler: AsyncIOScheduler,
    http_session: aiohttp.ClientSession,
    settings: Settings,
    config: ScheduleConfig,
    run_import_callback: Callable[[Settings, aiohttp.ClientSession], Awaitable[None]],
) -> ScheduleInfo:
    """Save a new schedule config and re-register the import job.

    Args:
        db: Active application database connection.
        scheduler: Scheduler on which the job is re-registered.
        http_session: Shared HTTP session passed on to the scheduled job.
        settings: Application settings passed on to the scheduled job.
        config: The new schedule configuration.
        run_import_callback: Callable registered as the job function.

    Returns:
        :class:`~app.models.blocklist.ScheduleInfo` read back after the
        config is stored and the job is registered.
    """
    await set_schedule(db, config)
    schedule_blocklist_job(
        scheduler=scheduler,
        settings=settings,
        http_session=http_session,
        config=config,
        run_import_callback=run_import_callback,
    )
    next_run_at = _get_job_next_run_at(scheduler)
    return await get_schedule_info(db, next_run_at)
|
|
|
|
|
|
async def list_import_logs(
    db: aiosqlite.Connection,
    *,
    source_id: int | None = None,
    page: int = 1,
    page_size: int = 50,
) -> ImportLogListResponse:
    """Fetch one page of import log entries.

    Args:
        db: Active application database connection.
        source_id: Optional filter to only return logs for a specific source.
        page: 1-based page number.
        page_size: Items per page.

    Returns:
        :class:`~app.models.blocklist.ImportLogListResponse` with the
        validated entries in ``items`` and the metadata from
        ``create_pagination_metadata`` in ``pagination``.
    """
    rows, total_count = await import_log_repo.list_logs(
        db,
        source_id=source_id,
        page=page,
        page_size=page_size,
    )
    entries = list(map(ImportLogEntry.model_validate, rows))
    pagination = create_pagination_metadata(total_count, page, page_size)
    return ImportLogListResponse(items=entries, pagination=pagination)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Internal helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _aiohttp_timeout(seconds: float) -> aiohttp.ClientTimeout:
    """Return an :class:`aiohttp.ClientTimeout` with the given total timeout.

    Args:
        seconds: Total timeout in seconds.

    Returns:
        An :class:`aiohttp.ClientTimeout` whose ``total`` is ``seconds``.
    """
    # ``aiohttp`` is already imported at module level at runtime, so the
    # former function-scope import and its lint suppression were redundant.
    return aiohttp.ClientTimeout(total=seconds)
|
|
|
|
|
|
async def _log_result(
    db: aiosqlite.Connection,
    source: BlocklistSource,
    ips_imported: int,
    ips_skipped: int,
    error: str | None,
) -> None:
    """Record the outcome of one source import in the import log.

    Args:
        db: Application database connection.
        source: The source that was imported; its ``id`` and ``url`` are
            stored with the entry.
        ips_imported: Count of successfully banned IPs.
        ips_skipped: Count of skipped/invalid entries.
        error: Error string, or ``None`` on success.
    """
    entry = {
        "source_id": source.id,
        "source_url": source.url,
        "ips_imported": ips_imported,
        "ips_skipped": ips_skipped,
        "errors": error,
    }
    await import_log_repo.add_log(db, **entry)
|