Move history geo enrichment into history service
This commit is contained in:
@@ -15,10 +15,7 @@ Routes
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import TYPE_CHECKING, Literal
|
||||
|
||||
if TYPE_CHECKING:
|
||||
import aiohttp
|
||||
from typing import Literal
|
||||
|
||||
from fastapi import APIRouter, HTTPException, Query, Request
|
||||
|
||||
@@ -30,7 +27,7 @@ from app.dependencies import (
|
||||
)
|
||||
from app.models.ban import BanOrigin, TimeRange
|
||||
from app.models.history import HistoryListResponse, IpDetailResponse
|
||||
from app.services import geo_service, history_service
|
||||
from app.services import history_service
|
||||
|
||||
router: APIRouter = APIRouter(prefix="/api/history", tags=["History"])
|
||||
|
||||
@@ -95,9 +92,6 @@ async def get_history(
|
||||
and the total matching count.
|
||||
"""
|
||||
|
||||
async def _enricher(addr: str) -> geo_service.GeoInfo | None:
|
||||
return await geo_service.lookup(addr, http_session)
|
||||
|
||||
return await history_service.list_history(
|
||||
socket_path,
|
||||
range_=range,
|
||||
@@ -107,7 +101,7 @@ async def get_history(
|
||||
source=source,
|
||||
page=page,
|
||||
page_size=page_size,
|
||||
geo_enricher=_enricher,
|
||||
http_session=http_session,
|
||||
db=db,
|
||||
)
|
||||
|
||||
@@ -133,9 +127,6 @@ async def get_history_archive(
|
||||
page_size: int = Query(default=_DEFAULT_PAGE_SIZE, ge=1, le=500, description="Items per page (max 500)."),
|
||||
) -> HistoryListResponse:
|
||||
|
||||
async def _enricher(addr: str) -> geo_service.GeoInfo | None:
|
||||
return await geo_service.lookup(addr, http_session)
|
||||
|
||||
return await history_service.list_history(
|
||||
socket_path,
|
||||
range_=range,
|
||||
@@ -144,7 +135,7 @@ async def get_history_archive(
|
||||
source="archive",
|
||||
page=page,
|
||||
page_size=page_size,
|
||||
geo_enricher=_enricher,
|
||||
http_session=http_session,
|
||||
db=db,
|
||||
)
|
||||
|
||||
@@ -180,13 +171,10 @@ async def get_ip_history(
|
||||
HTTPException: 404 if the IP has no history in the database.
|
||||
"""
|
||||
|
||||
async def _enricher(addr: str) -> geo_service.GeoInfo | None:
|
||||
return await geo_service.lookup(addr, http_session)
|
||||
|
||||
detail: IpDetailResponse | None = await history_service.get_ip_detail(
|
||||
socket_path,
|
||||
ip,
|
||||
geo_enricher=_enricher,
|
||||
http_session=http_session,
|
||||
)
|
||||
|
||||
if detail is None:
|
||||
|
||||
@@ -15,12 +15,14 @@ from typing import TYPE_CHECKING
|
||||
|
||||
import structlog
|
||||
|
||||
from app.models.ban import TIME_RANGE_SECONDS, BanOrigin, TimeRange
|
||||
from app.services import geo_service
|
||||
|
||||
if TYPE_CHECKING:
|
||||
import aiosqlite
|
||||
import aiohttp
|
||||
|
||||
from app.models.geo import GeoEnricher
|
||||
|
||||
from app.models.ban import TIME_RANGE_SECONDS, BanOrigin, TimeRange
|
||||
from app.models.geo import GeoEnricher, GeoInfo
|
||||
from app.models.history import (
|
||||
HistoryBanItem,
|
||||
HistoryListResponse,
|
||||
@@ -61,6 +63,27 @@ def _since_unix(range_: TimeRange) -> int:
|
||||
return int(datetime.now(tz=UTC).timestamp()) - seconds
|
||||
|
||||
|
||||
async def _resolve_geo_info(
|
||||
ip: str,
|
||||
*,
|
||||
http_session: "aiohttp.ClientSession" | None = None,
|
||||
geo_enricher: GeoEnricher | None = None,
|
||||
) -> GeoInfo | None:
|
||||
"""Resolve geolocation information for a single IP address.
|
||||
|
||||
The explicit *geo_enricher* has priority over *http_session*. When an
|
||||
HTTP session is provided, the service uses :func:`geo_service.lookup` as a
|
||||
default enrichment strategy.
|
||||
"""
|
||||
if geo_enricher is not None:
|
||||
return await geo_enricher(ip)
|
||||
|
||||
if http_session is not None:
|
||||
return await geo_service.lookup(ip, http_session)
|
||||
|
||||
return None
|
||||
|
||||
|
||||
_HISTORY_SYNC_PAGE_SIZE: int = 500
|
||||
_HISTORY_SYNC_BACKFILL_WINDOW: int = 648000
|
||||
|
||||
@@ -141,6 +164,7 @@ async def list_history(
|
||||
source: str = "fail2ban",
|
||||
page: int = 1,
|
||||
page_size: int = _DEFAULT_PAGE_SIZE,
|
||||
http_session: "aiohttp.ClientSession" | None = None,
|
||||
geo_enricher: GeoEnricher | None = None,
|
||||
db: aiosqlite.Connection | None = None,
|
||||
) -> HistoryListResponse:
|
||||
@@ -158,6 +182,8 @@ async def list_history(
|
||||
(or a prefix — the query uses ``LIKE ip_filter%``).
|
||||
page: 1-based page number (default: ``1``).
|
||||
page_size: Maximum items per page, capped at ``_MAX_PAGE_SIZE``.
|
||||
http_session: Optional shared :class:`aiohttp.ClientSession` used for
|
||||
geo lookups when no explicit *geo_enricher* is provided.
|
||||
geo_enricher: Optional async callable ``(ip: str) -> GeoInfo | None``.
|
||||
|
||||
Returns:
|
||||
@@ -212,16 +238,19 @@ async def list_history(
|
||||
asn = None
|
||||
org = None
|
||||
|
||||
if geo_enricher is not None:
|
||||
try:
|
||||
geo = await geo_enricher(ip)
|
||||
if geo is not None:
|
||||
country_code = geo.country_code
|
||||
country_name = geo.country_name
|
||||
asn = geo.asn
|
||||
org = geo.org
|
||||
except Exception: # noqa: BLE001
|
||||
log.warning("history_service_geo_lookup_failed", ip=ip)
|
||||
try:
|
||||
geo = await _resolve_geo_info(
|
||||
ip,
|
||||
http_session=http_session,
|
||||
geo_enricher=geo_enricher,
|
||||
)
|
||||
if geo is not None:
|
||||
country_code = geo.country_code
|
||||
country_name = geo.country_name
|
||||
asn = geo.asn
|
||||
org = geo.org
|
||||
except Exception: # noqa: BLE001
|
||||
log.warning("history_service_geo_lookup_failed", ip=ip)
|
||||
|
||||
items.append(
|
||||
HistoryBanItem(
|
||||
@@ -260,16 +289,19 @@ async def list_history(
|
||||
asn: str | None = None
|
||||
org: str | None = None
|
||||
|
||||
if geo_enricher is not None:
|
||||
try:
|
||||
geo = await geo_enricher(ip)
|
||||
if geo is not None:
|
||||
country_code = geo.country_code
|
||||
country_name = geo.country_name
|
||||
asn = geo.asn
|
||||
org = geo.org
|
||||
except Exception: # noqa: BLE001
|
||||
log.warning("history_service_geo_lookup_failed", ip=ip)
|
||||
try:
|
||||
geo = await _resolve_geo_info(
|
||||
ip,
|
||||
http_session=http_session,
|
||||
geo_enricher=geo_enricher,
|
||||
)
|
||||
if geo is not None:
|
||||
country_code = geo.country_code
|
||||
country_name = geo.country_name
|
||||
asn = geo.asn
|
||||
org = geo.org
|
||||
except Exception: # noqa: BLE001
|
||||
log.warning("history_service_geo_lookup_failed", ip=ip)
|
||||
|
||||
items.append(
|
||||
HistoryBanItem(
|
||||
@@ -298,6 +330,7 @@ async def get_ip_detail(
|
||||
socket_path: str,
|
||||
ip: str,
|
||||
*,
|
||||
http_session: "aiohttp.ClientSession" | None = None,
|
||||
geo_enricher: GeoEnricher | None = None,
|
||||
) -> IpDetailResponse | None:
|
||||
"""Return the full historical record for a single IP address.
|
||||
@@ -309,6 +342,8 @@ async def get_ip_detail(
|
||||
Args:
|
||||
socket_path: Path to the fail2ban Unix domain socket.
|
||||
ip: The IP address to look up.
|
||||
http_session: Optional shared :class:`aiohttp.ClientSession` used for
|
||||
geo lookups when no explicit *geo_enricher* is provided.
|
||||
geo_enricher: Optional async callable ``(ip: str) -> GeoInfo | None``.
|
||||
|
||||
Returns:
|
||||
@@ -349,16 +384,19 @@ async def get_ip_detail(
|
||||
asn: str | None = None
|
||||
org: str | None = None
|
||||
|
||||
if geo_enricher is not None:
|
||||
try:
|
||||
geo = await geo_enricher(ip)
|
||||
if geo is not None:
|
||||
country_code = geo.country_code
|
||||
country_name = geo.country_name
|
||||
asn = geo.asn
|
||||
org = geo.org
|
||||
except Exception: # noqa: BLE001
|
||||
log.warning("history_service_geo_lookup_failed_detail", ip=ip)
|
||||
try:
|
||||
geo = await _resolve_geo_info(
|
||||
ip,
|
||||
http_session=http_session,
|
||||
geo_enricher=geo_enricher,
|
||||
)
|
||||
if geo is not None:
|
||||
country_code = geo.country_code
|
||||
country_name = geo.country_name
|
||||
asn = geo.asn
|
||||
org = geo.org
|
||||
except Exception: # noqa: BLE001
|
||||
log.warning("history_service_geo_lookup_failed_detail", ip=ip)
|
||||
|
||||
return IpDetailResponse(
|
||||
ip=ip,
|
||||
|
||||
@@ -300,6 +300,7 @@ class HistoryService(Protocol):
|
||||
source: str = "fail2ban",
|
||||
page: int = 1,
|
||||
page_size: int = 100,
|
||||
http_session: aiohttp.ClientSession | None = None,
|
||||
geo_enricher: GeoEnricher | None = None,
|
||||
db: aiosqlite.Connection | None = None,
|
||||
) -> HistoryListResponse:
|
||||
@@ -310,6 +311,7 @@ class HistoryService(Protocol):
|
||||
socket_path: str,
|
||||
ip: str,
|
||||
*,
|
||||
http_session: aiohttp.ClientSession | None = None,
|
||||
geo_enricher: GeoEnricher | None = None,
|
||||
) -> IpDetailResponse | None:
|
||||
...
|
||||
|
||||
@@ -235,6 +235,37 @@ class TestListHistory:
|
||||
assert len(recent.matches) == 1
|
||||
assert "Failed password" in recent.matches[0]
|
||||
|
||||
async def test_http_session_geo_lookup_is_used(
|
||||
self, f2b_db_path: str
|
||||
) -> None:
|
||||
"""A provided HTTP session is used for geo enrichment by the service."""
|
||||
fake_session = AsyncMock()
|
||||
|
||||
mock_geo = AsyncMock()
|
||||
mock_geo.country_code = "US"
|
||||
mock_geo.country_name = "United States"
|
||||
mock_geo.asn = "AS15169"
|
||||
mock_geo.org = "Google"
|
||||
|
||||
with patch(
|
||||
"app.services.history_service.get_fail2ban_db_path",
|
||||
new=AsyncMock(return_value=f2b_db_path),
|
||||
), patch(
|
||||
"app.services.history_service.geo_service.lookup",
|
||||
new=AsyncMock(return_value=mock_geo),
|
||||
) as mock_lookup:
|
||||
result = await history_service.list_history(
|
||||
"fake_socket",
|
||||
ip_filter="1.2.3.4",
|
||||
http_session=fake_session,
|
||||
)
|
||||
|
||||
assert mock_lookup.call_args.args == ("1.2.3.4", fake_session)
|
||||
assert result.items[0].country_code == "US"
|
||||
assert result.items[0].country_name == "United States"
|
||||
assert result.items[0].asn == "AS15169"
|
||||
assert result.items[0].org == "Google"
|
||||
|
||||
async def test_null_data_column_handled_gracefully(
|
||||
self, f2b_db_path: str
|
||||
) -> None:
|
||||
|
||||
Reference in New Issue
Block a user