Files
BanGUI/backend/app/services/protocols.py
Lukas 1a3401f418 T-10: Fix get_geo_batch_lookup for proper injection with GeoCache instance
Instead of returning a bound method (geo_cache.lookup_batch), now inject
the GeoCache instance directly into routers and services. This provides
proper runtime isolation since T-04 made GeoCache a proper object.

Changes:
- Remove get_geo_batch_lookup() dependency provider
- Add GeoCacheDep type alias for injecting GeoCache instances
- Update all routers (bans, blocklist, dashboard, jails) to use GeoCacheDep
- Update ban_service, blocklist_service, jail_service to accept GeoCache
- Update service protocols to match new signatures
- Update docstrings to reference GeoCache methods instead of module functions

All callers now call geo_cache.lookup_batch(...) directly instead of
geo_batch_lookup(...), providing real dependency injection with proper
testing isolation.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
2026-04-25 18:53:47 +02:00

389 lines
9.8 KiB
Python

"""Service interface protocols for dependency injection.
These structural protocols define the public contract that routers and higher
layers depend on, without binding them to concrete module implementations.
"""
from __future__ import annotations
from typing import TYPE_CHECKING, Protocol, runtime_checkable
if TYPE_CHECKING:
from collections.abc import Awaitable, Callable
import aiohttp
import aiosqlite
from app.models.auth import Session
from app.models.ban import BanOrigin, JailBannedIpsResponse, TimeRange
from app.models.blocklist import (
BlocklistSource,
ImportLogListResponse,
ImportRunResult,
ImportSourceResult,
PreviewResponse,
ScheduleConfig,
ScheduleInfo,
)
from app.models.config import (
AddLogPathRequest,
GlobalConfigResponse,
GlobalConfigUpdate,
JailConfigListResponse,
JailConfigResponse,
JailConfigUpdate,
LogPreviewRequest,
LogPreviewResponse,
MapColorThresholdsResponse,
MapColorThresholdsUpdate,
RegexTestResponse,
)
from app.models.geo import GeoEnricher, GeoInfo
from app.services.geo_cache import GeoCache
from app.models.history import HistoryListResponse, IpDetailResponse
from app.models.jail import JailDetailResponse, JailListResponse
from app.models.server import ServerSettingsResponse, ServerSettingsUpdate, ServerStatus
@runtime_checkable
class AuthService(Protocol):
    """Structural interface for authentication service operations.

    Declares the ``login``, ``validate_session`` and ``logout`` coroutines.
    Only names and signatures are fixed here; the behaviour sketched in the
    method docstrings is inferred from those names and must be confirmed
    against the concrete service.

    The protocol is ``runtime_checkable`` so that ``isinstance`` can verify
    an object exposes these methods (signatures are not checked at runtime).
    """

    async def login(
        self,
        db: aiosqlite.Connection,
        password: str,
        session_duration_minutes: int,
        session_secret: str,
        session_repo: object | None = None,
    ) -> tuple[str, str]:
        """Log in with *password* and return a pair of strings.

        NOTE(review): the meaning of the two tuple elements is not visible
        from this protocol -- confirm against the implementation.
        """
        ...

    async def validate_session(
        self,
        db: aiosqlite.Connection,
        token: str,
        session_secret: str | None = None,
        session_repo: object | None = None,
    ) -> Session:
        """Return the ``Session`` associated with *token*.

        NOTE(review): how an invalid or expired token is reported is not
        visible here -- confirm against the implementation.
        """
        ...

    async def logout(
        self,
        db: aiosqlite.Connection,
        token: str,
        session_secret: str | None = None,
        session_repo: object | None = None,
    ) -> str | None:
        """Log out the session for *token*; returns a string or ``None``.

        NOTE(review): the meaning of the returned string is not visible
        here -- confirm against the implementation.
        """
        ...
@runtime_checkable
class JailService(Protocol):
    """Structural interface for jail management service operations.

    Every method takes the ``socket_path`` to operate on as its first
    argument.  Only names and signatures are fixed here; the behaviour
    sketched in the method docstrings is inferred from those names and must
    be confirmed against the concrete service.

    The protocol is ``runtime_checkable`` so that ``isinstance`` can verify
    an object exposes these methods (signatures are not checked at runtime).
    """

    async def list_jails(self, socket_path: str) -> JailListResponse:
        """Return a ``JailListResponse`` for the given socket."""
        ...

    async def get_jail(self, socket_path: str, name: str) -> JailDetailResponse:
        """Return a ``JailDetailResponse`` for the jail called *name*."""
        ...

    async def reload_all(self, socket_path: str) -> None:
        """Reload all jails."""
        ...

    async def start_jail(self, socket_path: str, name: str) -> None:
        """Start the jail called *name*."""
        ...

    async def stop_jail(self, socket_path: str, name: str) -> None:
        """Stop the jail called *name*."""
        ...

    async def set_idle(self, socket_path: str, name: str, *, on: bool) -> None:
        """Switch the idle flag of jail *name* according to *on*."""
        ...

    async def reload_jail(self, socket_path: str, name: str) -> None:
        """Reload the jail called *name*."""
        ...

    async def get_ignore_list(self, socket_path: str, name: str) -> list[str]:
        """Return the ignore list of jail *name* as a list of strings."""
        ...

    async def add_ignore_ip(self, socket_path: str, name: str, ip: str) -> None:
        """Add *ip* to the ignore list of jail *name*."""
        ...

    async def del_ignore_ip(self, socket_path: str, name: str, ip: str) -> None:
        """Remove *ip* from the ignore list of jail *name*."""
        ...

    async def set_ignore_self(self, socket_path: str, name: str, *, on: bool) -> None:
        """Switch the ignore-self flag of jail *name* according to *on*."""
        ...

    async def get_jail_banned_ips(
        self,
        socket_path: str,
        jail_name: str,
        page: int,
        page_size: int,
        search: str | None = None,
        *,
        # NOTE(review): typed as a bare ``object`` and named after a batch
        # lookup callable, whereas BlocklistService in this module takes a
        # ``geo_cache: GeoCache`` instance.  Confirm whether this parameter
        # is stale; it is left untouched here because renaming a
        # keyword-only parameter would break existing callers.
        geo_batch_lookup: object,
        http_session: object,
        app_db: aiosqlite.Connection,
    ) -> JailBannedIpsResponse:
        """Return one page of banned IPs for *jail_name*.

        *page* and *page_size* select the page and *search* is an optional
        filter string.  The three keyword-only arguments are required.
        """
        ...

    async def lookup_ip(
        self,
        socket_path: str,
        ip: str,
        geo_enricher: object,
    ) -> object:
        """Look up *ip*; the result type is left open (``object``)."""
        ...
@runtime_checkable
class BlocklistService(Protocol):
    """Structural interface for blocklist service operations.

    Groups source CRUD, preview/import, schedule access and import-log
    listing.  Only names and signatures are fixed here; the behaviour
    sketched in the method docstrings is inferred from those names and must
    be confirmed against the concrete service.
    """

    async def list_sources(self, db: aiosqlite.Connection) -> list[BlocklistSource]:
        """Return a list of ``BlocklistSource`` objects."""
        ...

    async def get_source(
        self,
        db: aiosqlite.Connection,
        source_id: int,
    ) -> BlocklistSource | None:
        """Return the source with *source_id*, or ``None``."""
        ...

    async def create_source(
        self,
        db: aiosqlite.Connection,
        name: str,
        url: str,
        *,
        enabled: bool = True,
    ) -> BlocklistSource:
        """Create a source from *name* and *url* and return it."""
        ...

    async def update_source(
        self,
        db: aiosqlite.Connection,
        source_id: int,
        *,
        name: str | None = None,
        url: str | None = None,
        enabled: bool | None = None,
    ) -> BlocklistSource | None:
        """Update the source with *source_id*; returns a source or ``None``.

        All three fields default to ``None``.
        NOTE(review): presumably ``None`` means "leave unchanged" -- confirm.
        """
        ...

    async def delete_source(self, db: aiosqlite.Connection, source_id: int) -> bool:
        """Delete the source with *source_id*; returns a ``bool``."""
        ...

    async def preview_source(
        self,
        url: str,
        http_session: aiohttp.ClientSession,
        *,
        sample_lines: int = ...,
    ) -> PreviewResponse:
        """Return a ``PreviewResponse`` for *url*.

        The ``...`` default of *sample_lines* is a stub placeholder; the
        real default is chosen by the implementation.
        """
        ...

    async def import_source(
        self,
        source: BlocklistSource,
        http_session: aiohttp.ClientSession,
        socket_path: str,
        db: aiosqlite.Connection,
        *,
        geo_is_cached: Callable[[str], bool] | None = None,
        geo_cache: GeoCache | None = None,
        ban_ip: Callable[[str, str, str], Awaitable[None]],
    ) -> ImportSourceResult:
        """Import a single *source* and return an ``ImportSourceResult``.

        *ban_ip* is a required keyword-only async callable taking three
        strings; *geo_is_cached* and *geo_cache* are optional.
        """
        ...

    async def import_all(
        self,
        db: aiosqlite.Connection,
        http_session: aiohttp.ClientSession,
        socket_path: str,
        *,
        ban_ip: Callable[[str, str, str], Awaitable[None]],
        geo_is_cached: Callable[[str], bool] | None = None,
        geo_cache: GeoCache | None = None,
    ) -> ImportRunResult:
        """Run an import over all sources and return an ``ImportRunResult``.

        *ban_ip* is a required keyword-only async callable taking three
        strings; *geo_is_cached* and *geo_cache* are optional.
        """
        ...

    async def get_schedule(self, db: aiosqlite.Connection) -> ScheduleConfig:
        """Return the current ``ScheduleConfig``."""
        ...

    async def set_schedule(self, db: aiosqlite.Connection, update: ScheduleConfig) -> None:
        """Store *update* as the schedule configuration."""
        ...

    async def get_schedule_info(
        self,
        db: aiosqlite.Connection,
        next_run_at: str | None,
    ) -> ScheduleInfo:
        """Return a ``ScheduleInfo``; *next_run_at* is supplied by the caller."""
        ...

    async def list_import_logs(
        self,
        db: aiosqlite.Connection,
        *,
        source_id: int | None = None,
        page: int = 1,
        page_size: int = 50,
    ) -> ImportLogListResponse:
        """Return one page of import logs, optionally for one *source_id*."""
        ...
@runtime_checkable
class ConfigService(Protocol):
    """Structural interface for configuration service operations.

    Covers per-jail and global configuration, regex testing, log-path
    management, log preview and map colour thresholds.  Only names and
    signatures are fixed here; the behaviour sketched in the method
    docstrings is inferred from those names and must be confirmed against
    the concrete service.
    """

    async def get_jail_config(self, socket_path: str, name: str) -> JailConfigResponse:
        """Return the ``JailConfigResponse`` for the jail called *name*."""
        ...

    async def list_jail_configs(self, socket_path: str) -> JailConfigListResponse:
        """Return a ``JailConfigListResponse``."""
        ...

    async def update_jail_config(
        self,
        socket_path: str,
        name: str,
        update: JailConfigUpdate,
    ) -> None:
        """Apply *update* to the configuration of jail *name*."""
        ...

    async def get_global_config(self, socket_path: str) -> GlobalConfigResponse:
        """Return the ``GlobalConfigResponse``."""
        ...

    async def update_global_config(
        self,
        socket_path: str,
        update: GlobalConfigUpdate,
    ) -> None:
        """Apply *update* to the global configuration."""
        ...

    def test_regex(self, request: object) -> RegexTestResponse:
        """Return a ``RegexTestResponse`` for *request*.

        This is the only synchronous method of the protocol, and *request*
        is typed as a bare ``object``.
        """
        ...

    async def add_log_path(
        self,
        socket_path: str,
        jail: str,
        req: AddLogPathRequest,
    ) -> None:
        """Add the log path described by *req* to *jail*."""
        ...

    async def delete_log_path(self, socket_path: str, jail: str, log_path: str) -> None:
        """Remove *log_path* from *jail*."""
        ...

    async def preview_log(
        self,
        req: LogPreviewRequest,
        preview_fn: Callable[[LogPreviewRequest], Awaitable[LogPreviewResponse]] | None = None,
    ) -> LogPreviewResponse:
        """Return a ``LogPreviewResponse`` for *req*.

        *preview_fn* is an optional async callable with the same request and
        response types.
        NOTE(review): presumably an injectable override of the default
        preview routine -- confirm.
        """
        ...

    async def get_map_color_thresholds(self, db: aiosqlite.Connection) -> MapColorThresholdsResponse:
        """Return the ``MapColorThresholdsResponse``."""
        ...

    async def update_map_color_thresholds(
        self,
        db: aiosqlite.Connection,
        update: MapColorThresholdsUpdate,
    ) -> None:
        """Apply *update* to the map colour thresholds."""
        ...
@runtime_checkable
class HistoryService(Protocol):
    """Structural interface for history service operations.

    Only names and signatures are fixed here; the behaviour sketched in the
    method docstrings is inferred from those names and must be confirmed
    against the concrete service.
    """

    async def list_history(
        self,
        socket_path: str,
        *,
        range_: TimeRange | None = None,
        jail: str | None = None,
        ip_filter: str | None = None,
        origin: BanOrigin | None = None,
        source: str = "fail2ban",
        page: int = 1,
        page_size: int = 100,
        http_session: aiohttp.ClientSession | None = None,
        geo_enricher: GeoEnricher | None = None,
        db: aiosqlite.Connection | None = None,
    ) -> HistoryListResponse:
        """Return one page of history as a ``HistoryListResponse``.

        Everything after *socket_path* is keyword-only and optional:
        *range_*, *jail*, *ip_filter* and *origin* are filters that default
        to ``None``; *source* defaults to ``"fail2ban"``; *page* and
        *page_size* default to 1 and 100.
        """
        ...

    async def get_ip_detail(
        self,
        socket_path: str,
        ip: str,
        *,
        http_session: aiohttp.ClientSession | None = None,
        geo_enricher: GeoEnricher | None = None,
    ) -> IpDetailResponse | None:
        """Return an ``IpDetailResponse`` for *ip*, or ``None``."""
        ...
@runtime_checkable
class GeoService(Protocol):
    """Structural interface for geolocation lookup and cache operations.

    Mixes synchronous cache helpers with asynchronous lookups and database
    operations.  Only names and signatures are fixed here; the behaviour
    sketched in the method docstrings is inferred from those names and must
    be confirmed against the concrete service.
    """

    def clear_cache(self) -> None:
        """Clear the cache."""
        ...

    def clear_neg_cache(self) -> None:
        """Clear the negative cache.

        NOTE(review): presumably the record of failed lookups -- confirm.
        """
        ...

    def is_cached(self, ip: str) -> bool:
        """Return whether *ip* is cached."""
        ...

    def init_geoip(self, mmdb_path: str | None) -> None:
        """Initialise GeoIP from *mmdb_path*, which may be ``None``."""
        ...

    async def cache_stats(self, db: aiosqlite.Connection) -> dict[str, int]:
        """Return cache statistics as a mapping of names to integers."""
        ...

    async def count_unresolved(self, db: aiosqlite.Connection) -> int:
        """Return the number of unresolved entries."""
        ...

    async def get_unresolved_ips(self, db: aiosqlite.Connection) -> list[str]:
        """Return the unresolved IPs as a list of strings."""
        ...

    async def load_cache_from_db(self, db: aiosqlite.Connection) -> None:
        """Load the cache from *db*."""
        ...

    async def lookup(
        self,
        ip: str,
        http_session: aiohttp.ClientSession,
    ) -> GeoInfo | None:
        """Return the ``GeoInfo`` for *ip*, or ``None``."""
        ...

    async def lookup_batch(
        self,
        ips: list[str],
        http_session: aiohttp.ClientSession,
        db: aiosqlite.Connection | None = None,
    ) -> dict[str, GeoInfo]:
        """Return a mapping of strings to ``GeoInfo`` for *ips*.

        NOTE(review): presumably keyed by IP address, and IPs that cannot be
        resolved may be absent -- confirm.
        """
        ...

    def lookup_cached_only(self, ip: str) -> GeoInfo | None:
        """Return the ``GeoInfo`` for *ip*, or ``None`` (synchronous)."""
        ...

    async def flush_dirty(self, db: aiosqlite.Connection) -> int:
        """Flush dirty entries to *db*; returns an ``int``.

        NOTE(review): presumably the number of entries written -- confirm.
        """
        ...
@runtime_checkable
class HealthService(Protocol):
    """Structural interface for the health probe.

    Only the name and signature are fixed here; behaviour is defined by the
    concrete service.
    """

    async def probe(self, socket_path: str, timeout: float = ...) -> ServerStatus:
        """Probe the server at *socket_path* and return a ``ServerStatus``.

        The ``...`` default of *timeout* is a stub placeholder; the real
        default is chosen by the implementation.
        """
        ...
@runtime_checkable
class ServerService(Protocol):
    """Structural interface for server settings operations.

    Only names and signatures are fixed here; the behaviour sketched in the
    method docstrings is inferred from those names and must be confirmed
    against the concrete service.
    """

    async def get_settings(self, socket_path: str) -> ServerSettingsResponse:
        """Return the ``ServerSettingsResponse``."""
        ...

    async def update_settings(
        self,
        socket_path: str,
        update: ServerSettingsUpdate,
    ) -> None:
        """Apply *update* to the server settings."""
        ...

    async def flush_logs(self, socket_path: str) -> str:
        """Flush the logs; returns a string.

        NOTE(review): the meaning of the returned string is not visible
        here -- confirm against the implementation.
        """
        ...