Task 2: adds a new Log tab to the Configuration page.
Backend:
- New Pydantic models: Fail2BanLogResponse, ServiceStatusResponse
(backend/app/models/config.py)
- New service methods in config_service.py:
read_fail2ban_log() — queries socket for log target/level, validates the
resolved path against a safe-prefix allowlist (/var/log) to prevent
path traversal, then reads the tail of the file via the existing
_read_tail_lines() helper; optional substring filter applied server-side.
get_service_status() — delegates to health_service.probe() and appends
log level/target from the socket.
- New endpoints in routers/config.py:
GET /api/config/fail2ban-log?lines=200&filter=...
GET /api/config/service-status
Both require authentication; log endpoint returns 400 for non-file log
targets or path-traversal attempts, 502 when fail2ban is unreachable.
Frontend:
- New LogTab.tsx component:
Service Health panel (Running/Offline badge, version, jail count, bans,
failures, log level/target, offline warning banner).
Log viewer with color-coded lines (error=red, warning=yellow,
debug=grey), toolbar (filter input + debounce, lines selector, manual
refresh, auto-refresh with interval selector), truncation notice, and
auto-scroll to bottom on data updates.
fetchData uses Promise.allSettled so a log-read failure never hides the
service-health panel.
- Types: Fail2BanLogResponse, ServiceStatusResponse (types/config.ts)
- API functions: fetchFail2BanLog, fetchServiceStatus (api/config.ts)
- Endpoint constants (api/endpoints.ts)
- ConfigPage.tsx: Log tab added after existing tabs
Tests:
- Backend service tests: TestReadFail2BanLog (6), TestGetServiceStatus (2)
- Backend router tests: TestGetFail2BanLog (8), TestGetServiceStatus (3)
- Frontend: LogTab.test.tsx (8 tests)
Docs:
- Features.md: Log section added under Configuration View
- Architekture.md: config.py router and config_service.py descriptions updated
- Tasks.md: Task 2 marked done
1877 lines
71 KiB
Python
1877 lines
71 KiB
Python
"""Tests for the config router endpoints."""
|
|
|
|
from __future__ import annotations
|
|
|
|
from pathlib import Path
|
|
from unittest.mock import AsyncMock, MagicMock, patch
|
|
|
|
import aiosqlite
|
|
import pytest
|
|
from httpx import ASGITransport, AsyncClient
|
|
|
|
from app.config import Settings
|
|
from app.db import init_db
|
|
from app.main import create_app
|
|
from app.models.config import (
|
|
Fail2BanLogResponse,
|
|
FilterConfig,
|
|
GlobalConfigResponse,
|
|
JailConfig,
|
|
JailConfigListResponse,
|
|
JailConfigResponse,
|
|
RegexTestResponse,
|
|
ServiceStatusResponse,
|
|
)
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Fixtures
|
|
# ---------------------------------------------------------------------------
|
|
|
|
_SETUP_PAYLOAD = {
|
|
"master_password": "testpassword1",
|
|
"database_path": "bangui.db",
|
|
"fail2ban_socket": "/var/run/fail2ban/fail2ban.sock",
|
|
"timezone": "UTC",
|
|
"session_duration_minutes": 60,
|
|
}
|
|
|
|
|
|
@pytest.fixture
|
|
async def config_client(tmp_path: Path) -> AsyncClient: # type: ignore[misc]
|
|
"""Provide an authenticated ``AsyncClient`` for config endpoint tests."""
|
|
settings = Settings(
|
|
database_path=str(tmp_path / "config_test.db"),
|
|
fail2ban_socket="/tmp/fake.sock",
|
|
session_secret="test-config-secret",
|
|
session_duration_minutes=60,
|
|
timezone="UTC",
|
|
log_level="debug",
|
|
)
|
|
app = create_app(settings=settings)
|
|
|
|
db: aiosqlite.Connection = await aiosqlite.connect(settings.database_path)
|
|
db.row_factory = aiosqlite.Row
|
|
await init_db(db)
|
|
app.state.db = db
|
|
app.state.http_session = MagicMock()
|
|
|
|
transport = ASGITransport(app=app)
|
|
async with AsyncClient(transport=transport, base_url="http://test") as ac:
|
|
await ac.post("/api/setup", json=_SETUP_PAYLOAD)
|
|
login = await ac.post(
|
|
"/api/auth/login",
|
|
json={"password": _SETUP_PAYLOAD["master_password"]},
|
|
)
|
|
assert login.status_code == 200
|
|
yield ac
|
|
|
|
await db.close()
|
|
|
|
|
|
def _make_jail_config(name: str = "sshd") -> JailConfig:
|
|
return JailConfig(
|
|
name=name,
|
|
ban_time=600,
|
|
max_retry=5,
|
|
find_time=600,
|
|
fail_regex=["regex1"],
|
|
ignore_regex=[],
|
|
log_paths=["/var/log/auth.log"],
|
|
date_pattern=None,
|
|
log_encoding="UTF-8",
|
|
backend="polling",
|
|
use_dns="warn",
|
|
prefregex="",
|
|
actions=["iptables"],
|
|
)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# GET /api/config/jails
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestGetJailConfigs:
|
|
"""Tests for ``GET /api/config/jails``."""
|
|
|
|
async def test_200_returns_jail_list(self, config_client: AsyncClient) -> None:
|
|
"""GET /api/config/jails returns 200 with JailConfigListResponse."""
|
|
mock_response = JailConfigListResponse(
|
|
jails=[_make_jail_config("sshd")], total=1
|
|
)
|
|
with patch(
|
|
"app.routers.config.config_service.list_jail_configs",
|
|
AsyncMock(return_value=mock_response),
|
|
):
|
|
resp = await config_client.get("/api/config/jails")
|
|
|
|
assert resp.status_code == 200
|
|
data = resp.json()
|
|
assert data["total"] == 1
|
|
assert data["jails"][0]["name"] == "sshd"
|
|
|
|
async def test_401_when_unauthenticated(self, config_client: AsyncClient) -> None:
|
|
"""GET /api/config/jails returns 401 without a valid session."""
|
|
resp = await AsyncClient(
|
|
transport=ASGITransport(app=config_client._transport.app), # type: ignore[attr-defined]
|
|
base_url="http://test",
|
|
).get("/api/config/jails")
|
|
assert resp.status_code == 401
|
|
|
|
async def test_502_on_connection_error(self, config_client: AsyncClient) -> None:
|
|
"""GET /api/config/jails returns 502 when fail2ban is unreachable."""
|
|
from app.utils.fail2ban_client import Fail2BanConnectionError
|
|
|
|
with patch(
|
|
"app.routers.config.config_service.list_jail_configs",
|
|
AsyncMock(side_effect=Fail2BanConnectionError("down", "/tmp/fake.sock")),
|
|
):
|
|
resp = await config_client.get("/api/config/jails")
|
|
|
|
assert resp.status_code == 502
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# GET /api/config/jails/{name}
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestGetJailConfig:
|
|
"""Tests for ``GET /api/config/jails/{name}``."""
|
|
|
|
async def test_200_returns_jail_config(self, config_client: AsyncClient) -> None:
|
|
"""GET /api/config/jails/sshd returns 200 with JailConfigResponse."""
|
|
mock_response = JailConfigResponse(jail=_make_jail_config("sshd"))
|
|
with patch(
|
|
"app.routers.config.config_service.get_jail_config",
|
|
AsyncMock(return_value=mock_response),
|
|
):
|
|
resp = await config_client.get("/api/config/jails/sshd")
|
|
|
|
assert resp.status_code == 200
|
|
assert resp.json()["jail"]["name"] == "sshd"
|
|
assert resp.json()["jail"]["ban_time"] == 600
|
|
|
|
async def test_404_for_unknown_jail(self, config_client: AsyncClient) -> None:
|
|
"""GET /api/config/jails/missing returns 404."""
|
|
from app.services.config_service import JailNotFoundError
|
|
|
|
with patch(
|
|
"app.routers.config.config_service.get_jail_config",
|
|
AsyncMock(side_effect=JailNotFoundError("missing")),
|
|
):
|
|
resp = await config_client.get("/api/config/jails/missing")
|
|
|
|
assert resp.status_code == 404
|
|
|
|
async def test_401_when_unauthenticated(self, config_client: AsyncClient) -> None:
|
|
"""GET /api/config/jails/sshd returns 401 without session."""
|
|
resp = await AsyncClient(
|
|
transport=ASGITransport(app=config_client._transport.app), # type: ignore[attr-defined]
|
|
base_url="http://test",
|
|
).get("/api/config/jails/sshd")
|
|
assert resp.status_code == 401
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# PUT /api/config/jails/{name}
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestUpdateJailConfig:
|
|
"""Tests for ``PUT /api/config/jails/{name}``."""
|
|
|
|
async def test_204_on_success(self, config_client: AsyncClient) -> None:
|
|
"""PUT /api/config/jails/sshd returns 204 on success."""
|
|
with patch(
|
|
"app.routers.config.config_service.update_jail_config",
|
|
AsyncMock(return_value=None),
|
|
):
|
|
resp = await config_client.put(
|
|
"/api/config/jails/sshd",
|
|
json={"ban_time": 3600},
|
|
)
|
|
|
|
assert resp.status_code == 204
|
|
|
|
async def test_404_for_unknown_jail(self, config_client: AsyncClient) -> None:
|
|
"""PUT /api/config/jails/missing returns 404."""
|
|
from app.services.config_service import JailNotFoundError
|
|
|
|
with patch(
|
|
"app.routers.config.config_service.update_jail_config",
|
|
AsyncMock(side_effect=JailNotFoundError("missing")),
|
|
):
|
|
resp = await config_client.put(
|
|
"/api/config/jails/missing",
|
|
json={"ban_time": 3600},
|
|
)
|
|
|
|
assert resp.status_code == 404
|
|
|
|
async def test_422_on_invalid_regex(self, config_client: AsyncClient) -> None:
|
|
"""PUT /api/config/jails/sshd returns 422 for invalid regex pattern."""
|
|
from app.services.config_service import ConfigValidationError
|
|
|
|
with patch(
|
|
"app.routers.config.config_service.update_jail_config",
|
|
AsyncMock(side_effect=ConfigValidationError("bad regex")),
|
|
):
|
|
resp = await config_client.put(
|
|
"/api/config/jails/sshd",
|
|
json={"fail_regex": ["[bad"]},
|
|
)
|
|
|
|
assert resp.status_code == 422
|
|
|
|
async def test_400_on_config_operation_error(self, config_client: AsyncClient) -> None:
|
|
"""PUT /api/config/jails/sshd returns 400 when set command fails."""
|
|
from app.services.config_service import ConfigOperationError
|
|
|
|
with patch(
|
|
"app.routers.config.config_service.update_jail_config",
|
|
AsyncMock(side_effect=ConfigOperationError("set failed")),
|
|
):
|
|
resp = await config_client.put(
|
|
"/api/config/jails/sshd",
|
|
json={"ban_time": 3600},
|
|
)
|
|
|
|
assert resp.status_code == 400
|
|
|
|
async def test_204_with_dns_mode(self, config_client: AsyncClient) -> None:
|
|
"""PUT /api/config/jails/sshd accepts dns_mode field."""
|
|
with patch(
|
|
"app.routers.config.config_service.update_jail_config",
|
|
AsyncMock(return_value=None),
|
|
):
|
|
resp = await config_client.put(
|
|
"/api/config/jails/sshd",
|
|
json={"dns_mode": "no"},
|
|
)
|
|
|
|
assert resp.status_code == 204
|
|
|
|
async def test_204_with_prefregex(self, config_client: AsyncClient) -> None:
|
|
"""PUT /api/config/jails/sshd accepts prefregex field."""
|
|
with patch(
|
|
"app.routers.config.config_service.update_jail_config",
|
|
AsyncMock(return_value=None),
|
|
):
|
|
resp = await config_client.put(
|
|
"/api/config/jails/sshd",
|
|
json={"prefregex": r"^%(__prefix_line)s"},
|
|
)
|
|
|
|
assert resp.status_code == 204
|
|
|
|
async def test_204_with_date_pattern(self, config_client: AsyncClient) -> None:
|
|
"""PUT /api/config/jails/sshd accepts date_pattern field."""
|
|
with patch(
|
|
"app.routers.config.config_service.update_jail_config",
|
|
AsyncMock(return_value=None),
|
|
):
|
|
resp = await config_client.put(
|
|
"/api/config/jails/sshd",
|
|
json={"date_pattern": "%Y-%m-%d %H:%M:%S"},
|
|
)
|
|
|
|
assert resp.status_code == 204
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# GET /api/config/global
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestGetGlobalConfig:
|
|
"""Tests for ``GET /api/config/global``."""
|
|
|
|
async def test_200_returns_global_config(self, config_client: AsyncClient) -> None:
|
|
"""GET /api/config/global returns 200 with GlobalConfigResponse."""
|
|
mock_response = GlobalConfigResponse(
|
|
log_level="WARNING",
|
|
log_target="/var/log/fail2ban.log",
|
|
db_purge_age=86400,
|
|
db_max_matches=10,
|
|
)
|
|
with patch(
|
|
"app.routers.config.config_service.get_global_config",
|
|
AsyncMock(return_value=mock_response),
|
|
):
|
|
resp = await config_client.get("/api/config/global")
|
|
|
|
assert resp.status_code == 200
|
|
data = resp.json()
|
|
assert data["log_level"] == "WARNING"
|
|
assert data["db_purge_age"] == 86400
|
|
|
|
async def test_401_when_unauthenticated(self, config_client: AsyncClient) -> None:
|
|
"""GET /api/config/global returns 401 without session."""
|
|
resp = await AsyncClient(
|
|
transport=ASGITransport(app=config_client._transport.app), # type: ignore[attr-defined]
|
|
base_url="http://test",
|
|
).get("/api/config/global")
|
|
assert resp.status_code == 401
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# PUT /api/config/global
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestUpdateGlobalConfig:
|
|
"""Tests for ``PUT /api/config/global``."""
|
|
|
|
async def test_204_on_success(self, config_client: AsyncClient) -> None:
|
|
"""PUT /api/config/global returns 204 on success."""
|
|
with patch(
|
|
"app.routers.config.config_service.update_global_config",
|
|
AsyncMock(return_value=None),
|
|
):
|
|
resp = await config_client.put(
|
|
"/api/config/global",
|
|
json={"log_level": "DEBUG"},
|
|
)
|
|
|
|
assert resp.status_code == 204
|
|
|
|
async def test_400_on_operation_error(self, config_client: AsyncClient) -> None:
|
|
"""PUT /api/config/global returns 400 when set command fails."""
|
|
from app.services.config_service import ConfigOperationError
|
|
|
|
with patch(
|
|
"app.routers.config.config_service.update_global_config",
|
|
AsyncMock(side_effect=ConfigOperationError("set failed")),
|
|
):
|
|
resp = await config_client.put(
|
|
"/api/config/global",
|
|
json={"log_level": "INFO"},
|
|
)
|
|
|
|
assert resp.status_code == 400
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# POST /api/config/reload
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestReloadFail2ban:
|
|
"""Tests for ``POST /api/config/reload``."""
|
|
|
|
async def test_204_on_success(self, config_client: AsyncClient) -> None:
|
|
"""POST /api/config/reload returns 204 on success."""
|
|
with patch(
|
|
"app.routers.config.jail_service.reload_all",
|
|
AsyncMock(return_value=None),
|
|
):
|
|
resp = await config_client.post("/api/config/reload")
|
|
|
|
assert resp.status_code == 204
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# POST /api/config/regex-test
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestRegexTest:
|
|
"""Tests for ``POST /api/config/regex-test``."""
|
|
|
|
async def test_200_matched(self, config_client: AsyncClient) -> None:
|
|
"""POST /api/config/regex-test returns matched=true for a valid match."""
|
|
mock_response = RegexTestResponse(matched=True, groups=["1.2.3.4"], error=None)
|
|
with patch(
|
|
"app.routers.config.config_service.test_regex",
|
|
return_value=mock_response,
|
|
):
|
|
resp = await config_client.post(
|
|
"/api/config/regex-test",
|
|
json={
|
|
"log_line": "fail from 1.2.3.4",
|
|
"fail_regex": r"(\d+\.\d+\.\d+\.\d+)",
|
|
},
|
|
)
|
|
|
|
assert resp.status_code == 200
|
|
assert resp.json()["matched"] is True
|
|
|
|
async def test_200_not_matched(self, config_client: AsyncClient) -> None:
|
|
"""POST /api/config/regex-test returns matched=false for no match."""
|
|
mock_response = RegexTestResponse(matched=False, groups=[], error=None)
|
|
with patch(
|
|
"app.routers.config.config_service.test_regex",
|
|
return_value=mock_response,
|
|
):
|
|
resp = await config_client.post(
|
|
"/api/config/regex-test",
|
|
json={"log_line": "ok line", "fail_regex": r"FAIL"},
|
|
)
|
|
|
|
assert resp.status_code == 200
|
|
assert resp.json()["matched"] is False
|
|
|
|
async def test_401_when_unauthenticated(self, config_client: AsyncClient) -> None:
|
|
"""POST /api/config/regex-test returns 401 without session."""
|
|
resp = await AsyncClient(
|
|
transport=ASGITransport(app=config_client._transport.app), # type: ignore[attr-defined]
|
|
base_url="http://test",
|
|
).post(
|
|
"/api/config/regex-test",
|
|
json={"log_line": "test", "fail_regex": "test"},
|
|
)
|
|
assert resp.status_code == 401
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# POST /api/config/jails/{name}/logpath
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestAddLogPath:
|
|
"""Tests for ``POST /api/config/jails/{name}/logpath``."""
|
|
|
|
async def test_204_on_success(self, config_client: AsyncClient) -> None:
|
|
"""POST /api/config/jails/sshd/logpath returns 204 on success."""
|
|
with patch(
|
|
"app.routers.config.config_service.add_log_path",
|
|
AsyncMock(return_value=None),
|
|
):
|
|
resp = await config_client.post(
|
|
"/api/config/jails/sshd/logpath",
|
|
json={"log_path": "/var/log/specific.log", "tail": True},
|
|
)
|
|
|
|
assert resp.status_code == 204
|
|
|
|
async def test_404_for_unknown_jail(self, config_client: AsyncClient) -> None:
|
|
"""POST /api/config/jails/missing/logpath returns 404."""
|
|
from app.services.config_service import JailNotFoundError
|
|
|
|
with patch(
|
|
"app.routers.config.config_service.add_log_path",
|
|
AsyncMock(side_effect=JailNotFoundError("missing")),
|
|
):
|
|
resp = await config_client.post(
|
|
"/api/config/jails/missing/logpath",
|
|
json={"log_path": "/var/log/test.log"},
|
|
)
|
|
|
|
assert resp.status_code == 404
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# POST /api/config/preview-log
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestPreviewLog:
|
|
"""Tests for ``POST /api/config/preview-log``."""
|
|
|
|
async def test_200_returns_preview(self, config_client: AsyncClient) -> None:
|
|
"""POST /api/config/preview-log returns 200 with LogPreviewResponse."""
|
|
from app.models.config import LogPreviewLine, LogPreviewResponse
|
|
|
|
mock_response = LogPreviewResponse(
|
|
lines=[LogPreviewLine(line="fail line", matched=True, groups=[])],
|
|
total_lines=1,
|
|
matched_count=1,
|
|
)
|
|
with patch(
|
|
"app.routers.config.config_service.preview_log",
|
|
AsyncMock(return_value=mock_response),
|
|
):
|
|
resp = await config_client.post(
|
|
"/api/config/preview-log",
|
|
json={"log_path": "/var/log/test.log", "fail_regex": "fail"},
|
|
)
|
|
|
|
assert resp.status_code == 200
|
|
data = resp.json()
|
|
assert data["total_lines"] == 1
|
|
assert data["matched_count"] == 1
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# GET /api/config/map-color-thresholds
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestGetMapColorThresholds:
|
|
"""Tests for ``GET /api/config/map-color-thresholds``."""
|
|
|
|
async def test_200_returns_thresholds(self, config_client: AsyncClient) -> None:
|
|
"""GET /api/config/map-color-thresholds returns 200 with current values."""
|
|
resp = await config_client.get("/api/config/map-color-thresholds")
|
|
|
|
assert resp.status_code == 200
|
|
data = resp.json()
|
|
assert "threshold_high" in data
|
|
assert "threshold_medium" in data
|
|
assert "threshold_low" in data
|
|
# Should return defaults after setup
|
|
assert data["threshold_high"] == 100
|
|
assert data["threshold_medium"] == 50
|
|
assert data["threshold_low"] == 20
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# PUT /api/config/map-color-thresholds
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestUpdateMapColorThresholds:
|
|
"""Tests for ``PUT /api/config/map-color-thresholds``."""
|
|
|
|
async def test_200_updates_thresholds(self, config_client: AsyncClient) -> None:
|
|
"""PUT /api/config/map-color-thresholds returns 200 and updates settings."""
|
|
update_payload = {
|
|
"threshold_high": 200,
|
|
"threshold_medium": 80,
|
|
"threshold_low": 30,
|
|
}
|
|
resp = await config_client.put(
|
|
"/api/config/map-color-thresholds", json=update_payload
|
|
)
|
|
|
|
assert resp.status_code == 200
|
|
data = resp.json()
|
|
assert data["threshold_high"] == 200
|
|
assert data["threshold_medium"] == 80
|
|
assert data["threshold_low"] == 30
|
|
|
|
# Verify the values persist
|
|
get_resp = await config_client.get("/api/config/map-color-thresholds")
|
|
assert get_resp.status_code == 200
|
|
get_data = get_resp.json()
|
|
assert get_data["threshold_high"] == 200
|
|
assert get_data["threshold_medium"] == 80
|
|
assert get_data["threshold_low"] == 30
|
|
|
|
async def test_400_for_invalid_order(self, config_client: AsyncClient) -> None:
|
|
"""PUT /api/config/map-color-thresholds returns 400 if thresholds are misordered."""
|
|
invalid_payload = {
|
|
"threshold_high": 50,
|
|
"threshold_medium": 50,
|
|
"threshold_low": 20,
|
|
}
|
|
resp = await config_client.put(
|
|
"/api/config/map-color-thresholds", json=invalid_payload
|
|
)
|
|
|
|
assert resp.status_code == 400
|
|
assert "high > medium > low" in resp.json()["detail"]
|
|
|
|
async def test_400_for_non_positive_values(
|
|
self, config_client: AsyncClient
|
|
) -> None:
|
|
"""PUT /api/config/map-color-thresholds returns 422 for non-positive values (Pydantic validation)."""
|
|
invalid_payload = {
|
|
"threshold_high": 100,
|
|
"threshold_medium": 50,
|
|
"threshold_low": 0,
|
|
}
|
|
resp = await config_client.put(
|
|
"/api/config/map-color-thresholds", json=invalid_payload
|
|
)
|
|
|
|
# Pydantic validates ge=1 constraint before our service code runs
|
|
assert resp.status_code == 422
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# GET /api/config/jails/inactive
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestGetInactiveJails:
|
|
"""Tests for ``GET /api/config/jails/inactive``."""
|
|
|
|
async def test_200_returns_inactive_list(self, config_client: AsyncClient) -> None:
|
|
"""GET /api/config/jails/inactive returns 200 with InactiveJailListResponse."""
|
|
from app.models.config import InactiveJail, InactiveJailListResponse
|
|
|
|
mock_jail = InactiveJail(
|
|
name="apache-auth",
|
|
filter="apache-auth",
|
|
actions=[],
|
|
port="http,https",
|
|
logpath=["/var/log/apache2/error.log"],
|
|
bantime="10m",
|
|
findtime="5m",
|
|
maxretry=5,
|
|
source_file="/etc/fail2ban/jail.conf",
|
|
enabled=False,
|
|
)
|
|
mock_response = InactiveJailListResponse(jails=[mock_jail], total=1)
|
|
|
|
with patch(
|
|
"app.routers.config.config_file_service.list_inactive_jails",
|
|
AsyncMock(return_value=mock_response),
|
|
):
|
|
resp = await config_client.get("/api/config/jails/inactive")
|
|
|
|
assert resp.status_code == 200
|
|
data = resp.json()
|
|
assert data["total"] == 1
|
|
assert data["jails"][0]["name"] == "apache-auth"
|
|
|
|
async def test_200_empty_list(self, config_client: AsyncClient) -> None:
|
|
"""GET /api/config/jails/inactive returns 200 with empty list."""
|
|
from app.models.config import InactiveJailListResponse
|
|
|
|
with patch(
|
|
"app.routers.config.config_file_service.list_inactive_jails",
|
|
AsyncMock(return_value=InactiveJailListResponse(jails=[], total=0)),
|
|
):
|
|
resp = await config_client.get("/api/config/jails/inactive")
|
|
|
|
assert resp.status_code == 200
|
|
assert resp.json()["total"] == 0
|
|
assert resp.json()["jails"] == []
|
|
|
|
async def test_401_when_unauthenticated(self, config_client: AsyncClient) -> None:
|
|
"""GET /api/config/jails/inactive returns 401 without a valid session."""
|
|
resp = await AsyncClient(
|
|
transport=ASGITransport(app=config_client._transport.app), # type: ignore[attr-defined]
|
|
base_url="http://test",
|
|
).get("/api/config/jails/inactive")
|
|
assert resp.status_code == 401
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# POST /api/config/jails/{name}/activate
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestActivateJail:
|
|
"""Tests for ``POST /api/config/jails/{name}/activate``."""
|
|
|
|
async def test_200_activates_jail(self, config_client: AsyncClient) -> None:
|
|
"""POST /api/config/jails/apache-auth/activate returns 200."""
|
|
from app.models.config import JailActivationResponse
|
|
|
|
mock_response = JailActivationResponse(
|
|
name="apache-auth",
|
|
active=True,
|
|
message="Jail 'apache-auth' activated successfully.",
|
|
)
|
|
with patch(
|
|
"app.routers.config.config_file_service.activate_jail",
|
|
AsyncMock(return_value=mock_response),
|
|
):
|
|
resp = await config_client.post(
|
|
"/api/config/jails/apache-auth/activate", json={}
|
|
)
|
|
|
|
assert resp.status_code == 200
|
|
data = resp.json()
|
|
assert data["active"] is True
|
|
assert data["name"] == "apache-auth"
|
|
|
|
async def test_200_with_overrides(self, config_client: AsyncClient) -> None:
|
|
"""POST .../activate accepts override fields."""
|
|
from app.models.config import JailActivationResponse
|
|
|
|
mock_response = JailActivationResponse(
|
|
name="apache-auth", active=True, message="Activated."
|
|
)
|
|
with patch(
|
|
"app.routers.config.config_file_service.activate_jail",
|
|
AsyncMock(return_value=mock_response),
|
|
) as mock_activate:
|
|
resp = await config_client.post(
|
|
"/api/config/jails/apache-auth/activate",
|
|
json={"bantime": "1h", "maxretry": 3},
|
|
)
|
|
|
|
assert resp.status_code == 200
|
|
# Verify the override values were passed to the service
|
|
called_req = mock_activate.call_args.args[3]
|
|
assert called_req.bantime == "1h"
|
|
assert called_req.maxretry == 3
|
|
|
|
async def test_404_for_unknown_jail(self, config_client: AsyncClient) -> None:
|
|
"""POST /api/config/jails/missing/activate returns 404."""
|
|
from app.services.config_file_service import JailNotFoundInConfigError
|
|
|
|
with patch(
|
|
"app.routers.config.config_file_service.activate_jail",
|
|
AsyncMock(side_effect=JailNotFoundInConfigError("missing")),
|
|
):
|
|
resp = await config_client.post(
|
|
"/api/config/jails/missing/activate", json={}
|
|
)
|
|
|
|
assert resp.status_code == 404
|
|
|
|
async def test_409_when_already_active(self, config_client: AsyncClient) -> None:
|
|
"""POST /api/config/jails/sshd/activate returns 409 if already active."""
|
|
from app.services.config_file_service import JailAlreadyActiveError
|
|
|
|
with patch(
|
|
"app.routers.config.config_file_service.activate_jail",
|
|
AsyncMock(side_effect=JailAlreadyActiveError("sshd")),
|
|
):
|
|
resp = await config_client.post(
|
|
"/api/config/jails/sshd/activate", json={}
|
|
)
|
|
|
|
assert resp.status_code == 409
|
|
|
|
async def test_400_for_invalid_jail_name(self, config_client: AsyncClient) -> None:
|
|
"""POST /api/config/jails/ with bad name returns 400."""
|
|
from app.services.config_file_service import JailNameError
|
|
|
|
with patch(
|
|
"app.routers.config.config_file_service.activate_jail",
|
|
AsyncMock(side_effect=JailNameError("bad name")),
|
|
):
|
|
resp = await config_client.post(
|
|
"/api/config/jails/bad-name/activate", json={}
|
|
)
|
|
|
|
assert resp.status_code == 400
|
|
|
|
async def test_401_when_unauthenticated(self, config_client: AsyncClient) -> None:
|
|
"""POST /api/config/jails/sshd/activate returns 401 without session."""
|
|
resp = await AsyncClient(
|
|
transport=ASGITransport(app=config_client._transport.app), # type: ignore[attr-defined]
|
|
base_url="http://test",
|
|
).post("/api/config/jails/sshd/activate", json={})
|
|
assert resp.status_code == 401
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# POST /api/config/jails/{name}/deactivate
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestDeactivateJail:
|
|
"""Tests for ``POST /api/config/jails/{name}/deactivate``."""
|
|
|
|
async def test_200_deactivates_jail(self, config_client: AsyncClient) -> None:
|
|
"""POST /api/config/jails/sshd/deactivate returns 200."""
|
|
from app.models.config import JailActivationResponse
|
|
|
|
mock_response = JailActivationResponse(
|
|
name="sshd",
|
|
active=False,
|
|
message="Jail 'sshd' deactivated successfully.",
|
|
)
|
|
with patch(
|
|
"app.routers.config.config_file_service.deactivate_jail",
|
|
AsyncMock(return_value=mock_response),
|
|
):
|
|
resp = await config_client.post("/api/config/jails/sshd/deactivate")
|
|
|
|
assert resp.status_code == 200
|
|
data = resp.json()
|
|
assert data["active"] is False
|
|
assert data["name"] == "sshd"
|
|
|
|
async def test_404_for_unknown_jail(self, config_client: AsyncClient) -> None:
|
|
"""POST /api/config/jails/missing/deactivate returns 404."""
|
|
from app.services.config_file_service import JailNotFoundInConfigError
|
|
|
|
with patch(
|
|
"app.routers.config.config_file_service.deactivate_jail",
|
|
AsyncMock(side_effect=JailNotFoundInConfigError("missing")),
|
|
):
|
|
resp = await config_client.post(
|
|
"/api/config/jails/missing/deactivate"
|
|
)
|
|
|
|
assert resp.status_code == 404
|
|
|
|
async def test_409_when_already_inactive(self, config_client: AsyncClient) -> None:
|
|
"""POST /api/config/jails/apache-auth/deactivate returns 409 if already inactive."""
|
|
from app.services.config_file_service import JailAlreadyInactiveError
|
|
|
|
with patch(
|
|
"app.routers.config.config_file_service.deactivate_jail",
|
|
AsyncMock(side_effect=JailAlreadyInactiveError("apache-auth")),
|
|
):
|
|
resp = await config_client.post(
|
|
"/api/config/jails/apache-auth/deactivate"
|
|
)
|
|
|
|
assert resp.status_code == 409
|
|
|
|
async def test_400_for_invalid_jail_name(self, config_client: AsyncClient) -> None:
|
|
"""POST /api/config/jails/.../deactivate with bad name returns 400."""
|
|
from app.services.config_file_service import JailNameError
|
|
|
|
with patch(
|
|
"app.routers.config.config_file_service.deactivate_jail",
|
|
AsyncMock(side_effect=JailNameError("bad")),
|
|
):
|
|
resp = await config_client.post(
|
|
"/api/config/jails/sshd/deactivate"
|
|
)
|
|
|
|
assert resp.status_code == 400
|
|
|
|
async def test_401_when_unauthenticated(self, config_client: AsyncClient) -> None:
|
|
"""POST /api/config/jails/sshd/deactivate returns 401 without session."""
|
|
resp = await AsyncClient(
|
|
transport=ASGITransport(app=config_client._transport.app), # type: ignore[attr-defined]
|
|
base_url="http://test",
|
|
).post("/api/config/jails/sshd/deactivate")
|
|
assert resp.status_code == 401
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# GET /api/config/filters
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _make_filter_config(name: str, active: bool = False) -> FilterConfig:
|
|
return FilterConfig(
|
|
name=name,
|
|
filename=f"{name}.conf",
|
|
before=None,
|
|
after=None,
|
|
variables={},
|
|
prefregex=None,
|
|
failregex=[],
|
|
ignoreregex=[],
|
|
maxlines=None,
|
|
datepattern=None,
|
|
journalmatch=None,
|
|
active=active,
|
|
used_by_jails=[name] if active else [],
|
|
source_file=f"/etc/fail2ban/filter.d/{name}.conf",
|
|
has_local_override=False,
|
|
)
|
|
|
|
|
|
class TestListFilters:
|
|
"""Tests for ``GET /api/config/filters``."""
|
|
|
|
async def test_200_returns_filter_list(self, config_client: AsyncClient) -> None:
|
|
"""GET /api/config/filters returns 200 with FilterListResponse."""
|
|
from app.models.config import FilterListResponse
|
|
|
|
mock_response = FilterListResponse(
|
|
filters=[_make_filter_config("sshd", active=True)],
|
|
total=1,
|
|
)
|
|
with patch(
|
|
"app.routers.config.config_file_service.list_filters",
|
|
AsyncMock(return_value=mock_response),
|
|
):
|
|
resp = await config_client.get("/api/config/filters")
|
|
|
|
assert resp.status_code == 200
|
|
data = resp.json()
|
|
assert data["total"] == 1
|
|
assert data["filters"][0]["name"] == "sshd"
|
|
assert data["filters"][0]["active"] is True
|
|
|
|
async def test_200_empty_filter_list(self, config_client: AsyncClient) -> None:
|
|
"""GET /api/config/filters returns 200 with empty list when no filters found."""
|
|
from app.models.config import FilterListResponse
|
|
|
|
with patch(
|
|
"app.routers.config.config_file_service.list_filters",
|
|
AsyncMock(return_value=FilterListResponse(filters=[], total=0)),
|
|
):
|
|
resp = await config_client.get("/api/config/filters")
|
|
|
|
assert resp.status_code == 200
|
|
assert resp.json()["total"] == 0
|
|
assert resp.json()["filters"] == []
|
|
|
|
async def test_active_filters_sorted_before_inactive(
|
|
self, config_client: AsyncClient
|
|
) -> None:
|
|
"""GET /api/config/filters returns active filters before inactive ones."""
|
|
from app.models.config import FilterListResponse
|
|
|
|
mock_response = FilterListResponse(
|
|
filters=[
|
|
_make_filter_config("nginx", active=False),
|
|
_make_filter_config("sshd", active=True),
|
|
],
|
|
total=2,
|
|
)
|
|
with patch(
|
|
"app.routers.config.config_file_service.list_filters",
|
|
AsyncMock(return_value=mock_response),
|
|
):
|
|
resp = await config_client.get("/api/config/filters")
|
|
|
|
data = resp.json()
|
|
assert data["filters"][0]["name"] == "sshd" # active first
|
|
assert data["filters"][1]["name"] == "nginx" # inactive second
|
|
|
|
async def test_401_when_unauthenticated(self, config_client: AsyncClient) -> None:
|
|
"""GET /api/config/filters returns 401 without a valid session."""
|
|
resp = await AsyncClient(
|
|
transport=ASGITransport(app=config_client._transport.app), # type: ignore[attr-defined]
|
|
base_url="http://test",
|
|
).get("/api/config/filters")
|
|
assert resp.status_code == 401
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# GET /api/config/filters/{name}
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestGetFilter:
|
|
"""Tests for ``GET /api/config/filters/{name}``."""
|
|
|
|
async def test_200_returns_filter(self, config_client: AsyncClient) -> None:
|
|
"""GET /api/config/filters/sshd returns 200 with FilterConfig."""
|
|
with patch(
|
|
"app.routers.config.config_file_service.get_filter",
|
|
AsyncMock(return_value=_make_filter_config("sshd")),
|
|
):
|
|
resp = await config_client.get("/api/config/filters/sshd")
|
|
|
|
assert resp.status_code == 200
|
|
data = resp.json()
|
|
assert data["name"] == "sshd"
|
|
assert "failregex" in data
|
|
assert "active" in data
|
|
|
|
async def test_404_for_unknown_filter(self, config_client: AsyncClient) -> None:
|
|
"""GET /api/config/filters/missing returns 404."""
|
|
from app.services.config_file_service import FilterNotFoundError
|
|
|
|
with patch(
|
|
"app.routers.config.config_file_service.get_filter",
|
|
AsyncMock(side_effect=FilterNotFoundError("missing")),
|
|
):
|
|
resp = await config_client.get("/api/config/filters/missing")
|
|
|
|
assert resp.status_code == 404
|
|
|
|
async def test_401_when_unauthenticated(self, config_client: AsyncClient) -> None:
|
|
"""GET /api/config/filters/sshd returns 401 without session."""
|
|
resp = await AsyncClient(
|
|
transport=ASGITransport(app=config_client._transport.app), # type: ignore[attr-defined]
|
|
base_url="http://test",
|
|
).get("/api/config/filters/sshd")
|
|
assert resp.status_code == 401
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# PUT /api/config/filters/{name} (Task 2.2)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestUpdateFilter:
|
|
"""Tests for ``PUT /api/config/filters/{name}``."""
|
|
|
|
async def test_200_returns_updated_filter(self, config_client: AsyncClient) -> None:
|
|
"""PUT /api/config/filters/sshd returns 200 with updated FilterConfig."""
|
|
with patch(
|
|
"app.routers.config.config_file_service.update_filter",
|
|
AsyncMock(return_value=_make_filter_config("sshd")),
|
|
):
|
|
resp = await config_client.put(
|
|
"/api/config/filters/sshd",
|
|
json={"failregex": [r"^fail from <HOST>"]},
|
|
)
|
|
|
|
assert resp.status_code == 200
|
|
assert resp.json()["name"] == "sshd"
|
|
|
|
async def test_404_for_unknown_filter(self, config_client: AsyncClient) -> None:
|
|
"""PUT /api/config/filters/missing returns 404."""
|
|
from app.services.config_file_service import FilterNotFoundError
|
|
|
|
with patch(
|
|
"app.routers.config.config_file_service.update_filter",
|
|
AsyncMock(side_effect=FilterNotFoundError("missing")),
|
|
):
|
|
resp = await config_client.put(
|
|
"/api/config/filters/missing",
|
|
json={},
|
|
)
|
|
|
|
assert resp.status_code == 404
|
|
|
|
async def test_422_for_invalid_regex(self, config_client: AsyncClient) -> None:
|
|
"""PUT /api/config/filters/sshd returns 422 for bad regex."""
|
|
from app.services.config_file_service import FilterInvalidRegexError
|
|
|
|
with patch(
|
|
"app.routers.config.config_file_service.update_filter",
|
|
AsyncMock(side_effect=FilterInvalidRegexError("[bad", "unterminated")),
|
|
):
|
|
resp = await config_client.put(
|
|
"/api/config/filters/sshd",
|
|
json={"failregex": ["[bad"]},
|
|
)
|
|
|
|
assert resp.status_code == 422
|
|
|
|
async def test_400_for_invalid_name(self, config_client: AsyncClient) -> None:
|
|
"""PUT /api/config/filters/... with bad name returns 400."""
|
|
from app.services.config_file_service import FilterNameError
|
|
|
|
with patch(
|
|
"app.routers.config.config_file_service.update_filter",
|
|
AsyncMock(side_effect=FilterNameError("bad")),
|
|
):
|
|
resp = await config_client.put(
|
|
"/api/config/filters/bad",
|
|
json={},
|
|
)
|
|
|
|
assert resp.status_code == 400
|
|
|
|
async def test_reload_query_param_passed(self, config_client: AsyncClient) -> None:
|
|
"""PUT /api/config/filters/sshd?reload=true passes do_reload=True."""
|
|
with patch(
|
|
"app.routers.config.config_file_service.update_filter",
|
|
AsyncMock(return_value=_make_filter_config("sshd")),
|
|
) as mock_update:
|
|
resp = await config_client.put(
|
|
"/api/config/filters/sshd?reload=true",
|
|
json={},
|
|
)
|
|
|
|
assert resp.status_code == 200
|
|
assert mock_update.call_args.kwargs.get("do_reload") is True
|
|
|
|
async def test_401_when_unauthenticated(self, config_client: AsyncClient) -> None:
|
|
"""PUT /api/config/filters/sshd returns 401 without session."""
|
|
resp = await AsyncClient(
|
|
transport=ASGITransport(app=config_client._transport.app), # type: ignore[attr-defined]
|
|
base_url="http://test",
|
|
).put("/api/config/filters/sshd", json={})
|
|
assert resp.status_code == 401
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# POST /api/config/filters (Task 2.2)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestCreateFilter:
|
|
"""Tests for ``POST /api/config/filters``."""
|
|
|
|
async def test_201_creates_filter(self, config_client: AsyncClient) -> None:
|
|
"""POST /api/config/filters returns 201 with FilterConfig."""
|
|
with patch(
|
|
"app.routers.config.config_file_service.create_filter",
|
|
AsyncMock(return_value=_make_filter_config("my-custom")),
|
|
):
|
|
resp = await config_client.post(
|
|
"/api/config/filters",
|
|
json={"name": "my-custom", "failregex": [r"^fail from <HOST>"]},
|
|
)
|
|
|
|
assert resp.status_code == 201
|
|
assert resp.json()["name"] == "my-custom"
|
|
|
|
async def test_409_when_already_exists(self, config_client: AsyncClient) -> None:
|
|
"""POST /api/config/filters returns 409 if filter exists."""
|
|
from app.services.config_file_service import FilterAlreadyExistsError
|
|
|
|
with patch(
|
|
"app.routers.config.config_file_service.create_filter",
|
|
AsyncMock(side_effect=FilterAlreadyExistsError("sshd")),
|
|
):
|
|
resp = await config_client.post(
|
|
"/api/config/filters",
|
|
json={"name": "sshd"},
|
|
)
|
|
|
|
assert resp.status_code == 409
|
|
|
|
async def test_422_for_invalid_regex(self, config_client: AsyncClient) -> None:
|
|
"""POST /api/config/filters returns 422 for bad regex."""
|
|
from app.services.config_file_service import FilterInvalidRegexError
|
|
|
|
with patch(
|
|
"app.routers.config.config_file_service.create_filter",
|
|
AsyncMock(side_effect=FilterInvalidRegexError("[bad", "unterminated")),
|
|
):
|
|
resp = await config_client.post(
|
|
"/api/config/filters",
|
|
json={"name": "test", "failregex": ["[bad"]},
|
|
)
|
|
|
|
assert resp.status_code == 422
|
|
|
|
async def test_400_for_invalid_name(self, config_client: AsyncClient) -> None:
|
|
"""POST /api/config/filters returns 400 for invalid filter name."""
|
|
from app.services.config_file_service import FilterNameError
|
|
|
|
with patch(
|
|
"app.routers.config.config_file_service.create_filter",
|
|
AsyncMock(side_effect=FilterNameError("bad")),
|
|
):
|
|
resp = await config_client.post(
|
|
"/api/config/filters",
|
|
json={"name": "bad"},
|
|
)
|
|
|
|
assert resp.status_code == 400
|
|
|
|
async def test_401_when_unauthenticated(self, config_client: AsyncClient) -> None:
|
|
"""POST /api/config/filters returns 401 without session."""
|
|
resp = await AsyncClient(
|
|
transport=ASGITransport(app=config_client._transport.app), # type: ignore[attr-defined]
|
|
base_url="http://test",
|
|
).post("/api/config/filters", json={"name": "test"})
|
|
assert resp.status_code == 401
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# DELETE /api/config/filters/{name} (Task 2.2)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestDeleteFilter:
|
|
"""Tests for ``DELETE /api/config/filters/{name}``."""
|
|
|
|
async def test_204_deletes_filter(self, config_client: AsyncClient) -> None:
|
|
"""DELETE /api/config/filters/my-custom returns 204."""
|
|
with patch(
|
|
"app.routers.config.config_file_service.delete_filter",
|
|
AsyncMock(return_value=None),
|
|
):
|
|
resp = await config_client.delete("/api/config/filters/my-custom")
|
|
|
|
assert resp.status_code == 204
|
|
|
|
async def test_404_for_unknown_filter(self, config_client: AsyncClient) -> None:
|
|
"""DELETE /api/config/filters/missing returns 404."""
|
|
from app.services.config_file_service import FilterNotFoundError
|
|
|
|
with patch(
|
|
"app.routers.config.config_file_service.delete_filter",
|
|
AsyncMock(side_effect=FilterNotFoundError("missing")),
|
|
):
|
|
resp = await config_client.delete("/api/config/filters/missing")
|
|
|
|
assert resp.status_code == 404
|
|
|
|
async def test_409_for_readonly_filter(self, config_client: AsyncClient) -> None:
|
|
"""DELETE /api/config/filters/sshd returns 409 for shipped conf-only filter."""
|
|
from app.services.config_file_service import FilterReadonlyError
|
|
|
|
with patch(
|
|
"app.routers.config.config_file_service.delete_filter",
|
|
AsyncMock(side_effect=FilterReadonlyError("sshd")),
|
|
):
|
|
resp = await config_client.delete("/api/config/filters/sshd")
|
|
|
|
assert resp.status_code == 409
|
|
|
|
async def test_400_for_invalid_name(self, config_client: AsyncClient) -> None:
|
|
"""DELETE /api/config/filters/... with bad name returns 400."""
|
|
from app.services.config_file_service import FilterNameError
|
|
|
|
with patch(
|
|
"app.routers.config.config_file_service.delete_filter",
|
|
AsyncMock(side_effect=FilterNameError("bad")),
|
|
):
|
|
resp = await config_client.delete("/api/config/filters/bad")
|
|
|
|
assert resp.status_code == 400
|
|
|
|
async def test_401_when_unauthenticated(self, config_client: AsyncClient) -> None:
|
|
"""DELETE /api/config/filters/sshd returns 401 without session."""
|
|
resp = await AsyncClient(
|
|
transport=ASGITransport(app=config_client._transport.app), # type: ignore[attr-defined]
|
|
base_url="http://test",
|
|
).delete("/api/config/filters/sshd")
|
|
assert resp.status_code == 401
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# POST /api/config/jails/{name}/filter (Task 2.2)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestAssignFilterToJail:
|
|
"""Tests for ``POST /api/config/jails/{name}/filter``."""
|
|
|
|
async def test_204_assigns_filter(self, config_client: AsyncClient) -> None:
|
|
"""POST /api/config/jails/sshd/filter returns 204 on success."""
|
|
with patch(
|
|
"app.routers.config.config_file_service.assign_filter_to_jail",
|
|
AsyncMock(return_value=None),
|
|
):
|
|
resp = await config_client.post(
|
|
"/api/config/jails/sshd/filter",
|
|
json={"filter_name": "myfilter"},
|
|
)
|
|
|
|
assert resp.status_code == 204
|
|
|
|
async def test_404_for_unknown_jail(self, config_client: AsyncClient) -> None:
|
|
"""POST /api/config/jails/missing/filter returns 404."""
|
|
from app.services.config_file_service import JailNotFoundInConfigError
|
|
|
|
with patch(
|
|
"app.routers.config.config_file_service.assign_filter_to_jail",
|
|
AsyncMock(side_effect=JailNotFoundInConfigError("missing")),
|
|
):
|
|
resp = await config_client.post(
|
|
"/api/config/jails/missing/filter",
|
|
json={"filter_name": "sshd"},
|
|
)
|
|
|
|
assert resp.status_code == 404
|
|
|
|
async def test_404_for_unknown_filter(self, config_client: AsyncClient) -> None:
|
|
"""POST /api/config/jails/sshd/filter returns 404 when filter not found."""
|
|
from app.services.config_file_service import FilterNotFoundError
|
|
|
|
with patch(
|
|
"app.routers.config.config_file_service.assign_filter_to_jail",
|
|
AsyncMock(side_effect=FilterNotFoundError("missing-filter")),
|
|
):
|
|
resp = await config_client.post(
|
|
"/api/config/jails/sshd/filter",
|
|
json={"filter_name": "missing-filter"},
|
|
)
|
|
|
|
assert resp.status_code == 404
|
|
|
|
async def test_400_for_invalid_jail_name(self, config_client: AsyncClient) -> None:
|
|
"""POST /api/config/jails/.../filter with bad jail name returns 400."""
|
|
from app.services.config_file_service import JailNameError
|
|
|
|
with patch(
|
|
"app.routers.config.config_file_service.assign_filter_to_jail",
|
|
AsyncMock(side_effect=JailNameError("bad")),
|
|
):
|
|
resp = await config_client.post(
|
|
"/api/config/jails/sshd/filter",
|
|
json={"filter_name": "valid"},
|
|
)
|
|
|
|
assert resp.status_code == 400
|
|
|
|
async def test_400_for_invalid_filter_name(self, config_client: AsyncClient) -> None:
|
|
"""POST /api/config/jails/sshd/filter with bad filter name returns 400."""
|
|
from app.services.config_file_service import FilterNameError
|
|
|
|
with patch(
|
|
"app.routers.config.config_file_service.assign_filter_to_jail",
|
|
AsyncMock(side_effect=FilterNameError("bad")),
|
|
):
|
|
resp = await config_client.post(
|
|
"/api/config/jails/sshd/filter",
|
|
json={"filter_name": "../evil"},
|
|
)
|
|
|
|
assert resp.status_code == 400
|
|
|
|
async def test_reload_query_param_passed(self, config_client: AsyncClient) -> None:
|
|
"""POST /api/config/jails/sshd/filter?reload=true passes do_reload=True."""
|
|
with patch(
|
|
"app.routers.config.config_file_service.assign_filter_to_jail",
|
|
AsyncMock(return_value=None),
|
|
) as mock_assign:
|
|
resp = await config_client.post(
|
|
"/api/config/jails/sshd/filter?reload=true",
|
|
json={"filter_name": "sshd"},
|
|
)
|
|
|
|
assert resp.status_code == 204
|
|
assert mock_assign.call_args.kwargs.get("do_reload") is True
|
|
|
|
async def test_401_when_unauthenticated(self, config_client: AsyncClient) -> None:
|
|
"""POST /api/config/jails/sshd/filter returns 401 without session."""
|
|
resp = await AsyncClient(
|
|
transport=ASGITransport(app=config_client._transport.app), # type: ignore[attr-defined]
|
|
base_url="http://test",
|
|
).post("/api/config/jails/sshd/filter", json={"filter_name": "sshd"})
|
|
assert resp.status_code == 401
|
|
|
|
|
|
# ===========================================================================
|
|
# Action router tests (Task 3.1 + 3.2)
|
|
# ===========================================================================
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
class TestListActionsRouter:
|
|
async def test_200_returns_action_list(self, config_client: AsyncClient) -> None:
|
|
from app.models.config import ActionConfig, ActionListResponse
|
|
|
|
mock_action = ActionConfig(
|
|
name="iptables",
|
|
filename="iptables.conf",
|
|
actionban="/sbin/iptables -I f2b-<name> 1 -s <ip> -j DROP",
|
|
)
|
|
mock_response = ActionListResponse(actions=[mock_action], total=1)
|
|
|
|
with patch(
|
|
"app.routers.config.config_file_service.list_actions",
|
|
AsyncMock(return_value=mock_response),
|
|
):
|
|
resp = await config_client.get("/api/config/actions")
|
|
|
|
assert resp.status_code == 200
|
|
data = resp.json()
|
|
assert data["total"] == 1
|
|
assert data["actions"][0]["name"] == "iptables"
|
|
|
|
async def test_active_sorted_first(self, config_client: AsyncClient) -> None:
|
|
from app.models.config import ActionConfig, ActionListResponse
|
|
|
|
inactive = ActionConfig(name="aaa", filename="aaa.conf", active=False)
|
|
active = ActionConfig(name="zzz", filename="zzz.conf", active=True)
|
|
mock_response = ActionListResponse(actions=[inactive, active], total=2)
|
|
|
|
with patch(
|
|
"app.routers.config.config_file_service.list_actions",
|
|
AsyncMock(return_value=mock_response),
|
|
):
|
|
resp = await config_client.get("/api/config/actions")
|
|
|
|
data = resp.json()
|
|
assert data["actions"][0]["name"] == "zzz" # active comes first
|
|
|
|
async def test_401_when_unauthenticated(self, config_client: AsyncClient) -> None:
|
|
resp = await AsyncClient(
|
|
transport=ASGITransport(app=config_client._transport.app), # type: ignore[attr-defined]
|
|
base_url="http://test",
|
|
).get("/api/config/actions")
|
|
assert resp.status_code == 401
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
class TestGetActionRouter:
|
|
async def test_200_returns_action(self, config_client: AsyncClient) -> None:
|
|
from app.models.config import ActionConfig
|
|
|
|
mock_action = ActionConfig(
|
|
name="iptables",
|
|
filename="iptables.conf",
|
|
actionban="/sbin/iptables -I f2b-<name> 1 -s <ip> -j DROP",
|
|
)
|
|
|
|
with patch(
|
|
"app.routers.config.config_file_service.get_action",
|
|
AsyncMock(return_value=mock_action),
|
|
):
|
|
resp = await config_client.get("/api/config/actions/iptables")
|
|
|
|
assert resp.status_code == 200
|
|
assert resp.json()["name"] == "iptables"
|
|
|
|
async def test_404_when_not_found(self, config_client: AsyncClient) -> None:
|
|
from app.services.config_file_service import ActionNotFoundError
|
|
|
|
with patch(
|
|
"app.routers.config.config_file_service.get_action",
|
|
AsyncMock(side_effect=ActionNotFoundError("missing")),
|
|
):
|
|
resp = await config_client.get("/api/config/actions/missing")
|
|
|
|
assert resp.status_code == 404
|
|
|
|
async def test_401_when_unauthenticated(self, config_client: AsyncClient) -> None:
|
|
resp = await AsyncClient(
|
|
transport=ASGITransport(app=config_client._transport.app), # type: ignore[attr-defined]
|
|
base_url="http://test",
|
|
).get("/api/config/actions/iptables")
|
|
assert resp.status_code == 401
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
class TestUpdateActionRouter:
|
|
async def test_200_returns_updated_action(self, config_client: AsyncClient) -> None:
|
|
from app.models.config import ActionConfig
|
|
|
|
updated = ActionConfig(
|
|
name="iptables",
|
|
filename="iptables.local",
|
|
actionban="echo ban",
|
|
)
|
|
|
|
with patch(
|
|
"app.routers.config.config_file_service.update_action",
|
|
AsyncMock(return_value=updated),
|
|
):
|
|
resp = await config_client.put(
|
|
"/api/config/actions/iptables",
|
|
json={"actionban": "echo ban"},
|
|
)
|
|
|
|
assert resp.status_code == 200
|
|
assert resp.json()["actionban"] == "echo ban"
|
|
|
|
async def test_404_when_not_found(self, config_client: AsyncClient) -> None:
|
|
from app.services.config_file_service import ActionNotFoundError
|
|
|
|
with patch(
|
|
"app.routers.config.config_file_service.update_action",
|
|
AsyncMock(side_effect=ActionNotFoundError("missing")),
|
|
):
|
|
resp = await config_client.put(
|
|
"/api/config/actions/missing", json={}
|
|
)
|
|
|
|
assert resp.status_code == 404
|
|
|
|
async def test_400_for_bad_name(self, config_client: AsyncClient) -> None:
|
|
from app.services.config_file_service import ActionNameError
|
|
|
|
with patch(
|
|
"app.routers.config.config_file_service.update_action",
|
|
AsyncMock(side_effect=ActionNameError()),
|
|
):
|
|
resp = await config_client.put(
|
|
"/api/config/actions/badname", json={}
|
|
)
|
|
|
|
assert resp.status_code == 400
|
|
|
|
async def test_401_when_unauthenticated(self, config_client: AsyncClient) -> None:
|
|
resp = await AsyncClient(
|
|
transport=ASGITransport(app=config_client._transport.app), # type: ignore[attr-defined]
|
|
base_url="http://test",
|
|
).put("/api/config/actions/iptables", json={})
|
|
assert resp.status_code == 401
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
class TestCreateActionRouter:
|
|
async def test_201_returns_created_action(self, config_client: AsyncClient) -> None:
|
|
from app.models.config import ActionConfig
|
|
|
|
created = ActionConfig(
|
|
name="custom",
|
|
filename="custom.local",
|
|
actionban="echo ban",
|
|
)
|
|
|
|
with patch(
|
|
"app.routers.config.config_file_service.create_action",
|
|
AsyncMock(return_value=created),
|
|
):
|
|
resp = await config_client.post(
|
|
"/api/config/actions",
|
|
json={"name": "custom", "actionban": "echo ban"},
|
|
)
|
|
|
|
assert resp.status_code == 201
|
|
assert resp.json()["name"] == "custom"
|
|
|
|
async def test_409_when_already_exists(self, config_client: AsyncClient) -> None:
|
|
from app.services.config_file_service import ActionAlreadyExistsError
|
|
|
|
with patch(
|
|
"app.routers.config.config_file_service.create_action",
|
|
AsyncMock(side_effect=ActionAlreadyExistsError("iptables")),
|
|
):
|
|
resp = await config_client.post(
|
|
"/api/config/actions",
|
|
json={"name": "iptables"},
|
|
)
|
|
|
|
assert resp.status_code == 409
|
|
|
|
async def test_400_for_bad_name(self, config_client: AsyncClient) -> None:
|
|
from app.services.config_file_service import ActionNameError
|
|
|
|
with patch(
|
|
"app.routers.config.config_file_service.create_action",
|
|
AsyncMock(side_effect=ActionNameError()),
|
|
):
|
|
resp = await config_client.post(
|
|
"/api/config/actions",
|
|
json={"name": "badname"},
|
|
)
|
|
|
|
assert resp.status_code == 400
|
|
|
|
async def test_401_when_unauthenticated(self, config_client: AsyncClient) -> None:
|
|
resp = await AsyncClient(
|
|
transport=ASGITransport(app=config_client._transport.app), # type: ignore[attr-defined]
|
|
base_url="http://test",
|
|
).post("/api/config/actions", json={"name": "x"})
|
|
assert resp.status_code == 401
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
class TestDeleteActionRouter:
|
|
async def test_204_on_delete(self, config_client: AsyncClient) -> None:
|
|
with patch(
|
|
"app.routers.config.config_file_service.delete_action",
|
|
AsyncMock(return_value=None),
|
|
):
|
|
resp = await config_client.delete("/api/config/actions/custom")
|
|
|
|
assert resp.status_code == 204
|
|
|
|
async def test_404_when_not_found(self, config_client: AsyncClient) -> None:
|
|
from app.services.config_file_service import ActionNotFoundError
|
|
|
|
with patch(
|
|
"app.routers.config.config_file_service.delete_action",
|
|
AsyncMock(side_effect=ActionNotFoundError("missing")),
|
|
):
|
|
resp = await config_client.delete("/api/config/actions/missing")
|
|
|
|
assert resp.status_code == 404
|
|
|
|
async def test_409_when_readonly(self, config_client: AsyncClient) -> None:
|
|
from app.services.config_file_service import ActionReadonlyError
|
|
|
|
with patch(
|
|
"app.routers.config.config_file_service.delete_action",
|
|
AsyncMock(side_effect=ActionReadonlyError("iptables")),
|
|
):
|
|
resp = await config_client.delete("/api/config/actions/iptables")
|
|
|
|
assert resp.status_code == 409
|
|
|
|
async def test_400_for_bad_name(self, config_client: AsyncClient) -> None:
|
|
from app.services.config_file_service import ActionNameError
|
|
|
|
with patch(
|
|
"app.routers.config.config_file_service.delete_action",
|
|
AsyncMock(side_effect=ActionNameError()),
|
|
):
|
|
resp = await config_client.delete("/api/config/actions/badname")
|
|
|
|
assert resp.status_code == 400
|
|
|
|
async def test_401_when_unauthenticated(self, config_client: AsyncClient) -> None:
|
|
resp = await AsyncClient(
|
|
transport=ASGITransport(app=config_client._transport.app), # type: ignore[attr-defined]
|
|
base_url="http://test",
|
|
).delete("/api/config/actions/iptables")
|
|
assert resp.status_code == 401
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
class TestAssignActionToJailRouter:
|
|
async def test_204_on_success(self, config_client: AsyncClient) -> None:
|
|
with patch(
|
|
"app.routers.config.config_file_service.assign_action_to_jail",
|
|
AsyncMock(return_value=None),
|
|
):
|
|
resp = await config_client.post(
|
|
"/api/config/jails/sshd/action",
|
|
json={"action_name": "iptables"},
|
|
)
|
|
|
|
assert resp.status_code == 204
|
|
|
|
async def test_404_when_jail_not_found(self, config_client: AsyncClient) -> None:
|
|
from app.services.config_file_service import JailNotFoundInConfigError
|
|
|
|
with patch(
|
|
"app.routers.config.config_file_service.assign_action_to_jail",
|
|
AsyncMock(side_effect=JailNotFoundInConfigError("missing")),
|
|
):
|
|
resp = await config_client.post(
|
|
"/api/config/jails/missing/action",
|
|
json={"action_name": "iptables"},
|
|
)
|
|
|
|
assert resp.status_code == 404
|
|
|
|
async def test_404_when_action_not_found(self, config_client: AsyncClient) -> None:
|
|
from app.services.config_file_service import ActionNotFoundError
|
|
|
|
with patch(
|
|
"app.routers.config.config_file_service.assign_action_to_jail",
|
|
AsyncMock(side_effect=ActionNotFoundError("missing")),
|
|
):
|
|
resp = await config_client.post(
|
|
"/api/config/jails/sshd/action",
|
|
json={"action_name": "missing"},
|
|
)
|
|
|
|
assert resp.status_code == 404
|
|
|
|
async def test_400_for_bad_jail_name(self, config_client: AsyncClient) -> None:
|
|
from app.services.config_file_service import JailNameError
|
|
|
|
with patch(
|
|
"app.routers.config.config_file_service.assign_action_to_jail",
|
|
AsyncMock(side_effect=JailNameError()),
|
|
):
|
|
resp = await config_client.post(
|
|
"/api/config/jails/badjailname/action",
|
|
json={"action_name": "iptables"},
|
|
)
|
|
|
|
assert resp.status_code == 400
|
|
|
|
async def test_400_for_bad_action_name(self, config_client: AsyncClient) -> None:
|
|
from app.services.config_file_service import ActionNameError
|
|
|
|
with patch(
|
|
"app.routers.config.config_file_service.assign_action_to_jail",
|
|
AsyncMock(side_effect=ActionNameError()),
|
|
):
|
|
resp = await config_client.post(
|
|
"/api/config/jails/sshd/action",
|
|
json={"action_name": "badaction"},
|
|
)
|
|
|
|
assert resp.status_code == 400
|
|
|
|
async def test_reload_param_passed(self, config_client: AsyncClient) -> None:
|
|
with patch(
|
|
"app.routers.config.config_file_service.assign_action_to_jail",
|
|
AsyncMock(return_value=None),
|
|
) as mock_assign:
|
|
resp = await config_client.post(
|
|
"/api/config/jails/sshd/action?reload=true",
|
|
json={"action_name": "iptables"},
|
|
)
|
|
|
|
assert resp.status_code == 204
|
|
assert mock_assign.call_args.kwargs.get("do_reload") is True
|
|
|
|
async def test_401_when_unauthenticated(self, config_client: AsyncClient) -> None:
|
|
resp = await AsyncClient(
|
|
transport=ASGITransport(app=config_client._transport.app), # type: ignore[attr-defined]
|
|
base_url="http://test",
|
|
).post("/api/config/jails/sshd/action", json={"action_name": "iptables"})
|
|
assert resp.status_code == 401
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
class TestRemoveActionFromJailRouter:
|
|
async def test_204_on_success(self, config_client: AsyncClient) -> None:
|
|
with patch(
|
|
"app.routers.config.config_file_service.remove_action_from_jail",
|
|
AsyncMock(return_value=None),
|
|
):
|
|
resp = await config_client.delete(
|
|
"/api/config/jails/sshd/action/iptables"
|
|
)
|
|
|
|
assert resp.status_code == 204
|
|
|
|
async def test_404_when_jail_not_found(self, config_client: AsyncClient) -> None:
|
|
from app.services.config_file_service import JailNotFoundInConfigError
|
|
|
|
with patch(
|
|
"app.routers.config.config_file_service.remove_action_from_jail",
|
|
AsyncMock(side_effect=JailNotFoundInConfigError("missing")),
|
|
):
|
|
resp = await config_client.delete(
|
|
"/api/config/jails/missing/action/iptables"
|
|
)
|
|
|
|
assert resp.status_code == 404
|
|
|
|
async def test_400_for_bad_jail_name(self, config_client: AsyncClient) -> None:
|
|
from app.services.config_file_service import JailNameError
|
|
|
|
with patch(
|
|
"app.routers.config.config_file_service.remove_action_from_jail",
|
|
AsyncMock(side_effect=JailNameError()),
|
|
):
|
|
resp = await config_client.delete(
|
|
"/api/config/jails/badjailname/action/iptables"
|
|
)
|
|
|
|
assert resp.status_code == 400
|
|
|
|
async def test_400_for_bad_action_name(self, config_client: AsyncClient) -> None:
|
|
from app.services.config_file_service import ActionNameError
|
|
|
|
with patch(
|
|
"app.routers.config.config_file_service.remove_action_from_jail",
|
|
AsyncMock(side_effect=ActionNameError()),
|
|
):
|
|
resp = await config_client.delete(
|
|
"/api/config/jails/sshd/action/badactionname"
|
|
)
|
|
|
|
assert resp.status_code == 400
|
|
|
|
async def test_reload_param_passed(self, config_client: AsyncClient) -> None:
|
|
with patch(
|
|
"app.routers.config.config_file_service.remove_action_from_jail",
|
|
AsyncMock(return_value=None),
|
|
) as mock_rm:
|
|
resp = await config_client.delete(
|
|
"/api/config/jails/sshd/action/iptables?reload=true"
|
|
)
|
|
|
|
assert resp.status_code == 204
|
|
assert mock_rm.call_args.kwargs.get("do_reload") is True
|
|
|
|
async def test_401_when_unauthenticated(self, config_client: AsyncClient) -> None:
|
|
resp = await AsyncClient(
|
|
transport=ASGITransport(app=config_client._transport.app), # type: ignore[attr-defined]
|
|
base_url="http://test",
|
|
).delete("/api/config/jails/sshd/action/iptables")
|
|
assert resp.status_code == 401
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# GET /api/config/fail2ban-log
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestGetFail2BanLog:
|
|
"""Tests for ``GET /api/config/fail2ban-log``."""
|
|
|
|
def _mock_log_response(self) -> Fail2BanLogResponse:
|
|
return Fail2BanLogResponse(
|
|
log_path="/var/log/fail2ban.log",
|
|
lines=["2025-01-01 INFO sshd Found 1.2.3.4", "2025-01-01 ERROR oops"],
|
|
total_lines=100,
|
|
log_level="INFO",
|
|
log_target="/var/log/fail2ban.log",
|
|
)
|
|
|
|
async def test_200_returns_log_response(self, config_client: AsyncClient) -> None:
|
|
"""GET /api/config/fail2ban-log returns 200 with Fail2BanLogResponse."""
|
|
with patch(
|
|
"app.routers.config.config_service.read_fail2ban_log",
|
|
AsyncMock(return_value=self._mock_log_response()),
|
|
):
|
|
resp = await config_client.get("/api/config/fail2ban-log")
|
|
|
|
assert resp.status_code == 200
|
|
data = resp.json()
|
|
assert data["log_path"] == "/var/log/fail2ban.log"
|
|
assert isinstance(data["lines"], list)
|
|
assert data["total_lines"] == 100
|
|
assert data["log_level"] == "INFO"
|
|
|
|
async def test_200_passes_lines_query_param(self, config_client: AsyncClient) -> None:
|
|
"""GET /api/config/fail2ban-log passes the lines query param to the service."""
|
|
with patch(
|
|
"app.routers.config.config_service.read_fail2ban_log",
|
|
AsyncMock(return_value=self._mock_log_response()),
|
|
) as mock_fn:
|
|
resp = await config_client.get("/api/config/fail2ban-log?lines=500")
|
|
|
|
assert resp.status_code == 200
|
|
_socket, lines_arg, _filter = mock_fn.call_args.args
|
|
assert lines_arg == 500
|
|
|
|
async def test_200_passes_filter_query_param(self, config_client: AsyncClient) -> None:
|
|
"""GET /api/config/fail2ban-log passes the filter query param to the service."""
|
|
with patch(
|
|
"app.routers.config.config_service.read_fail2ban_log",
|
|
AsyncMock(return_value=self._mock_log_response()),
|
|
) as mock_fn:
|
|
resp = await config_client.get("/api/config/fail2ban-log?filter=ERROR")
|
|
|
|
assert resp.status_code == 200
|
|
_socket, _lines, filter_arg = mock_fn.call_args.args
|
|
assert filter_arg == "ERROR"
|
|
|
|
async def test_400_when_non_file_target(self, config_client: AsyncClient) -> None:
|
|
"""GET /api/config/fail2ban-log returns 400 when log target is not a file."""
|
|
from app.services.config_service import ConfigOperationError
|
|
|
|
with patch(
|
|
"app.routers.config.config_service.read_fail2ban_log",
|
|
AsyncMock(side_effect=ConfigOperationError("fail2ban is logging to 'STDOUT'")),
|
|
):
|
|
resp = await config_client.get("/api/config/fail2ban-log")
|
|
|
|
assert resp.status_code == 400
|
|
|
|
async def test_400_when_path_traversal_detected(self, config_client: AsyncClient) -> None:
|
|
"""GET /api/config/fail2ban-log returns 400 when the path is outside safe dirs."""
|
|
from app.services.config_service import ConfigOperationError
|
|
|
|
with patch(
|
|
"app.routers.config.config_service.read_fail2ban_log",
|
|
AsyncMock(side_effect=ConfigOperationError("outside the allowed directory")),
|
|
):
|
|
resp = await config_client.get("/api/config/fail2ban-log")
|
|
|
|
assert resp.status_code == 400
|
|
|
|
async def test_502_when_fail2ban_unreachable(self, config_client: AsyncClient) -> None:
|
|
"""GET /api/config/fail2ban-log returns 502 when fail2ban is down."""
|
|
from app.utils.fail2ban_client import Fail2BanConnectionError
|
|
|
|
with patch(
|
|
"app.routers.config.config_service.read_fail2ban_log",
|
|
AsyncMock(side_effect=Fail2BanConnectionError("socket error", "/tmp/f.sock")),
|
|
):
|
|
resp = await config_client.get("/api/config/fail2ban-log")
|
|
|
|
assert resp.status_code == 502
|
|
|
|
async def test_422_for_lines_exceeding_max(self, config_client: AsyncClient) -> None:
|
|
"""GET /api/config/fail2ban-log returns 422 for lines > 2000."""
|
|
resp = await config_client.get("/api/config/fail2ban-log?lines=9999")
|
|
assert resp.status_code == 422
|
|
|
|
async def test_401_when_unauthenticated(self, config_client: AsyncClient) -> None:
|
|
"""GET /api/config/fail2ban-log requires authentication."""
|
|
resp = await AsyncClient(
|
|
transport=ASGITransport(app=config_client._transport.app), # type: ignore[attr-defined]
|
|
base_url="http://test",
|
|
).get("/api/config/fail2ban-log")
|
|
assert resp.status_code == 401
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# GET /api/config/service-status
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestGetServiceStatus:
|
|
"""Tests for ``GET /api/config/service-status``."""
|
|
|
|
def _mock_status(self, online: bool = True) -> ServiceStatusResponse:
|
|
return ServiceStatusResponse(
|
|
online=online,
|
|
version="1.0.0" if online else None,
|
|
jail_count=2 if online else 0,
|
|
total_bans=10 if online else 0,
|
|
total_failures=3 if online else 0,
|
|
log_level="INFO" if online else "UNKNOWN",
|
|
log_target="/var/log/fail2ban.log" if online else "UNKNOWN",
|
|
)
|
|
|
|
async def test_200_when_online(self, config_client: AsyncClient) -> None:
|
|
"""GET /api/config/service-status returns 200 with full status when online."""
|
|
with patch(
|
|
"app.routers.config.config_service.get_service_status",
|
|
AsyncMock(return_value=self._mock_status(online=True)),
|
|
):
|
|
resp = await config_client.get("/api/config/service-status")
|
|
|
|
assert resp.status_code == 200
|
|
data = resp.json()
|
|
assert data["online"] is True
|
|
assert data["jail_count"] == 2
|
|
assert data["log_level"] == "INFO"
|
|
|
|
async def test_200_when_offline(self, config_client: AsyncClient) -> None:
|
|
"""GET /api/config/service-status returns 200 with offline=False when daemon is down."""
|
|
with patch(
|
|
"app.routers.config.config_service.get_service_status",
|
|
AsyncMock(return_value=self._mock_status(online=False)),
|
|
):
|
|
resp = await config_client.get("/api/config/service-status")
|
|
|
|
assert resp.status_code == 200
|
|
data = resp.json()
|
|
assert data["online"] is False
|
|
assert data["log_level"] == "UNKNOWN"
|
|
|
|
async def test_401_when_unauthenticated(self, config_client: AsyncClient) -> None:
|
|
"""GET /api/config/service-status requires authentication."""
|
|
resp = await AsyncClient(
|
|
transport=ASGITransport(app=config_client._transport.app), # type: ignore[attr-defined]
|
|
base_url="http://test",
|
|
).get("/api/config/service-status")
|
|
assert resp.status_code == 401
|
|
|