"""Tests for the config router endpoints.""" from __future__ import annotations from pathlib import Path from unittest.mock import AsyncMock, MagicMock, patch import aiosqlite import pytest from httpx import ASGITransport, AsyncClient import app from app.config import Settings from app.db import init_db from app.main import create_app from app.models.config import ( Fail2BanLogResponse, FilterConfig, GlobalConfigResponse, JailConfig, JailConfigListResponse, JailConfigResponse, RegexTestResponse, ServiceStatusResponse, ) # --------------------------------------------------------------------------- # Fixtures # --------------------------------------------------------------------------- _SETUP_PAYLOAD = { "master_password": "testpassword1", "database_path": "bangui.db", "fail2ban_socket": "/var/run/fail2ban/fail2ban.sock", "timezone": "UTC", "session_duration_minutes": 60, } @pytest.fixture async def config_client(tmp_path: Path) -> AsyncClient: # type: ignore[misc] """Provide an authenticated ``AsyncClient`` for config endpoint tests.""" settings = Settings( database_path=str(tmp_path / "config_test.db"), fail2ban_socket="/tmp/fake.sock", session_secret="test-config-secret", session_duration_minutes=60, timezone="UTC", log_level="debug", ) app = create_app(settings=settings) db: aiosqlite.Connection = await aiosqlite.connect(settings.database_path) db.row_factory = aiosqlite.Row await init_db(db) app.state.db = db app.state.http_session = MagicMock() transport = ASGITransport(app=app) async with AsyncClient(transport=transport, base_url="http://test") as ac: await ac.post("/api/setup", json=_SETUP_PAYLOAD) login = await ac.post( "/api/auth/login", json={"password": _SETUP_PAYLOAD["master_password"]}, ) assert login.status_code == 200 yield ac await db.close() def _make_jail_config(name: str = "sshd") -> JailConfig: return JailConfig( name=name, ban_time=600, max_retry=5, find_time=600, fail_regex=["regex1"], ignore_regex=[], log_paths=["/var/log/auth.log"], date_pattern=None, log_encoding="UTF-8", backend="polling", use_dns="warn", prefregex="", actions=["iptables"], ) # --------------------------------------------------------------------------- # GET /api/config/jails # --------------------------------------------------------------------------- class TestGetJailConfigs: """Tests for ``GET /api/config/jails``.""" async def test_200_returns_jail_list(self, config_client: AsyncClient) -> None: """GET /api/config/jails returns 200 with JailConfigListResponse.""" mock_response = JailConfigListResponse( jails=[_make_jail_config("sshd")], total=1 ) with patch( "app.routers.config.config_service.list_jail_configs", AsyncMock(return_value=mock_response), ): resp = await config_client.get("/api/config/jails") assert resp.status_code == 200 data = resp.json() assert data["total"] == 1 assert data["jails"][0]["name"] == "sshd" async def test_401_when_unauthenticated(self, config_client: AsyncClient) -> None: """GET /api/config/jails returns 401 without a valid session.""" resp = await AsyncClient( transport=ASGITransport(app=config_client._transport.app), # type: ignore[attr-defined] base_url="http://test", ).get("/api/config/jails") assert resp.status_code == 401 async def test_502_on_connection_error(self, config_client: AsyncClient) -> None: """GET /api/config/jails returns 502 when fail2ban is unreachable.""" from app.utils.fail2ban_client import Fail2BanConnectionError with patch( "app.routers.config.config_service.list_jail_configs", AsyncMock(side_effect=Fail2BanConnectionError("down", "/tmp/fake.sock")), ): resp = await config_client.get("/api/config/jails") assert resp.status_code == 502 # --------------------------------------------------------------------------- # GET /api/config/jails/{name} # --------------------------------------------------------------------------- class TestGetJailConfig: """Tests for ``GET /api/config/jails/{name}``.""" async def test_200_returns_jail_config(self, config_client: AsyncClient) -> None: """GET /api/config/jails/sshd returns 200 with JailConfigResponse.""" mock_response = JailConfigResponse(jail=_make_jail_config("sshd")) with patch( "app.routers.config.config_service.get_jail_config", AsyncMock(return_value=mock_response), ): resp = await config_client.get("/api/config/jails/sshd") assert resp.status_code == 200 assert resp.json()["jail"]["name"] == "sshd" assert resp.json()["jail"]["ban_time"] == 600 async def test_404_for_unknown_jail(self, config_client: AsyncClient) -> None: """GET /api/config/jails/missing returns 404.""" from app.services.config_service import JailNotFoundError with patch( "app.routers.config.config_service.get_jail_config", AsyncMock(side_effect=JailNotFoundError("missing")), ): resp = await config_client.get("/api/config/jails/missing") assert resp.status_code == 404 async def test_401_when_unauthenticated(self, config_client: AsyncClient) -> None: """GET /api/config/jails/sshd returns 401 without session.""" resp = await AsyncClient( transport=ASGITransport(app=config_client._transport.app), # type: ignore[attr-defined] base_url="http://test", ).get("/api/config/jails/sshd") assert resp.status_code == 401 # --------------------------------------------------------------------------- # PUT /api/config/jails/{name} # --------------------------------------------------------------------------- class TestUpdateJailConfig: """Tests for ``PUT /api/config/jails/{name}``.""" async def test_204_on_success(self, config_client: AsyncClient) -> None: """PUT /api/config/jails/sshd returns 204 on success.""" with patch( "app.routers.config.config_service.update_jail_config", AsyncMock(return_value=None), ): resp = await config_client.put( "/api/config/jails/sshd", json={"ban_time": 3600}, ) assert resp.status_code == 204 async def test_404_for_unknown_jail(self, config_client: AsyncClient) -> None: """PUT /api/config/jails/missing returns 404.""" from app.services.config_service import JailNotFoundError with patch( "app.routers.config.config_service.update_jail_config", AsyncMock(side_effect=JailNotFoundError("missing")), ): resp = await config_client.put( "/api/config/jails/missing", json={"ban_time": 3600}, ) assert resp.status_code == 404 async def test_422_on_invalid_regex(self, config_client: AsyncClient) -> None: """PUT /api/config/jails/sshd returns 422 for invalid regex pattern.""" from app.services.config_service import ConfigValidationError with patch( "app.routers.config.config_service.update_jail_config", AsyncMock(side_effect=ConfigValidationError("bad regex")), ): resp = await config_client.put( "/api/config/jails/sshd", json={"fail_regex": ["[bad"]}, ) assert resp.status_code == 422 async def test_400_on_config_operation_error(self, config_client: AsyncClient) -> None: """PUT /api/config/jails/sshd returns 400 when set command fails.""" from app.services.config_service import ConfigOperationError with patch( "app.routers.config.config_service.update_jail_config", AsyncMock(side_effect=ConfigOperationError("set failed")), ): resp = await config_client.put( "/api/config/jails/sshd", json={"ban_time": 3600}, ) assert resp.status_code == 400 async def test_204_with_dns_mode(self, config_client: AsyncClient) -> None: """PUT /api/config/jails/sshd accepts dns_mode field.""" with patch( "app.routers.config.config_service.update_jail_config", AsyncMock(return_value=None), ): resp = await config_client.put( "/api/config/jails/sshd", json={"dns_mode": "no"}, ) assert resp.status_code == 204 async def test_204_with_prefregex(self, config_client: AsyncClient) -> None: """PUT /api/config/jails/sshd accepts prefregex field.""" with patch( "app.routers.config.config_service.update_jail_config", AsyncMock(return_value=None), ): resp = await config_client.put( "/api/config/jails/sshd", json={"prefregex": r"^%(__prefix_line)s"}, ) assert resp.status_code == 204 async def test_204_with_date_pattern(self, config_client: AsyncClient) -> None: """PUT /api/config/jails/sshd accepts date_pattern field.""" with patch( "app.routers.config.config_service.update_jail_config", AsyncMock(return_value=None), ): resp = await config_client.put( "/api/config/jails/sshd", json={"date_pattern": "%Y-%m-%d %H:%M:%S"}, ) assert resp.status_code == 204 # --------------------------------------------------------------------------- # GET /api/config/global # --------------------------------------------------------------------------- class TestGetGlobalConfig: """Tests for ``GET /api/config/global``.""" async def test_200_returns_global_config(self, config_client: AsyncClient) -> None: """GET /api/config/global returns 200 with GlobalConfigResponse.""" mock_response = GlobalConfigResponse( log_level="WARNING", log_target="/var/log/fail2ban.log", db_purge_age=86400, db_max_matches=10, ) with patch( "app.routers.config.config_service.get_global_config", AsyncMock(return_value=mock_response), ): resp = await config_client.get("/api/config/global") assert resp.status_code == 200 data = resp.json() assert data["log_level"] == "WARNING" assert data["db_purge_age"] == 86400 async def test_401_when_unauthenticated(self, config_client: AsyncClient) -> None: """GET /api/config/global returns 401 without session.""" resp = await AsyncClient( transport=ASGITransport(app=config_client._transport.app), # type: ignore[attr-defined] base_url="http://test", ).get("/api/config/global") assert resp.status_code == 401 # --------------------------------------------------------------------------- # PUT /api/config/global # --------------------------------------------------------------------------- class TestUpdateGlobalConfig: """Tests for ``PUT /api/config/global``.""" async def test_204_on_success(self, config_client: AsyncClient) -> None: """PUT /api/config/global returns 204 on success.""" with patch( "app.routers.config.config_service.update_global_config", AsyncMock(return_value=None), ): resp = await config_client.put( "/api/config/global", json={"log_level": "DEBUG"}, ) assert resp.status_code == 204 async def test_400_on_operation_error(self, config_client: AsyncClient) -> None: """PUT /api/config/global returns 400 when set command fails.""" from app.services.config_service import ConfigOperationError with patch( "app.routers.config.config_service.update_global_config", AsyncMock(side_effect=ConfigOperationError("set failed")), ): resp = await config_client.put( "/api/config/global", json={"log_level": "INFO"}, ) assert resp.status_code == 400 # --------------------------------------------------------------------------- # POST /api/config/reload # --------------------------------------------------------------------------- class TestReloadFail2ban: """Tests for ``POST /api/config/reload``.""" async def test_204_on_success(self, config_client: AsyncClient) -> None: """POST /api/config/reload returns 204 on success.""" with patch( "app.routers.config.jail_service.reload_all", AsyncMock(return_value=None), ): resp = await config_client.post("/api/config/reload") assert resp.status_code == 204 async def test_502_when_fail2ban_unreachable(self, config_client: AsyncClient) -> None: """POST /api/config/reload returns 502 when fail2ban socket is unreachable.""" from app.utils.fail2ban_client import Fail2BanConnectionError with patch( "app.routers.config.jail_service.reload_all", AsyncMock(side_effect=Fail2BanConnectionError("no socket", "/fake.sock")), ): resp = await config_client.post("/api/config/reload") assert resp.status_code == 502 async def test_409_when_reload_operation_fails(self, config_client: AsyncClient) -> None: """POST /api/config/reload returns 409 when fail2ban reports a reload error.""" from app.services.jail_service import JailOperationError with patch( "app.routers.config.jail_service.reload_all", AsyncMock(side_effect=JailOperationError("reload rejected")), ): resp = await config_client.post("/api/config/reload") assert resp.status_code == 409 # --------------------------------------------------------------------------- # POST /api/config/restart # --------------------------------------------------------------------------- class TestRestartFail2ban: """Tests for ``POST /api/config/restart``.""" async def test_204_on_success(self, config_client: AsyncClient) -> None: """POST /api/config/restart returns 204 when fail2ban restarts cleanly.""" with ( patch( "app.routers.config.jail_service.restart", AsyncMock(return_value=None), ), patch( "app.routers.config.config_file_service.start_daemon", AsyncMock(return_value=True), ), patch( "app.routers.config.config_file_service.wait_for_fail2ban", AsyncMock(return_value=True), ), ): resp = await config_client.post("/api/config/restart") assert resp.status_code == 204 async def test_503_when_fail2ban_does_not_come_back(self, config_client: AsyncClient) -> None: """POST /api/config/restart returns 503 when fail2ban does not come back online.""" with ( patch( "app.routers.config.jail_service.restart", AsyncMock(return_value=None), ), patch( "app.routers.config.config_file_service.start_daemon", AsyncMock(return_value=True), ), patch( "app.routers.config.config_file_service.wait_for_fail2ban", AsyncMock(return_value=False), ), ): resp = await config_client.post("/api/config/restart") assert resp.status_code == 503 async def test_409_when_stop_command_fails(self, config_client: AsyncClient) -> None: """POST /api/config/restart returns 409 when fail2ban rejects the stop command.""" from app.services.jail_service import JailOperationError with patch( "app.routers.config.jail_service.restart", AsyncMock(side_effect=JailOperationError("stop failed")), ): resp = await config_client.post("/api/config/restart") assert resp.status_code == 409 async def test_502_when_fail2ban_unreachable(self, config_client: AsyncClient) -> None: """POST /api/config/restart returns 502 when fail2ban socket is unreachable.""" from app.utils.fail2ban_client import Fail2BanConnectionError with patch( "app.routers.config.jail_service.restart", AsyncMock(side_effect=Fail2BanConnectionError("no socket", "/fake.sock")), ): resp = await config_client.post("/api/config/restart") assert resp.status_code == 502 async def test_start_daemon_called_after_stop(self, config_client: AsyncClient) -> None: """start_daemon is called after a successful stop.""" mock_start = AsyncMock(return_value=True) with ( patch( "app.routers.config.jail_service.restart", AsyncMock(return_value=None), ), patch( "app.routers.config.config_file_service.start_daemon", mock_start, ), patch( "app.routers.config.config_file_service.wait_for_fail2ban", AsyncMock(return_value=True), ), ): await config_client.post("/api/config/restart") mock_start.assert_awaited_once() # --------------------------------------------------------------------------- # POST /api/config/regex-test # --------------------------------------------------------------------------- class TestRegexTest: """Tests for ``POST /api/config/regex-test``.""" async def test_200_matched(self, config_client: AsyncClient) -> None: """POST /api/config/regex-test returns matched=true for a valid match.""" mock_response = RegexTestResponse(matched=True, groups=["1.2.3.4"], error=None) with patch( "app.routers.config.config_service.test_regex", return_value=mock_response, ): resp = await config_client.post( "/api/config/regex-test", json={ "log_line": "fail from 1.2.3.4", "fail_regex": r"(\d+\.\d+\.\d+\.\d+)", }, ) assert resp.status_code == 200 assert resp.json()["matched"] is True async def test_200_not_matched(self, config_client: AsyncClient) -> None: """POST /api/config/regex-test returns matched=false for no match.""" mock_response = RegexTestResponse(matched=False, groups=[], error=None) with patch( "app.routers.config.config_service.test_regex", return_value=mock_response, ): resp = await config_client.post( "/api/config/regex-test", json={"log_line": "ok line", "fail_regex": r"FAIL"}, ) assert resp.status_code == 200 assert resp.json()["matched"] is False async def test_401_when_unauthenticated(self, config_client: AsyncClient) -> None: """POST /api/config/regex-test returns 401 without session.""" resp = await AsyncClient( transport=ASGITransport(app=config_client._transport.app), # type: ignore[attr-defined] base_url="http://test", ).post( "/api/config/regex-test", json={"log_line": "test", "fail_regex": "test"}, ) assert resp.status_code == 401 # --------------------------------------------------------------------------- # POST /api/config/jails/{name}/logpath # --------------------------------------------------------------------------- class TestAddLogPath: """Tests for ``POST /api/config/jails/{name}/logpath``.""" async def test_204_on_success(self, config_client: AsyncClient) -> None: """POST /api/config/jails/sshd/logpath returns 204 on success.""" with patch( "app.routers.config.config_service.add_log_path", AsyncMock(return_value=None), ): resp = await config_client.post( "/api/config/jails/sshd/logpath", json={"log_path": "/var/log/specific.log", "tail": True}, ) assert resp.status_code == 204 async def test_404_for_unknown_jail(self, config_client: AsyncClient) -> None: """POST /api/config/jails/missing/logpath returns 404.""" from app.services.config_service import JailNotFoundError with patch( "app.routers.config.config_service.add_log_path", AsyncMock(side_effect=JailNotFoundError("missing")), ): resp = await config_client.post( "/api/config/jails/missing/logpath", json={"log_path": "/var/log/test.log"}, ) assert resp.status_code == 404 # --------------------------------------------------------------------------- # POST /api/config/preview-log # --------------------------------------------------------------------------- class TestPreviewLog: """Tests for ``POST /api/config/preview-log``.""" async def test_200_returns_preview(self, config_client: AsyncClient) -> None: """POST /api/config/preview-log returns 200 with LogPreviewResponse.""" from app.models.config import LogPreviewLine, LogPreviewResponse mock_response = LogPreviewResponse( lines=[LogPreviewLine(line="fail line", matched=True, groups=[])], total_lines=1, matched_count=1, ) with patch( "app.routers.config.config_service.preview_log", AsyncMock(return_value=mock_response), ): resp = await config_client.post( "/api/config/preview-log", json={"log_path": "/var/log/test.log", "fail_regex": "fail"}, ) assert resp.status_code == 200 data = resp.json() assert data["total_lines"] == 1 assert data["matched_count"] == 1 # --------------------------------------------------------------------------- # GET /api/config/map-color-thresholds # --------------------------------------------------------------------------- class TestGetMapColorThresholds: """Tests for ``GET /api/config/map-color-thresholds``.""" async def test_200_returns_thresholds(self, config_client: AsyncClient) -> None: """GET /api/config/map-color-thresholds returns 200 with current values.""" resp = await config_client.get("/api/config/map-color-thresholds") assert resp.status_code == 200 data = resp.json() assert "threshold_high" in data assert "threshold_medium" in data assert "threshold_low" in data # Should return defaults after setup assert data["threshold_high"] == 100 assert data["threshold_medium"] == 50 assert data["threshold_low"] == 20 # --------------------------------------------------------------------------- # PUT /api/config/map-color-thresholds # --------------------------------------------------------------------------- class TestUpdateMapColorThresholds: """Tests for ``PUT /api/config/map-color-thresholds``.""" async def test_200_updates_thresholds(self, config_client: AsyncClient) -> None: """PUT /api/config/map-color-thresholds returns 200 and updates settings.""" update_payload = { "threshold_high": 200, "threshold_medium": 80, "threshold_low": 30, } resp = await config_client.put( "/api/config/map-color-thresholds", json=update_payload ) assert resp.status_code == 200 data = resp.json() assert data["threshold_high"] == 200 assert data["threshold_medium"] == 80 assert data["threshold_low"] == 30 # Verify the values persist get_resp = await config_client.get("/api/config/map-color-thresholds") assert get_resp.status_code == 200 get_data = get_resp.json() assert get_data["threshold_high"] == 200 assert get_data["threshold_medium"] == 80 assert get_data["threshold_low"] == 30 async def test_400_for_invalid_order(self, config_client: AsyncClient) -> None: """PUT /api/config/map-color-thresholds returns 400 if thresholds are misordered.""" invalid_payload = { "threshold_high": 50, "threshold_medium": 50, "threshold_low": 20, } resp = await config_client.put( "/api/config/map-color-thresholds", json=invalid_payload ) assert resp.status_code == 400 assert "high > medium > low" in resp.json()["detail"] async def test_400_for_non_positive_values( self, config_client: AsyncClient ) -> None: """PUT /api/config/map-color-thresholds returns 422 for non-positive values (Pydantic validation).""" invalid_payload = { "threshold_high": 100, "threshold_medium": 50, "threshold_low": 0, } resp = await config_client.put( "/api/config/map-color-thresholds", json=invalid_payload ) # Pydantic validates ge=1 constraint before our service code runs assert resp.status_code == 422 # --------------------------------------------------------------------------- # GET /api/config/jails/inactive # --------------------------------------------------------------------------- class TestGetInactiveJails: """Tests for ``GET /api/config/jails/inactive``.""" async def test_200_returns_inactive_list(self, config_client: AsyncClient) -> None: """GET /api/config/jails/inactive returns 200 with InactiveJailListResponse.""" from app.models.config import InactiveJail, InactiveJailListResponse mock_jail = InactiveJail( name="apache-auth", filter="apache-auth", actions=[], port="http,https", logpath=["/var/log/apache2/error.log"], bantime="10m", findtime="5m", maxretry=5, source_file="/etc/fail2ban/jail.conf", enabled=False, ) mock_response = InactiveJailListResponse(jails=[mock_jail], total=1) with patch( "app.routers.config.config_file_service.list_inactive_jails", AsyncMock(return_value=mock_response), ): resp = await config_client.get("/api/config/jails/inactive") assert resp.status_code == 200 data = resp.json() assert data["total"] == 1 assert data["jails"][0]["name"] == "apache-auth" async def test_200_empty_list(self, config_client: AsyncClient) -> None: """GET /api/config/jails/inactive returns 200 with empty list.""" from app.models.config import InactiveJailListResponse with patch( "app.routers.config.config_file_service.list_inactive_jails", AsyncMock(return_value=InactiveJailListResponse(jails=[], total=0)), ): resp = await config_client.get("/api/config/jails/inactive") assert resp.status_code == 200 assert resp.json()["total"] == 0 assert resp.json()["jails"] == [] async def test_401_when_unauthenticated(self, config_client: AsyncClient) -> None: """GET /api/config/jails/inactive returns 401 without a valid session.""" resp = await AsyncClient( transport=ASGITransport(app=config_client._transport.app), # type: ignore[attr-defined] base_url="http://test", ).get("/api/config/jails/inactive") assert resp.status_code == 401 # --------------------------------------------------------------------------- # POST /api/config/jails/{name}/activate # --------------------------------------------------------------------------- class TestActivateJail: """Tests for ``POST /api/config/jails/{name}/activate``.""" async def test_200_activates_jail(self, config_client: AsyncClient) -> None: """POST /api/config/jails/apache-auth/activate returns 200.""" from app.models.config import JailActivationResponse mock_response = JailActivationResponse( name="apache-auth", active=True, message="Jail 'apache-auth' activated successfully.", ) with patch( "app.routers.config.config_file_service.activate_jail", AsyncMock(return_value=mock_response), ): resp = await config_client.post( "/api/config/jails/apache-auth/activate", json={} ) assert resp.status_code == 200 data = resp.json() assert data["active"] is True assert data["name"] == "apache-auth" async def test_200_with_overrides(self, config_client: AsyncClient) -> None: """POST .../activate accepts override fields.""" from app.models.config import JailActivationResponse mock_response = JailActivationResponse( name="apache-auth", active=True, message="Activated." ) with patch( "app.routers.config.config_file_service.activate_jail", AsyncMock(return_value=mock_response), ) as mock_activate: resp = await config_client.post( "/api/config/jails/apache-auth/activate", json={"bantime": "1h", "maxretry": 3}, ) assert resp.status_code == 200 # Verify the override values were passed to the service called_req = mock_activate.call_args.args[3] assert called_req.bantime == "1h" assert called_req.maxretry == 3 async def test_404_for_unknown_jail(self, config_client: AsyncClient) -> None: """POST /api/config/jails/missing/activate returns 404.""" from app.services.config_file_service import JailNotFoundInConfigError with patch( "app.routers.config.config_file_service.activate_jail", AsyncMock(side_effect=JailNotFoundInConfigError("missing")), ): resp = await config_client.post( "/api/config/jails/missing/activate", json={} ) assert resp.status_code == 404 async def test_409_when_already_active(self, config_client: AsyncClient) -> None: """POST /api/config/jails/sshd/activate returns 409 if already active.""" from app.services.config_file_service import JailAlreadyActiveError with patch( "app.routers.config.config_file_service.activate_jail", AsyncMock(side_effect=JailAlreadyActiveError("sshd")), ): resp = await config_client.post( "/api/config/jails/sshd/activate", json={} ) assert resp.status_code == 409 async def test_400_for_invalid_jail_name(self, config_client: AsyncClient) -> None: """POST /api/config/jails/ with bad name returns 400.""" from app.services.config_file_service import JailNameError with patch( "app.routers.config.config_file_service.activate_jail", AsyncMock(side_effect=JailNameError("bad name")), ): resp = await config_client.post( "/api/config/jails/bad-name/activate", json={} ) assert resp.status_code == 400 async def test_401_when_unauthenticated(self, config_client: AsyncClient) -> None: """POST /api/config/jails/sshd/activate returns 401 without session.""" resp = await AsyncClient( transport=ASGITransport(app=config_client._transport.app), # type: ignore[attr-defined] base_url="http://test", ).post("/api/config/jails/sshd/activate", json={}) assert resp.status_code == 401 async def test_200_with_active_false_on_missing_logpath(self, config_client: AsyncClient) -> None: """POST .../activate returns 200 with active=False when the service blocks due to missing logpath.""" from app.models.config import JailActivationResponse blocked_response = JailActivationResponse( name="airsonic-auth", active=False, fail2ban_running=True, validation_warnings=["logpath: log file '/var/log/airsonic/airsonic.log' not found"], message="Jail 'airsonic-auth' cannot be activated: log file '/var/log/airsonic/airsonic.log' not found", ) with patch( "app.routers.config.config_file_service.activate_jail", AsyncMock(return_value=blocked_response), ): resp = await config_client.post( "/api/config/jails/airsonic-auth/activate", json={} ) assert resp.status_code == 200 data = resp.json() assert data["active"] is False assert data["fail2ban_running"] is True assert "cannot be activated" in data["message"] assert len(data["validation_warnings"]) == 1 # --------------------------------------------------------------------------- # POST /api/config/jails/{name}/deactivate # --------------------------------------------------------------------------- class TestDeactivateJail: """Tests for ``POST /api/config/jails/{name}/deactivate``.""" async def test_200_deactivates_jail(self, config_client: AsyncClient) -> None: """POST /api/config/jails/sshd/deactivate returns 200.""" from app.models.config import JailActivationResponse mock_response = JailActivationResponse( name="sshd", active=False, message="Jail 'sshd' deactivated successfully.", ) with patch( "app.routers.config.config_file_service.deactivate_jail", AsyncMock(return_value=mock_response), ): resp = await config_client.post("/api/config/jails/sshd/deactivate") assert resp.status_code == 200 data = resp.json() assert data["active"] is False assert data["name"] == "sshd" async def test_404_for_unknown_jail(self, config_client: AsyncClient) -> None: """POST /api/config/jails/missing/deactivate returns 404.""" from app.services.config_file_service import JailNotFoundInConfigError with patch( "app.routers.config.config_file_service.deactivate_jail", AsyncMock(side_effect=JailNotFoundInConfigError("missing")), ): resp = await config_client.post( "/api/config/jails/missing/deactivate" ) assert resp.status_code == 404 async def test_409_when_already_inactive(self, config_client: AsyncClient) -> None: """POST /api/config/jails/apache-auth/deactivate returns 409 if already inactive.""" from app.services.config_file_service import JailAlreadyInactiveError with patch( "app.routers.config.config_file_service.deactivate_jail", AsyncMock(side_effect=JailAlreadyInactiveError("apache-auth")), ): resp = await config_client.post( "/api/config/jails/apache-auth/deactivate" ) assert resp.status_code == 409 async def test_400_for_invalid_jail_name(self, config_client: AsyncClient) -> None: """POST /api/config/jails/.../deactivate with bad name returns 400.""" from app.services.config_file_service import JailNameError with patch( "app.routers.config.config_file_service.deactivate_jail", AsyncMock(side_effect=JailNameError("bad")), ): resp = await config_client.post( "/api/config/jails/sshd/deactivate" ) assert resp.status_code == 400 async def test_401_when_unauthenticated(self, config_client: AsyncClient) -> None: """POST /api/config/jails/sshd/deactivate returns 401 without session.""" resp = await AsyncClient( transport=ASGITransport(app=config_client._transport.app), # type: ignore[attr-defined] base_url="http://test", ).post("/api/config/jails/sshd/deactivate") assert resp.status_code == 401 async def test_deactivate_triggers_health_probe(self, config_client: AsyncClient) -> None: """POST .../deactivate triggers an immediate health probe after success.""" from app.models.config import JailActivationResponse mock_response = JailActivationResponse( name="sshd", active=False, message="Jail 'sshd' deactivated successfully.", ) with ( patch( "app.routers.config.config_file_service.deactivate_jail", AsyncMock(return_value=mock_response), ), patch( "app.routers.config._run_probe", AsyncMock(), ) as mock_probe, ): resp = await config_client.post("/api/config/jails/sshd/deactivate") assert resp.status_code == 200 mock_probe.assert_awaited_once() # --------------------------------------------------------------------------- # GET /api/config/filters # --------------------------------------------------------------------------- def _make_filter_config(name: str, active: bool = False) -> FilterConfig: return FilterConfig( name=name, filename=f"{name}.conf", before=None, after=None, variables={}, prefregex=None, failregex=[], ignoreregex=[], maxlines=None, datepattern=None, journalmatch=None, active=active, used_by_jails=[name] if active else [], source_file=f"/etc/fail2ban/filter.d/{name}.conf", has_local_override=False, ) class TestListFilters: """Tests for ``GET /api/config/filters``.""" async def test_200_returns_filter_list(self, config_client: AsyncClient) -> None: """GET /api/config/filters returns 200 with FilterListResponse.""" from app.models.config import FilterListResponse mock_response = FilterListResponse( filters=[_make_filter_config("sshd", active=True)], total=1, ) with patch( "app.routers.config.config_file_service.list_filters", AsyncMock(return_value=mock_response), ): resp = await config_client.get("/api/config/filters") assert resp.status_code == 200 data = resp.json() assert data["total"] == 1 assert data["filters"][0]["name"] == "sshd" assert data["filters"][0]["active"] is True async def test_200_empty_filter_list(self, config_client: AsyncClient) -> None: """GET /api/config/filters returns 200 with empty list when no filters found.""" from app.models.config import FilterListResponse with patch( "app.routers.config.config_file_service.list_filters", AsyncMock(return_value=FilterListResponse(filters=[], total=0)), ): resp = await config_client.get("/api/config/filters") assert resp.status_code == 200 assert resp.json()["total"] == 0 assert resp.json()["filters"] == [] async def test_active_filters_sorted_before_inactive( self, config_client: AsyncClient ) -> None: """GET /api/config/filters returns active filters before inactive ones.""" from app.models.config import FilterListResponse mock_response = FilterListResponse( filters=[ _make_filter_config("nginx", active=False), _make_filter_config("sshd", active=True), ], total=2, ) with patch( "app.routers.config.config_file_service.list_filters", AsyncMock(return_value=mock_response), ): resp = await config_client.get("/api/config/filters") data = resp.json() assert data["filters"][0]["name"] == "sshd" # active first assert data["filters"][1]["name"] == "nginx" # inactive second async def test_401_when_unauthenticated(self, config_client: AsyncClient) -> None: """GET /api/config/filters returns 401 without a valid session.""" resp = await AsyncClient( transport=ASGITransport(app=config_client._transport.app), # type: ignore[attr-defined] base_url="http://test", ).get("/api/config/filters") assert resp.status_code == 401 # --------------------------------------------------------------------------- # GET /api/config/filters/{name} # --------------------------------------------------------------------------- class TestGetFilter: """Tests for ``GET /api/config/filters/{name}``.""" async def test_200_returns_filter(self, config_client: AsyncClient) -> None: """GET /api/config/filters/sshd returns 200 with FilterConfig.""" with patch( "app.routers.config.config_file_service.get_filter", AsyncMock(return_value=_make_filter_config("sshd")), ): resp = await config_client.get("/api/config/filters/sshd") assert resp.status_code == 200 data = resp.json() assert data["name"] == "sshd" assert "failregex" in data assert "active" in data async def test_404_for_unknown_filter(self, config_client: AsyncClient) -> None: """GET /api/config/filters/missing returns 404.""" from app.services.config_file_service import FilterNotFoundError with patch( "app.routers.config.config_file_service.get_filter", AsyncMock(side_effect=FilterNotFoundError("missing")), ): resp = await config_client.get("/api/config/filters/missing") assert resp.status_code == 404 async def test_401_when_unauthenticated(self, config_client: AsyncClient) -> None: """GET /api/config/filters/sshd returns 401 without session.""" resp = await AsyncClient( transport=ASGITransport(app=config_client._transport.app), # type: ignore[attr-defined] base_url="http://test", ).get("/api/config/filters/sshd") assert resp.status_code == 401 # --------------------------------------------------------------------------- # PUT /api/config/filters/{name} (Task 2.2) # --------------------------------------------------------------------------- class TestUpdateFilter: """Tests for ``PUT /api/config/filters/{name}``.""" async def test_200_returns_updated_filter(self, config_client: AsyncClient) -> None: """PUT /api/config/filters/sshd returns 200 with updated FilterConfig.""" with patch( "app.routers.config.config_file_service.update_filter", AsyncMock(return_value=_make_filter_config("sshd")), ): resp = await config_client.put( "/api/config/filters/sshd", json={"failregex": [r"^fail from "]}, ) assert resp.status_code == 200 assert resp.json()["name"] == "sshd" async def test_404_for_unknown_filter(self, config_client: AsyncClient) -> None: """PUT /api/config/filters/missing returns 404.""" from app.services.config_file_service import FilterNotFoundError with patch( "app.routers.config.config_file_service.update_filter", AsyncMock(side_effect=FilterNotFoundError("missing")), ): resp = await config_client.put( "/api/config/filters/missing", json={}, ) assert resp.status_code == 404 async def test_422_for_invalid_regex(self, config_client: AsyncClient) -> None: """PUT /api/config/filters/sshd returns 422 for bad regex.""" from app.services.config_file_service import FilterInvalidRegexError with patch( "app.routers.config.config_file_service.update_filter", AsyncMock(side_effect=FilterInvalidRegexError("[bad", "unterminated")), ): resp = await config_client.put( "/api/config/filters/sshd", json={"failregex": ["[bad"]}, ) assert resp.status_code == 422 async def test_400_for_invalid_name(self, config_client: AsyncClient) -> None: """PUT /api/config/filters/... with bad name returns 400.""" from app.services.config_file_service import FilterNameError with patch( "app.routers.config.config_file_service.update_filter", AsyncMock(side_effect=FilterNameError("bad")), ): resp = await config_client.put( "/api/config/filters/bad", json={}, ) assert resp.status_code == 400 async def test_reload_query_param_passed(self, config_client: AsyncClient) -> None: """PUT /api/config/filters/sshd?reload=true passes do_reload=True.""" with patch( "app.routers.config.config_file_service.update_filter", AsyncMock(return_value=_make_filter_config("sshd")), ) as mock_update: resp = await config_client.put( "/api/config/filters/sshd?reload=true", json={}, ) assert resp.status_code == 200 assert mock_update.call_args.kwargs.get("do_reload") is True async def test_401_when_unauthenticated(self, config_client: AsyncClient) -> None: """PUT /api/config/filters/sshd returns 401 without session.""" resp = await AsyncClient( transport=ASGITransport(app=config_client._transport.app), # type: ignore[attr-defined] base_url="http://test", ).put("/api/config/filters/sshd", json={}) assert resp.status_code == 401 # --------------------------------------------------------------------------- # POST /api/config/filters (Task 2.2) # --------------------------------------------------------------------------- class TestCreateFilter: """Tests for ``POST /api/config/filters``.""" async def test_201_creates_filter(self, config_client: AsyncClient) -> None: """POST /api/config/filters returns 201 with FilterConfig.""" with patch( "app.routers.config.config_file_service.create_filter", AsyncMock(return_value=_make_filter_config("my-custom")), ): resp = await config_client.post( "/api/config/filters", json={"name": "my-custom", "failregex": [r"^fail from "]}, ) assert resp.status_code == 201 assert resp.json()["name"] == "my-custom" async def test_409_when_already_exists(self, config_client: AsyncClient) -> None: """POST /api/config/filters returns 409 if filter exists.""" from app.services.config_file_service import FilterAlreadyExistsError with patch( "app.routers.config.config_file_service.create_filter", AsyncMock(side_effect=FilterAlreadyExistsError("sshd")), ): resp = await config_client.post( "/api/config/filters", json={"name": "sshd"}, ) assert resp.status_code == 409 async def test_422_for_invalid_regex(self, config_client: AsyncClient) -> None: """POST /api/config/filters returns 422 for bad regex.""" from app.services.config_file_service import FilterInvalidRegexError with patch( "app.routers.config.config_file_service.create_filter", AsyncMock(side_effect=FilterInvalidRegexError("[bad", "unterminated")), ): resp = await config_client.post( "/api/config/filters", json={"name": "test", "failregex": ["[bad"]}, ) assert resp.status_code == 422 async def test_400_for_invalid_name(self, config_client: AsyncClient) -> None: """POST /api/config/filters returns 400 for invalid filter name.""" from app.services.config_file_service import FilterNameError with patch( "app.routers.config.config_file_service.create_filter", AsyncMock(side_effect=FilterNameError("bad")), ): resp = await config_client.post( "/api/config/filters", json={"name": "bad"}, ) assert resp.status_code == 400 async def test_401_when_unauthenticated(self, config_client: AsyncClient) -> None: """POST /api/config/filters returns 401 without session.""" resp = await AsyncClient( transport=ASGITransport(app=config_client._transport.app), # type: ignore[attr-defined] base_url="http://test", ).post("/api/config/filters", json={"name": "test"}) assert resp.status_code == 401 # --------------------------------------------------------------------------- # DELETE /api/config/filters/{name} (Task 2.2) # --------------------------------------------------------------------------- class TestDeleteFilter: """Tests for ``DELETE /api/config/filters/{name}``.""" async def test_204_deletes_filter(self, config_client: AsyncClient) -> None: """DELETE /api/config/filters/my-custom returns 204.""" with patch( "app.routers.config.config_file_service.delete_filter", AsyncMock(return_value=None), ): resp = await config_client.delete("/api/config/filters/my-custom") assert resp.status_code == 204 async def test_404_for_unknown_filter(self, config_client: AsyncClient) -> None: """DELETE /api/config/filters/missing returns 404.""" from app.services.config_file_service import FilterNotFoundError with patch( "app.routers.config.config_file_service.delete_filter", AsyncMock(side_effect=FilterNotFoundError("missing")), ): resp = await config_client.delete("/api/config/filters/missing") assert resp.status_code == 404 async def test_409_for_readonly_filter(self, config_client: AsyncClient) -> None: """DELETE /api/config/filters/sshd returns 409 for shipped conf-only filter.""" from app.services.config_file_service import FilterReadonlyError with patch( "app.routers.config.config_file_service.delete_filter", AsyncMock(side_effect=FilterReadonlyError("sshd")), ): resp = await config_client.delete("/api/config/filters/sshd") assert resp.status_code == 409 async def test_400_for_invalid_name(self, config_client: AsyncClient) -> None: """DELETE /api/config/filters/... with bad name returns 400.""" from app.services.config_file_service import FilterNameError with patch( "app.routers.config.config_file_service.delete_filter", AsyncMock(side_effect=FilterNameError("bad")), ): resp = await config_client.delete("/api/config/filters/bad") assert resp.status_code == 400 async def test_401_when_unauthenticated(self, config_client: AsyncClient) -> None: """DELETE /api/config/filters/sshd returns 401 without session.""" resp = await AsyncClient( transport=ASGITransport(app=config_client._transport.app), # type: ignore[attr-defined] base_url="http://test", ).delete("/api/config/filters/sshd") assert resp.status_code == 401 # --------------------------------------------------------------------------- # POST /api/config/jails/{name}/filter (Task 2.2) # --------------------------------------------------------------------------- class TestAssignFilterToJail: """Tests for ``POST /api/config/jails/{name}/filter``.""" async def test_204_assigns_filter(self, config_client: AsyncClient) -> None: """POST /api/config/jails/sshd/filter returns 204 on success.""" with patch( "app.routers.config.config_file_service.assign_filter_to_jail", AsyncMock(return_value=None), ): resp = await config_client.post( "/api/config/jails/sshd/filter", json={"filter_name": "myfilter"}, ) assert resp.status_code == 204 async def test_404_for_unknown_jail(self, config_client: AsyncClient) -> None: """POST /api/config/jails/missing/filter returns 404.""" from app.services.config_file_service import JailNotFoundInConfigError with patch( "app.routers.config.config_file_service.assign_filter_to_jail", AsyncMock(side_effect=JailNotFoundInConfigError("missing")), ): resp = await config_client.post( "/api/config/jails/missing/filter", json={"filter_name": "sshd"}, ) assert resp.status_code == 404 async def test_404_for_unknown_filter(self, config_client: AsyncClient) -> None: """POST /api/config/jails/sshd/filter returns 404 when filter not found.""" from app.services.config_file_service import FilterNotFoundError with patch( "app.routers.config.config_file_service.assign_filter_to_jail", AsyncMock(side_effect=FilterNotFoundError("missing-filter")), ): resp = await config_client.post( "/api/config/jails/sshd/filter", json={"filter_name": "missing-filter"}, ) assert resp.status_code == 404 async def test_400_for_invalid_jail_name(self, config_client: AsyncClient) -> None: """POST /api/config/jails/.../filter with bad jail name returns 400.""" from app.services.config_file_service import JailNameError with patch( "app.routers.config.config_file_service.assign_filter_to_jail", AsyncMock(side_effect=JailNameError("bad")), ): resp = await config_client.post( "/api/config/jails/sshd/filter", json={"filter_name": "valid"}, ) assert resp.status_code == 400 async def test_400_for_invalid_filter_name(self, config_client: AsyncClient) -> None: """POST /api/config/jails/sshd/filter with bad filter name returns 400.""" from app.services.config_file_service import FilterNameError with patch( "app.routers.config.config_file_service.assign_filter_to_jail", AsyncMock(side_effect=FilterNameError("bad")), ): resp = await config_client.post( "/api/config/jails/sshd/filter", json={"filter_name": "../evil"}, ) assert resp.status_code == 400 async def test_reload_query_param_passed(self, config_client: AsyncClient) -> None: """POST /api/config/jails/sshd/filter?reload=true passes do_reload=True.""" with patch( "app.routers.config.config_file_service.assign_filter_to_jail", AsyncMock(return_value=None), ) as mock_assign: resp = await config_client.post( "/api/config/jails/sshd/filter?reload=true", json={"filter_name": "sshd"}, ) assert resp.status_code == 204 assert mock_assign.call_args.kwargs.get("do_reload") is True async def test_401_when_unauthenticated(self, config_client: AsyncClient) -> None: """POST /api/config/jails/sshd/filter returns 401 without session.""" resp = await AsyncClient( transport=ASGITransport(app=config_client._transport.app), # type: ignore[attr-defined] base_url="http://test", ).post("/api/config/jails/sshd/filter", json={"filter_name": "sshd"}) assert resp.status_code == 401 # =========================================================================== # Action router tests (Task 3.1 + 3.2) # =========================================================================== @pytest.mark.asyncio class TestListActionsRouter: async def test_200_returns_action_list(self, config_client: AsyncClient) -> None: from app.models.config import ActionConfig, ActionListResponse mock_action = ActionConfig( name="iptables", filename="iptables.conf", actionban="/sbin/iptables -I f2b- 1 -s -j DROP", ) mock_response = ActionListResponse(actions=[mock_action], total=1) with patch( "app.routers.config.config_file_service.list_actions", AsyncMock(return_value=mock_response), ): resp = await config_client.get("/api/config/actions") assert resp.status_code == 200 data = resp.json() assert data["total"] == 1 assert data["actions"][0]["name"] == "iptables" async def test_active_sorted_first(self, config_client: AsyncClient) -> None: from app.models.config import ActionConfig, ActionListResponse inactive = ActionConfig(name="aaa", filename="aaa.conf", active=False) active = ActionConfig(name="zzz", filename="zzz.conf", active=True) mock_response = ActionListResponse(actions=[inactive, active], total=2) with patch( "app.routers.config.config_file_service.list_actions", AsyncMock(return_value=mock_response), ): resp = await config_client.get("/api/config/actions") data = resp.json() assert data["actions"][0]["name"] == "zzz" # active comes first async def test_401_when_unauthenticated(self, config_client: AsyncClient) -> None: resp = await AsyncClient( transport=ASGITransport(app=config_client._transport.app), # type: ignore[attr-defined] base_url="http://test", ).get("/api/config/actions") assert resp.status_code == 401 @pytest.mark.asyncio class TestGetActionRouter: async def test_200_returns_action(self, config_client: AsyncClient) -> None: from app.models.config import ActionConfig mock_action = ActionConfig( name="iptables", filename="iptables.conf", actionban="/sbin/iptables -I f2b- 1 -s -j DROP", ) with patch( "app.routers.config.config_file_service.get_action", AsyncMock(return_value=mock_action), ): resp = await config_client.get("/api/config/actions/iptables") assert resp.status_code == 200 assert resp.json()["name"] == "iptables" async def test_404_when_not_found(self, config_client: AsyncClient) -> None: from app.services.config_file_service import ActionNotFoundError with patch( "app.routers.config.config_file_service.get_action", AsyncMock(side_effect=ActionNotFoundError("missing")), ): resp = await config_client.get("/api/config/actions/missing") assert resp.status_code == 404 async def test_401_when_unauthenticated(self, config_client: AsyncClient) -> None: resp = await AsyncClient( transport=ASGITransport(app=config_client._transport.app), # type: ignore[attr-defined] base_url="http://test", ).get("/api/config/actions/iptables") assert resp.status_code == 401 @pytest.mark.asyncio class TestUpdateActionRouter: async def test_200_returns_updated_action(self, config_client: AsyncClient) -> None: from app.models.config import ActionConfig updated = ActionConfig( name="iptables", filename="iptables.local", actionban="echo ban", ) with patch( "app.routers.config.config_file_service.update_action", AsyncMock(return_value=updated), ): resp = await config_client.put( "/api/config/actions/iptables", json={"actionban": "echo ban"}, ) assert resp.status_code == 200 assert resp.json()["actionban"] == "echo ban" async def test_404_when_not_found(self, config_client: AsyncClient) -> None: from app.services.config_file_service import ActionNotFoundError with patch( "app.routers.config.config_file_service.update_action", AsyncMock(side_effect=ActionNotFoundError("missing")), ): resp = await config_client.put( "/api/config/actions/missing", json={} ) assert resp.status_code == 404 async def test_400_for_bad_name(self, config_client: AsyncClient) -> None: from app.services.config_file_service import ActionNameError with patch( "app.routers.config.config_file_service.update_action", AsyncMock(side_effect=ActionNameError()), ): resp = await config_client.put( "/api/config/actions/badname", json={} ) assert resp.status_code == 400 async def test_401_when_unauthenticated(self, config_client: AsyncClient) -> None: resp = await AsyncClient( transport=ASGITransport(app=config_client._transport.app), # type: ignore[attr-defined] base_url="http://test", ).put("/api/config/actions/iptables", json={}) assert resp.status_code == 401 @pytest.mark.asyncio class TestCreateActionRouter: async def test_201_returns_created_action(self, config_client: AsyncClient) -> None: from app.models.config import ActionConfig created = ActionConfig( name="custom", filename="custom.local", actionban="echo ban", ) with patch( "app.routers.config.config_file_service.create_action", AsyncMock(return_value=created), ): resp = await config_client.post( "/api/config/actions", json={"name": "custom", "actionban": "echo ban"}, ) assert resp.status_code == 201 assert resp.json()["name"] == "custom" async def test_409_when_already_exists(self, config_client: AsyncClient) -> None: from app.services.config_file_service import ActionAlreadyExistsError with patch( "app.routers.config.config_file_service.create_action", AsyncMock(side_effect=ActionAlreadyExistsError("iptables")), ): resp = await config_client.post( "/api/config/actions", json={"name": "iptables"}, ) assert resp.status_code == 409 async def test_400_for_bad_name(self, config_client: AsyncClient) -> None: from app.services.config_file_service import ActionNameError with patch( "app.routers.config.config_file_service.create_action", AsyncMock(side_effect=ActionNameError()), ): resp = await config_client.post( "/api/config/actions", json={"name": "badname"}, ) assert resp.status_code == 400 async def test_401_when_unauthenticated(self, config_client: AsyncClient) -> None: resp = await AsyncClient( transport=ASGITransport(app=config_client._transport.app), # type: ignore[attr-defined] base_url="http://test", ).post("/api/config/actions", json={"name": "x"}) assert resp.status_code == 401 @pytest.mark.asyncio class TestDeleteActionRouter: async def test_204_on_delete(self, config_client: AsyncClient) -> None: with patch( "app.routers.config.config_file_service.delete_action", AsyncMock(return_value=None), ): resp = await config_client.delete("/api/config/actions/custom") assert resp.status_code == 204 async def test_404_when_not_found(self, config_client: AsyncClient) -> None: from app.services.config_file_service import ActionNotFoundError with patch( "app.routers.config.config_file_service.delete_action", AsyncMock(side_effect=ActionNotFoundError("missing")), ): resp = await config_client.delete("/api/config/actions/missing") assert resp.status_code == 404 async def test_409_when_readonly(self, config_client: AsyncClient) -> None: from app.services.config_file_service import ActionReadonlyError with patch( "app.routers.config.config_file_service.delete_action", AsyncMock(side_effect=ActionReadonlyError("iptables")), ): resp = await config_client.delete("/api/config/actions/iptables") assert resp.status_code == 409 async def test_400_for_bad_name(self, config_client: AsyncClient) -> None: from app.services.config_file_service import ActionNameError with patch( "app.routers.config.config_file_service.delete_action", AsyncMock(side_effect=ActionNameError()), ): resp = await config_client.delete("/api/config/actions/badname") assert resp.status_code == 400 async def test_401_when_unauthenticated(self, config_client: AsyncClient) -> None: resp = await AsyncClient( transport=ASGITransport(app=config_client._transport.app), # type: ignore[attr-defined] base_url="http://test", ).delete("/api/config/actions/iptables") assert resp.status_code == 401 @pytest.mark.asyncio class TestAssignActionToJailRouter: async def test_204_on_success(self, config_client: AsyncClient) -> None: with patch( "app.routers.config.config_file_service.assign_action_to_jail", AsyncMock(return_value=None), ): resp = await config_client.post( "/api/config/jails/sshd/action", json={"action_name": "iptables"}, ) assert resp.status_code == 204 async def test_404_when_jail_not_found(self, config_client: AsyncClient) -> None: from app.services.config_file_service import JailNotFoundInConfigError with patch( "app.routers.config.config_file_service.assign_action_to_jail", AsyncMock(side_effect=JailNotFoundInConfigError("missing")), ): resp = await config_client.post( "/api/config/jails/missing/action", json={"action_name": "iptables"}, ) assert resp.status_code == 404 async def test_404_when_action_not_found(self, config_client: AsyncClient) -> None: from app.services.config_file_service import ActionNotFoundError with patch( "app.routers.config.config_file_service.assign_action_to_jail", AsyncMock(side_effect=ActionNotFoundError("missing")), ): resp = await config_client.post( "/api/config/jails/sshd/action", json={"action_name": "missing"}, ) assert resp.status_code == 404 async def test_400_for_bad_jail_name(self, config_client: AsyncClient) -> None: from app.services.config_file_service import JailNameError with patch( "app.routers.config.config_file_service.assign_action_to_jail", AsyncMock(side_effect=JailNameError()), ): resp = await config_client.post( "/api/config/jails/badjailname/action", json={"action_name": "iptables"}, ) assert resp.status_code == 400 async def test_400_for_bad_action_name(self, config_client: AsyncClient) -> None: from app.services.config_file_service import ActionNameError with patch( "app.routers.config.config_file_service.assign_action_to_jail", AsyncMock(side_effect=ActionNameError()), ): resp = await config_client.post( "/api/config/jails/sshd/action", json={"action_name": "badaction"}, ) assert resp.status_code == 400 async def test_reload_param_passed(self, config_client: AsyncClient) -> None: with patch( "app.routers.config.config_file_service.assign_action_to_jail", AsyncMock(return_value=None), ) as mock_assign: resp = await config_client.post( "/api/config/jails/sshd/action?reload=true", json={"action_name": "iptables"}, ) assert resp.status_code == 204 assert mock_assign.call_args.kwargs.get("do_reload") is True async def test_401_when_unauthenticated(self, config_client: AsyncClient) -> None: resp = await AsyncClient( transport=ASGITransport(app=config_client._transport.app), # type: ignore[attr-defined] base_url="http://test", ).post("/api/config/jails/sshd/action", json={"action_name": "iptables"}) assert resp.status_code == 401 @pytest.mark.asyncio class TestRemoveActionFromJailRouter: async def test_204_on_success(self, config_client: AsyncClient) -> None: with patch( "app.routers.config.config_file_service.remove_action_from_jail", AsyncMock(return_value=None), ): resp = await config_client.delete( "/api/config/jails/sshd/action/iptables" ) assert resp.status_code == 204 async def test_404_when_jail_not_found(self, config_client: AsyncClient) -> None: from app.services.config_file_service import JailNotFoundInConfigError with patch( "app.routers.config.config_file_service.remove_action_from_jail", AsyncMock(side_effect=JailNotFoundInConfigError("missing")), ): resp = await config_client.delete( "/api/config/jails/missing/action/iptables" ) assert resp.status_code == 404 async def test_400_for_bad_jail_name(self, config_client: AsyncClient) -> None: from app.services.config_file_service import JailNameError with patch( "app.routers.config.config_file_service.remove_action_from_jail", AsyncMock(side_effect=JailNameError()), ): resp = await config_client.delete( "/api/config/jails/badjailname/action/iptables" ) assert resp.status_code == 400 async def test_400_for_bad_action_name(self, config_client: AsyncClient) -> None: from app.services.config_file_service import ActionNameError with patch( "app.routers.config.config_file_service.remove_action_from_jail", AsyncMock(side_effect=ActionNameError()), ): resp = await config_client.delete( "/api/config/jails/sshd/action/badactionname" ) assert resp.status_code == 400 async def test_reload_param_passed(self, config_client: AsyncClient) -> None: with patch( "app.routers.config.config_file_service.remove_action_from_jail", AsyncMock(return_value=None), ) as mock_rm: resp = await config_client.delete( "/api/config/jails/sshd/action/iptables?reload=true" ) assert resp.status_code == 204 assert mock_rm.call_args.kwargs.get("do_reload") is True async def test_401_when_unauthenticated(self, config_client: AsyncClient) -> None: resp = await AsyncClient( transport=ASGITransport(app=config_client._transport.app), # type: ignore[attr-defined] base_url="http://test", ).delete("/api/config/jails/sshd/action/iptables") assert resp.status_code == 401 # --------------------------------------------------------------------------- # GET /api/config/fail2ban-log # --------------------------------------------------------------------------- class TestGetFail2BanLog: """Tests for ``GET /api/config/fail2ban-log``.""" def _mock_log_response(self) -> Fail2BanLogResponse: return Fail2BanLogResponse( log_path="/var/log/fail2ban.log", lines=["2025-01-01 INFO sshd Found 1.2.3.4", "2025-01-01 ERROR oops"], total_lines=100, log_level="INFO", log_target="/var/log/fail2ban.log", ) async def test_200_returns_log_response(self, config_client: AsyncClient) -> None: """GET /api/config/fail2ban-log returns 200 with Fail2BanLogResponse.""" with patch( "app.routers.config.config_service.read_fail2ban_log", AsyncMock(return_value=self._mock_log_response()), ): resp = await config_client.get("/api/config/fail2ban-log") assert resp.status_code == 200 data = resp.json() assert data["log_path"] == "/var/log/fail2ban.log" assert isinstance(data["lines"], list) assert data["total_lines"] == 100 assert data["log_level"] == "INFO" async def test_200_passes_lines_query_param(self, config_client: AsyncClient) -> None: """GET /api/config/fail2ban-log passes the lines query param to the service.""" with patch( "app.routers.config.config_service.read_fail2ban_log", AsyncMock(return_value=self._mock_log_response()), ) as mock_fn: resp = await config_client.get("/api/config/fail2ban-log?lines=500") assert resp.status_code == 200 _socket, lines_arg, _filter = mock_fn.call_args.args assert lines_arg == 500 async def test_200_passes_filter_query_param(self, config_client: AsyncClient) -> None: """GET /api/config/fail2ban-log passes the filter query param to the service.""" with patch( "app.routers.config.config_service.read_fail2ban_log", AsyncMock(return_value=self._mock_log_response()), ) as mock_fn: resp = await config_client.get("/api/config/fail2ban-log?filter=ERROR") assert resp.status_code == 200 _socket, _lines, filter_arg = mock_fn.call_args.args assert filter_arg == "ERROR" async def test_400_when_non_file_target(self, config_client: AsyncClient) -> None: """GET /api/config/fail2ban-log returns 400 when log target is not a file.""" from app.services.config_service import ConfigOperationError with patch( "app.routers.config.config_service.read_fail2ban_log", AsyncMock(side_effect=ConfigOperationError("fail2ban is logging to 'STDOUT'")), ): resp = await config_client.get("/api/config/fail2ban-log") assert resp.status_code == 400 async def test_400_when_path_traversal_detected(self, config_client: AsyncClient) -> None: """GET /api/config/fail2ban-log returns 400 when the path is outside safe dirs.""" from app.services.config_service import ConfigOperationError with patch( "app.routers.config.config_service.read_fail2ban_log", AsyncMock(side_effect=ConfigOperationError("outside the allowed directory")), ): resp = await config_client.get("/api/config/fail2ban-log") assert resp.status_code == 400 async def test_502_when_fail2ban_unreachable(self, config_client: AsyncClient) -> None: """GET /api/config/fail2ban-log returns 502 when fail2ban is down.""" from app.utils.fail2ban_client import Fail2BanConnectionError with patch( "app.routers.config.config_service.read_fail2ban_log", AsyncMock(side_effect=Fail2BanConnectionError("socket error", "/tmp/f.sock")), ): resp = await config_client.get("/api/config/fail2ban-log") assert resp.status_code == 502 async def test_422_for_lines_exceeding_max(self, config_client: AsyncClient) -> None: """GET /api/config/fail2ban-log returns 422 for lines > 2000.""" resp = await config_client.get("/api/config/fail2ban-log?lines=9999") assert resp.status_code == 422 async def test_401_when_unauthenticated(self, config_client: AsyncClient) -> None: """GET /api/config/fail2ban-log requires authentication.""" resp = await AsyncClient( transport=ASGITransport(app=config_client._transport.app), # type: ignore[attr-defined] base_url="http://test", ).get("/api/config/fail2ban-log") assert resp.status_code == 401 # --------------------------------------------------------------------------- # GET /api/config/service-status # --------------------------------------------------------------------------- class TestGetServiceStatus: """Tests for ``GET /api/config/service-status``.""" def _mock_status(self, online: bool = True) -> ServiceStatusResponse: return ServiceStatusResponse( online=online, version="1.0.0" if online else None, bangui_version=app.__version__, jail_count=2 if online else 0, total_bans=10 if online else 0, total_failures=3 if online else 0, log_level="INFO" if online else "UNKNOWN", log_target="/var/log/fail2ban.log" if online else "UNKNOWN", ) async def test_200_when_online(self, config_client: AsyncClient) -> None: """GET /api/config/service-status returns 200 with full status when online.""" with patch( "app.routers.config.config_service.get_service_status", AsyncMock(return_value=self._mock_status(online=True)), ): resp = await config_client.get("/api/config/service-status") assert resp.status_code == 200 data = resp.json() assert data["online"] is True assert data["bangui_version"] == app.__version__ assert data["jail_count"] == 2 assert data["log_level"] == "INFO" async def test_200_when_offline(self, config_client: AsyncClient) -> None: """GET /api/config/service-status returns 200 with offline=False when daemon is down.""" with patch( "app.routers.config.config_service.get_service_status", AsyncMock(return_value=self._mock_status(online=False)), ): resp = await config_client.get("/api/config/service-status") assert resp.status_code == 200 data = resp.json() assert data["bangui_version"] == app.__version__ assert data["online"] is False assert data["log_level"] == "UNKNOWN" async def test_401_when_unauthenticated(self, config_client: AsyncClient) -> None: """GET /api/config/service-status requires authentication.""" resp = await AsyncClient( transport=ASGITransport(app=config_client._transport.app), # type: ignore[attr-defined] base_url="http://test", ).get("/api/config/service-status") assert resp.status_code == 401 # --------------------------------------------------------------------------- # Task 3 endpoints # --------------------------------------------------------------------------- @pytest.mark.asyncio class TestValidateJailEndpoint: """Tests for ``POST /api/config/jails/{name}/validate``.""" async def test_200_valid_config(self, config_client: AsyncClient) -> None: """Returns 200 with valid=True when the jail config has no issues.""" from app.models.config import JailValidationResult mock_result = JailValidationResult( jail_name="sshd", valid=True, issues=[] ) with patch( "app.routers.config.config_file_service.validate_jail_config", AsyncMock(return_value=mock_result), ): resp = await config_client.post("/api/config/jails/sshd/validate") assert resp.status_code == 200 data = resp.json() assert data["valid"] is True assert data["jail_name"] == "sshd" assert data["issues"] == [] async def test_200_invalid_config(self, config_client: AsyncClient) -> None: """Returns 200 with valid=False and issues when there are errors.""" from app.models.config import JailValidationIssue, JailValidationResult issue = JailValidationIssue(field="filter", message="Filter file not found: filter.d/bad.conf (or .local)") mock_result = JailValidationResult( jail_name="sshd", valid=False, issues=[issue] ) with patch( "app.routers.config.config_file_service.validate_jail_config", AsyncMock(return_value=mock_result), ): resp = await config_client.post("/api/config/jails/sshd/validate") assert resp.status_code == 200 data = resp.json() assert data["valid"] is False assert len(data["issues"]) == 1 assert data["issues"][0]["field"] == "filter" async def test_400_for_invalid_jail_name(self, config_client: AsyncClient) -> None: """POST /api/config/jails/bad-name/validate returns 400 on JailNameError.""" from app.services.config_file_service import JailNameError with patch( "app.routers.config.config_file_service.validate_jail_config", AsyncMock(side_effect=JailNameError("bad name")), ): resp = await config_client.post("/api/config/jails/bad-name/validate") assert resp.status_code == 400 async def test_401_when_unauthenticated(self, config_client: AsyncClient) -> None: """POST /api/config/jails/sshd/validate returns 401 without session.""" resp = await AsyncClient( transport=ASGITransport(app=config_client._transport.app), # type: ignore[attr-defined] base_url="http://test", ).post("/api/config/jails/sshd/validate") assert resp.status_code == 401 @pytest.mark.asyncio class TestPendingRecovery: """Tests for ``GET /api/config/pending-recovery``.""" async def test_returns_null_when_no_pending_recovery( self, config_client: AsyncClient ) -> None: """Returns null body (204-like 200) when pending_recovery is not set.""" app = config_client._transport.app # type: ignore[attr-defined] app.state.pending_recovery = None resp = await config_client.get("/api/config/pending-recovery") assert resp.status_code == 200 assert resp.json() is None async def test_returns_record_when_set(self, config_client: AsyncClient) -> None: """Returns the PendingRecovery model when one is stored on app.state.""" import datetime from app.models.config import PendingRecovery now = datetime.datetime.now(tz=datetime.UTC) record = PendingRecovery( jail_name="sshd", activated_at=now - datetime.timedelta(seconds=20), detected_at=now, ) app = config_client._transport.app # type: ignore[attr-defined] app.state.pending_recovery = record resp = await config_client.get("/api/config/pending-recovery") assert resp.status_code == 200 data = resp.json() assert data["jail_name"] == "sshd" assert data["recovered"] is False async def test_401_when_unauthenticated(self, config_client: AsyncClient) -> None: """GET /api/config/pending-recovery returns 401 without session.""" resp = await AsyncClient( transport=ASGITransport(app=config_client._transport.app), # type: ignore[attr-defined] base_url="http://test", ).get("/api/config/pending-recovery") assert resp.status_code == 401 @pytest.mark.asyncio class TestRollbackEndpoint: """Tests for ``POST /api/config/jails/{name}/rollback``.""" async def test_200_success_clears_pending_recovery( self, config_client: AsyncClient ) -> None: """A successful rollback returns 200 and clears app.state.pending_recovery.""" import datetime from app.models.config import PendingRecovery, RollbackResponse # Set up a pending recovery record on the app. app = config_client._transport.app # type: ignore[attr-defined] now = datetime.datetime.now(tz=datetime.UTC) app.state.pending_recovery = PendingRecovery( jail_name="sshd", activated_at=now - datetime.timedelta(seconds=10), detected_at=now, ) mock_result = RollbackResponse( jail_name="sshd", disabled=True, fail2ban_running=True, active_jails=0, message="Jail 'sshd' disabled and fail2ban restarted.", ) with patch( "app.routers.config.config_file_service.rollback_jail", AsyncMock(return_value=mock_result), ): resp = await config_client.post("/api/config/jails/sshd/rollback") assert resp.status_code == 200 data = resp.json() assert data["disabled"] is True assert data["fail2ban_running"] is True # Successful rollback must clear the pending record. assert app.state.pending_recovery is None async def test_200_fail_preserves_pending_recovery( self, config_client: AsyncClient ) -> None: """When fail2ban is still down after rollback, pending_recovery is retained.""" import datetime from app.models.config import PendingRecovery, RollbackResponse app = config_client._transport.app # type: ignore[attr-defined] now = datetime.datetime.now(tz=datetime.UTC) record = PendingRecovery( jail_name="sshd", activated_at=now - datetime.timedelta(seconds=10), detected_at=now, ) app.state.pending_recovery = record mock_result = RollbackResponse( jail_name="sshd", disabled=True, fail2ban_running=False, active_jails=0, message="fail2ban did not come back online.", ) with patch( "app.routers.config.config_file_service.rollback_jail", AsyncMock(return_value=mock_result), ): resp = await config_client.post("/api/config/jails/sshd/rollback") assert resp.status_code == 200 data = resp.json() assert data["fail2ban_running"] is False # Pending record should NOT be cleared when rollback didn't fully succeed. assert app.state.pending_recovery is not None async def test_400_for_invalid_jail_name(self, config_client: AsyncClient) -> None: """POST /api/config/jails/bad/rollback returns 400 on JailNameError.""" from app.services.config_file_service import JailNameError with patch( "app.routers.config.config_file_service.rollback_jail", AsyncMock(side_effect=JailNameError("bad")), ): resp = await config_client.post("/api/config/jails/bad/rollback") assert resp.status_code == 400 async def test_401_when_unauthenticated(self, config_client: AsyncClient) -> None: """POST /api/config/jails/sshd/rollback returns 401 without session.""" resp = await AsyncClient( transport=ASGITransport(app=config_client._transport.app), # type: ignore[attr-defined] base_url="http://test", ).post("/api/config/jails/sshd/rollback") assert resp.status_code == 401