Stage 6: jail management — backend service, routers, tests, and frontend

- jail_service.py: list/detail/control/ban/unban/ignore-list/IP-lookup
- jails.py router: 11 endpoints including ignore list management
- bans.py router: active bans, ban, unban
- geo.py router: IP lookup with geo enrichment
- models: Jail.actions, ActiveBan.country/.banned_at optional, GeoDetail
- 217 tests pass (40 service + 36 router + 141 existing), 76% coverage
- Frontend: types/jail.ts, api/jails.ts, hooks/useJails.ts
- JailsPage: jail overview table with controls, ban/unban forms,
  active bans table, IP lookup
- JailDetailPage: full detail, start/stop/idle/reload, patterns,
  ignore list management
This commit is contained in:
2026-03-01 14:09:02 +01:00
parent 9ac7f8d22d
commit ebec5e0f58
18 changed files with 5472 additions and 62 deletions

View File

@@ -0,0 +1,272 @@
"""Tests for the bans router endpoints."""
from __future__ import annotations
from pathlib import Path
from unittest.mock import AsyncMock, MagicMock, patch
import aiosqlite
import pytest
from httpx import ASGITransport, AsyncClient
from app.config import Settings
from app.db import init_db
from app.main import create_app
from app.models.ban import ActiveBan, ActiveBanListResponse
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
_SETUP_PAYLOAD = {
"master_password": "testpassword1",
"database_path": "bangui.db",
"fail2ban_socket": "/var/run/fail2ban/fail2ban.sock",
"timezone": "UTC",
"session_duration_minutes": 60,
}
@pytest.fixture
async def bans_client(tmp_path: Path) -> AsyncClient: # type: ignore[misc]
"""Provide an authenticated ``AsyncClient`` for bans endpoint tests."""
settings = Settings(
database_path=str(tmp_path / "bans_test.db"),
fail2ban_socket="/tmp/fake.sock",
session_secret="test-bans-secret",
session_duration_minutes=60,
timezone="UTC",
log_level="debug",
)
app = create_app(settings=settings)
db: aiosqlite.Connection = await aiosqlite.connect(settings.database_path)
db.row_factory = aiosqlite.Row
await init_db(db)
app.state.db = db
app.state.http_session = MagicMock()
transport = ASGITransport(app=app)
async with AsyncClient(transport=transport, base_url="http://test") as ac:
await ac.post("/api/setup", json=_SETUP_PAYLOAD)
login = await ac.post(
"/api/auth/login",
json={"password": _SETUP_PAYLOAD["master_password"]},
)
assert login.status_code == 200
yield ac
await db.close()
# ---------------------------------------------------------------------------
# GET /api/bans/active
# ---------------------------------------------------------------------------
class TestGetActiveBans:
"""Tests for ``GET /api/bans/active``."""
async def test_200_when_authenticated(self, bans_client: AsyncClient) -> None:
"""GET /api/bans/active returns 200 with an ActiveBanListResponse."""
mock_response = ActiveBanListResponse(
bans=[
ActiveBan(
ip="1.2.3.4",
jail="sshd",
banned_at="2025-01-01T12:00:00+00:00",
expires_at="2025-01-01T13:00:00+00:00",
ban_count=1,
country="DE",
)
],
total=1,
)
with patch(
"app.routers.bans.jail_service.get_active_bans",
AsyncMock(return_value=mock_response),
):
resp = await bans_client.get("/api/bans/active")
assert resp.status_code == 200
data = resp.json()
assert data["total"] == 1
assert data["bans"][0]["ip"] == "1.2.3.4"
assert data["bans"][0]["jail"] == "sshd"
async def test_401_when_unauthenticated(self, bans_client: AsyncClient) -> None:
"""GET /api/bans/active returns 401 without session."""
resp = await AsyncClient(
transport=ASGITransport(app=bans_client._transport.app), # type: ignore[attr-defined]
base_url="http://test",
).get("/api/bans/active")
assert resp.status_code == 401
async def test_empty_when_no_bans(self, bans_client: AsyncClient) -> None:
"""GET /api/bans/active returns empty list when no bans are active."""
mock_response = ActiveBanListResponse(bans=[], total=0)
with patch(
"app.routers.bans.jail_service.get_active_bans",
AsyncMock(return_value=mock_response),
):
resp = await bans_client.get("/api/bans/active")
assert resp.status_code == 200
assert resp.json()["total"] == 0
assert resp.json()["bans"] == []
async def test_response_shape(self, bans_client: AsyncClient) -> None:
"""GET /api/bans/active returns expected fields per ban entry."""
mock_response = ActiveBanListResponse(
bans=[
ActiveBan(
ip="10.0.0.1",
jail="nginx",
banned_at=None,
expires_at=None,
ban_count=1,
country=None,
)
],
total=1,
)
with patch(
"app.routers.bans.jail_service.get_active_bans",
AsyncMock(return_value=mock_response),
):
resp = await bans_client.get("/api/bans/active")
ban = resp.json()["bans"][0]
assert "ip" in ban
assert "jail" in ban
assert "banned_at" in ban
assert "expires_at" in ban
assert "ban_count" in ban
# ---------------------------------------------------------------------------
# POST /api/bans
# ---------------------------------------------------------------------------
class TestBanIp:
"""Tests for ``POST /api/bans``."""
async def test_201_on_success(self, bans_client: AsyncClient) -> None:
"""POST /api/bans returns 201 when the IP is banned."""
with patch(
"app.routers.bans.jail_service.ban_ip",
AsyncMock(return_value=None),
):
resp = await bans_client.post(
"/api/bans",
json={"ip": "1.2.3.4", "jail": "sshd"},
)
assert resp.status_code == 201
assert resp.json()["jail"] == "sshd"
async def test_400_for_invalid_ip(self, bans_client: AsyncClient) -> None:
"""POST /api/bans returns 400 for an invalid IP address."""
with patch(
"app.routers.bans.jail_service.ban_ip",
AsyncMock(side_effect=ValueError("Invalid IP address: 'bad'")),
):
resp = await bans_client.post(
"/api/bans",
json={"ip": "bad", "jail": "sshd"},
)
assert resp.status_code == 400
async def test_404_for_unknown_jail(self, bans_client: AsyncClient) -> None:
"""POST /api/bans returns 404 when jail does not exist."""
from app.services.jail_service import JailNotFoundError
with patch(
"app.routers.bans.jail_service.ban_ip",
AsyncMock(side_effect=JailNotFoundError("ghost")),
):
resp = await bans_client.post(
"/api/bans",
json={"ip": "1.2.3.4", "jail": "ghost"},
)
assert resp.status_code == 404
async def test_401_when_unauthenticated(self, bans_client: AsyncClient) -> None:
"""POST /api/bans returns 401 without session."""
resp = await AsyncClient(
transport=ASGITransport(app=bans_client._transport.app), # type: ignore[attr-defined]
base_url="http://test",
).post("/api/bans", json={"ip": "1.2.3.4", "jail": "sshd"})
assert resp.status_code == 401
# ---------------------------------------------------------------------------
# DELETE /api/bans
# ---------------------------------------------------------------------------
class TestUnbanIp:
"""Tests for ``DELETE /api/bans``."""
async def test_200_unban_from_all(self, bans_client: AsyncClient) -> None:
"""DELETE /api/bans with unban_all=true unbans from all jails."""
with patch(
"app.routers.bans.jail_service.unban_ip",
AsyncMock(return_value=None),
):
resp = await bans_client.request(
"DELETE",
"/api/bans",
json={"ip": "1.2.3.4", "unban_all": True},
)
assert resp.status_code == 200
assert "all jails" in resp.json()["message"]
async def test_200_unban_from_specific_jail(self, bans_client: AsyncClient) -> None:
"""DELETE /api/bans with a jail unbans from that jail only."""
with patch(
"app.routers.bans.jail_service.unban_ip",
AsyncMock(return_value=None),
):
resp = await bans_client.request(
"DELETE",
"/api/bans",
json={"ip": "1.2.3.4", "jail": "sshd"},
)
assert resp.status_code == 200
assert "sshd" in resp.json()["message"]
async def test_400_for_invalid_ip(self, bans_client: AsyncClient) -> None:
"""DELETE /api/bans returns 400 for an invalid IP."""
with patch(
"app.routers.bans.jail_service.unban_ip",
AsyncMock(side_effect=ValueError("Invalid IP address: 'bad'")),
):
resp = await bans_client.request(
"DELETE",
"/api/bans",
json={"ip": "bad", "unban_all": True},
)
assert resp.status_code == 400
async def test_404_for_unknown_jail(self, bans_client: AsyncClient) -> None:
"""DELETE /api/bans returns 404 when jail does not exist."""
from app.services.jail_service import JailNotFoundError
with patch(
"app.routers.bans.jail_service.unban_ip",
AsyncMock(side_effect=JailNotFoundError("ghost")),
):
resp = await bans_client.request(
"DELETE",
"/api/bans",
json={"ip": "1.2.3.4", "jail": "ghost"},
)
assert resp.status_code == 404

View File

@@ -0,0 +1,159 @@
"""Tests for the geo/IP-lookup router endpoints."""
from __future__ import annotations
from pathlib import Path
from unittest.mock import AsyncMock, MagicMock, patch
import aiosqlite
import pytest
from httpx import ASGITransport, AsyncClient
from app.config import Settings
from app.db import init_db
from app.main import create_app
from app.services.geo_service import GeoInfo
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
_SETUP_PAYLOAD = {
"master_password": "testpassword1",
"database_path": "bangui.db",
"fail2ban_socket": "/var/run/fail2ban/fail2ban.sock",
"timezone": "UTC",
"session_duration_minutes": 60,
}
@pytest.fixture
async def geo_client(tmp_path: Path) -> AsyncClient: # type: ignore[misc]
"""Provide an authenticated ``AsyncClient`` for geo endpoint tests."""
settings = Settings(
database_path=str(tmp_path / "geo_test.db"),
fail2ban_socket="/tmp/fake.sock",
session_secret="test-geo-secret",
session_duration_minutes=60,
timezone="UTC",
log_level="debug",
)
app = create_app(settings=settings)
db: aiosqlite.Connection = await aiosqlite.connect(settings.database_path)
db.row_factory = aiosqlite.Row
await init_db(db)
app.state.db = db
app.state.http_session = MagicMock()
transport = ASGITransport(app=app)
async with AsyncClient(transport=transport, base_url="http://test") as ac:
await ac.post("/api/setup", json=_SETUP_PAYLOAD)
login = await ac.post(
"/api/auth/login",
json={"password": _SETUP_PAYLOAD["master_password"]},
)
assert login.status_code == 200
yield ac
await db.close()
# ---------------------------------------------------------------------------
# GET /api/geo/lookup/{ip}
# ---------------------------------------------------------------------------
class TestGeoLookup:
"""Tests for ``GET /api/geo/lookup/{ip}``."""
async def test_200_with_geo_info(self, geo_client: AsyncClient) -> None:
"""GET /api/geo/lookup/{ip} returns 200 with enriched result."""
geo = GeoInfo(country_code="DE", country_name="Germany", asn="12345", org="Acme")
result = {
"ip": "1.2.3.4",
"currently_banned_in": ["sshd"],
"geo": geo,
}
with patch(
"app.routers.geo.jail_service.lookup_ip",
AsyncMock(return_value=result),
):
resp = await geo_client.get("/api/geo/lookup/1.2.3.4")
assert resp.status_code == 200
data = resp.json()
assert data["ip"] == "1.2.3.4"
assert data["currently_banned_in"] == ["sshd"]
assert data["geo"]["country_code"] == "DE"
assert data["geo"]["country_name"] == "Germany"
assert data["geo"]["asn"] == "12345"
assert data["geo"]["org"] == "Acme"
async def test_200_when_not_banned(self, geo_client: AsyncClient) -> None:
"""GET /api/geo/lookup/{ip} returns empty list when IP is not banned anywhere."""
result = {
"ip": "8.8.8.8",
"currently_banned_in": [],
"geo": GeoInfo(country_code="US", country_name="United States", asn=None, org=None),
}
with patch(
"app.routers.geo.jail_service.lookup_ip",
AsyncMock(return_value=result),
):
resp = await geo_client.get("/api/geo/lookup/8.8.8.8")
assert resp.status_code == 200
assert resp.json()["currently_banned_in"] == []
async def test_200_with_no_geo(self, geo_client: AsyncClient) -> None:
"""GET /api/geo/lookup/{ip} returns null geo when enricher fails."""
result = {
"ip": "1.2.3.4",
"currently_banned_in": [],
"geo": None,
}
with patch(
"app.routers.geo.jail_service.lookup_ip",
AsyncMock(return_value=result),
):
resp = await geo_client.get("/api/geo/lookup/1.2.3.4")
assert resp.status_code == 200
assert resp.json()["geo"] is None
async def test_400_for_invalid_ip(self, geo_client: AsyncClient) -> None:
"""GET /api/geo/lookup/{ip} returns 400 for an invalid IP address."""
with patch(
"app.routers.geo.jail_service.lookup_ip",
AsyncMock(side_effect=ValueError("Invalid IP address: 'bad_ip'")),
):
resp = await geo_client.get("/api/geo/lookup/bad_ip")
assert resp.status_code == 400
assert "detail" in resp.json()
async def test_401_when_unauthenticated(self, geo_client: AsyncClient) -> None:
"""GET /api/geo/lookup/{ip} returns 401 without a session."""
app = geo_client._transport.app # type: ignore[attr-defined]
resp = await AsyncClient(
transport=ASGITransport(app=app),
base_url="http://test",
).get("/api/geo/lookup/1.2.3.4")
assert resp.status_code == 401
async def test_ipv6_address(self, geo_client: AsyncClient) -> None:
"""GET /api/geo/lookup/{ip} handles IPv6 addresses."""
result = {
"ip": "2001:db8::1",
"currently_banned_in": [],
"geo": None,
}
with patch(
"app.routers.geo.jail_service.lookup_ip",
AsyncMock(return_value=result),
):
resp = await geo_client.get("/api/geo/lookup/2001:db8::1")
assert resp.status_code == 200
assert resp.json()["ip"] == "2001:db8::1"

View File

@@ -0,0 +1,407 @@
"""Tests for the jails router endpoints."""
from __future__ import annotations
from pathlib import Path
from unittest.mock import AsyncMock, MagicMock, patch
import aiosqlite
import pytest
from httpx import ASGITransport, AsyncClient
from app.config import Settings
from app.db import init_db
from app.main import create_app
from app.models.jail import JailCommandResponse, JailDetailResponse, JailListResponse, JailStatus, JailSummary, Jail
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
_SETUP_PAYLOAD = {
"master_password": "testpassword1",
"database_path": "bangui.db",
"fail2ban_socket": "/var/run/fail2ban/fail2ban.sock",
"timezone": "UTC",
"session_duration_minutes": 60,
}
@pytest.fixture
async def jails_client(tmp_path: Path) -> AsyncClient: # type: ignore[misc]
"""Provide an authenticated ``AsyncClient`` for jail endpoint tests."""
settings = Settings(
database_path=str(tmp_path / "jails_test.db"),
fail2ban_socket="/tmp/fake.sock",
session_secret="test-jails-secret",
session_duration_minutes=60,
timezone="UTC",
log_level="debug",
)
app = create_app(settings=settings)
db: aiosqlite.Connection = await aiosqlite.connect(settings.database_path)
db.row_factory = aiosqlite.Row
await init_db(db)
app.state.db = db
app.state.http_session = MagicMock()
transport = ASGITransport(app=app)
async with AsyncClient(transport=transport, base_url="http://test") as ac:
await ac.post("/api/setup", json=_SETUP_PAYLOAD)
login = await ac.post(
"/api/auth/login",
json={"password": _SETUP_PAYLOAD["master_password"]},
)
assert login.status_code == 200
yield ac
await db.close()
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _summary(name: str = "sshd") -> JailSummary:
return JailSummary(
name=name,
enabled=True,
running=True,
idle=False,
backend="polling",
find_time=600,
ban_time=600,
max_retry=5,
status=JailStatus(
currently_banned=2,
total_banned=10,
currently_failed=1,
total_failed=50,
),
)
def _detail(name: str = "sshd") -> JailDetailResponse:
return JailDetailResponse(
jail=Jail(
name=name,
enabled=True,
running=True,
idle=False,
backend="polling",
log_paths=["/var/log/auth.log"],
fail_regex=["^.*Failed.*<HOST>"],
ignore_regex=[],
ignore_ips=["127.0.0.1"],
date_pattern=None,
log_encoding="UTF-8",
find_time=600,
ban_time=600,
max_retry=5,
actions=["iptables-multiport"],
status=JailStatus(
currently_banned=2,
total_banned=10,
currently_failed=1,
total_failed=50,
),
)
)
# ---------------------------------------------------------------------------
# GET /api/jails
# ---------------------------------------------------------------------------
class TestGetJails:
"""Tests for ``GET /api/jails``."""
async def test_200_when_authenticated(self, jails_client: AsyncClient) -> None:
"""GET /api/jails returns 200 with a JailListResponse."""
mock_response = JailListResponse(jails=[_summary()], total=1)
with patch(
"app.routers.jails.jail_service.list_jails",
AsyncMock(return_value=mock_response),
):
resp = await jails_client.get("/api/jails")
assert resp.status_code == 200
data = resp.json()
assert data["total"] == 1
assert data["jails"][0]["name"] == "sshd"
async def test_401_when_unauthenticated(self, jails_client: AsyncClient) -> None:
"""GET /api/jails returns 401 without a session cookie."""
resp = await AsyncClient(
transport=ASGITransport(app=jails_client._transport.app), # type: ignore[attr-defined]
base_url="http://test",
).get("/api/jails")
assert resp.status_code == 401
async def test_response_shape(self, jails_client: AsyncClient) -> None:
"""GET /api/jails response contains expected fields."""
mock_response = JailListResponse(jails=[_summary()], total=1)
with patch(
"app.routers.jails.jail_service.list_jails",
AsyncMock(return_value=mock_response),
):
resp = await jails_client.get("/api/jails")
jail = resp.json()["jails"][0]
assert "name" in jail
assert "enabled" in jail
assert "running" in jail
assert "idle" in jail
assert "backend" in jail
assert "status" in jail
# ---------------------------------------------------------------------------
# GET /api/jails/{name}
# ---------------------------------------------------------------------------
class TestGetJailDetail:
"""Tests for ``GET /api/jails/{name}``."""
async def test_200_for_existing_jail(self, jails_client: AsyncClient) -> None:
"""GET /api/jails/sshd returns 200 with full jail detail."""
with patch(
"app.routers.jails.jail_service.get_jail",
AsyncMock(return_value=_detail()),
):
resp = await jails_client.get("/api/jails/sshd")
assert resp.status_code == 200
data = resp.json()
assert data["jail"]["name"] == "sshd"
assert "log_paths" in data["jail"]
assert "fail_regex" in data["jail"]
assert "actions" in data["jail"]
async def test_404_for_unknown_jail(self, jails_client: AsyncClient) -> None:
"""GET /api/jails/ghost returns 404."""
from app.services.jail_service import JailNotFoundError
with patch(
"app.routers.jails.jail_service.get_jail",
AsyncMock(side_effect=JailNotFoundError("ghost")),
):
resp = await jails_client.get("/api/jails/ghost")
assert resp.status_code == 404
# ---------------------------------------------------------------------------
# POST /api/jails/{name}/start
# ---------------------------------------------------------------------------
class TestStartJail:
"""Tests for ``POST /api/jails/{name}/start``."""
async def test_200_starts_jail(self, jails_client: AsyncClient) -> None:
"""POST /api/jails/sshd/start returns 200 on success."""
with patch(
"app.routers.jails.jail_service.start_jail",
AsyncMock(return_value=None),
):
resp = await jails_client.post("/api/jails/sshd/start")
assert resp.status_code == 200
assert resp.json()["jail"] == "sshd"
async def test_404_for_unknown_jail(self, jails_client: AsyncClient) -> None:
"""POST /api/jails/ghost/start returns 404."""
from app.services.jail_service import JailNotFoundError
with patch(
"app.routers.jails.jail_service.start_jail",
AsyncMock(side_effect=JailNotFoundError("ghost")),
):
resp = await jails_client.post("/api/jails/ghost/start")
assert resp.status_code == 404
async def test_409_on_operation_error(self, jails_client: AsyncClient) -> None:
"""POST /api/jails/sshd/start returns 409 on operation failure."""
from app.services.jail_service import JailOperationError
with patch(
"app.routers.jails.jail_service.start_jail",
AsyncMock(side_effect=JailOperationError("already running")),
):
resp = await jails_client.post("/api/jails/sshd/start")
assert resp.status_code == 409
# ---------------------------------------------------------------------------
# POST /api/jails/{name}/stop
# ---------------------------------------------------------------------------
class TestStopJail:
"""Tests for ``POST /api/jails/{name}/stop``."""
async def test_200_stops_jail(self, jails_client: AsyncClient) -> None:
"""POST /api/jails/sshd/stop returns 200 on success."""
with patch(
"app.routers.jails.jail_service.stop_jail",
AsyncMock(return_value=None),
):
resp = await jails_client.post("/api/jails/sshd/stop")
assert resp.status_code == 200
async def test_404_for_unknown_jail(self, jails_client: AsyncClient) -> None:
"""POST /api/jails/ghost/stop returns 404."""
from app.services.jail_service import JailNotFoundError
with patch(
"app.routers.jails.jail_service.stop_jail",
AsyncMock(side_effect=JailNotFoundError("ghost")),
):
resp = await jails_client.post("/api/jails/ghost/stop")
assert resp.status_code == 404
# ---------------------------------------------------------------------------
# POST /api/jails/{name}/idle
# ---------------------------------------------------------------------------
class TestToggleIdle:
"""Tests for ``POST /api/jails/{name}/idle``."""
async def test_200_idle_on(self, jails_client: AsyncClient) -> None:
"""POST /api/jails/sshd/idle?on=true returns 200."""
with patch(
"app.routers.jails.jail_service.set_idle",
AsyncMock(return_value=None),
):
resp = await jails_client.post(
"/api/jails/sshd/idle",
content="true",
headers={"Content-Type": "application/json"},
)
assert resp.status_code == 200
async def test_200_idle_off(self, jails_client: AsyncClient) -> None:
"""POST /api/jails/sshd/idle with false turns idle off."""
with patch(
"app.routers.jails.jail_service.set_idle",
AsyncMock(return_value=None),
):
resp = await jails_client.post(
"/api/jails/sshd/idle",
content="false",
headers={"Content-Type": "application/json"},
)
assert resp.status_code == 200
# ---------------------------------------------------------------------------
# POST /api/jails/{name}/reload
# ---------------------------------------------------------------------------
class TestReloadJail:
"""Tests for ``POST /api/jails/{name}/reload``."""
async def test_200_reloads_jail(self, jails_client: AsyncClient) -> None:
"""POST /api/jails/sshd/reload returns 200 on success."""
with patch(
"app.routers.jails.jail_service.reload_jail",
AsyncMock(return_value=None),
):
resp = await jails_client.post("/api/jails/sshd/reload")
assert resp.status_code == 200
assert resp.json()["jail"] == "sshd"
# ---------------------------------------------------------------------------
# POST /api/jails/reload-all
# ---------------------------------------------------------------------------
class TestReloadAll:
"""Tests for ``POST /api/jails/reload-all``."""
async def test_200_reloads_all(self, jails_client: AsyncClient) -> None:
"""POST /api/jails/reload-all returns 200 on success."""
with patch(
"app.routers.jails.jail_service.reload_all",
AsyncMock(return_value=None),
):
resp = await jails_client.post("/api/jails/reload-all")
assert resp.status_code == 200
assert resp.json()["jail"] == "*"
# ---------------------------------------------------------------------------
# GET /api/jails/{name}/ignoreip
# ---------------------------------------------------------------------------
class TestIgnoreIpEndpoints:
"""Tests for ignore-list management endpoints."""
async def test_get_ignore_list(self, jails_client: AsyncClient) -> None:
"""GET /api/jails/sshd/ignoreip returns 200 with a list."""
with patch(
"app.routers.jails.jail_service.get_ignore_list",
AsyncMock(return_value=["127.0.0.1"]),
):
resp = await jails_client.get("/api/jails/sshd/ignoreip")
assert resp.status_code == 200
assert "127.0.0.1" in resp.json()
async def test_add_ignore_ip_returns_201(self, jails_client: AsyncClient) -> None:
"""POST /api/jails/sshd/ignoreip returns 201 on success."""
with patch(
"app.routers.jails.jail_service.add_ignore_ip",
AsyncMock(return_value=None),
):
resp = await jails_client.post(
"/api/jails/sshd/ignoreip",
json={"ip": "192.168.1.0/24"},
)
assert resp.status_code == 201
async def test_add_invalid_ip_returns_400(self, jails_client: AsyncClient) -> None:
"""POST /api/jails/sshd/ignoreip returns 400 for invalid IP."""
with patch(
"app.routers.jails.jail_service.add_ignore_ip",
AsyncMock(side_effect=ValueError("Invalid IP address or network: 'bad'")),
):
resp = await jails_client.post(
"/api/jails/sshd/ignoreip",
json={"ip": "bad"},
)
assert resp.status_code == 400
async def test_delete_ignore_ip(self, jails_client: AsyncClient) -> None:
"""DELETE /api/jails/sshd/ignoreip returns 200 on success."""
with patch(
"app.routers.jails.jail_service.del_ignore_ip",
AsyncMock(return_value=None),
):
resp = await jails_client.request(
"DELETE",
"/api/jails/sshd/ignoreip",
json={"ip": "127.0.0.1"},
)
assert resp.status_code == 200

View File

@@ -0,0 +1,526 @@
"""Tests for jail_service functions."""
from __future__ import annotations
from typing import Any
from unittest.mock import AsyncMock, patch
import pytest
from app.models.ban import ActiveBanListResponse
from app.models.jail import JailDetailResponse, JailListResponse
from app.services import jail_service
from app.services.jail_service import JailNotFoundError, JailOperationError
from app.utils.fail2ban_client import Fail2BanConnectionError
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
_SOCKET = "/fake/fail2ban.sock"
_JAIL_NAMES = "sshd, nginx"
def _make_global_status(names: str = _JAIL_NAMES) -> tuple[int, list[Any]]:
return (0, [("Number of jail", 2), ("Jail list", names)])
def _make_short_status(
banned: int = 2,
total_banned: int = 10,
failed: int = 3,
total_failed: int = 20,
) -> tuple[int, list[Any]]:
return (
0,
[
("Filter", [("Currently failed", failed), ("Total failed", total_failed)]),
("Actions", [("Currently banned", banned), ("Total banned", total_banned)]),
],
)
def _make_send(responses: dict[str, Any]) -> AsyncMock:
"""Build an ``AsyncMock`` for ``Fail2BanClient.send``.
Responses are keyed by the command joined with a pipe, e.g.
``"status"`` or ``"status|sshd|short"``.
"""
async def _side_effect(command: list[Any]) -> Any:
key = "|".join(str(c) for c in command)
if key in responses:
return responses[key]
# Fall back to partial key matching.
for resp_key, resp_value in responses.items():
if key.startswith(resp_key):
return resp_value
raise KeyError(f"Unexpected command key {key!r}")
return AsyncMock(side_effect=_side_effect)
def _patch_client(responses: dict[str, Any]) -> Any:
"""Return a ``patch`` context manager that mocks ``Fail2BanClient``."""
mock_send = _make_send(responses)
class _FakeClient:
def __init__(self, **_kw: Any) -> None:
self.send = mock_send
return patch("app.services.jail_service.Fail2BanClient", _FakeClient)
# ---------------------------------------------------------------------------
# list_jails
# ---------------------------------------------------------------------------
class TestListJails:
"""Unit tests for :func:`~app.services.jail_service.list_jails`."""
async def test_returns_jail_list_response(self) -> None:
"""list_jails returns a JailListResponse."""
responses = {
"status": _make_global_status("sshd"),
"status|sshd|short": _make_short_status(),
"get|sshd|bantime": (0, 600),
"get|sshd|findtime": (0, 600),
"get|sshd|maxretry": (0, 5),
"get|sshd|backend": (0, "polling"),
"get|sshd|idle": (0, False),
}
with _patch_client(responses):
result = await jail_service.list_jails(_SOCKET)
assert isinstance(result, JailListResponse)
assert result.total == 1
assert result.jails[0].name == "sshd"
async def test_empty_jail_list(self) -> None:
"""list_jails returns empty response when no jails are active."""
responses = {"status": (0, [("Number of jail", 0), ("Jail list", "")])}
with _patch_client(responses):
result = await jail_service.list_jails(_SOCKET)
assert result.total == 0
assert result.jails == []
async def test_jail_status_populated(self) -> None:
"""list_jails populates JailStatus with failed/banned counters."""
responses = {
"status": _make_global_status("sshd"),
"status|sshd|short": _make_short_status(banned=5, total_banned=50),
"get|sshd|bantime": (0, 600),
"get|sshd|findtime": (0, 600),
"get|sshd|maxretry": (0, 5),
"get|sshd|backend": (0, "polling"),
"get|sshd|idle": (0, False),
}
with _patch_client(responses):
result = await jail_service.list_jails(_SOCKET)
jail = result.jails[0]
assert jail.status is not None
assert jail.status.currently_banned == 5
assert jail.status.total_banned == 50
async def test_jail_config_populated(self) -> None:
"""list_jails populates ban_time, find_time, max_retry, backend."""
responses = {
"status": _make_global_status("sshd"),
"status|sshd|short": _make_short_status(),
"get|sshd|bantime": (0, 3600),
"get|sshd|findtime": (0, 300),
"get|sshd|maxretry": (0, 3),
"get|sshd|backend": (0, "systemd"),
"get|sshd|idle": (0, True),
}
with _patch_client(responses):
result = await jail_service.list_jails(_SOCKET)
jail = result.jails[0]
assert jail.ban_time == 3600
assert jail.find_time == 300
assert jail.max_retry == 3
assert jail.backend == "systemd"
assert jail.idle is True
async def test_multiple_jails_returned(self) -> None:
"""list_jails fetches all jails listed in the global status."""
responses = {
"status": _make_global_status("sshd, nginx"),
"status|sshd|short": _make_short_status(),
"status|nginx|short": _make_short_status(banned=0),
"get|sshd|bantime": (0, 600),
"get|sshd|findtime": (0, 600),
"get|sshd|maxretry": (0, 5),
"get|sshd|backend": (0, "polling"),
"get|sshd|idle": (0, False),
"get|nginx|bantime": (0, 1800),
"get|nginx|findtime": (0, 600),
"get|nginx|maxretry": (0, 5),
"get|nginx|backend": (0, "polling"),
"get|nginx|idle": (0, False),
}
with _patch_client(responses):
result = await jail_service.list_jails(_SOCKET)
assert result.total == 2
names = {j.name for j in result.jails}
assert names == {"sshd", "nginx"}
async def test_connection_error_propagates(self) -> None:
"""list_jails raises Fail2BanConnectionError when socket unreachable."""
async def _raise(*_: Any, **__: Any) -> None:
raise Fail2BanConnectionError("no socket", _SOCKET)
class _FailClient:
def __init__(self, **_kw: Any) -> None:
self.send = AsyncMock(side_effect=Fail2BanConnectionError("no socket", _SOCKET))
with patch("app.services.jail_service.Fail2BanClient", _FailClient):
with pytest.raises(Fail2BanConnectionError):
await jail_service.list_jails(_SOCKET)
# ---------------------------------------------------------------------------
# get_jail
# ---------------------------------------------------------------------------
class TestGetJail:
"""Unit tests for :func:`~app.services.jail_service.get_jail`."""
def _full_responses(self, name: str = "sshd") -> dict[str, Any]:
return {
f"status|{name}|short": _make_short_status(),
f"get|{name}|logpath": (0, ["/var/log/auth.log"]),
f"get|{name}|failregex": (0, ["^.*Failed.*from <HOST>"]),
f"get|{name}|ignoreregex": (0, []),
f"get|{name}|ignoreip": (0, ["127.0.0.1"]),
f"get|{name}|datepattern": (0, None),
f"get|{name}|logencoding": (0, "UTF-8"),
f"get|{name}|bantime": (0, 600),
f"get|{name}|findtime": (0, 600),
f"get|{name}|maxretry": (0, 5),
f"get|{name}|backend": (0, "polling"),
f"get|{name}|idle": (0, False),
f"get|{name}|actions": (0, ["iptables-multiport"]),
}
async def test_returns_jail_detail_response(self) -> None:
"""get_jail returns a JailDetailResponse."""
with _patch_client(self._full_responses()):
result = await jail_service.get_jail(_SOCKET, "sshd")
assert isinstance(result, JailDetailResponse)
assert result.jail.name == "sshd"
async def test_log_paths_parsed(self) -> None:
"""get_jail populates log_paths from fail2ban."""
with _patch_client(self._full_responses()):
result = await jail_service.get_jail(_SOCKET, "sshd")
assert result.jail.log_paths == ["/var/log/auth.log"]
async def test_fail_regex_parsed(self) -> None:
"""get_jail populates fail_regex list."""
with _patch_client(self._full_responses()):
result = await jail_service.get_jail(_SOCKET, "sshd")
assert "^.*Failed.*from <HOST>" in result.jail.fail_regex
async def test_ignore_ips_parsed(self) -> None:
"""get_jail populates ignore_ips list."""
with _patch_client(self._full_responses()):
result = await jail_service.get_jail(_SOCKET, "sshd")
assert "127.0.0.1" in result.jail.ignore_ips
async def test_actions_parsed(self) -> None:
"""get_jail populates actions list."""
with _patch_client(self._full_responses()):
result = await jail_service.get_jail(_SOCKET, "sshd")
assert result.jail.actions == ["iptables-multiport"]
async def test_jail_not_found_raises(self) -> None:
"""get_jail raises JailNotFoundError when jail is unknown."""
not_found_response = (1, Exception("Unknown jail: 'ghost'"))
with _patch_client({r"status|ghost|short": not_found_response}):
with pytest.raises(JailNotFoundError):
await jail_service.get_jail(_SOCKET, "ghost")
# ---------------------------------------------------------------------------
# Jail control commands
# ---------------------------------------------------------------------------
class TestJailControls:
"""Unit tests for start, stop, idle, reload commands."""
async def test_start_jail_success(self) -> None:
"""start_jail sends the start command without error."""
with _patch_client({"start|sshd": (0, None)}):
await jail_service.start_jail(_SOCKET, "sshd") # should not raise
async def test_stop_jail_success(self) -> None:
"""stop_jail sends the stop command without error."""
with _patch_client({"stop|sshd": (0, None)}):
await jail_service.stop_jail(_SOCKET, "sshd") # should not raise
async def test_set_idle_on(self) -> None:
"""set_idle sends idle=on when on=True."""
with _patch_client({"set|sshd|idle|on": (0, True)}):
await jail_service.set_idle(_SOCKET, "sshd", on=True) # should not raise
async def test_set_idle_off(self) -> None:
"""set_idle sends idle=off when on=False."""
with _patch_client({"set|sshd|idle|off": (0, True)}):
await jail_service.set_idle(_SOCKET, "sshd", on=False) # should not raise
async def test_reload_jail_success(self) -> None:
"""reload_jail sends the reload command without error."""
with _patch_client({"reload|sshd|[]|[]": (0, "OK")}):
await jail_service.reload_jail(_SOCKET, "sshd") # should not raise
async def test_reload_all_success(self) -> None:
"""reload_all sends the reload --all command without error."""
with _patch_client({"reload|--all|[]|[]": (0, "OK")}):
await jail_service.reload_all(_SOCKET) # should not raise
async def test_start_not_found_raises(self) -> None:
"""start_jail raises JailNotFoundError for unknown jail."""
with _patch_client({"start|ghost": (1, Exception("Unknown jail: 'ghost'"))}):
with pytest.raises(JailNotFoundError):
await jail_service.start_jail(_SOCKET, "ghost")
async def test_stop_operation_error_raises(self) -> None:
"""stop_jail raises JailOperationError on fail2ban error code."""
with _patch_client({"stop|sshd": (1, Exception("cannot stop"))}):
with pytest.raises(JailOperationError):
await jail_service.stop_jail(_SOCKET, "sshd")
# ---------------------------------------------------------------------------
# ban_ip / unban_ip
# ---------------------------------------------------------------------------
class TestBanUnban:
"""Unit tests for :func:`~app.services.jail_service.ban_ip` and
:func:`~app.services.jail_service.unban_ip`.
"""
async def test_ban_ip_success(self) -> None:
"""ban_ip sends the banip command for a valid IP."""
with _patch_client({"set|sshd|banip|1.2.3.4": (0, 1)}):
await jail_service.ban_ip(_SOCKET, "sshd", "1.2.3.4") # should not raise
async def test_ban_ip_invalid_raises(self) -> None:
"""ban_ip raises ValueError for a non-IP value."""
with pytest.raises(ValueError, match="Invalid IP"):
await jail_service.ban_ip(_SOCKET, "sshd", "not-an-ip")
async def test_ban_ipv6_success(self) -> None:
"""ban_ip accepts an IPv6 address."""
with _patch_client({"set|sshd|banip|::1": (0, 1)}):
await jail_service.ban_ip(_SOCKET, "sshd", "::1") # should not raise
async def test_unban_ip_all_jails(self) -> None:
"""unban_ip with jail=None uses the global unban command."""
with _patch_client({"unban|1.2.3.4": (0, 1)}):
await jail_service.unban_ip(_SOCKET, "1.2.3.4") # should not raise
async def test_unban_ip_specific_jail(self) -> None:
"""unban_ip with a jail sends the set unbanip command."""
with _patch_client({"set|sshd|unbanip|1.2.3.4": (0, 1)}):
await jail_service.unban_ip(_SOCKET, "1.2.3.4", jail="sshd") # should not raise
async def test_unban_invalid_ip_raises(self) -> None:
"""unban_ip raises ValueError for an invalid IP."""
with pytest.raises(ValueError, match="Invalid IP"):
await jail_service.unban_ip(_SOCKET, "bad-ip")
# ---------------------------------------------------------------------------
# get_active_bans
# ---------------------------------------------------------------------------
class TestGetActiveBans:
"""Unit tests for :func:`~app.services.jail_service.get_active_bans`."""
async def test_returns_active_ban_list_response(self) -> None:
"""get_active_bans returns an ActiveBanListResponse."""
responses = {
"status": _make_global_status("sshd"),
"get|sshd|banip|--with-time": (
0,
["1.2.3.4 \t2025-01-01 12:00:00 + 3600 = 2025-01-01 13:00:00"],
),
}
with _patch_client(responses):
result = await jail_service.get_active_bans(_SOCKET)
assert isinstance(result, ActiveBanListResponse)
assert result.total == 1
assert result.bans[0].ip == "1.2.3.4"
assert result.bans[0].jail == "sshd"
async def test_empty_when_no_jails(self) -> None:
"""get_active_bans returns empty list when no jails are active."""
responses = {"status": (0, [("Number of jail", 0), ("Jail list", "")])}
with _patch_client(responses):
result = await jail_service.get_active_bans(_SOCKET)
assert result.total == 0
assert result.bans == []
async def test_empty_when_no_bans(self) -> None:
"""get_active_bans returns empty list when all jails have zero bans."""
responses = {
"status": _make_global_status("sshd"),
"get|sshd|banip|--with-time": (0, []),
}
with _patch_client(responses):
result = await jail_service.get_active_bans(_SOCKET)
assert result.total == 0
async def test_ban_time_parsed(self) -> None:
"""get_active_bans populates banned_at and expires_at from the entry."""
responses = {
"status": _make_global_status("sshd"),
"get|sshd|banip|--with-time": (
0,
["10.0.0.1 \t2025-03-01 08:00:00 + 7200 = 2025-03-01 10:00:00"],
),
}
with _patch_client(responses):
result = await jail_service.get_active_bans(_SOCKET)
ban = result.bans[0]
assert ban.banned_at is not None
assert "2025-03-01" in ban.banned_at
assert ban.expires_at is not None
assert "2025-03-01" in ban.expires_at
async def test_error_in_jail_tolerated(self) -> None:
"""get_active_bans skips a jail that errors during the ban-list fetch."""
responses = {
"status": _make_global_status("sshd, nginx"),
"get|sshd|banip|--with-time": (
0,
["1.2.3.4 \t2025-01-01 10:00:00 + 600 = 2025-01-01 10:10:00"],
),
"get|nginx|banip|--with-time": Fail2BanConnectionError("no nginx", _SOCKET),
}
async def _side(*args: Any) -> Any:
key = "|".join(str(a) for a in args[0])
resp = responses.get(key)
if isinstance(resp, Exception):
raise resp
if resp is None:
raise KeyError(f"Unexpected key {key!r}")
return resp
class _FakeClientPartial:
def __init__(self, **_kw: Any) -> None:
self.send = AsyncMock(side_effect=_side)
with patch("app.services.jail_service.Fail2BanClient", _FakeClientPartial):
result = await jail_service.get_active_bans(_SOCKET)
# Only sshd ban returned (nginx silently skipped)
assert result.total == 1
assert result.bans[0].jail == "sshd"
# ---------------------------------------------------------------------------
# Ignore list
# ---------------------------------------------------------------------------
class TestIgnoreList:
"""Unit tests for ignore list operations."""
async def test_get_ignore_list(self) -> None:
"""get_ignore_list returns a list of IP strings."""
with _patch_client({"get|sshd|ignoreip": (0, ["127.0.0.1", "10.0.0.0/8"])}):
result = await jail_service.get_ignore_list(_SOCKET, "sshd")
assert "127.0.0.1" in result
assert "10.0.0.0/8" in result
async def test_add_ignore_ip(self) -> None:
"""add_ignore_ip sends addignoreip for a valid CIDR."""
with _patch_client({"set|sshd|addignoreip|192.168.0.0/24": (0, "OK")}):
await jail_service.add_ignore_ip(_SOCKET, "sshd", "192.168.0.0/24")
async def test_add_ignore_ip_invalid_raises(self) -> None:
"""add_ignore_ip raises ValueError for an invalid CIDR."""
with pytest.raises(ValueError, match="Invalid IP"):
await jail_service.add_ignore_ip(_SOCKET, "sshd", "not-a-cidr")
async def test_del_ignore_ip(self) -> None:
"""del_ignore_ip sends delignoreip command."""
with _patch_client({"set|sshd|delignoreip|127.0.0.1": (0, "OK")}):
await jail_service.del_ignore_ip(_SOCKET, "sshd", "127.0.0.1")
async def test_get_ignore_self(self) -> None:
"""get_ignore_self returns a boolean."""
with _patch_client({"get|sshd|ignoreself": (0, True)}):
result = await jail_service.get_ignore_self(_SOCKET, "sshd")
assert result is True
async def test_set_ignore_self_on(self) -> None:
"""set_ignore_self sends ignoreself=true."""
with _patch_client({"set|sshd|ignoreself|true": (0, True)}):
await jail_service.set_ignore_self(_SOCKET, "sshd", on=True)
# ---------------------------------------------------------------------------
# lookup_ip
# ---------------------------------------------------------------------------
class TestLookupIp:
"""Unit tests for :func:`~app.services.jail_service.lookup_ip`."""
async def test_basic_lookup(self) -> None:
"""lookup_ip returns currently_banned_in list."""
responses = {
"get|--all|banned|1.2.3.4": (0, []),
"status": _make_global_status("sshd"),
"get|sshd|banip": (0, ["1.2.3.4", "5.6.7.8"]),
}
with _patch_client(responses):
result = await jail_service.lookup_ip(_SOCKET, "1.2.3.4")
assert result["ip"] == "1.2.3.4"
assert "sshd" in result["currently_banned_in"]
async def test_invalid_ip_raises(self) -> None:
"""lookup_ip raises ValueError for invalid IP."""
with pytest.raises(ValueError, match="Invalid IP"):
await jail_service.lookup_ip(_SOCKET, "not-an-ip")
async def test_not_banned_returns_empty_list(self) -> None:
"""lookup_ip returns empty currently_banned_in when IP is not banned."""
responses = {
"get|--all|banned|9.9.9.9": (0, []),
"status": _make_global_status("sshd"),
"get|sshd|banip": (0, ["1.2.3.4"]),
}
with _patch_client(responses):
result = await jail_service.lookup_ip(_SOCKET, "9.9.9.9")
assert result["currently_banned_in"] == []