- test_conffile_parser.py: unit tests for section/key parsing, comment preservation, and round-trip write correctness
- test_file_config_service.py: service-level tests with a mock filesystem
- test_file_config.py: router integration tests covering GET / PUT endpoints for jails, actions, and filters
590 lines
21 KiB
Python
590 lines
21 KiB
Python
"""Tests for file_config_service functions."""
|
|
|
|
from __future__ import annotations
|
|
|
|
from pathlib import Path
|
|
|
|
import pytest
|
|
|
|
from app.models.config import ActionConfigUpdate, FilterConfigUpdate, JailFileConfigUpdate
|
|
from app.models.file_config import ConfFileCreateRequest, ConfFileUpdateRequest
|
|
from app.services.file_config_service import (
|
|
ConfigDirError,
|
|
ConfigFileExistsError,
|
|
ConfigFileNameError,
|
|
ConfigFileNotFoundError,
|
|
ConfigFileWriteError,
|
|
_parse_enabled,
|
|
_set_enabled_in_content,
|
|
_validate_new_name,
|
|
create_action_file,
|
|
create_filter_file,
|
|
create_jail_config_file,
|
|
get_action_file,
|
|
get_filter_file,
|
|
get_jail_config_file,
|
|
get_parsed_action_file,
|
|
get_parsed_filter_file,
|
|
get_parsed_jail_file,
|
|
list_action_files,
|
|
list_filter_files,
|
|
list_jail_config_files,
|
|
set_jail_config_enabled,
|
|
update_parsed_action_file,
|
|
update_parsed_filter_file,
|
|
update_parsed_jail_file,
|
|
write_action_file,
|
|
write_filter_file,
|
|
)
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _make_config_dir(tmp_path: Path) -> Path:
    """Create a minimal fail2ban config directory structure.

    Builds ``<tmp_path>/fail2ban`` containing empty ``jail.d``, ``filter.d``
    and ``action.d`` subdirectories.

    Args:
        tmp_path: Directory under which the ``fail2ban`` tree is created.

    Returns:
        The path of the created ``fail2ban`` directory.
    """
    config_dir = tmp_path / "fail2ban"
    for subdir in ("jail.d", "filter.d", "action.d"):
        # exist_ok keeps the helper idempotent, so calling it twice for the
        # same tmp_path does not raise FileExistsError.
        (config_dir / subdir).mkdir(parents=True, exist_ok=True)
    return config_dir
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# _parse_enabled
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_parse_enabled_explicit_true(tmp_path: Path) -> None:
    """A file containing ``enabled = true`` is reported as enabled."""
    conf_path = tmp_path / "sshd.conf"
    conf_path.write_text("[sshd]\nenabled = true\n")

    enabled = _parse_enabled(conf_path)

    assert enabled is True
|
|
|
|
|
|
def test_parse_enabled_explicit_false(tmp_path: Path) -> None:
    """A file containing ``enabled = false`` is reported as disabled."""
    conf_path = tmp_path / "sshd.conf"
    conf_path.write_text("[sshd]\nenabled = false\n")

    enabled = _parse_enabled(conf_path)

    assert enabled is False
|
|
|
|
|
|
def test_parse_enabled_default_true_when_absent(tmp_path: Path) -> None:
    """A file with no ``enabled`` key is reported as enabled."""
    conf_path = tmp_path / "sshd.conf"
    conf_path.write_text("[sshd]\nbantime = 600\n")

    enabled = _parse_enabled(conf_path)

    assert enabled is True
|
|
|
|
|
|
def test_parse_enabled_in_default_section(tmp_path: Path) -> None:
    """An ``enabled = false`` key under ``[DEFAULT]`` is reported as disabled."""
    conf_path = tmp_path / "custom.conf"
    conf_path.write_text("[DEFAULT]\nenabled = false\n")

    enabled = _parse_enabled(conf_path)

    assert enabled is False
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# _set_enabled_in_content
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_set_enabled_replaces_existing_line() -> None:
    """The existing ``enabled = false`` line is replaced by ``enabled = true``."""
    original = "[sshd]\nenabled = false\nbantime = 600\n"

    updated = _set_enabled_in_content(original, True)

    assert "enabled = true" in updated
    assert "enabled = false" not in updated
|
|
|
|
|
|
def test_set_enabled_inserts_after_section() -> None:
    """Content with a section but no ``enabled`` key gains ``enabled = false``."""
    updated = _set_enabled_in_content("[sshd]\nbantime = 600\n", False)

    assert "enabled = false" in updated
|
|
|
|
|
|
def test_set_enabled_prepends_default_when_no_section() -> None:
    """Content without any section header still gains ``enabled = true``."""
    sectionless = "bantime = 600\n"

    updated = _set_enabled_in_content(sectionless, True)

    assert "enabled = true" in updated
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# _validate_new_name
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@pytest.mark.parametrize(
    "name",
    [
        "sshd",
        "my-filter",
        "test.local",
        "A1_filter",
    ],
)
def test_validate_new_name_valid(name: str) -> None:
    """Each listed name passes validation without an exception."""
    # The call itself is the assertion: any exception fails the test.
    _validate_new_name(name)
|
|
|
|
|
|
@pytest.mark.parametrize(
    "name",
    [
        "",
        ".",
        ".hidden",
        "../escape",
        "bad/slash",
        "a" * 129,  # too long
        "hello world",  # contains a space
    ],
)
def test_validate_new_name_invalid(name: str) -> None:
    """Each listed name is rejected with ``ConfigFileNameError``."""
    with pytest.raises(ConfigFileNameError):
        _validate_new_name(name)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# list_jail_config_files
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_list_jail_config_files_empty(tmp_path: Path) -> None:
    """An empty ``jail.d`` yields an empty file list and a total of zero."""
    root = _make_config_dir(tmp_path)

    listing = await list_jail_config_files(str(root))

    assert listing.files == []
    assert listing.total == 0
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_list_jail_config_files_returns_conf_files(tmp_path: Path) -> None:
    """Only the ``.conf`` files in ``jail.d`` are listed; ``other.txt`` is not."""
    root = _make_config_dir(tmp_path)
    jail_d = root / "jail.d"
    (jail_d / "sshd.conf").write_text("[sshd]\nenabled = true\n")
    (jail_d / "nginx.conf").write_text("[nginx]\n")
    (jail_d / "other.txt").write_text("ignored")

    listing = await list_jail_config_files(str(root))

    listed_names = {entry.filename for entry in listing.files}
    assert listed_names == {"sshd.conf", "nginx.conf"}
    assert listing.total == 2
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_list_jail_config_files_enabled_state(tmp_path: Path) -> None:
    """Each listed entry carries the enabled state of its file."""
    root = _make_config_dir(tmp_path)
    jail_d = root / "jail.d"
    (jail_d / "a.conf").write_text("[a]\nenabled = false\n")
    (jail_d / "b.conf").write_text("[b]\n")

    listing = await list_jail_config_files(str(root))

    enabled_by_file = {entry.filename: entry.enabled for entry in listing.files}
    assert enabled_by_file["a.conf"] is False
    assert enabled_by_file["b.conf"] is True
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_list_jail_config_files_missing_config_dir(tmp_path: Path) -> None:
    """Listing under a config directory that does not exist raises ``ConfigDirError``."""
    missing_dir = tmp_path / "nonexistent"

    with pytest.raises(ConfigDirError):
        await list_jail_config_files(str(missing_dir))
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# get_jail_config_file
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_get_jail_config_file_returns_content(tmp_path: Path) -> None:
    """Fetching an existing jail file returns its filename, name, state and content."""
    root = _make_config_dir(tmp_path)
    (root / "jail.d" / "sshd.conf").write_text("[sshd]\nenabled = true\n")

    jail_file = await get_jail_config_file(str(root), "sshd.conf")

    assert jail_file.filename == "sshd.conf"
    assert jail_file.name == "sshd"
    assert jail_file.enabled is True
    assert "[sshd]" in jail_file.content
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_get_jail_config_file_not_found(tmp_path: Path) -> None:
    """Requesting a jail file that was never created raises ``ConfigFileNotFoundError``."""
    root = str(_make_config_dir(tmp_path))

    with pytest.raises(ConfigFileNotFoundError):
        await get_jail_config_file(root, "missing.conf")
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_get_jail_config_file_invalid_extension(tmp_path: Path) -> None:
    """An existing ``.txt`` file in ``jail.d`` is rejected with ``ConfigFileNameError``."""
    root = _make_config_dir(tmp_path)
    wrong_ext = root / "jail.d" / "bad.txt"
    wrong_ext.write_text("content")

    with pytest.raises(ConfigFileNameError):
        await get_jail_config_file(str(root), "bad.txt")
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_get_jail_config_file_path_traversal(tmp_path: Path) -> None:
    """A ``../`` filename is refused with a name error or a not-found error."""
    root = str(_make_config_dir(tmp_path))
    # Either rejection is accepted by this test.
    acceptable_errors = (ConfigFileNameError, ConfigFileNotFoundError)

    with pytest.raises(acceptable_errors):
        await get_jail_config_file(root, "../jail.conf")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# set_jail_config_enabled
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_set_jail_config_enabled_writes_false(tmp_path: Path) -> None:
    """Disabling an enabled jail file leaves ``enabled = false`` on disk."""
    root = _make_config_dir(tmp_path)
    jail_conf = root / "jail.d" / "sshd.conf"
    jail_conf.write_text("[sshd]\nenabled = true\n")

    await set_jail_config_enabled(str(root), "sshd.conf", False)

    on_disk = jail_conf.read_text()
    assert "enabled = false" in on_disk
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_set_jail_config_enabled_inserts_when_missing(tmp_path: Path) -> None:
    """Disabling a jail file without an ``enabled`` key writes ``enabled = false``."""
    root = _make_config_dir(tmp_path)
    jail_conf = root / "jail.d" / "sshd.conf"
    jail_conf.write_text("[sshd]\nbantime = 600\n")

    await set_jail_config_enabled(str(root), "sshd.conf", False)

    on_disk = jail_conf.read_text()
    assert "enabled = false" in on_disk
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_set_jail_config_enabled_file_not_found(tmp_path: Path) -> None:
    """Toggling a jail file that does not exist raises ``ConfigFileNotFoundError``."""
    root = str(_make_config_dir(tmp_path))

    with pytest.raises(ConfigFileNotFoundError):
        await set_jail_config_enabled(root, "missing.conf", True)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# list_filter_files / list_action_files
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_list_filter_files_empty(tmp_path: Path) -> None:
    """An empty ``filter.d`` yields an empty file list."""
    root = _make_config_dir(tmp_path)

    listing = await list_filter_files(str(root))

    assert listing.files == []
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_list_filter_files_returns_files(tmp_path: Path) -> None:
    """``.conf`` and ``.local`` filter files are listed; a ``.py`` file is not."""
    root = _make_config_dir(tmp_path)
    filter_d = root / "filter.d"
    (filter_d / "nginx.conf").write_text("[Definition]\n")
    (filter_d / "sshd.local").write_text("[Definition]\n")
    (filter_d / "ignore.py").write_text("# ignored")

    listing = await list_filter_files(str(root))

    listed_names = {entry.filename for entry in listing.files}
    assert listed_names == {"nginx.conf", "sshd.local"}
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_list_action_files_returns_files(tmp_path: Path) -> None:
    """A single action file in ``action.d`` is the first listed entry."""
    root = _make_config_dir(tmp_path)
    (root / "action.d" / "iptables.conf").write_text("[Definition]\n")

    listing = await list_action_files(str(root))

    first_entry = listing.files[0]
    assert first_entry.filename == "iptables.conf"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# get_filter_file / get_action_file
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_get_filter_file_by_stem(tmp_path: Path) -> None:
    """A filter can be fetched by its bare name, without the extension."""
    root = _make_config_dir(tmp_path)
    filter_conf = root / "filter.d" / "nginx.conf"
    filter_conf.write_text("[Definition]\nfailregex = test\n")

    fetched = await get_filter_file(str(root), "nginx")

    assert fetched.name == "nginx"
    assert "failregex" in fetched.content
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_get_filter_file_by_full_name(tmp_path: Path) -> None:
    """A filter can be fetched by its full filename, extension included."""
    root = _make_config_dir(tmp_path)
    (root / "filter.d" / "nginx.conf").write_text("[Definition]\n")

    fetched = await get_filter_file(str(root), "nginx.conf")

    assert fetched.filename == "nginx.conf"
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_get_filter_file_not_found(tmp_path: Path) -> None:
    """Requesting a filter that was never created raises ``ConfigFileNotFoundError``."""
    root = str(_make_config_dir(tmp_path))

    with pytest.raises(ConfigFileNotFoundError):
        await get_filter_file(root, "nonexistent")
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_get_action_file_returns_content(tmp_path: Path) -> None:
    """Fetching an action by its bare name returns the file's content."""
    root = _make_config_dir(tmp_path)
    action_conf = root / "action.d" / "iptables.conf"
    action_conf.write_text("[Definition]\nactionban = <ip>\n")

    fetched = await get_action_file(str(root), "iptables")

    assert "actionban" in fetched.content
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# write_filter_file / write_action_file
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_write_filter_file_updates_content(tmp_path: Path) -> None:
    """Writing to an existing filter puts the new content on disk."""
    root = _make_config_dir(tmp_path)
    filter_conf = root / "filter.d" / "nginx.conf"
    filter_conf.write_text("[Definition]\n")
    request = ConfFileUpdateRequest(content="[Definition]\nfailregex = new\n")

    await write_filter_file(str(root), "nginx", request)

    assert "failregex = new" in filter_conf.read_text()
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_write_filter_file_not_found(tmp_path: Path) -> None:
    """Writing to a filter that does not exist raises ``ConfigFileNotFoundError``."""
    root = str(_make_config_dir(tmp_path))
    request = ConfFileUpdateRequest(content="[Definition]\n")

    with pytest.raises(ConfigFileNotFoundError):
        await write_filter_file(root, "missing", request)
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_write_filter_file_too_large(tmp_path: Path) -> None:
    """Content of 512 KiB plus one character is rejected with ``ConfigFileWriteError``."""
    root = _make_config_dir(tmp_path)
    (root / "filter.d" / "nginx.conf").write_text("[Definition]\n")
    oversized = "x" * (512 * 1024 + 1)
    request = ConfFileUpdateRequest(content=oversized)

    with pytest.raises(ConfigFileWriteError):
        await write_filter_file(str(root), "nginx", request)
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_write_action_file_updates_content(tmp_path: Path) -> None:
    """Writing to an existing action puts the new content on disk."""
    root = _make_config_dir(tmp_path)
    action_conf = root / "action.d" / "iptables.conf"
    action_conf.write_text("[Definition]\n")
    request = ConfFileUpdateRequest(content="[Definition]\nactionban = new\n")

    await write_action_file(str(root), "iptables", request)

    assert "actionban = new" in action_conf.read_text()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# create_filter_file / create_action_file
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_create_filter_file_creates_file(tmp_path: Path) -> None:
    """Creating a filter returns ``<name>.conf`` and the file exists in ``filter.d``."""
    root = _make_config_dir(tmp_path)
    request = ConfFileCreateRequest(name="myfilter", content="[Definition]\n")

    created_name = await create_filter_file(str(root), request)

    assert created_name == "myfilter.conf"
    expected_path = root / "filter.d" / "myfilter.conf"
    assert expected_path.is_file()
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_create_filter_file_conflict(tmp_path: Path) -> None:
    """Creating a filter whose ``.conf`` file already exists raises ``ConfigFileExistsError``."""
    root = _make_config_dir(tmp_path)
    existing = root / "filter.d" / "ngx.conf"
    existing.write_text("[Definition]\n")
    request = ConfFileCreateRequest(name="ngx", content="[Definition]\n")

    with pytest.raises(ConfigFileExistsError):
        await create_filter_file(str(root), request)
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_create_filter_file_invalid_name(tmp_path: Path) -> None:
    """Creating a filter named ``../escape`` raises ``ConfigFileNameError``."""
    root = str(_make_config_dir(tmp_path))
    request = ConfFileCreateRequest(name="../escape", content="[Definition]\n")

    with pytest.raises(ConfigFileNameError):
        await create_filter_file(root, request)
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_create_action_file_creates_file(tmp_path: Path) -> None:
    """Creating an action returns ``<name>.conf`` and the file exists in ``action.d``."""
    root = _make_config_dir(tmp_path)
    request = ConfFileCreateRequest(name="my-action", content="[Definition]\n")

    created_name = await create_action_file(str(root), request)

    assert created_name == "my-action.conf"
    expected_path = root / "action.d" / "my-action.conf"
    assert expected_path.is_file()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# create_jail_config_file
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_create_jail_config_file_creates_file(tmp_path: Path) -> None:
    """Creating a jail config returns ``<name>.conf`` and the file exists in ``jail.d``."""
    root = _make_config_dir(tmp_path)
    request = ConfFileCreateRequest(name="myjail", content="[myjail]\nenabled = true\n")

    created_name = await create_jail_config_file(str(root), request)

    assert created_name == "myjail.conf"
    expected_path = root / "jail.d" / "myjail.conf"
    assert expected_path.is_file()
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_create_jail_config_file_conflict(tmp_path: Path) -> None:
    """Creating a jail config whose file already exists raises ``ConfigFileExistsError``."""
    root = _make_config_dir(tmp_path)
    existing = root / "jail.d" / "sshd.conf"
    existing.write_text("[sshd]\nenabled = true\n")
    request = ConfFileCreateRequest(name="sshd", content="[sshd]\nenabled = true\n")

    with pytest.raises(ConfigFileExistsError):
        await create_jail_config_file(str(root), request)
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_create_jail_config_file_invalid_name(tmp_path: Path) -> None:
    """Creating a jail config named ``../escape`` raises ``ConfigFileNameError``."""
    root = str(_make_config_dir(tmp_path))
    request = ConfFileCreateRequest(name="../escape", content="[x]\nenabled = true\n")

    with pytest.raises(ConfigFileNameError):
        await create_jail_config_file(root, request)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# get_parsed_filter_file / update_parsed_filter_file
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_get_parsed_filter_file_returns_structured_model(tmp_path: Path) -> None:
    """A parsed filter exposes its name, filename and both regex lists."""
    root = _make_config_dir(tmp_path)
    filter_conf = root / "filter.d" / "nginx.conf"
    filter_conf.write_text(
        "[Definition]\nfailregex = ^<HOST>\nignoreregex = ^.*ignore.*$\n"
    )

    parsed = await get_parsed_filter_file(str(root), "nginx")

    assert parsed.name == "nginx"
    assert parsed.filename == "nginx.conf"
    assert parsed.failregex == ["^<HOST>"]
    assert parsed.ignoreregex == ["^.*ignore.*$"]
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_get_parsed_filter_file_not_found(tmp_path: Path) -> None:
    """Parsing a filter that does not exist raises ``ConfigFileNotFoundError``."""
    root = str(_make_config_dir(tmp_path))

    with pytest.raises(ConfigFileNotFoundError):
        await get_parsed_filter_file(root, "missing")
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_update_parsed_filter_file_writes_changes(tmp_path: Path) -> None:
    """A structured filter update is reflected in the file on disk."""
    root = _make_config_dir(tmp_path)
    filter_conf = root / "filter.d" / "nginx.conf"
    filter_conf.write_text("[Definition]\nfailregex = ^<HOST> old\n")
    patch = FilterConfigUpdate(failregex=["^<HOST> new"])

    await update_parsed_filter_file(str(root), "nginx", patch)

    on_disk = filter_conf.read_text()
    assert "new" in on_disk
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_update_parsed_filter_file_not_found(tmp_path: Path) -> None:
    """Updating a filter that does not exist raises ``ConfigFileNotFoundError``."""
    root = str(_make_config_dir(tmp_path))
    patch = FilterConfigUpdate(failregex=["^<HOST>"])

    with pytest.raises(ConfigFileNotFoundError):
        await update_parsed_filter_file(root, "missing", patch)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# get_parsed_action_file / update_parsed_action_file
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_get_parsed_action_file_returns_structured_model(tmp_path: Path) -> None:
    """A parsed action exposes its name, filename and ``actionban`` command."""
    root = _make_config_dir(tmp_path)
    action_conf = root / "action.d" / "iptables.conf"
    action_conf.write_text(
        "[Definition]\nactionban = iptables -I INPUT -s <ip> -j DROP\n"
    )

    parsed = await get_parsed_action_file(str(root), "iptables")

    assert parsed.name == "iptables"
    assert parsed.filename == "iptables.conf"
    assert parsed.actionban is not None
    assert "<ip>" in parsed.actionban
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_get_parsed_action_file_not_found(tmp_path: Path) -> None:
    """Parsing an action that does not exist raises ``ConfigFileNotFoundError``."""
    root = str(_make_config_dir(tmp_path))

    with pytest.raises(ConfigFileNotFoundError):
        await get_parsed_action_file(root, "missing")
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_update_parsed_action_file_writes_changes(tmp_path: Path) -> None:
    """A structured action update is reflected in the file on disk."""
    root = _make_config_dir(tmp_path)
    action_conf = root / "action.d" / "iptables.conf"
    action_conf.write_text(
        "[Definition]\nactionban = iptables -I INPUT -s <ip> -j DROP\n"
    )
    patch = ActionConfigUpdate(actionban="nft add element inet f2b-table <ip>")

    await update_parsed_action_file(str(root), "iptables", patch)

    on_disk = action_conf.read_text()
    assert "nft" in on_disk
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_update_parsed_action_file_not_found(tmp_path: Path) -> None:
    """Updating an action that does not exist raises ``ConfigFileNotFoundError``."""
    root = str(_make_config_dir(tmp_path))
    patch = ActionConfigUpdate(actionban="iptables -I INPUT -s <ip> -j DROP")

    with pytest.raises(ConfigFileNotFoundError):
        await update_parsed_action_file(root, "missing", patch)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# get_parsed_jail_file / update_parsed_jail_file
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_get_parsed_jail_file_returns_structured_model(tmp_path: Path) -> None:
    """A parsed jail file exposes its filename and per-jail settings."""
    root = _make_config_dir(tmp_path)
    jail_conf = root / "jail.d" / "sshd.conf"
    jail_conf.write_text("[sshd]\nenabled = true\nport = ssh\nmaxretry = 5\n")

    parsed = await get_parsed_jail_file(str(root), "sshd.conf")

    assert parsed.filename == "sshd.conf"
    assert "sshd" in parsed.jails
    assert parsed.jails["sshd"].enabled is True
    assert parsed.jails["sshd"].maxretry == 5
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_get_parsed_jail_file_not_found(tmp_path: Path) -> None:
    """Parsing a jail file that does not exist raises ``ConfigFileNotFoundError``."""
    root = str(_make_config_dir(tmp_path))

    with pytest.raises(ConfigFileNotFoundError):
        await get_parsed_jail_file(root, "missing.conf")
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_update_parsed_jail_file_writes_changes(tmp_path: Path) -> None:
    """Disabling a jail through a structured update leaves ``false`` in the file."""
    from app.models.config import JailSectionConfig

    root = _make_config_dir(tmp_path)
    jail_conf = root / "jail.d" / "sshd.conf"
    jail_conf.write_text("[sshd]\nenabled = true\nport = ssh\n")
    disabled_section = JailSectionConfig(enabled=False)
    patch = JailFileConfigUpdate(jails={"sshd": disabled_section})

    await update_parsed_jail_file(str(root), "sshd.conf", patch)

    # Case-insensitive check: any casing of "false" in the file passes.
    on_disk = jail_conf.read_text()
    assert "false" in on_disk.lower()
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_update_parsed_jail_file_not_found(tmp_path: Path) -> None:
    """Updating a jail file that does not exist raises ``ConfigFileNotFoundError``."""
    root = str(_make_config_dir(tmp_path))
    empty_patch = JailFileConfigUpdate(jails={})

    with pytest.raises(ConfigFileNotFoundError):
        await update_parsed_jail_file(root, "missing.conf", empty_patch)
|