Refactor services and update documentation
- Refactor ban_service.py with improved error handling - Refactor blocklist_service.py for better code organization - Update geo_cache.py with performance improvements - Update backend development guide and task documentation - Update runner.csx script Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
This commit is contained in:
@@ -15,6 +15,7 @@ import contextlib
|
||||
import ipaddress
|
||||
from typing import TYPE_CHECKING, cast
|
||||
|
||||
import aiohttp
|
||||
import structlog
|
||||
|
||||
from app.exceptions import JailNotFoundError, JailOperationError
|
||||
@@ -59,7 +60,8 @@ from app.utils.fail2ban_response import (
|
||||
from app.utils.time_utils import since_unix
|
||||
|
||||
if TYPE_CHECKING:
|
||||
import aiohttp
|
||||
from collections.abc import Awaitable
|
||||
|
||||
import aiosqlite
|
||||
|
||||
from app.models.geo import GeoCacheLookup, GeoEnricher, GeoInfo
|
||||
@@ -302,7 +304,7 @@ async def get_active_bans(
|
||||
all_ips: list[str] = [ban.ip for ban in bans]
|
||||
try:
|
||||
geo_map = await geo_cache.lookup_batch(all_ips, http_session, db=app_db)
|
||||
except Exception: # noqa: BLE001
|
||||
except (TimeoutError, aiohttp.ClientError, OSError):
|
||||
log.warning("active_bans_batch_geo_failed")
|
||||
geo_map = {}
|
||||
enriched: list[ActiveBan] = []
|
||||
@@ -420,7 +422,7 @@ async def list_bans(
|
||||
page_ips: list[str] = [r.ip for r in rows]
|
||||
try:
|
||||
geo_map = await geo_cache.lookup_batch(page_ips, http_session, db=app_db)
|
||||
except Exception: # noqa: BLE001
|
||||
except (TimeoutError, aiohttp.ClientError, OSError):
|
||||
log.warning("ban_service_batch_geo_failed_list_bans")
|
||||
|
||||
items: list[DashboardBanItem] = []
|
||||
@@ -460,8 +462,10 @@ async def list_bans(
|
||||
country_name = geo.country_name
|
||||
asn = geo.asn
|
||||
org = geo.org
|
||||
except Exception: # noqa: BLE001
|
||||
except (TimeoutError, aiohttp.ClientError, OSError):
|
||||
log.warning("ban_service_geo_lookup_failed", ip=ip)
|
||||
except Exception as exc:
|
||||
log.error("ban_service_geo_lookup_unexpected_error", ip=ip, error=type(exc).__name__)
|
||||
|
||||
items.append(
|
||||
DashboardBanItem(
|
||||
@@ -624,9 +628,12 @@ async def bans_by_country(
|
||||
async def _safe_lookup(ip: str) -> tuple[str, GeoInfo | None]:
|
||||
try:
|
||||
return ip, await geo_enricher(ip)
|
||||
except Exception: # noqa: BLE001
|
||||
except (TimeoutError, aiohttp.ClientError, OSError):
|
||||
log.warning("ban_service_geo_lookup_failed", ip=ip)
|
||||
return ip, None
|
||||
except Exception as exc:
|
||||
log.error("ban_service_geo_lookup_unexpected_error", ip=ip, error=type(exc).__name__)
|
||||
return ip, None
|
||||
|
||||
results = await asyncio.gather(*(_safe_lookup(ip) for ip in unique_ips))
|
||||
geo_map = {ip: geo for ip, geo in results if geo is not None}
|
||||
|
||||
@@ -21,7 +21,7 @@ from typing import TYPE_CHECKING
|
||||
import aiohttp
|
||||
import structlog
|
||||
|
||||
from app.exceptions import JailNotFoundError
|
||||
from app.exceptions import JailNotFoundError, JailOperationError
|
||||
from app.models.blocklist import (
|
||||
BlocklistSource,
|
||||
ImportLogEntry,
|
||||
@@ -39,7 +39,6 @@ from app.utils.ip_utils import is_valid_ip, is_valid_network
|
||||
if TYPE_CHECKING:
|
||||
from collections.abc import Awaitable, Callable
|
||||
|
||||
import aiohttp
|
||||
import aiosqlite
|
||||
from apscheduler.schedulers.asyncio import AsyncIOScheduler
|
||||
|
||||
@@ -92,7 +91,7 @@ async def _download_text_with_retries(
|
||||
await asyncio.sleep(backoff)
|
||||
continue
|
||||
return resp.status, text
|
||||
except Exception as exc: # noqa: BLE001
|
||||
except (TimeoutError, aiohttp.ClientError) as exc:
|
||||
last_exception = exc
|
||||
if attempt >= _BLOCKLIST_HTTP_RETRY_ATTEMPTS:
|
||||
raise
|
||||
@@ -105,6 +104,20 @@ async def _download_text_with_retries(
|
||||
backoff=backoff,
|
||||
)
|
||||
await asyncio.sleep(backoff)
|
||||
except Exception as exc:
|
||||
last_exception = exc
|
||||
if attempt >= _BLOCKLIST_HTTP_RETRY_ATTEMPTS:
|
||||
raise
|
||||
backoff = _BLOCKLIST_HTTP_BACKOFF_BASE_SECONDS * (2 ** (attempt - 1))
|
||||
log.warning(
|
||||
"blocklist_download_retry_error",
|
||||
url=url,
|
||||
attempt=attempt,
|
||||
error=repr(exc),
|
||||
error_type="unexpected",
|
||||
backoff=backoff,
|
||||
)
|
||||
await asyncio.sleep(backoff)
|
||||
|
||||
assert last_exception is not None
|
||||
raise last_exception
|
||||
@@ -281,8 +294,8 @@ async def preview_source(
|
||||
)
|
||||
if status != 200:
|
||||
raise ValueError(f"HTTP {status} from {url}")
|
||||
except Exception as exc:
|
||||
log.warning("blocklist_preview_failed", url=url, error=str(exc))
|
||||
except (TimeoutError, aiohttp.ClientError, ValueError) as exc:
|
||||
log.warning("blocklist_preview_failed", url=url, error=type(exc).__name__)
|
||||
raise ValueError(str(exc)) from exc
|
||||
|
||||
lines = raw.splitlines()
|
||||
@@ -363,7 +376,7 @@ async def import_source(
|
||||
ips_skipped=0,
|
||||
error=error_msg,
|
||||
)
|
||||
except Exception as exc:
|
||||
except (TimeoutError, aiohttp.ClientError) as exc:
|
||||
error_msg = str(exc)
|
||||
await _log_result(db, source, 0, 0, error_msg)
|
||||
log.warning("blocklist_import_download_error", url=source.url, error=error_msg)
|
||||
@@ -407,7 +420,7 @@ async def import_source(
|
||||
error=str(exc),
|
||||
)
|
||||
break
|
||||
except Exception as exc:
|
||||
except JailOperationError as exc:
|
||||
skipped += 1
|
||||
if ban_error is None:
|
||||
ban_error = str(exc)
|
||||
@@ -444,11 +457,10 @@ async def import_source(
|
||||
source_id=source.id,
|
||||
count=len(uncached_ips),
|
||||
)
|
||||
except Exception as exc: # noqa: BLE001
|
||||
except (TimeoutError, aiohttp.ClientError, OSError):
|
||||
log.warning(
|
||||
"blocklist_geo_prewarm_failed",
|
||||
source_id=source.id,
|
||||
error=str(exc),
|
||||
)
|
||||
|
||||
return ImportSourceResult(
|
||||
@@ -606,8 +618,8 @@ async def get_schedule(db: aiosqlite.Connection) -> ScheduleConfig:
|
||||
try:
|
||||
data = json.loads(raw)
|
||||
return ScheduleConfig.model_validate(data)
|
||||
except Exception:
|
||||
log.warning("blocklist_schedule_invalid", raw=raw)
|
||||
except (json.JSONDecodeError, ValueError) as exc:
|
||||
log.warning("blocklist_schedule_invalid", raw=raw, error=type(exc).__name__)
|
||||
return _DEFAULT_SCHEDULE
|
||||
|
||||
|
||||
|
||||
@@ -271,8 +271,8 @@ class GeoCache:
|
||||
return GeoInfo(country_code=code, country_name=name, asn=None, org=None)
|
||||
except geoip2.errors.AddressNotFoundError:
|
||||
return None
|
||||
except Exception as exc: # noqa: BLE001
|
||||
log.warning("geoip_lookup_failed", ip=ip, error=str(exc))
|
||||
except (OSError, geoip2.errors.GeoIP2Error) as exc:
|
||||
log.warning("geoip_lookup_failed", ip=ip, error=type(exc).__name__)
|
||||
return None
|
||||
|
||||
async def load_cache_from_db(self, db: aiosqlite.Connection) -> None:
|
||||
@@ -386,8 +386,8 @@ class GeoCache:
|
||||
asn=result.asn,
|
||||
org=result.org,
|
||||
)
|
||||
except Exception as exc: # noqa: BLE001
|
||||
log.warning("geo_persist_failed", ip=ip, error=str(exc))
|
||||
except (OSError) as exc:
|
||||
log.warning("geo_persist_failed", ip=ip, error=type(exc).__name__)
|
||||
log.debug("geo_lookup_success_mmdb", ip=ip, country=result.country_code)
|
||||
return result
|
||||
|
||||
@@ -399,8 +399,8 @@ class GeoCache:
|
||||
if db is not None:
|
||||
try:
|
||||
await geo_cache_repo.upsert_neg_entry_and_commit(db=db, ip=ip)
|
||||
except Exception as exc: # noqa: BLE001
|
||||
log.warning("geo_persist_neg_failed", ip=ip, error=str(exc))
|
||||
except (OSError) as exc:
|
||||
log.warning("geo_persist_neg_failed", ip=ip, error=type(exc).__name__)
|
||||
return GeoInfo(country_code=None, country_name=None, asn=None, org=None)
|
||||
|
||||
# HTTP API call (only when allow_http_fallback is True).
|
||||
@@ -426,8 +426,8 @@ class GeoCache:
|
||||
asn=result.asn,
|
||||
org=result.org,
|
||||
)
|
||||
except Exception as exc: # noqa: BLE001
|
||||
log.warning("geo_persist_failed", ip=ip, error=str(exc))
|
||||
except (OSError) as exc:
|
||||
log.warning("geo_persist_failed", ip=ip, error=type(exc).__name__)
|
||||
log.debug("geo_lookup_success_http", ip=ip, country=result.country_code, asn=result.asn)
|
||||
return result
|
||||
log.debug(
|
||||
@@ -435,7 +435,7 @@ class GeoCache:
|
||||
ip=ip,
|
||||
message=data.get("message", "unknown"),
|
||||
)
|
||||
except Exception as exc: # noqa: BLE001
|
||||
except (TimeoutError, aiohttp.ClientError, ValueError) as exc:
|
||||
log.warning(
|
||||
"geo_lookup_http_request_failed",
|
||||
ip=ip,
|
||||
@@ -565,11 +565,11 @@ class GeoCache:
|
||||
if db is not None and pos_rows:
|
||||
try:
|
||||
await geo_cache_repo.bulk_upsert_entries_and_commit(db, pos_rows)
|
||||
except Exception as exc: # noqa: BLE001
|
||||
except (OSError) as exc:
|
||||
log.warning(
|
||||
"geo_batch_persist_mmdb_failed",
|
||||
count=len(pos_rows),
|
||||
error=str(exc),
|
||||
error=type(exc).__name__,
|
||||
)
|
||||
|
||||
# FALLBACK: Try HTTP API only if enabled and there are remaining IPs.
|
||||
@@ -584,11 +584,11 @@ class GeoCache:
|
||||
if db is not None and neg_ips:
|
||||
try:
|
||||
await geo_cache_repo.bulk_upsert_neg_entries_and_commit(db, neg_ips)
|
||||
except Exception as exc: # noqa: BLE001
|
||||
except (OSError) as exc:
|
||||
log.warning(
|
||||
"geo_batch_persist_neg_failed",
|
||||
count=len(neg_ips),
|
||||
error=str(exc),
|
||||
error=type(exc).__name__,
|
||||
)
|
||||
|
||||
log.info(
|
||||
@@ -657,12 +657,12 @@ class GeoCache:
|
||||
pos_rows,
|
||||
neg_ips,
|
||||
)
|
||||
except Exception as exc: # noqa: BLE001
|
||||
except (OSError) as exc:
|
||||
log.warning(
|
||||
"geo_batch_persist_failed",
|
||||
positive_count=len(pos_rows),
|
||||
negative_count=len(neg_ips),
|
||||
error=str(exc),
|
||||
error=type(exc).__name__,
|
||||
)
|
||||
pos_rows.clear()
|
||||
neg_ips.clear()
|
||||
@@ -704,7 +704,7 @@ class GeoCache:
|
||||
log.warning("geo_batch_non_200", status=resp.status, count=len(ips))
|
||||
return fallback
|
||||
data: list[dict[str, object]] = await resp.json(content_type=None)
|
||||
except Exception as exc: # noqa: BLE001
|
||||
except (TimeoutError, aiohttp.ClientError, ValueError) as exc:
|
||||
log.warning(
|
||||
"geo_batch_request_failed",
|
||||
count=len(ips),
|
||||
@@ -816,8 +816,8 @@ class GeoCache:
|
||||
|
||||
try:
|
||||
await geo_cache_repo.bulk_upsert_entries_and_commit(db, rows)
|
||||
except Exception as exc: # noqa: BLE001
|
||||
log.warning("geo_flush_dirty_failed", error=str(exc))
|
||||
except (OSError) as exc:
|
||||
log.warning("geo_flush_dirty_failed", error=type(exc).__name__)
|
||||
# Re-add to dirty so they are retried on the next flush cycle.
|
||||
self._dirty.update(to_flush)
|
||||
return 0
|
||||
|
||||
Reference in New Issue
Block a user