feature/ignore-self-toggle #1
@@ -304,6 +304,16 @@ async def import_source(
|
|||||||
try:
|
try:
|
||||||
await jail_service.ban_ip(socket_path, BLOCKLIST_JAIL, stripped)
|
await jail_service.ban_ip(socket_path, BLOCKLIST_JAIL, stripped)
|
||||||
imported += 1
|
imported += 1
|
||||||
|
except jail_service.JailNotFoundError as exc:
|
||||||
|
# The target jail does not exist in fail2ban — there is no point
|
||||||
|
# continuing because every subsequent ban would also fail.
|
||||||
|
ban_error = str(exc)
|
||||||
|
log.warning(
|
||||||
|
"blocklist_jail_not_found",
|
||||||
|
jail=BLOCKLIST_JAIL,
|
||||||
|
error=str(exc),
|
||||||
|
)
|
||||||
|
break
|
||||||
except Exception as exc:
|
except Exception as exc:
|
||||||
skipped += 1
|
skipped += 1
|
||||||
if ban_error is None:
|
if ban_error is None:
|
||||||
|
|||||||
@@ -131,6 +131,10 @@ def _ensure_list(value: Any) -> list[str]:
|
|||||||
def _is_not_found_error(exc: Exception) -> bool:
|
def _is_not_found_error(exc: Exception) -> bool:
|
||||||
"""Return ``True`` if *exc* indicates a jail does not exist.
|
"""Return ``True`` if *exc* indicates a jail does not exist.
|
||||||
|
|
||||||
|
Checks both space-separated (``"unknown jail"``) and concatenated
|
||||||
|
(``"unknownjail"``) forms because fail2ban serialises
|
||||||
|
``UnknownJailException`` without a space when pickled.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
exc: The exception to inspect.
|
exc: The exception to inspect.
|
||||||
|
|
||||||
@@ -142,6 +146,7 @@ def _is_not_found_error(exc: Exception) -> bool:
|
|||||||
phrase in msg
|
phrase in msg
|
||||||
for phrase in (
|
for phrase in (
|
||||||
"unknown jail",
|
"unknown jail",
|
||||||
|
"unknownjail", # covers UnknownJailException serialised by fail2ban
|
||||||
"no jail",
|
"no jail",
|
||||||
"does not exist",
|
"does not exist",
|
||||||
"not found",
|
"not found",
|
||||||
|
|||||||
@@ -187,6 +187,33 @@ class TestImport:
|
|||||||
assert result.ips_imported == 0
|
assert result.ips_imported == 0
|
||||||
assert result.error is not None
|
assert result.error is not None
|
||||||
|
|
||||||
|
async def test_import_source_aborts_on_jail_not_found(self, db: aiosqlite.Connection) -> None:
|
||||||
|
"""import_source aborts immediately and records an error when the target jail
|
||||||
|
does not exist in fail2ban instead of silently skipping every IP."""
|
||||||
|
from app.services.jail_service import JailNotFoundError
|
||||||
|
|
||||||
|
content = "\n".join(f"1.2.3.{i}" for i in range(100))
|
||||||
|
session = _make_session(content)
|
||||||
|
source = await blocklist_service.create_source(db, "Missing Jail", "https://mj.test/")
|
||||||
|
|
||||||
|
call_count = 0
|
||||||
|
|
||||||
|
async def _raise_jail_not_found(socket_path: str, jail: str, ip: str) -> None:
|
||||||
|
nonlocal call_count
|
||||||
|
call_count += 1
|
||||||
|
raise JailNotFoundError(jail)
|
||||||
|
|
||||||
|
with patch("app.services.jail_service.ban_ip", side_effect=_raise_jail_not_found):
|
||||||
|
result = await blocklist_service.import_source(
|
||||||
|
source, session, "/tmp/fake.sock", db
|
||||||
|
)
|
||||||
|
|
||||||
|
# Must abort after the first JailNotFoundError — only one ban attempt.
|
||||||
|
assert call_count == 1
|
||||||
|
assert result.ips_imported == 0
|
||||||
|
assert result.error is not None
|
||||||
|
assert "not found" in result.error.lower() or "blocklist-import" in result.error
|
||||||
|
|
||||||
async def test_import_all_runs_all_enabled(self, db: aiosqlite.Connection) -> None:
|
async def test_import_all_runs_all_enabled(self, db: aiosqlite.Connection) -> None:
|
||||||
"""import_all aggregates results across all enabled sources."""
|
"""import_all aggregates results across all enabled sources."""
|
||||||
await blocklist_service.create_source(db, "S1", "https://s1.test/")
|
await blocklist_service.create_source(db, "S1", "https://s1.test/")
|
||||||
|
|||||||
@@ -323,6 +323,20 @@ class TestBanUnban:
|
|||||||
with pytest.raises(ValueError, match="Invalid IP"):
|
with pytest.raises(ValueError, match="Invalid IP"):
|
||||||
await jail_service.ban_ip(_SOCKET, "sshd", "not-an-ip")
|
await jail_service.ban_ip(_SOCKET, "sshd", "not-an-ip")
|
||||||
|
|
||||||
|
async def test_ban_ip_unknown_jail_exception_raises_jail_not_found(self) -> None:
|
||||||
|
"""ban_ip raises JailNotFoundError when fail2ban returns UnknownJailException.
|
||||||
|
|
||||||
|
fail2ban serialises the exception without a space (``UnknownJailException``
|
||||||
|
rather than ``Unknown JailException``), so _is_not_found_error must match
|
||||||
|
the concatenated form ``"unknownjail``".
|
||||||
|
"""
|
||||||
|
response = (1, Exception("UnknownJailException('blocklist-import')"))
|
||||||
|
with (
|
||||||
|
_patch_client({"set|missing-jail|banip|1.2.3.4": response}),
|
||||||
|
pytest.raises(JailNotFoundError, match="missing-jail"),
|
||||||
|
):
|
||||||
|
await jail_service.ban_ip(_SOCKET, "missing-jail", "1.2.3.4")
|
||||||
|
|
||||||
async def test_ban_ipv6_success(self) -> None:
|
async def test_ban_ipv6_success(self) -> None:
|
||||||
"""ban_ip accepts an IPv6 address."""
|
"""ban_ip accepts an IPv6 address."""
|
||||||
with _patch_client({"set|sshd|banip|::1": (0, 1)}):
|
with _patch_client({"set|sshd|banip|::1": (0, 1)}):
|
||||||
|
|||||||
Reference in New Issue
Block a user