Files
BanGUI/backend/tests/test_services/test_config_file_service.py
Lukas ee7412442a Complete tasks 1-5: UI cleanup, pie chart fix, log path allowlist, activation hardening
Task 1: Remove ActiveBansSection from JailsPage
- Delete buildBanColumns, fmtTimestamp, ActiveBansSection
- Remove Dialog/Delete/Dismiss imports, ActiveBan type
- Update JSDoc to reflect three sections

Task 2: Remove JailDistributionChart from Dashboard
- Delete import and JSX block from DashboardPage.tsx

Task 3: Fix transparent pie chart (TopCountriesPieChart)
- Add Cell import and per-slice <Cell fill={slice.fill}> children inside <Pie>
- Suppress @typescript-eslint/no-deprecated (recharts v3 types)

Task 4: Allow /config/log as safe log prefix
- Add '/config/log' to _SAFE_LOG_PREFIXES in config_service.py
- Update error message to list both allowed directories

Task 5: Block jail activation on missing filter/logpath
- activate_jail refuses to proceed when filter/logpath issues found
- ActivateJailDialog treats all validation issues as blocking
- Trigger immediate _run_probe after activation in config router
- /api/health now reports fail2ban online/offline from cached probe
- Add TestActivateJailBlocking tests; fix existing tests to mock validation
2026-03-14 18:57:01 +01:00

2950 lines
108 KiB
Python

"""Tests for config_file_service — fail2ban jail config parser and activator."""
from __future__ import annotations
from pathlib import Path
from unittest.mock import AsyncMock, patch
import pytest
from app.services.config_file_service import (
JailAlreadyActiveError,
JailAlreadyInactiveError,
JailNameError,
JailNotFoundInConfigError,
_build_inactive_jail,
_ordered_config_files,
_parse_jails_sync,
_resolve_filter,
_safe_jail_name,
_write_local_override_sync,
activate_jail,
deactivate_jail,
list_inactive_jails,
)
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _write(path: Path, content: str) -> None:
    """Store *content* at *path* as UTF-8 text.

    Missing parent directories are created first; existing ones are left alone.
    """
    parent_dir = path.parent
    parent_dir.mkdir(parents=True, exist_ok=True)
    path.write_text(content, encoding="utf-8")
# ---------------------------------------------------------------------------
# _safe_jail_name
# ---------------------------------------------------------------------------
class TestSafeJailName:
    """Valid names come back unchanged; unsafe names raise ``JailNameError``."""

    def test_valid_simple(self) -> None:
        """A plain alphabetic name is returned as-is."""
        candidate = "sshd"
        assert _safe_jail_name(candidate) == candidate

    def test_valid_with_hyphen(self) -> None:
        """A hyphen inside the name is allowed."""
        candidate = "apache-auth"
        assert _safe_jail_name(candidate) == candidate

    def test_valid_with_dot(self) -> None:
        """A dot inside the name is allowed."""
        candidate = "nginx.http"
        assert _safe_jail_name(candidate) == candidate

    def test_valid_with_underscore(self) -> None:
        """An underscore inside the name is allowed."""
        candidate = "my_jail"
        assert _safe_jail_name(candidate) == candidate

    def test_invalid_path_traversal(self) -> None:
        """A ``../`` prefix is rejected."""
        with pytest.raises(JailNameError):
            _safe_jail_name("../evil")

    def test_invalid_slash(self) -> None:
        """A path separator inside the name is rejected."""
        with pytest.raises(JailNameError):
            _safe_jail_name("a/b")

    def test_invalid_starts_with_dash(self) -> None:
        """A leading dash is rejected."""
        with pytest.raises(JailNameError):
            _safe_jail_name("-bad")

    def test_invalid_empty(self) -> None:
        """The empty string is rejected."""
        with pytest.raises(JailNameError):
            _safe_jail_name("")
# ---------------------------------------------------------------------------
# _resolve_filter
# ---------------------------------------------------------------------------
class TestResolveFilter:
    """Placeholder expansion performed by ``_resolve_filter``."""

    def test_name_substitution(self) -> None:
        """The jail-name placeholder expands to the jail name."""
        assert _resolve_filter("%(__name__)s", "sshd", "normal") == "sshd"

    def test_mode_substitution(self) -> None:
        """Both the name and the mode placeholders are expanded."""
        raw_filter = "%(__name__)s[mode=%(mode)s]"
        assert _resolve_filter(raw_filter, "sshd", "aggressive") == "sshd[mode=aggressive]"

    def test_no_substitution_needed(self) -> None:
        """A value without placeholders is returned unchanged."""
        assert _resolve_filter("my-filter", "sshd", "normal") == "my-filter"

    def test_empty_raw(self) -> None:
        """An empty raw value resolves to an empty string."""
        assert _resolve_filter("", "sshd", "normal") == ""
# ---------------------------------------------------------------------------
# _ordered_config_files
# ---------------------------------------------------------------------------
class TestOrderedConfigFiles:
    """Discovery and ordering of config files by ``_ordered_config_files``."""

    def test_empty_dir(self, tmp_path: Path) -> None:
        """A directory with no config files yields an empty list."""
        assert _ordered_config_files(tmp_path) == []

    def test_jail_conf_only(self, tmp_path: Path) -> None:
        """With only jail.conf present, exactly that file is returned."""
        _write(tmp_path / "jail.conf", "[sshd]\nenabled=true\n")
        assert _ordered_config_files(tmp_path) == [tmp_path / "jail.conf"]

    def test_full_merge_order(self, tmp_path: Path) -> None:
        """All four files are returned, in merge order, with nothing extra."""
        jail_conf = tmp_path / "jail.conf"
        jail_local = tmp_path / "jail.local"
        custom_conf = tmp_path / "jail.d" / "custom.conf"
        custom_local = tmp_path / "jail.d" / "custom.local"
        _write(jail_conf, "[DEFAULT]\n")
        _write(jail_local, "[DEFAULT]\n")
        _write(custom_conf, "[sshd]\n")
        _write(custom_local, "[sshd]\n")
        # Compare the whole list rather than individual indices so that
        # unexpected extra or duplicated entries fail the test too.
        assert _ordered_config_files(tmp_path) == [
            jail_conf,
            jail_local,
            custom_conf,
            custom_local,
        ]

    def test_jail_d_sorted_alphabetically(self, tmp_path: Path) -> None:
        """Files inside jail.d come back sorted by name, not by creation order."""
        (tmp_path / "jail.d").mkdir()
        for name in ("zzz.conf", "aaa.conf", "mmm.conf"):
            _write(tmp_path / "jail.d" / name, "")
        names = [p.name for p in _ordered_config_files(tmp_path)]
        assert names == ["aaa.conf", "mmm.conf", "zzz.conf"]
# ---------------------------------------------------------------------------
# _parse_jails_sync
# ---------------------------------------------------------------------------
# Baseline jail.conf fixture: a [DEFAULT] section plus one enabled jail (sshd)
# and one disabled jail (apache-auth).
JAIL_CONF = """\
[DEFAULT]
bantime = 10m
findtime = 5m
maxretry = 5
[sshd]
enabled = true
filter = sshd
port = ssh
logpath = /var/log/auth.log
[apache-auth]
enabled = false
filter = apache-auth
port = http,https
logpath = /var/log/apache2/error.log
"""
# jail.local fixture: redefines [sshd] with a different bantime only.
JAIL_LOCAL = """\
[sshd]
bantime = 1h
"""
# jail.d drop-in fixture: defines one additional disabled jail (nginx-http-auth).
JAIL_D_CUSTOM = """\
[nginx-http-auth]
enabled = false
filter = nginx-http-auth
port = http,https
logpath = /var/log/nginx/error.log
"""
class TestParseJailsSync:
    """``_parse_jails_sync`` over jail.conf, jail.local and jail.d fixtures."""

    def test_parses_all_jails(self, tmp_path: Path) -> None:
        """Every jail section of jail.conf appears in the parsed mapping."""
        _write(tmp_path / "jail.conf", JAIL_CONF)
        parsed, _ = _parse_jails_sync(tmp_path)
        for section in ("sshd", "apache-auth"):
            assert section in parsed

    def test_enabled_flag_parsing(self, tmp_path: Path) -> None:
        """``enabled`` is exposed as the raw string written in the file."""
        _write(tmp_path / "jail.conf", JAIL_CONF)
        parsed, _ = _parse_jails_sync(tmp_path)
        sshd_enabled = parsed["sshd"]["enabled"]
        apache_enabled = parsed["apache-auth"]["enabled"]
        assert sshd_enabled == "true"
        assert apache_enabled == "false"

    def test_default_inheritance(self, tmp_path: Path) -> None:
        """Values from [DEFAULT] are visible inside each jail's settings."""
        _write(tmp_path / "jail.conf", JAIL_CONF)
        parsed, _ = _parse_jails_sync(tmp_path)
        # Neither jail sets these keys itself; both come from [DEFAULT].
        assert parsed["sshd"]["bantime"] == "10m"
        assert parsed["apache-auth"]["maxretry"] == "5"

    def test_local_override_wins(self, tmp_path: Path) -> None:
        """A value in jail.local replaces the one from jail.conf."""
        _write(tmp_path / "jail.conf", JAIL_CONF)
        _write(tmp_path / "jail.local", JAIL_LOCAL)
        parsed, _ = _parse_jails_sync(tmp_path)
        # sshd's bantime goes from the default 10m to the local 1h.
        assert parsed["sshd"]["bantime"] == "1h"

    def test_jail_d_conf_included(self, tmp_path: Path) -> None:
        """Jails defined in jail.d/*.conf are picked up."""
        _write(tmp_path / "jail.conf", JAIL_CONF)
        _write(tmp_path / "jail.d" / "custom.conf", JAIL_D_CUSTOM)
        parsed, _ = _parse_jails_sync(tmp_path)
        assert "nginx-http-auth" in parsed

    def test_source_file_tracked(self, tmp_path: Path) -> None:
        """Each jail is mapped to the path of the file that defines it."""
        main_conf = tmp_path / "jail.conf"
        drop_in = tmp_path / "jail.d" / "custom.conf"
        _write(main_conf, JAIL_CONF)
        _write(drop_in, JAIL_D_CUSTOM)
        _, origins = _parse_jails_sync(tmp_path)
        assert origins["sshd"] == str(main_conf)
        assert origins["nginx-http-auth"] == str(drop_in)

    def test_source_file_local_override_tracked(self, tmp_path: Path) -> None:
        """When jail.local redefines a jail, jail.local becomes its source file."""
        local_file = tmp_path / "jail.local"
        _write(tmp_path / "jail.conf", JAIL_CONF)
        _write(local_file, JAIL_LOCAL)
        _, origins = _parse_jails_sync(tmp_path)
        assert origins["sshd"] == str(local_file)

    def test_default_section_excluded(self, tmp_path: Path) -> None:
        """[DEFAULT] is not reported as a jail."""
        _write(tmp_path / "jail.conf", JAIL_CONF)
        parsed, _ = _parse_jails_sync(tmp_path)
        assert "DEFAULT" not in parsed

    def test_includes_section_excluded(self, tmp_path: Path) -> None:
        """[INCLUDES] is not reported as a jail."""
        with_includes = "[INCLUDES]\nbefore = paths-debian.conf\n" + JAIL_CONF
        _write(tmp_path / "jail.conf", with_includes)
        parsed, _ = _parse_jails_sync(tmp_path)
        assert "INCLUDES" not in parsed

    def test_corrupt_file_skipped_gracefully(self, tmp_path: Path) -> None:
        """A malformed file does not raise; a dict is still returned."""
        _write(tmp_path / "jail.conf", "[[bad section\n")
        parsed, _ = _parse_jails_sync(tmp_path)
        assert isinstance(parsed, dict)
# ---------------------------------------------------------------------------
# _build_inactive_jail
# ---------------------------------------------------------------------------
class TestBuildInactiveJail:
    """Mapping of raw settings onto the model built by ``_build_inactive_jail``."""

    def test_basic_fields(self) -> None:
        """Every supplied setting lands on the matching model attribute."""
        raw = {
            "enabled": "false",
            "filter": "sshd",
            "port": "ssh",
            "logpath": "/var/log/auth.log",
            "bantime": "10m",
            "findtime": "5m",
            "maxretry": "5",
            "action": "",
        }
        built = _build_inactive_jail("sshd", raw, "/etc/fail2ban/jail.d/sshd.conf")
        assert built.name == "sshd"
        assert built.filter == "sshd"
        assert built.port == "ssh"
        # logpath is exposed as a list; maxretry as an int.
        assert built.logpath == ["/var/log/auth.log"]
        assert built.bantime == "10m"
        assert built.findtime == "5m"
        assert built.maxretry == 5
        assert built.enabled is False
        assert "sshd.conf" in built.source_file

    def test_filter_name_substitution(self) -> None:
        """The jail-name placeholder in ``filter`` resolves to the jail name."""
        raw = {"enabled": "false", "filter": "%(__name__)s"}
        built = _build_inactive_jail("myservice", raw, "/etc/fail2ban/jail.conf")
        assert built.filter == "myservice"

    def test_missing_optional_fields(self) -> None:
        """With no settings at all, optional fields are empty and filter is the name."""
        built = _build_inactive_jail("minimal", {}, "/etc/fail2ban/jail.conf")
        assert built.filter == "minimal"
        assert built.port is None
        assert built.logpath == []
        assert built.bantime is None
        assert built.maxretry is None

    def test_multiline_logpath(self) -> None:
        """A newline-separated ``logpath`` yields one entry per line."""
        raw = {"logpath": "/var/log/app.log\n/var/log/app2.log"}
        built = _build_inactive_jail("app", raw, "/etc/fail2ban/jail.conf")
        for expected in ("/var/log/app.log", "/var/log/app2.log"):
            assert expected in built.logpath

    def test_multiline_actions(self) -> None:
        """A newline-separated ``action`` yields two actions."""
        raw = {"action": "iptables-multiport\niptables-ipset"}
        built = _build_inactive_jail("app", raw, "/etc/fail2ban/jail.conf")
        assert len(built.actions) == 2

    def test_enabled_true(self) -> None:
        """``enabled = true`` is parsed into boolean ``True``."""
        built = _build_inactive_jail(
            "active-jail", {"enabled": "true"}, "/etc/fail2ban/jail.conf"
        )
        assert built.enabled is True
# ---------------------------------------------------------------------------
# _write_local_override_sync
# ---------------------------------------------------------------------------
class TestWriteLocalOverrideSync:
    """Content of the ``jail.d/<jail>.local`` file written by ``_write_local_override_sync``."""

    @staticmethod
    def _local_text(config_dir: Path, jail: str) -> str:
        """Return the text of ``jail.d/<jail>.local`` under *config_dir*."""
        return (config_dir / "jail.d" / f"{jail}.local").read_text()

    def test_creates_local_file(self, tmp_path: Path) -> None:
        """The override file is created under jail.d."""
        _write_local_override_sync(tmp_path, "sshd", True, {})
        assert (tmp_path / "jail.d" / "sshd.local").is_file()

    def test_enabled_true_written(self, tmp_path: Path) -> None:
        """Enabling writes ``enabled = true``."""
        _write_local_override_sync(tmp_path, "sshd", True, {})
        assert "enabled = true" in self._local_text(tmp_path, "sshd")

    def test_enabled_false_written(self, tmp_path: Path) -> None:
        """Disabling writes ``enabled = false``."""
        _write_local_override_sync(tmp_path, "sshd", False, {})
        assert "enabled = false" in self._local_text(tmp_path, "sshd")

    def test_section_header_written(self, tmp_path: Path) -> None:
        """The file contains a section header named after the jail."""
        _write_local_override_sync(tmp_path, "apache-auth", True, {})
        assert "[apache-auth]" in self._local_text(tmp_path, "apache-auth")

    def test_override_bantime(self, tmp_path: Path) -> None:
        """A bantime override appears with its key and value."""
        _write_local_override_sync(tmp_path, "sshd", True, {"bantime": "1h"})
        written = self._local_text(tmp_path, "sshd")
        assert "bantime" in written
        assert "1h" in written

    def test_override_findtime(self, tmp_path: Path) -> None:
        """A findtime override appears with its key and value."""
        _write_local_override_sync(tmp_path, "sshd", True, {"findtime": "30m"})
        written = self._local_text(tmp_path, "sshd")
        assert "findtime" in written
        assert "30m" in written

    def test_override_maxretry(self, tmp_path: Path) -> None:
        """An integer maxretry override appears with its key and value."""
        _write_local_override_sync(tmp_path, "sshd", True, {"maxretry": 3})
        written = self._local_text(tmp_path, "sshd")
        assert "maxretry" in written
        assert "3" in written

    def test_override_port(self, tmp_path: Path) -> None:
        """A port override value is written."""
        _write_local_override_sync(tmp_path, "sshd", True, {"port": "2222"})
        assert "2222" in self._local_text(tmp_path, "sshd")

    def test_override_logpath_list(self, tmp_path: Path) -> None:
        """Every entry of a logpath list is written."""
        log_files = ["/var/log/auth.log", "/var/log/secure"]
        _write_local_override_sync(tmp_path, "sshd", True, {"logpath": log_files})
        written = self._local_text(tmp_path, "sshd")
        assert "/var/log/auth.log" in written
        assert "/var/log/secure" in written

    def test_bang_gui_header_comment(self, tmp_path: Path) -> None:
        """The generated file mentions BanGUI."""
        _write_local_override_sync(tmp_path, "sshd", True, {})
        assert "BanGUI" in self._local_text(tmp_path, "sshd")

    def test_overwrites_existing_file(self, tmp_path: Path) -> None:
        """Pre-existing content of the override file is replaced."""
        existing = tmp_path / "jail.d" / "sshd.local"
        existing.parent.mkdir()
        existing.write_text("old content")
        _write_local_override_sync(tmp_path, "sshd", True, {})
        assert "old content" not in existing.read_text()
# ---------------------------------------------------------------------------
# list_inactive_jails
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
class TestListInactiveJails:
    """``list_inactive_jails`` with the set of running jails mocked out."""

    async def test_returns_only_inactive(self, tmp_path: Path) -> None:
        """A jail reported as active is left out; the other one is listed."""
        _write(tmp_path / "jail.conf", JAIL_CONF)
        running = AsyncMock(return_value={"sshd"})
        with patch("app.services.config_file_service._get_active_jail_names", new=running):
            listing = await list_inactive_jails(str(tmp_path), "/fake.sock")
        listed = [jail.name for jail in listing.jails]
        assert "sshd" not in listed
        assert "apache-auth" in listed

    async def test_total_matches_jails_count(self, tmp_path: Path) -> None:
        """``total`` equals the number of returned jails."""
        _write(tmp_path / "jail.conf", JAIL_CONF)
        running = AsyncMock(return_value={"sshd"})
        with patch("app.services.config_file_service._get_active_jail_names", new=running):
            listing = await list_inactive_jails(str(tmp_path), "/fake.sock")
        assert listing.total == len(listing.jails)

    async def test_empty_config_dir(self, tmp_path: Path) -> None:
        """An empty config directory produces an empty result."""
        running = AsyncMock(return_value=set())
        with patch("app.services.config_file_service._get_active_jail_names", new=running):
            listing = await list_inactive_jails(str(tmp_path), "/fake.sock")
        assert listing.jails == []
        assert listing.total == 0

    async def test_all_active_returns_empty(self, tmp_path: Path) -> None:
        """When every configured jail is active, nothing is listed."""
        _write(tmp_path / "jail.conf", JAIL_CONF)
        running = AsyncMock(return_value={"sshd", "apache-auth"})
        with patch("app.services.config_file_service._get_active_jail_names", new=running):
            listing = await list_inactive_jails(str(tmp_path), "/fake.sock")
        assert listing.jails == []

    async def test_fail2ban_unreachable_shows_all(self, tmp_path: Path) -> None:
        """With an empty active set, every configured jail is listed as inactive."""
        _write(tmp_path / "jail.conf", JAIL_CONF)
        # An empty set stands in for what the lookup yields when fail2ban
        # cannot be reached.
        running = AsyncMock(return_value=set())
        with patch("app.services.config_file_service._get_active_jail_names", new=running):
            listing = await list_inactive_jails(str(tmp_path), "/fake.sock")
        listed = {jail.name for jail in listing.jails}
        assert "sshd" in listed
        assert "apache-auth" in listed
# ---------------------------------------------------------------------------
# activate_jail
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
class TestActivateJail:
    """``activate_jail``: success path, error cases and override persistence."""

    async def test_activates_known_inactive_jail(self, tmp_path: Path) -> None:
        """Activating a configured, inactive jail writes ``enabled = true`` to its .local file."""
        from app.models.config import ActivateJailRequest, JailValidationResult

        _write(tmp_path / "jail.conf", JAIL_CONF)
        request = ActivateJailRequest()
        # Successive lookups return an empty set, then a set containing the jail.
        running = AsyncMock(side_effect=[set(), {"apache-auth"}])
        probe = AsyncMock(return_value=True)
        validation = JailValidationResult(jail_name="apache-auth", valid=True)
        with (
            patch("app.services.config_file_service._get_active_jail_names", new=running),
            patch("app.services.config_file_service.jail_service") as mock_js,
            patch("app.services.config_file_service._probe_fail2ban_running", new=probe),
            patch(
                "app.services.config_file_service._validate_jail_config_sync",
                return_value=validation,
            ),
        ):
            mock_js.reload_all = AsyncMock()
            outcome = await activate_jail(
                str(tmp_path), "/fake.sock", "apache-auth", request
            )
        assert outcome.active is True
        assert "apache-auth" in outcome.name
        override_file = tmp_path / "jail.d" / "apache-auth.local"
        assert override_file.is_file()
        assert "enabled = true" in override_file.read_text()

    async def test_raises_not_found_for_unknown_jail(self, tmp_path: Path) -> None:
        """A jail name absent from the config raises ``JailNotFoundInConfigError``."""
        from app.models.config import ActivateJailRequest

        _write(tmp_path / "jail.conf", JAIL_CONF)
        request = ActivateJailRequest()
        running = AsyncMock(return_value=set())
        with patch(
            "app.services.config_file_service._get_active_jail_names", new=running
        ), pytest.raises(JailNotFoundInConfigError):
            await activate_jail(str(tmp_path), "/fake.sock", "nonexistent", request)

    async def test_raises_already_active(self, tmp_path: Path) -> None:
        """Activating a jail that is already running raises ``JailAlreadyActiveError``."""
        from app.models.config import ActivateJailRequest

        _write(tmp_path / "jail.conf", JAIL_CONF)
        request = ActivateJailRequest()
        running = AsyncMock(return_value={"sshd"})
        with patch(
            "app.services.config_file_service._get_active_jail_names", new=running
        ), pytest.raises(JailAlreadyActiveError):
            await activate_jail(str(tmp_path), "/fake.sock", "sshd", request)

    async def test_raises_name_error_for_bad_name(self, tmp_path: Path) -> None:
        """A path-like jail name raises ``JailNameError``."""
        from app.models.config import ActivateJailRequest

        request = ActivateJailRequest()
        with pytest.raises(JailNameError):
            await activate_jail(str(tmp_path), "/fake.sock", "../etc/passwd", request)

    async def test_writes_overrides_to_local_file(self, tmp_path: Path) -> None:
        """Override values from the request end up in the jail's .local file."""
        from app.models.config import ActivateJailRequest, JailValidationResult

        _write(tmp_path / "jail.conf", JAIL_CONF)
        request = ActivateJailRequest(bantime="2h", maxretry=3)
        running = AsyncMock(side_effect=[set(), set()])
        probe = AsyncMock(return_value=True)
        validation = JailValidationResult(jail_name="apache-auth", valid=True)
        with (
            patch("app.services.config_file_service._get_active_jail_names", new=running),
            patch("app.services.config_file_service.jail_service") as mock_js,
            patch("app.services.config_file_service._probe_fail2ban_running", new=probe),
            patch(
                "app.services.config_file_service._validate_jail_config_sync",
                return_value=validation,
            ),
        ):
            mock_js.reload_all = AsyncMock()
            await activate_jail(str(tmp_path), "/fake.sock", "apache-auth", request)
        written = (tmp_path / "jail.d" / "apache-auth.local").read_text()
        assert "2h" in written
        assert "3" in written
# ---------------------------------------------------------------------------
# deactivate_jail
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
class TestDeactivateJail:
    """``deactivate_jail``: success path and error cases."""

    async def test_deactivates_active_jail(self, tmp_path: Path) -> None:
        """Deactivating a running jail writes ``enabled = false`` to its .local file."""
        _write(tmp_path / "jail.conf", JAIL_CONF)
        running = AsyncMock(return_value={"sshd"})
        with (
            patch("app.services.config_file_service._get_active_jail_names", new=running),
            patch("app.services.config_file_service.jail_service") as mock_js,
        ):
            mock_js.reload_all = AsyncMock()
            outcome = await deactivate_jail(str(tmp_path), "/fake.sock", "sshd")
        assert outcome.active is False
        override_file = tmp_path / "jail.d" / "sshd.local"
        assert override_file.is_file()
        assert "enabled = false" in override_file.read_text()

    async def test_raises_not_found(self, tmp_path: Path) -> None:
        """A jail name absent from the config raises ``JailNotFoundInConfigError``."""
        _write(tmp_path / "jail.conf", JAIL_CONF)
        running = AsyncMock(return_value={"sshd"})
        with patch(
            "app.services.config_file_service._get_active_jail_names", new=running
        ), pytest.raises(JailNotFoundInConfigError):
            await deactivate_jail(str(tmp_path), "/fake.sock", "nonexistent")

    async def test_raises_already_inactive(self, tmp_path: Path) -> None:
        """Deactivating a jail that is not running raises ``JailAlreadyInactiveError``."""
        _write(tmp_path / "jail.conf", JAIL_CONF)
        running = AsyncMock(return_value=set())
        with patch(
            "app.services.config_file_service._get_active_jail_names", new=running
        ), pytest.raises(JailAlreadyInactiveError):
            await deactivate_jail(str(tmp_path), "/fake.sock", "apache-auth")

    async def test_raises_name_error(self, tmp_path: Path) -> None:
        """A jail name containing a slash raises ``JailNameError``."""
        with pytest.raises(JailNameError):
            await deactivate_jail(str(tmp_path), "/fake.sock", "a/b")
# ---------------------------------------------------------------------------
# _extract_filter_base_name
# ---------------------------------------------------------------------------
class TestExtractFilterBaseName:
    """``_extract_filter_base_name`` drops bracketed options and outer whitespace."""

    def test_simple_name(self) -> None:
        """A bare filter name is returned unchanged."""
        from app.services.config_file_service import _extract_filter_base_name

        base = _extract_filter_base_name("sshd")
        assert base == "sshd"

    def test_name_with_mode(self) -> None:
        """A literal ``[mode=...]`` suffix is removed."""
        from app.services.config_file_service import _extract_filter_base_name

        base = _extract_filter_base_name("sshd[mode=aggressive]")
        assert base == "sshd"

    def test_name_with_variable_mode(self) -> None:
        """A ``[mode=...]`` suffix holding an unexpanded placeholder is removed."""
        from app.services.config_file_service import _extract_filter_base_name

        base = _extract_filter_base_name("sshd[mode=%(mode)s]")
        assert base == "sshd"

    def test_whitespace_stripped(self) -> None:
        """Leading and trailing spaces are removed."""
        from app.services.config_file_service import _extract_filter_base_name

        base = _extract_filter_base_name(" nginx ")
        assert base == "nginx"

    def test_empty_string(self) -> None:
        """An empty input gives an empty result."""
        from app.services.config_file_service import _extract_filter_base_name

        base = _extract_filter_base_name("")
        assert base == ""
# ---------------------------------------------------------------------------
# _build_filter_to_jails_map
# ---------------------------------------------------------------------------
class TestBuildFilterToJailsMap:
    """``_build_filter_to_jails_map``: which active jails use which filter."""

    def test_active_jail_maps_to_filter(self) -> None:
        """An active jail is listed under its filter."""
        from app.services.config_file_service import _build_filter_to_jails_map

        configured = {"sshd": {"filter": "sshd"}}
        mapping = _build_filter_to_jails_map(configured, {"sshd"})
        assert mapping == {"sshd": ["sshd"]}

    def test_inactive_jail_not_included(self) -> None:
        """A jail missing from the active set contributes nothing."""
        from app.services.config_file_service import _build_filter_to_jails_map

        configured = {"apache-auth": {"filter": "apache-auth"}}
        mapping = _build_filter_to_jails_map(configured, set())
        assert mapping == {}

    def test_multiple_jails_sharing_filter(self) -> None:
        """Two active jails with the same filter are both listed under it."""
        from app.services.config_file_service import _build_filter_to_jails_map

        configured = {
            "sshd": {"filter": "sshd"},
            "sshd-ddos": {"filter": "sshd"},
        }
        mapping = _build_filter_to_jails_map(configured, {"sshd", "sshd-ddos"})
        # Order of the jail list is not asserted, so sort before comparing.
        assert sorted(mapping["sshd"]) == ["sshd", "sshd-ddos"]

    def test_mode_suffix_stripped(self) -> None:
        """The map key is the filter name without its ``[mode=...]`` suffix."""
        from app.services.config_file_service import _build_filter_to_jails_map

        configured = {"sshd": {"filter": "sshd[mode=aggressive]"}}
        mapping = _build_filter_to_jails_map(configured, {"sshd"})
        assert "sshd" in mapping

    def test_missing_filter_key_falls_back_to_jail_name(self) -> None:
        """A jail without a ``filter`` setting is keyed by its own name."""
        from app.services.config_file_service import _build_filter_to_jails_map

        mapping = _build_filter_to_jails_map({"sshd": {}}, {"sshd"})
        assert "sshd" in mapping
# ---------------------------------------------------------------------------
# _parse_filters_sync
# ---------------------------------------------------------------------------
# Minimal filter.d fixture: a [Definition] section with one failregex and an
# empty ignoreregex. Reused by the filter tests below.
_FILTER_CONF = """\
[Definition]
failregex = ^Host: <HOST>
ignoreregex =
"""
class TestParseFiltersSync:
    """Tuples returned by ``_parse_filters_sync`` for a filter.d directory."""

    def test_returns_empty_for_missing_dir(self, tmp_path: Path) -> None:
        """A directory that does not exist yields an empty list."""
        from app.services.config_file_service import _parse_filters_sync

        missing_dir = tmp_path / "nonexistent"
        assert _parse_filters_sync(missing_dir) == []

    def test_single_filter_returned(self, tmp_path: Path) -> None:
        """One .conf file yields one fully populated entry."""
        from app.services.config_file_service import _parse_filters_sync

        filter_dir = tmp_path / "filter.d"
        _write(filter_dir / "nginx.conf", _FILTER_CONF)
        entries = _parse_filters_sync(filter_dir)
        assert len(entries) == 1
        # Entry layout: (name, filename, content, has_local, source_path).
        name, filename, content, has_local, source_path = entries[0]
        assert name == "nginx"
        assert filename == "nginx.conf"
        assert "failregex" in content
        assert has_local is False
        assert source_path.endswith("nginx.conf")

    def test_local_override_detected(self, tmp_path: Path) -> None:
        """A sibling .local file sets the has_local flag."""
        from app.services.config_file_service import _parse_filters_sync

        filter_dir = tmp_path / "filter.d"
        _write(filter_dir / "nginx.conf", _FILTER_CONF)
        _write(filter_dir / "nginx.local", "[Definition]\nignoreregex = ^safe\n")
        first_entry = _parse_filters_sync(filter_dir)[0]
        _, _, _, has_local, _ = first_entry
        assert has_local is True

    def test_local_content_appended_to_content(self, tmp_path: Path) -> None:
        """Text from the .local file is part of the returned content."""
        from app.services.config_file_service import _parse_filters_sync

        filter_dir = tmp_path / "filter.d"
        _write(filter_dir / "nginx.conf", _FILTER_CONF)
        _write(filter_dir / "nginx.local", "[Definition]\n# local tweak\n")
        first_entry = _parse_filters_sync(filter_dir)[0]
        _, _, content, _, _ = first_entry
        assert "local tweak" in content

    def test_sorted_alphabetically(self, tmp_path: Path) -> None:
        """Entries come back sorted by filter name."""
        from app.services.config_file_service import _parse_filters_sync

        filter_dir = tmp_path / "filter.d"
        for stem in ("zzz", "aaa", "mmm"):
            _write(filter_dir / f"{stem}.conf", _FILTER_CONF)
        returned_names = [entry[0] for entry in _parse_filters_sync(filter_dir)]
        assert returned_names == ["aaa", "mmm", "zzz"]
# ---------------------------------------------------------------------------
# list_filters
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
class TestListFilters:
    """``list_filters`` with the set of running jails mocked out."""

    async def test_returns_all_filters(self, tmp_path: Path) -> None:
        """Every .conf file in filter.d is reported."""
        from app.services.config_file_service import list_filters

        filter_dir = tmp_path / "filter.d"
        _write(filter_dir / "sshd.conf", _FILTER_CONF)
        _write(filter_dir / "nginx.conf", _FILTER_CONF)
        running = AsyncMock(return_value=set())
        with patch("app.services.config_file_service._get_active_jail_names", new=running):
            listing = await list_filters(str(tmp_path), "/fake.sock")
        assert listing.total == 2
        found = {item.name for item in listing.filters}
        assert "sshd" in found
        assert "nginx" in found

    async def test_active_flag_set_for_used_filter(self, tmp_path: Path) -> None:
        """A filter used by an active jail is marked active and names that jail."""
        from app.services.config_file_service import list_filters

        _write(tmp_path / "filter.d" / "sshd.conf", _FILTER_CONF)
        _write(tmp_path / "jail.conf", JAIL_CONF)
        running = AsyncMock(return_value={"sshd"})
        with patch("app.services.config_file_service._get_active_jail_names", new=running):
            listing = await list_filters(str(tmp_path), "/fake.sock")
        sshd_filter = next(item for item in listing.filters if item.name == "sshd")
        assert sshd_filter.active is True
        assert "sshd" in sshd_filter.used_by_jails

    async def test_inactive_filter_not_marked_active(self, tmp_path: Path) -> None:
        """A filter no active jail uses is inactive and lists no jails."""
        from app.services.config_file_service import list_filters

        _write(tmp_path / "filter.d" / "nginx.conf", _FILTER_CONF)
        _write(tmp_path / "jail.conf", JAIL_CONF)
        running = AsyncMock(return_value={"sshd"})
        with patch("app.services.config_file_service._get_active_jail_names", new=running):
            listing = await list_filters(str(tmp_path), "/fake.sock")
        nginx_filter = next(item for item in listing.filters if item.name == "nginx")
        assert nginx_filter.active is False
        assert nginx_filter.used_by_jails == []

    async def test_has_local_override_detected(self, tmp_path: Path) -> None:
        """A .conf/.local pair is reported with ``has_local_override`` set."""
        from app.services.config_file_service import list_filters

        filter_dir = tmp_path / "filter.d"
        _write(filter_dir / "sshd.conf", _FILTER_CONF)
        _write(filter_dir / "sshd.local", "[Definition]\n")
        running = AsyncMock(return_value=set())
        with patch("app.services.config_file_service._get_active_jail_names", new=running):
            listing = await list_filters(str(tmp_path), "/fake.sock")
        sshd_filter = next(item for item in listing.filters if item.name == "sshd")
        assert sshd_filter.has_local_override is True

    async def test_empty_filter_d_returns_empty(self, tmp_path: Path) -> None:
        """With no filter.d content the result is empty."""
        from app.services.config_file_service import list_filters

        running = AsyncMock(return_value=set())
        with patch("app.services.config_file_service._get_active_jail_names", new=running):
            listing = await list_filters(str(tmp_path), "/fake.sock")
        assert listing.filters == []
        assert listing.total == 0
# ---------------------------------------------------------------------------
# get_filter
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
class TestGetFilter:
    """``get_filter`` lookups for filters backed by a .conf file."""

    async def test_returns_filter_config(self, tmp_path: Path) -> None:
        """A filter used by an active jail is returned as active with that jail listed."""
        from app.services.config_file_service import get_filter

        _write(tmp_path / "filter.d" / "sshd.conf", _FILTER_CONF)
        _write(tmp_path / "jail.conf", JAIL_CONF)
        running = AsyncMock(return_value={"sshd"})
        with patch("app.services.config_file_service._get_active_jail_names", new=running):
            found = await get_filter(str(tmp_path), "/fake.sock", "sshd")
        assert found.name == "sshd"
        assert found.active is True
        assert "sshd" in found.used_by_jails

    async def test_accepts_conf_extension(self, tmp_path: Path) -> None:
        """Passing the name with a ``.conf`` suffix resolves to the same filter."""
        from app.services.config_file_service import get_filter

        _write(tmp_path / "filter.d" / "sshd.conf", _FILTER_CONF)
        running = AsyncMock(return_value=set())
        with patch("app.services.config_file_service._get_active_jail_names", new=running):
            found = await get_filter(str(tmp_path), "/fake.sock", "sshd.conf")
        assert found.name == "sshd"

    async def test_raises_filter_not_found(self, tmp_path: Path) -> None:
        """An unknown filter name raises ``FilterNotFoundError``."""
        from app.services.config_file_service import FilterNotFoundError, get_filter

        running = AsyncMock(return_value=set())
        with (
            patch("app.services.config_file_service._get_active_jail_names", new=running),
            pytest.raises(FilterNotFoundError),
        ):
            await get_filter(str(tmp_path), "/fake.sock", "nonexistent")

    async def test_has_local_override_detected(self, tmp_path: Path) -> None:
        """A .conf/.local pair is reported with ``has_local_override`` set."""
        from app.services.config_file_service import get_filter

        filter_dir = tmp_path / "filter.d"
        _write(filter_dir / "sshd.conf", _FILTER_CONF)
        _write(filter_dir / "sshd.local", "[Definition]\n")
        running = AsyncMock(return_value=set())
        with patch("app.services.config_file_service._get_active_jail_names", new=running):
            found = await get_filter(str(tmp_path), "/fake.sock", "sshd")
        assert found.has_local_override is True
# ---------------------------------------------------------------------------
# _parse_filters_sync — .local-only filters (Task 2.2)
# ---------------------------------------------------------------------------
class TestParseFiltersSyncLocalOnly:
    """Filters that exist only as a .local file are still listed by ``_parse_filters_sync``."""

    def test_local_only_included(self, tmp_path: Path) -> None:
        """A lone .local file yields one entry sourced from that file."""
        from app.services.config_file_service import _parse_filters_sync

        filter_dir = tmp_path / "filter.d"
        _write(filter_dir / "custom.local", "[Definition]\nfailregex = ^fail\n")
        entries = _parse_filters_sync(filter_dir)
        assert len(entries) == 1
        name, filename, content, has_local, source_path = entries[0]
        assert name == "custom"
        assert filename == "custom.local"
        # With no .conf file there is nothing for the .local file to override.
        assert has_local is False
        assert source_path.endswith("custom.local")

    def test_local_only_not_duplicated_when_conf_exists(self, tmp_path: Path) -> None:
        """A .conf/.local pair produces a single entry flagged as overridden."""
        from app.services.config_file_service import _parse_filters_sync

        filter_dir = tmp_path / "filter.d"
        _write(filter_dir / "sshd.conf", _FILTER_CONF)
        _write(filter_dir / "sshd.local", "[Definition]\n")
        entries = _parse_filters_sync(filter_dir)
        returned_names = [entry[0] for entry in entries]
        assert returned_names.count("sshd") == 1
        _, _, _, has_local, _ = entries[0]
        assert has_local is True

    def test_local_only_sorted_with_conf_filters(self, tmp_path: Path) -> None:
        """.local-only and .conf filters share one alphabetical ordering."""
        from app.services.config_file_service import _parse_filters_sync

        filter_dir = tmp_path / "filter.d"
        _write(filter_dir / "zzz.conf", _FILTER_CONF)
        _write(filter_dir / "aaa.local", "[Definition]\nfailregex = x\n")
        returned_names = [entry[0] for entry in _parse_filters_sync(filter_dir)]
        assert returned_names == ["aaa", "zzz"]
# ---------------------------------------------------------------------------
# get_filter — .local-only filter (Task 2.2)
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
class TestGetFilterLocalOnly:
    """``get_filter`` lookups for filters that exist only as a .local file."""

    async def test_returns_local_only_filter(self, tmp_path: Path) -> None:
        """A .local-only filter is returned with its single failregex parsed."""
        from app.services.config_file_service import get_filter

        local_file = tmp_path / "filter.d" / "custom.local"
        _write(local_file, "[Definition]\nfailregex = ^fail from <HOST>\n")
        running = AsyncMock(return_value=set())
        with patch("app.services.config_file_service._get_active_jail_names", new=running):
            found = await get_filter(str(tmp_path), "/fake.sock", "custom")
        assert found.name == "custom"
        assert found.has_local_override is False
        assert found.source_file.endswith("custom.local")
        assert len(found.failregex) == 1

    async def test_raises_when_neither_conf_nor_local(self, tmp_path: Path) -> None:
        """With neither file present, ``FilterNotFoundError`` is raised."""
        from app.services.config_file_service import FilterNotFoundError, get_filter

        running = AsyncMock(return_value=set())
        with (
            patch("app.services.config_file_service._get_active_jail_names", new=running),
            pytest.raises(FilterNotFoundError),
        ):
            await get_filter(str(tmp_path), "/fake.sock", "nonexistent")

    async def test_accepts_local_extension(self, tmp_path: Path) -> None:
        """Passing the name with a ``.local`` suffix resolves to the same filter."""
        from app.services.config_file_service import get_filter

        _write(tmp_path / "filter.d" / "custom.local", "[Definition]\nfailregex = x\n")
        running = AsyncMock(return_value=set())
        with patch("app.services.config_file_service._get_active_jail_names", new=running):
            found = await get_filter(str(tmp_path), "/fake.sock", "custom.local")
        assert found.name == "custom"
# ---------------------------------------------------------------------------
# _validate_regex_patterns (Task 2.2)
# ---------------------------------------------------------------------------
class TestValidateRegexPatterns:
    """Exercise ``_validate_regex_patterns`` with valid, empty and malformed input."""

    def test_valid_patterns_pass(self) -> None:
        """Well-formed patterns are accepted without raising."""
        from app.services.config_file_service import _validate_regex_patterns

        well_formed = [r"^fail from \S+", r"\d+\.\d+"]
        _validate_regex_patterns(well_formed)

    def test_empty_list_passes(self) -> None:
        """An empty pattern list is accepted without raising."""
        from app.services.config_file_service import _validate_regex_patterns

        nothing_to_check: list = []
        _validate_regex_patterns(nothing_to_check)

    def test_invalid_pattern_raises(self) -> None:
        """A malformed pattern raises and is reported on the exception."""
        from app.services.config_file_service import (
            FilterInvalidRegexError,
            _validate_regex_patterns,
        )

        with pytest.raises(FilterInvalidRegexError) as raised:
            _validate_regex_patterns([r"[unclosed"])

        # The assertion must run after the block exits, once the error is captured.
        assert "[unclosed" in raised.value.pattern

    def test_mixed_valid_invalid_raises_on_first_invalid(self) -> None:
        """With mixed input, the reported pattern is the malformed one."""
        from app.services.config_file_service import (
            FilterInvalidRegexError,
            _validate_regex_patterns,
        )

        candidates = [r"\d+", r"[bad", r"\w+"]
        with pytest.raises(FilterInvalidRegexError) as raised:
            _validate_regex_patterns(candidates)

        assert "[bad" in raised.value.pattern
# ---------------------------------------------------------------------------
# _write_filter_local_sync (Task 2.2)
# ---------------------------------------------------------------------------
class TestWriteFilterLocalSync:
    """Exercise ``_write_filter_local_sync`` against a temporary ``filter.d``."""

    def test_writes_file(self, tmp_path: Path) -> None:
        """The content is written to ``<name>.local`` inside the filter directory."""
        from app.services.config_file_service import _write_filter_local_sync

        filters_dir = tmp_path / "filter.d"
        filters_dir.mkdir()

        _write_filter_local_sync(filters_dir, "myfilter", "[Definition]\n")

        target = filters_dir / "myfilter.local"
        assert target.is_file()
        assert "[Definition]" in target.read_text()

    def test_creates_filter_d_if_missing(self, tmp_path: Path) -> None:
        """The file is still written when the directory does not exist beforehand."""
        from app.services.config_file_service import _write_filter_local_sync

        filters_dir = tmp_path / "filter.d"

        _write_filter_local_sync(filters_dir, "test", "[Definition]\n")

        assert (filters_dir / "test.local").is_file()

    def test_overwrites_existing_file(self, tmp_path: Path) -> None:
        """Existing content is replaced rather than kept."""
        from app.services.config_file_service import _write_filter_local_sync

        filters_dir = tmp_path / "filter.d"
        filters_dir.mkdir()
        target = filters_dir / "myfilter.local"
        target.write_text("old content")

        _write_filter_local_sync(filters_dir, "myfilter", "[Definition]\nnew=1\n")

        rewritten = target.read_text()
        assert "new=1" in rewritten
        assert "old content" not in rewritten
# ---------------------------------------------------------------------------
# _set_jail_local_key_sync (Task 2.2)
# ---------------------------------------------------------------------------
class TestSetJailLocalKeySync:
    """Exercise ``_set_jail_local_key_sync`` against ``jail.d/<jail>.local`` files."""

    def test_creates_new_local_file(self, tmp_path: Path) -> None:
        """A missing ``jail.d/sshd.local`` is created and holds the key and value."""
        from app.services.config_file_service import _set_jail_local_key_sync

        _set_jail_local_key_sync(tmp_path, "sshd", "filter", "myfilter")

        local = tmp_path / "jail.d" / "sshd.local"
        assert local.is_file()
        content = local.read_text()
        assert "filter" in content
        assert "myfilter" in content

    def test_updates_existing_local_file(self, tmp_path: Path) -> None:
        """Adding a key to an existing file keeps the keys that were already there."""
        from app.services.config_file_service import _set_jail_local_key_sync

        jail_d = tmp_path / "jail.d"
        jail_d.mkdir()
        (jail_d / "sshd.local").write_text("[sshd]\nenabled = true\n")

        _set_jail_local_key_sync(tmp_path, "sshd", "filter", "newfilter")

        content = (jail_d / "sshd.local").read_text()
        assert "newfilter" in content
        # Existing key is preserved
        assert "enabled" in content

    def test_overwrites_existing_key(self, tmp_path: Path) -> None:
        """Setting a key that already exists replaces its previous value."""
        from app.services.config_file_service import _set_jail_local_key_sync

        jail_d = tmp_path / "jail.d"
        jail_d.mkdir()
        (jail_d / "sshd.local").write_text("[sshd]\nfilter = old\n")

        _set_jail_local_key_sync(tmp_path, "sshd", "filter", "newfilter")

        content = (jail_d / "sshd.local").read_text()
        assert "newfilter" in content
        # Without this check the test would also pass if the new value were
        # appended next to the old one instead of replacing it.
        assert "filter = old" not in content
# ---------------------------------------------------------------------------
# update_filter (Task 2.2)
# ---------------------------------------------------------------------------
# Filter definition whose ``failregex`` spans two lines.  The second pattern
# must be indented: INI-style parsers only treat an indented line as a
# continuation of the previous value, and reject it otherwise.
_FILTER_CONF_WITH_REGEX = """\
[Definition]
failregex = ^fail from <HOST>
            ^error from <HOST>
ignoreregex =
"""
@pytest.mark.asyncio
class TestUpdateFilter:
    """Exercise ``update_filter``: override writing, validation errors and reload."""

    async def test_writes_local_override(self, tmp_path: Path) -> None:
        """Updating a ``.conf`` filter writes ``sshd.local`` and returns the new regex."""
        from app.models.config import FilterUpdateRequest
        from app.services.config_file_service import update_filter

        filters_dir = tmp_path / "filter.d"
        _write(filters_dir / "sshd.conf", _FILTER_CONF_WITH_REGEX)
        no_active_jails = patch(
            "app.services.config_file_service._get_active_jail_names",
            new=AsyncMock(return_value=set()),
        )

        with no_active_jails:
            updated = await update_filter(
                str(tmp_path),
                "/fake.sock",
                "sshd",
                FilterUpdateRequest(failregex=[r"^new pattern <HOST>"]),
            )

        assert (filters_dir / "sshd.local").is_file()
        assert updated.name == "sshd"
        assert any("new pattern" in pattern for pattern in updated.failregex)

    async def test_accepts_conf_extension(self, tmp_path: Path) -> None:
        """A name passed with a ``.conf`` suffix resolves to the bare filter name."""
        from app.models.config import FilterUpdateRequest
        from app.services.config_file_service import update_filter

        _write(tmp_path / "filter.d" / "sshd.conf", _FILTER_CONF_WITH_REGEX)
        no_active_jails = patch(
            "app.services.config_file_service._get_active_jail_names",
            new=AsyncMock(return_value=set()),
        )

        with no_active_jails:
            updated = await update_filter(
                str(tmp_path),
                "/fake.sock",
                "sshd.conf",
                FilterUpdateRequest(datepattern="%Y-%m-%d"),
            )

        assert updated.name == "sshd"

    async def test_raises_filter_not_found(self, tmp_path: Path) -> None:
        """Updating a filter with no file on disk raises ``FilterNotFoundError``."""
        from app.models.config import FilterUpdateRequest
        from app.services.config_file_service import FilterNotFoundError, update_filter

        with (
            patch(
                "app.services.config_file_service._get_active_jail_names",
                new=AsyncMock(return_value=set()),
            ),
            pytest.raises(FilterNotFoundError),
        ):
            await update_filter(
                str(tmp_path),
                "/fake.sock",
                "missing",
                FilterUpdateRequest(),
            )

    async def test_raises_on_invalid_regex(self, tmp_path: Path) -> None:
        """A malformed ``failregex`` raises ``FilterInvalidRegexError``."""
        from app.models.config import FilterUpdateRequest
        from app.services.config_file_service import (
            FilterInvalidRegexError,
            update_filter,
        )

        _write(tmp_path / "filter.d" / "sshd.conf", _FILTER_CONF_WITH_REGEX)

        with (
            patch(
                "app.services.config_file_service._get_active_jail_names",
                new=AsyncMock(return_value=set()),
            ),
            pytest.raises(FilterInvalidRegexError),
        ):
            await update_filter(
                str(tmp_path),
                "/fake.sock",
                "sshd",
                FilterUpdateRequest(failregex=[r"[unclosed"]),
            )

    async def test_raises_filter_name_error_for_invalid_name(self, tmp_path: Path) -> None:
        """A path-traversal style name raises ``FilterNameError``."""
        from app.models.config import FilterUpdateRequest
        from app.services.config_file_service import FilterNameError, update_filter

        unsafe_name = "../etc/passwd"
        with pytest.raises(FilterNameError):
            await update_filter(
                str(tmp_path),
                "/fake.sock",
                unsafe_name,
                FilterUpdateRequest(),
            )

    async def test_triggers_reload_when_requested(self, tmp_path: Path) -> None:
        """``do_reload=True`` awaits ``jail_service.reload_all`` exactly once."""
        from app.models.config import FilterUpdateRequest
        from app.services.config_file_service import update_filter

        _write(tmp_path / "filter.d" / "sshd.conf", _FILTER_CONF)

        with (
            patch(
                "app.services.config_file_service._get_active_jail_names",
                new=AsyncMock(return_value=set()),
            ),
            patch(
                "app.services.config_file_service.jail_service.reload_all",
                new=AsyncMock(),
            ) as reload_spy,
        ):
            await update_filter(
                str(tmp_path),
                "/fake.sock",
                "sshd",
                FilterUpdateRequest(journalmatch="_SYSTEMD_UNIT=sshd.service"),
                do_reload=True,
            )

        reload_spy.assert_awaited_once()
# ---------------------------------------------------------------------------
# create_filter (Task 2.2)
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
class TestCreateFilter:
    """Exercise ``create_filter``: file creation, conflicts, validation and reload."""

    async def test_creates_local_file(self, tmp_path: Path) -> None:
        """A new filter is written to ``filter.d/<name>.local`` and returned."""
        from app.models.config import FilterCreateRequest
        from app.services.config_file_service import create_filter

        request = FilterCreateRequest(
            name="my-custom",
            failregex=[r"^fail from <HOST>"],
        )
        no_active_jails = patch(
            "app.services.config_file_service._get_active_jail_names",
            new=AsyncMock(return_value=set()),
        )

        with no_active_jails:
            created = await create_filter(str(tmp_path), "/fake.sock", request)

        assert (tmp_path / "filter.d" / "my-custom.local").is_file()
        assert created.name == "my-custom"
        assert created.source_file.endswith("my-custom.local")

    async def test_raises_already_exists_when_conf_exists(self, tmp_path: Path) -> None:
        """Creating a filter whose ``.conf`` exists raises ``FilterAlreadyExistsError``."""
        from app.models.config import FilterCreateRequest
        from app.services.config_file_service import FilterAlreadyExistsError, create_filter

        _write(tmp_path / "filter.d" / "sshd.conf", _FILTER_CONF)

        with (
            patch(
                "app.services.config_file_service._get_active_jail_names",
                new=AsyncMock(return_value=set()),
            ),
            pytest.raises(FilterAlreadyExistsError),
        ):
            await create_filter(
                str(tmp_path),
                "/fake.sock",
                FilterCreateRequest(name="sshd"),
            )

    async def test_raises_already_exists_when_local_exists(self, tmp_path: Path) -> None:
        """Creating a filter whose ``.local`` exists raises ``FilterAlreadyExistsError``."""
        from app.models.config import FilterCreateRequest
        from app.services.config_file_service import FilterAlreadyExistsError, create_filter

        _write(tmp_path / "filter.d" / "custom.local", "[Definition]\n")

        with (
            patch(
                "app.services.config_file_service._get_active_jail_names",
                new=AsyncMock(return_value=set()),
            ),
            pytest.raises(FilterAlreadyExistsError),
        ):
            await create_filter(
                str(tmp_path),
                "/fake.sock",
                FilterCreateRequest(name="custom"),
            )

    async def test_raises_invalid_regex(self, tmp_path: Path) -> None:
        """A malformed ``failregex`` raises ``FilterInvalidRegexError``."""
        from app.models.config import FilterCreateRequest
        from app.services.config_file_service import FilterInvalidRegexError, create_filter

        with (
            patch(
                "app.services.config_file_service._get_active_jail_names",
                new=AsyncMock(return_value=set()),
            ),
            pytest.raises(FilterInvalidRegexError),
        ):
            await create_filter(
                str(tmp_path),
                "/fake.sock",
                FilterCreateRequest(name="bad", failregex=[r"[unclosed"]),
            )

    async def test_raises_filter_name_error_for_invalid_name(self, tmp_path: Path) -> None:
        """A path-traversal style name raises ``FilterNameError``."""
        from app.models.config import FilterCreateRequest
        from app.services.config_file_service import FilterNameError, create_filter

        with pytest.raises(FilterNameError):
            await create_filter(
                str(tmp_path),
                "/fake.sock",
                FilterCreateRequest(name="../etc/evil"),
            )

    async def test_triggers_reload_when_requested(self, tmp_path: Path) -> None:
        """``do_reload=True`` awaits ``jail_service.reload_all`` exactly once."""
        from app.models.config import FilterCreateRequest
        from app.services.config_file_service import create_filter

        with (
            patch(
                "app.services.config_file_service._get_active_jail_names",
                new=AsyncMock(return_value=set()),
            ),
            patch(
                "app.services.config_file_service.jail_service.reload_all",
                new=AsyncMock(),
            ) as reload_spy,
        ):
            await create_filter(
                str(tmp_path),
                "/fake.sock",
                FilterCreateRequest(name="newfilter"),
                do_reload=True,
            )

        reload_spy.assert_awaited_once()
# ---------------------------------------------------------------------------
# delete_filter (Task 2.2)
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
class TestDeleteFilter:
    """Exercise ``delete_filter``: only ``.local`` files may be removed."""

    async def test_deletes_local_file_when_conf_and_local_exist(
        self, tmp_path: Path
    ) -> None:
        """With both files present, only the ``.local`` override is removed."""
        from app.services.config_file_service import delete_filter

        filter_d = tmp_path / "filter.d"
        _write(filter_d / "sshd.conf", _FILTER_CONF)
        _write(filter_d / "sshd.local", "[Definition]\n")

        await delete_filter(str(tmp_path), "sshd")

        assert not (filter_d / "sshd.local").exists()
        assert (filter_d / "sshd.conf").exists()

    async def test_deletes_local_only_filter(self, tmp_path: Path) -> None:
        """A filter that exists only as ``.local`` is removed."""
        from app.services.config_file_service import delete_filter

        filter_d = tmp_path / "filter.d"
        _write(filter_d / "custom.local", "[Definition]\n")

        await delete_filter(str(tmp_path), "custom")

        assert not (filter_d / "custom.local").exists()

    async def test_raises_readonly_for_conf_only(self, tmp_path: Path) -> None:
        """A filter that exists only as ``.conf`` raises ``FilterReadonlyError``."""
        from app.services.config_file_service import FilterReadonlyError, delete_filter

        filter_d = tmp_path / "filter.d"
        _write(filter_d / "sshd.conf", _FILTER_CONF)

        with pytest.raises(FilterReadonlyError):
            await delete_filter(str(tmp_path), "sshd")

    async def test_raises_not_found_for_missing_filter(self, tmp_path: Path) -> None:
        """A filter with no file on disk raises ``FilterNotFoundError``."""
        from app.services.config_file_service import FilterNotFoundError, delete_filter

        with pytest.raises(FilterNotFoundError):
            await delete_filter(str(tmp_path), "nonexistent")

    # Renamed from ``test_accepts_...``: the test asserts the error is raised,
    # which matches the ``test_raises_...`` naming used by the sibling tests.
    async def test_raises_filter_name_error_for_invalid_name(
        self, tmp_path: Path
    ) -> None:
        """A path-traversal style name raises ``FilterNameError``."""
        from app.services.config_file_service import FilterNameError, delete_filter

        with pytest.raises(FilterNameError):
            await delete_filter(str(tmp_path), "../evil")
# ---------------------------------------------------------------------------
# assign_filter_to_jail (Task 2.2)
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
class TestAssignFilterToJail:
    """Exercise ``assign_filter_to_jail``: override writing, lookups and reload."""

    async def test_writes_filter_key_to_jail_local(self, tmp_path: Path) -> None:
        """Assigning a filter writes its name into ``jail.d/sshd.local``."""
        from app.models.config import AssignFilterRequest
        from app.services.config_file_service import assign_filter_to_jail

        # Setup: jail.conf with sshd jail, filter.conf for "myfilter"
        _write(tmp_path / "jail.conf", JAIL_CONF)
        _write(tmp_path / "filter.d" / "myfilter.conf", _FILTER_CONF)
        no_active_jails = patch(
            "app.services.config_file_service._get_active_jail_names",
            new=AsyncMock(return_value=set()),
        )

        with no_active_jails:
            await assign_filter_to_jail(
                str(tmp_path),
                "/fake.sock",
                "sshd",
                AssignFilterRequest(filter_name="myfilter"),
            )

        override = tmp_path / "jail.d" / "sshd.local"
        assert override.is_file()
        assert "myfilter" in override.read_text()

    async def test_raises_jail_not_found(self, tmp_path: Path) -> None:
        """A jail absent from the config raises ``JailNotFoundInConfigError``."""
        from app.models.config import AssignFilterRequest
        from app.services.config_file_service import (
            JailNotFoundInConfigError,
            assign_filter_to_jail,
        )

        _write(tmp_path / "filter.d" / "sshd.conf", _FILTER_CONF)
        request = AssignFilterRequest(filter_name="sshd")

        with pytest.raises(JailNotFoundInConfigError):
            await assign_filter_to_jail(
                str(tmp_path),
                "/fake.sock",
                "nonexistent-jail",
                request,
            )

    async def test_raises_filter_not_found(self, tmp_path: Path) -> None:
        """A filter with no file on disk raises ``FilterNotFoundError``."""
        from app.models.config import AssignFilterRequest
        from app.services.config_file_service import FilterNotFoundError, assign_filter_to_jail

        _write(tmp_path / "jail.conf", JAIL_CONF)
        request = AssignFilterRequest(filter_name="nonexistent-filter")

        with pytest.raises(FilterNotFoundError):
            await assign_filter_to_jail(
                str(tmp_path),
                "/fake.sock",
                "sshd",
                request,
            )

    async def test_raises_jail_name_error_for_invalid_name(self, tmp_path: Path) -> None:
        """A path-traversal style jail name raises ``JailNameError``."""
        from app.models.config import AssignFilterRequest
        from app.services.config_file_service import JailNameError, assign_filter_to_jail

        with pytest.raises(JailNameError):
            await assign_filter_to_jail(
                str(tmp_path),
                "/fake.sock",
                "../etc/evil",
                AssignFilterRequest(filter_name="sshd"),
            )

    async def test_raises_filter_name_error_for_invalid_filter(
        self, tmp_path: Path
    ) -> None:
        """A path-traversal style filter name raises ``FilterNameError``."""
        from app.models.config import AssignFilterRequest
        from app.services.config_file_service import FilterNameError, assign_filter_to_jail

        with pytest.raises(FilterNameError):
            await assign_filter_to_jail(
                str(tmp_path),
                "/fake.sock",
                "sshd",
                AssignFilterRequest(filter_name="../etc/evil"),
            )

    async def test_triggers_reload_when_requested(self, tmp_path: Path) -> None:
        """``do_reload=True`` awaits ``jail_service.reload_all`` exactly once."""
        from app.models.config import AssignFilterRequest
        from app.services.config_file_service import assign_filter_to_jail

        _write(tmp_path / "jail.conf", JAIL_CONF)
        _write(tmp_path / "filter.d" / "myfilter.conf", _FILTER_CONF)
        reload_patch = patch(
            "app.services.config_file_service.jail_service.reload_all",
            new=AsyncMock(),
        )

        with reload_patch as reload_spy:
            await assign_filter_to_jail(
                str(tmp_path),
                "/fake.sock",
                "sshd",
                AssignFilterRequest(filter_name="myfilter"),
                do_reload=True,
            )

        reload_spy.assert_awaited_once()
# ===========================================================================
# Action service tests (Task 3.1 + 3.2)
# ===========================================================================
# Action fixture with the four action commands under [Definition] plus an
# [Init] section providing name/port/protocol.
_ACTION_CONF = (
    "[Definition]\n"
    "actionstart = /sbin/iptables -N f2b-<name>\n"
    "actionstop = /sbin/iptables -D INPUT -j f2b-<name>\n"
    "actionban = /sbin/iptables -I f2b-<name> 1 -s <ip> -j DROP\n"
    "actionunban = /sbin/iptables -D f2b-<name> -s <ip> -j DROP\n"
    "[Init]\n"
    "name = default\n"
    "port = ssh\n"
    "protocol = tcp\n"
)
# Smallest action fixture: a [Definition] section with only ban and unban commands.
_ACTION_CONF_MINIMAL = (
    "[Definition]\n"
    "actionban = echo ban <ip>\n"
    "actionunban = echo unban <ip>\n"
)
# ---------------------------------------------------------------------------
# _safe_action_name
# ---------------------------------------------------------------------------
class TestSafeActionName:
    """Exercise ``_safe_action_name`` with accepted and rejected names."""

    def test_valid_simple(self) -> None:
        """A plain alphabetic name is returned unchanged."""
        from app.services.config_file_service import _safe_action_name

        candidate = "iptables"
        assert _safe_action_name(candidate) == "iptables"

    def test_valid_with_hyphen(self) -> None:
        """A hyphenated name is returned unchanged."""
        from app.services.config_file_service import _safe_action_name

        candidate = "iptables-multiport"
        assert _safe_action_name(candidate) == "iptables-multiport"

    def test_valid_with_dot(self) -> None:
        """A name containing a dot is returned unchanged."""
        from app.services.config_file_service import _safe_action_name

        candidate = "my.action"
        assert _safe_action_name(candidate) == "my.action"

    def test_invalid_path_traversal(self) -> None:
        """A ``../`` prefixed name raises ``ActionNameError``."""
        from app.services.config_file_service import ActionNameError, _safe_action_name

        with pytest.raises(ActionNameError):
            _safe_action_name("../evil")

    def test_invalid_empty(self) -> None:
        """The empty string raises ``ActionNameError``."""
        from app.services.config_file_service import ActionNameError, _safe_action_name

        with pytest.raises(ActionNameError):
            _safe_action_name("")

    def test_invalid_slash(self) -> None:
        """A name containing ``/`` raises ``ActionNameError``."""
        from app.services.config_file_service import ActionNameError, _safe_action_name

        with pytest.raises(ActionNameError):
            _safe_action_name("a/b")
# ---------------------------------------------------------------------------
# _build_action_to_jails_map
# ---------------------------------------------------------------------------
class TestBuildActionToJailsMap:
    """Exercise ``_build_action_to_jails_map`` (jail settings + active names → mapping)."""

    def test_active_jail_maps_to_action(self) -> None:
        """An active jail is listed under the action it references."""
        from app.services.config_file_service import _build_action_to_jails_map

        jails = {"sshd": {"action": "iptables-multiport"}}
        mapping = _build_action_to_jails_map(jails, {"sshd"})
        assert mapping == {"iptables-multiport": ["sshd"]}

    def test_inactive_jail_not_included(self) -> None:
        """A jail missing from the active set contributes nothing."""
        from app.services.config_file_service import _build_action_to_jails_map

        jails = {"sshd": {"action": "iptables-multiport"}}
        mapping = _build_action_to_jails_map(jails, set())
        assert mapping == {}

    def test_multiple_actions_per_jail(self) -> None:
        """Newline-separated actions each become a key."""
        from app.services.config_file_service import _build_action_to_jails_map

        jails = {"sshd": {"action": "iptables-multiport\niptables-ipset"}}
        mapping = _build_action_to_jails_map(jails, {"sshd"})
        assert "iptables-multiport" in mapping
        assert "iptables-ipset" in mapping

    def test_parameter_block_stripped(self) -> None:
        """The ``[...]`` parameter block is dropped from the action key."""
        from app.services.config_file_service import _build_action_to_jails_map

        jails = {"sshd": {"action": "iptables[port=ssh, protocol=tcp]"}}
        mapping = _build_action_to_jails_map(jails, {"sshd"})
        assert "iptables" in mapping

    def test_multiple_jails_sharing_action(self) -> None:
        """Every active jail using the same action is listed under it."""
        from app.services.config_file_service import _build_action_to_jails_map

        jails = {
            "sshd": {"action": "iptables-multiport"},
            "apache": {"action": "iptables-multiport"},
        }
        mapping = _build_action_to_jails_map(jails, {"sshd", "apache"})
        # Sorted before comparing, so the check does not depend on list order.
        assert sorted(mapping["iptables-multiport"]) == ["apache", "sshd"]

    def test_jail_with_no_action_key(self) -> None:
        """An active jail without an ``action`` setting yields an empty mapping."""
        from app.services.config_file_service import _build_action_to_jails_map

        jails = {"sshd": {}}
        mapping = _build_action_to_jails_map(jails, {"sshd"})
        assert mapping == {}
# ---------------------------------------------------------------------------
# _parse_actions_sync
# ---------------------------------------------------------------------------
class TestParseActionsSync:
    """Exercise ``_parse_actions_sync`` against a temporary ``action.d`` directory."""

    def test_returns_empty_for_missing_dir(self, tmp_path: Path) -> None:
        """A directory that does not exist yields an empty list."""
        from app.services.config_file_service import _parse_actions_sync

        missing_dir = tmp_path / "nonexistent"
        assert _parse_actions_sync(missing_dir) == []

    def test_single_action_returned(self, tmp_path: Path) -> None:
        """A lone ``.conf`` file produces one fully populated entry."""
        from app.services.config_file_service import _parse_actions_sync

        actions_dir = tmp_path / "action.d"
        _write(actions_dir / "iptables.conf", _ACTION_CONF)

        parsed = _parse_actions_sync(actions_dir)

        assert len(parsed) == 1
        name, filename, content, has_local, source_path = parsed[0]
        assert name == "iptables"
        assert filename == "iptables.conf"
        assert "actionban" in content
        assert has_local is False
        assert source_path.endswith("iptables.conf")

    def test_local_override_detected(self, tmp_path: Path) -> None:
        """A ``.local`` next to the ``.conf`` sets the has-local flag."""
        from app.services.config_file_service import _parse_actions_sync

        actions_dir = tmp_path / "action.d"
        _write(actions_dir / "iptables.conf", _ACTION_CONF)
        _write(actions_dir / "iptables.local", "[Definition]\n# override\n")

        parsed = _parse_actions_sync(actions_dir)

        _name, _filename, _content, has_local, _source = parsed[0]
        assert has_local is True

    def test_local_content_merged_into_content(self, tmp_path: Path) -> None:
        """Text from the ``.local`` file appears in the returned content."""
        from app.services.config_file_service import _parse_actions_sync

        actions_dir = tmp_path / "action.d"
        _write(actions_dir / "iptables.conf", _ACTION_CONF)
        _write(actions_dir / "iptables.local", "[Definition]\n# local override tweak\n")

        parsed = _parse_actions_sync(actions_dir)

        _name, _filename, content, _has_local, _source = parsed[0]
        assert "local override tweak" in content

    def test_local_only_action_included(self, tmp_path: Path) -> None:
        """A ``.local`` file without a ``.conf`` is listed as its own action."""
        from app.services.config_file_service import _parse_actions_sync

        actions_dir = tmp_path / "action.d"
        _write(actions_dir / "custom.local", _ACTION_CONF_MINIMAL)

        parsed = _parse_actions_sync(actions_dir)

        assert len(parsed) == 1
        name, filename, _content, has_local, source_path = parsed[0]
        assert name == "custom"
        assert filename == "custom.local"
        assert has_local is False  # local-only: no .conf to pair with
        assert source_path.endswith("custom.local")

    def test_sorted_alphabetically(self, tmp_path: Path) -> None:
        """Entries come back ordered by name regardless of creation order."""
        from app.services.config_file_service import _parse_actions_sync

        actions_dir = tmp_path / "action.d"
        for stem in ("zzz", "aaa", "mmm"):
            _write(actions_dir / f"{stem}.conf", _ACTION_CONF_MINIMAL)

        parsed = _parse_actions_sync(actions_dir)

        assert [entry[0] for entry in parsed] == ["aaa", "mmm", "zzz"]
# ---------------------------------------------------------------------------
# list_actions
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
class TestListActions:
    """Exercise ``list_actions``: enumeration, active flags and override detection."""

    async def test_returns_all_actions(self, tmp_path: Path) -> None:
        """Every ``.conf`` file in ``action.d`` is listed."""
        from app.services.config_file_service import list_actions

        actions_dir = tmp_path / "action.d"
        _write(actions_dir / "iptables.conf", _ACTION_CONF)
        _write(actions_dir / "cloudflare.conf", _ACTION_CONF_MINIMAL)
        no_active_jails = patch(
            "app.services.config_file_service._get_active_jail_names",
            new=AsyncMock(return_value=set()),
        )

        with no_active_jails:
            listing = await list_actions(str(tmp_path), "/fake.sock")

        assert listing.total == 2
        listed_names = {action.name for action in listing.actions}
        assert "iptables" in listed_names
        assert "cloudflare" in listed_names

    async def test_active_flag_set_for_used_action(self, tmp_path: Path) -> None:
        """An action referenced by an active jail is active and names that jail."""
        from app.services.config_file_service import list_actions

        _write(tmp_path / "action.d" / "iptables.conf", _ACTION_CONF)
        _write(tmp_path / "jail.conf", JAIL_CONF)
        parsed_jails = {
            "sshd": {
                "enabled": "true",
                "filter": "sshd",
                "action": "iptables",
            },
            "apache-auth": {"enabled": "false"},
        }
        sshd_active = patch(
            "app.services.config_file_service._get_active_jail_names",
            new=AsyncMock(return_value={"sshd"}),
        )
        canned_jails = patch(
            "app.services.config_file_service._parse_jails_sync",
            return_value=(parsed_jails, {}),
        )

        with sshd_active, canned_jails:
            listing = await list_actions(str(tmp_path), "/fake.sock")

        iptables = next(action for action in listing.actions if action.name == "iptables")
        assert iptables.active is True
        assert "sshd" in iptables.used_by_jails

    async def test_inactive_action_has_active_false(self, tmp_path: Path) -> None:
        """With no active jails, an action is inactive and used by none."""
        from app.services.config_file_service import list_actions

        _write(tmp_path / "action.d" / "cloudflare.conf", _ACTION_CONF_MINIMAL)
        no_active_jails = patch(
            "app.services.config_file_service._get_active_jail_names",
            new=AsyncMock(return_value=set()),
        )

        with no_active_jails:
            listing = await list_actions(str(tmp_path), "/fake.sock")

        cloudflare = next(action for action in listing.actions if action.name == "cloudflare")
        assert cloudflare.active is False
        assert cloudflare.used_by_jails == []

    async def test_has_local_override_detected(self, tmp_path: Path) -> None:
        """A ``.local`` next to the ``.conf`` sets ``has_local_override``."""
        from app.services.config_file_service import list_actions

        actions_dir = tmp_path / "action.d"
        _write(actions_dir / "iptables.conf", _ACTION_CONF)
        _write(actions_dir / "iptables.local", "[Definition]\n")
        no_active_jails = patch(
            "app.services.config_file_service._get_active_jail_names",
            new=AsyncMock(return_value=set()),
        )

        with no_active_jails:
            listing = await list_actions(str(tmp_path), "/fake.sock")

        iptables = next(action for action in listing.actions if action.name == "iptables")
        assert iptables.has_local_override is True

    async def test_empty_action_d_returns_empty(self, tmp_path: Path) -> None:
        """With no ``action.d`` directory, the listing is empty with a zero total."""
        from app.services.config_file_service import list_actions

        no_active_jails = patch(
            "app.services.config_file_service._get_active_jail_names",
            new=AsyncMock(return_value=set()),
        )

        with no_active_jails:
            listing = await list_actions(str(tmp_path), "/fake.sock")

        assert listing.actions == []
        assert listing.total == 0

    async def test_total_matches_actions_count(self, tmp_path: Path) -> None:
        """``total`` equals the number of returned actions."""
        from app.services.config_file_service import list_actions

        _write(tmp_path / "action.d" / "iptables.conf", _ACTION_CONF)
        no_active_jails = patch(
            "app.services.config_file_service._get_active_jail_names",
            new=AsyncMock(return_value=set()),
        )

        with no_active_jails:
            listing = await list_actions(str(tmp_path), "/fake.sock")

        assert listing.total == len(listing.actions)
# ---------------------------------------------------------------------------
# get_action
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
class TestGetAction:
    """Exercise ``get_action``: lookup by name, missing actions and active status."""

    async def test_returns_action_config(self, tmp_path: Path) -> None:
        """An existing action is returned with its ``actionban`` populated."""
        from app.services.config_file_service import get_action

        _write(tmp_path / "action.d" / "iptables.conf", _ACTION_CONF)
        no_active_jails = patch(
            "app.services.config_file_service._get_active_jail_names",
            new=AsyncMock(return_value=set()),
        )

        with no_active_jails:
            fetched = await get_action(str(tmp_path), "/fake.sock", "iptables")

        assert fetched.name == "iptables"
        assert fetched.actionban is not None
        assert "iptables" in (fetched.actionban or "")

    async def test_strips_conf_extension(self, tmp_path: Path) -> None:
        """A name passed with a ``.conf`` suffix resolves to the bare action name."""
        from app.services.config_file_service import get_action

        _write(tmp_path / "action.d" / "iptables.conf", _ACTION_CONF)
        no_active_jails = patch(
            "app.services.config_file_service._get_active_jail_names",
            new=AsyncMock(return_value=set()),
        )

        with no_active_jails:
            fetched = await get_action(str(tmp_path), "/fake.sock", "iptables.conf")

        assert fetched.name == "iptables"

    async def test_raises_for_unknown_action(self, tmp_path: Path) -> None:
        """An action with no file on disk raises ``ActionNotFoundError``."""
        from app.services.config_file_service import ActionNotFoundError, get_action

        with (
            patch(
                "app.services.config_file_service._get_active_jail_names",
                new=AsyncMock(return_value=set()),
            ),
            pytest.raises(ActionNotFoundError),
        ):
            await get_action(str(tmp_path), "/fake.sock", "nonexistent")

    async def test_local_only_action_returned(self, tmp_path: Path) -> None:
        """An action that exists only as ``.local`` is returned."""
        from app.services.config_file_service import get_action

        _write(tmp_path / "action.d" / "custom.local", _ACTION_CONF_MINIMAL)
        no_active_jails = patch(
            "app.services.config_file_service._get_active_jail_names",
            new=AsyncMock(return_value=set()),
        )

        with no_active_jails:
            fetched = await get_action(str(tmp_path), "/fake.sock", "custom")

        assert fetched.name == "custom"

    async def test_active_status_populated(self, tmp_path: Path) -> None:
        """An action referenced by an active jail is active and names that jail."""
        from app.services.config_file_service import get_action

        _write(tmp_path / "action.d" / "iptables.conf", _ACTION_CONF)
        parsed_jails = {
            "sshd": {"enabled": "true", "filter": "sshd", "action": "iptables"},
        }
        sshd_active = patch(
            "app.services.config_file_service._get_active_jail_names",
            new=AsyncMock(return_value={"sshd"}),
        )
        canned_jails = patch(
            "app.services.config_file_service._parse_jails_sync",
            return_value=(parsed_jails, {}),
        )

        with sshd_active, canned_jails:
            fetched = await get_action(str(tmp_path), "/fake.sock", "iptables")

        assert fetched.active is True
        assert "sshd" in fetched.used_by_jails
# ---------------------------------------------------------------------------
# _write_action_local_sync
# ---------------------------------------------------------------------------
class TestWriteActionLocalSync:
    """Exercise ``_write_action_local_sync`` against a temporary ``action.d``."""

    def test_writes_file(self, tmp_path: Path) -> None:
        """The content is written to ``<name>.local`` inside the action directory."""
        from app.services.config_file_service import _write_action_local_sync

        actions_dir = tmp_path / "action.d"
        actions_dir.mkdir()

        _write_action_local_sync(actions_dir, "myaction", "[Definition]\n")

        target = actions_dir / "myaction.local"
        assert target.is_file()
        assert "[Definition]" in target.read_text()

    def test_creates_action_d_if_missing(self, tmp_path: Path) -> None:
        """The file is still written when the directory does not exist beforehand."""
        from app.services.config_file_service import _write_action_local_sync

        actions_dir = tmp_path / "action.d"

        _write_action_local_sync(actions_dir, "test", "[Definition]\n")

        assert (actions_dir / "test.local").is_file()

    def test_overwrites_existing_file(self, tmp_path: Path) -> None:
        """Existing content is replaced rather than kept."""
        from app.services.config_file_service import _write_action_local_sync

        actions_dir = tmp_path / "action.d"
        actions_dir.mkdir()
        target = actions_dir / "myaction.local"
        target.write_text("old content")

        _write_action_local_sync(actions_dir, "myaction", "[Definition]\nnew=1\n")

        rewritten = target.read_text()
        assert "new=1" in rewritten
        assert "old content" not in rewritten
# ---------------------------------------------------------------------------
# update_action (Task 3.2)
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
class TestUpdateAction:
    """Exercise ``update_action``: override writing, error cases and reload."""

    async def test_updates_actionban(self, tmp_path: Path) -> None:
        """Updating ``actionban`` writes the new command to ``iptables.local``."""
        from app.models.config import ActionUpdateRequest
        from app.services.config_file_service import update_action

        actions_dir = tmp_path / "action.d"
        _write(actions_dir / "iptables.conf", _ACTION_CONF)
        no_active_jails = patch(
            "app.services.config_file_service._get_active_jail_names",
            new=AsyncMock(return_value=set()),
        )

        with no_active_jails:
            updated = await update_action(
                str(tmp_path),
                "/fake.sock",
                "iptables",
                ActionUpdateRequest(actionban="echo ban <ip>"),
            )

        override = actions_dir / "iptables.local"
        assert override.is_file()
        assert "echo ban" in override.read_text()
        assert updated.name == "iptables"

    async def test_raises_not_found_for_unknown_action(self, tmp_path: Path) -> None:
        """An action with no file on disk raises ``ActionNotFoundError``."""
        from app.models.config import ActionUpdateRequest
        from app.services.config_file_service import ActionNotFoundError, update_action

        with (
            patch(
                "app.services.config_file_service._get_active_jail_names",
                new=AsyncMock(return_value=set()),
            ),
            pytest.raises(ActionNotFoundError),
        ):
            await update_action(
                str(tmp_path),
                "/fake.sock",
                "nonexistent",
                ActionUpdateRequest(),
            )

    async def test_raises_name_error_for_invalid_name(self, tmp_path: Path) -> None:
        """A path-traversal style name raises ``ActionNameError``."""
        from app.models.config import ActionUpdateRequest
        from app.services.config_file_service import ActionNameError, update_action

        unsafe_name = "../evil"
        with pytest.raises(ActionNameError):
            await update_action(
                str(tmp_path),
                "/fake.sock",
                unsafe_name,
                ActionUpdateRequest(),
            )

    async def test_triggers_reload_when_requested(self, tmp_path: Path) -> None:
        """``do_reload=True`` awaits ``jail_service.reload_all`` exactly once."""
        from app.models.config import ActionUpdateRequest
        from app.services.config_file_service import update_action

        _write(tmp_path / "action.d" / "iptables.conf", _ACTION_CONF)
        no_active_jails = patch(
            "app.services.config_file_service._get_active_jail_names",
            new=AsyncMock(return_value=set()),
        )
        reload_patch = patch(
            "app.services.config_file_service.jail_service.reload_all",
            new=AsyncMock(),
        )

        with no_active_jails, reload_patch as reload_spy:
            await update_action(
                str(tmp_path),
                "/fake.sock",
                "iptables",
                ActionUpdateRequest(actionban="echo ban <ip>"),
                do_reload=True,
            )

        reload_spy.assert_awaited_once()
# ---------------------------------------------------------------------------
# create_action (Task 3.2)
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
class TestCreateAction:
    """Tests for ``create_action`` run against a temporary config directory."""

    async def test_creates_local_file(self, tmp_path: Path) -> None:
        """A new action is written to ``action.d/<name>.local`` and its name returned."""
        from app.models.config import ActionCreateRequest
        from app.services.config_file_service import create_action

        request = ActionCreateRequest(
            name="my-action",
            actionban="echo ban <ip>",
            actionunban="echo unban <ip>",
        )
        no_active_jails = patch(
            "app.services.config_file_service._get_active_jail_names",
            new=AsyncMock(return_value=set()),
        )
        with no_active_jails:
            created = await create_action(str(tmp_path), "/fake.sock", request)

        assert (tmp_path / "action.d" / "my-action.local").is_file()
        assert created.name == "my-action"

    async def test_raises_already_exists_for_conf(self, tmp_path: Path) -> None:
        """An existing ``.conf`` file with the same name blocks creation."""
        from app.models.config import ActionCreateRequest
        from app.services.config_file_service import (
            ActionAlreadyExistsError,
            create_action,
        )

        _write(tmp_path / "action.d" / "iptables.conf", _ACTION_CONF)

        with pytest.raises(ActionAlreadyExistsError):
            duplicate = ActionCreateRequest(name="iptables")
            await create_action(str(tmp_path), "/fake.sock", duplicate)

    async def test_raises_already_exists_for_local(self, tmp_path: Path) -> None:
        """An existing ``.local`` file with the same name blocks creation."""
        from app.models.config import ActionCreateRequest
        from app.services.config_file_service import (
            ActionAlreadyExistsError,
            create_action,
        )

        _write(tmp_path / "action.d" / "custom.local", _ACTION_CONF_MINIMAL)

        with pytest.raises(ActionAlreadyExistsError):
            duplicate = ActionCreateRequest(name="custom")
            await create_action(str(tmp_path), "/fake.sock", duplicate)

    async def test_raises_name_error_for_invalid_name(self, tmp_path: Path) -> None:
        """A name containing a path-traversal segment raises ``ActionNameError``."""
        from app.models.config import ActionCreateRequest
        from app.services.config_file_service import ActionNameError, create_action

        with pytest.raises(ActionNameError):
            unsafe = ActionCreateRequest(name="../evil")
            await create_action(str(tmp_path), "/fake.sock", unsafe)

    async def test_triggers_reload_when_requested(self, tmp_path: Path) -> None:
        """With ``do_reload=True`` the patched ``reload_all`` is awaited exactly once."""
        from app.models.config import ActionCreateRequest
        from app.services.config_file_service import create_action

        reload_spy = AsyncMock()
        request = ActionCreateRequest(name="new-action")

        with (
            patch(
                "app.services.config_file_service._get_active_jail_names",
                new=AsyncMock(return_value=set()),
            ),
            patch(
                "app.services.config_file_service.jail_service.reload_all",
                new=reload_spy,
            ),
        ):
            await create_action(
                str(tmp_path), "/fake.sock", request, do_reload=True
            )

        reload_spy.assert_awaited_once()
# ---------------------------------------------------------------------------
# delete_action (Task 3.2)
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
class TestDeleteAction:
    """Tests for ``delete_action`` run against a temporary config directory."""

    async def test_deletes_local_file(self, tmp_path: Path) -> None:
        """Deleting removes the ``.local`` override and leaves the ``.conf`` in place."""
        from app.services.config_file_service import delete_action

        actions_dir = tmp_path / "action.d"
        shipped = actions_dir / "iptables.conf"
        override = actions_dir / "iptables.local"
        _write(shipped, _ACTION_CONF)
        _write(override, "[Definition]\n")

        await delete_action(str(tmp_path), "iptables")

        assert not override.is_file()
        assert shipped.is_file()  # the .conf file must survive the delete

    async def test_raises_readonly_for_conf_only(self, tmp_path: Path) -> None:
        """An action that exists only as ``.conf`` raises ``ActionReadonlyError``."""
        from app.services.config_file_service import ActionReadonlyError, delete_action

        _write(tmp_path / "action.d" / "iptables.conf", _ACTION_CONF)

        with pytest.raises(ActionReadonlyError):
            await delete_action(str(tmp_path), "iptables")

    async def test_raises_not_found_for_missing(self, tmp_path: Path) -> None:
        """An action with no file at all raises ``ActionNotFoundError``."""
        from app.services.config_file_service import ActionNotFoundError, delete_action

        missing_name = "nonexistent"
        with pytest.raises(ActionNotFoundError):
            await delete_action(str(tmp_path), missing_name)

    async def test_deletes_local_only_action(self, tmp_path: Path) -> None:
        """An action that exists only as ``.local`` is deleted."""
        from app.services.config_file_service import delete_action

        override = tmp_path / "action.d" / "custom.local"
        _write(override, _ACTION_CONF_MINIMAL)

        await delete_action(str(tmp_path), "custom")

        assert not override.is_file()

    async def test_raises_name_error_for_invalid_name(self, tmp_path: Path) -> None:
        """A name containing a path-traversal segment raises ``ActionNameError``."""
        from app.services.config_file_service import ActionNameError, delete_action

        unsafe_name = "../etc/evil"
        with pytest.raises(ActionNameError):
            await delete_action(str(tmp_path), unsafe_name)
# ---------------------------------------------------------------------------
# _append_jail_action_sync
# ---------------------------------------------------------------------------
class TestAppendJailActionSync:
    """Tests for ``_append_jail_action_sync`` and the ``jail.d/<jail>.local`` file."""

    def test_creates_local_with_action(self, tmp_path: Path) -> None:
        """With no prior override, a ``.local`` file naming the action is created."""
        from app.services.config_file_service import _append_jail_action_sync

        _append_jail_action_sync(tmp_path, "sshd", "iptables-multiport")

        override = tmp_path / "jail.d" / "sshd.local"
        assert override.is_file()
        assert "iptables-multiport" in override.read_text()

    def test_appends_to_existing_action_list(self, tmp_path: Path) -> None:
        """A second action is added while the existing one is kept."""
        from app.services.config_file_service import _append_jail_action_sync

        override = tmp_path / "jail.d" / "sshd.local"
        _write(override, "[sshd]\naction = iptables-multiport\n")

        _append_jail_action_sync(tmp_path, "sshd", "cloudflare")

        written = override.read_text()
        assert "iptables-multiport" in written
        assert "cloudflare" in written

    def test_does_not_duplicate_action(self, tmp_path: Path) -> None:
        """Re-appending an action that is already listed leaves a single occurrence."""
        from app.services.config_file_service import _append_jail_action_sync

        override = tmp_path / "jail.d" / "sshd.local"
        _write(override, "[sshd]\naction = iptables-multiport\n")

        # Append the already-present action twice.
        for _ in range(2):
            _append_jail_action_sync(tmp_path, "sshd", "iptables-multiport")

        # Should appear only once in the action list
        assert override.read_text().count("iptables-multiport") == 1

    def test_does_not_duplicate_when_params_differ(self, tmp_path: Path) -> None:
        """Same base action name with different bracket params is not added again."""
        from app.services.config_file_service import _append_jail_action_sync

        override = tmp_path / "jail.d" / "sshd.local"
        _write(override, "[sshd]\naction = iptables[port=ssh]\n")

        # Same base name, different params — should not duplicate.
        _append_jail_action_sync(tmp_path, "sshd", "iptables[port=22]")

        assert override.read_text().count("iptables") == 1
# ---------------------------------------------------------------------------
# _remove_jail_action_sync
# ---------------------------------------------------------------------------
class TestRemoveJailActionSync:
    """Tests for ``_remove_jail_action_sync`` and the ``jail.d/<jail>.local`` file."""

    def test_removes_action_from_list(self, tmp_path: Path) -> None:
        """The only listed action disappears from the override file."""
        from app.services.config_file_service import _remove_jail_action_sync

        override = tmp_path / "jail.d" / "sshd.local"
        _write(override, "[sshd]\naction = iptables-multiport\n")

        _remove_jail_action_sync(tmp_path, "sshd", "iptables-multiport")

        assert "iptables-multiport" not in override.read_text()

    def test_removes_only_targeted_action(self, tmp_path: Path) -> None:
        """Removing one of two actions keeps the other."""
        from app.services.config_file_service import (
            _append_jail_action_sync,
            _remove_jail_action_sync,
        )

        jail_dir = tmp_path / "jail.d"
        jail_dir.mkdir(parents=True, exist_ok=True)
        for action in ("iptables-multiport", "cloudflare"):
            _append_jail_action_sync(tmp_path, "sshd", action)

        _remove_jail_action_sync(tmp_path, "sshd", "iptables-multiport")

        remaining = (jail_dir / "sshd.local").read_text()
        assert "iptables-multiport" not in remaining
        assert "cloudflare" in remaining

    def test_is_noop_when_no_local_file(self, tmp_path: Path) -> None:
        """Calling without any ``.local`` file present must simply not raise."""
        from app.services.config_file_service import _remove_jail_action_sync

        # Should not raise; no .local file to modify.
        _remove_jail_action_sync(tmp_path, "sshd", "iptables-multiport")

    def test_is_noop_when_action_not_in_list(self, tmp_path: Path) -> None:
        """Removing an action that is not listed leaves the existing action in place."""
        from app.services.config_file_service import _remove_jail_action_sync

        override = tmp_path / "jail.d" / "sshd.local"
        _write(override, "[sshd]\naction = cloudflare\n")

        _remove_jail_action_sync(tmp_path, "sshd", "iptables-multiport")

        assert "cloudflare" in override.read_text()  # untouched
# ---------------------------------------------------------------------------
# assign_action_to_jail (Task 3.2)
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
class TestAssignActionToJail:
    """Tests for ``assign_action_to_jail`` run against a temporary config directory."""

    async def test_creates_local_with_action(self, tmp_path: Path) -> None:
        """Assigning an action produces ``jail.d/sshd.local`` naming that action."""
        from app.models.config import AssignActionRequest
        from app.services.config_file_service import assign_action_to_jail

        _write(tmp_path / "jail.conf", JAIL_CONF)
        _write(tmp_path / "action.d" / "iptables.conf", _ACTION_CONF)
        assignment = AssignActionRequest(action_name="iptables")

        no_active_jails = patch(
            "app.services.config_file_service._get_active_jail_names",
            new=AsyncMock(return_value=set()),
        )
        with no_active_jails:
            await assign_action_to_jail(
                str(tmp_path), "/fake.sock", "sshd", assignment
            )

        override = tmp_path / "jail.d" / "sshd.local"
        assert override.is_file()
        assert "iptables" in override.read_text()

    async def test_params_written_to_action_entry(self, tmp_path: Path) -> None:
        """Request params show up as ``key=value`` in the written override."""
        from app.models.config import AssignActionRequest
        from app.services.config_file_service import assign_action_to_jail

        _write(tmp_path / "jail.conf", JAIL_CONF)
        _write(tmp_path / "action.d" / "iptables.conf", _ACTION_CONF)
        assignment = AssignActionRequest(action_name="iptables", params={"port": "ssh"})

        no_active_jails = patch(
            "app.services.config_file_service._get_active_jail_names",
            new=AsyncMock(return_value=set()),
        )
        with no_active_jails:
            await assign_action_to_jail(
                str(tmp_path), "/fake.sock", "sshd", assignment
            )

        written = (tmp_path / "jail.d" / "sshd.local").read_text()
        assert "port=ssh" in written

    async def test_raises_jail_not_found(self, tmp_path: Path) -> None:
        """A jail missing from the config raises ``JailNotFoundInConfigError``."""
        from app.models.config import AssignActionRequest
        from app.services.config_file_service import (
            JailNotFoundInConfigError,
            assign_action_to_jail,
        )

        with pytest.raises(JailNotFoundInConfigError):
            assignment = AssignActionRequest(action_name="iptables")
            await assign_action_to_jail(
                str(tmp_path), "/fake.sock", "nonexistent", assignment
            )

    async def test_raises_action_not_found(self, tmp_path: Path) -> None:
        """A known jail with an action that has no file raises ``ActionNotFoundError``."""
        from app.models.config import AssignActionRequest
        from app.services.config_file_service import (
            ActionNotFoundError,
            assign_action_to_jail,
        )

        _write(tmp_path / "jail.conf", JAIL_CONF)

        with pytest.raises(ActionNotFoundError):
            assignment = AssignActionRequest(action_name="nonexistent-action")
            await assign_action_to_jail(
                str(tmp_path), "/fake.sock", "sshd", assignment
            )

    async def test_raises_jail_name_error(self, tmp_path: Path) -> None:
        """A jail name containing a path-traversal segment raises ``JailNameError``."""
        from app.models.config import AssignActionRequest
        from app.services.config_file_service import JailNameError, assign_action_to_jail

        with pytest.raises(JailNameError):
            assignment = AssignActionRequest(action_name="iptables")
            await assign_action_to_jail(
                str(tmp_path), "/fake.sock", "../etc/evil", assignment
            )

    async def test_raises_action_name_error(self, tmp_path: Path) -> None:
        """An action name containing a path-traversal segment raises ``ActionNameError``."""
        from app.models.config import AssignActionRequest
        from app.services.config_file_service import (
            ActionNameError,
            assign_action_to_jail,
        )

        with pytest.raises(ActionNameError):
            assignment = AssignActionRequest(action_name="../evil")
            await assign_action_to_jail(
                str(tmp_path), "/fake.sock", "sshd", assignment
            )

    async def test_triggers_reload_when_requested(self, tmp_path: Path) -> None:
        """With ``do_reload=True`` the patched ``reload_all`` is awaited exactly once."""
        from app.models.config import AssignActionRequest
        from app.services.config_file_service import assign_action_to_jail

        _write(tmp_path / "jail.conf", JAIL_CONF)
        _write(tmp_path / "action.d" / "iptables.conf", _ACTION_CONF)
        assignment = AssignActionRequest(action_name="iptables")
        reload_spy = AsyncMock()

        with (
            patch(
                "app.services.config_file_service._get_active_jail_names",
                new=AsyncMock(return_value=set()),
            ),
            patch(
                "app.services.config_file_service.jail_service.reload_all",
                new=reload_spy,
            ),
        ):
            await assign_action_to_jail(
                str(tmp_path), "/fake.sock", "sshd", assignment, do_reload=True
            )

        reload_spy.assert_awaited_once()
# ---------------------------------------------------------------------------
# remove_action_from_jail (Task 3.2)
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
class TestRemoveActionFromJail:
    """Tests for ``remove_action_from_jail`` run against a temporary config directory."""

    async def test_removes_action_from_local(self, tmp_path: Path) -> None:
        """The named action no longer appears in ``jail.d/sshd.local`` afterwards."""
        from app.services.config_file_service import remove_action_from_jail

        _write(tmp_path / "jail.conf", JAIL_CONF)
        override = tmp_path / "jail.d" / "sshd.local"
        _write(override, "[sshd]\naction = iptables-multiport\n")

        no_active_jails = patch(
            "app.services.config_file_service._get_active_jail_names",
            new=AsyncMock(return_value=set()),
        )
        with no_active_jails:
            await remove_action_from_jail(
                str(tmp_path), "/fake.sock", "sshd", "iptables-multiport"
            )

        assert "iptables-multiport" not in override.read_text()

    async def test_raises_jail_not_found(self, tmp_path: Path) -> None:
        """A jail missing from the config raises ``JailNotFoundInConfigError``."""
        from app.services.config_file_service import (
            JailNotFoundInConfigError,
            remove_action_from_jail,
        )

        config_dir = str(tmp_path)
        with pytest.raises(JailNotFoundInConfigError):
            await remove_action_from_jail(
                config_dir, "/fake.sock", "nonexistent", "iptables"
            )

    async def test_raises_jail_name_error(self, tmp_path: Path) -> None:
        """A jail name containing a path-traversal segment raises ``JailNameError``."""
        from app.services.config_file_service import JailNameError, remove_action_from_jail

        config_dir = str(tmp_path)
        with pytest.raises(JailNameError):
            await remove_action_from_jail(
                config_dir, "/fake.sock", "../evil", "iptables"
            )

    async def test_raises_action_name_error(self, tmp_path: Path) -> None:
        """An action name containing a path-traversal segment raises ``ActionNameError``."""
        from app.services.config_file_service import ActionNameError, remove_action_from_jail

        _write(tmp_path / "jail.conf", JAIL_CONF)

        config_dir = str(tmp_path)
        with pytest.raises(ActionNameError):
            await remove_action_from_jail(
                config_dir, "/fake.sock", "sshd", "../evil"
            )

    async def test_triggers_reload_when_requested(self, tmp_path: Path) -> None:
        """With ``do_reload=True`` the patched ``reload_all`` is awaited exactly once."""
        from app.services.config_file_service import remove_action_from_jail

        _write(tmp_path / "jail.conf", JAIL_CONF)
        _write(tmp_path / "jail.d" / "sshd.local", "[sshd]\naction = iptables\n")
        reload_spy = AsyncMock()

        with (
            patch(
                "app.services.config_file_service._get_active_jail_names",
                new=AsyncMock(return_value=set()),
            ),
            patch(
                "app.services.config_file_service.jail_service.reload_all",
                new=reload_spy,
            ),
        ):
            await remove_action_from_jail(
                str(tmp_path), "/fake.sock", "sshd", "iptables", do_reload=True
            )

        reload_spy.assert_awaited_once()
# ---------------------------------------------------------------------------
# activate_jail — reload_all keyword argument assertions (Stage 5.1)
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
class TestActivateJailReloadArgs:
    """Check the ``reload_all`` call made by ``activate_jail`` and the reported state."""

    async def test_activate_passes_include_jails(self, tmp_path: Path) -> None:
        """``reload_all`` is awaited once with ``include_jails=[name]``."""
        from app.models.config import ActivateJailRequest, JailValidationResult

        _write(tmp_path / "jail.conf", JAIL_CONF)
        passing_validation = JailValidationResult(jail_name="apache-auth", valid=True)
        # Successive calls yield: no active jails, then "apache-auth" active.
        active_names = AsyncMock(side_effect=[set(), {"apache-auth"}])

        with (
            patch(
                "app.services.config_file_service._get_active_jail_names",
                new=active_names,
            ),
            patch("app.services.config_file_service.jail_service") as jail_service_mock,
            patch(
                "app.services.config_file_service._probe_fail2ban_running",
                new=AsyncMock(return_value=True),
            ),
            patch(
                "app.services.config_file_service._validate_jail_config_sync",
                return_value=passing_validation,
            ),
        ):
            jail_service_mock.reload_all = AsyncMock()
            await activate_jail(
                str(tmp_path), "/fake.sock", "apache-auth", ActivateJailRequest()
            )

        jail_service_mock.reload_all.assert_awaited_once_with(
            "/fake.sock", include_jails=["apache-auth"]
        )

    async def test_activate_returns_active_true_when_jail_starts(
        self, tmp_path: Path
    ) -> None:
        """The result has ``active=True`` when the jail shows up in post-reload names."""
        from app.models.config import ActivateJailRequest, JailValidationResult

        _write(tmp_path / "jail.conf", JAIL_CONF)
        passing_validation = JailValidationResult(jail_name="apache-auth", valid=True)
        # Successive calls yield: no active jails, then "apache-auth" active.
        active_names = AsyncMock(side_effect=[set(), {"apache-auth"}])

        with (
            patch(
                "app.services.config_file_service._get_active_jail_names",
                new=active_names,
            ),
            patch("app.services.config_file_service.jail_service") as jail_service_mock,
            patch(
                "app.services.config_file_service._probe_fail2ban_running",
                new=AsyncMock(return_value=True),
            ),
            patch(
                "app.services.config_file_service._validate_jail_config_sync",
                return_value=passing_validation,
            ),
        ):
            jail_service_mock.reload_all = AsyncMock()
            outcome = await activate_jail(
                str(tmp_path), "/fake.sock", "apache-auth", ActivateJailRequest()
            )

        assert outcome.active is True
        assert "activated" in outcome.message.lower()

    async def test_activate_returns_active_false_when_jail_does_not_start(
        self, tmp_path: Path
    ) -> None:
        """The result has ``active=False`` when the jail is absent after reload.

        Covers the Stage 3.1 requirement: with an invalid jail config
        (bad regex, missing log file, etc.) fail2ban may silently refuse to
        start the jail even though the reload command itself succeeded.
        """
        from app.models.config import ActivateJailRequest, JailValidationResult

        _write(tmp_path / "jail.conf", JAIL_CONF)
        passing_validation = JailValidationResult(jail_name="apache-auth", valid=True)
        # Pre-reload: jail not running. Post-reload: still not running (boot failed).
        # fail2ban is up (probe succeeds) but the jail didn't appear.
        active_names = AsyncMock(side_effect=[set(), set()])

        with (
            patch(
                "app.services.config_file_service._get_active_jail_names",
                new=active_names,
            ),
            patch("app.services.config_file_service.jail_service") as jail_service_mock,
            patch(
                "app.services.config_file_service._probe_fail2ban_running",
                new=AsyncMock(return_value=True),
            ),
            patch(
                "app.services.config_file_service._validate_jail_config_sync",
                return_value=passing_validation,
            ),
        ):
            jail_service_mock.reload_all = AsyncMock()
            outcome = await activate_jail(
                str(tmp_path), "/fake.sock", "apache-auth", ActivateJailRequest()
            )

        assert outcome.active is False
        assert "apache-auth" in outcome.name
# ---------------------------------------------------------------------------
# deactivate_jail — reload_all keyword argument assertions (Stage 5.2)
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
class TestDeactivateJailReloadArgs:
    """Check the ``reload_all`` call made by ``deactivate_jail``."""

    async def test_deactivate_passes_exclude_jails(self, tmp_path: Path) -> None:
        """``reload_all`` is awaited once with ``exclude_jails=[name]``."""
        _write(tmp_path / "jail.conf", JAIL_CONF)
        # The jail under test is reported as currently active.
        active_names = AsyncMock(return_value={"sshd"})

        with (
            patch(
                "app.services.config_file_service._get_active_jail_names",
                new=active_names,
            ),
            patch("app.services.config_file_service.jail_service") as jail_service_mock,
        ):
            jail_service_mock.reload_all = AsyncMock()
            await deactivate_jail(str(tmp_path), "/fake.sock", "sshd")

        jail_service_mock.reload_all.assert_awaited_once_with(
            "/fake.sock", exclude_jails=["sshd"]
        )
# ---------------------------------------------------------------------------
# _validate_jail_config_sync (Task 3)
# ---------------------------------------------------------------------------
from app.services.config_file_service import ( # noqa: E402 (added after block)
_validate_jail_config_sync,
_extract_filter_base_name,
_extract_action_base_name,
validate_jail_config,
rollback_jail,
)
class TestExtractFilterBaseName:
    """Tests for ``_extract_filter_base_name``."""

    def test_plain_name(self) -> None:
        """A bare filter name is returned unchanged."""
        base = _extract_filter_base_name("sshd")
        assert base == "sshd"

    def test_strips_mode_suffix(self) -> None:
        """A trailing ``[mode=...]`` suffix is dropped."""
        base = _extract_filter_base_name("sshd[mode=aggressive]")
        assert base == "sshd"

    def test_strips_whitespace(self) -> None:
        """Leading and trailing spaces are removed."""
        base = _extract_filter_base_name(" nginx ")
        assert base == "nginx"
class TestExtractActionBaseName:
    """Tests for ``_extract_action_base_name``."""

    def test_plain_name(self) -> None:
        """A bare action name is returned unchanged."""
        base = _extract_action_base_name("iptables-multiport")
        assert base == "iptables-multiport"

    def test_strips_option_suffix(self) -> None:
        """A trailing ``[name=...]`` suffix is dropped."""
        base = _extract_action_base_name("iptables[name=SSH]")
        assert base == "iptables"

    def test_returns_none_for_variable_interpolation(self) -> None:
        """A ``%(...)s`` interpolation expression yields ``None``."""
        base = _extract_action_base_name("%(action_)s")
        assert base is None

    def test_returns_none_for_dollar_variable(self) -> None:
        """A ``${...}`` variable expression yields ``None``."""
        base = _extract_action_base_name("${action}")
        assert base is None
class TestValidateJailConfigSync:
    """Tests for _validate_jail_config_sync, the synchronous validation core."""

    def _setup_config(self, config_dir: Path, jail_cfg: str) -> None:
        """Store *jail_cfg* as ``jail.d/test.conf`` beneath *config_dir*."""
        target = config_dir / "jail.d" / "test.conf"
        _write(target, jail_cfg)

    def test_valid_config_no_issues(self, tmp_path: Path) -> None:
        """With the filter file present, no issues outside ``logpath`` are reported."""
        # Create a real filter file so the existence check passes.
        filters_dir = tmp_path / "filter.d"
        filters_dir.mkdir(parents=True, exist_ok=True)
        (filters_dir / "sshd.conf").write_text(
            "[Definition]\nfailregex = Host .* <HOST>\n"
        )
        self._setup_config(
            tmp_path,
            "[sshd]\nenabled = false\nfilter = sshd\nlogpath = /no/such/log\n",
        )

        outcome = _validate_jail_config_sync(tmp_path, "sshd")

        # Issues on the logpath field are tolerated here; anything else fails.
        other_issues = [issue for issue in outcome.issues if issue.field != "logpath"]
        assert other_issues == [], other_issues

    def test_missing_filter_reported(self, tmp_path: Path) -> None:
        """A jail naming a filter with no file is invalid with a ``filter`` issue."""
        self._setup_config(
            tmp_path,
            "[bad-jail]\nenabled = false\nfilter = nonexistent-filter\n",
        )

        outcome = _validate_jail_config_sync(tmp_path, "bad-jail")

        assert not outcome.valid
        reported = [issue.field for issue in outcome.issues]
        assert "filter" in reported

    def test_bad_failregex_reported(self, tmp_path: Path) -> None:
        """A failregex that cannot compile makes the jail invalid with a ``failregex`` issue."""
        self._setup_config(
            tmp_path,
            "[broken]\nenabled = false\nfailregex = [invalid regex(\n",
        )

        outcome = _validate_jail_config_sync(tmp_path, "broken")

        assert not outcome.valid
        reported = [issue.field for issue in outcome.issues]
        assert "failregex" in reported

    def test_missing_log_path_is_advisory(self, tmp_path: Path) -> None:
        """A log path that does not exist produces an issue on the ``logpath`` field."""
        self._setup_config(
            tmp_path,
            "[myjail]\nenabled = false\nlogpath = /no/such/path.log\n",
        )

        outcome = _validate_jail_config_sync(tmp_path, "myjail")

        reported = [issue.field for issue in outcome.issues]
        assert "logpath" in reported

    def test_missing_action_reported(self, tmp_path: Path) -> None:
        """A jail naming an action with no file produces an ``action`` issue."""
        self._setup_config(
            tmp_path,
            "[myjail]\nenabled = false\naction = nonexistent-action\n",
        )

        outcome = _validate_jail_config_sync(tmp_path, "myjail")

        reported = [issue.field for issue in outcome.issues]
        assert "action" in reported

    def test_unknown_jail_name(self, tmp_path: Path) -> None:
        """A jail absent from every config file is invalid with a ``name`` issue."""
        (tmp_path / "jail.d").mkdir(parents=True, exist_ok=True)

        outcome = _validate_jail_config_sync(tmp_path, "ghost")

        assert not outcome.valid
        assert any(issue.field == "name" for issue in outcome.issues)

    def test_variable_action_not_flagged(self, tmp_path: Path) -> None:
        """An action such as ``%(action_)s`` must not produce any ``action`` issue."""
        self._setup_config(
            tmp_path,
            "[myjail]\nenabled = false\naction = %(action_)s\n",
        )

        outcome = _validate_jail_config_sync(tmp_path, "myjail")

        # Ensure no action file-missing error (the variable expression is skipped).
        action_issues = [issue for issue in outcome.issues if issue.field == "action"]
        assert action_issues == []
@pytest.mark.asyncio
class TestValidateJailConfigAsync:
    """Tests for validate_jail_config, the public async wrapper."""

    async def test_returns_jail_validation_result(self, tmp_path: Path) -> None:
        """The returned result carries the name of the jail that was validated."""
        jail_dir = tmp_path / "jail.d"
        jail_dir.mkdir(parents=True, exist_ok=True)
        _write(jail_dir / "test.conf", "[testjail]\nenabled = false\n")

        outcome = await validate_jail_config(str(tmp_path), "testjail")

        assert outcome.jail_name == "testjail"

    async def test_rejects_unsafe_name(self, tmp_path: Path) -> None:
        """A jail name containing a path-traversal segment raises ``JailNameError``."""
        unsafe_name = "../evil"
        with pytest.raises(JailNameError):
            await validate_jail_config(str(tmp_path), unsafe_name)
@pytest.mark.asyncio
class TestRollbackJail:
    """Tests for rollback_jail (Task 3)."""

    async def test_rollback_success(self, tmp_path: Path) -> None:
        """With start and wait both succeeding, the result has fail2ban_running=True."""
        jail_dir = tmp_path / "jail.d"
        _write(jail_dir / "sshd.conf", "[sshd]\nenabled = true\n")
        start_command = ["fail2ban-client", "start"]

        with (
            patch(
                "app.services.config_file_service._start_daemon",
                new=AsyncMock(return_value=True),
            ),
            patch(
                "app.services.config_file_service._wait_for_fail2ban",
                new=AsyncMock(return_value=True),
            ),
            patch(
                "app.services.config_file_service._get_active_jail_names",
                new=AsyncMock(return_value=set()),
            ),
        ):
            outcome = await rollback_jail(
                str(tmp_path), "/fake.sock", "sshd", start_command
            )

        assert outcome.disabled is True
        assert outcome.fail2ban_running is True
        assert outcome.jail_name == "sshd"
        # .local file must have enabled=false
        override = jail_dir / "sshd.local"
        assert override.is_file()
        assert "enabled = false" in override.read_text()

    async def test_rollback_fail2ban_still_down(self, tmp_path: Path) -> None:
        """With start and wait both failing, the result has fail2ban_running=False."""
        _write(tmp_path / "jail.d" / "sshd.conf", "[sshd]\nenabled = true\n")
        start_command = ["fail2ban-client", "start"]

        with (
            patch(
                "app.services.config_file_service._start_daemon",
                new=AsyncMock(return_value=False),
            ),
            patch(
                "app.services.config_file_service._wait_for_fail2ban",
                new=AsyncMock(return_value=False),
            ),
        ):
            outcome = await rollback_jail(
                str(tmp_path), "/fake.sock", "sshd", start_command
            )

        assert outcome.fail2ban_running is False
        assert outcome.disabled is True

    async def test_rollback_rejects_unsafe_name(self, tmp_path: Path) -> None:
        """A jail name containing a path-traversal segment raises ``JailNameError``."""
        with pytest.raises(JailNameError):
            start_command = ["fail2ban-client", "start"]
            await rollback_jail(
                str(tmp_path), "/fake.sock", "../evil", start_command
            )
# ---------------------------------------------------------------------------
# activate_jail — blocking on missing filter / logpath (Task 5)
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
class TestActivateJailBlocking:
    """activate_jail must not reload when validation reports blocking issues."""

    async def test_activate_jail_blocked_when_logpath_missing(self, tmp_path: Path) -> None:
        """A failed validation with a ``logpath`` issue yields active=False and no reload."""
        from app.models.config import (
            ActivateJailRequest,
            JailValidationIssue,
            JailValidationResult,
        )

        _write(tmp_path / "jail.conf", JAIL_CONF)
        failed_validation = JailValidationResult(
            jail_name="apache-auth",
            valid=False,
            issues=[
                JailValidationIssue(
                    field="logpath",
                    message="log file '/var/log/missing.log' not found",
                )
            ],
        )

        with (
            patch(
                "app.services.config_file_service._get_active_jail_names",
                new=AsyncMock(return_value=set()),
            ),
            patch(
                "app.services.config_file_service._validate_jail_config_sync",
                return_value=failed_validation,
            ),
            patch("app.services.config_file_service.jail_service") as jail_service_mock,
        ):
            jail_service_mock.reload_all = AsyncMock()
            outcome = await activate_jail(
                str(tmp_path), "/fake.sock", "apache-auth", ActivateJailRequest()
            )

        assert outcome.active is False
        assert outcome.fail2ban_running is True
        assert "cannot be activated" in outcome.message
        jail_service_mock.reload_all.assert_not_awaited()

    async def test_activate_jail_blocked_when_filter_missing(self, tmp_path: Path) -> None:
        """A failed validation with a ``filter`` issue yields active=False and no reload."""
        from app.models.config import (
            ActivateJailRequest,
            JailValidationIssue,
            JailValidationResult,
        )

        _write(tmp_path / "jail.conf", JAIL_CONF)
        failed_validation = JailValidationResult(
            jail_name="sshd",
            valid=False,
            issues=[
                JailValidationIssue(
                    field="filter",
                    message="filter file 'sshd.conf' not found",
                )
            ],
        )

        with (
            patch(
                "app.services.config_file_service._get_active_jail_names",
                new=AsyncMock(return_value=set()),
            ),
            patch(
                "app.services.config_file_service._validate_jail_config_sync",
                return_value=failed_validation,
            ),
            patch("app.services.config_file_service.jail_service") as jail_service_mock,
        ):
            jail_service_mock.reload_all = AsyncMock()
            outcome = await activate_jail(
                str(tmp_path), "/fake.sock", "sshd", ActivateJailRequest()
            )

        assert outcome.active is False
        assert outcome.fail2ban_running is True
        assert "cannot be activated" in outcome.message
        jail_service_mock.reload_all.assert_not_awaited()

    async def test_activate_jail_proceeds_when_only_regex_warnings(self, tmp_path: Path) -> None:
        """A valid result carrying only a ``failregex`` issue still reloads and activates."""
        from app.models.config import (
            ActivateJailRequest,
            JailValidationIssue,
            JailValidationResult,
        )

        _write(tmp_path / "jail.conf", JAIL_CONF)
        # valid=True but with a non-blocking advisory issue
        advisory_validation = JailValidationResult(
            jail_name="apache-auth",
            valid=True,
            issues=[
                JailValidationIssue(field="failregex", message="no failregex defined")
            ],
        )

        with (
            patch(
                "app.services.config_file_service._get_active_jail_names",
                new=AsyncMock(side_effect=[set(), {"apache-auth"}]),
            ),
            patch(
                "app.services.config_file_service._validate_jail_config_sync",
                return_value=advisory_validation,
            ),
            patch("app.services.config_file_service.jail_service") as jail_service_mock,
            patch(
                "app.services.config_file_service._probe_fail2ban_running",
                new=AsyncMock(return_value=True),
            ),
        ):
            jail_service_mock.reload_all = AsyncMock()
            outcome = await activate_jail(
                str(tmp_path), "/fake.sock", "apache-auth", ActivateJailRequest()
            )

        assert outcome.active is True
        jail_service_mock.reload_all.assert_awaited_once()