- Change BlocklistSourceCreate.url from str to AnyHttpUrl (Pydantic type) - Rejects non-http schemes (file://, ftp://, etc.) at model boundary - Add is_private_ip() utility to detect RFC 1918 private ranges: - 10.0.0.0/8, 172.16.0.0/12, 192.168.0.0/16 (RFC 1918) - 127.0.0.0/8, ::1/128 (loopback) - 169.254.0.0/16, fe80::/10 (link-local) - IPv6 site-local, multicast, and reserved ranges - Add async validate_blocklist_url() function: - Resolves hostname via DNS using loop.run_in_executor() - Rejects if hostname resolves to private/reserved IP - Raises ValueError on validation failure - Integrate validation into service layer: - create_source() calls validate_blocklist_url() before persist - update_source() conditionally validates if url provided - Both raise ValueError on failure - Update router endpoints with error handling: - create_blocklist() and update_blocklist() catch ValueError - Return HTTP 400 Bad Request with descriptive error message - Add comprehensive test coverage (9 new SSRF tests): - file://, ftp://, localhost, 127.0.0.1, 192.168.x.x - 10.x.x.x, 172.16.x.x, 169.254.x.x (link-local) - Valid public URLs (passes validation) - All 36 service tests passing - Update documentation: - Features.md: Document URL validation constraints - Backend-Development.md: Add SSRF prevention pattern section Fixes SSRF vulnerability where authenticated users could supply file://, ftp://, or private IP URLs and the backend would fetch them. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
190 lines
5.4 KiB
Python
190 lines
5.4 KiB
Python
"""Blocklist source and import log Pydantic models.
|
|
|
|
Data shapes for blocklist source management, import operations, scheduling,
|
|
and import log retrieval.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
from enum import StrEnum
|
|
|
|
from pydantic import AnyHttpUrl, BaseModel, ConfigDict, Field
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Blocklist source
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class BlocklistSource(BaseModel):
|
|
"""Domain model for a blocklist source definition."""
|
|
|
|
model_config = ConfigDict(strict=True)
|
|
|
|
id: int
|
|
name: str
|
|
url: str
|
|
enabled: bool
|
|
created_at: str
|
|
updated_at: str
|
|
|
|
|
|
class BlocklistSourceCreate(BaseModel):
|
|
"""Payload for ``POST /api/blocklists``.
|
|
|
|
URL must use http/https scheme. The hostname must resolve to a public IP
|
|
(not private, loopback, link-local, or reserved). Validation happens
|
|
asynchronously in the service layer.
|
|
"""
|
|
|
|
model_config = ConfigDict(strict=True)
|
|
|
|
name: str = Field(..., min_length=1, max_length=100, description="Human-readable source name.")
|
|
url: AnyHttpUrl = Field(..., description="URL of the blocklist file (http/https only).")
|
|
enabled: bool = Field(default=True)
|
|
|
|
|
|
class BlocklistSourceUpdate(BaseModel):
|
|
"""Payload for ``PUT /api/blocklists/{id}``. All fields are optional.
|
|
|
|
If URL is provided, it must use http/https scheme.
|
|
"""
|
|
|
|
model_config = ConfigDict(strict=True)
|
|
|
|
name: str | None = Field(default=None, min_length=1, max_length=100)
|
|
url: AnyHttpUrl | None = Field(default=None)
|
|
enabled: bool | None = Field(default=None)
|
|
|
|
|
|
class BlocklistListResponse(BaseModel):
|
|
"""Response for ``GET /api/blocklists``."""
|
|
|
|
model_config = ConfigDict(strict=True)
|
|
|
|
sources: list[BlocklistSource] = Field(default_factory=list)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Import log
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class ImportLogEntry(BaseModel):
|
|
"""A single blocklist import run record."""
|
|
|
|
model_config = ConfigDict(strict=True)
|
|
|
|
id: int
|
|
source_id: int | None
|
|
source_url: str
|
|
timestamp: str
|
|
ips_imported: int
|
|
ips_skipped: int
|
|
errors: str | None
|
|
|
|
|
|
class ImportLogListResponse(BaseModel):
|
|
"""Response for ``GET /api/blocklists/log``."""
|
|
|
|
model_config = ConfigDict(strict=True)
|
|
|
|
items: list[ImportLogEntry] = Field(default_factory=list)
|
|
total: int = Field(..., ge=0)
|
|
page: int = Field(default=1, ge=1)
|
|
page_size: int = Field(default=50, ge=1)
|
|
total_pages: int = Field(default=1, ge=1)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Schedule
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class ScheduleFrequency(StrEnum):
|
|
"""Available import schedule frequency presets."""
|
|
|
|
hourly = "hourly"
|
|
daily = "daily"
|
|
weekly = "weekly"
|
|
|
|
|
|
class ScheduleConfig(BaseModel):
|
|
"""Import schedule configuration.
|
|
|
|
The interpretation of fields depends on *frequency*:
|
|
|
|
- ``hourly``: ``interval_hours`` controls how often (every N hours).
|
|
- ``daily``: ``hour`` and ``minute`` specify the daily run time (UTC).
|
|
- ``weekly``: additionally uses ``day_of_week`` (0=Monday … 6=Sunday).
|
|
"""
|
|
|
|
# No strict=True here: FastAPI and json.loads() both supply enum values as
|
|
# plain strings; strict mode would reject string→enum coercion.
|
|
|
|
frequency: ScheduleFrequency = ScheduleFrequency.daily
|
|
interval_hours: int = Field(default=24, ge=1, le=168, description="Used when frequency=hourly")
|
|
hour: int = Field(default=3, ge=0, le=23, description="UTC hour for daily/weekly runs")
|
|
minute: int = Field(default=0, ge=0, le=59, description="Minute for daily/weekly runs")
|
|
day_of_week: int = Field(
|
|
default=0,
|
|
ge=0,
|
|
le=6,
|
|
description="Day of week for weekly runs (0=Monday … 6=Sunday)",
|
|
)
|
|
|
|
|
|
class ScheduleInfo(BaseModel):
|
|
"""Current schedule configuration together with runtime metadata."""
|
|
|
|
model_config = ConfigDict(strict=True)
|
|
|
|
config: ScheduleConfig
|
|
next_run_at: str | None
|
|
last_run_at: str | None
|
|
last_run_errors: bool | None = None
|
|
"""``True`` if the most recent import had errors, ``False`` if clean, ``None`` if never run."""
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Import results
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class ImportSourceResult(BaseModel):
|
|
"""Result of importing a single blocklist source."""
|
|
|
|
model_config = ConfigDict(strict=True)
|
|
|
|
source_id: int | None
|
|
source_url: str
|
|
ips_imported: int
|
|
ips_skipped: int
|
|
error: str | None
|
|
|
|
|
|
class ImportRunResult(BaseModel):
|
|
"""Aggregated result from a full import run across all enabled sources."""
|
|
|
|
model_config = ConfigDict(strict=True)
|
|
|
|
results: list[ImportSourceResult] = Field(default_factory=list)
|
|
total_imported: int
|
|
total_skipped: int
|
|
errors_count: int
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Preview
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class PreviewResponse(BaseModel):
|
|
"""Response for ``GET /api/blocklists/{id}/preview``."""
|
|
|
|
model_config = ConfigDict(strict=True)
|
|
|
|
entries: list[str] = Field(default_factory=list, description="Sample of valid IP entries")
|
|
total_lines: int
|
|
valid_count: int
|
|
skipped_count: int
|