Files
BanGUI/backend/tests/test_services/test_file_config_service.py
Lukas f5c3635258 test(backend): add tests for conf-file parser, file-config service and router
- test_conffile_parser.py: unit tests for section/key parsing, comment
  preservation, and round-trip write correctness
- test_file_config_service.py: service-level tests with mock filesystem
- test_file_config.py: router integration tests covering GET / PUT
  endpoints for jails, actions, and filters
2026-03-13 13:47:35 +01:00

590 lines
21 KiB
Python

"""Tests for file_config_service functions."""
from __future__ import annotations
from pathlib import Path
import pytest
from app.models.config import ActionConfigUpdate, FilterConfigUpdate, JailFileConfigUpdate
from app.models.file_config import ConfFileCreateRequest, ConfFileUpdateRequest
from app.services.file_config_service import (
ConfigDirError,
ConfigFileExistsError,
ConfigFileNameError,
ConfigFileNotFoundError,
ConfigFileWriteError,
_parse_enabled,
_set_enabled_in_content,
_validate_new_name,
create_action_file,
create_filter_file,
create_jail_config_file,
get_action_file,
get_filter_file,
get_jail_config_file,
get_parsed_action_file,
get_parsed_filter_file,
get_parsed_jail_file,
list_action_files,
list_filter_files,
list_jail_config_files,
set_jail_config_enabled,
update_parsed_action_file,
update_parsed_filter_file,
update_parsed_jail_file,
write_action_file,
write_filter_file,
)
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _make_config_dir(tmp_path: Path) -> Path:
"""Create a minimal fail2ban config directory structure."""
config_dir = tmp_path / "fail2ban"
(config_dir / "jail.d").mkdir(parents=True)
(config_dir / "filter.d").mkdir(parents=True)
(config_dir / "action.d").mkdir(parents=True)
return config_dir
# ---------------------------------------------------------------------------
# _parse_enabled
# ---------------------------------------------------------------------------
def test_parse_enabled_explicit_true(tmp_path: Path) -> None:
f = tmp_path / "sshd.conf"
f.write_text("[sshd]\nenabled = true\n")
assert _parse_enabled(f) is True
def test_parse_enabled_explicit_false(tmp_path: Path) -> None:
f = tmp_path / "sshd.conf"
f.write_text("[sshd]\nenabled = false\n")
assert _parse_enabled(f) is False
def test_parse_enabled_default_true_when_absent(tmp_path: Path) -> None:
f = tmp_path / "sshd.conf"
f.write_text("[sshd]\nbantime = 600\n")
assert _parse_enabled(f) is True
def test_parse_enabled_in_default_section(tmp_path: Path) -> None:
f = tmp_path / "custom.conf"
f.write_text("[DEFAULT]\nenabled = false\n")
assert _parse_enabled(f) is False
# ---------------------------------------------------------------------------
# _set_enabled_in_content
# ---------------------------------------------------------------------------
def test_set_enabled_replaces_existing_line() -> None:
src = "[sshd]\nenabled = false\nbantime = 600\n"
result = _set_enabled_in_content(src, True)
assert "enabled = true" in result
assert "enabled = false" not in result
def test_set_enabled_inserts_after_section() -> None:
src = "[sshd]\nbantime = 600\n"
result = _set_enabled_in_content(src, False)
assert "enabled = false" in result
def test_set_enabled_prepends_default_when_no_section() -> None:
result = _set_enabled_in_content("bantime = 600\n", True)
assert "enabled = true" in result
# ---------------------------------------------------------------------------
# _validate_new_name
# ---------------------------------------------------------------------------
@pytest.mark.parametrize("name", ["sshd", "my-filter", "test.local", "A1_filter"])
def test_validate_new_name_valid(name: str) -> None:
_validate_new_name(name) # should not raise
@pytest.mark.parametrize(
"name",
[
"",
".",
".hidden",
"../escape",
"bad/slash",
"a" * 129, # too long
"hello world", # space
],
)
def test_validate_new_name_invalid(name: str) -> None:
with pytest.raises(ConfigFileNameError):
_validate_new_name(name)
# ---------------------------------------------------------------------------
# list_jail_config_files
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_list_jail_config_files_empty(tmp_path: Path) -> None:
config_dir = _make_config_dir(tmp_path)
resp = await list_jail_config_files(str(config_dir))
assert resp.files == []
assert resp.total == 0
@pytest.mark.asyncio
async def test_list_jail_config_files_returns_conf_files(tmp_path: Path) -> None:
config_dir = _make_config_dir(tmp_path)
(config_dir / "jail.d" / "sshd.conf").write_text("[sshd]\nenabled = true\n")
(config_dir / "jail.d" / "nginx.conf").write_text("[nginx]\n")
(config_dir / "jail.d" / "other.txt").write_text("ignored")
resp = await list_jail_config_files(str(config_dir))
names = {f.filename for f in resp.files}
assert names == {"sshd.conf", "nginx.conf"}
assert resp.total == 2
@pytest.mark.asyncio
async def test_list_jail_config_files_enabled_state(tmp_path: Path) -> None:
config_dir = _make_config_dir(tmp_path)
(config_dir / "jail.d" / "a.conf").write_text("[a]\nenabled = false\n")
(config_dir / "jail.d" / "b.conf").write_text("[b]\n")
resp = await list_jail_config_files(str(config_dir))
by_name = {f.filename: f for f in resp.files}
assert by_name["a.conf"].enabled is False
assert by_name["b.conf"].enabled is True
@pytest.mark.asyncio
async def test_list_jail_config_files_missing_config_dir(tmp_path: Path) -> None:
with pytest.raises(ConfigDirError):
await list_jail_config_files(str(tmp_path / "nonexistent"))
# ---------------------------------------------------------------------------
# get_jail_config_file
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_get_jail_config_file_returns_content(tmp_path: Path) -> None:
config_dir = _make_config_dir(tmp_path)
(config_dir / "jail.d" / "sshd.conf").write_text("[sshd]\nenabled = true\n")
result = await get_jail_config_file(str(config_dir), "sshd.conf")
assert result.filename == "sshd.conf"
assert result.name == "sshd"
assert result.enabled is True
assert "[sshd]" in result.content
@pytest.mark.asyncio
async def test_get_jail_config_file_not_found(tmp_path: Path) -> None:
config_dir = _make_config_dir(tmp_path)
with pytest.raises(ConfigFileNotFoundError):
await get_jail_config_file(str(config_dir), "missing.conf")
@pytest.mark.asyncio
async def test_get_jail_config_file_invalid_extension(tmp_path: Path) -> None:
config_dir = _make_config_dir(tmp_path)
(config_dir / "jail.d" / "bad.txt").write_text("content")
with pytest.raises(ConfigFileNameError):
await get_jail_config_file(str(config_dir), "bad.txt")
@pytest.mark.asyncio
async def test_get_jail_config_file_path_traversal(tmp_path: Path) -> None:
config_dir = _make_config_dir(tmp_path)
with pytest.raises((ConfigFileNameError, ConfigFileNotFoundError)):
await get_jail_config_file(str(config_dir), "../jail.conf")
# ---------------------------------------------------------------------------
# set_jail_config_enabled
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_set_jail_config_enabled_writes_false(tmp_path: Path) -> None:
config_dir = _make_config_dir(tmp_path)
path = config_dir / "jail.d" / "sshd.conf"
path.write_text("[sshd]\nenabled = true\n")
await set_jail_config_enabled(str(config_dir), "sshd.conf", False)
assert "enabled = false" in path.read_text()
@pytest.mark.asyncio
async def test_set_jail_config_enabled_inserts_when_missing(tmp_path: Path) -> None:
config_dir = _make_config_dir(tmp_path)
path = config_dir / "jail.d" / "sshd.conf"
path.write_text("[sshd]\nbantime = 600\n")
await set_jail_config_enabled(str(config_dir), "sshd.conf", False)
assert "enabled = false" in path.read_text()
@pytest.mark.asyncio
async def test_set_jail_config_enabled_file_not_found(tmp_path: Path) -> None:
config_dir = _make_config_dir(tmp_path)
with pytest.raises(ConfigFileNotFoundError):
await set_jail_config_enabled(str(config_dir), "missing.conf", True)
# ---------------------------------------------------------------------------
# list_filter_files / list_action_files
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_list_filter_files_empty(tmp_path: Path) -> None:
config_dir = _make_config_dir(tmp_path)
resp = await list_filter_files(str(config_dir))
assert resp.files == []
@pytest.mark.asyncio
async def test_list_filter_files_returns_files(tmp_path: Path) -> None:
config_dir = _make_config_dir(tmp_path)
(config_dir / "filter.d" / "nginx.conf").write_text("[Definition]\n")
(config_dir / "filter.d" / "sshd.local").write_text("[Definition]\n")
(config_dir / "filter.d" / "ignore.py").write_text("# ignored")
resp = await list_filter_files(str(config_dir))
names = {f.filename for f in resp.files}
assert names == {"nginx.conf", "sshd.local"}
@pytest.mark.asyncio
async def test_list_action_files_returns_files(tmp_path: Path) -> None:
config_dir = _make_config_dir(tmp_path)
(config_dir / "action.d" / "iptables.conf").write_text("[Definition]\n")
resp = await list_action_files(str(config_dir))
assert resp.files[0].filename == "iptables.conf"
# ---------------------------------------------------------------------------
# get_filter_file / get_action_file
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_get_filter_file_by_stem(tmp_path: Path) -> None:
config_dir = _make_config_dir(tmp_path)
(config_dir / "filter.d" / "nginx.conf").write_text("[Definition]\nfailregex = test\n")
result = await get_filter_file(str(config_dir), "nginx")
assert result.name == "nginx"
assert "failregex" in result.content
@pytest.mark.asyncio
async def test_get_filter_file_by_full_name(tmp_path: Path) -> None:
config_dir = _make_config_dir(tmp_path)
(config_dir / "filter.d" / "nginx.conf").write_text("[Definition]\n")
result = await get_filter_file(str(config_dir), "nginx.conf")
assert result.filename == "nginx.conf"
@pytest.mark.asyncio
async def test_get_filter_file_not_found(tmp_path: Path) -> None:
config_dir = _make_config_dir(tmp_path)
with pytest.raises(ConfigFileNotFoundError):
await get_filter_file(str(config_dir), "nonexistent")
@pytest.mark.asyncio
async def test_get_action_file_returns_content(tmp_path: Path) -> None:
config_dir = _make_config_dir(tmp_path)
(config_dir / "action.d" / "iptables.conf").write_text("[Definition]\nactionban = <ip>\n")
result = await get_action_file(str(config_dir), "iptables")
assert "actionban" in result.content
# ---------------------------------------------------------------------------
# write_filter_file / write_action_file
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_write_filter_file_updates_content(tmp_path: Path) -> None:
config_dir = _make_config_dir(tmp_path)
(config_dir / "filter.d" / "nginx.conf").write_text("[Definition]\n")
req = ConfFileUpdateRequest(content="[Definition]\nfailregex = new\n")
await write_filter_file(str(config_dir), "nginx", req)
assert "failregex = new" in (config_dir / "filter.d" / "nginx.conf").read_text()
@pytest.mark.asyncio
async def test_write_filter_file_not_found(tmp_path: Path) -> None:
config_dir = _make_config_dir(tmp_path)
req = ConfFileUpdateRequest(content="[Definition]\n")
with pytest.raises(ConfigFileNotFoundError):
await write_filter_file(str(config_dir), "missing", req)
@pytest.mark.asyncio
async def test_write_filter_file_too_large(tmp_path: Path) -> None:
config_dir = _make_config_dir(tmp_path)
(config_dir / "filter.d" / "nginx.conf").write_text("[Definition]\n")
big_content = "x" * (512 * 1024 + 1)
req = ConfFileUpdateRequest(content=big_content)
with pytest.raises(ConfigFileWriteError):
await write_filter_file(str(config_dir), "nginx", req)
@pytest.mark.asyncio
async def test_write_action_file_updates_content(tmp_path: Path) -> None:
config_dir = _make_config_dir(tmp_path)
(config_dir / "action.d" / "iptables.conf").write_text("[Definition]\n")
req = ConfFileUpdateRequest(content="[Definition]\nactionban = new\n")
await write_action_file(str(config_dir), "iptables", req)
assert "actionban = new" in (config_dir / "action.d" / "iptables.conf").read_text()
# ---------------------------------------------------------------------------
# create_filter_file / create_action_file
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_create_filter_file_creates_file(tmp_path: Path) -> None:
config_dir = _make_config_dir(tmp_path)
req = ConfFileCreateRequest(name="myfilter", content="[Definition]\n")
result = await create_filter_file(str(config_dir), req)
assert result == "myfilter.conf"
assert (config_dir / "filter.d" / "myfilter.conf").is_file()
@pytest.mark.asyncio
async def test_create_filter_file_conflict(tmp_path: Path) -> None:
config_dir = _make_config_dir(tmp_path)
(config_dir / "filter.d" / "ngx.conf").write_text("[Definition]\n")
req = ConfFileCreateRequest(name="ngx", content="[Definition]\n")
with pytest.raises(ConfigFileExistsError):
await create_filter_file(str(config_dir), req)
@pytest.mark.asyncio
async def test_create_filter_file_invalid_name(tmp_path: Path) -> None:
config_dir = _make_config_dir(tmp_path)
req = ConfFileCreateRequest(name="../escape", content="[Definition]\n")
with pytest.raises(ConfigFileNameError):
await create_filter_file(str(config_dir), req)
@pytest.mark.asyncio
async def test_create_action_file_creates_file(tmp_path: Path) -> None:
config_dir = _make_config_dir(tmp_path)
req = ConfFileCreateRequest(name="my-action", content="[Definition]\n")
result = await create_action_file(str(config_dir), req)
assert result == "my-action.conf"
assert (config_dir / "action.d" / "my-action.conf").is_file()
# ---------------------------------------------------------------------------
# create_jail_config_file
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_create_jail_config_file_creates_file(tmp_path: Path) -> None:
config_dir = _make_config_dir(tmp_path)
req = ConfFileCreateRequest(name="myjail", content="[myjail]\nenabled = true\n")
result = await create_jail_config_file(str(config_dir), req)
assert result == "myjail.conf"
assert (config_dir / "jail.d" / "myjail.conf").is_file()
@pytest.mark.asyncio
async def test_create_jail_config_file_conflict(tmp_path: Path) -> None:
config_dir = _make_config_dir(tmp_path)
(config_dir / "jail.d" / "sshd.conf").write_text("[sshd]\nenabled = true\n")
req = ConfFileCreateRequest(name="sshd", content="[sshd]\nenabled = true\n")
with pytest.raises(ConfigFileExistsError):
await create_jail_config_file(str(config_dir), req)
@pytest.mark.asyncio
async def test_create_jail_config_file_invalid_name(tmp_path: Path) -> None:
config_dir = _make_config_dir(tmp_path)
req = ConfFileCreateRequest(name="../escape", content="[x]\nenabled = true\n")
with pytest.raises(ConfigFileNameError):
await create_jail_config_file(str(config_dir), req)
# ---------------------------------------------------------------------------
# get_parsed_filter_file / update_parsed_filter_file
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_get_parsed_filter_file_returns_structured_model(tmp_path: Path) -> None:
config_dir = _make_config_dir(tmp_path)
content = "[Definition]\nfailregex = ^<HOST>\nignoreregex = ^.*ignore.*$\n"
(config_dir / "filter.d" / "nginx.conf").write_text(content)
result = await get_parsed_filter_file(str(config_dir), "nginx")
assert result.name == "nginx"
assert result.filename == "nginx.conf"
assert result.failregex == ["^<HOST>"]
assert result.ignoreregex == ["^.*ignore.*$"]
@pytest.mark.asyncio
async def test_get_parsed_filter_file_not_found(tmp_path: Path) -> None:
config_dir = _make_config_dir(tmp_path)
with pytest.raises(ConfigFileNotFoundError):
await get_parsed_filter_file(str(config_dir), "missing")
@pytest.mark.asyncio
async def test_update_parsed_filter_file_writes_changes(tmp_path: Path) -> None:
config_dir = _make_config_dir(tmp_path)
(config_dir / "filter.d" / "nginx.conf").write_text(
"[Definition]\nfailregex = ^<HOST> old\n"
)
update = FilterConfigUpdate(failregex=["^<HOST> new"])
await update_parsed_filter_file(str(config_dir), "nginx", update)
written = (config_dir / "filter.d" / "nginx.conf").read_text()
assert "new" in written
@pytest.mark.asyncio
async def test_update_parsed_filter_file_not_found(tmp_path: Path) -> None:
config_dir = _make_config_dir(tmp_path)
update = FilterConfigUpdate(failregex=["^<HOST>"])
with pytest.raises(ConfigFileNotFoundError):
await update_parsed_filter_file(str(config_dir), "missing", update)
# ---------------------------------------------------------------------------
# get_parsed_action_file / update_parsed_action_file
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_get_parsed_action_file_returns_structured_model(tmp_path: Path) -> None:
config_dir = _make_config_dir(tmp_path)
content = "[Definition]\nactionban = iptables -I INPUT -s <ip> -j DROP\n"
(config_dir / "action.d" / "iptables.conf").write_text(content)
result = await get_parsed_action_file(str(config_dir), "iptables")
assert result.name == "iptables"
assert result.filename == "iptables.conf"
assert result.actionban is not None
assert "<ip>" in result.actionban
@pytest.mark.asyncio
async def test_get_parsed_action_file_not_found(tmp_path: Path) -> None:
config_dir = _make_config_dir(tmp_path)
with pytest.raises(ConfigFileNotFoundError):
await get_parsed_action_file(str(config_dir), "missing")
@pytest.mark.asyncio
async def test_update_parsed_action_file_writes_changes(tmp_path: Path) -> None:
config_dir = _make_config_dir(tmp_path)
(config_dir / "action.d" / "iptables.conf").write_text(
"[Definition]\nactionban = iptables -I INPUT -s <ip> -j DROP\n"
)
update = ActionConfigUpdate(actionban="nft add element inet f2b-table <ip>")
await update_parsed_action_file(str(config_dir), "iptables", update)
written = (config_dir / "action.d" / "iptables.conf").read_text()
assert "nft" in written
@pytest.mark.asyncio
async def test_update_parsed_action_file_not_found(tmp_path: Path) -> None:
config_dir = _make_config_dir(tmp_path)
update = ActionConfigUpdate(actionban="iptables -I INPUT -s <ip> -j DROP")
with pytest.raises(ConfigFileNotFoundError):
await update_parsed_action_file(str(config_dir), "missing", update)
# ---------------------------------------------------------------------------
# get_parsed_jail_file / update_parsed_jail_file
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_get_parsed_jail_file_returns_structured_model(tmp_path: Path) -> None:
config_dir = _make_config_dir(tmp_path)
content = "[sshd]\nenabled = true\nport = ssh\nmaxretry = 5\n"
(config_dir / "jail.d" / "sshd.conf").write_text(content)
result = await get_parsed_jail_file(str(config_dir), "sshd.conf")
assert result.filename == "sshd.conf"
assert "sshd" in result.jails
assert result.jails["sshd"].enabled is True
assert result.jails["sshd"].maxretry == 5
@pytest.mark.asyncio
async def test_get_parsed_jail_file_not_found(tmp_path: Path) -> None:
config_dir = _make_config_dir(tmp_path)
with pytest.raises(ConfigFileNotFoundError):
await get_parsed_jail_file(str(config_dir), "missing.conf")
@pytest.mark.asyncio
async def test_update_parsed_jail_file_writes_changes(tmp_path: Path) -> None:
config_dir = _make_config_dir(tmp_path)
(config_dir / "jail.d" / "sshd.conf").write_text(
"[sshd]\nenabled = true\nport = ssh\n"
)
from app.models.config import JailSectionConfig
update = JailFileConfigUpdate(jails={"sshd": JailSectionConfig(enabled=False)})
await update_parsed_jail_file(str(config_dir), "sshd.conf", update)
written = (config_dir / "jail.d" / "sshd.conf").read_text()
assert "false" in written.lower()
@pytest.mark.asyncio
async def test_update_parsed_jail_file_not_found(tmp_path: Path) -> None:
config_dir = _make_config_dir(tmp_path)
update = JailFileConfigUpdate(jails={})
with pytest.raises(ConfigFileNotFoundError):
await update_parsed_jail_file(str(config_dir), "missing.conf", update)