# BanGUI/backend/app/services/protocols.py

"""Service interface protocols for dependency injection.
These structural protocols define the public contract that routers and higher
layers depend on, without binding them to concrete module implementations.
"""
from __future__ import annotations
from collections.abc import Awaitable, Callable
from typing import Protocol, runtime_checkable
import aiosqlite
import aiohttp
from app.models.auth import Session
from app.models.ban import BanOrigin, JailBannedIpsResponse, TimeRange
from app.models.blocklist import (
BlocklistSource,
ImportLogListResponse,
ImportRunResult,
ImportSourceResult,
PreviewResponse,
ScheduleConfig,
ScheduleInfo,
)
from app.models.config import (
AddLogPathRequest,
GlobalConfigResponse,
GlobalConfigUpdate,
JailConfigListResponse,
JailConfigResponse,
JailConfigUpdate,
LogPreviewRequest,
LogPreviewResponse,
MapColorThresholdsResponse,
MapColorThresholdsUpdate,
RegexTestResponse,
Fail2BanLogResponse,
ServiceStatusResponse,
)
from app.models.geo import GeoBatchLookup, GeoEnricher, GeoInfo
from app.models.history import HistoryListResponse, IpDetailResponse
from app.models.server import ServerSettingsResponse, ServerSettingsUpdate, ServerStatus
@runtime_checkable
class AuthService(Protocol):
    """Protocol for authentication service operations.

    Marked ``@runtime_checkable`` so that ``isinstance(obj, AuthService)``
    works, matching the other service protocols in this module.  The runtime
    check only verifies that the three methods exist; their signatures are
    enforced by static type checkers alone.
    """

    async def login(
        self,
        db: aiosqlite.Connection,
        password: str,
        session_duration_minutes: int,
        session_repo: object | None = None,
    ) -> Session:
        """Log in with *password* and return a ``Session``.

        Args:
            db: ``aiosqlite`` connection handed to the implementation.
            password: Password supplied by the caller.
            session_duration_minutes: Session lifetime in minutes
                (presumably applied to the returned session -- confirm
                against the concrete implementation).
            session_repo: Optional collaborator typed only as ``object``;
                presumably a session-repository override -- confirm.

        Returns:
            The ``Session`` produced by the login.
        """
        ...

    async def validate_session(
        self,
        db: aiosqlite.Connection,
        token: str,
        session_secret: str | None = None,
        session_repo: object | None = None,
    ) -> Session:
        """Validate *token* and return the matching ``Session``.

        Args:
            db: ``aiosqlite`` connection handed to the implementation.
            token: Session token to validate.
            session_secret: Optional secret; how it is used is left to the
                implementation.
            session_repo: Optional collaborator typed only as ``object``.

        Returns:
            The ``Session`` for *token*.  The protocol does not say how an
            invalid token is reported (the return type is not optional, so
            presumably by raising -- confirm).
        """
        ...

    async def logout(
        self,
        db: aiosqlite.Connection,
        token: str,
        session_secret: str | None = None,
        session_repo: object | None = None,
    ) -> str | None:
        """Log out the session identified by *token*.

        Args:
            db: ``aiosqlite`` connection handed to the implementation.
            token: Session token to log out.
            session_secret: Optional secret; how it is used is left to the
                implementation.
            session_repo: Optional collaborator typed only as ``object``.

        Returns:
            A string or ``None``; the meaning of the string is not specified
            by this protocol.
        """
        ...
@runtime_checkable
class JailService(Protocol):
    """Protocol for jail management service operations.

    Marked ``@runtime_checkable`` so that ``isinstance(obj, JailService)``
    works, matching the other service protocols in this module.  The runtime
    check only verifies that the methods exist; their signatures are enforced
    by static type checkers alone.

    Every method takes ``socket_path`` as its first argument (presumably the
    path of the fail2ban control socket -- confirm against the
    implementation).

    NOTE(review): ``JailListResponse`` and ``JailDetailResponse`` are used in
    the annotations below but are not among the imports visible at the top of
    this module.  Thanks to ``from __future__ import annotations`` this does
    not fail at import time, but type checkers and
    ``typing.get_type_hints()`` cannot resolve the names -- confirm they are
    imported.
    """

    async def list_jails(self, socket_path: str) -> JailListResponse:
        """Return a ``JailListResponse``."""
        ...

    async def get_jail(self, socket_path: str, name: str) -> JailDetailResponse:
        """Return a ``JailDetailResponse`` for the jail called *name*."""
        ...

    async def reload_all(self, socket_path: str) -> None:
        """Reload all jails (per the method name); returns nothing."""
        ...

    async def start_jail(self, socket_path: str, name: str) -> None:
        """Start the jail called *name*."""
        ...

    async def stop_jail(self, socket_path: str, name: str) -> None:
        """Stop the jail called *name*."""
        ...

    async def set_idle(self, socket_path: str, name: str, *, on: bool) -> None:
        """Set the idle flag of jail *name*; ``on`` is keyword-only."""
        ...

    async def reload_jail(self, socket_path: str, name: str) -> None:
        """Reload the jail called *name*."""
        ...

    async def get_ignore_list(self, socket_path: str, name: str) -> list[str]:
        """Return the ignore list of jail *name* as a list of strings."""
        ...

    async def add_ignore_ip(self, socket_path: str, name: str, ip: str) -> None:
        """Add *ip* to the ignore list of jail *name*."""
        ...

    async def del_ignore_ip(self, socket_path: str, name: str, ip: str) -> None:
        """Remove *ip* from the ignore list of jail *name*."""
        ...

    async def set_ignore_self(self, socket_path: str, name: str, *, on: bool) -> None:
        """Set the ignore-self flag of jail *name*; ``on`` is keyword-only."""
        ...

    async def get_jail_banned_ips(
        self,
        socket_path: str,
        jail_name: str,
        page: int,
        page_size: int,
        search: str | None = None,
        *,
        geo_batch_lookup: object,
        http_session: object,
        app_db: aiosqlite.Connection,
    ) -> JailBannedIpsResponse:
        """Return one page of banned IPs for *jail_name*.

        Args:
            socket_path: See the class docstring.
            jail_name: Jail to query.
            page: Page to return (numbering base is not fixed by the
                protocol).
            page_size: Number of entries per page.
            search: Optional search string; ``None`` by default.
            geo_batch_lookup: Required keyword-only collaborator, typed only
                as ``object`` here.
            http_session: Required keyword-only collaborator, typed only as
                ``object`` here.
            app_db: Required keyword-only ``aiosqlite`` connection.

        Returns:
            A ``JailBannedIpsResponse``.
        """
        ...

    async def lookup_ip(
        self,
        socket_path: str,
        ip: str,
        geo_enricher: object,
    ) -> object:
        """Look up *ip*.

        Both ``geo_enricher`` and the return value are typed only as
        ``object``, so their shape is defined by the implementation.
        """
        ...
@runtime_checkable
class BlocklistService(Protocol):
    """Structural interface for blocklist sources, imports and scheduling.

    A runtime ``isinstance`` check against this protocol only confirms that
    the methods exist; signatures are verified by static type checkers.
    """

    async def list_sources(self, db: aiosqlite.Connection) -> list[BlocklistSource]:
        """Return ``BlocklistSource`` entries as a list."""

    async def get_source(self, db: aiosqlite.Connection, source_id: int) -> BlocklistSource | None:
        """Return the source identified by *source_id*, or ``None``."""

    async def create_source(
        self, db: aiosqlite.Connection, name: str, url: str, *, enabled: bool = True
    ) -> BlocklistSource:
        """Create a source from *name* and *url*; ``enabled`` defaults to ``True``."""

    async def update_source(
        self,
        db: aiosqlite.Connection,
        source_id: int,
        *,
        name: str | None = None,
        url: str | None = None,
        enabled: bool | None = None,
    ) -> BlocklistSource | None:
        """Update the source identified by *source_id*.

        ``name``, ``url`` and ``enabled`` are keyword-only and default to
        ``None`` (presumably meaning "leave unchanged" -- confirm against the
        implementation).  Returns a ``BlocklistSource`` or ``None``.
        """

    async def delete_source(self, db: aiosqlite.Connection, source_id: int) -> bool:
        """Delete the source identified by *source_id*; returns a ``bool``."""

    async def preview_source(
        self, url: str, http_session: aiohttp.ClientSession, *, sample_lines: int = ...
    ) -> PreviewResponse:
        """Return a ``PreviewResponse`` for *url*.

        The ``...`` default on ``sample_lines`` only states that a default
        exists; its value is chosen by the implementation.
        """

    async def import_source(
        self,
        source: BlocklistSource,
        http_session: aiohttp.ClientSession,
        socket_path: str,
        db: aiosqlite.Connection,
        *,
        geo_is_cached: Callable[[str], bool] | None = None,
        geo_batch_lookup: GeoBatchLookup | None = None,
        ban_ip: Callable[[str, str, str], Awaitable[None]],
    ) -> ImportSourceResult:
        """Import a single *source* and return an ``ImportSourceResult``.

        ``ban_ip`` is a required keyword-only async callback taking three
        strings; the two ``geo_*`` hooks are optional.
        """

    async def import_all(
        self,
        db: aiosqlite.Connection,
        http_session: aiohttp.ClientSession,
        socket_path: str,
        *,
        ban_ip: Callable[[str, str, str], Awaitable[None]],
        geo_is_cached: Callable[[str], bool] | None = None,
        geo_batch_lookup: GeoBatchLookup | None = None,
    ) -> ImportRunResult:
        """Run an import and return an ``ImportRunResult``.

        ``ban_ip`` is a required keyword-only async callback taking three
        strings; the two ``geo_*`` hooks are optional.
        """

    async def get_schedule(self, db: aiosqlite.Connection) -> ScheduleConfig:
        """Return the ``ScheduleConfig``."""

    async def set_schedule(self, db: aiosqlite.Connection, update: ScheduleConfig) -> None:
        """Apply *update* as the schedule configuration."""

    async def get_schedule_info(self, db: aiosqlite.Connection, next_run_at: str | None) -> ScheduleInfo:
        """Return a ``ScheduleInfo``; *next_run_at* is a string or ``None``."""

    async def list_import_logs(
        self,
        db: aiosqlite.Connection,
        *,
        source_id: int | None = None,
        page: int = 1,
        page_size: int = 50,
    ) -> ImportLogListResponse:
        """Return an ``ImportLogListResponse``.

        Keyword-only options: ``source_id`` (default ``None``), ``page``
        (default 1) and ``page_size`` (default 50).
        """
@runtime_checkable
class ConfigService(Protocol):
    """Structural interface for configuration-related service operations.

    A runtime ``isinstance`` check against this protocol only confirms that
    the methods exist; signatures are verified by static type checkers.
    """

    async def get_jail_config(self, socket_path: str, name: str) -> JailConfigResponse:
        """Return a ``JailConfigResponse`` for the jail called *name*."""

    async def list_jail_configs(self, socket_path: str) -> JailConfigListResponse:
        """Return a ``JailConfigListResponse``."""

    async def update_jail_config(self, socket_path: str, name: str, update: JailConfigUpdate) -> None:
        """Apply *update* to the configuration of jail *name*."""

    async def get_global_config(self, socket_path: str) -> GlobalConfigResponse:
        """Return a ``GlobalConfigResponse``."""

    async def update_global_config(self, socket_path: str, update: GlobalConfigUpdate) -> None:
        """Apply *update* to the global configuration."""

    def test_regex(self, request: object) -> RegexTestResponse:
        """Return a ``RegexTestResponse`` for *request*.

        This is the only synchronous method of the protocol, and *request*
        is typed only as ``object``.
        """

    async def add_log_path(self, socket_path: str, jail: str, req: AddLogPathRequest) -> None:
        """Add a log path to *jail* as described by *req*."""

    async def delete_log_path(
        self,
        socket_path: str,
        jail: str,
        log_path: str,
    ) -> None:
        """Remove *log_path* from *jail*."""

    async def preview_log(
        self,
        req: LogPreviewRequest,
        preview_fn: Callable[[LogPreviewRequest], Awaitable[LogPreviewResponse]] | None = None,
    ) -> LogPreviewResponse:
        """Return a ``LogPreviewResponse`` for *req*.

        ``preview_fn`` is an optional async callable mapping a
        ``LogPreviewRequest`` to a ``LogPreviewResponse`` (presumably an
        injectable replacement for the default preview -- confirm).
        """

    async def get_map_color_thresholds(
        self,
        db: aiosqlite.Connection,
    ) -> MapColorThresholdsResponse:
        """Return a ``MapColorThresholdsResponse``."""

    async def update_map_color_thresholds(self, db: aiosqlite.Connection, update: MapColorThresholdsUpdate) -> None:
        """Apply *update* to the map colour thresholds."""

    async def read_fail2ban_log(
        self, socket_path: str, lines: int, filter_text: str | None = None
    ) -> Fail2BanLogResponse:
        """Return a ``Fail2BanLogResponse``.

        *lines* is an ``int`` (presumably the number of lines to read --
        confirm); *filter_text* is optional and defaults to ``None``.
        """

    async def get_service_status(
        self,
        socket_path: str,
        probe_fn: Callable[[str], Awaitable[ServiceStatusResponse]] | None = None,
    ) -> ServiceStatusResponse:
        """Return a ``ServiceStatusResponse``.

        ``probe_fn`` is an optional async callable taking a string and
        returning a ``ServiceStatusResponse``.
        """
@runtime_checkable
class HistoryService(Protocol):
    """Structural interface for history queries.

    A runtime ``isinstance`` check against this protocol only confirms that
    the methods exist; signatures are verified by static type checkers.
    """

    async def list_history(
        self,
        socket_path: str,
        *,
        range_: TimeRange | None = None,
        jail: str | None = None,
        ip_filter: str | None = None,
        origin: BanOrigin | None = None,
        source: str = "fail2ban",
        page: int = 1,
        page_size: int = 100,
        geo_enricher: GeoEnricher | None = None,
        db: aiosqlite.Connection | None = None,
    ) -> HistoryListResponse:
        """Return a ``HistoryListResponse``.

        Everything after *socket_path* is keyword-only.  ``range_``,
        ``jail``, ``ip_filter`` and ``origin`` default to ``None``;
        ``source`` defaults to ``"fail2ban"``; ``page`` defaults to 1 and
        ``page_size`` to 100; ``geo_enricher`` and ``db`` are optional.
        """

    async def get_ip_detail(
        self, socket_path: str, ip: str, *, geo_enricher: GeoEnricher | None = None
    ) -> IpDetailResponse | None:
        """Return an ``IpDetailResponse`` for *ip*, or ``None``."""
@runtime_checkable
class GeoService(Protocol):
    """Structural interface for geo lookups and their cache.

    Mixes synchronous methods with ``async`` ones.  A runtime ``isinstance``
    check against this protocol only confirms that the methods exist;
    signatures are verified by static type checkers.
    """

    def clear_cache(self) -> None:
        """Clear the cache (synchronous)."""

    def clear_neg_cache(self) -> None:
        """Clear the "neg" cache (presumably negative results -- confirm)."""

    def is_cached(self, ip: str) -> bool:
        """Return a ``bool`` telling whether *ip* is cached."""

    def init_geoip(self, mmdb_path: str | None) -> None:
        """Initialise GeoIP from *mmdb_path*, which may be ``None``."""

    async def cache_stats(self, db: aiosqlite.Connection) -> dict[str, int]:
        """Return cache statistics as a ``dict`` of string keys to ints."""

    async def count_unresolved(self, db: aiosqlite.Connection) -> int:
        """Return the number of unresolved entries as an ``int``."""

    async def get_unresolved_ips(self, db: aiosqlite.Connection) -> list[str]:
        """Return unresolved IPs as a list of strings."""

    async def load_cache_from_db(self, db: aiosqlite.Connection) -> None:
        """Load the cache using *db*."""

    async def lookup(self, ip: str, http_session: aiohttp.ClientSession) -> GeoInfo | None:
        """Return a ``GeoInfo`` for *ip*, or ``None``."""

    async def lookup_batch(
        self, ips: list[str], http_session: aiohttp.ClientSession, db: aiosqlite.Connection | None = None
    ) -> dict[str, GeoInfo]:
        """Return a ``dict`` of string keys to ``GeoInfo`` for *ips*.

        *db* is optional and defaults to ``None``.
        """

    def lookup_cached_only(self, ip: str) -> GeoInfo | None:
        """Return a ``GeoInfo`` for *ip*, or ``None`` (synchronous)."""

    async def flush_dirty(self, db: aiosqlite.Connection) -> int:
        """Flush dirty entries using *db*; returns an ``int``."""
@runtime_checkable
class HealthService(Protocol):
    """Structural interface for health probing.

    A runtime ``isinstance`` check against this protocol only confirms that
    ``probe`` exists; its signature is verified by static type checkers.
    """

    async def probe(
        self,
        socket_path: str,
        timeout: float = ...,
    ) -> ServerStatus:
        """Probe via *socket_path* and return a ``ServerStatus``.

        The ``...`` default on ``timeout`` only states that a default exists;
        its value is chosen by the implementation.
        """
@runtime_checkable
class ServerService(Protocol):
    """Structural interface for server settings operations.

    A runtime ``isinstance`` check against this protocol only confirms that
    the methods exist; signatures are verified by static type checkers.
    """

    async def get_settings(self, socket_path: str) -> ServerSettingsResponse:
        """Return a ``ServerSettingsResponse``."""

    async def update_settings(self, socket_path: str, update: ServerSettingsUpdate) -> None:
        """Apply *update* to the server settings."""

    async def flush_logs(self, socket_path: str) -> str:
        """Flush logs and return a string (its content is implementation-defined)."""