Move ConfigDirError, ConfigFileNotFoundError, ConfigFileExistsError, ConfigFileWriteError, and ConfigFileNameError from raw_config_io_service into the shared domain exception module. Update router and tests to import the exceptions from app.exceptions.
802 lines
29 KiB
Python
802 lines
29 KiB
Python
"""Tests for the file_config router endpoints."""
|
|
|
|
from __future__ import annotations
|
|
|
|
from pathlib import Path
|
|
from unittest.mock import AsyncMock, MagicMock, patch
|
|
|
|
import aiosqlite
|
|
import pytest
|
|
from httpx import ASGITransport, AsyncClient
|
|
|
|
from app.config import Settings
|
|
from app.db import init_db
|
|
from app.main import create_app
|
|
from app.models.config import (
|
|
ActionConfig,
|
|
FilterConfig,
|
|
JailFileConfig,
|
|
JailSectionConfig,
|
|
)
|
|
from app.models.file_config import (
|
|
ConfFileContent,
|
|
ConfFileEntry,
|
|
ConfFilesResponse,
|
|
JailConfigFile,
|
|
JailConfigFileContent,
|
|
JailConfigFilesResponse,
|
|
)
|
|
from app.exceptions import (
|
|
ConfigDirError,
|
|
ConfigFileExistsError,
|
|
ConfigFileNameError,
|
|
ConfigFileNotFoundError,
|
|
ConfigFileWriteError,
|
|
)
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Fixtures
|
|
# ---------------------------------------------------------------------------
|
|
|
|
_SETUP_PAYLOAD = {
|
|
"master_password": "testpassword1",
|
|
"database_path": "bangui.db",
|
|
"fail2ban_socket": "/var/run/fail2ban/fail2ban.sock",
|
|
"timezone": "UTC",
|
|
"session_duration_minutes": 60,
|
|
}
|
|
|
|
|
|
@pytest.fixture
|
|
async def file_config_client(tmp_path: Path) -> AsyncClient: # type: ignore[misc]
|
|
"""Provide an authenticated ``AsyncClient`` for file_config endpoint tests."""
|
|
settings = Settings(
|
|
database_path=str(tmp_path / "file_config_test.db"),
|
|
fail2ban_socket="/tmp/fake.sock",
|
|
session_secret="test-file-config-secret",
|
|
session_duration_minutes=60,
|
|
timezone="UTC",
|
|
log_level="debug",
|
|
)
|
|
app = create_app(settings=settings)
|
|
|
|
db: aiosqlite.Connection = await aiosqlite.connect(settings.database_path)
|
|
db.row_factory = aiosqlite.Row
|
|
await init_db(db)
|
|
app.state.db = db
|
|
app.state.http_session = MagicMock()
|
|
|
|
transport = ASGITransport(app=app)
|
|
async with AsyncClient(transport=transport, base_url="http://test") as ac:
|
|
await ac.post("/api/setup", json=_SETUP_PAYLOAD)
|
|
login = await ac.post(
|
|
"/api/auth/login",
|
|
json={"password": _SETUP_PAYLOAD["master_password"]},
|
|
)
|
|
assert login.status_code == 200
|
|
yield ac
|
|
|
|
await db.close()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _jail_files_resp(files: list[JailConfigFile] | None = None) -> JailConfigFilesResponse:
|
|
files = files or [JailConfigFile(name="sshd", filename="sshd.conf", enabled=True)]
|
|
return JailConfigFilesResponse(files=files, total=len(files))
|
|
|
|
|
|
def _conf_files_resp(files: list[ConfFileEntry] | None = None) -> ConfFilesResponse:
|
|
files = files or [ConfFileEntry(name="nginx", filename="nginx.conf")]
|
|
return ConfFilesResponse(files=files, total=len(files))
|
|
|
|
|
|
def _conf_file_content(name: str = "nginx") -> ConfFileContent:
|
|
return ConfFileContent(
|
|
name=name,
|
|
filename=f"{name}.conf",
|
|
content=f"[Definition]\n# {name} filter\n",
|
|
)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# GET /api/config/jail-files
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestListJailConfigFiles:
|
|
async def test_200_returns_file_list(
|
|
self, file_config_client: AsyncClient
|
|
) -> None:
|
|
with patch(
|
|
"app.routers.file_config.raw_config_io_service.list_jail_config_files",
|
|
AsyncMock(return_value=_jail_files_resp()),
|
|
):
|
|
resp = await file_config_client.get("/api/config/jail-files")
|
|
|
|
assert resp.status_code == 200
|
|
data = resp.json()
|
|
assert data["total"] == 1
|
|
assert data["files"][0]["filename"] == "sshd.conf"
|
|
|
|
async def test_503_on_config_dir_error(
|
|
self, file_config_client: AsyncClient
|
|
) -> None:
|
|
with patch(
|
|
"app.routers.file_config.raw_config_io_service.list_jail_config_files",
|
|
AsyncMock(side_effect=ConfigDirError("not found")),
|
|
):
|
|
resp = await file_config_client.get("/api/config/jail-files")
|
|
|
|
assert resp.status_code == 503
|
|
|
|
async def test_401_unauthenticated(self, file_config_client: AsyncClient) -> None:
|
|
resp = await AsyncClient(
|
|
transport=ASGITransport(app=file_config_client._transport.app), # type: ignore[attr-defined]
|
|
base_url="http://test",
|
|
).get("/api/config/jail-files")
|
|
assert resp.status_code == 401
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# GET /api/config/jail-files/{filename}
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestGetJailConfigFile:
|
|
async def test_200_returns_content(
|
|
self, file_config_client: AsyncClient
|
|
) -> None:
|
|
content = JailConfigFileContent(
|
|
name="sshd",
|
|
filename="sshd.conf",
|
|
enabled=True,
|
|
content="[sshd]\nenabled = true\n",
|
|
)
|
|
with patch(
|
|
"app.routers.file_config.raw_config_io_service.get_jail_config_file",
|
|
AsyncMock(return_value=content),
|
|
):
|
|
resp = await file_config_client.get("/api/config/jail-files/sshd.conf")
|
|
|
|
assert resp.status_code == 200
|
|
assert resp.json()["content"] == "[sshd]\nenabled = true\n"
|
|
|
|
async def test_404_not_found(self, file_config_client: AsyncClient) -> None:
|
|
with patch(
|
|
"app.routers.file_config.raw_config_io_service.get_jail_config_file",
|
|
AsyncMock(side_effect=ConfigFileNotFoundError("missing.conf")),
|
|
):
|
|
resp = await file_config_client.get("/api/config/jail-files/missing.conf")
|
|
|
|
assert resp.status_code == 404
|
|
|
|
async def test_400_invalid_filename(
|
|
self, file_config_client: AsyncClient
|
|
) -> None:
|
|
with patch(
|
|
"app.routers.file_config.raw_config_io_service.get_jail_config_file",
|
|
AsyncMock(side_effect=ConfigFileNameError("bad name")),
|
|
):
|
|
resp = await file_config_client.get("/api/config/jail-files/bad.txt")
|
|
|
|
assert resp.status_code == 400
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# PUT /api/config/jail-files/{filename}/enabled
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestSetJailConfigEnabled:
|
|
async def test_204_on_success(self, file_config_client: AsyncClient) -> None:
|
|
with patch(
|
|
"app.routers.file_config.raw_config_io_service.set_jail_config_enabled",
|
|
AsyncMock(return_value=None),
|
|
):
|
|
resp = await file_config_client.put(
|
|
"/api/config/jail-files/sshd.conf/enabled",
|
|
json={"enabled": False},
|
|
)
|
|
|
|
assert resp.status_code == 204
|
|
|
|
async def test_404_file_not_found(self, file_config_client: AsyncClient) -> None:
|
|
with patch(
|
|
"app.routers.file_config.raw_config_io_service.set_jail_config_enabled",
|
|
AsyncMock(side_effect=ConfigFileNotFoundError("missing.conf")),
|
|
):
|
|
resp = await file_config_client.put(
|
|
"/api/config/jail-files/missing.conf/enabled",
|
|
json={"enabled": True},
|
|
)
|
|
|
|
assert resp.status_code == 404
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# GET /api/config/filters/{name}/raw
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestGetFilterFileRaw:
|
|
"""Tests for the renamed ``GET /api/config/filters/{name}/raw`` endpoint.
|
|
|
|
The simple list (``GET /api/config/filters``) and the structured detail
|
|
(``GET /api/config/filters/{name}``) are now served by the config router.
|
|
This endpoint returns the raw file content only.
|
|
"""
|
|
|
|
async def test_200_returns_content(self, file_config_client: AsyncClient) -> None:
|
|
with patch(
|
|
"app.routers.file_config.raw_config_io_service.get_filter_file",
|
|
AsyncMock(return_value=_conf_file_content("nginx")),
|
|
):
|
|
resp = await file_config_client.get("/api/config/filters/nginx/raw")
|
|
|
|
assert resp.status_code == 200
|
|
assert resp.json()["name"] == "nginx"
|
|
|
|
async def test_404_not_found(self, file_config_client: AsyncClient) -> None:
|
|
with patch(
|
|
"app.routers.file_config.raw_config_io_service.get_filter_file",
|
|
AsyncMock(side_effect=ConfigFileNotFoundError("missing")),
|
|
):
|
|
resp = await file_config_client.get("/api/config/filters/missing/raw")
|
|
|
|
assert resp.status_code == 404
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# PUT /api/config/filters/{name}
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestUpdateFilterFile:
|
|
async def test_204_on_success(self, file_config_client: AsyncClient) -> None:
|
|
with patch(
|
|
"app.routers.file_config.raw_config_io_service.write_filter_file",
|
|
AsyncMock(return_value=None),
|
|
):
|
|
resp = await file_config_client.put(
|
|
"/api/config/filters/nginx/raw",
|
|
json={"content": "[Definition]\nfailregex = test\n"},
|
|
)
|
|
|
|
assert resp.status_code == 204
|
|
|
|
async def test_400_write_error(self, file_config_client: AsyncClient) -> None:
|
|
with patch(
|
|
"app.routers.file_config.raw_config_io_service.write_filter_file",
|
|
AsyncMock(side_effect=ConfigFileWriteError("disk full")),
|
|
):
|
|
resp = await file_config_client.put(
|
|
"/api/config/filters/nginx/raw",
|
|
json={"content": "x"},
|
|
)
|
|
|
|
assert resp.status_code == 400
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# POST /api/config/filters
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestCreateFilterFile:
|
|
async def test_201_creates_file(self, file_config_client: AsyncClient) -> None:
|
|
with patch(
|
|
"app.routers.file_config.raw_config_io_service.create_filter_file",
|
|
AsyncMock(return_value="myfilter.conf"),
|
|
):
|
|
resp = await file_config_client.post(
|
|
"/api/config/filters/raw",
|
|
json={"name": "myfilter", "content": "[Definition]\n"},
|
|
)
|
|
|
|
assert resp.status_code == 201
|
|
assert resp.json()["filename"] == "myfilter.conf"
|
|
|
|
async def test_409_conflict(self, file_config_client: AsyncClient) -> None:
|
|
with patch(
|
|
"app.routers.file_config.raw_config_io_service.create_filter_file",
|
|
AsyncMock(side_effect=ConfigFileExistsError("myfilter.conf")),
|
|
):
|
|
resp = await file_config_client.post(
|
|
"/api/config/filters/raw",
|
|
json={"name": "myfilter", "content": "[Definition]\n"},
|
|
)
|
|
|
|
assert resp.status_code == 409
|
|
|
|
async def test_400_invalid_name(self, file_config_client: AsyncClient) -> None:
|
|
with patch(
|
|
"app.routers.file_config.raw_config_io_service.create_filter_file",
|
|
AsyncMock(side_effect=ConfigFileNameError("bad/../name")),
|
|
):
|
|
resp = await file_config_client.post(
|
|
"/api/config/filters/raw",
|
|
json={"name": "../escape", "content": "[Definition]\n"},
|
|
)
|
|
|
|
assert resp.status_code == 400
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# GET /api/config/actions (smoke test — same logic as filters)
|
|
# Note: GET /api/config/actions is handled by config.router (registered first);
|
|
# file_config.router's "/actions" endpoint is shadowed by it.
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestListActionFiles:
|
|
async def test_200_returns_files(self, file_config_client: AsyncClient) -> None:
|
|
from app.models.config import ActionListResponse
|
|
|
|
mock_action = ActionConfig(
|
|
name="iptables",
|
|
filename="iptables.conf",
|
|
)
|
|
resp_data = ActionListResponse(actions=[mock_action], total=1)
|
|
with patch(
|
|
"app.routers.config.action_config_service.list_actions",
|
|
AsyncMock(return_value=resp_data),
|
|
):
|
|
resp = await file_config_client.get("/api/config/actions")
|
|
|
|
assert resp.status_code == 200
|
|
assert resp.json()["actions"][0]["name"] == "iptables"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# POST /api/config/actions
|
|
# Note: POST /api/config/actions is also handled by config.router.
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestCreateActionFile:
|
|
async def test_201_creates_file(self, file_config_client: AsyncClient) -> None:
|
|
created = ActionConfig(
|
|
name="myaction",
|
|
filename="myaction.local",
|
|
actionban="echo ban <ip>",
|
|
)
|
|
with patch(
|
|
"app.routers.config.action_config_service.create_action",
|
|
AsyncMock(return_value=created),
|
|
):
|
|
resp = await file_config_client.post(
|
|
"/api/config/actions",
|
|
json={"name": "myaction", "actionban": "echo ban <ip>"},
|
|
)
|
|
|
|
assert resp.status_code == 201
|
|
assert resp.json()["name"] == "myaction"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# GET /api/config/actions/{name}/raw
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestGetActionFileRaw:
|
|
"""Tests for ``GET /api/config/actions/{name}/raw``."""
|
|
|
|
async def test_200_returns_content(self, file_config_client: AsyncClient) -> None:
|
|
with patch(
|
|
"app.routers.file_config.raw_config_io_service.get_action_file",
|
|
AsyncMock(return_value=_conf_file_content("iptables")),
|
|
):
|
|
resp = await file_config_client.get("/api/config/actions/iptables/raw")
|
|
|
|
assert resp.status_code == 200
|
|
assert resp.json()["name"] == "iptables"
|
|
|
|
async def test_404_not_found(self, file_config_client: AsyncClient) -> None:
|
|
with patch(
|
|
"app.routers.file_config.raw_config_io_service.get_action_file",
|
|
AsyncMock(side_effect=ConfigFileNotFoundError("missing")),
|
|
):
|
|
resp = await file_config_client.get("/api/config/actions/missing/raw")
|
|
|
|
assert resp.status_code == 404
|
|
|
|
async def test_503_on_config_dir_error(
|
|
self, file_config_client: AsyncClient
|
|
) -> None:
|
|
with patch(
|
|
"app.routers.file_config.raw_config_io_service.get_action_file",
|
|
AsyncMock(side_effect=ConfigDirError("no dir")),
|
|
):
|
|
resp = await file_config_client.get("/api/config/actions/iptables/raw")
|
|
|
|
assert resp.status_code == 503
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# PUT /api/config/actions/{name}/raw
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestUpdateActionFileRaw:
|
|
"""Tests for ``PUT /api/config/actions/{name}/raw``."""
|
|
|
|
async def test_204_on_success(self, file_config_client: AsyncClient) -> None:
|
|
with patch(
|
|
"app.routers.file_config.raw_config_io_service.write_action_file",
|
|
AsyncMock(return_value=None),
|
|
):
|
|
resp = await file_config_client.put(
|
|
"/api/config/actions/iptables/raw",
|
|
json={"content": "[Definition]\nactionban = iptables -I INPUT -s <ip> -j DROP\n"},
|
|
)
|
|
|
|
assert resp.status_code == 204
|
|
|
|
async def test_400_write_error(self, file_config_client: AsyncClient) -> None:
|
|
with patch(
|
|
"app.routers.file_config.raw_config_io_service.write_action_file",
|
|
AsyncMock(side_effect=ConfigFileWriteError("disk full")),
|
|
):
|
|
resp = await file_config_client.put(
|
|
"/api/config/actions/iptables/raw",
|
|
json={"content": "x"},
|
|
)
|
|
|
|
assert resp.status_code == 400
|
|
|
|
async def test_404_not_found(self, file_config_client: AsyncClient) -> None:
|
|
with patch(
|
|
"app.routers.file_config.raw_config_io_service.write_action_file",
|
|
AsyncMock(side_effect=ConfigFileNotFoundError("missing")),
|
|
):
|
|
resp = await file_config_client.put(
|
|
"/api/config/actions/missing/raw",
|
|
json={"content": "x"},
|
|
)
|
|
|
|
assert resp.status_code == 404
|
|
|
|
async def test_400_invalid_name(self, file_config_client: AsyncClient) -> None:
|
|
with patch(
|
|
"app.routers.file_config.raw_config_io_service.write_action_file",
|
|
AsyncMock(side_effect=ConfigFileNameError("bad/../name")),
|
|
):
|
|
resp = await file_config_client.put(
|
|
"/api/config/actions/escape/raw",
|
|
json={"content": "x"},
|
|
)
|
|
|
|
assert resp.status_code == 400
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# POST /api/config/jail-files
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestCreateJailConfigFile:
|
|
async def test_201_creates_file(self, file_config_client: AsyncClient) -> None:
|
|
with patch(
|
|
"app.routers.file_config.raw_config_io_service.create_jail_config_file",
|
|
AsyncMock(return_value="myjail.conf"),
|
|
):
|
|
resp = await file_config_client.post(
|
|
"/api/config/jail-files",
|
|
json={"name": "myjail", "content": "[myjail]\nenabled = true\n"},
|
|
)
|
|
|
|
assert resp.status_code == 201
|
|
assert resp.json()["filename"] == "myjail.conf"
|
|
|
|
async def test_409_conflict(self, file_config_client: AsyncClient) -> None:
|
|
with patch(
|
|
"app.routers.file_config.raw_config_io_service.create_jail_config_file",
|
|
AsyncMock(side_effect=ConfigFileExistsError("myjail.conf")),
|
|
):
|
|
resp = await file_config_client.post(
|
|
"/api/config/jail-files",
|
|
json={"name": "myjail", "content": "[myjail]\nenabled = true\n"},
|
|
)
|
|
|
|
assert resp.status_code == 409
|
|
|
|
async def test_400_invalid_name(self, file_config_client: AsyncClient) -> None:
|
|
with patch(
|
|
"app.routers.file_config.raw_config_io_service.create_jail_config_file",
|
|
AsyncMock(side_effect=ConfigFileNameError("bad/../name")),
|
|
):
|
|
resp = await file_config_client.post(
|
|
"/api/config/jail-files",
|
|
json={"name": "../escape", "content": "[Definition]\n"},
|
|
)
|
|
|
|
assert resp.status_code == 400
|
|
|
|
async def test_503_on_config_dir_error(
|
|
self, file_config_client: AsyncClient
|
|
) -> None:
|
|
with patch(
|
|
"app.routers.file_config.raw_config_io_service.create_jail_config_file",
|
|
AsyncMock(side_effect=ConfigDirError("no dir")),
|
|
):
|
|
resp = await file_config_client.post(
|
|
"/api/config/jail-files",
|
|
json={"name": "anyjail", "content": "[anyjail]\nenabled = false\n"},
|
|
)
|
|
|
|
assert resp.status_code == 503
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# GET /api/config/filters/{name}/parsed
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestGetParsedFilter:
|
|
async def test_200_returns_parsed_config(
|
|
self, file_config_client: AsyncClient
|
|
) -> None:
|
|
cfg = FilterConfig(name="nginx", filename="nginx.conf")
|
|
with patch(
|
|
"app.routers.file_config.raw_config_io_service.get_parsed_filter_file",
|
|
AsyncMock(return_value=cfg),
|
|
):
|
|
resp = await file_config_client.get("/api/config/filters/nginx/parsed")
|
|
|
|
assert resp.status_code == 200
|
|
data = resp.json()
|
|
assert data["name"] == "nginx"
|
|
assert data["filename"] == "nginx.conf"
|
|
|
|
async def test_404_not_found(self, file_config_client: AsyncClient) -> None:
|
|
with patch(
|
|
"app.routers.file_config.raw_config_io_service.get_parsed_filter_file",
|
|
AsyncMock(side_effect=ConfigFileNotFoundError("missing")),
|
|
):
|
|
resp = await file_config_client.get(
|
|
"/api/config/filters/missing/parsed"
|
|
)
|
|
|
|
assert resp.status_code == 404
|
|
|
|
async def test_503_on_config_dir_error(
|
|
self, file_config_client: AsyncClient
|
|
) -> None:
|
|
with patch(
|
|
"app.routers.file_config.raw_config_io_service.get_parsed_filter_file",
|
|
AsyncMock(side_effect=ConfigDirError("no dir")),
|
|
):
|
|
resp = await file_config_client.get("/api/config/filters/nginx/parsed")
|
|
|
|
assert resp.status_code == 503
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# PUT /api/config/filters/{name}/parsed
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestUpdateParsedFilter:
|
|
async def test_204_on_success(self, file_config_client: AsyncClient) -> None:
|
|
with patch(
|
|
"app.routers.file_config.raw_config_io_service.update_parsed_filter_file",
|
|
AsyncMock(return_value=None),
|
|
):
|
|
resp = await file_config_client.put(
|
|
"/api/config/filters/nginx/parsed",
|
|
json={"failregex": ["^<HOST> "]},
|
|
)
|
|
|
|
assert resp.status_code == 204
|
|
|
|
async def test_404_not_found(self, file_config_client: AsyncClient) -> None:
|
|
with patch(
|
|
"app.routers.file_config.raw_config_io_service.update_parsed_filter_file",
|
|
AsyncMock(side_effect=ConfigFileNotFoundError("missing")),
|
|
):
|
|
resp = await file_config_client.put(
|
|
"/api/config/filters/missing/parsed",
|
|
json={"failregex": []},
|
|
)
|
|
|
|
assert resp.status_code == 404
|
|
|
|
async def test_400_write_error(self, file_config_client: AsyncClient) -> None:
|
|
with patch(
|
|
"app.routers.file_config.raw_config_io_service.update_parsed_filter_file",
|
|
AsyncMock(side_effect=ConfigFileWriteError("disk full")),
|
|
):
|
|
resp = await file_config_client.put(
|
|
"/api/config/filters/nginx/parsed",
|
|
json={"failregex": ["^<HOST> "]},
|
|
)
|
|
|
|
assert resp.status_code == 400
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# GET /api/config/actions/{name}/parsed
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestGetParsedAction:
|
|
async def test_200_returns_parsed_config(
|
|
self, file_config_client: AsyncClient
|
|
) -> None:
|
|
cfg = ActionConfig(name="iptables", filename="iptables.conf")
|
|
with patch(
|
|
"app.routers.file_config.raw_config_io_service.get_parsed_action_file",
|
|
AsyncMock(return_value=cfg),
|
|
):
|
|
resp = await file_config_client.get(
|
|
"/api/config/actions/iptables/parsed"
|
|
)
|
|
|
|
assert resp.status_code == 200
|
|
data = resp.json()
|
|
assert data["name"] == "iptables"
|
|
assert data["filename"] == "iptables.conf"
|
|
|
|
async def test_404_not_found(self, file_config_client: AsyncClient) -> None:
|
|
with patch(
|
|
"app.routers.file_config.raw_config_io_service.get_parsed_action_file",
|
|
AsyncMock(side_effect=ConfigFileNotFoundError("missing")),
|
|
):
|
|
resp = await file_config_client.get(
|
|
"/api/config/actions/missing/parsed"
|
|
)
|
|
|
|
assert resp.status_code == 404
|
|
|
|
async def test_503_on_config_dir_error(
|
|
self, file_config_client: AsyncClient
|
|
) -> None:
|
|
with patch(
|
|
"app.routers.file_config.raw_config_io_service.get_parsed_action_file",
|
|
AsyncMock(side_effect=ConfigDirError("no dir")),
|
|
):
|
|
resp = await file_config_client.get(
|
|
"/api/config/actions/iptables/parsed"
|
|
)
|
|
|
|
assert resp.status_code == 503
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# PUT /api/config/actions/{name}/parsed
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestUpdateParsedAction:
|
|
async def test_204_on_success(self, file_config_client: AsyncClient) -> None:
|
|
with patch(
|
|
"app.routers.file_config.raw_config_io_service.update_parsed_action_file",
|
|
AsyncMock(return_value=None),
|
|
):
|
|
resp = await file_config_client.put(
|
|
"/api/config/actions/iptables/parsed",
|
|
json={"actionban": "iptables -I INPUT -s <ip> -j DROP"},
|
|
)
|
|
|
|
assert resp.status_code == 204
|
|
|
|
async def test_404_not_found(self, file_config_client: AsyncClient) -> None:
|
|
with patch(
|
|
"app.routers.file_config.raw_config_io_service.update_parsed_action_file",
|
|
AsyncMock(side_effect=ConfigFileNotFoundError("missing")),
|
|
):
|
|
resp = await file_config_client.put(
|
|
"/api/config/actions/missing/parsed",
|
|
json={"actionban": ""},
|
|
)
|
|
|
|
assert resp.status_code == 404
|
|
|
|
async def test_400_write_error(self, file_config_client: AsyncClient) -> None:
|
|
with patch(
|
|
"app.routers.file_config.raw_config_io_service.update_parsed_action_file",
|
|
AsyncMock(side_effect=ConfigFileWriteError("disk full")),
|
|
):
|
|
resp = await file_config_client.put(
|
|
"/api/config/actions/iptables/parsed",
|
|
json={"actionban": "iptables -I INPUT -s <ip> -j DROP"},
|
|
)
|
|
|
|
assert resp.status_code == 400
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# GET /api/config/jail-files/{filename}/parsed
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestGetParsedJailFile:
|
|
async def test_200_returns_parsed_config(
|
|
self, file_config_client: AsyncClient
|
|
) -> None:
|
|
section = JailSectionConfig(enabled=True, port="ssh")
|
|
cfg = JailFileConfig(filename="sshd.conf", jails={"sshd": section})
|
|
with patch(
|
|
"app.routers.file_config.raw_config_io_service.get_parsed_jail_file",
|
|
AsyncMock(return_value=cfg),
|
|
):
|
|
resp = await file_config_client.get(
|
|
"/api/config/jail-files/sshd.conf/parsed"
|
|
)
|
|
|
|
assert resp.status_code == 200
|
|
data = resp.json()
|
|
assert data["filename"] == "sshd.conf"
|
|
assert "sshd" in data["jails"]
|
|
|
|
async def test_404_not_found(self, file_config_client: AsyncClient) -> None:
|
|
with patch(
|
|
"app.routers.file_config.raw_config_io_service.get_parsed_jail_file",
|
|
AsyncMock(side_effect=ConfigFileNotFoundError("missing.conf")),
|
|
):
|
|
resp = await file_config_client.get(
|
|
"/api/config/jail-files/missing.conf/parsed"
|
|
)
|
|
|
|
assert resp.status_code == 404
|
|
|
|
async def test_503_on_config_dir_error(
|
|
self, file_config_client: AsyncClient
|
|
) -> None:
|
|
with patch(
|
|
"app.routers.file_config.raw_config_io_service.get_parsed_jail_file",
|
|
AsyncMock(side_effect=ConfigDirError("no dir")),
|
|
):
|
|
resp = await file_config_client.get(
|
|
"/api/config/jail-files/sshd.conf/parsed"
|
|
)
|
|
|
|
assert resp.status_code == 503
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# PUT /api/config/jail-files/{filename}/parsed
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestUpdateParsedJailFile:
|
|
async def test_204_on_success(self, file_config_client: AsyncClient) -> None:
|
|
with patch(
|
|
"app.routers.file_config.raw_config_io_service.update_parsed_jail_file",
|
|
AsyncMock(return_value=None),
|
|
):
|
|
resp = await file_config_client.put(
|
|
"/api/config/jail-files/sshd.conf/parsed",
|
|
json={"jails": {"sshd": {"enabled": False}}},
|
|
)
|
|
|
|
assert resp.status_code == 204
|
|
|
|
async def test_404_not_found(self, file_config_client: AsyncClient) -> None:
|
|
with patch(
|
|
"app.routers.file_config.raw_config_io_service.update_parsed_jail_file",
|
|
AsyncMock(side_effect=ConfigFileNotFoundError("missing.conf")),
|
|
):
|
|
resp = await file_config_client.put(
|
|
"/api/config/jail-files/missing.conf/parsed",
|
|
json={"jails": {}},
|
|
)
|
|
|
|
assert resp.status_code == 404
|
|
|
|
async def test_400_write_error(self, file_config_client: AsyncClient) -> None:
|
|
with patch(
|
|
"app.routers.file_config.raw_config_io_service.update_parsed_jail_file",
|
|
AsyncMock(side_effect=ConfigFileWriteError("disk full")),
|
|
):
|
|
resp = await file_config_client.put(
|
|
"/api/config/jail-files/sshd.conf/parsed",
|
|
json={"jails": {"sshd": {"enabled": True}}},
|
|
)
|
|
|
|
assert resp.status_code == 400
|