Stage 7: configuration view — backend service, routers, tests, and frontend

- config_service.py: read/write jail config via asyncio.gather, global
  settings, in-process regex validation, log preview via _read_tail_lines
- server_service.py: read/write server settings, flush logs
- config router: 9 endpoints for jail/global config, regex-test,
  logpath management, log preview
- server router: GET/PUT settings, POST flush-logs
- models/config.py expanded with JailConfig, GlobalConfigUpdate,
  LogPreview* models
- 285 tests pass (68 new), ruff clean, mypy clean (44 files)
- Frontend: types/config.ts, api/config.ts, hooks/useConfig.ts,
  ConfigPage.tsx full implementation (Jails accordion editor,
  Global config, Server settings, Regex Tester with preview)
- Fixed pre-existing frontend lint: JSX.Element → React.JSX.Element
  (10 files), void/promise patterns in useServerStatus + useJails,
  no-misused-spread in client.ts, eslint.config.ts self-excluded
This commit is contained in:
2026-03-01 14:37:55 +01:00
parent ebec5e0f58
commit 7f81f0614b
33 changed files with 4488 additions and 82 deletions

View File

@@ -0,0 +1,449 @@
"""Tests for the config router endpoints."""
from __future__ import annotations
from pathlib import Path
from unittest.mock import AsyncMock, MagicMock, patch
import aiosqlite
import pytest
from httpx import ASGITransport, AsyncClient
from app.config import Settings
from app.db import init_db
from app.main import create_app
from app.models.config import (
GlobalConfigResponse,
JailConfig,
JailConfigListResponse,
JailConfigResponse,
RegexTestResponse,
)
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
_SETUP_PAYLOAD = {
"master_password": "testpassword1",
"database_path": "bangui.db",
"fail2ban_socket": "/var/run/fail2ban/fail2ban.sock",
"timezone": "UTC",
"session_duration_minutes": 60,
}
@pytest.fixture
async def config_client(tmp_path: Path) -> AsyncClient: # type: ignore[misc]
"""Provide an authenticated ``AsyncClient`` for config endpoint tests."""
settings = Settings(
database_path=str(tmp_path / "config_test.db"),
fail2ban_socket="/tmp/fake.sock",
session_secret="test-config-secret",
session_duration_minutes=60,
timezone="UTC",
log_level="debug",
)
app = create_app(settings=settings)
db: aiosqlite.Connection = await aiosqlite.connect(settings.database_path)
db.row_factory = aiosqlite.Row
await init_db(db)
app.state.db = db
app.state.http_session = MagicMock()
transport = ASGITransport(app=app)
async with AsyncClient(transport=transport, base_url="http://test") as ac:
await ac.post("/api/setup", json=_SETUP_PAYLOAD)
login = await ac.post(
"/api/auth/login",
json={"password": _SETUP_PAYLOAD["master_password"]},
)
assert login.status_code == 200
yield ac
await db.close()
def _make_jail_config(name: str = "sshd") -> JailConfig:
return JailConfig(
name=name,
ban_time=600,
max_retry=5,
find_time=600,
fail_regex=["regex1"],
ignore_regex=[],
log_paths=["/var/log/auth.log"],
date_pattern=None,
log_encoding="UTF-8",
backend="polling",
actions=["iptables"],
)
# ---------------------------------------------------------------------------
# GET /api/config/jails
# ---------------------------------------------------------------------------
class TestGetJailConfigs:
"""Tests for ``GET /api/config/jails``."""
async def test_200_returns_jail_list(self, config_client: AsyncClient) -> None:
"""GET /api/config/jails returns 200 with JailConfigListResponse."""
mock_response = JailConfigListResponse(
jails=[_make_jail_config("sshd")], total=1
)
with patch(
"app.routers.config.config_service.list_jail_configs",
AsyncMock(return_value=mock_response),
):
resp = await config_client.get("/api/config/jails")
assert resp.status_code == 200
data = resp.json()
assert data["total"] == 1
assert data["jails"][0]["name"] == "sshd"
async def test_401_when_unauthenticated(self, config_client: AsyncClient) -> None:
"""GET /api/config/jails returns 401 without a valid session."""
resp = await AsyncClient(
transport=ASGITransport(app=config_client._transport.app), # type: ignore[attr-defined]
base_url="http://test",
).get("/api/config/jails")
assert resp.status_code == 401
async def test_502_on_connection_error(self, config_client: AsyncClient) -> None:
"""GET /api/config/jails returns 502 when fail2ban is unreachable."""
from app.utils.fail2ban_client import Fail2BanConnectionError
with patch(
"app.routers.config.config_service.list_jail_configs",
AsyncMock(side_effect=Fail2BanConnectionError("down", "/tmp/fake.sock")),
):
resp = await config_client.get("/api/config/jails")
assert resp.status_code == 502
# ---------------------------------------------------------------------------
# GET /api/config/jails/{name}
# ---------------------------------------------------------------------------
class TestGetJailConfig:
"""Tests for ``GET /api/config/jails/{name}``."""
async def test_200_returns_jail_config(self, config_client: AsyncClient) -> None:
"""GET /api/config/jails/sshd returns 200 with JailConfigResponse."""
mock_response = JailConfigResponse(jail=_make_jail_config("sshd"))
with patch(
"app.routers.config.config_service.get_jail_config",
AsyncMock(return_value=mock_response),
):
resp = await config_client.get("/api/config/jails/sshd")
assert resp.status_code == 200
assert resp.json()["jail"]["name"] == "sshd"
assert resp.json()["jail"]["ban_time"] == 600
async def test_404_for_unknown_jail(self, config_client: AsyncClient) -> None:
"""GET /api/config/jails/missing returns 404."""
from app.services.config_service import JailNotFoundError
with patch(
"app.routers.config.config_service.get_jail_config",
AsyncMock(side_effect=JailNotFoundError("missing")),
):
resp = await config_client.get("/api/config/jails/missing")
assert resp.status_code == 404
async def test_401_when_unauthenticated(self, config_client: AsyncClient) -> None:
"""GET /api/config/jails/sshd returns 401 without session."""
resp = await AsyncClient(
transport=ASGITransport(app=config_client._transport.app), # type: ignore[attr-defined]
base_url="http://test",
).get("/api/config/jails/sshd")
assert resp.status_code == 401
# ---------------------------------------------------------------------------
# PUT /api/config/jails/{name}
# ---------------------------------------------------------------------------
class TestUpdateJailConfig:
"""Tests for ``PUT /api/config/jails/{name}``."""
async def test_204_on_success(self, config_client: AsyncClient) -> None:
"""PUT /api/config/jails/sshd returns 204 on success."""
with patch(
"app.routers.config.config_service.update_jail_config",
AsyncMock(return_value=None),
):
resp = await config_client.put(
"/api/config/jails/sshd",
json={"ban_time": 3600},
)
assert resp.status_code == 204
async def test_404_for_unknown_jail(self, config_client: AsyncClient) -> None:
"""PUT /api/config/jails/missing returns 404."""
from app.services.config_service import JailNotFoundError
with patch(
"app.routers.config.config_service.update_jail_config",
AsyncMock(side_effect=JailNotFoundError("missing")),
):
resp = await config_client.put(
"/api/config/jails/missing",
json={"ban_time": 3600},
)
assert resp.status_code == 404
async def test_422_on_invalid_regex(self, config_client: AsyncClient) -> None:
"""PUT /api/config/jails/sshd returns 422 for invalid regex pattern."""
from app.services.config_service import ConfigValidationError
with patch(
"app.routers.config.config_service.update_jail_config",
AsyncMock(side_effect=ConfigValidationError("bad regex")),
):
resp = await config_client.put(
"/api/config/jails/sshd",
json={"fail_regex": ["[bad"]},
)
assert resp.status_code == 422
async def test_400_on_config_operation_error(self, config_client: AsyncClient) -> None:
"""PUT /api/config/jails/sshd returns 400 when set command fails."""
from app.services.config_service import ConfigOperationError
with patch(
"app.routers.config.config_service.update_jail_config",
AsyncMock(side_effect=ConfigOperationError("set failed")),
):
resp = await config_client.put(
"/api/config/jails/sshd",
json={"ban_time": 3600},
)
assert resp.status_code == 400
# ---------------------------------------------------------------------------
# GET /api/config/global
# ---------------------------------------------------------------------------
class TestGetGlobalConfig:
"""Tests for ``GET /api/config/global``."""
async def test_200_returns_global_config(self, config_client: AsyncClient) -> None:
"""GET /api/config/global returns 200 with GlobalConfigResponse."""
mock_response = GlobalConfigResponse(
log_level="WARNING",
log_target="/var/log/fail2ban.log",
db_purge_age=86400,
db_max_matches=10,
)
with patch(
"app.routers.config.config_service.get_global_config",
AsyncMock(return_value=mock_response),
):
resp = await config_client.get("/api/config/global")
assert resp.status_code == 200
data = resp.json()
assert data["log_level"] == "WARNING"
assert data["db_purge_age"] == 86400
async def test_401_when_unauthenticated(self, config_client: AsyncClient) -> None:
"""GET /api/config/global returns 401 without session."""
resp = await AsyncClient(
transport=ASGITransport(app=config_client._transport.app), # type: ignore[attr-defined]
base_url="http://test",
).get("/api/config/global")
assert resp.status_code == 401
# ---------------------------------------------------------------------------
# PUT /api/config/global
# ---------------------------------------------------------------------------
class TestUpdateGlobalConfig:
"""Tests for ``PUT /api/config/global``."""
async def test_204_on_success(self, config_client: AsyncClient) -> None:
"""PUT /api/config/global returns 204 on success."""
with patch(
"app.routers.config.config_service.update_global_config",
AsyncMock(return_value=None),
):
resp = await config_client.put(
"/api/config/global",
json={"log_level": "DEBUG"},
)
assert resp.status_code == 204
async def test_400_on_operation_error(self, config_client: AsyncClient) -> None:
"""PUT /api/config/global returns 400 when set command fails."""
from app.services.config_service import ConfigOperationError
with patch(
"app.routers.config.config_service.update_global_config",
AsyncMock(side_effect=ConfigOperationError("set failed")),
):
resp = await config_client.put(
"/api/config/global",
json={"log_level": "INFO"},
)
assert resp.status_code == 400
# ---------------------------------------------------------------------------
# POST /api/config/reload
# ---------------------------------------------------------------------------
class TestReloadFail2ban:
"""Tests for ``POST /api/config/reload``."""
async def test_204_on_success(self, config_client: AsyncClient) -> None:
"""POST /api/config/reload returns 204 on success."""
with patch(
"app.routers.config.jail_service.reload_all",
AsyncMock(return_value=None),
):
resp = await config_client.post("/api/config/reload")
assert resp.status_code == 204
# ---------------------------------------------------------------------------
# POST /api/config/regex-test
# ---------------------------------------------------------------------------
class TestRegexTest:
"""Tests for ``POST /api/config/regex-test``."""
async def test_200_matched(self, config_client: AsyncClient) -> None:
"""POST /api/config/regex-test returns matched=true for a valid match."""
mock_response = RegexTestResponse(matched=True, groups=["1.2.3.4"], error=None)
with patch(
"app.routers.config.config_service.test_regex",
return_value=mock_response,
):
resp = await config_client.post(
"/api/config/regex-test",
json={
"log_line": "fail from 1.2.3.4",
"fail_regex": r"(\d+\.\d+\.\d+\.\d+)",
},
)
assert resp.status_code == 200
assert resp.json()["matched"] is True
async def test_200_not_matched(self, config_client: AsyncClient) -> None:
"""POST /api/config/regex-test returns matched=false for no match."""
mock_response = RegexTestResponse(matched=False, groups=[], error=None)
with patch(
"app.routers.config.config_service.test_regex",
return_value=mock_response,
):
resp = await config_client.post(
"/api/config/regex-test",
json={"log_line": "ok line", "fail_regex": r"FAIL"},
)
assert resp.status_code == 200
assert resp.json()["matched"] is False
async def test_401_when_unauthenticated(self, config_client: AsyncClient) -> None:
"""POST /api/config/regex-test returns 401 without session."""
resp = await AsyncClient(
transport=ASGITransport(app=config_client._transport.app), # type: ignore[attr-defined]
base_url="http://test",
).post(
"/api/config/regex-test",
json={"log_line": "test", "fail_regex": "test"},
)
assert resp.status_code == 401
# ---------------------------------------------------------------------------
# POST /api/config/jails/{name}/logpath
# ---------------------------------------------------------------------------
class TestAddLogPath:
"""Tests for ``POST /api/config/jails/{name}/logpath``."""
async def test_204_on_success(self, config_client: AsyncClient) -> None:
"""POST /api/config/jails/sshd/logpath returns 204 on success."""
with patch(
"app.routers.config.config_service.add_log_path",
AsyncMock(return_value=None),
):
resp = await config_client.post(
"/api/config/jails/sshd/logpath",
json={"log_path": "/var/log/specific.log", "tail": True},
)
assert resp.status_code == 204
async def test_404_for_unknown_jail(self, config_client: AsyncClient) -> None:
"""POST /api/config/jails/missing/logpath returns 404."""
from app.services.config_service import JailNotFoundError
with patch(
"app.routers.config.config_service.add_log_path",
AsyncMock(side_effect=JailNotFoundError("missing")),
):
resp = await config_client.post(
"/api/config/jails/missing/logpath",
json={"log_path": "/var/log/test.log"},
)
assert resp.status_code == 404
# ---------------------------------------------------------------------------
# POST /api/config/preview-log
# ---------------------------------------------------------------------------
class TestPreviewLog:
"""Tests for ``POST /api/config/preview-log``."""
async def test_200_returns_preview(self, config_client: AsyncClient) -> None:
"""POST /api/config/preview-log returns 200 with LogPreviewResponse."""
from app.models.config import LogPreviewLine, LogPreviewResponse
mock_response = LogPreviewResponse(
lines=[LogPreviewLine(line="fail line", matched=True, groups=[])],
total_lines=1,
matched_count=1,
)
with patch(
"app.routers.config.config_service.preview_log",
AsyncMock(return_value=mock_response),
):
resp = await config_client.post(
"/api/config/preview-log",
json={"log_path": "/var/log/test.log", "fail_regex": "fail"},
)
assert resp.status_code == 200
data = resp.json()
assert data["total_lines"] == 1
assert data["matched_count"] == 1

View File

@@ -12,7 +12,7 @@ from httpx import ASGITransport, AsyncClient
from app.config import Settings
from app.db import init_db
from app.main import create_app
from app.models.jail import JailCommandResponse, JailDetailResponse, JailListResponse, JailStatus, JailSummary, Jail
from app.models.jail import Jail, JailDetailResponse, JailListResponse, JailStatus, JailSummary
# ---------------------------------------------------------------------------
# Fixtures

View File

@@ -0,0 +1,227 @@
"""Tests for the server settings router endpoints."""
from __future__ import annotations
from pathlib import Path
from unittest.mock import AsyncMock, MagicMock, patch
import aiosqlite
import pytest
from httpx import ASGITransport, AsyncClient
from app.config import Settings
from app.db import init_db
from app.main import create_app
from app.models.server import ServerSettings, ServerSettingsResponse
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
_SETUP_PAYLOAD = {
"master_password": "testpassword1",
"database_path": "bangui.db",
"fail2ban_socket": "/var/run/fail2ban/fail2ban.sock",
"timezone": "UTC",
"session_duration_minutes": 60,
}
@pytest.fixture
async def server_client(tmp_path: Path) -> AsyncClient: # type: ignore[misc]
"""Provide an authenticated ``AsyncClient`` for server endpoint tests."""
settings = Settings(
database_path=str(tmp_path / "server_test.db"),
fail2ban_socket="/tmp/fake.sock",
session_secret="test-server-secret",
session_duration_minutes=60,
timezone="UTC",
log_level="debug",
)
app = create_app(settings=settings)
db: aiosqlite.Connection = await aiosqlite.connect(settings.database_path)
db.row_factory = aiosqlite.Row
await init_db(db)
app.state.db = db
app.state.http_session = MagicMock()
transport = ASGITransport(app=app)
async with AsyncClient(transport=transport, base_url="http://test") as ac:
await ac.post("/api/setup", json=_SETUP_PAYLOAD)
login = await ac.post(
"/api/auth/login",
json={"password": _SETUP_PAYLOAD["master_password"]},
)
assert login.status_code == 200
yield ac
await db.close()
def _make_settings() -> ServerSettingsResponse:
return ServerSettingsResponse(
settings=ServerSettings(
log_level="INFO",
log_target="/var/log/fail2ban.log",
syslog_socket=None,
db_path="/var/lib/fail2ban/fail2ban.sqlite3",
db_purge_age=86400,
db_max_matches=10,
)
)
# ---------------------------------------------------------------------------
# GET /api/server/settings
# ---------------------------------------------------------------------------
class TestGetServerSettings:
"""Tests for ``GET /api/server/settings``."""
async def test_200_returns_settings(self, server_client: AsyncClient) -> None:
"""GET /api/server/settings returns 200 with ServerSettingsResponse."""
mock_response = _make_settings()
with patch(
"app.routers.server.server_service.get_settings",
AsyncMock(return_value=mock_response),
):
resp = await server_client.get("/api/server/settings")
assert resp.status_code == 200
data = resp.json()
assert data["settings"]["log_level"] == "INFO"
assert data["settings"]["db_purge_age"] == 86400
async def test_401_when_unauthenticated(self, server_client: AsyncClient) -> None:
"""GET /api/server/settings returns 401 without session."""
resp = await AsyncClient(
transport=ASGITransport(app=server_client._transport.app), # type: ignore[attr-defined]
base_url="http://test",
).get("/api/server/settings")
assert resp.status_code == 401
async def test_502_on_connection_error(self, server_client: AsyncClient) -> None:
"""GET /api/server/settings returns 502 when fail2ban is unreachable."""
from app.utils.fail2ban_client import Fail2BanConnectionError
with patch(
"app.routers.server.server_service.get_settings",
AsyncMock(side_effect=Fail2BanConnectionError("down", "/tmp/fake.sock")),
):
resp = await server_client.get("/api/server/settings")
assert resp.status_code == 502
# ---------------------------------------------------------------------------
# PUT /api/server/settings
# ---------------------------------------------------------------------------
class TestUpdateServerSettings:
"""Tests for ``PUT /api/server/settings``."""
async def test_204_on_success(self, server_client: AsyncClient) -> None:
"""PUT /api/server/settings returns 204 on success."""
with patch(
"app.routers.server.server_service.update_settings",
AsyncMock(return_value=None),
):
resp = await server_client.put(
"/api/server/settings",
json={"log_level": "DEBUG"},
)
assert resp.status_code == 204
async def test_400_on_operation_error(self, server_client: AsyncClient) -> None:
"""PUT /api/server/settings returns 400 when set command fails."""
from app.services.server_service import ServerOperationError
with patch(
"app.routers.server.server_service.update_settings",
AsyncMock(side_effect=ServerOperationError("set failed")),
):
resp = await server_client.put(
"/api/server/settings",
json={"log_level": "DEBUG"},
)
assert resp.status_code == 400
async def test_401_when_unauthenticated(self, server_client: AsyncClient) -> None:
"""PUT /api/server/settings returns 401 without session."""
resp = await AsyncClient(
transport=ASGITransport(app=server_client._transport.app), # type: ignore[attr-defined]
base_url="http://test",
).put("/api/server/settings", json={"log_level": "DEBUG"})
assert resp.status_code == 401
async def test_502_on_connection_error(self, server_client: AsyncClient) -> None:
"""PUT /api/server/settings returns 502 when fail2ban is unreachable."""
from app.utils.fail2ban_client import Fail2BanConnectionError
with patch(
"app.routers.server.server_service.update_settings",
AsyncMock(side_effect=Fail2BanConnectionError("down", "/tmp/fake.sock")),
):
resp = await server_client.put(
"/api/server/settings",
json={"log_level": "INFO"},
)
assert resp.status_code == 502
# ---------------------------------------------------------------------------
# POST /api/server/flush-logs
# ---------------------------------------------------------------------------
class TestFlushLogs:
"""Tests for ``POST /api/server/flush-logs``."""
async def test_200_returns_message(self, server_client: AsyncClient) -> None:
"""POST /api/server/flush-logs returns 200 with a message."""
with patch(
"app.routers.server.server_service.flush_logs",
AsyncMock(return_value="OK"),
):
resp = await server_client.post("/api/server/flush-logs")
assert resp.status_code == 200
assert resp.json()["message"] == "OK"
async def test_400_on_operation_error(self, server_client: AsyncClient) -> None:
"""POST /api/server/flush-logs returns 400 when flushlogs fails."""
from app.services.server_service import ServerOperationError
with patch(
"app.routers.server.server_service.flush_logs",
AsyncMock(side_effect=ServerOperationError("flushlogs failed")),
):
resp = await server_client.post("/api/server/flush-logs")
assert resp.status_code == 400
async def test_401_when_unauthenticated(self, server_client: AsyncClient) -> None:
"""POST /api/server/flush-logs returns 401 without session."""
resp = await AsyncClient(
transport=ASGITransport(app=server_client._transport.app), # type: ignore[attr-defined]
base_url="http://test",
).post("/api/server/flush-logs")
assert resp.status_code == 401
async def test_502_on_connection_error(self, server_client: AsyncClient) -> None:
"""POST /api/server/flush-logs returns 502 when fail2ban is unreachable."""
from app.utils.fail2ban_client import Fail2BanConnectionError
with patch(
"app.routers.server.server_service.flush_logs",
AsyncMock(side_effect=Fail2BanConnectionError("down", "/tmp/fake.sock")),
):
resp = await server_client.post("/api/server/flush-logs")
assert resp.status_code == 502

View File

@@ -0,0 +1,487 @@
"""Tests for config_service functions."""
from __future__ import annotations
from typing import Any
from unittest.mock import AsyncMock, patch
import pytest
from app.models.config import (
GlobalConfigUpdate,
JailConfigListResponse,
JailConfigResponse,
LogPreviewRequest,
RegexTestRequest,
)
from app.services import config_service
from app.services.config_service import (
ConfigValidationError,
JailNotFoundError,
)
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
_SOCKET = "/fake/fail2ban.sock"
def _make_global_status(names: str = "sshd") -> tuple[int, list[Any]]:
return (0, [("Number of jail", 1), ("Jail list", names)])
def _make_short_status() -> tuple[int, list[Any]]:
return (
0,
[
("Filter", [("Currently failed", 3), ("Total failed", 20)]),
("Actions", [("Currently banned", 2), ("Total banned", 10)]),
],
)
def _make_send(responses: dict[str, Any]) -> AsyncMock:
async def _side_effect(command: list[Any]) -> Any:
key = "|".join(str(c) for c in command)
if key in responses:
return responses[key]
for resp_key, resp_value in responses.items():
if key.startswith(resp_key):
return resp_value
return (0, None)
return AsyncMock(side_effect=_side_effect)
def _patch_client(responses: dict[str, Any]) -> Any:
mock_send = _make_send(responses)
class _FakeClient:
def __init__(self, **_kw: Any) -> None:
self.send = mock_send
return patch("app.services.config_service.Fail2BanClient", _FakeClient)
_DEFAULT_JAIL_RESPONSES: dict[str, Any] = {
"status|sshd|short": _make_short_status(),
"get|sshd|bantime": (0, 600),
"get|sshd|findtime": (0, 600),
"get|sshd|maxretry": (0, 5),
"get|sshd|failregex": (0, ["regex1", "regex2"]),
"get|sshd|ignoreregex": (0, []),
"get|sshd|logpath": (0, ["/var/log/auth.log"]),
"get|sshd|datepattern": (0, None),
"get|sshd|logencoding": (0, "UTF-8"),
"get|sshd|backend": (0, "polling"),
"get|sshd|actions": (0, ["iptables"]),
}
# ---------------------------------------------------------------------------
# get_jail_config
# ---------------------------------------------------------------------------
class TestGetJailConfig:
"""Unit tests for :func:`~app.services.config_service.get_jail_config`."""
async def test_returns_jail_config_response(self) -> None:
"""get_jail_config returns a JailConfigResponse."""
with _patch_client(_DEFAULT_JAIL_RESPONSES):
result = await config_service.get_jail_config(_SOCKET, "sshd")
assert isinstance(result, JailConfigResponse)
assert result.jail.name == "sshd"
assert result.jail.ban_time == 600
assert result.jail.max_retry == 5
assert result.jail.fail_regex == ["regex1", "regex2"]
assert result.jail.log_paths == ["/var/log/auth.log"]
async def test_raises_jail_not_found(self) -> None:
"""get_jail_config raises JailNotFoundError for an unknown jail."""
async def _send(command: list[Any]) -> Any:
raise Exception("Unknown jail 'missing'")
class _FakeClient:
def __init__(self, **_kw: Any) -> None:
self.send = AsyncMock(side_effect=_send)
# Patch the client to raise on status command.
async def _faulty_send(command: list[Any]) -> Any:
if command[0] == "status":
return (1, "unknown jail 'missing'")
return (0, None)
with patch(
"app.services.config_service.Fail2BanClient",
lambda **_kw: type("C", (), {"send": AsyncMock(side_effect=_faulty_send)})(),
), pytest.raises(JailNotFoundError):
await config_service.get_jail_config(_SOCKET, "missing")
async def test_actions_parsed_correctly(self) -> None:
"""get_jail_config includes actions list."""
with _patch_client(_DEFAULT_JAIL_RESPONSES):
result = await config_service.get_jail_config(_SOCKET, "sshd")
assert "iptables" in result.jail.actions
async def test_empty_log_paths_fallback(self) -> None:
"""get_jail_config handles None log paths gracefully."""
responses = {**_DEFAULT_JAIL_RESPONSES, "get|sshd|logpath": (0, None)}
with _patch_client(responses):
result = await config_service.get_jail_config(_SOCKET, "sshd")
assert result.jail.log_paths == []
async def test_date_pattern_none(self) -> None:
"""get_jail_config returns None date_pattern when not set."""
with _patch_client(_DEFAULT_JAIL_RESPONSES):
result = await config_service.get_jail_config(_SOCKET, "sshd")
assert result.jail.date_pattern is None
# ---------------------------------------------------------------------------
# list_jail_configs
# ---------------------------------------------------------------------------
class TestListJailConfigs:
"""Unit tests for :func:`~app.services.config_service.list_jail_configs`."""
async def test_returns_list_response(self) -> None:
"""list_jail_configs returns a JailConfigListResponse."""
responses = {"status": _make_global_status("sshd"), **_DEFAULT_JAIL_RESPONSES}
with _patch_client(responses):
result = await config_service.list_jail_configs(_SOCKET)
assert isinstance(result, JailConfigListResponse)
assert result.total == 1
assert result.jails[0].name == "sshd"
async def test_empty_when_no_jails(self) -> None:
"""list_jail_configs returns empty list when no jails are active."""
responses = {"status": (0, [("Jail list", ""), ("Number of jail", 0)])}
with _patch_client(responses):
result = await config_service.list_jail_configs(_SOCKET)
assert result.total == 0
assert result.jails == []
async def test_multiple_jails(self) -> None:
"""list_jail_configs handles comma-separated jail names."""
nginx_responses = {
k.replace("sshd", "nginx"): v for k, v in _DEFAULT_JAIL_RESPONSES.items()
}
responses = {
"status": _make_global_status("sshd, nginx"),
**_DEFAULT_JAIL_RESPONSES,
**nginx_responses,
}
with _patch_client(responses):
result = await config_service.list_jail_configs(_SOCKET)
assert result.total == 2
names = {j.name for j in result.jails}
assert names == {"sshd", "nginx"}
# ---------------------------------------------------------------------------
# update_jail_config
# ---------------------------------------------------------------------------
class TestUpdateJailConfig:
"""Unit tests for :func:`~app.services.config_service.update_jail_config`."""
async def test_updates_numeric_fields(self) -> None:
"""update_jail_config sends set commands for numeric fields."""
sent_commands: list[list[Any]] = []
async def _send(command: list[Any]) -> Any:
sent_commands.append(command)
return (0, "OK")
class _FakeClient:
def __init__(self, **_kw: Any) -> None:
self.send = AsyncMock(side_effect=_send)
from app.models.config import JailConfigUpdate
update = JailConfigUpdate(ban_time=3600, max_retry=10)
with patch("app.services.config_service.Fail2BanClient", _FakeClient):
await config_service.update_jail_config(_SOCKET, "sshd", update)
keys = [cmd[2] for cmd in sent_commands if len(cmd) >= 3 and cmd[0] == "set"]
assert "bantime" in keys
assert "maxretry" in keys
async def test_raises_validation_error_on_bad_regex(self) -> None:
"""update_jail_config raises ConfigValidationError for invalid regex."""
from app.models.config import JailConfigUpdate
update = JailConfigUpdate(fail_regex=["[invalid"])
with pytest.raises(ConfigValidationError, match="Invalid regex"):
await config_service.update_jail_config(_SOCKET, "sshd", update)
async def test_skips_none_fields(self) -> None:
"""update_jail_config does not send commands for None fields."""
sent_commands: list[list[Any]] = []
async def _send(command: list[Any]) -> Any:
sent_commands.append(command)
return (0, "OK")
class _FakeClient:
def __init__(self, **_kw: Any) -> None:
self.send = AsyncMock(side_effect=_send)
from app.models.config import JailConfigUpdate
update = JailConfigUpdate(ban_time=None, max_retry=None, find_time=None)
with patch("app.services.config_service.Fail2BanClient", _FakeClient):
await config_service.update_jail_config(_SOCKET, "sshd", update)
set_commands = [cmd for cmd in sent_commands if len(cmd) >= 3 and cmd[0] == "set"]
assert set_commands == []
async def test_replaces_fail_regex(self) -> None:
"""update_jail_config deletes old regexes and adds new ones."""
sent_commands: list[list[Any]] = []
async def _send(command: list[Any]) -> Any:
sent_commands.append(command)
if command[0] == "get":
return (0, ["old_pattern"])
return (0, "OK")
class _FakeClient:
def __init__(self, **_kw: Any) -> None:
self.send = AsyncMock(side_effect=_send)
from app.models.config import JailConfigUpdate
update = JailConfigUpdate(fail_regex=["new_pattern"])
with patch("app.services.config_service.Fail2BanClient", _FakeClient):
await config_service.update_jail_config(_SOCKET, "sshd", update)
add_cmd = next(
(c for c in sent_commands if len(c) >= 4 and c[2] == "addfailregex"),
None,
)
assert add_cmd is not None
assert add_cmd[3] == "new_pattern"
# ---------------------------------------------------------------------------
# get_global_config
# ---------------------------------------------------------------------------
class TestGetGlobalConfig:
"""Unit tests for :func:`~app.services.config_service.get_global_config`."""
async def test_returns_global_config(self) -> None:
"""get_global_config returns parsed GlobalConfigResponse."""
responses = {
"get|loglevel": (0, "WARNING"),
"get|logtarget": (0, "/var/log/fail2ban.log"),
"get|dbpurgeage": (0, 86400),
"get|dbmaxmatches": (0, 10),
}
with _patch_client(responses):
result = await config_service.get_global_config(_SOCKET)
assert result.log_level == "WARNING"
assert result.log_target == "/var/log/fail2ban.log"
assert result.db_purge_age == 86400
assert result.db_max_matches == 10
async def test_defaults_used_on_error(self) -> None:
"""get_global_config uses fallback defaults when commands fail."""
responses: dict[str, Any] = {}
with _patch_client(responses):
result = await config_service.get_global_config(_SOCKET)
assert result.log_level is not None
assert result.log_target is not None
# ---------------------------------------------------------------------------
# update_global_config
# ---------------------------------------------------------------------------
class TestUpdateGlobalConfig:
"""Unit tests for :func:`~app.services.config_service.update_global_config`."""
async def test_sends_set_commands(self) -> None:
"""update_global_config sends set commands for non-None fields."""
sent: list[list[Any]] = []
async def _send(command: list[Any]) -> Any:
sent.append(command)
return (0, "OK")
class _FakeClient:
def __init__(self, **_kw: Any) -> None:
self.send = AsyncMock(side_effect=_send)
update = GlobalConfigUpdate(log_level="debug", db_purge_age=3600)
with patch("app.services.config_service.Fail2BanClient", _FakeClient):
await config_service.update_global_config(_SOCKET, update)
keys = [cmd[1] for cmd in sent if len(cmd) >= 3 and cmd[0] == "set"]
assert "loglevel" in keys
assert "dbpurgeage" in keys
async def test_log_level_uppercased(self) -> None:
"""update_global_config uppercases log_level before sending."""
sent: list[list[Any]] = []
async def _send(command: list[Any]) -> Any:
sent.append(command)
return (0, "OK")
class _FakeClient:
def __init__(self, **_kw: Any) -> None:
self.send = AsyncMock(side_effect=_send)
update = GlobalConfigUpdate(log_level="debug")
with patch("app.services.config_service.Fail2BanClient", _FakeClient):
await config_service.update_global_config(_SOCKET, update)
cmd = next(c for c in sent if len(c) >= 3 and c[1] == "loglevel")
assert cmd[2] == "DEBUG"
# ---------------------------------------------------------------------------
# test_regex (synchronous)
# ---------------------------------------------------------------------------
class TestTestRegex:
"""Unit tests for :func:`~app.services.config_service.test_regex`."""
def test_matching_pattern(self) -> None:
"""test_regex returns matched=True for a valid match."""
req = RegexTestRequest(
log_line="Failed password for user from 1.2.3.4",
fail_regex=r"(?P<host>\d+\.\d+\.\d+\.\d+)",
)
result = config_service.test_regex(req)
assert result.matched is True
assert "1.2.3.4" in result.groups
assert result.error is None
def test_non_matching_pattern(self) -> None:
"""test_regex returns matched=False when pattern does not match."""
req = RegexTestRequest(
log_line="Normal log line here",
fail_regex=r"BANME",
)
result = config_service.test_regex(req)
assert result.matched is False
assert result.groups == []
def test_invalid_pattern_returns_error(self) -> None:
"""test_regex returns error message for an invalid regex."""
req = RegexTestRequest(
log_line="any line",
fail_regex=r"[invalid",
)
result = config_service.test_regex(req)
assert result.matched is False
assert result.error is not None
assert len(result.error) > 0
def test_empty_groups_when_no_capture(self) -> None:
"""test_regex returns empty groups when pattern has no capture groups."""
req = RegexTestRequest(
log_line="fail here",
fail_regex=r"fail",
)
result = config_service.test_regex(req)
assert result.matched is True
assert result.groups == []
def test_multiple_capture_groups(self) -> None:
"""test_regex returns all captured groups."""
req = RegexTestRequest(
log_line="user=root ip=1.2.3.4",
fail_regex=r"user=(\w+) ip=([\d.]+)",
)
result = config_service.test_regex(req)
assert result.matched is True
assert len(result.groups) == 2
# ---------------------------------------------------------------------------
# preview_log
# ---------------------------------------------------------------------------
class TestPreviewLog:
"""Unit tests for :func:`~app.services.config_service.preview_log`."""
async def test_returns_error_for_invalid_regex(self, tmp_path: Any) -> None:
"""preview_log returns regex_error for an invalid pattern."""
req = LogPreviewRequest(log_path=str(tmp_path / "fake.log"), fail_regex="[bad")
result = await config_service.preview_log(req)
assert result.regex_error is not None
assert result.total_lines == 0
async def test_returns_error_for_missing_file(self) -> None:
"""preview_log returns regex_error when file does not exist."""
req = LogPreviewRequest(
log_path="/nonexistent/path/log.txt",
fail_regex=r"test",
)
result = await config_service.preview_log(req)
assert result.regex_error is not None
async def test_matches_lines_in_file(self, tmp_path: Any) -> None:
"""preview_log correctly identifies matching and non-matching lines."""
log_file = tmp_path / "test.log"
log_file.write_text("FAIL login from 1.2.3.4\nOK normal line\nFAIL from 5.6.7.8\n")
req = LogPreviewRequest(log_path=str(log_file), fail_regex=r"FAIL")
result = await config_service.preview_log(req)
assert result.total_lines == 3
assert result.matched_count == 2
async def test_matched_line_has_groups(self, tmp_path: Any) -> None:
"""preview_log captures regex groups in matched lines."""
log_file = tmp_path / "test.log"
log_file.write_text("error from 1.2.3.4 port 22\n")
req = LogPreviewRequest(
log_path=str(log_file),
fail_regex=r"from (\d+\.\d+\.\d+\.\d+)",
)
result = await config_service.preview_log(req)
matched = [ln for ln in result.lines if ln.matched]
assert len(matched) == 1
assert "1.2.3.4" in matched[0].groups
async def test_num_lines_limit(self, tmp_path: Any) -> None:
"""preview_log respects the num_lines limit."""
log_file = tmp_path / "big.log"
log_file.write_text("\n".join(f"line {i}" for i in range(500)) + "\n")
req = LogPreviewRequest(log_path=str(log_file), fail_regex=r"line", num_lines=50)
result = await config_service.preview_log(req)
assert result.total_lines <= 50

View File

@@ -2,14 +2,13 @@
from __future__ import annotations
from unittest.mock import AsyncMock, MagicMock, patch
from unittest.mock import AsyncMock, MagicMock
import pytest
from app.services import geo_service
from app.services.geo_service import GeoInfo
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------

View File

@@ -181,9 +181,8 @@ class TestListJails:
def __init__(self, **_kw: Any) -> None:
self.send = AsyncMock(side_effect=Fail2BanConnectionError("no socket", _SOCKET))
with patch("app.services.jail_service.Fail2BanClient", _FailClient):
with pytest.raises(Fail2BanConnectionError):
await jail_service.list_jails(_SOCKET)
with patch("app.services.jail_service.Fail2BanClient", _FailClient), pytest.raises(Fail2BanConnectionError):
await jail_service.list_jails(_SOCKET)
# ---------------------------------------------------------------------------
@@ -251,9 +250,8 @@ class TestGetJail:
"""get_jail raises JailNotFoundError when jail is unknown."""
not_found_response = (1, Exception("Unknown jail: 'ghost'"))
with _patch_client({r"status|ghost|short": not_found_response}):
with pytest.raises(JailNotFoundError):
await jail_service.get_jail(_SOCKET, "ghost")
with _patch_client({r"status|ghost|short": not_found_response}), pytest.raises(JailNotFoundError):
await jail_service.get_jail(_SOCKET, "ghost")
# ---------------------------------------------------------------------------
@@ -296,15 +294,13 @@ class TestJailControls:
async def test_start_not_found_raises(self) -> None:
"""start_jail raises JailNotFoundError for unknown jail."""
with _patch_client({"start|ghost": (1, Exception("Unknown jail: 'ghost'"))}):
with pytest.raises(JailNotFoundError):
await jail_service.start_jail(_SOCKET, "ghost")
with _patch_client({"start|ghost": (1, Exception("Unknown jail: 'ghost'"))}), pytest.raises(JailNotFoundError):
await jail_service.start_jail(_SOCKET, "ghost")
async def test_stop_operation_error_raises(self) -> None:
"""stop_jail raises JailOperationError on fail2ban error code."""
with _patch_client({"stop|sshd": (1, Exception("cannot stop"))}):
with pytest.raises(JailOperationError):
await jail_service.stop_jail(_SOCKET, "sshd")
with _patch_client({"stop|sshd": (1, Exception("cannot stop"))}), pytest.raises(JailOperationError):
await jail_service.stop_jail(_SOCKET, "sshd")
# ---------------------------------------------------------------------------

View File

@@ -0,0 +1,205 @@
"""Tests for server_service functions."""
from __future__ import annotations
from typing import Any
from unittest.mock import AsyncMock, patch
import pytest
from app.models.server import ServerSettingsResponse, ServerSettingsUpdate
from app.services import server_service
from app.services.server_service import ServerOperationError
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
_SOCKET = "/fake/fail2ban.sock"
_DEFAULT_RESPONSES: dict[str, Any] = {
"get|loglevel": (0, "INFO"),
"get|logtarget": (0, "/var/log/fail2ban.log"),
"get|syslogsocket": (0, None),
"get|dbfile": (0, "/var/lib/fail2ban/fail2ban.sqlite3"),
"get|dbpurgeage": (0, 86400),
"get|dbmaxmatches": (0, 10),
}
def _make_send(responses: dict[str, Any]) -> AsyncMock:
async def _side_effect(command: list[Any]) -> Any:
key = "|".join(str(c) for c in command)
return responses.get(key, (0, None))
return AsyncMock(side_effect=_side_effect)
def _patch_client(responses: dict[str, Any]) -> Any:
mock_send = _make_send(responses)
class _FakeClient:
def __init__(self, **_kw: Any) -> None:
self.send = mock_send
return patch("app.services.server_service.Fail2BanClient", _FakeClient)
# ---------------------------------------------------------------------------
# get_settings
# ---------------------------------------------------------------------------
class TestGetSettings:
"""Unit tests for :func:`~app.services.server_service.get_settings`."""
async def test_returns_server_settings_response(self) -> None:
"""get_settings returns a properly populated ServerSettingsResponse."""
with _patch_client(_DEFAULT_RESPONSES):
result = await server_service.get_settings(_SOCKET)
assert isinstance(result, ServerSettingsResponse)
assert result.settings.log_level == "INFO"
assert result.settings.log_target == "/var/log/fail2ban.log"
assert result.settings.db_purge_age == 86400
assert result.settings.db_max_matches == 10
async def test_db_path_parsed(self) -> None:
"""get_settings returns the correct database file path."""
with _patch_client(_DEFAULT_RESPONSES):
result = await server_service.get_settings(_SOCKET)
assert result.settings.db_path == "/var/lib/fail2ban/fail2ban.sqlite3"
async def test_syslog_socket_none(self) -> None:
"""get_settings returns None for syslog_socket when not configured."""
with _patch_client(_DEFAULT_RESPONSES):
result = await server_service.get_settings(_SOCKET)
assert result.settings.syslog_socket is None
async def test_fallback_defaults_on_missing_commands(self) -> None:
"""get_settings uses fallback defaults when commands return None."""
with _patch_client({}):
result = await server_service.get_settings(_SOCKET)
assert result.settings.log_level == "INFO"
assert result.settings.db_max_matches == 10
# ---------------------------------------------------------------------------
# update_settings
# ---------------------------------------------------------------------------
class TestUpdateSettings:
"""Unit tests for :func:`~app.services.server_service.update_settings`."""
async def test_sends_set_commands_for_non_none_fields(self) -> None:
"""update_settings sends set commands only for non-None fields."""
sent: list[list[Any]] = []
async def _send(command: list[Any]) -> Any:
sent.append(command)
return (0, "OK")
class _FakeClient:
def __init__(self, **_kw: Any) -> None:
self.send = AsyncMock(side_effect=_send)
update = ServerSettingsUpdate(log_level="DEBUG", db_purge_age=3600)
with patch("app.services.server_service.Fail2BanClient", _FakeClient):
await server_service.update_settings(_SOCKET, update)
keys = [cmd[1] for cmd in sent if len(cmd) >= 3]
assert "loglevel" in keys
assert "dbpurgeage" in keys
async def test_skips_none_fields(self) -> None:
"""update_settings does not send commands for None fields."""
sent: list[list[Any]] = []
async def _send(command: list[Any]) -> Any:
sent.append(command)
return (0, "OK")
class _FakeClient:
def __init__(self, **_kw: Any) -> None:
self.send = AsyncMock(side_effect=_send)
update = ServerSettingsUpdate() # all None
with patch("app.services.server_service.Fail2BanClient", _FakeClient):
await server_service.update_settings(_SOCKET, update)
assert sent == []
async def test_raises_server_operation_error_on_failure(self) -> None:
"""update_settings raises ServerOperationError when fail2ban rejects."""
async def _send(command: list[Any]) -> Any:
return (1, "invalid log level")
class _FakeClient:
def __init__(self, **_kw: Any) -> None:
self.send = AsyncMock(side_effect=_send)
update = ServerSettingsUpdate(log_level="INVALID")
with patch("app.services.server_service.Fail2BanClient", _FakeClient), pytest.raises(ServerOperationError):
await server_service.update_settings(_SOCKET, update)
async def test_uppercases_log_level(self) -> None:
"""update_settings uppercases the log_level value before sending."""
sent: list[list[Any]] = []
async def _send(command: list[Any]) -> Any:
sent.append(command)
return (0, "OK")
class _FakeClient:
def __init__(self, **_kw: Any) -> None:
self.send = AsyncMock(side_effect=_send)
update = ServerSettingsUpdate(log_level="warning")
with patch("app.services.server_service.Fail2BanClient", _FakeClient):
await server_service.update_settings(_SOCKET, update)
cmd = next(c for c in sent if len(c) >= 3 and c[1] == "loglevel")
assert cmd[2] == "WARNING"
# ---------------------------------------------------------------------------
# flush_logs
# ---------------------------------------------------------------------------
class TestFlushLogs:
"""Unit tests for :func:`~app.services.server_service.flush_logs`."""
async def test_returns_result_string(self) -> None:
"""flush_logs returns the string response from fail2ban."""
async def _send(command: list[Any]) -> Any:
assert command == ["flushlogs"]
return (0, "OK")
class _FakeClient:
def __init__(self, **_kw: Any) -> None:
self.send = AsyncMock(side_effect=_send)
with patch("app.services.server_service.Fail2BanClient", _FakeClient):
result = await server_service.flush_logs(_SOCKET)
assert result == "OK"
async def test_raises_operation_error_on_failure(self) -> None:
"""flush_logs raises ServerOperationError when fail2ban rejects."""
async def _send(command: list[Any]) -> Any:
return (1, "flushlogs failed")
class _FakeClient:
def __init__(self, **_kw: Any) -> None:
self.send = AsyncMock(side_effect=_send)
with patch("app.services.server_service.Fail2BanClient", _FakeClient), pytest.raises(ServerOperationError):
await server_service.flush_logs(_SOCKET)