Files
BanGUI/backend/tests/test_services/test_config_file_service.py
Lukas 4c138424a5 Add filter discovery endpoints with active/inactive status (Task 2.1)
- Add list_filters() and get_filter() to config_file_service.py:
  scans filter.d/, parses [Definition] + [Init] sections, merges .local
  overrides, and cross-references running jails to set active/used_by_jails
- Add FilterConfig.active, used_by_jails, source_file, has_local_override
  fields to the Pydantic model; add FilterListResponse and FilterNotFoundError
- Add GET /api/config/filters and GET /api/config/filters/{name} to config.py
- Remove the shadowed GET /api/config/filters list route from file_config.py;
  rename GET /api/config/filters/{name} raw variant to /filters/{name}/raw
- Update frontend: fetchFilterFiles() adapts FilterListResponse -> ConfFilesResponse;
  add fetchFilters() and fetchFilter() to api/config.ts; remove unused
  fetchFilterFiles/fetchActionFiles calls from useConfigActiveStatus
- Fix ConfigPageLogPath test mock to include fetchInactiveJails and related
  exports introduced by Stage 1
- Backend: 169 tests pass, mypy --strict clean, ruff clean
- Frontend: 63 tests pass, tsc --noEmit clean, eslint clean
2026-03-13 16:48:27 +01:00

855 lines
31 KiB
Python

"""Tests for config_file_service — fail2ban jail config parser and activator."""
from __future__ import annotations
from pathlib import Path
from unittest.mock import AsyncMock, patch
import pytest
from app.services.config_file_service import (
JailAlreadyActiveError,
JailAlreadyInactiveError,
JailNameError,
JailNotFoundInConfigError,
_build_inactive_jail,
_ordered_config_files,
_parse_jails_sync,
_resolve_filter,
_safe_jail_name,
_write_local_override_sync,
activate_jail,
deactivate_jail,
list_inactive_jails,
)
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _write(path: Path, content: str) -> None:
"""Write text to *path*, creating parent directories if needed."""
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(content, encoding="utf-8")
# ---------------------------------------------------------------------------
# _safe_jail_name
# ---------------------------------------------------------------------------
class TestSafeJailName:
def test_valid_simple(self) -> None:
assert _safe_jail_name("sshd") == "sshd"
def test_valid_with_hyphen(self) -> None:
assert _safe_jail_name("apache-auth") == "apache-auth"
def test_valid_with_dot(self) -> None:
assert _safe_jail_name("nginx.http") == "nginx.http"
def test_valid_with_underscore(self) -> None:
assert _safe_jail_name("my_jail") == "my_jail"
def test_invalid_path_traversal(self) -> None:
with pytest.raises(JailNameError):
_safe_jail_name("../evil")
def test_invalid_slash(self) -> None:
with pytest.raises(JailNameError):
_safe_jail_name("a/b")
def test_invalid_starts_with_dash(self) -> None:
with pytest.raises(JailNameError):
_safe_jail_name("-bad")
def test_invalid_empty(self) -> None:
with pytest.raises(JailNameError):
_safe_jail_name("")
# ---------------------------------------------------------------------------
# _resolve_filter
# ---------------------------------------------------------------------------
class TestResolveFilter:
def test_name_substitution(self) -> None:
result = _resolve_filter("%(__name__)s", "sshd", "normal")
assert result == "sshd"
def test_mode_substitution(self) -> None:
result = _resolve_filter("%(__name__)s[mode=%(mode)s]", "sshd", "aggressive")
assert result == "sshd[mode=aggressive]"
def test_no_substitution_needed(self) -> None:
result = _resolve_filter("my-filter", "sshd", "normal")
assert result == "my-filter"
def test_empty_raw(self) -> None:
result = _resolve_filter("", "sshd", "normal")
assert result == ""
# ---------------------------------------------------------------------------
# _ordered_config_files
# ---------------------------------------------------------------------------
class TestOrderedConfigFiles:
def test_empty_dir(self, tmp_path: Path) -> None:
result = _ordered_config_files(tmp_path)
assert result == []
def test_jail_conf_only(self, tmp_path: Path) -> None:
_write(tmp_path / "jail.conf", "[sshd]\nenabled=true\n")
result = _ordered_config_files(tmp_path)
assert result == [tmp_path / "jail.conf"]
def test_full_merge_order(self, tmp_path: Path) -> None:
_write(tmp_path / "jail.conf", "[DEFAULT]\n")
_write(tmp_path / "jail.local", "[DEFAULT]\n")
_write(tmp_path / "jail.d" / "custom.conf", "[sshd]\n")
_write(tmp_path / "jail.d" / "custom.local", "[sshd]\n")
result = _ordered_config_files(tmp_path)
assert result[0] == tmp_path / "jail.conf"
assert result[1] == tmp_path / "jail.local"
assert result[2] == tmp_path / "jail.d" / "custom.conf"
assert result[3] == tmp_path / "jail.d" / "custom.local"
def test_jail_d_sorted_alphabetically(self, tmp_path: Path) -> None:
(tmp_path / "jail.d").mkdir()
for name in ("zzz.conf", "aaa.conf", "mmm.conf"):
_write(tmp_path / "jail.d" / name, "")
result = _ordered_config_files(tmp_path)
names = [p.name for p in result]
assert names == ["aaa.conf", "mmm.conf", "zzz.conf"]
# ---------------------------------------------------------------------------
# _parse_jails_sync
# ---------------------------------------------------------------------------
JAIL_CONF = """\
[DEFAULT]
bantime = 10m
findtime = 5m
maxretry = 5
[sshd]
enabled = true
filter = sshd
port = ssh
logpath = /var/log/auth.log
[apache-auth]
enabled = false
filter = apache-auth
port = http,https
logpath = /var/log/apache2/error.log
"""
JAIL_LOCAL = """\
[sshd]
bantime = 1h
"""
JAIL_D_CUSTOM = """\
[nginx-http-auth]
enabled = false
filter = nginx-http-auth
port = http,https
logpath = /var/log/nginx/error.log
"""
class TestParseJailsSync:
def test_parses_all_jails(self, tmp_path: Path) -> None:
_write(tmp_path / "jail.conf", JAIL_CONF)
jails, _ = _parse_jails_sync(tmp_path)
assert "sshd" in jails
assert "apache-auth" in jails
def test_enabled_flag_parsing(self, tmp_path: Path) -> None:
_write(tmp_path / "jail.conf", JAIL_CONF)
jails, _ = _parse_jails_sync(tmp_path)
assert jails["sshd"]["enabled"] == "true"
assert jails["apache-auth"]["enabled"] == "false"
def test_default_inheritance(self, tmp_path: Path) -> None:
_write(tmp_path / "jail.conf", JAIL_CONF)
jails, _ = _parse_jails_sync(tmp_path)
# DEFAULT values should flow into each jail via configparser
assert jails["sshd"]["bantime"] == "10m"
assert jails["apache-auth"]["maxretry"] == "5"
def test_local_override_wins(self, tmp_path: Path) -> None:
_write(tmp_path / "jail.conf", JAIL_CONF)
_write(tmp_path / "jail.local", JAIL_LOCAL)
jails, _ = _parse_jails_sync(tmp_path)
# jail.local overrides bantime for sshd from 10m → 1h
assert jails["sshd"]["bantime"] == "1h"
def test_jail_d_conf_included(self, tmp_path: Path) -> None:
_write(tmp_path / "jail.conf", JAIL_CONF)
_write(tmp_path / "jail.d" / "custom.conf", JAIL_D_CUSTOM)
jails, _ = _parse_jails_sync(tmp_path)
assert "nginx-http-auth" in jails
def test_source_file_tracked(self, tmp_path: Path) -> None:
_write(tmp_path / "jail.conf", JAIL_CONF)
_write(tmp_path / "jail.d" / "custom.conf", JAIL_D_CUSTOM)
_, source_files = _parse_jails_sync(tmp_path)
# sshd comes from jail.conf; nginx-http-auth from jail.d/custom.conf
assert source_files["sshd"] == str(tmp_path / "jail.conf")
assert source_files["nginx-http-auth"] == str(tmp_path / "jail.d" / "custom.conf")
def test_source_file_local_override_tracked(self, tmp_path: Path) -> None:
_write(tmp_path / "jail.conf", JAIL_CONF)
_write(tmp_path / "jail.local", JAIL_LOCAL)
_, source_files = _parse_jails_sync(tmp_path)
# jail.local defines [sshd] again → that file wins
assert source_files["sshd"] == str(tmp_path / "jail.local")
def test_default_section_excluded(self, tmp_path: Path) -> None:
_write(tmp_path / "jail.conf", JAIL_CONF)
jails, _ = _parse_jails_sync(tmp_path)
assert "DEFAULT" not in jails
def test_includes_section_excluded(self, tmp_path: Path) -> None:
content = "[INCLUDES]\nbefore = paths-debian.conf\n" + JAIL_CONF
_write(tmp_path / "jail.conf", content)
jails, _ = _parse_jails_sync(tmp_path)
assert "INCLUDES" not in jails
def test_corrupt_file_skipped_gracefully(self, tmp_path: Path) -> None:
_write(tmp_path / "jail.conf", "[[bad section\n")
# Should not raise; bad section just yields no jails
jails, _ = _parse_jails_sync(tmp_path)
assert isinstance(jails, dict)
# ---------------------------------------------------------------------------
# _build_inactive_jail
# ---------------------------------------------------------------------------
class TestBuildInactiveJail:
def test_basic_fields(self) -> None:
settings = {
"enabled": "false",
"filter": "sshd",
"port": "ssh",
"logpath": "/var/log/auth.log",
"bantime": "10m",
"findtime": "5m",
"maxretry": "5",
"action": "",
}
jail = _build_inactive_jail("sshd", settings, "/etc/fail2ban/jail.d/sshd.conf")
assert jail.name == "sshd"
assert jail.filter == "sshd"
assert jail.port == "ssh"
assert jail.logpath == ["/var/log/auth.log"]
assert jail.bantime == "10m"
assert jail.findtime == "5m"
assert jail.maxretry == 5
assert jail.enabled is False
assert "sshd.conf" in jail.source_file
def test_filter_name_substitution(self) -> None:
settings = {"enabled": "false", "filter": "%(__name__)s"}
jail = _build_inactive_jail("myservice", settings, "/etc/fail2ban/jail.conf")
assert jail.filter == "myservice"
def test_missing_optional_fields(self) -> None:
jail = _build_inactive_jail("minimal", {}, "/etc/fail2ban/jail.conf")
assert jail.filter == "minimal" # falls back to name
assert jail.port is None
assert jail.logpath == []
assert jail.bantime is None
assert jail.maxretry is None
def test_multiline_logpath(self) -> None:
settings = {"logpath": "/var/log/app.log\n/var/log/app2.log"}
jail = _build_inactive_jail("app", settings, "/etc/fail2ban/jail.conf")
assert "/var/log/app.log" in jail.logpath
assert "/var/log/app2.log" in jail.logpath
def test_multiline_actions(self) -> None:
settings = {"action": "iptables-multiport\niptables-ipset"}
jail = _build_inactive_jail("app", settings, "/etc/fail2ban/jail.conf")
assert len(jail.actions) == 2
def test_enabled_true(self) -> None:
settings = {"enabled": "true"}
jail = _build_inactive_jail("active-jail", settings, "/etc/fail2ban/jail.conf")
assert jail.enabled is True
# ---------------------------------------------------------------------------
# _write_local_override_sync
# ---------------------------------------------------------------------------
class TestWriteLocalOverrideSync:
def test_creates_local_file(self, tmp_path: Path) -> None:
_write_local_override_sync(tmp_path, "sshd", True, {})
local = tmp_path / "jail.d" / "sshd.local"
assert local.is_file()
def test_enabled_true_written(self, tmp_path: Path) -> None:
_write_local_override_sync(tmp_path, "sshd", True, {})
content = (tmp_path / "jail.d" / "sshd.local").read_text()
assert "enabled = true" in content
def test_enabled_false_written(self, tmp_path: Path) -> None:
_write_local_override_sync(tmp_path, "sshd", False, {})
content = (tmp_path / "jail.d" / "sshd.local").read_text()
assert "enabled = false" in content
def test_section_header_written(self, tmp_path: Path) -> None:
_write_local_override_sync(tmp_path, "apache-auth", True, {})
content = (tmp_path / "jail.d" / "apache-auth.local").read_text()
assert "[apache-auth]" in content
def test_override_bantime(self, tmp_path: Path) -> None:
_write_local_override_sync(tmp_path, "sshd", True, {"bantime": "1h"})
content = (tmp_path / "jail.d" / "sshd.local").read_text()
assert "bantime" in content
assert "1h" in content
def test_override_findtime(self, tmp_path: Path) -> None:
_write_local_override_sync(tmp_path, "sshd", True, {"findtime": "30m"})
content = (tmp_path / "jail.d" / "sshd.local").read_text()
assert "findtime" in content
assert "30m" in content
def test_override_maxretry(self, tmp_path: Path) -> None:
_write_local_override_sync(tmp_path, "sshd", True, {"maxretry": 3})
content = (tmp_path / "jail.d" / "sshd.local").read_text()
assert "maxretry" in content
assert "3" in content
def test_override_port(self, tmp_path: Path) -> None:
_write_local_override_sync(tmp_path, "sshd", True, {"port": "2222"})
content = (tmp_path / "jail.d" / "sshd.local").read_text()
assert "2222" in content
def test_override_logpath_list(self, tmp_path: Path) -> None:
_write_local_override_sync(
tmp_path, "sshd", True, {"logpath": ["/var/log/auth.log", "/var/log/secure"]}
)
content = (tmp_path / "jail.d" / "sshd.local").read_text()
assert "/var/log/auth.log" in content
assert "/var/log/secure" in content
def test_bang_gui_header_comment(self, tmp_path: Path) -> None:
_write_local_override_sync(tmp_path, "sshd", True, {})
content = (tmp_path / "jail.d" / "sshd.local").read_text()
assert "BanGUI" in content
def test_overwrites_existing_file(self, tmp_path: Path) -> None:
local = tmp_path / "jail.d" / "sshd.local"
local.parent.mkdir()
local.write_text("old content")
_write_local_override_sync(tmp_path, "sshd", True, {})
assert "old content" not in local.read_text()
# ---------------------------------------------------------------------------
# list_inactive_jails
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
class TestListInactiveJails:
async def test_returns_only_inactive(self, tmp_path: Path) -> None:
_write(tmp_path / "jail.conf", JAIL_CONF)
# sshd is enabled=true; apache-auth is enabled=false
with patch(
"app.services.config_file_service._get_active_jail_names",
new=AsyncMock(return_value={"sshd"}),
):
result = await list_inactive_jails(str(tmp_path), "/fake.sock")
names = [j.name for j in result.jails]
assert "sshd" not in names
assert "apache-auth" in names
async def test_total_matches_jails_count(self, tmp_path: Path) -> None:
_write(tmp_path / "jail.conf", JAIL_CONF)
with patch(
"app.services.config_file_service._get_active_jail_names",
new=AsyncMock(return_value={"sshd"}),
):
result = await list_inactive_jails(str(tmp_path), "/fake.sock")
assert result.total == len(result.jails)
async def test_empty_config_dir(self, tmp_path: Path) -> None:
with patch(
"app.services.config_file_service._get_active_jail_names",
new=AsyncMock(return_value=set()),
):
result = await list_inactive_jails(str(tmp_path), "/fake.sock")
assert result.jails == []
assert result.total == 0
async def test_all_active_returns_empty(self, tmp_path: Path) -> None:
_write(tmp_path / "jail.conf", JAIL_CONF)
with patch(
"app.services.config_file_service._get_active_jail_names",
new=AsyncMock(return_value={"sshd", "apache-auth"}),
):
result = await list_inactive_jails(str(tmp_path), "/fake.sock")
assert result.jails == []
async def test_fail2ban_unreachable_shows_all(self, tmp_path: Path) -> None:
# When fail2ban is unreachable, _get_active_jail_names returns empty set,
# so every config-defined jail appears as inactive.
_write(tmp_path / "jail.conf", JAIL_CONF)
with patch(
"app.services.config_file_service._get_active_jail_names",
new=AsyncMock(return_value=set()),
):
result = await list_inactive_jails(str(tmp_path), "/fake.sock")
names = {j.name for j in result.jails}
assert "sshd" in names
assert "apache-auth" in names
# ---------------------------------------------------------------------------
# activate_jail
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
class TestActivateJail:
async def test_activates_known_inactive_jail(self, tmp_path: Path) -> None:
_write(tmp_path / "jail.conf", JAIL_CONF)
from app.models.config import ActivateJailRequest
req = ActivateJailRequest()
with (
patch(
"app.services.config_file_service._get_active_jail_names",
new=AsyncMock(return_value=set()),
),
patch("app.services.config_file_service.jail_service") as mock_js,
):
mock_js.reload_all = AsyncMock()
result = await activate_jail(str(tmp_path), "/fake.sock", "apache-auth", req)
assert result.active is True
assert "apache-auth" in result.name
local = tmp_path / "jail.d" / "apache-auth.local"
assert local.is_file()
assert "enabled = true" in local.read_text()
async def test_raises_not_found_for_unknown_jail(self, tmp_path: Path) -> None:
_write(tmp_path / "jail.conf", JAIL_CONF)
from app.models.config import ActivateJailRequest
req = ActivateJailRequest()
with (
patch(
"app.services.config_file_service._get_active_jail_names",
new=AsyncMock(return_value=set()),
),pytest.raises(JailNotFoundInConfigError)
):
await activate_jail(str(tmp_path), "/fake.sock", "nonexistent", req)
async def test_raises_already_active(self, tmp_path: Path) -> None:
_write(tmp_path / "jail.conf", JAIL_CONF)
from app.models.config import ActivateJailRequest
req = ActivateJailRequest()
with (
patch(
"app.services.config_file_service._get_active_jail_names",
new=AsyncMock(return_value={"sshd"}),
),pytest.raises(JailAlreadyActiveError)
):
await activate_jail(str(tmp_path), "/fake.sock", "sshd", req)
async def test_raises_name_error_for_bad_name(self, tmp_path: Path) -> None:
from app.models.config import ActivateJailRequest
req = ActivateJailRequest()
with pytest.raises(JailNameError):
await activate_jail(str(tmp_path), "/fake.sock", "../etc/passwd", req)
async def test_writes_overrides_to_local_file(self, tmp_path: Path) -> None:
_write(tmp_path / "jail.conf", JAIL_CONF)
from app.models.config import ActivateJailRequest
req = ActivateJailRequest(bantime="2h", maxretry=3)
with (
patch(
"app.services.config_file_service._get_active_jail_names",
new=AsyncMock(return_value=set()),
),
patch("app.services.config_file_service.jail_service") as mock_js,
):
mock_js.reload_all = AsyncMock()
await activate_jail(str(tmp_path), "/fake.sock", "apache-auth", req)
content = (tmp_path / "jail.d" / "apache-auth.local").read_text()
assert "2h" in content
assert "3" in content
# ---------------------------------------------------------------------------
# deactivate_jail
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
class TestDeactivateJail:
async def test_deactivates_active_jail(self, tmp_path: Path) -> None:
_write(tmp_path / "jail.conf", JAIL_CONF)
with (
patch(
"app.services.config_file_service._get_active_jail_names",
new=AsyncMock(return_value={"sshd"}),
),
patch("app.services.config_file_service.jail_service") as mock_js,
):
mock_js.reload_all = AsyncMock()
result = await deactivate_jail(str(tmp_path), "/fake.sock", "sshd")
assert result.active is False
local = tmp_path / "jail.d" / "sshd.local"
assert local.is_file()
assert "enabled = false" in local.read_text()
async def test_raises_not_found(self, tmp_path: Path) -> None:
_write(tmp_path / "jail.conf", JAIL_CONF)
with (
patch(
"app.services.config_file_service._get_active_jail_names",
new=AsyncMock(return_value={"sshd"}),
),pytest.raises(JailNotFoundInConfigError)
):
await deactivate_jail(str(tmp_path), "/fake.sock", "nonexistent")
async def test_raises_already_inactive(self, tmp_path: Path) -> None:
_write(tmp_path / "jail.conf", JAIL_CONF)
with (
patch(
"app.services.config_file_service._get_active_jail_names",
new=AsyncMock(return_value=set()),
),pytest.raises(JailAlreadyInactiveError)
):
await deactivate_jail(str(tmp_path), "/fake.sock", "apache-auth")
async def test_raises_name_error(self, tmp_path: Path) -> None:
with pytest.raises(JailNameError):
await deactivate_jail(str(tmp_path), "/fake.sock", "a/b")
# ---------------------------------------------------------------------------
# _extract_filter_base_name
# ---------------------------------------------------------------------------
class TestExtractFilterBaseName:
def test_simple_name(self) -> None:
from app.services.config_file_service import _extract_filter_base_name
assert _extract_filter_base_name("sshd") == "sshd"
def test_name_with_mode(self) -> None:
from app.services.config_file_service import _extract_filter_base_name
assert _extract_filter_base_name("sshd[mode=aggressive]") == "sshd"
def test_name_with_variable_mode(self) -> None:
from app.services.config_file_service import _extract_filter_base_name
assert _extract_filter_base_name("sshd[mode=%(mode)s]") == "sshd"
def test_whitespace_stripped(self) -> None:
from app.services.config_file_service import _extract_filter_base_name
assert _extract_filter_base_name(" nginx ") == "nginx"
def test_empty_string(self) -> None:
from app.services.config_file_service import _extract_filter_base_name
assert _extract_filter_base_name("") == ""
# ---------------------------------------------------------------------------
# _build_filter_to_jails_map
# ---------------------------------------------------------------------------
class TestBuildFilterToJailsMap:
def test_active_jail_maps_to_filter(self) -> None:
from app.services.config_file_service import _build_filter_to_jails_map
result = _build_filter_to_jails_map({"sshd": {"filter": "sshd"}}, {"sshd"})
assert result == {"sshd": ["sshd"]}
def test_inactive_jail_not_included(self) -> None:
from app.services.config_file_service import _build_filter_to_jails_map
result = _build_filter_to_jails_map(
{"apache-auth": {"filter": "apache-auth"}}, set()
)
assert result == {}
def test_multiple_jails_sharing_filter(self) -> None:
from app.services.config_file_service import _build_filter_to_jails_map
all_jails = {
"sshd": {"filter": "sshd"},
"sshd-ddos": {"filter": "sshd"},
}
result = _build_filter_to_jails_map(all_jails, {"sshd", "sshd-ddos"})
assert sorted(result["sshd"]) == ["sshd", "sshd-ddos"]
def test_mode_suffix_stripped(self) -> None:
from app.services.config_file_service import _build_filter_to_jails_map
result = _build_filter_to_jails_map(
{"sshd": {"filter": "sshd[mode=aggressive]"}}, {"sshd"}
)
assert "sshd" in result
def test_missing_filter_key_falls_back_to_jail_name(self) -> None:
from app.services.config_file_service import _build_filter_to_jails_map
# When jail has no "filter" key the code falls back to the jail name.
result = _build_filter_to_jails_map({"sshd": {}}, {"sshd"})
assert "sshd" in result
# ---------------------------------------------------------------------------
# _parse_filters_sync
# ---------------------------------------------------------------------------
_FILTER_CONF = """\
[Definition]
failregex = ^Host: <HOST>
ignoreregex =
"""
class TestParseFiltersSync:
def test_returns_empty_for_missing_dir(self, tmp_path: Path) -> None:
from app.services.config_file_service import _parse_filters_sync
result = _parse_filters_sync(tmp_path / "nonexistent")
assert result == []
def test_single_filter_returned(self, tmp_path: Path) -> None:
from app.services.config_file_service import _parse_filters_sync
filter_d = tmp_path / "filter.d"
_write(filter_d / "nginx.conf", _FILTER_CONF)
result = _parse_filters_sync(filter_d)
assert len(result) == 1
name, filename, content, has_local = result[0]
assert name == "nginx"
assert filename == "nginx.conf"
assert "failregex" in content
assert has_local is False
def test_local_override_detected(self, tmp_path: Path) -> None:
from app.services.config_file_service import _parse_filters_sync
filter_d = tmp_path / "filter.d"
_write(filter_d / "nginx.conf", _FILTER_CONF)
_write(filter_d / "nginx.local", "[Definition]\nignoreregex = ^safe\n")
result = _parse_filters_sync(filter_d)
_, _, _, has_local = result[0]
assert has_local is True
def test_local_content_appended_to_content(self, tmp_path: Path) -> None:
from app.services.config_file_service import _parse_filters_sync
filter_d = tmp_path / "filter.d"
_write(filter_d / "nginx.conf", _FILTER_CONF)
_write(filter_d / "nginx.local", "[Definition]\n# local tweak\n")
result = _parse_filters_sync(filter_d)
_, _, content, _ = result[0]
assert "local tweak" in content
def test_sorted_alphabetically(self, tmp_path: Path) -> None:
from app.services.config_file_service import _parse_filters_sync
filter_d = tmp_path / "filter.d"
for name in ("zzz", "aaa", "mmm"):
_write(filter_d / f"{name}.conf", _FILTER_CONF)
result = _parse_filters_sync(filter_d)
names = [r[0] for r in result]
assert names == ["aaa", "mmm", "zzz"]
# ---------------------------------------------------------------------------
# list_filters
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
class TestListFilters:
async def test_returns_all_filters(self, tmp_path: Path) -> None:
from app.services.config_file_service import list_filters
filter_d = tmp_path / "filter.d"
_write(filter_d / "sshd.conf", _FILTER_CONF)
_write(filter_d / "nginx.conf", _FILTER_CONF)
with patch(
"app.services.config_file_service._get_active_jail_names",
new=AsyncMock(return_value=set()),
):
result = await list_filters(str(tmp_path), "/fake.sock")
assert result.total == 2
names = {f.name for f in result.filters}
assert "sshd" in names
assert "nginx" in names
async def test_active_flag_set_for_used_filter(self, tmp_path: Path) -> None:
from app.services.config_file_service import list_filters
filter_d = tmp_path / "filter.d"
_write(filter_d / "sshd.conf", _FILTER_CONF)
_write(tmp_path / "jail.conf", JAIL_CONF)
with patch(
"app.services.config_file_service._get_active_jail_names",
new=AsyncMock(return_value={"sshd"}),
):
result = await list_filters(str(tmp_path), "/fake.sock")
sshd = next(f for f in result.filters if f.name == "sshd")
assert sshd.active is True
assert "sshd" in sshd.used_by_jails
async def test_inactive_filter_not_marked_active(self, tmp_path: Path) -> None:
from app.services.config_file_service import list_filters
filter_d = tmp_path / "filter.d"
_write(filter_d / "nginx.conf", _FILTER_CONF)
_write(tmp_path / "jail.conf", JAIL_CONF)
with patch(
"app.services.config_file_service._get_active_jail_names",
new=AsyncMock(return_value={"sshd"}),
):
result = await list_filters(str(tmp_path), "/fake.sock")
nginx = next(f for f in result.filters if f.name == "nginx")
assert nginx.active is False
assert nginx.used_by_jails == []
async def test_has_local_override_detected(self, tmp_path: Path) -> None:
from app.services.config_file_service import list_filters
filter_d = tmp_path / "filter.d"
_write(filter_d / "sshd.conf", _FILTER_CONF)
_write(filter_d / "sshd.local", "[Definition]\n")
with patch(
"app.services.config_file_service._get_active_jail_names",
new=AsyncMock(return_value=set()),
):
result = await list_filters(str(tmp_path), "/fake.sock")
sshd = next(f for f in result.filters if f.name == "sshd")
assert sshd.has_local_override is True
async def test_empty_filter_d_returns_empty(self, tmp_path: Path) -> None:
from app.services.config_file_service import list_filters
with patch(
"app.services.config_file_service._get_active_jail_names",
new=AsyncMock(return_value=set()),
):
result = await list_filters(str(tmp_path), "/fake.sock")
assert result.filters == []
assert result.total == 0
# ---------------------------------------------------------------------------
# get_filter
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
class TestGetFilter:
async def test_returns_filter_config(self, tmp_path: Path) -> None:
from app.services.config_file_service import get_filter
filter_d = tmp_path / "filter.d"
_write(filter_d / "sshd.conf", _FILTER_CONF)
_write(tmp_path / "jail.conf", JAIL_CONF)
with patch(
"app.services.config_file_service._get_active_jail_names",
new=AsyncMock(return_value={"sshd"}),
):
result = await get_filter(str(tmp_path), "/fake.sock", "sshd")
assert result.name == "sshd"
assert result.active is True
assert "sshd" in result.used_by_jails
async def test_accepts_conf_extension(self, tmp_path: Path) -> None:
from app.services.config_file_service import get_filter
filter_d = tmp_path / "filter.d"
_write(filter_d / "sshd.conf", _FILTER_CONF)
with patch(
"app.services.config_file_service._get_active_jail_names",
new=AsyncMock(return_value=set()),
):
result = await get_filter(str(tmp_path), "/fake.sock", "sshd.conf")
assert result.name == "sshd"
async def test_raises_filter_not_found(self, tmp_path: Path) -> None:
from app.services.config_file_service import FilterNotFoundError, get_filter
with patch(
"app.services.config_file_service._get_active_jail_names",
new=AsyncMock(return_value=set()),
), pytest.raises(FilterNotFoundError):
await get_filter(str(tmp_path), "/fake.sock", "nonexistent")
async def test_has_local_override_detected(self, tmp_path: Path) -> None:
from app.services.config_file_service import get_filter
filter_d = tmp_path / "filter.d"
_write(filter_d / "sshd.conf", _FILTER_CONF)
_write(filter_d / "sshd.local", "[Definition]\n")
with patch(
"app.services.config_file_service._get_active_jail_names",
new=AsyncMock(return_value=set()),
):
result = await get_filter(str(tmp_path), "/fake.sock", "sshd")
assert result.has_local_override is True