- Add TYPE_CHECKING guards for runtime-expensive imports (aiohttp, aiosqlite)
- Reorganize imports to follow PEP 8 conventions
- Convert TypeAlias to modern PEP 695 type syntax (where appropriate)
- Use Sequence/Mapping from collections.abc for type hints (covariant)
- Replace string literals with cast() for improved type inference
- Fix casting of Fail2BanResponse and TypedDict patterns
- Add IpLookupResult TypedDict for precise return type annotation
- Reformat overlong lines for readability (120 char limit)
- Add asyncio_mode and filterwarnings to pytest config
- Update test fixtures with improved type hints

This improves mypy type checking and makes type relationships explicit.
935 lines
35 KiB
Python
935 lines
35 KiB
Python
"""Tests for the jails router endpoints."""
|
|
|
|
from __future__ import annotations
|
|
|
|
from pathlib import Path
|
|
from unittest.mock import AsyncMock, MagicMock, patch
|
|
|
|
import aiosqlite
|
|
import pytest
|
|
from httpx import ASGITransport, AsyncClient
|
|
|
|
from app.config import Settings
|
|
from app.db import init_db
|
|
from app.main import create_app
|
|
from app.models.ban import JailBannedIpsResponse
|
|
from app.models.jail import Jail, JailDetailResponse, JailListResponse, JailStatus, JailSummary
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Fixtures
|
|
# ---------------------------------------------------------------------------
|
|
|
|
_SETUP_PAYLOAD = {
|
|
"master_password": "testpassword1",
|
|
"database_path": "bangui.db",
|
|
"fail2ban_socket": "/var/run/fail2ban/fail2ban.sock",
|
|
"timezone": "UTC",
|
|
"session_duration_minutes": 60,
|
|
}
|
|
|
|
|
|
@pytest.fixture
async def jails_client(tmp_path: Path) -> AsyncClient:  # type: ignore[misc]
    """Provide an authenticated ``AsyncClient`` for jail endpoint tests.

    Builds an app from test ``Settings`` backed by a SQLite file under
    *tmp_path*, attaches an initialised database connection and a mocked
    HTTP session to ``app.state``, then performs setup and login so the
    yielded client already carries the login response's session state.

    Args:
        tmp_path: pytest-provided temporary directory for the database file.

    Yields:
        The logged-in ``AsyncClient`` bound to the test application.
    """
    settings = Settings(
        database_path=str(tmp_path / "jails_test.db"),
        fail2ban_socket="/tmp/fake.sock",
        session_secret="test-jails-secret",
        session_duration_minutes=60,
        timezone="UTC",
        log_level="debug",
    )
    app = create_app(settings=settings)

    db: aiosqlite.Connection = await aiosqlite.connect(settings.database_path)
    # Everything after the connection is opened runs under try/finally so the
    # connection is closed even when setup fails (e.g. the login assertion).
    try:
        db.row_factory = aiosqlite.Row
        await init_db(db)
        app.state.db = db
        app.state.http_session = MagicMock()

        transport = ASGITransport(app=app)
        async with AsyncClient(transport=transport, base_url="http://test") as ac:
            await ac.post("/api/setup", json=_SETUP_PAYLOAD)
            login = await ac.post(
                "/api/auth/login",
                json={"password": _SETUP_PAYLOAD["master_password"]},
            )
            assert login.status_code == 200
            yield ac
    finally:
        await db.close()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _summary(name: str = "sshd") -> JailSummary:
    """Build a ``JailSummary`` test object for the jail called *name*.

    The jail is enabled, running, not idle, uses the ``polling`` backend and
    carries fixed ban/failure counters.
    """
    counters = JailStatus(
        currently_banned=2,
        total_banned=10,
        currently_failed=1,
        total_failed=50,
    )
    return JailSummary(
        name=name,
        enabled=True,
        running=True,
        idle=False,
        backend="polling",
        find_time=600,
        ban_time=600,
        max_retry=5,
        status=counters,
    )
|
|
|
|
|
|
def _detail(name: str = "sshd") -> JailDetailResponse:
    """Build a ``JailDetailResponse`` wrapping a fully populated ``Jail`` named *name*."""
    counters = JailStatus(
        currently_banned=2,
        total_banned=10,
        currently_failed=1,
        total_failed=50,
    )
    jail = Jail(
        name=name,
        enabled=True,
        running=True,
        idle=False,
        backend="polling",
        log_paths=["/var/log/auth.log"],
        fail_regex=["^.*Failed.*<HOST>"],
        ignore_regex=[],
        ignore_ips=["127.0.0.1"],
        date_pattern=None,
        log_encoding="UTF-8",
        find_time=600,
        ban_time=600,
        max_retry=5,
        actions=["iptables-multiport"],
        status=counters,
    )
    return JailDetailResponse(jail=jail)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# GET /api/jails
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestGetJails:
    """Tests for ``GET /api/jails``."""

    async def test_200_when_authenticated(self, jails_client: AsyncClient) -> None:
        """GET /api/jails returns 200 with a JailListResponse."""
        mock_response = JailListResponse(jails=[_summary()], total=1)
        with patch(
            "app.routers.jails.jail_service.list_jails",
            AsyncMock(return_value=mock_response),
        ):
            resp = await jails_client.get("/api/jails")

        assert resp.status_code == 200
        data = resp.json()
        assert data["total"] == 1
        assert data["jails"][0]["name"] == "sshd"

    async def test_401_when_unauthenticated(self, jails_client: AsyncClient) -> None:
        """GET /api/jails returns 401 without a session cookie."""
        # Reuse the fixture's ASGI app but talk to it through a fresh client
        # that has not logged in.
        app = jails_client._transport.app  # type: ignore[attr-defined]
        # The context manager closes the throw-away client once the request is done.
        async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as anonymous:
            resp = await anonymous.get("/api/jails")
        assert resp.status_code == 401

    async def test_response_shape(self, jails_client: AsyncClient) -> None:
        """GET /api/jails response contains expected fields."""
        mock_response = JailListResponse(jails=[_summary()], total=1)
        with patch(
            "app.routers.jails.jail_service.list_jails",
            AsyncMock(return_value=mock_response),
        ):
            resp = await jails_client.get("/api/jails")

        # Check the status first so a failing request is reported as such
        # rather than as a KeyError while indexing the body.
        assert resp.status_code == 200
        jail = resp.json()["jails"][0]
        for field in ("name", "enabled", "running", "idle", "backend", "status"):
            assert field in jail
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# GET /api/jails/{name}
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestGetJailDetail:
    """Tests for ``GET /api/jails/{name}``."""

    async def test_200_for_existing_jail(self, jails_client: AsyncClient) -> None:
        """A known jail yields 200 and a body exposing the detail fields."""
        get_jail_mock = AsyncMock(return_value=_detail())
        with patch("app.routers.jails.jail_service.get_jail", new=get_jail_mock):
            response = await jails_client.get("/api/jails/sshd")

        assert response.status_code == 200
        jail = response.json()["jail"]
        assert jail["name"] == "sshd"
        assert "log_paths" in jail
        assert "fail_regex" in jail
        assert "actions" in jail

    async def test_404_for_unknown_jail(self, jails_client: AsyncClient) -> None:
        """``JailNotFoundError`` from the service is answered with 404."""
        from app.services.jail_service import JailNotFoundError

        get_jail_mock = AsyncMock(side_effect=JailNotFoundError("ghost"))
        with patch("app.routers.jails.jail_service.get_jail", new=get_jail_mock):
            response = await jails_client.get("/api/jails/ghost")

        assert response.status_code == 404
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# POST /api/jails/{name}/start
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestStartJail:
    """Tests for ``POST /api/jails/{name}/start``."""

    async def test_200_starts_jail(self, jails_client: AsyncClient) -> None:
        """A successful start yields 200 and a body whose ``jail`` is ``sshd``."""
        start_mock = AsyncMock(return_value=None)
        with patch("app.routers.jails.jail_service.start_jail", new=start_mock):
            response = await jails_client.post("/api/jails/sshd/start")

        assert response.status_code == 200
        assert response.json()["jail"] == "sshd"

    async def test_404_for_unknown_jail(self, jails_client: AsyncClient) -> None:
        """``JailNotFoundError`` from the service is answered with 404."""
        from app.services.jail_service import JailNotFoundError

        start_mock = AsyncMock(side_effect=JailNotFoundError("ghost"))
        with patch("app.routers.jails.jail_service.start_jail", new=start_mock):
            response = await jails_client.post("/api/jails/ghost/start")

        assert response.status_code == 404

    async def test_409_on_operation_error(self, jails_client: AsyncClient) -> None:
        """``JailOperationError`` from the service is answered with 409."""
        from app.services.jail_service import JailOperationError

        start_mock = AsyncMock(side_effect=JailOperationError("already running"))
        with patch("app.routers.jails.jail_service.start_jail", new=start_mock):
            response = await jails_client.post("/api/jails/sshd/start")

        assert response.status_code == 409
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# POST /api/jails/{name}/stop
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestStopJail:
    """Tests for ``POST /api/jails/{name}/stop``."""

    async def test_200_stops_jail(self, jails_client: AsyncClient) -> None:
        """A successful stop yields 200."""
        stop_mock = AsyncMock(return_value=None)
        with patch("app.routers.jails.jail_service.stop_jail", new=stop_mock):
            response = await jails_client.post("/api/jails/sshd/stop")

        assert response.status_code == 200

    async def test_200_for_already_stopped_jail(self, jails_client: AsyncClient) -> None:
        """Stopping an already stopped jail still yields 200.

        stop_jail is idempotent — the service returns None rather than raising
        JailNotFoundError when the jail is not present in fail2ban's runtime,
        which is modelled here by a mock that returns None.
        """
        stop_mock = AsyncMock(return_value=None)
        with patch("app.routers.jails.jail_service.stop_jail", new=stop_mock):
            response = await jails_client.post("/api/jails/sshd/stop")

        assert response.status_code == 200
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# POST /api/jails/{name}/idle
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestToggleIdle:
    """Tests for ``POST /api/jails/{name}/idle``."""

    async def test_200_idle_on(self, jails_client: AsyncClient) -> None:
        """Posting the JSON body ``true`` to the idle endpoint returns 200."""
        idle_mock = AsyncMock(return_value=None)
        with patch("app.routers.jails.jail_service.set_idle", new=idle_mock):
            response = await jails_client.post(
                "/api/jails/sshd/idle",
                content="true",
                headers={"Content-Type": "application/json"},
            )

        assert response.status_code == 200

    async def test_200_idle_off(self, jails_client: AsyncClient) -> None:
        """Posting the JSON body ``false`` to the idle endpoint returns 200."""
        idle_mock = AsyncMock(return_value=None)
        with patch("app.routers.jails.jail_service.set_idle", new=idle_mock):
            response = await jails_client.post(
                "/api/jails/sshd/idle",
                content="false",
                headers={"Content-Type": "application/json"},
            )

        assert response.status_code == 200
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# POST /api/jails/{name}/reload
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestReloadJail:
    """Tests for ``POST /api/jails/{name}/reload``."""

    async def test_200_reloads_jail(self, jails_client: AsyncClient) -> None:
        """A successful reload yields 200 and a body whose ``jail`` is ``sshd``."""
        reload_mock = AsyncMock(return_value=None)
        with patch("app.routers.jails.jail_service.reload_jail", new=reload_mock):
            response = await jails_client.post("/api/jails/sshd/reload")

        assert response.status_code == 200
        assert response.json()["jail"] == "sshd"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# POST /api/jails/reload-all
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestReloadAll:
    """Tests for ``POST /api/jails/reload-all``."""

    async def test_200_reloads_all(self, jails_client: AsyncClient) -> None:
        """A successful reload-all yields 200 and a body whose ``jail`` is ``*``."""
        reload_mock = AsyncMock(return_value=None)
        with patch("app.routers.jails.jail_service.reload_all", new=reload_mock):
            response = await jails_client.post("/api/jails/reload-all")

        assert response.status_code == 200
        assert response.json()["jail"] == "*"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# GET /api/jails/{name}/ignoreip
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestIgnoreIpEndpoints:
    """Tests for ignore-list management endpoints."""

    @staticmethod
    def _unknown_jail() -> AsyncMock:
        """Return a service mock that raises ``JailNotFoundError("ghost")``."""
        from app.services.jail_service import JailNotFoundError

        return AsyncMock(side_effect=JailNotFoundError("ghost"))

    @staticmethod
    def _rejected() -> AsyncMock:
        """Return a service mock that raises ``JailOperationError("fail2ban rejected")``."""
        from app.services.jail_service import JailOperationError

        return AsyncMock(side_effect=JailOperationError("fail2ban rejected"))

    @staticmethod
    def _socket_down() -> AsyncMock:
        """Return a service mock that raises ``Fail2BanConnectionError``."""
        from app.utils.fail2ban_client import Fail2BanConnectionError

        return AsyncMock(side_effect=Fail2BanConnectionError("socket dead", "/tmp/fake.sock"))

    async def test_get_ignore_list(self, jails_client: AsyncClient) -> None:
        """GET on the ignore list yields 200 and a body containing the mocked IP."""
        list_mock = AsyncMock(return_value=["127.0.0.1"])
        with patch("app.routers.jails.jail_service.get_ignore_list", new=list_mock):
            response = await jails_client.get("/api/jails/sshd/ignoreip")

        assert response.status_code == 200
        assert "127.0.0.1" in response.json()

    async def test_add_ignore_ip_returns_201(self, jails_client: AsyncClient) -> None:
        """POSTing a network to the ignore list yields 201."""
        add_mock = AsyncMock(return_value=None)
        with patch("app.routers.jails.jail_service.add_ignore_ip", new=add_mock):
            response = await jails_client.post(
                "/api/jails/sshd/ignoreip",
                json={"ip": "192.168.1.0/24"},
            )

        assert response.status_code == 201

    async def test_add_invalid_ip_returns_400(self, jails_client: AsyncClient) -> None:
        """A ``ValueError`` from the service is answered with 400."""
        add_mock = AsyncMock(side_effect=ValueError("Invalid IP address or network: 'bad'"))
        with patch("app.routers.jails.jail_service.add_ignore_ip", new=add_mock):
            response = await jails_client.post(
                "/api/jails/sshd/ignoreip",
                json={"ip": "bad"},
            )

        assert response.status_code == 400

    async def test_delete_ignore_ip(self, jails_client: AsyncClient) -> None:
        """DELETE with a JSON body on the ignore list yields 200."""
        del_mock = AsyncMock(return_value=None)
        with patch("app.routers.jails.jail_service.del_ignore_ip", new=del_mock):
            response = await jails_client.request(
                "DELETE",
                "/api/jails/sshd/ignoreip",
                json={"ip": "127.0.0.1"},
            )

        assert response.status_code == 200

    async def test_get_ignore_list_404_for_unknown_jail(self, jails_client: AsyncClient) -> None:
        """GET on the ignore list of an unknown jail yields 404."""
        with patch("app.routers.jails.jail_service.get_ignore_list", new=self._unknown_jail()):
            response = await jails_client.get("/api/jails/ghost/ignoreip")

        assert response.status_code == 404

    async def test_get_ignore_list_502_on_connection_error(self, jails_client: AsyncClient) -> None:
        """GET on the ignore list yields 502 when the service raises a connection error."""
        with patch("app.routers.jails.jail_service.get_ignore_list", new=self._socket_down()):
            response = await jails_client.get("/api/jails/sshd/ignoreip")

        assert response.status_code == 502

    async def test_add_ignore_ip_404_for_unknown_jail(self, jails_client: AsyncClient) -> None:
        """POST to the ignore list of an unknown jail yields 404."""
        with patch("app.routers.jails.jail_service.add_ignore_ip", new=self._unknown_jail()):
            response = await jails_client.post(
                "/api/jails/ghost/ignoreip",
                json={"ip": "1.2.3.4"},
            )

        assert response.status_code == 404

    async def test_add_ignore_ip_409_on_operation_error(self, jails_client: AsyncClient) -> None:
        """POST to the ignore list yields 409 when the service raises an operation error."""
        with patch("app.routers.jails.jail_service.add_ignore_ip", new=self._rejected()):
            response = await jails_client.post(
                "/api/jails/sshd/ignoreip",
                json={"ip": "1.2.3.4"},
            )

        assert response.status_code == 409

    async def test_add_ignore_ip_502_on_connection_error(self, jails_client: AsyncClient) -> None:
        """POST to the ignore list yields 502 when the service raises a connection error."""
        with patch("app.routers.jails.jail_service.add_ignore_ip", new=self._socket_down()):
            response = await jails_client.post(
                "/api/jails/sshd/ignoreip",
                json={"ip": "1.2.3.4"},
            )

        assert response.status_code == 502

    async def test_delete_ignore_ip_404_for_unknown_jail(self, jails_client: AsyncClient) -> None:
        """DELETE on the ignore list of an unknown jail yields 404."""
        with patch("app.routers.jails.jail_service.del_ignore_ip", new=self._unknown_jail()):
            response = await jails_client.request(
                "DELETE",
                "/api/jails/ghost/ignoreip",
                json={"ip": "1.2.3.4"},
            )

        assert response.status_code == 404

    async def test_delete_ignore_ip_409_on_operation_error(self, jails_client: AsyncClient) -> None:
        """DELETE on the ignore list yields 409 when the service raises an operation error."""
        with patch("app.routers.jails.jail_service.del_ignore_ip", new=self._rejected()):
            response = await jails_client.request(
                "DELETE",
                "/api/jails/sshd/ignoreip",
                json={"ip": "1.2.3.4"},
            )

        assert response.status_code == 409

    async def test_delete_ignore_ip_502_on_connection_error(self, jails_client: AsyncClient) -> None:
        """DELETE on the ignore list yields 502 when the service raises a connection error."""
        with patch("app.routers.jails.jail_service.del_ignore_ip", new=self._socket_down()):
            response = await jails_client.request(
                "DELETE",
                "/api/jails/sshd/ignoreip",
                json={"ip": "1.2.3.4"},
            )

        assert response.status_code == 502
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# POST /api/jails/{name}/ignoreself
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestToggleIgnoreSelf:
    """Tests for ``POST /api/jails/{name}/ignoreself``."""

    # Header set sent with every raw-JSON request body in this class.
    _JSON_HEADERS = {"Content-Type": "application/json"}

    async def test_200_enables_ignore_self(self, jails_client: AsyncClient) -> None:
        """Posting ``true`` yields 200 and a message mentioning ``enabled``."""
        toggle_mock = AsyncMock(return_value=None)
        with patch("app.routers.jails.jail_service.set_ignore_self", new=toggle_mock):
            response = await jails_client.post(
                "/api/jails/sshd/ignoreself",
                content="true",
                headers=self._JSON_HEADERS,
            )

        assert response.status_code == 200
        assert "enabled" in response.json()["message"]

    async def test_200_disables_ignore_self(self, jails_client: AsyncClient) -> None:
        """Posting ``false`` yields 200 and a message mentioning ``disabled``."""
        toggle_mock = AsyncMock(return_value=None)
        with patch("app.routers.jails.jail_service.set_ignore_self", new=toggle_mock):
            response = await jails_client.post(
                "/api/jails/sshd/ignoreself",
                content="false",
                headers=self._JSON_HEADERS,
            )

        assert response.status_code == 200
        assert "disabled" in response.json()["message"]

    async def test_404_for_unknown_jail(self, jails_client: AsyncClient) -> None:
        """``JailNotFoundError`` from the service is answered with 404."""
        from app.services.jail_service import JailNotFoundError

        toggle_mock = AsyncMock(side_effect=JailNotFoundError("ghost"))
        with patch("app.routers.jails.jail_service.set_ignore_self", new=toggle_mock):
            response = await jails_client.post(
                "/api/jails/ghost/ignoreself",
                content="true",
                headers=self._JSON_HEADERS,
            )

        assert response.status_code == 404

    async def test_409_on_operation_error(self, jails_client: AsyncClient) -> None:
        """``JailOperationError`` from the service is answered with 409."""
        from app.services.jail_service import JailOperationError

        toggle_mock = AsyncMock(side_effect=JailOperationError("fail2ban rejected"))
        with patch("app.routers.jails.jail_service.set_ignore_self", new=toggle_mock):
            response = await jails_client.post(
                "/api/jails/sshd/ignoreself",
                content="true",
                headers=self._JSON_HEADERS,
            )

        assert response.status_code == 409

    async def test_502_on_connection_error(self, jails_client: AsyncClient) -> None:
        """``Fail2BanConnectionError`` from the service is answered with 502."""
        from app.utils.fail2ban_client import Fail2BanConnectionError

        toggle_mock = AsyncMock(side_effect=Fail2BanConnectionError("socket dead", "/tmp/fake.sock"))
        with patch("app.routers.jails.jail_service.set_ignore_self", new=toggle_mock):
            response = await jails_client.post(
                "/api/jails/sshd/ignoreself",
                content="true",
                headers=self._JSON_HEADERS,
            )

        assert response.status_code == 502
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# 502 error paths — Fail2BanConnectionError across remaining endpoints
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestFail2BanConnectionErrors:
    """Error-path tests for the remaining jail endpoints.

    Mostly 502 responses for ``Fail2BanConnectionError``, plus the 404
    (``JailNotFoundError``) and 409 (``JailOperationError``) cases for the
    idle, reload, reload-all and stop endpoints.
    """

    @staticmethod
    def _socket_down() -> AsyncMock:
        """Return a service mock that raises ``Fail2BanConnectionError``."""
        from app.utils.fail2ban_client import Fail2BanConnectionError

        return AsyncMock(side_effect=Fail2BanConnectionError("socket dead", "/tmp/fake.sock"))

    @staticmethod
    def _not_found() -> AsyncMock:
        """Return a service mock that raises ``JailNotFoundError("ghost")``."""
        from app.services.jail_service import JailNotFoundError

        return AsyncMock(side_effect=JailNotFoundError("ghost"))

    @staticmethod
    def _op_failed(reason: str) -> AsyncMock:
        """Return a service mock that raises ``JailOperationError(reason)``."""
        from app.services.jail_service import JailOperationError

        return AsyncMock(side_effect=JailOperationError(reason))

    async def test_get_jails_502(self, jails_client: AsyncClient) -> None:
        """GET /api/jails yields 502 on a connection error."""
        with patch("app.routers.jails.jail_service.list_jails", new=self._socket_down()):
            response = await jails_client.get("/api/jails")

        assert response.status_code == 502

    async def test_get_jail_502(self, jails_client: AsyncClient) -> None:
        """GET /api/jails/sshd yields 502 on a connection error."""
        with patch("app.routers.jails.jail_service.get_jail", new=self._socket_down()):
            response = await jails_client.get("/api/jails/sshd")

        assert response.status_code == 502

    async def test_reload_all_409(self, jails_client: AsyncClient) -> None:
        """POST /api/jails/reload-all yields 409 on an operation error."""
        with patch("app.routers.jails.jail_service.reload_all", new=self._op_failed("reload failed")):
            response = await jails_client.post("/api/jails/reload-all")

        assert response.status_code == 409

    async def test_reload_all_502(self, jails_client: AsyncClient) -> None:
        """POST /api/jails/reload-all yields 502 on a connection error."""
        with patch("app.routers.jails.jail_service.reload_all", new=self._socket_down()):
            response = await jails_client.post("/api/jails/reload-all")

        assert response.status_code == 502

    async def test_start_jail_502(self, jails_client: AsyncClient) -> None:
        """POST /api/jails/sshd/start yields 502 on a connection error."""
        with patch("app.routers.jails.jail_service.start_jail", new=self._socket_down()):
            response = await jails_client.post("/api/jails/sshd/start")

        assert response.status_code == 502

    async def test_stop_jail_409(self, jails_client: AsyncClient) -> None:
        """POST /api/jails/sshd/stop yields 409 on an operation error."""
        with patch("app.routers.jails.jail_service.stop_jail", new=self._op_failed("stop failed")):
            response = await jails_client.post("/api/jails/sshd/stop")

        assert response.status_code == 409

    async def test_stop_jail_502(self, jails_client: AsyncClient) -> None:
        """POST /api/jails/sshd/stop yields 502 on a connection error."""
        with patch("app.routers.jails.jail_service.stop_jail", new=self._socket_down()):
            response = await jails_client.post("/api/jails/sshd/stop")

        assert response.status_code == 502

    async def test_toggle_idle_404(self, jails_client: AsyncClient) -> None:
        """POST /api/jails/ghost/idle yields 404 for an unknown jail."""
        with patch("app.routers.jails.jail_service.set_idle", new=self._not_found()):
            response = await jails_client.post(
                "/api/jails/ghost/idle",
                content="true",
                headers={"Content-Type": "application/json"},
            )

        assert response.status_code == 404

    async def test_toggle_idle_409(self, jails_client: AsyncClient) -> None:
        """POST /api/jails/sshd/idle yields 409 on an operation error."""
        with patch("app.routers.jails.jail_service.set_idle", new=self._op_failed("idle failed")):
            response = await jails_client.post(
                "/api/jails/sshd/idle",
                content="true",
                headers={"Content-Type": "application/json"},
            )

        assert response.status_code == 409

    async def test_toggle_idle_502(self, jails_client: AsyncClient) -> None:
        """POST /api/jails/sshd/idle yields 502 on a connection error."""
        with patch("app.routers.jails.jail_service.set_idle", new=self._socket_down()):
            response = await jails_client.post(
                "/api/jails/sshd/idle",
                content="true",
                headers={"Content-Type": "application/json"},
            )

        assert response.status_code == 502

    async def test_reload_jail_404(self, jails_client: AsyncClient) -> None:
        """POST /api/jails/ghost/reload yields 404 for an unknown jail."""
        with patch("app.routers.jails.jail_service.reload_jail", new=self._not_found()):
            response = await jails_client.post("/api/jails/ghost/reload")

        assert response.status_code == 404

    async def test_reload_jail_409(self, jails_client: AsyncClient) -> None:
        """POST /api/jails/sshd/reload yields 409 on an operation error."""
        with patch("app.routers.jails.jail_service.reload_jail", new=self._op_failed("reload failed")):
            response = await jails_client.post("/api/jails/sshd/reload")

        assert response.status_code == 409

    async def test_reload_jail_502(self, jails_client: AsyncClient) -> None:
        """POST /api/jails/sshd/reload yields 502 on a connection error."""
        with patch("app.routers.jails.jail_service.reload_jail", new=self._socket_down()):
            response = await jails_client.post("/api/jails/sshd/reload")

        assert response.status_code == 502
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# GET /api/jails/{name}/banned
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestGetJailBannedIps:
    """Tests for ``GET /api/jails/{name}/banned``."""

    def _mock_response(
        self,
        *,
        items: list[dict[str, str | None]] | None = None,
        total: int = 2,
        page: int = 1,
        page_size: int = 25,
    ) -> JailBannedIpsResponse:
        """Build the ``JailBannedIpsResponse`` a mocked service call returns.

        Args:
            items: Per-ban overrides (``ip``, ``banned_at``, ``expires_at``,
                ``country``). ``None`` selects two default bans; an empty
                list produces a response with no bans.
            total: Value for the response's ``total`` field.
            page: Value for the response's ``page`` field.
            page_size: Value for the response's ``page_size`` field.

        Returns:
            A response whose items are ``ActiveBan`` objects in jail ``sshd``.
        """
        from app.models.ban import ActiveBan

        # Test ``is not None`` rather than truthiness: ``items or default``
        # would replace an explicitly empty list with the two default bans.
        source_items = items if items is not None else [{"ip": "1.2.3.4"}, {"ip": "5.6.7.8"}]
        ban_items = [
            ActiveBan(
                ip=item.get("ip") or "1.2.3.4",
                jail="sshd",
                banned_at=item.get("banned_at", "2025-01-01T10:00:00+00:00"),
                expires_at=item.get("expires_at", "2025-01-01T10:10:00+00:00"),
                ban_count=1,
                country=item.get("country", None),
            )
            for item in source_items
        ]
        return JailBannedIpsResponse(items=ban_items, total=total, page=page, page_size=page_size)

    async def test_200_returns_paginated_bans(self, jails_client: AsyncClient) -> None:
        """GET /api/jails/sshd/banned returns 200 with a JailBannedIpsResponse."""
        with patch(
            "app.routers.jails.jail_service.get_jail_banned_ips",
            AsyncMock(return_value=self._mock_response()),
        ):
            resp = await jails_client.get("/api/jails/sshd/banned")

        assert resp.status_code == 200
        data = resp.json()
        for field in ("items", "total", "page", "page_size"):
            assert field in data
        assert data["total"] == 2

    async def test_200_with_search_parameter(self, jails_client: AsyncClient) -> None:
        """GET /api/jails/sshd/banned?search=1.2.3 passes search to service."""
        mock_fn = AsyncMock(return_value=self._mock_response(items=[{"ip": "1.2.3.4"}], total=1))
        with patch("app.routers.jails.jail_service.get_jail_banned_ips", mock_fn):
            resp = await jails_client.get("/api/jails/sshd/banned?search=1.2.3")

        assert resp.status_code == 200
        _args, call_kwargs = mock_fn.call_args
        assert call_kwargs.get("search") == "1.2.3"

    async def test_200_with_page_and_page_size(self, jails_client: AsyncClient) -> None:
        """GET /api/jails/sshd/banned?page=2&page_size=10 passes params to service."""
        mock_fn = AsyncMock(
            return_value=self._mock_response(page=2, page_size=10, total=0, items=[])
        )
        with patch("app.routers.jails.jail_service.get_jail_banned_ips", mock_fn):
            resp = await jails_client.get("/api/jails/sshd/banned?page=2&page_size=10")

        assert resp.status_code == 200
        _args, call_kwargs = mock_fn.call_args
        assert call_kwargs.get("page") == 2
        assert call_kwargs.get("page_size") == 10

    async def test_400_when_page_is_zero(self, jails_client: AsyncClient) -> None:
        """GET /api/jails/sshd/banned?page=0 returns 400."""
        resp = await jails_client.get("/api/jails/sshd/banned?page=0")
        assert resp.status_code == 400

    async def test_400_when_page_size_exceeds_max(self, jails_client: AsyncClient) -> None:
        """GET /api/jails/sshd/banned?page_size=200 returns 400."""
        resp = await jails_client.get("/api/jails/sshd/banned?page_size=200")
        assert resp.status_code == 400

    async def test_400_when_page_size_is_zero(self, jails_client: AsyncClient) -> None:
        """GET /api/jails/sshd/banned?page_size=0 returns 400."""
        resp = await jails_client.get("/api/jails/sshd/banned?page_size=0")
        assert resp.status_code == 400

    async def test_404_for_unknown_jail(self, jails_client: AsyncClient) -> None:
        """GET /api/jails/ghost/banned returns 404 when jail is unknown."""
        from app.services.jail_service import JailNotFoundError

        with patch(
            "app.routers.jails.jail_service.get_jail_banned_ips",
            AsyncMock(side_effect=JailNotFoundError("ghost")),
        ):
            resp = await jails_client.get("/api/jails/ghost/banned")

        assert resp.status_code == 404

    async def test_502_when_fail2ban_unreachable(self, jails_client: AsyncClient) -> None:
        """GET /api/jails/sshd/banned returns 502 when fail2ban is unreachable."""
        from app.utils.fail2ban_client import Fail2BanConnectionError

        with patch(
            "app.routers.jails.jail_service.get_jail_banned_ips",
            AsyncMock(side_effect=Fail2BanConnectionError("socket dead", "/tmp/fake.sock")),
        ):
            resp = await jails_client.get("/api/jails/sshd/banned")

        assert resp.status_code == 502

    async def test_response_items_have_expected_fields(
        self, jails_client: AsyncClient
    ) -> None:
        """Response items contain ip, jail, banned_at, expires_at, ban_count, country."""
        with patch(
            "app.routers.jails.jail_service.get_jail_banned_ips",
            AsyncMock(return_value=self._mock_response()),
        ):
            resp = await jails_client.get("/api/jails/sshd/banned")

        # Check the status first so a failing request is reported as such
        # rather than as a KeyError while indexing the body.
        assert resp.status_code == 200
        item = resp.json()["items"][0]
        for field in ("ip", "jail", "banned_at", "expires_at", "ban_count", "country"):
            assert field in item

    async def test_401_when_unauthenticated(self, jails_client: AsyncClient) -> None:
        """GET /api/jails/sshd/banned returns 401 without a session cookie."""
        # Reuse the fixture's ASGI app but talk to it through a fresh client
        # that has not logged in.
        app = jails_client._transport.app  # type: ignore[attr-defined]
        # The context manager closes the throw-away client once the request is done.
        async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as anonymous:
            resp = await anonymous.get("/api/jails/sshd/banned")
        assert resp.status_code == 401
|
|
|