"""Tests for the jails router endpoints.""" from __future__ import annotations from pathlib import Path from unittest.mock import AsyncMock, MagicMock, patch import aiosqlite import pytest from httpx import ASGITransport, AsyncClient from app.config import Settings from app.db import init_db from app.main import create_app from app.models.jail import Jail, JailDetailResponse, JailListResponse, JailStatus, JailSummary # --------------------------------------------------------------------------- # Fixtures # --------------------------------------------------------------------------- _SETUP_PAYLOAD = { "master_password": "testpassword1", "database_path": "bangui.db", "fail2ban_socket": "/var/run/fail2ban/fail2ban.sock", "timezone": "UTC", "session_duration_minutes": 60, } @pytest.fixture async def jails_client(tmp_path: Path) -> AsyncClient: # type: ignore[misc] """Provide an authenticated ``AsyncClient`` for jail endpoint tests.""" settings = Settings( database_path=str(tmp_path / "jails_test.db"), fail2ban_socket="/tmp/fake.sock", session_secret="test-jails-secret", session_duration_minutes=60, timezone="UTC", log_level="debug", ) app = create_app(settings=settings) db: aiosqlite.Connection = await aiosqlite.connect(settings.database_path) db.row_factory = aiosqlite.Row await init_db(db) app.state.db = db app.state.http_session = MagicMock() transport = ASGITransport(app=app) async with AsyncClient(transport=transport, base_url="http://test") as ac: await ac.post("/api/setup", json=_SETUP_PAYLOAD) login = await ac.post( "/api/auth/login", json={"password": _SETUP_PAYLOAD["master_password"]}, ) assert login.status_code == 200 yield ac await db.close() # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _summary(name: str = "sshd") -> JailSummary: return JailSummary( name=name, enabled=True, running=True, idle=False, backend="polling", find_time=600, ban_time=600, max_retry=5, status=JailStatus( currently_banned=2, total_banned=10, currently_failed=1, total_failed=50, ), ) def _detail(name: str = "sshd") -> JailDetailResponse: return JailDetailResponse( jail=Jail( name=name, enabled=True, running=True, idle=False, backend="polling", log_paths=["/var/log/auth.log"], fail_regex=["^.*Failed.*"], ignore_regex=[], ignore_ips=["127.0.0.1"], date_pattern=None, log_encoding="UTF-8", find_time=600, ban_time=600, max_retry=5, actions=["iptables-multiport"], status=JailStatus( currently_banned=2, total_banned=10, currently_failed=1, total_failed=50, ), ) ) # --------------------------------------------------------------------------- # GET /api/jails # --------------------------------------------------------------------------- class TestGetJails: """Tests for ``GET /api/jails``.""" async def test_200_when_authenticated(self, jails_client: AsyncClient) -> None: """GET /api/jails returns 200 with a JailListResponse.""" mock_response = JailListResponse(jails=[_summary()], total=1) with patch( "app.routers.jails.jail_service.list_jails", AsyncMock(return_value=mock_response), ): resp = await jails_client.get("/api/jails") assert resp.status_code == 200 data = resp.json() assert data["total"] == 1 assert data["jails"][0]["name"] == "sshd" async def test_401_when_unauthenticated(self, jails_client: AsyncClient) -> None: """GET /api/jails returns 401 without a session cookie.""" resp = await AsyncClient( transport=ASGITransport(app=jails_client._transport.app), # type: ignore[attr-defined] base_url="http://test", ).get("/api/jails") assert resp.status_code == 401 async def test_response_shape(self, jails_client: AsyncClient) -> None: """GET /api/jails response contains expected fields.""" mock_response = JailListResponse(jails=[_summary()], total=1) with patch( "app.routers.jails.jail_service.list_jails", AsyncMock(return_value=mock_response), ): resp = await jails_client.get("/api/jails") jail = resp.json()["jails"][0] assert "name" in jail assert "enabled" in jail assert "running" in jail assert "idle" in jail assert "backend" in jail assert "status" in jail # --------------------------------------------------------------------------- # GET /api/jails/{name} # --------------------------------------------------------------------------- class TestGetJailDetail: """Tests for ``GET /api/jails/{name}``.""" async def test_200_for_existing_jail(self, jails_client: AsyncClient) -> None: """GET /api/jails/sshd returns 200 with full jail detail.""" with patch( "app.routers.jails.jail_service.get_jail", AsyncMock(return_value=_detail()), ): resp = await jails_client.get("/api/jails/sshd") assert resp.status_code == 200 data = resp.json() assert data["jail"]["name"] == "sshd" assert "log_paths" in data["jail"] assert "fail_regex" in data["jail"] assert "actions" in data["jail"] async def test_404_for_unknown_jail(self, jails_client: AsyncClient) -> None: """GET /api/jails/ghost returns 404.""" from app.services.jail_service import JailNotFoundError with patch( "app.routers.jails.jail_service.get_jail", AsyncMock(side_effect=JailNotFoundError("ghost")), ): resp = await jails_client.get("/api/jails/ghost") assert resp.status_code == 404 # --------------------------------------------------------------------------- # POST /api/jails/{name}/start # --------------------------------------------------------------------------- class TestStartJail: """Tests for ``POST /api/jails/{name}/start``.""" async def test_200_starts_jail(self, jails_client: AsyncClient) -> None: """POST /api/jails/sshd/start returns 200 on success.""" with patch( "app.routers.jails.jail_service.start_jail", AsyncMock(return_value=None), ): resp = await jails_client.post("/api/jails/sshd/start") assert resp.status_code == 200 assert resp.json()["jail"] == "sshd" async def test_404_for_unknown_jail(self, jails_client: AsyncClient) -> None: """POST /api/jails/ghost/start returns 404.""" from app.services.jail_service import JailNotFoundError with patch( "app.routers.jails.jail_service.start_jail", AsyncMock(side_effect=JailNotFoundError("ghost")), ): resp = await jails_client.post("/api/jails/ghost/start") assert resp.status_code == 404 async def test_409_on_operation_error(self, jails_client: AsyncClient) -> None: """POST /api/jails/sshd/start returns 409 on operation failure.""" from app.services.jail_service import JailOperationError with patch( "app.routers.jails.jail_service.start_jail", AsyncMock(side_effect=JailOperationError("already running")), ): resp = await jails_client.post("/api/jails/sshd/start") assert resp.status_code == 409 # --------------------------------------------------------------------------- # POST /api/jails/{name}/stop # --------------------------------------------------------------------------- class TestStopJail: """Tests for ``POST /api/jails/{name}/stop``.""" async def test_200_stops_jail(self, jails_client: AsyncClient) -> None: """POST /api/jails/sshd/stop returns 200 on success.""" with patch( "app.routers.jails.jail_service.stop_jail", AsyncMock(return_value=None), ): resp = await jails_client.post("/api/jails/sshd/stop") assert resp.status_code == 200 async def test_404_for_unknown_jail(self, jails_client: AsyncClient) -> None: """POST /api/jails/ghost/stop returns 404.""" from app.services.jail_service import JailNotFoundError with patch( "app.routers.jails.jail_service.stop_jail", AsyncMock(side_effect=JailNotFoundError("ghost")), ): resp = await jails_client.post("/api/jails/ghost/stop") assert resp.status_code == 404 # --------------------------------------------------------------------------- # POST /api/jails/{name}/idle # --------------------------------------------------------------------------- class TestToggleIdle: """Tests for ``POST /api/jails/{name}/idle``.""" async def test_200_idle_on(self, jails_client: AsyncClient) -> None: """POST /api/jails/sshd/idle?on=true returns 200.""" with patch( "app.routers.jails.jail_service.set_idle", AsyncMock(return_value=None), ): resp = await jails_client.post( "/api/jails/sshd/idle", content="true", headers={"Content-Type": "application/json"}, ) assert resp.status_code == 200 async def test_200_idle_off(self, jails_client: AsyncClient) -> None: """POST /api/jails/sshd/idle with false turns idle off.""" with patch( "app.routers.jails.jail_service.set_idle", AsyncMock(return_value=None), ): resp = await jails_client.post( "/api/jails/sshd/idle", content="false", headers={"Content-Type": "application/json"}, ) assert resp.status_code == 200 # --------------------------------------------------------------------------- # POST /api/jails/{name}/reload # --------------------------------------------------------------------------- class TestReloadJail: """Tests for ``POST /api/jails/{name}/reload``.""" async def test_200_reloads_jail(self, jails_client: AsyncClient) -> None: """POST /api/jails/sshd/reload returns 200 on success.""" with patch( "app.routers.jails.jail_service.reload_jail", AsyncMock(return_value=None), ): resp = await jails_client.post("/api/jails/sshd/reload") assert resp.status_code == 200 assert resp.json()["jail"] == "sshd" # --------------------------------------------------------------------------- # POST /api/jails/reload-all # --------------------------------------------------------------------------- class TestReloadAll: """Tests for ``POST /api/jails/reload-all``.""" async def test_200_reloads_all(self, jails_client: AsyncClient) -> None: """POST /api/jails/reload-all returns 200 on success.""" with patch( "app.routers.jails.jail_service.reload_all", AsyncMock(return_value=None), ): resp = await jails_client.post("/api/jails/reload-all") assert resp.status_code == 200 assert resp.json()["jail"] == "*" # --------------------------------------------------------------------------- # GET /api/jails/{name}/ignoreip # --------------------------------------------------------------------------- class TestIgnoreIpEndpoints: """Tests for ignore-list management endpoints.""" async def test_get_ignore_list(self, jails_client: AsyncClient) -> None: """GET /api/jails/sshd/ignoreip returns 200 with a list.""" with patch( "app.routers.jails.jail_service.get_ignore_list", AsyncMock(return_value=["127.0.0.1"]), ): resp = await jails_client.get("/api/jails/sshd/ignoreip") assert resp.status_code == 200 assert "127.0.0.1" in resp.json() async def test_add_ignore_ip_returns_201(self, jails_client: AsyncClient) -> None: """POST /api/jails/sshd/ignoreip returns 201 on success.""" with patch( "app.routers.jails.jail_service.add_ignore_ip", AsyncMock(return_value=None), ): resp = await jails_client.post( "/api/jails/sshd/ignoreip", json={"ip": "192.168.1.0/24"}, ) assert resp.status_code == 201 async def test_add_invalid_ip_returns_400(self, jails_client: AsyncClient) -> None: """POST /api/jails/sshd/ignoreip returns 400 for invalid IP.""" with patch( "app.routers.jails.jail_service.add_ignore_ip", AsyncMock(side_effect=ValueError("Invalid IP address or network: 'bad'")), ): resp = await jails_client.post( "/api/jails/sshd/ignoreip", json={"ip": "bad"}, ) assert resp.status_code == 400 async def test_delete_ignore_ip(self, jails_client: AsyncClient) -> None: """DELETE /api/jails/sshd/ignoreip returns 200 on success.""" with patch( "app.routers.jails.jail_service.del_ignore_ip", AsyncMock(return_value=None), ): resp = await jails_client.request( "DELETE", "/api/jails/sshd/ignoreip", json={"ip": "127.0.0.1"}, ) assert resp.status_code == 200