Task 8: Standardize modeling style (TypedDict vs Pydantic)

Convert inconsistent modeling style to standardized Pydantic models for all
external-facing data structures while maintaining TypedDict compatibility where
appropriate for internal layer-private structures.

Changes:
- Converted IpLookupResult TypedDict to use IpLookupResponse Pydantic model
  in jail_service.lookup_ip() for consistency with routers
- Added GeoCacheEntry Pydantic model for geo cache repository rows
- Converted GeoCacheRow TypedDict to use GeoCacheEntry alias
- Converted ImportLogRow TypedDict to use ImportLogEntry alias
- Updated routers and services to work with Pydantic models
- Updated all tests to use Pydantic model field access (attributes)
  instead of dict subscripting

Documentation:
- Added 'Model Type Usage by Layer' section to Backend-Development.md
- Defines when TypedDict is allowed (internal structures) vs Pydantic
  (external-facing, cross-boundary data)
- Provides clear guidance on modeling conventions per layer

Benefits:
- Consistent validation and serialization behavior
- Better IDE support and type checking
- Clearer separation of concerns by layer
- Reduced maintenance cost from mixed validation approaches

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
This commit is contained in:
2026-04-28 07:53:30 +02:00
parent 3888c5eb3f
commit 52a4d04d92
10 changed files with 127 additions and 85 deletions

View File

@@ -42,6 +42,33 @@ class GeoDetail(BaseModel):
)
class GeoCacheEntry(BaseModel):
"""A single cached geolocation entry for an IP address.
Represents a row from the ``geo_cache`` table in the application database.
"""
model_config = ConfigDict(strict=True)
ip: str = Field(..., description="IP address (IPv4 or IPv6).")
country_code: str | None = Field(
default=None,
description="ISO 3166-1 alpha-2 country code.",
)
country_name: str | None = Field(
default=None,
description="Human-readable country name.",
)
asn: str | None = Field(
default=None,
description="Autonomous System Number (e.g. ``'AS3320'``).",
)
org: str | None = Field(
default=None,
description="Organisation associated with the ASN.",
)
class GeoCacheStatsResponse(BaseModel):
"""Response for ``GET /api/geo/stats``.

View File

@@ -9,22 +9,18 @@ connection lifetimes.
from __future__ import annotations
from typing import TYPE_CHECKING, TypedDict
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from collections.abc import Sequence
import aiosqlite
from app.models.geo import GeoCacheEntry
class GeoCacheRow(TypedDict):
"""A single row from the ``geo_cache`` table."""
ip: str
country_code: str | None
country_name: str | None
asn: str | None
org: str | None
# Alias for backward compatibility with protocols
GeoCacheRow = GeoCacheEntry
async def load_all(db: aiosqlite.Connection) -> list[GeoCacheRow]:

View File

@@ -8,26 +8,18 @@ table. All methods are plain async functions that accept a
from __future__ import annotations
import math
from typing import TYPE_CHECKING, TypedDict, cast
from typing import TYPE_CHECKING, cast
if TYPE_CHECKING:
from collections.abc import Mapping
import aiosqlite
class ImportLogRow(TypedDict):
"""Row shape returned by queries on the import_log table."""
id: int
source_id: int | None
source_url: str
timestamp: str
ips_imported: int
ips_skipped: int
errors: str | None
from app.models.blocklist import ImportLogEntry
# Alias for backward compatibility with protocols
ImportLogRow = ImportLogEntry
async def add_log(
db: aiosqlite.Connection,
*,
@@ -158,13 +150,13 @@ def compute_total_pages(total: int, page_size: int) -> int:
def _row_to_dict(row: object) -> ImportLogRow:
"""Convert an aiosqlite row to a plain Python dict.
"""Convert an aiosqlite row to an ImportLogEntry Pydantic model.
Args:
row: An :class:`aiosqlite.Row` or similar mapping returned by a cursor.
Returns:
Dict mapping column names to Python values.
ImportLogEntry Pydantic model instance.
"""
mapping = cast("Mapping[str, object]", row)
return cast("ImportLogRow", dict(mapping))
return ImportLogEntry(**mapping)

View File

@@ -8,10 +8,7 @@ Provides the IP enrichment endpoints:
from __future__ import annotations
from typing import TYPE_CHECKING, Annotated
if TYPE_CHECKING:
from app.services.jail_service import IpLookupResult
from typing import Annotated
from fastapi import APIRouter, Path
@@ -57,14 +54,12 @@ async def lookup_ip(
HTTPException: 400 when *ip* is not a valid IP address.
HTTPException: 502 when fail2ban is unreachable.
"""
result: IpLookupResult = await jail_service.lookup_ip(
return await jail_service.lookup_ip(
socket_path,
ip,
http_session=http_session,
)
return IpLookupResponse(**result)
# ---------------------------------------------------------------------------
# POST /api/geo/re-resolve

View File

@@ -18,14 +18,14 @@ from __future__ import annotations
import asyncio
import contextlib
import ipaddress
from typing import TYPE_CHECKING, TypedDict, cast
from typing import TYPE_CHECKING, cast
import structlog
from app.exceptions import JailNotFoundError, JailOperationError
from app.models.ban import ActiveBan, JailBannedIpsResponse
from app.models.config import BantimeEscalation
from app.models.geo import GeoDetail
from app.models.geo import GeoDetail, IpLookupResponse
from app.models.jail import (
Jail,
JailDetailResponse,
@@ -63,18 +63,6 @@ log: structlog.stdlib.BoundLogger = structlog.get_logger()
__all__ = ["reload_all"]
class IpLookupResult(TypedDict):
"""Result returned by :func:`lookup_ip`.
This is intentionally a :class:`TypedDict` to provide precise typing for
callers (e.g. routers) while keeping the implementation flexible.
"""
ip: str
currently_banned_in: list[str]
geo: GeoDetail | None
# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
@@ -998,7 +986,7 @@ async def lookup_ip(
ip: str,
geo_enricher: GeoEnricher | None = None,
http_session: aiohttp.ClientSession | None = None,
) -> IpLookupResult:
) -> IpLookupResponse:
"""Return ban status and history for a single IP address.
Checks every running jail for whether the IP is currently banned.
@@ -1075,11 +1063,11 @@ async def lookup_ip(
log.info("ip_lookup_completed", ip=ip, banned_in_jails=currently_banned_in)
return {
"ip": ip,
"currently_banned_in": currently_banned_in,
"geo": geo,
}
return IpLookupResponse(
ip=ip,
currently_banned_in=currently_banned_in,
geo=geo,
)
async def unban_all_ips(socket_path: str) -> int: