Files
BanGUI/backend/tests/test_routers/test_config.py

2264 lines
86 KiB
Python

"""Tests for the config router endpoints."""
from __future__ import annotations
from pathlib import Path
from unittest.mock import AsyncMock, MagicMock, patch
import aiosqlite
import pytest
from httpx import ASGITransport, AsyncClient
import app
from app.config import Settings
from app.db import init_db
from app.main import create_app
from app.models.config import (
Fail2BanLogResponse,
FilterConfig,
GlobalConfigResponse,
JailConfig,
JailConfigListResponse,
JailConfigResponse,
RegexTestResponse,
ServiceStatusResponse,
)
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
_SETUP_PAYLOAD = {
"master_password": "testpassword1",
"database_path": "bangui.db",
"fail2ban_socket": "/var/run/fail2ban/fail2ban.sock",
"timezone": "UTC",
"session_duration_minutes": 60,
}
@pytest.fixture
async def config_client(tmp_path: Path) -> AsyncClient: # type: ignore[misc]
"""Provide an authenticated ``AsyncClient`` for config endpoint tests."""
settings = Settings(
database_path=str(tmp_path / "config_test.db"),
fail2ban_socket="/tmp/fake.sock",
session_secret="test-config-secret",
session_duration_minutes=60,
timezone="UTC",
log_level="debug",
)
app = create_app(settings=settings)
db: aiosqlite.Connection = await aiosqlite.connect(settings.database_path)
db.row_factory = aiosqlite.Row
await init_db(db)
app.state.db = db
app.state.http_session = MagicMock()
transport = ASGITransport(app=app)
async with AsyncClient(transport=transport, base_url="http://test") as ac:
await ac.post("/api/setup", json=_SETUP_PAYLOAD)
login = await ac.post(
"/api/auth/login",
json={"password": _SETUP_PAYLOAD["master_password"]},
)
assert login.status_code == 200
yield ac
await db.close()
def _make_jail_config(name: str = "sshd") -> JailConfig:
return JailConfig(
name=name,
ban_time=600,
max_retry=5,
find_time=600,
fail_regex=["regex1"],
ignore_regex=[],
log_paths=["/var/log/auth.log"],
date_pattern=None,
log_encoding="UTF-8",
backend="polling",
use_dns="warn",
prefregex="",
actions=["iptables"],
)
# ---------------------------------------------------------------------------
# GET /api/config/jails
# ---------------------------------------------------------------------------
class TestGetJailConfigs:
"""Tests for ``GET /api/config/jails``."""
async def test_200_returns_jail_list(self, config_client: AsyncClient) -> None:
"""GET /api/config/jails returns 200 with JailConfigListResponse."""
mock_response = JailConfigListResponse(
jails=[_make_jail_config("sshd")], total=1
)
with patch(
"app.routers.config.config_service.list_jail_configs",
AsyncMock(return_value=mock_response),
):
resp = await config_client.get("/api/config/jails")
assert resp.status_code == 200
data = resp.json()
assert data["total"] == 1
assert data["jails"][0]["name"] == "sshd"
async def test_401_when_unauthenticated(self, config_client: AsyncClient) -> None:
"""GET /api/config/jails returns 401 without a valid session."""
resp = await AsyncClient(
transport=ASGITransport(app=config_client._transport.app), # type: ignore[attr-defined]
base_url="http://test",
).get("/api/config/jails")
assert resp.status_code == 401
async def test_502_on_connection_error(self, config_client: AsyncClient) -> None:
"""GET /api/config/jails returns 502 when fail2ban is unreachable."""
from app.utils.fail2ban_client import Fail2BanConnectionError
with patch(
"app.routers.config.config_service.list_jail_configs",
AsyncMock(side_effect=Fail2BanConnectionError("down", "/tmp/fake.sock")),
):
resp = await config_client.get("/api/config/jails")
assert resp.status_code == 502
# ---------------------------------------------------------------------------
# GET /api/config/jails/{name}
# ---------------------------------------------------------------------------
class TestGetJailConfig:
"""Tests for ``GET /api/config/jails/{name}``."""
async def test_200_returns_jail_config(self, config_client: AsyncClient) -> None:
"""GET /api/config/jails/sshd returns 200 with JailConfigResponse."""
mock_response = JailConfigResponse(jail=_make_jail_config("sshd"))
with patch(
"app.routers.config.config_service.get_jail_config",
AsyncMock(return_value=mock_response),
):
resp = await config_client.get("/api/config/jails/sshd")
assert resp.status_code == 200
assert resp.json()["jail"]["name"] == "sshd"
assert resp.json()["jail"]["ban_time"] == 600
async def test_404_for_unknown_jail(self, config_client: AsyncClient) -> None:
"""GET /api/config/jails/missing returns 404."""
from app.services.config_service import JailNotFoundError
with patch(
"app.routers.config.config_service.get_jail_config",
AsyncMock(side_effect=JailNotFoundError("missing")),
):
resp = await config_client.get("/api/config/jails/missing")
assert resp.status_code == 404
async def test_401_when_unauthenticated(self, config_client: AsyncClient) -> None:
"""GET /api/config/jails/sshd returns 401 without session."""
resp = await AsyncClient(
transport=ASGITransport(app=config_client._transport.app), # type: ignore[attr-defined]
base_url="http://test",
).get("/api/config/jails/sshd")
assert resp.status_code == 401
# ---------------------------------------------------------------------------
# PUT /api/config/jails/{name}
# ---------------------------------------------------------------------------
class TestUpdateJailConfig:
"""Tests for ``PUT /api/config/jails/{name}``."""
async def test_204_on_success(self, config_client: AsyncClient) -> None:
"""PUT /api/config/jails/sshd returns 204 on success."""
with patch(
"app.routers.config.config_service.update_jail_config",
AsyncMock(return_value=None),
):
resp = await config_client.put(
"/api/config/jails/sshd",
json={"ban_time": 3600},
)
assert resp.status_code == 204
async def test_404_for_unknown_jail(self, config_client: AsyncClient) -> None:
"""PUT /api/config/jails/missing returns 404."""
from app.services.config_service import JailNotFoundError
with patch(
"app.routers.config.config_service.update_jail_config",
AsyncMock(side_effect=JailNotFoundError("missing")),
):
resp = await config_client.put(
"/api/config/jails/missing",
json={"ban_time": 3600},
)
assert resp.status_code == 404
async def test_422_on_invalid_regex(self, config_client: AsyncClient) -> None:
"""PUT /api/config/jails/sshd returns 422 for invalid regex pattern."""
from app.services.config_service import ConfigValidationError
with patch(
"app.routers.config.config_service.update_jail_config",
AsyncMock(side_effect=ConfigValidationError("bad regex")),
):
resp = await config_client.put(
"/api/config/jails/sshd",
json={"fail_regex": ["[bad"]},
)
assert resp.status_code == 422
async def test_400_on_config_operation_error(self, config_client: AsyncClient) -> None:
"""PUT /api/config/jails/sshd returns 400 when set command fails."""
from app.services.config_service import ConfigOperationError
with patch(
"app.routers.config.config_service.update_jail_config",
AsyncMock(side_effect=ConfigOperationError("set failed")),
):
resp = await config_client.put(
"/api/config/jails/sshd",
json={"ban_time": 3600},
)
assert resp.status_code == 400
async def test_204_with_dns_mode(self, config_client: AsyncClient) -> None:
"""PUT /api/config/jails/sshd accepts dns_mode field."""
with patch(
"app.routers.config.config_service.update_jail_config",
AsyncMock(return_value=None),
):
resp = await config_client.put(
"/api/config/jails/sshd",
json={"dns_mode": "no"},
)
assert resp.status_code == 204
async def test_204_with_prefregex(self, config_client: AsyncClient) -> None:
"""PUT /api/config/jails/sshd accepts prefregex field."""
with patch(
"app.routers.config.config_service.update_jail_config",
AsyncMock(return_value=None),
):
resp = await config_client.put(
"/api/config/jails/sshd",
json={"prefregex": r"^%(__prefix_line)s"},
)
assert resp.status_code == 204
async def test_204_with_date_pattern(self, config_client: AsyncClient) -> None:
"""PUT /api/config/jails/sshd accepts date_pattern field."""
with patch(
"app.routers.config.config_service.update_jail_config",
AsyncMock(return_value=None),
):
resp = await config_client.put(
"/api/config/jails/sshd",
json={"date_pattern": "%Y-%m-%d %H:%M:%S"},
)
assert resp.status_code == 204
# ---------------------------------------------------------------------------
# GET /api/config/global
# ---------------------------------------------------------------------------
class TestGetGlobalConfig:
"""Tests for ``GET /api/config/global``."""
async def test_200_returns_global_config(self, config_client: AsyncClient) -> None:
"""GET /api/config/global returns 200 with GlobalConfigResponse."""
mock_response = GlobalConfigResponse(
log_level="WARNING",
log_target="/var/log/fail2ban.log",
db_purge_age=86400,
db_max_matches=10,
)
with patch(
"app.routers.config.config_service.get_global_config",
AsyncMock(return_value=mock_response),
):
resp = await config_client.get("/api/config/global")
assert resp.status_code == 200
data = resp.json()
assert data["log_level"] == "WARNING"
assert data["db_purge_age"] == 86400
async def test_401_when_unauthenticated(self, config_client: AsyncClient) -> None:
"""GET /api/config/global returns 401 without session."""
resp = await AsyncClient(
transport=ASGITransport(app=config_client._transport.app), # type: ignore[attr-defined]
base_url="http://test",
).get("/api/config/global")
assert resp.status_code == 401
# ---------------------------------------------------------------------------
# PUT /api/config/global
# ---------------------------------------------------------------------------
class TestUpdateGlobalConfig:
"""Tests for ``PUT /api/config/global``."""
async def test_204_on_success(self, config_client: AsyncClient) -> None:
"""PUT /api/config/global returns 204 on success."""
with patch(
"app.routers.config.config_service.update_global_config",
AsyncMock(return_value=None),
):
resp = await config_client.put(
"/api/config/global",
json={"log_level": "DEBUG"},
)
assert resp.status_code == 204
async def test_400_on_operation_error(self, config_client: AsyncClient) -> None:
"""PUT /api/config/global returns 400 when set command fails."""
from app.services.config_service import ConfigOperationError
with patch(
"app.routers.config.config_service.update_global_config",
AsyncMock(side_effect=ConfigOperationError("set failed")),
):
resp = await config_client.put(
"/api/config/global",
json={"log_level": "INFO"},
)
assert resp.status_code == 400
# ---------------------------------------------------------------------------
# POST /api/config/reload
# ---------------------------------------------------------------------------
class TestReloadFail2ban:
"""Tests for ``POST /api/config/reload``."""
async def test_204_on_success(self, config_client: AsyncClient) -> None:
"""POST /api/config/reload returns 204 on success."""
with patch(
"app.routers.config.jail_service.reload_all",
AsyncMock(return_value=None),
):
resp = await config_client.post("/api/config/reload")
assert resp.status_code == 204
async def test_502_when_fail2ban_unreachable(self, config_client: AsyncClient) -> None:
"""POST /api/config/reload returns 502 when fail2ban socket is unreachable."""
from app.utils.fail2ban_client import Fail2BanConnectionError
with patch(
"app.routers.config.jail_service.reload_all",
AsyncMock(side_effect=Fail2BanConnectionError("no socket", "/fake.sock")),
):
resp = await config_client.post("/api/config/reload")
assert resp.status_code == 502
async def test_409_when_reload_operation_fails(self, config_client: AsyncClient) -> None:
"""POST /api/config/reload returns 409 when fail2ban reports a reload error."""
from app.services.jail_service import JailOperationError
with patch(
"app.routers.config.jail_service.reload_all",
AsyncMock(side_effect=JailOperationError("reload rejected")),
):
resp = await config_client.post("/api/config/reload")
assert resp.status_code == 409
# ---------------------------------------------------------------------------
# POST /api/config/restart
# ---------------------------------------------------------------------------
class TestRestartFail2ban:
"""Tests for ``POST /api/config/restart``."""
async def test_204_on_success(self, config_client: AsyncClient) -> None:
"""POST /api/config/restart returns 204 when fail2ban restarts cleanly."""
with (
patch(
"app.routers.config.jail_service.restart",
AsyncMock(return_value=None),
),
patch(
"app.routers.config.config_file_service.start_daemon",
AsyncMock(return_value=True),
),
patch(
"app.routers.config.config_file_service.wait_for_fail2ban",
AsyncMock(return_value=True),
),
):
resp = await config_client.post("/api/config/restart")
assert resp.status_code == 204
async def test_503_when_fail2ban_does_not_come_back(self, config_client: AsyncClient) -> None:
"""POST /api/config/restart returns 503 when fail2ban does not come back online."""
with (
patch(
"app.routers.config.jail_service.restart",
AsyncMock(return_value=None),
),
patch(
"app.routers.config.config_file_service.start_daemon",
AsyncMock(return_value=True),
),
patch(
"app.routers.config.config_file_service.wait_for_fail2ban",
AsyncMock(return_value=False),
),
):
resp = await config_client.post("/api/config/restart")
assert resp.status_code == 503
async def test_409_when_stop_command_fails(self, config_client: AsyncClient) -> None:
"""POST /api/config/restart returns 409 when fail2ban rejects the stop command."""
from app.services.jail_service import JailOperationError
with patch(
"app.routers.config.jail_service.restart",
AsyncMock(side_effect=JailOperationError("stop failed")),
):
resp = await config_client.post("/api/config/restart")
assert resp.status_code == 409
async def test_502_when_fail2ban_unreachable(self, config_client: AsyncClient) -> None:
"""POST /api/config/restart returns 502 when fail2ban socket is unreachable."""
from app.utils.fail2ban_client import Fail2BanConnectionError
with patch(
"app.routers.config.jail_service.restart",
AsyncMock(side_effect=Fail2BanConnectionError("no socket", "/fake.sock")),
):
resp = await config_client.post("/api/config/restart")
assert resp.status_code == 502
async def test_start_daemon_called_after_stop(self, config_client: AsyncClient) -> None:
"""start_daemon is called after a successful stop."""
mock_start = AsyncMock(return_value=True)
with (
patch(
"app.routers.config.jail_service.restart",
AsyncMock(return_value=None),
),
patch(
"app.routers.config.config_file_service.start_daemon",
mock_start,
),
patch(
"app.routers.config.config_file_service.wait_for_fail2ban",
AsyncMock(return_value=True),
),
):
await config_client.post("/api/config/restart")
mock_start.assert_awaited_once()
# ---------------------------------------------------------------------------
# POST /api/config/regex-test
# ---------------------------------------------------------------------------
class TestRegexTest:
"""Tests for ``POST /api/config/regex-test``."""
async def test_200_matched(self, config_client: AsyncClient) -> None:
"""POST /api/config/regex-test returns matched=true for a valid match."""
mock_response = RegexTestResponse(matched=True, groups=["1.2.3.4"], error=None)
with patch(
"app.routers.config.config_service.test_regex",
return_value=mock_response,
):
resp = await config_client.post(
"/api/config/regex-test",
json={
"log_line": "fail from 1.2.3.4",
"fail_regex": r"(\d+\.\d+\.\d+\.\d+)",
},
)
assert resp.status_code == 200
assert resp.json()["matched"] is True
async def test_200_not_matched(self, config_client: AsyncClient) -> None:
"""POST /api/config/regex-test returns matched=false for no match."""
mock_response = RegexTestResponse(matched=False, groups=[], error=None)
with patch(
"app.routers.config.config_service.test_regex",
return_value=mock_response,
):
resp = await config_client.post(
"/api/config/regex-test",
json={"log_line": "ok line", "fail_regex": r"FAIL"},
)
assert resp.status_code == 200
assert resp.json()["matched"] is False
async def test_401_when_unauthenticated(self, config_client: AsyncClient) -> None:
"""POST /api/config/regex-test returns 401 without session."""
resp = await AsyncClient(
transport=ASGITransport(app=config_client._transport.app), # type: ignore[attr-defined]
base_url="http://test",
).post(
"/api/config/regex-test",
json={"log_line": "test", "fail_regex": "test"},
)
assert resp.status_code == 401
# ---------------------------------------------------------------------------
# POST /api/config/jails/{name}/logpath
# ---------------------------------------------------------------------------
class TestAddLogPath:
"""Tests for ``POST /api/config/jails/{name}/logpath``."""
async def test_204_on_success(self, config_client: AsyncClient) -> None:
"""POST /api/config/jails/sshd/logpath returns 204 on success."""
with patch(
"app.routers.config.config_service.add_log_path",
AsyncMock(return_value=None),
):
resp = await config_client.post(
"/api/config/jails/sshd/logpath",
json={"log_path": "/var/log/specific.log", "tail": True},
)
assert resp.status_code == 204
async def test_404_for_unknown_jail(self, config_client: AsyncClient) -> None:
"""POST /api/config/jails/missing/logpath returns 404."""
from app.services.config_service import JailNotFoundError
with patch(
"app.routers.config.config_service.add_log_path",
AsyncMock(side_effect=JailNotFoundError("missing")),
):
resp = await config_client.post(
"/api/config/jails/missing/logpath",
json={"log_path": "/var/log/test.log"},
)
assert resp.status_code == 404
# ---------------------------------------------------------------------------
# POST /api/config/preview-log
# ---------------------------------------------------------------------------
class TestPreviewLog:
"""Tests for ``POST /api/config/preview-log``."""
async def test_200_returns_preview(self, config_client: AsyncClient) -> None:
"""POST /api/config/preview-log returns 200 with LogPreviewResponse."""
from app.models.config import LogPreviewLine, LogPreviewResponse
mock_response = LogPreviewResponse(
lines=[LogPreviewLine(line="fail line", matched=True, groups=[])],
total_lines=1,
matched_count=1,
)
with patch(
"app.routers.config.config_service.preview_log",
AsyncMock(return_value=mock_response),
):
resp = await config_client.post(
"/api/config/preview-log",
json={"log_path": "/var/log/test.log", "fail_regex": "fail"},
)
assert resp.status_code == 200
data = resp.json()
assert data["total_lines"] == 1
assert data["matched_count"] == 1
# ---------------------------------------------------------------------------
# GET /api/config/map-color-thresholds
# ---------------------------------------------------------------------------
class TestGetMapColorThresholds:
"""Tests for ``GET /api/config/map-color-thresholds``."""
async def test_200_returns_thresholds(self, config_client: AsyncClient) -> None:
"""GET /api/config/map-color-thresholds returns 200 with current values."""
resp = await config_client.get("/api/config/map-color-thresholds")
assert resp.status_code == 200
data = resp.json()
assert "threshold_high" in data
assert "threshold_medium" in data
assert "threshold_low" in data
# Should return defaults after setup
assert data["threshold_high"] == 100
assert data["threshold_medium"] == 50
assert data["threshold_low"] == 20
# ---------------------------------------------------------------------------
# PUT /api/config/map-color-thresholds
# ---------------------------------------------------------------------------
class TestUpdateMapColorThresholds:
"""Tests for ``PUT /api/config/map-color-thresholds``."""
async def test_200_updates_thresholds(self, config_client: AsyncClient) -> None:
"""PUT /api/config/map-color-thresholds returns 200 and updates settings."""
update_payload = {
"threshold_high": 200,
"threshold_medium": 80,
"threshold_low": 30,
}
resp = await config_client.put(
"/api/config/map-color-thresholds", json=update_payload
)
assert resp.status_code == 200
data = resp.json()
assert data["threshold_high"] == 200
assert data["threshold_medium"] == 80
assert data["threshold_low"] == 30
# Verify the values persist
get_resp = await config_client.get("/api/config/map-color-thresholds")
assert get_resp.status_code == 200
get_data = get_resp.json()
assert get_data["threshold_high"] == 200
assert get_data["threshold_medium"] == 80
assert get_data["threshold_low"] == 30
async def test_400_for_invalid_order(self, config_client: AsyncClient) -> None:
"""PUT /api/config/map-color-thresholds returns 400 if thresholds are misordered."""
invalid_payload = {
"threshold_high": 50,
"threshold_medium": 50,
"threshold_low": 20,
}
resp = await config_client.put(
"/api/config/map-color-thresholds", json=invalid_payload
)
assert resp.status_code == 400
assert "high > medium > low" in resp.json()["detail"]
async def test_400_for_non_positive_values(
self, config_client: AsyncClient
) -> None:
"""PUT /api/config/map-color-thresholds returns 422 for non-positive values (Pydantic validation)."""
invalid_payload = {
"threshold_high": 100,
"threshold_medium": 50,
"threshold_low": 0,
}
resp = await config_client.put(
"/api/config/map-color-thresholds", json=invalid_payload
)
# Pydantic validates ge=1 constraint before our service code runs
assert resp.status_code == 422
# ---------------------------------------------------------------------------
# GET /api/config/jails/inactive
# ---------------------------------------------------------------------------
class TestGetInactiveJails:
"""Tests for ``GET /api/config/jails/inactive``."""
async def test_200_returns_inactive_list(self, config_client: AsyncClient) -> None:
"""GET /api/config/jails/inactive returns 200 with InactiveJailListResponse."""
from app.models.config import InactiveJail, InactiveJailListResponse
mock_jail = InactiveJail(
name="apache-auth",
filter="apache-auth",
actions=[],
port="http,https",
logpath=["/var/log/apache2/error.log"],
bantime="10m",
findtime="5m",
maxretry=5,
source_file="/etc/fail2ban/jail.conf",
enabled=False,
)
mock_response = InactiveJailListResponse(jails=[mock_jail], total=1)
with patch(
"app.routers.config.config_file_service.list_inactive_jails",
AsyncMock(return_value=mock_response),
):
resp = await config_client.get("/api/config/jails/inactive")
assert resp.status_code == 200
data = resp.json()
assert data["total"] == 1
assert data["jails"][0]["name"] == "apache-auth"
async def test_200_empty_list(self, config_client: AsyncClient) -> None:
"""GET /api/config/jails/inactive returns 200 with empty list."""
from app.models.config import InactiveJailListResponse
with patch(
"app.routers.config.config_file_service.list_inactive_jails",
AsyncMock(return_value=InactiveJailListResponse(jails=[], total=0)),
):
resp = await config_client.get("/api/config/jails/inactive")
assert resp.status_code == 200
assert resp.json()["total"] == 0
assert resp.json()["jails"] == []
async def test_401_when_unauthenticated(self, config_client: AsyncClient) -> None:
"""GET /api/config/jails/inactive returns 401 without a valid session."""
resp = await AsyncClient(
transport=ASGITransport(app=config_client._transport.app), # type: ignore[attr-defined]
base_url="http://test",
).get("/api/config/jails/inactive")
assert resp.status_code == 401
# ---------------------------------------------------------------------------
# POST /api/config/jails/{name}/activate
# ---------------------------------------------------------------------------
class TestActivateJail:
"""Tests for ``POST /api/config/jails/{name}/activate``."""
async def test_200_activates_jail(self, config_client: AsyncClient) -> None:
"""POST /api/config/jails/apache-auth/activate returns 200."""
from app.models.config import JailActivationResponse
mock_response = JailActivationResponse(
name="apache-auth",
active=True,
message="Jail 'apache-auth' activated successfully.",
)
with patch(
"app.routers.config.config_file_service.activate_jail",
AsyncMock(return_value=mock_response),
):
resp = await config_client.post(
"/api/config/jails/apache-auth/activate", json={}
)
assert resp.status_code == 200
data = resp.json()
assert data["active"] is True
assert data["name"] == "apache-auth"
async def test_200_with_overrides(self, config_client: AsyncClient) -> None:
"""POST .../activate accepts override fields."""
from app.models.config import JailActivationResponse
mock_response = JailActivationResponse(
name="apache-auth", active=True, message="Activated."
)
with patch(
"app.routers.config.config_file_service.activate_jail",
AsyncMock(return_value=mock_response),
) as mock_activate:
resp = await config_client.post(
"/api/config/jails/apache-auth/activate",
json={"bantime": "1h", "maxretry": 3},
)
assert resp.status_code == 200
# Verify the override values were passed to the service
called_req = mock_activate.call_args.args[3]
assert called_req.bantime == "1h"
assert called_req.maxretry == 3
async def test_404_for_unknown_jail(self, config_client: AsyncClient) -> None:
"""POST /api/config/jails/missing/activate returns 404."""
from app.services.config_file_service import JailNotFoundInConfigError
with patch(
"app.routers.config.config_file_service.activate_jail",
AsyncMock(side_effect=JailNotFoundInConfigError("missing")),
):
resp = await config_client.post(
"/api/config/jails/missing/activate", json={}
)
assert resp.status_code == 404
async def test_409_when_already_active(self, config_client: AsyncClient) -> None:
"""POST /api/config/jails/sshd/activate returns 409 if already active."""
from app.services.config_file_service import JailAlreadyActiveError
with patch(
"app.routers.config.config_file_service.activate_jail",
AsyncMock(side_effect=JailAlreadyActiveError("sshd")),
):
resp = await config_client.post(
"/api/config/jails/sshd/activate", json={}
)
assert resp.status_code == 409
async def test_400_for_invalid_jail_name(self, config_client: AsyncClient) -> None:
"""POST /api/config/jails/ with bad name returns 400."""
from app.services.config_file_service import JailNameError
with patch(
"app.routers.config.config_file_service.activate_jail",
AsyncMock(side_effect=JailNameError("bad name")),
):
resp = await config_client.post(
"/api/config/jails/bad-name/activate", json={}
)
assert resp.status_code == 400
async def test_401_when_unauthenticated(self, config_client: AsyncClient) -> None:
"""POST /api/config/jails/sshd/activate returns 401 without session."""
resp = await AsyncClient(
transport=ASGITransport(app=config_client._transport.app), # type: ignore[attr-defined]
base_url="http://test",
).post("/api/config/jails/sshd/activate", json={})
assert resp.status_code == 401
async def test_200_with_active_false_on_missing_logpath(self, config_client: AsyncClient) -> None:
"""POST .../activate returns 200 with active=False when the service blocks due to missing logpath."""
from app.models.config import JailActivationResponse
blocked_response = JailActivationResponse(
name="airsonic-auth",
active=False,
fail2ban_running=True,
validation_warnings=["logpath: log file '/var/log/airsonic/airsonic.log' not found"],
message="Jail 'airsonic-auth' cannot be activated: log file '/var/log/airsonic/airsonic.log' not found",
)
with patch(
"app.routers.config.config_file_service.activate_jail",
AsyncMock(return_value=blocked_response),
):
resp = await config_client.post(
"/api/config/jails/airsonic-auth/activate", json={}
)
assert resp.status_code == 200
data = resp.json()
assert data["active"] is False
assert data["fail2ban_running"] is True
assert "cannot be activated" in data["message"]
assert len(data["validation_warnings"]) == 1
# ---------------------------------------------------------------------------
# POST /api/config/jails/{name}/deactivate
# ---------------------------------------------------------------------------
class TestDeactivateJail:
"""Tests for ``POST /api/config/jails/{name}/deactivate``."""
async def test_200_deactivates_jail(self, config_client: AsyncClient) -> None:
"""POST /api/config/jails/sshd/deactivate returns 200."""
from app.models.config import JailActivationResponse
mock_response = JailActivationResponse(
name="sshd",
active=False,
message="Jail 'sshd' deactivated successfully.",
)
with patch(
"app.routers.config.config_file_service.deactivate_jail",
AsyncMock(return_value=mock_response),
):
resp = await config_client.post("/api/config/jails/sshd/deactivate")
assert resp.status_code == 200
data = resp.json()
assert data["active"] is False
assert data["name"] == "sshd"
async def test_404_for_unknown_jail(self, config_client: AsyncClient) -> None:
"""POST /api/config/jails/missing/deactivate returns 404."""
from app.services.config_file_service import JailNotFoundInConfigError
with patch(
"app.routers.config.config_file_service.deactivate_jail",
AsyncMock(side_effect=JailNotFoundInConfigError("missing")),
):
resp = await config_client.post(
"/api/config/jails/missing/deactivate"
)
assert resp.status_code == 404
async def test_409_when_already_inactive(self, config_client: AsyncClient) -> None:
"""POST /api/config/jails/apache-auth/deactivate returns 409 if already inactive."""
from app.services.config_file_service import JailAlreadyInactiveError
with patch(
"app.routers.config.config_file_service.deactivate_jail",
AsyncMock(side_effect=JailAlreadyInactiveError("apache-auth")),
):
resp = await config_client.post(
"/api/config/jails/apache-auth/deactivate"
)
assert resp.status_code == 409
async def test_400_for_invalid_jail_name(self, config_client: AsyncClient) -> None:
"""POST /api/config/jails/.../deactivate with bad name returns 400."""
from app.services.config_file_service import JailNameError
with patch(
"app.routers.config.config_file_service.deactivate_jail",
AsyncMock(side_effect=JailNameError("bad")),
):
resp = await config_client.post(
"/api/config/jails/sshd/deactivate"
)
assert resp.status_code == 400
async def test_401_when_unauthenticated(self, config_client: AsyncClient) -> None:
"""POST /api/config/jails/sshd/deactivate returns 401 without session."""
resp = await AsyncClient(
transport=ASGITransport(app=config_client._transport.app), # type: ignore[attr-defined]
base_url="http://test",
).post("/api/config/jails/sshd/deactivate")
assert resp.status_code == 401
async def test_deactivate_triggers_health_probe(self, config_client: AsyncClient) -> None:
"""POST .../deactivate triggers an immediate health probe after success."""
from app.models.config import JailActivationResponse
mock_response = JailActivationResponse(
name="sshd",
active=False,
message="Jail 'sshd' deactivated successfully.",
)
with (
patch(
"app.routers.config.config_file_service.deactivate_jail",
AsyncMock(return_value=mock_response),
),
patch(
"app.routers.config._run_probe",
AsyncMock(),
) as mock_probe,
):
resp = await config_client.post("/api/config/jails/sshd/deactivate")
assert resp.status_code == 200
mock_probe.assert_awaited_once()
# ---------------------------------------------------------------------------
# GET /api/config/filters
# ---------------------------------------------------------------------------
def _make_filter_config(name: str, active: bool = False) -> FilterConfig:
return FilterConfig(
name=name,
filename=f"{name}.conf",
before=None,
after=None,
variables={},
prefregex=None,
failregex=[],
ignoreregex=[],
maxlines=None,
datepattern=None,
journalmatch=None,
active=active,
used_by_jails=[name] if active else [],
source_file=f"/etc/fail2ban/filter.d/{name}.conf",
has_local_override=False,
)
class TestListFilters:
"""Tests for ``GET /api/config/filters``."""
async def test_200_returns_filter_list(self, config_client: AsyncClient) -> None:
"""GET /api/config/filters returns 200 with FilterListResponse."""
from app.models.config import FilterListResponse
mock_response = FilterListResponse(
filters=[_make_filter_config("sshd", active=True)],
total=1,
)
with patch(
"app.routers.config.config_file_service.list_filters",
AsyncMock(return_value=mock_response),
):
resp = await config_client.get("/api/config/filters")
assert resp.status_code == 200
data = resp.json()
assert data["total"] == 1
assert data["filters"][0]["name"] == "sshd"
assert data["filters"][0]["active"] is True
async def test_200_empty_filter_list(self, config_client: AsyncClient) -> None:
"""GET /api/config/filters returns 200 with empty list when no filters found."""
from app.models.config import FilterListResponse
with patch(
"app.routers.config.config_file_service.list_filters",
AsyncMock(return_value=FilterListResponse(filters=[], total=0)),
):
resp = await config_client.get("/api/config/filters")
assert resp.status_code == 200
assert resp.json()["total"] == 0
assert resp.json()["filters"] == []
async def test_active_filters_sorted_before_inactive(
self, config_client: AsyncClient
) -> None:
"""GET /api/config/filters returns active filters before inactive ones."""
from app.models.config import FilterListResponse
mock_response = FilterListResponse(
filters=[
_make_filter_config("nginx", active=False),
_make_filter_config("sshd", active=True),
],
total=2,
)
with patch(
"app.routers.config.config_file_service.list_filters",
AsyncMock(return_value=mock_response),
):
resp = await config_client.get("/api/config/filters")
data = resp.json()
assert data["filters"][0]["name"] == "sshd" # active first
assert data["filters"][1]["name"] == "nginx" # inactive second
async def test_401_when_unauthenticated(self, config_client: AsyncClient) -> None:
"""GET /api/config/filters returns 401 without a valid session."""
resp = await AsyncClient(
transport=ASGITransport(app=config_client._transport.app), # type: ignore[attr-defined]
base_url="http://test",
).get("/api/config/filters")
assert resp.status_code == 401
# ---------------------------------------------------------------------------
# GET /api/config/filters/{name}
# ---------------------------------------------------------------------------
class TestGetFilter:
"""Tests for ``GET /api/config/filters/{name}``."""
async def test_200_returns_filter(self, config_client: AsyncClient) -> None:
"""GET /api/config/filters/sshd returns 200 with FilterConfig."""
with patch(
"app.routers.config.config_file_service.get_filter",
AsyncMock(return_value=_make_filter_config("sshd")),
):
resp = await config_client.get("/api/config/filters/sshd")
assert resp.status_code == 200
data = resp.json()
assert data["name"] == "sshd"
assert "failregex" in data
assert "active" in data
async def test_404_for_unknown_filter(self, config_client: AsyncClient) -> None:
"""GET /api/config/filters/missing returns 404."""
from app.services.config_file_service import FilterNotFoundError
with patch(
"app.routers.config.config_file_service.get_filter",
AsyncMock(side_effect=FilterNotFoundError("missing")),
):
resp = await config_client.get("/api/config/filters/missing")
assert resp.status_code == 404
async def test_401_when_unauthenticated(self, config_client: AsyncClient) -> None:
"""GET /api/config/filters/sshd returns 401 without session."""
resp = await AsyncClient(
transport=ASGITransport(app=config_client._transport.app), # type: ignore[attr-defined]
base_url="http://test",
).get("/api/config/filters/sshd")
assert resp.status_code == 401
# ---------------------------------------------------------------------------
# PUT /api/config/filters/{name} (Task 2.2)
# ---------------------------------------------------------------------------
class TestUpdateFilter:
"""Tests for ``PUT /api/config/filters/{name}``."""
async def test_200_returns_updated_filter(self, config_client: AsyncClient) -> None:
"""PUT /api/config/filters/sshd returns 200 with updated FilterConfig."""
with patch(
"app.routers.config.config_file_service.update_filter",
AsyncMock(return_value=_make_filter_config("sshd")),
):
resp = await config_client.put(
"/api/config/filters/sshd",
json={"failregex": [r"^fail from <HOST>"]},
)
assert resp.status_code == 200
assert resp.json()["name"] == "sshd"
async def test_404_for_unknown_filter(self, config_client: AsyncClient) -> None:
"""PUT /api/config/filters/missing returns 404."""
from app.services.config_file_service import FilterNotFoundError
with patch(
"app.routers.config.config_file_service.update_filter",
AsyncMock(side_effect=FilterNotFoundError("missing")),
):
resp = await config_client.put(
"/api/config/filters/missing",
json={},
)
assert resp.status_code == 404
async def test_422_for_invalid_regex(self, config_client: AsyncClient) -> None:
"""PUT /api/config/filters/sshd returns 422 for bad regex."""
from app.services.config_file_service import FilterInvalidRegexError
with patch(
"app.routers.config.config_file_service.update_filter",
AsyncMock(side_effect=FilterInvalidRegexError("[bad", "unterminated")),
):
resp = await config_client.put(
"/api/config/filters/sshd",
json={"failregex": ["[bad"]},
)
assert resp.status_code == 422
async def test_400_for_invalid_name(self, config_client: AsyncClient) -> None:
"""PUT /api/config/filters/... with bad name returns 400."""
from app.services.config_file_service import FilterNameError
with patch(
"app.routers.config.config_file_service.update_filter",
AsyncMock(side_effect=FilterNameError("bad")),
):
resp = await config_client.put(
"/api/config/filters/bad",
json={},
)
assert resp.status_code == 400
async def test_reload_query_param_passed(self, config_client: AsyncClient) -> None:
"""PUT /api/config/filters/sshd?reload=true passes do_reload=True."""
with patch(
"app.routers.config.config_file_service.update_filter",
AsyncMock(return_value=_make_filter_config("sshd")),
) as mock_update:
resp = await config_client.put(
"/api/config/filters/sshd?reload=true",
json={},
)
assert resp.status_code == 200
assert mock_update.call_args.kwargs.get("do_reload") is True
async def test_401_when_unauthenticated(self, config_client: AsyncClient) -> None:
"""PUT /api/config/filters/sshd returns 401 without session."""
resp = await AsyncClient(
transport=ASGITransport(app=config_client._transport.app), # type: ignore[attr-defined]
base_url="http://test",
).put("/api/config/filters/sshd", json={})
assert resp.status_code == 401
# ---------------------------------------------------------------------------
# POST /api/config/filters (Task 2.2)
# ---------------------------------------------------------------------------
class TestCreateFilter:
"""Tests for ``POST /api/config/filters``."""
async def test_201_creates_filter(self, config_client: AsyncClient) -> None:
"""POST /api/config/filters returns 201 with FilterConfig."""
with patch(
"app.routers.config.config_file_service.create_filter",
AsyncMock(return_value=_make_filter_config("my-custom")),
):
resp = await config_client.post(
"/api/config/filters",
json={"name": "my-custom", "failregex": [r"^fail from <HOST>"]},
)
assert resp.status_code == 201
assert resp.json()["name"] == "my-custom"
async def test_409_when_already_exists(self, config_client: AsyncClient) -> None:
"""POST /api/config/filters returns 409 if filter exists."""
from app.services.config_file_service import FilterAlreadyExistsError
with patch(
"app.routers.config.config_file_service.create_filter",
AsyncMock(side_effect=FilterAlreadyExistsError("sshd")),
):
resp = await config_client.post(
"/api/config/filters",
json={"name": "sshd"},
)
assert resp.status_code == 409
async def test_422_for_invalid_regex(self, config_client: AsyncClient) -> None:
"""POST /api/config/filters returns 422 for bad regex."""
from app.services.config_file_service import FilterInvalidRegexError
with patch(
"app.routers.config.config_file_service.create_filter",
AsyncMock(side_effect=FilterInvalidRegexError("[bad", "unterminated")),
):
resp = await config_client.post(
"/api/config/filters",
json={"name": "test", "failregex": ["[bad"]},
)
assert resp.status_code == 422
async def test_400_for_invalid_name(self, config_client: AsyncClient) -> None:
"""POST /api/config/filters returns 400 for invalid filter name."""
from app.services.config_file_service import FilterNameError
with patch(
"app.routers.config.config_file_service.create_filter",
AsyncMock(side_effect=FilterNameError("bad")),
):
resp = await config_client.post(
"/api/config/filters",
json={"name": "bad"},
)
assert resp.status_code == 400
async def test_401_when_unauthenticated(self, config_client: AsyncClient) -> None:
"""POST /api/config/filters returns 401 without session."""
resp = await AsyncClient(
transport=ASGITransport(app=config_client._transport.app), # type: ignore[attr-defined]
base_url="http://test",
).post("/api/config/filters", json={"name": "test"})
assert resp.status_code == 401
# ---------------------------------------------------------------------------
# DELETE /api/config/filters/{name} (Task 2.2)
# ---------------------------------------------------------------------------
class TestDeleteFilter:
"""Tests for ``DELETE /api/config/filters/{name}``."""
async def test_204_deletes_filter(self, config_client: AsyncClient) -> None:
"""DELETE /api/config/filters/my-custom returns 204."""
with patch(
"app.routers.config.config_file_service.delete_filter",
AsyncMock(return_value=None),
):
resp = await config_client.delete("/api/config/filters/my-custom")
assert resp.status_code == 204
async def test_404_for_unknown_filter(self, config_client: AsyncClient) -> None:
"""DELETE /api/config/filters/missing returns 404."""
from app.services.config_file_service import FilterNotFoundError
with patch(
"app.routers.config.config_file_service.delete_filter",
AsyncMock(side_effect=FilterNotFoundError("missing")),
):
resp = await config_client.delete("/api/config/filters/missing")
assert resp.status_code == 404
async def test_409_for_readonly_filter(self, config_client: AsyncClient) -> None:
"""DELETE /api/config/filters/sshd returns 409 for shipped conf-only filter."""
from app.services.config_file_service import FilterReadonlyError
with patch(
"app.routers.config.config_file_service.delete_filter",
AsyncMock(side_effect=FilterReadonlyError("sshd")),
):
resp = await config_client.delete("/api/config/filters/sshd")
assert resp.status_code == 409
async def test_400_for_invalid_name(self, config_client: AsyncClient) -> None:
"""DELETE /api/config/filters/... with bad name returns 400."""
from app.services.config_file_service import FilterNameError
with patch(
"app.routers.config.config_file_service.delete_filter",
AsyncMock(side_effect=FilterNameError("bad")),
):
resp = await config_client.delete("/api/config/filters/bad")
assert resp.status_code == 400
async def test_401_when_unauthenticated(self, config_client: AsyncClient) -> None:
"""DELETE /api/config/filters/sshd returns 401 without session."""
resp = await AsyncClient(
transport=ASGITransport(app=config_client._transport.app), # type: ignore[attr-defined]
base_url="http://test",
).delete("/api/config/filters/sshd")
assert resp.status_code == 401
# ---------------------------------------------------------------------------
# POST /api/config/jails/{name}/filter (Task 2.2)
# ---------------------------------------------------------------------------
class TestAssignFilterToJail:
"""Tests for ``POST /api/config/jails/{name}/filter``."""
async def test_204_assigns_filter(self, config_client: AsyncClient) -> None:
"""POST /api/config/jails/sshd/filter returns 204 on success."""
with patch(
"app.routers.config.config_file_service.assign_filter_to_jail",
AsyncMock(return_value=None),
):
resp = await config_client.post(
"/api/config/jails/sshd/filter",
json={"filter_name": "myfilter"},
)
assert resp.status_code == 204
async def test_404_for_unknown_jail(self, config_client: AsyncClient) -> None:
"""POST /api/config/jails/missing/filter returns 404."""
from app.services.config_file_service import JailNotFoundInConfigError
with patch(
"app.routers.config.config_file_service.assign_filter_to_jail",
AsyncMock(side_effect=JailNotFoundInConfigError("missing")),
):
resp = await config_client.post(
"/api/config/jails/missing/filter",
json={"filter_name": "sshd"},
)
assert resp.status_code == 404
async def test_404_for_unknown_filter(self, config_client: AsyncClient) -> None:
"""POST /api/config/jails/sshd/filter returns 404 when filter not found."""
from app.services.config_file_service import FilterNotFoundError
with patch(
"app.routers.config.config_file_service.assign_filter_to_jail",
AsyncMock(side_effect=FilterNotFoundError("missing-filter")),
):
resp = await config_client.post(
"/api/config/jails/sshd/filter",
json={"filter_name": "missing-filter"},
)
assert resp.status_code == 404
async def test_400_for_invalid_jail_name(self, config_client: AsyncClient) -> None:
"""POST /api/config/jails/.../filter with bad jail name returns 400."""
from app.services.config_file_service import JailNameError
with patch(
"app.routers.config.config_file_service.assign_filter_to_jail",
AsyncMock(side_effect=JailNameError("bad")),
):
resp = await config_client.post(
"/api/config/jails/sshd/filter",
json={"filter_name": "valid"},
)
assert resp.status_code == 400
async def test_400_for_invalid_filter_name(self, config_client: AsyncClient) -> None:
"""POST /api/config/jails/sshd/filter with bad filter name returns 400."""
from app.services.config_file_service import FilterNameError
with patch(
"app.routers.config.config_file_service.assign_filter_to_jail",
AsyncMock(side_effect=FilterNameError("bad")),
):
resp = await config_client.post(
"/api/config/jails/sshd/filter",
json={"filter_name": "../evil"},
)
assert resp.status_code == 400
async def test_reload_query_param_passed(self, config_client: AsyncClient) -> None:
"""POST /api/config/jails/sshd/filter?reload=true passes do_reload=True."""
with patch(
"app.routers.config.config_file_service.assign_filter_to_jail",
AsyncMock(return_value=None),
) as mock_assign:
resp = await config_client.post(
"/api/config/jails/sshd/filter?reload=true",
json={"filter_name": "sshd"},
)
assert resp.status_code == 204
assert mock_assign.call_args.kwargs.get("do_reload") is True
async def test_401_when_unauthenticated(self, config_client: AsyncClient) -> None:
"""POST /api/config/jails/sshd/filter returns 401 without session."""
resp = await AsyncClient(
transport=ASGITransport(app=config_client._transport.app), # type: ignore[attr-defined]
base_url="http://test",
).post("/api/config/jails/sshd/filter", json={"filter_name": "sshd"})
assert resp.status_code == 401
# ===========================================================================
# Action router tests (Task 3.1 + 3.2)
# ===========================================================================
@pytest.mark.asyncio
class TestListActionsRouter:
async def test_200_returns_action_list(self, config_client: AsyncClient) -> None:
from app.models.config import ActionConfig, ActionListResponse
mock_action = ActionConfig(
name="iptables",
filename="iptables.conf",
actionban="/sbin/iptables -I f2b-<name> 1 -s <ip> -j DROP",
)
mock_response = ActionListResponse(actions=[mock_action], total=1)
with patch(
"app.routers.config.config_file_service.list_actions",
AsyncMock(return_value=mock_response),
):
resp = await config_client.get("/api/config/actions")
assert resp.status_code == 200
data = resp.json()
assert data["total"] == 1
assert data["actions"][0]["name"] == "iptables"
async def test_active_sorted_first(self, config_client: AsyncClient) -> None:
from app.models.config import ActionConfig, ActionListResponse
inactive = ActionConfig(name="aaa", filename="aaa.conf", active=False)
active = ActionConfig(name="zzz", filename="zzz.conf", active=True)
mock_response = ActionListResponse(actions=[inactive, active], total=2)
with patch(
"app.routers.config.config_file_service.list_actions",
AsyncMock(return_value=mock_response),
):
resp = await config_client.get("/api/config/actions")
data = resp.json()
assert data["actions"][0]["name"] == "zzz" # active comes first
async def test_401_when_unauthenticated(self, config_client: AsyncClient) -> None:
resp = await AsyncClient(
transport=ASGITransport(app=config_client._transport.app), # type: ignore[attr-defined]
base_url="http://test",
).get("/api/config/actions")
assert resp.status_code == 401
@pytest.mark.asyncio
class TestGetActionRouter:
async def test_200_returns_action(self, config_client: AsyncClient) -> None:
from app.models.config import ActionConfig
mock_action = ActionConfig(
name="iptables",
filename="iptables.conf",
actionban="/sbin/iptables -I f2b-<name> 1 -s <ip> -j DROP",
)
with patch(
"app.routers.config.config_file_service.get_action",
AsyncMock(return_value=mock_action),
):
resp = await config_client.get("/api/config/actions/iptables")
assert resp.status_code == 200
assert resp.json()["name"] == "iptables"
async def test_404_when_not_found(self, config_client: AsyncClient) -> None:
from app.services.config_file_service import ActionNotFoundError
with patch(
"app.routers.config.config_file_service.get_action",
AsyncMock(side_effect=ActionNotFoundError("missing")),
):
resp = await config_client.get("/api/config/actions/missing")
assert resp.status_code == 404
async def test_401_when_unauthenticated(self, config_client: AsyncClient) -> None:
resp = await AsyncClient(
transport=ASGITransport(app=config_client._transport.app), # type: ignore[attr-defined]
base_url="http://test",
).get("/api/config/actions/iptables")
assert resp.status_code == 401
@pytest.mark.asyncio
class TestUpdateActionRouter:
async def test_200_returns_updated_action(self, config_client: AsyncClient) -> None:
from app.models.config import ActionConfig
updated = ActionConfig(
name="iptables",
filename="iptables.local",
actionban="echo ban",
)
with patch(
"app.routers.config.config_file_service.update_action",
AsyncMock(return_value=updated),
):
resp = await config_client.put(
"/api/config/actions/iptables",
json={"actionban": "echo ban"},
)
assert resp.status_code == 200
assert resp.json()["actionban"] == "echo ban"
async def test_404_when_not_found(self, config_client: AsyncClient) -> None:
from app.services.config_file_service import ActionNotFoundError
with patch(
"app.routers.config.config_file_service.update_action",
AsyncMock(side_effect=ActionNotFoundError("missing")),
):
resp = await config_client.put(
"/api/config/actions/missing", json={}
)
assert resp.status_code == 404
async def test_400_for_bad_name(self, config_client: AsyncClient) -> None:
from app.services.config_file_service import ActionNameError
with patch(
"app.routers.config.config_file_service.update_action",
AsyncMock(side_effect=ActionNameError()),
):
resp = await config_client.put(
"/api/config/actions/badname", json={}
)
assert resp.status_code == 400
async def test_401_when_unauthenticated(self, config_client: AsyncClient) -> None:
resp = await AsyncClient(
transport=ASGITransport(app=config_client._transport.app), # type: ignore[attr-defined]
base_url="http://test",
).put("/api/config/actions/iptables", json={})
assert resp.status_code == 401
@pytest.mark.asyncio
class TestCreateActionRouter:
async def test_201_returns_created_action(self, config_client: AsyncClient) -> None:
from app.models.config import ActionConfig
created = ActionConfig(
name="custom",
filename="custom.local",
actionban="echo ban",
)
with patch(
"app.routers.config.config_file_service.create_action",
AsyncMock(return_value=created),
):
resp = await config_client.post(
"/api/config/actions",
json={"name": "custom", "actionban": "echo ban"},
)
assert resp.status_code == 201
assert resp.json()["name"] == "custom"
async def test_409_when_already_exists(self, config_client: AsyncClient) -> None:
from app.services.config_file_service import ActionAlreadyExistsError
with patch(
"app.routers.config.config_file_service.create_action",
AsyncMock(side_effect=ActionAlreadyExistsError("iptables")),
):
resp = await config_client.post(
"/api/config/actions",
json={"name": "iptables"},
)
assert resp.status_code == 409
async def test_400_for_bad_name(self, config_client: AsyncClient) -> None:
from app.services.config_file_service import ActionNameError
with patch(
"app.routers.config.config_file_service.create_action",
AsyncMock(side_effect=ActionNameError()),
):
resp = await config_client.post(
"/api/config/actions",
json={"name": "badname"},
)
assert resp.status_code == 400
async def test_401_when_unauthenticated(self, config_client: AsyncClient) -> None:
resp = await AsyncClient(
transport=ASGITransport(app=config_client._transport.app), # type: ignore[attr-defined]
base_url="http://test",
).post("/api/config/actions", json={"name": "x"})
assert resp.status_code == 401
@pytest.mark.asyncio
class TestDeleteActionRouter:
async def test_204_on_delete(self, config_client: AsyncClient) -> None:
with patch(
"app.routers.config.config_file_service.delete_action",
AsyncMock(return_value=None),
):
resp = await config_client.delete("/api/config/actions/custom")
assert resp.status_code == 204
async def test_404_when_not_found(self, config_client: AsyncClient) -> None:
from app.services.config_file_service import ActionNotFoundError
with patch(
"app.routers.config.config_file_service.delete_action",
AsyncMock(side_effect=ActionNotFoundError("missing")),
):
resp = await config_client.delete("/api/config/actions/missing")
assert resp.status_code == 404
async def test_409_when_readonly(self, config_client: AsyncClient) -> None:
from app.services.config_file_service import ActionReadonlyError
with patch(
"app.routers.config.config_file_service.delete_action",
AsyncMock(side_effect=ActionReadonlyError("iptables")),
):
resp = await config_client.delete("/api/config/actions/iptables")
assert resp.status_code == 409
async def test_400_for_bad_name(self, config_client: AsyncClient) -> None:
from app.services.config_file_service import ActionNameError
with patch(
"app.routers.config.config_file_service.delete_action",
AsyncMock(side_effect=ActionNameError()),
):
resp = await config_client.delete("/api/config/actions/badname")
assert resp.status_code == 400
async def test_401_when_unauthenticated(self, config_client: AsyncClient) -> None:
resp = await AsyncClient(
transport=ASGITransport(app=config_client._transport.app), # type: ignore[attr-defined]
base_url="http://test",
).delete("/api/config/actions/iptables")
assert resp.status_code == 401
@pytest.mark.asyncio
class TestAssignActionToJailRouter:
async def test_204_on_success(self, config_client: AsyncClient) -> None:
with patch(
"app.routers.config.config_file_service.assign_action_to_jail",
AsyncMock(return_value=None),
):
resp = await config_client.post(
"/api/config/jails/sshd/action",
json={"action_name": "iptables"},
)
assert resp.status_code == 204
async def test_404_when_jail_not_found(self, config_client: AsyncClient) -> None:
from app.services.config_file_service import JailNotFoundInConfigError
with patch(
"app.routers.config.config_file_service.assign_action_to_jail",
AsyncMock(side_effect=JailNotFoundInConfigError("missing")),
):
resp = await config_client.post(
"/api/config/jails/missing/action",
json={"action_name": "iptables"},
)
assert resp.status_code == 404
async def test_404_when_action_not_found(self, config_client: AsyncClient) -> None:
from app.services.config_file_service import ActionNotFoundError
with patch(
"app.routers.config.config_file_service.assign_action_to_jail",
AsyncMock(side_effect=ActionNotFoundError("missing")),
):
resp = await config_client.post(
"/api/config/jails/sshd/action",
json={"action_name": "missing"},
)
assert resp.status_code == 404
async def test_400_for_bad_jail_name(self, config_client: AsyncClient) -> None:
from app.services.config_file_service import JailNameError
with patch(
"app.routers.config.config_file_service.assign_action_to_jail",
AsyncMock(side_effect=JailNameError()),
):
resp = await config_client.post(
"/api/config/jails/badjailname/action",
json={"action_name": "iptables"},
)
assert resp.status_code == 400
async def test_400_for_bad_action_name(self, config_client: AsyncClient) -> None:
from app.services.config_file_service import ActionNameError
with patch(
"app.routers.config.config_file_service.assign_action_to_jail",
AsyncMock(side_effect=ActionNameError()),
):
resp = await config_client.post(
"/api/config/jails/sshd/action",
json={"action_name": "badaction"},
)
assert resp.status_code == 400
async def test_reload_param_passed(self, config_client: AsyncClient) -> None:
with patch(
"app.routers.config.config_file_service.assign_action_to_jail",
AsyncMock(return_value=None),
) as mock_assign:
resp = await config_client.post(
"/api/config/jails/sshd/action?reload=true",
json={"action_name": "iptables"},
)
assert resp.status_code == 204
assert mock_assign.call_args.kwargs.get("do_reload") is True
async def test_401_when_unauthenticated(self, config_client: AsyncClient) -> None:
resp = await AsyncClient(
transport=ASGITransport(app=config_client._transport.app), # type: ignore[attr-defined]
base_url="http://test",
).post("/api/config/jails/sshd/action", json={"action_name": "iptables"})
assert resp.status_code == 401
@pytest.mark.asyncio
class TestRemoveActionFromJailRouter:
async def test_204_on_success(self, config_client: AsyncClient) -> None:
with patch(
"app.routers.config.config_file_service.remove_action_from_jail",
AsyncMock(return_value=None),
):
resp = await config_client.delete(
"/api/config/jails/sshd/action/iptables"
)
assert resp.status_code == 204
async def test_404_when_jail_not_found(self, config_client: AsyncClient) -> None:
from app.services.config_file_service import JailNotFoundInConfigError
with patch(
"app.routers.config.config_file_service.remove_action_from_jail",
AsyncMock(side_effect=JailNotFoundInConfigError("missing")),
):
resp = await config_client.delete(
"/api/config/jails/missing/action/iptables"
)
assert resp.status_code == 404
async def test_400_for_bad_jail_name(self, config_client: AsyncClient) -> None:
from app.services.config_file_service import JailNameError
with patch(
"app.routers.config.config_file_service.remove_action_from_jail",
AsyncMock(side_effect=JailNameError()),
):
resp = await config_client.delete(
"/api/config/jails/badjailname/action/iptables"
)
assert resp.status_code == 400
async def test_400_for_bad_action_name(self, config_client: AsyncClient) -> None:
from app.services.config_file_service import ActionNameError
with patch(
"app.routers.config.config_file_service.remove_action_from_jail",
AsyncMock(side_effect=ActionNameError()),
):
resp = await config_client.delete(
"/api/config/jails/sshd/action/badactionname"
)
assert resp.status_code == 400
async def test_reload_param_passed(self, config_client: AsyncClient) -> None:
with patch(
"app.routers.config.config_file_service.remove_action_from_jail",
AsyncMock(return_value=None),
) as mock_rm:
resp = await config_client.delete(
"/api/config/jails/sshd/action/iptables?reload=true"
)
assert resp.status_code == 204
assert mock_rm.call_args.kwargs.get("do_reload") is True
async def test_401_when_unauthenticated(self, config_client: AsyncClient) -> None:
resp = await AsyncClient(
transport=ASGITransport(app=config_client._transport.app), # type: ignore[attr-defined]
base_url="http://test",
).delete("/api/config/jails/sshd/action/iptables")
assert resp.status_code == 401
# ---------------------------------------------------------------------------
# GET /api/config/fail2ban-log
# ---------------------------------------------------------------------------
class TestGetFail2BanLog:
"""Tests for ``GET /api/config/fail2ban-log``."""
def _mock_log_response(self) -> Fail2BanLogResponse:
return Fail2BanLogResponse(
log_path="/var/log/fail2ban.log",
lines=["2025-01-01 INFO sshd Found 1.2.3.4", "2025-01-01 ERROR oops"],
total_lines=100,
log_level="INFO",
log_target="/var/log/fail2ban.log",
)
async def test_200_returns_log_response(self, config_client: AsyncClient) -> None:
"""GET /api/config/fail2ban-log returns 200 with Fail2BanLogResponse."""
with patch(
"app.routers.config.config_service.read_fail2ban_log",
AsyncMock(return_value=self._mock_log_response()),
):
resp = await config_client.get("/api/config/fail2ban-log")
assert resp.status_code == 200
data = resp.json()
assert data["log_path"] == "/var/log/fail2ban.log"
assert isinstance(data["lines"], list)
assert data["total_lines"] == 100
assert data["log_level"] == "INFO"
async def test_200_passes_lines_query_param(self, config_client: AsyncClient) -> None:
"""GET /api/config/fail2ban-log passes the lines query param to the service."""
with patch(
"app.routers.config.config_service.read_fail2ban_log",
AsyncMock(return_value=self._mock_log_response()),
) as mock_fn:
resp = await config_client.get("/api/config/fail2ban-log?lines=500")
assert resp.status_code == 200
_socket, lines_arg, _filter = mock_fn.call_args.args
assert lines_arg == 500
async def test_200_passes_filter_query_param(self, config_client: AsyncClient) -> None:
"""GET /api/config/fail2ban-log passes the filter query param to the service."""
with patch(
"app.routers.config.config_service.read_fail2ban_log",
AsyncMock(return_value=self._mock_log_response()),
) as mock_fn:
resp = await config_client.get("/api/config/fail2ban-log?filter=ERROR")
assert resp.status_code == 200
_socket, _lines, filter_arg = mock_fn.call_args.args
assert filter_arg == "ERROR"
async def test_400_when_non_file_target(self, config_client: AsyncClient) -> None:
"""GET /api/config/fail2ban-log returns 400 when log target is not a file."""
from app.services.config_service import ConfigOperationError
with patch(
"app.routers.config.config_service.read_fail2ban_log",
AsyncMock(side_effect=ConfigOperationError("fail2ban is logging to 'STDOUT'")),
):
resp = await config_client.get("/api/config/fail2ban-log")
assert resp.status_code == 400
async def test_400_when_path_traversal_detected(self, config_client: AsyncClient) -> None:
"""GET /api/config/fail2ban-log returns 400 when the path is outside safe dirs."""
from app.services.config_service import ConfigOperationError
with patch(
"app.routers.config.config_service.read_fail2ban_log",
AsyncMock(side_effect=ConfigOperationError("outside the allowed directory")),
):
resp = await config_client.get("/api/config/fail2ban-log")
assert resp.status_code == 400
async def test_502_when_fail2ban_unreachable(self, config_client: AsyncClient) -> None:
"""GET /api/config/fail2ban-log returns 502 when fail2ban is down."""
from app.utils.fail2ban_client import Fail2BanConnectionError
with patch(
"app.routers.config.config_service.read_fail2ban_log",
AsyncMock(side_effect=Fail2BanConnectionError("socket error", "/tmp/f.sock")),
):
resp = await config_client.get("/api/config/fail2ban-log")
assert resp.status_code == 502
async def test_422_for_lines_exceeding_max(self, config_client: AsyncClient) -> None:
"""GET /api/config/fail2ban-log returns 422 for lines > 2000."""
resp = await config_client.get("/api/config/fail2ban-log?lines=9999")
assert resp.status_code == 422
async def test_401_when_unauthenticated(self, config_client: AsyncClient) -> None:
"""GET /api/config/fail2ban-log requires authentication."""
resp = await AsyncClient(
transport=ASGITransport(app=config_client._transport.app), # type: ignore[attr-defined]
base_url="http://test",
).get("/api/config/fail2ban-log")
assert resp.status_code == 401
# ---------------------------------------------------------------------------
# GET /api/config/service-status
# ---------------------------------------------------------------------------
class TestGetServiceStatus:
"""Tests for ``GET /api/config/service-status``."""
def _mock_status(self, online: bool = True) -> ServiceStatusResponse:
return ServiceStatusResponse(
online=online,
version="1.0.0" if online else None,
bangui_version=app.__version__,
jail_count=2 if online else 0,
total_bans=10 if online else 0,
total_failures=3 if online else 0,
log_level="INFO" if online else "UNKNOWN",
log_target="/var/log/fail2ban.log" if online else "UNKNOWN",
)
async def test_200_when_online(self, config_client: AsyncClient) -> None:
"""GET /api/config/service-status returns 200 with full status when online."""
with patch(
"app.routers.config.config_service.get_service_status",
AsyncMock(return_value=self._mock_status(online=True)),
):
resp = await config_client.get("/api/config/service-status")
assert resp.status_code == 200
data = resp.json()
assert data["online"] is True
assert data["bangui_version"] == app.__version__
assert data["jail_count"] == 2
assert data["log_level"] == "INFO"
async def test_200_when_offline(self, config_client: AsyncClient) -> None:
"""GET /api/config/service-status returns 200 with offline=False when daemon is down."""
with patch(
"app.routers.config.config_service.get_service_status",
AsyncMock(return_value=self._mock_status(online=False)),
):
resp = await config_client.get("/api/config/service-status")
assert resp.status_code == 200
data = resp.json()
assert data["bangui_version"] == app.__version__
assert data["online"] is False
assert data["log_level"] == "UNKNOWN"
async def test_401_when_unauthenticated(self, config_client: AsyncClient) -> None:
"""GET /api/config/service-status requires authentication."""
resp = await AsyncClient(
transport=ASGITransport(app=config_client._transport.app), # type: ignore[attr-defined]
base_url="http://test",
).get("/api/config/service-status")
assert resp.status_code == 401
# ---------------------------------------------------------------------------
# Task 3 endpoints
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
class TestValidateJailEndpoint:
"""Tests for ``POST /api/config/jails/{name}/validate``."""
async def test_200_valid_config(self, config_client: AsyncClient) -> None:
"""Returns 200 with valid=True when the jail config has no issues."""
from app.models.config import JailValidationResult
mock_result = JailValidationResult(
jail_name="sshd", valid=True, issues=[]
)
with patch(
"app.routers.config.config_file_service.validate_jail_config",
AsyncMock(return_value=mock_result),
):
resp = await config_client.post("/api/config/jails/sshd/validate")
assert resp.status_code == 200
data = resp.json()
assert data["valid"] is True
assert data["jail_name"] == "sshd"
assert data["issues"] == []
async def test_200_invalid_config(self, config_client: AsyncClient) -> None:
"""Returns 200 with valid=False and issues when there are errors."""
from app.models.config import JailValidationIssue, JailValidationResult
issue = JailValidationIssue(field="filter", message="Filter file not found: filter.d/bad.conf (or .local)")
mock_result = JailValidationResult(
jail_name="sshd", valid=False, issues=[issue]
)
with patch(
"app.routers.config.config_file_service.validate_jail_config",
AsyncMock(return_value=mock_result),
):
resp = await config_client.post("/api/config/jails/sshd/validate")
assert resp.status_code == 200
data = resp.json()
assert data["valid"] is False
assert len(data["issues"]) == 1
assert data["issues"][0]["field"] == "filter"
async def test_400_for_invalid_jail_name(self, config_client: AsyncClient) -> None:
"""POST /api/config/jails/bad-name/validate returns 400 on JailNameError."""
from app.services.config_file_service import JailNameError
with patch(
"app.routers.config.config_file_service.validate_jail_config",
AsyncMock(side_effect=JailNameError("bad name")),
):
resp = await config_client.post("/api/config/jails/bad-name/validate")
assert resp.status_code == 400
async def test_401_when_unauthenticated(self, config_client: AsyncClient) -> None:
"""POST /api/config/jails/sshd/validate returns 401 without session."""
resp = await AsyncClient(
transport=ASGITransport(app=config_client._transport.app), # type: ignore[attr-defined]
base_url="http://test",
).post("/api/config/jails/sshd/validate")
assert resp.status_code == 401
@pytest.mark.asyncio
class TestPendingRecovery:
"""Tests for ``GET /api/config/pending-recovery``."""
async def test_returns_null_when_no_pending_recovery(
self, config_client: AsyncClient
) -> None:
"""Returns null body (204-like 200) when pending_recovery is not set."""
app = config_client._transport.app # type: ignore[attr-defined]
app.state.pending_recovery = None
resp = await config_client.get("/api/config/pending-recovery")
assert resp.status_code == 200
assert resp.json() is None
async def test_returns_record_when_set(self, config_client: AsyncClient) -> None:
"""Returns the PendingRecovery model when one is stored on app.state."""
import datetime
from app.models.config import PendingRecovery
now = datetime.datetime.now(tz=datetime.UTC)
record = PendingRecovery(
jail_name="sshd",
activated_at=now - datetime.timedelta(seconds=20),
detected_at=now,
)
app = config_client._transport.app # type: ignore[attr-defined]
app.state.pending_recovery = record
resp = await config_client.get("/api/config/pending-recovery")
assert resp.status_code == 200
data = resp.json()
assert data["jail_name"] == "sshd"
assert data["recovered"] is False
async def test_401_when_unauthenticated(self, config_client: AsyncClient) -> None:
"""GET /api/config/pending-recovery returns 401 without session."""
resp = await AsyncClient(
transport=ASGITransport(app=config_client._transport.app), # type: ignore[attr-defined]
base_url="http://test",
).get("/api/config/pending-recovery")
assert resp.status_code == 401
@pytest.mark.asyncio
class TestRollbackEndpoint:
"""Tests for ``POST /api/config/jails/{name}/rollback``."""
async def test_200_success_clears_pending_recovery(
self, config_client: AsyncClient
) -> None:
"""A successful rollback returns 200 and clears app.state.pending_recovery."""
import datetime
from app.models.config import PendingRecovery, RollbackResponse
# Set up a pending recovery record on the app.
app = config_client._transport.app # type: ignore[attr-defined]
now = datetime.datetime.now(tz=datetime.UTC)
app.state.pending_recovery = PendingRecovery(
jail_name="sshd",
activated_at=now - datetime.timedelta(seconds=10),
detected_at=now,
)
mock_result = RollbackResponse(
jail_name="sshd",
disabled=True,
fail2ban_running=True,
active_jails=0,
message="Jail 'sshd' disabled and fail2ban restarted.",
)
with patch(
"app.routers.config.config_file_service.rollback_jail",
AsyncMock(return_value=mock_result),
):
resp = await config_client.post("/api/config/jails/sshd/rollback")
assert resp.status_code == 200
data = resp.json()
assert data["disabled"] is True
assert data["fail2ban_running"] is True
# Successful rollback must clear the pending record.
assert app.state.pending_recovery is None
async def test_200_fail_preserves_pending_recovery(
self, config_client: AsyncClient
) -> None:
"""When fail2ban is still down after rollback, pending_recovery is retained."""
import datetime
from app.models.config import PendingRecovery, RollbackResponse
app = config_client._transport.app # type: ignore[attr-defined]
now = datetime.datetime.now(tz=datetime.UTC)
record = PendingRecovery(
jail_name="sshd",
activated_at=now - datetime.timedelta(seconds=10),
detected_at=now,
)
app.state.pending_recovery = record
mock_result = RollbackResponse(
jail_name="sshd",
disabled=True,
fail2ban_running=False,
active_jails=0,
message="fail2ban did not come back online.",
)
with patch(
"app.routers.config.config_file_service.rollback_jail",
AsyncMock(return_value=mock_result),
):
resp = await config_client.post("/api/config/jails/sshd/rollback")
assert resp.status_code == 200
data = resp.json()
assert data["fail2ban_running"] is False
# Pending record should NOT be cleared when rollback didn't fully succeed.
assert app.state.pending_recovery is not None
async def test_400_for_invalid_jail_name(self, config_client: AsyncClient) -> None:
"""POST /api/config/jails/bad/rollback returns 400 on JailNameError."""
from app.services.config_file_service import JailNameError
with patch(
"app.routers.config.config_file_service.rollback_jail",
AsyncMock(side_effect=JailNameError("bad")),
):
resp = await config_client.post("/api/config/jails/bad/rollback")
assert resp.status_code == 400
async def test_401_when_unauthenticated(self, config_client: AsyncClient) -> None:
"""POST /api/config/jails/sshd/rollback returns 401 without session."""
resp = await AsyncClient(
transport=ASGITransport(app=config_client._transport.app), # type: ignore[attr-defined]
base_url="http://test",
).post("/api/config/jails/sshd/rollback")
assert resp.status_code == 401