refactor: improve backend type safety and import organization
- Add TYPE_CHECKING guards for runtime-expensive imports (aiohttp, aiosqlite) - Reorganize imports to follow PEP 8 conventions - Convert TypeAlias to modern PEP 695 type syntax (where appropriate) - Use Sequence/Mapping from collections.abc for type hints (covariant) - Replace string literals with cast() for improved type inference - Fix casting of Fail2BanResponse and TypedDict patterns - Add IpLookupResult TypedDict for precise return type annotation - Reformat overlong lines for readability (120 char limit) - Add asyncio_mode and filterwarnings to pytest config - Update test fixtures with improved type hints This improves mypy type checking and makes type relationships explicit.
This commit is contained in:
@@ -13,15 +13,19 @@ from app.services.config_file_service import (
|
||||
JailNameError,
|
||||
JailNotFoundInConfigError,
|
||||
_build_inactive_jail,
|
||||
_extract_action_base_name,
|
||||
_extract_filter_base_name,
|
||||
_ordered_config_files,
|
||||
_parse_jails_sync,
|
||||
_resolve_filter,
|
||||
_safe_jail_name,
|
||||
_validate_jail_config_sync,
|
||||
_write_local_override_sync,
|
||||
activate_jail,
|
||||
deactivate_jail,
|
||||
list_inactive_jails,
|
||||
rollback_jail,
|
||||
validate_jail_config,
|
||||
)
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
@@ -292,9 +296,7 @@ class TestBuildInactiveJail:
|
||||
|
||||
def test_has_local_override_absent(self, tmp_path: Path) -> None:
|
||||
"""has_local_override is False when no .local file exists."""
|
||||
jail = _build_inactive_jail(
|
||||
"sshd", {}, "/etc/fail2ban/jail.d/sshd.conf", config_dir=tmp_path
|
||||
)
|
||||
jail = _build_inactive_jail("sshd", {}, "/etc/fail2ban/jail.d/sshd.conf", config_dir=tmp_path)
|
||||
assert jail.has_local_override is False
|
||||
|
||||
def test_has_local_override_present(self, tmp_path: Path) -> None:
|
||||
@@ -302,9 +304,7 @@ class TestBuildInactiveJail:
|
||||
local = tmp_path / "jail.d" / "sshd.local"
|
||||
local.parent.mkdir(parents=True, exist_ok=True)
|
||||
local.write_text("[sshd]\nenabled = false\n")
|
||||
jail = _build_inactive_jail(
|
||||
"sshd", {}, "/etc/fail2ban/jail.d/sshd.conf", config_dir=tmp_path
|
||||
)
|
||||
jail = _build_inactive_jail("sshd", {}, "/etc/fail2ban/jail.d/sshd.conf", config_dir=tmp_path)
|
||||
assert jail.has_local_override is True
|
||||
|
||||
def test_has_local_override_no_config_dir(self) -> None:
|
||||
@@ -363,9 +363,7 @@ class TestWriteLocalOverrideSync:
|
||||
assert "2222" in content
|
||||
|
||||
def test_override_logpath_list(self, tmp_path: Path) -> None:
|
||||
_write_local_override_sync(
|
||||
tmp_path, "sshd", True, {"logpath": ["/var/log/auth.log", "/var/log/secure"]}
|
||||
)
|
||||
_write_local_override_sync(tmp_path, "sshd", True, {"logpath": ["/var/log/auth.log", "/var/log/secure"]})
|
||||
content = (tmp_path / "jail.d" / "sshd.local").read_text()
|
||||
assert "/var/log/auth.log" in content
|
||||
assert "/var/log/secure" in content
|
||||
@@ -447,9 +445,7 @@ class TestListInactiveJails:
|
||||
assert "sshd" in names
|
||||
assert "apache-auth" in names
|
||||
|
||||
async def test_has_local_override_true_when_local_file_exists(
|
||||
self, tmp_path: Path
|
||||
) -> None:
|
||||
async def test_has_local_override_true_when_local_file_exists(self, tmp_path: Path) -> None:
|
||||
"""has_local_override is True for a jail whose jail.d .local file exists."""
|
||||
_write(tmp_path / "jail.conf", JAIL_CONF)
|
||||
local = tmp_path / "jail.d" / "apache-auth.local"
|
||||
@@ -463,9 +459,7 @@ class TestListInactiveJails:
|
||||
jail = next(j for j in result.jails if j.name == "apache-auth")
|
||||
assert jail.has_local_override is True
|
||||
|
||||
async def test_has_local_override_false_when_no_local_file(
|
||||
self, tmp_path: Path
|
||||
) -> None:
|
||||
async def test_has_local_override_false_when_no_local_file(self, tmp_path: Path) -> None:
|
||||
"""has_local_override is False when no jail.d .local file exists."""
|
||||
_write(tmp_path / "jail.conf", JAIL_CONF)
|
||||
with patch(
|
||||
@@ -608,7 +602,8 @@ class TestActivateJail:
|
||||
patch(
|
||||
"app.services.config_file_service._get_active_jail_names",
|
||||
new=AsyncMock(return_value=set()),
|
||||
),pytest.raises(JailNotFoundInConfigError)
|
||||
),
|
||||
pytest.raises(JailNotFoundInConfigError),
|
||||
):
|
||||
await activate_jail(str(tmp_path), "/fake.sock", "nonexistent", req)
|
||||
|
||||
@@ -621,7 +616,8 @@ class TestActivateJail:
|
||||
patch(
|
||||
"app.services.config_file_service._get_active_jail_names",
|
||||
new=AsyncMock(return_value={"sshd"}),
|
||||
),pytest.raises(JailAlreadyActiveError)
|
||||
),
|
||||
pytest.raises(JailAlreadyActiveError),
|
||||
):
|
||||
await activate_jail(str(tmp_path), "/fake.sock", "sshd", req)
|
||||
|
||||
@@ -691,7 +687,8 @@ class TestDeactivateJail:
|
||||
patch(
|
||||
"app.services.config_file_service._get_active_jail_names",
|
||||
new=AsyncMock(return_value={"sshd"}),
|
||||
),pytest.raises(JailNotFoundInConfigError)
|
||||
),
|
||||
pytest.raises(JailNotFoundInConfigError),
|
||||
):
|
||||
await deactivate_jail(str(tmp_path), "/fake.sock", "nonexistent")
|
||||
|
||||
@@ -701,7 +698,8 @@ class TestDeactivateJail:
|
||||
patch(
|
||||
"app.services.config_file_service._get_active_jail_names",
|
||||
new=AsyncMock(return_value=set()),
|
||||
),pytest.raises(JailAlreadyInactiveError)
|
||||
),
|
||||
pytest.raises(JailAlreadyInactiveError),
|
||||
):
|
||||
await deactivate_jail(str(tmp_path), "/fake.sock", "apache-auth")
|
||||
|
||||
@@ -710,38 +708,6 @@ class TestDeactivateJail:
|
||||
await deactivate_jail(str(tmp_path), "/fake.sock", "a/b")
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# _extract_filter_base_name
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestExtractFilterBaseName:
|
||||
def test_simple_name(self) -> None:
|
||||
from app.services.config_file_service import _extract_filter_base_name
|
||||
|
||||
assert _extract_filter_base_name("sshd") == "sshd"
|
||||
|
||||
def test_name_with_mode(self) -> None:
|
||||
from app.services.config_file_service import _extract_filter_base_name
|
||||
|
||||
assert _extract_filter_base_name("sshd[mode=aggressive]") == "sshd"
|
||||
|
||||
def test_name_with_variable_mode(self) -> None:
|
||||
from app.services.config_file_service import _extract_filter_base_name
|
||||
|
||||
assert _extract_filter_base_name("sshd[mode=%(mode)s]") == "sshd"
|
||||
|
||||
def test_whitespace_stripped(self) -> None:
|
||||
from app.services.config_file_service import _extract_filter_base_name
|
||||
|
||||
assert _extract_filter_base_name(" nginx ") == "nginx"
|
||||
|
||||
def test_empty_string(self) -> None:
|
||||
from app.services.config_file_service import _extract_filter_base_name
|
||||
|
||||
assert _extract_filter_base_name("") == ""
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# _build_filter_to_jails_map
|
||||
# ---------------------------------------------------------------------------
|
||||
@@ -757,9 +723,7 @@ class TestBuildFilterToJailsMap:
|
||||
def test_inactive_jail_not_included(self) -> None:
|
||||
from app.services.config_file_service import _build_filter_to_jails_map
|
||||
|
||||
result = _build_filter_to_jails_map(
|
||||
{"apache-auth": {"filter": "apache-auth"}}, set()
|
||||
)
|
||||
result = _build_filter_to_jails_map({"apache-auth": {"filter": "apache-auth"}}, set())
|
||||
assert result == {}
|
||||
|
||||
def test_multiple_jails_sharing_filter(self) -> None:
|
||||
@@ -775,9 +739,7 @@ class TestBuildFilterToJailsMap:
|
||||
def test_mode_suffix_stripped(self) -> None:
|
||||
from app.services.config_file_service import _build_filter_to_jails_map
|
||||
|
||||
result = _build_filter_to_jails_map(
|
||||
{"sshd": {"filter": "sshd[mode=aggressive]"}}, {"sshd"}
|
||||
)
|
||||
result = _build_filter_to_jails_map({"sshd": {"filter": "sshd[mode=aggressive]"}}, {"sshd"})
|
||||
assert "sshd" in result
|
||||
|
||||
def test_missing_filter_key_falls_back_to_jail_name(self) -> None:
|
||||
@@ -988,10 +950,13 @@ class TestGetFilter:
|
||||
async def test_raises_filter_not_found(self, tmp_path: Path) -> None:
|
||||
from app.services.config_file_service import FilterNotFoundError, get_filter
|
||||
|
||||
with patch(
|
||||
"app.services.config_file_service._get_active_jail_names",
|
||||
new=AsyncMock(return_value=set()),
|
||||
), pytest.raises(FilterNotFoundError):
|
||||
with (
|
||||
patch(
|
||||
"app.services.config_file_service._get_active_jail_names",
|
||||
new=AsyncMock(return_value=set()),
|
||||
),
|
||||
pytest.raises(FilterNotFoundError),
|
||||
):
|
||||
await get_filter(str(tmp_path), "/fake.sock", "nonexistent")
|
||||
|
||||
async def test_has_local_override_detected(self, tmp_path: Path) -> None:
|
||||
@@ -1093,10 +1058,13 @@ class TestGetFilterLocalOnly:
|
||||
async def test_raises_when_neither_conf_nor_local(self, tmp_path: Path) -> None:
|
||||
from app.services.config_file_service import FilterNotFoundError, get_filter
|
||||
|
||||
with patch(
|
||||
"app.services.config_file_service._get_active_jail_names",
|
||||
new=AsyncMock(return_value=set()),
|
||||
), pytest.raises(FilterNotFoundError):
|
||||
with (
|
||||
patch(
|
||||
"app.services.config_file_service._get_active_jail_names",
|
||||
new=AsyncMock(return_value=set()),
|
||||
),
|
||||
pytest.raises(FilterNotFoundError),
|
||||
):
|
||||
await get_filter(str(tmp_path), "/fake.sock", "nonexistent")
|
||||
|
||||
async def test_accepts_local_extension(self, tmp_path: Path) -> None:
|
||||
@@ -1212,9 +1180,7 @@ class TestSetJailLocalKeySync:
|
||||
|
||||
jail_d = tmp_path / "jail.d"
|
||||
jail_d.mkdir()
|
||||
(jail_d / "sshd.local").write_text(
|
||||
"[sshd]\nenabled = true\n"
|
||||
)
|
||||
(jail_d / "sshd.local").write_text("[sshd]\nenabled = true\n")
|
||||
|
||||
_set_jail_local_key_sync(tmp_path, "sshd", "filter", "newfilter")
|
||||
|
||||
@@ -1300,10 +1266,13 @@ class TestUpdateFilter:
|
||||
from app.models.config import FilterUpdateRequest
|
||||
from app.services.config_file_service import FilterNotFoundError, update_filter
|
||||
|
||||
with patch(
|
||||
"app.services.config_file_service._get_active_jail_names",
|
||||
new=AsyncMock(return_value=set()),
|
||||
), pytest.raises(FilterNotFoundError):
|
||||
with (
|
||||
patch(
|
||||
"app.services.config_file_service._get_active_jail_names",
|
||||
new=AsyncMock(return_value=set()),
|
||||
),
|
||||
pytest.raises(FilterNotFoundError),
|
||||
):
|
||||
await update_filter(
|
||||
str(tmp_path),
|
||||
"/fake.sock",
|
||||
@@ -1321,10 +1290,13 @@ class TestUpdateFilter:
|
||||
filter_d = tmp_path / "filter.d"
|
||||
_write(filter_d / "sshd.conf", _FILTER_CONF_WITH_REGEX)
|
||||
|
||||
with patch(
|
||||
"app.services.config_file_service._get_active_jail_names",
|
||||
new=AsyncMock(return_value=set()),
|
||||
), pytest.raises(FilterInvalidRegexError):
|
||||
with (
|
||||
patch(
|
||||
"app.services.config_file_service._get_active_jail_names",
|
||||
new=AsyncMock(return_value=set()),
|
||||
),
|
||||
pytest.raises(FilterInvalidRegexError),
|
||||
):
|
||||
await update_filter(
|
||||
str(tmp_path),
|
||||
"/fake.sock",
|
||||
@@ -1351,13 +1323,16 @@ class TestUpdateFilter:
|
||||
filter_d = tmp_path / "filter.d"
|
||||
_write(filter_d / "sshd.conf", _FILTER_CONF)
|
||||
|
||||
with patch(
|
||||
"app.services.config_file_service._get_active_jail_names",
|
||||
new=AsyncMock(return_value=set()),
|
||||
), patch(
|
||||
"app.services.config_file_service.jail_service.reload_all",
|
||||
new=AsyncMock(),
|
||||
) as mock_reload:
|
||||
with (
|
||||
patch(
|
||||
"app.services.config_file_service._get_active_jail_names",
|
||||
new=AsyncMock(return_value=set()),
|
||||
),
|
||||
patch(
|
||||
"app.services.config_file_service.jail_service.reload_all",
|
||||
new=AsyncMock(),
|
||||
) as mock_reload,
|
||||
):
|
||||
await update_filter(
|
||||
str(tmp_path),
|
||||
"/fake.sock",
|
||||
@@ -1405,10 +1380,13 @@ class TestCreateFilter:
|
||||
filter_d = tmp_path / "filter.d"
|
||||
_write(filter_d / "sshd.conf", _FILTER_CONF)
|
||||
|
||||
with patch(
|
||||
"app.services.config_file_service._get_active_jail_names",
|
||||
new=AsyncMock(return_value=set()),
|
||||
), pytest.raises(FilterAlreadyExistsError):
|
||||
with (
|
||||
patch(
|
||||
"app.services.config_file_service._get_active_jail_names",
|
||||
new=AsyncMock(return_value=set()),
|
||||
),
|
||||
pytest.raises(FilterAlreadyExistsError),
|
||||
):
|
||||
await create_filter(
|
||||
str(tmp_path),
|
||||
"/fake.sock",
|
||||
@@ -1422,10 +1400,13 @@ class TestCreateFilter:
|
||||
filter_d = tmp_path / "filter.d"
|
||||
_write(filter_d / "custom.local", "[Definition]\n")
|
||||
|
||||
with patch(
|
||||
"app.services.config_file_service._get_active_jail_names",
|
||||
new=AsyncMock(return_value=set()),
|
||||
), pytest.raises(FilterAlreadyExistsError):
|
||||
with (
|
||||
patch(
|
||||
"app.services.config_file_service._get_active_jail_names",
|
||||
new=AsyncMock(return_value=set()),
|
||||
),
|
||||
pytest.raises(FilterAlreadyExistsError),
|
||||
):
|
||||
await create_filter(
|
||||
str(tmp_path),
|
||||
"/fake.sock",
|
||||
@@ -1436,10 +1417,13 @@ class TestCreateFilter:
|
||||
from app.models.config import FilterCreateRequest
|
||||
from app.services.config_file_service import FilterInvalidRegexError, create_filter
|
||||
|
||||
with patch(
|
||||
"app.services.config_file_service._get_active_jail_names",
|
||||
new=AsyncMock(return_value=set()),
|
||||
), pytest.raises(FilterInvalidRegexError):
|
||||
with (
|
||||
patch(
|
||||
"app.services.config_file_service._get_active_jail_names",
|
||||
new=AsyncMock(return_value=set()),
|
||||
),
|
||||
pytest.raises(FilterInvalidRegexError),
|
||||
):
|
||||
await create_filter(
|
||||
str(tmp_path),
|
||||
"/fake.sock",
|
||||
@@ -1461,13 +1445,16 @@ class TestCreateFilter:
|
||||
from app.models.config import FilterCreateRequest
|
||||
from app.services.config_file_service import create_filter
|
||||
|
||||
with patch(
|
||||
"app.services.config_file_service._get_active_jail_names",
|
||||
new=AsyncMock(return_value=set()),
|
||||
), patch(
|
||||
"app.services.config_file_service.jail_service.reload_all",
|
||||
new=AsyncMock(),
|
||||
) as mock_reload:
|
||||
with (
|
||||
patch(
|
||||
"app.services.config_file_service._get_active_jail_names",
|
||||
new=AsyncMock(return_value=set()),
|
||||
),
|
||||
patch(
|
||||
"app.services.config_file_service.jail_service.reload_all",
|
||||
new=AsyncMock(),
|
||||
) as mock_reload,
|
||||
):
|
||||
await create_filter(
|
||||
str(tmp_path),
|
||||
"/fake.sock",
|
||||
@@ -1485,9 +1472,7 @@ class TestCreateFilter:
|
||||
|
||||
@pytest.mark.asyncio
|
||||
class TestDeleteFilter:
|
||||
async def test_deletes_local_file_when_conf_and_local_exist(
|
||||
self, tmp_path: Path
|
||||
) -> None:
|
||||
async def test_deletes_local_file_when_conf_and_local_exist(self, tmp_path: Path) -> None:
|
||||
from app.services.config_file_service import delete_filter
|
||||
|
||||
filter_d = tmp_path / "filter.d"
|
||||
@@ -1524,9 +1509,7 @@ class TestDeleteFilter:
|
||||
with pytest.raises(FilterNotFoundError):
|
||||
await delete_filter(str(tmp_path), "nonexistent")
|
||||
|
||||
async def test_accepts_filter_name_error_for_invalid_name(
|
||||
self, tmp_path: Path
|
||||
) -> None:
|
||||
async def test_accepts_filter_name_error_for_invalid_name(self, tmp_path: Path) -> None:
|
||||
from app.services.config_file_service import FilterNameError, delete_filter
|
||||
|
||||
with pytest.raises(FilterNameError):
|
||||
@@ -1607,9 +1590,7 @@ class TestAssignFilterToJail:
|
||||
AssignFilterRequest(filter_name="sshd"),
|
||||
)
|
||||
|
||||
async def test_raises_filter_name_error_for_invalid_filter(
|
||||
self, tmp_path: Path
|
||||
) -> None:
|
||||
async def test_raises_filter_name_error_for_invalid_filter(self, tmp_path: Path) -> None:
|
||||
from app.models.config import AssignFilterRequest
|
||||
from app.services.config_file_service import FilterNameError, assign_filter_to_jail
|
||||
|
||||
@@ -1719,34 +1700,26 @@ class TestBuildActionToJailsMap:
|
||||
def test_active_jail_maps_to_action(self) -> None:
|
||||
from app.services.config_file_service import _build_action_to_jails_map
|
||||
|
||||
result = _build_action_to_jails_map(
|
||||
{"sshd": {"action": "iptables-multiport"}}, {"sshd"}
|
||||
)
|
||||
result = _build_action_to_jails_map({"sshd": {"action": "iptables-multiport"}}, {"sshd"})
|
||||
assert result == {"iptables-multiport": ["sshd"]}
|
||||
|
||||
def test_inactive_jail_not_included(self) -> None:
|
||||
from app.services.config_file_service import _build_action_to_jails_map
|
||||
|
||||
result = _build_action_to_jails_map(
|
||||
{"sshd": {"action": "iptables-multiport"}}, set()
|
||||
)
|
||||
result = _build_action_to_jails_map({"sshd": {"action": "iptables-multiport"}}, set())
|
||||
assert result == {}
|
||||
|
||||
def test_multiple_actions_per_jail(self) -> None:
|
||||
from app.services.config_file_service import _build_action_to_jails_map
|
||||
|
||||
result = _build_action_to_jails_map(
|
||||
{"sshd": {"action": "iptables-multiport\niptables-ipset"}}, {"sshd"}
|
||||
)
|
||||
result = _build_action_to_jails_map({"sshd": {"action": "iptables-multiport\niptables-ipset"}}, {"sshd"})
|
||||
assert "iptables-multiport" in result
|
||||
assert "iptables-ipset" in result
|
||||
|
||||
def test_parameter_block_stripped(self) -> None:
|
||||
from app.services.config_file_service import _build_action_to_jails_map
|
||||
|
||||
result = _build_action_to_jails_map(
|
||||
{"sshd": {"action": "iptables[port=ssh, protocol=tcp]"}}, {"sshd"}
|
||||
)
|
||||
result = _build_action_to_jails_map({"sshd": {"action": "iptables[port=ssh, protocol=tcp]"}}, {"sshd"})
|
||||
assert "iptables" in result
|
||||
|
||||
def test_multiple_jails_sharing_action(self) -> None:
|
||||
@@ -2001,10 +1974,13 @@ class TestGetAction:
|
||||
async def test_raises_for_unknown_action(self, tmp_path: Path) -> None:
|
||||
from app.services.config_file_service import ActionNotFoundError, get_action
|
||||
|
||||
with patch(
|
||||
"app.services.config_file_service._get_active_jail_names",
|
||||
new=AsyncMock(return_value=set()),
|
||||
), pytest.raises(ActionNotFoundError):
|
||||
with (
|
||||
patch(
|
||||
"app.services.config_file_service._get_active_jail_names",
|
||||
new=AsyncMock(return_value=set()),
|
||||
),
|
||||
pytest.raises(ActionNotFoundError),
|
||||
):
|
||||
await get_action(str(tmp_path), "/fake.sock", "nonexistent")
|
||||
|
||||
async def test_local_only_action_returned(self, tmp_path: Path) -> None:
|
||||
@@ -2118,10 +2094,13 @@ class TestUpdateAction:
|
||||
from app.models.config import ActionUpdateRequest
|
||||
from app.services.config_file_service import ActionNotFoundError, update_action
|
||||
|
||||
with patch(
|
||||
"app.services.config_file_service._get_active_jail_names",
|
||||
new=AsyncMock(return_value=set()),
|
||||
), pytest.raises(ActionNotFoundError):
|
||||
with (
|
||||
patch(
|
||||
"app.services.config_file_service._get_active_jail_names",
|
||||
new=AsyncMock(return_value=set()),
|
||||
),
|
||||
pytest.raises(ActionNotFoundError),
|
||||
):
|
||||
await update_action(
|
||||
str(tmp_path),
|
||||
"/fake.sock",
|
||||
@@ -2587,9 +2566,7 @@ class TestRemoveActionFromJail:
|
||||
"app.services.config_file_service._get_active_jail_names",
|
||||
new=AsyncMock(return_value=set()),
|
||||
):
|
||||
await remove_action_from_jail(
|
||||
str(tmp_path), "/fake.sock", "sshd", "iptables-multiport"
|
||||
)
|
||||
await remove_action_from_jail(str(tmp_path), "/fake.sock", "sshd", "iptables-multiport")
|
||||
|
||||
content = (jail_d / "sshd.local").read_text()
|
||||
assert "iptables-multiport" not in content
|
||||
@@ -2601,17 +2578,13 @@ class TestRemoveActionFromJail:
|
||||
)
|
||||
|
||||
with pytest.raises(JailNotFoundInConfigError):
|
||||
await remove_action_from_jail(
|
||||
str(tmp_path), "/fake.sock", "nonexistent", "iptables"
|
||||
)
|
||||
await remove_action_from_jail(str(tmp_path), "/fake.sock", "nonexistent", "iptables")
|
||||
|
||||
async def test_raises_jail_name_error(self, tmp_path: Path) -> None:
|
||||
from app.services.config_file_service import JailNameError, remove_action_from_jail
|
||||
|
||||
with pytest.raises(JailNameError):
|
||||
await remove_action_from_jail(
|
||||
str(tmp_path), "/fake.sock", "../evil", "iptables"
|
||||
)
|
||||
await remove_action_from_jail(str(tmp_path), "/fake.sock", "../evil", "iptables")
|
||||
|
||||
async def test_raises_action_name_error(self, tmp_path: Path) -> None:
|
||||
from app.services.config_file_service import ActionNameError, remove_action_from_jail
|
||||
@@ -2619,9 +2592,7 @@ class TestRemoveActionFromJail:
|
||||
_write(tmp_path / "jail.conf", JAIL_CONF)
|
||||
|
||||
with pytest.raises(ActionNameError):
|
||||
await remove_action_from_jail(
|
||||
str(tmp_path), "/fake.sock", "sshd", "../evil"
|
||||
)
|
||||
await remove_action_from_jail(str(tmp_path), "/fake.sock", "sshd", "../evil")
|
||||
|
||||
async def test_triggers_reload_when_requested(self, tmp_path: Path) -> None:
|
||||
from app.services.config_file_service import remove_action_from_jail
|
||||
@@ -2640,9 +2611,7 @@ class TestRemoveActionFromJail:
|
||||
new=AsyncMock(),
|
||||
) as mock_reload,
|
||||
):
|
||||
await remove_action_from_jail(
|
||||
str(tmp_path), "/fake.sock", "sshd", "iptables", do_reload=True
|
||||
)
|
||||
await remove_action_from_jail(str(tmp_path), "/fake.sock", "sshd", "iptables", do_reload=True)
|
||||
|
||||
mock_reload.assert_awaited_once()
|
||||
|
||||
@@ -2680,13 +2649,9 @@ class TestActivateJailReloadArgs:
|
||||
mock_js.reload_all = AsyncMock()
|
||||
await activate_jail(str(tmp_path), "/fake.sock", "apache-auth", req)
|
||||
|
||||
mock_js.reload_all.assert_awaited_once_with(
|
||||
"/fake.sock", include_jails=["apache-auth"]
|
||||
)
|
||||
mock_js.reload_all.assert_awaited_once_with("/fake.sock", include_jails=["apache-auth"])
|
||||
|
||||
async def test_activate_returns_active_true_when_jail_starts(
|
||||
self, tmp_path: Path
|
||||
) -> None:
|
||||
async def test_activate_returns_active_true_when_jail_starts(self, tmp_path: Path) -> None:
|
||||
"""activate_jail returns active=True when the jail appears in post-reload names."""
|
||||
_write(tmp_path / "jail.conf", JAIL_CONF)
|
||||
from app.models.config import ActivateJailRequest, JailValidationResult
|
||||
@@ -2708,16 +2673,12 @@ class TestActivateJailReloadArgs:
|
||||
),
|
||||
):
|
||||
mock_js.reload_all = AsyncMock()
|
||||
result = await activate_jail(
|
||||
str(tmp_path), "/fake.sock", "apache-auth", req
|
||||
)
|
||||
result = await activate_jail(str(tmp_path), "/fake.sock", "apache-auth", req)
|
||||
|
||||
assert result.active is True
|
||||
assert "activated" in result.message.lower()
|
||||
|
||||
async def test_activate_returns_active_false_when_jail_does_not_start(
|
||||
self, tmp_path: Path
|
||||
) -> None:
|
||||
async def test_activate_returns_active_false_when_jail_does_not_start(self, tmp_path: Path) -> None:
|
||||
"""activate_jail returns active=False when the jail is absent after reload.
|
||||
|
||||
This covers the Stage 3.1 requirement: if the jail config is invalid
|
||||
@@ -2746,9 +2707,7 @@ class TestActivateJailReloadArgs:
|
||||
),
|
||||
):
|
||||
mock_js.reload_all = AsyncMock()
|
||||
result = await activate_jail(
|
||||
str(tmp_path), "/fake.sock", "apache-auth", req
|
||||
)
|
||||
result = await activate_jail(str(tmp_path), "/fake.sock", "apache-auth", req)
|
||||
|
||||
assert result.active is False
|
||||
assert "apache-auth" in result.name
|
||||
@@ -2776,23 +2735,13 @@ class TestDeactivateJailReloadArgs:
|
||||
mock_js.reload_all = AsyncMock()
|
||||
await deactivate_jail(str(tmp_path), "/fake.sock", "sshd")
|
||||
|
||||
mock_js.reload_all.assert_awaited_once_with(
|
||||
"/fake.sock", exclude_jails=["sshd"]
|
||||
)
|
||||
mock_js.reload_all.assert_awaited_once_with("/fake.sock", exclude_jails=["sshd"])
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# _validate_jail_config_sync (Task 3)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
from app.services.config_file_service import ( # noqa: E402 (added after block)
|
||||
_validate_jail_config_sync,
|
||||
_extract_filter_base_name,
|
||||
_extract_action_base_name,
|
||||
validate_jail_config,
|
||||
rollback_jail,
|
||||
)
|
||||
|
||||
|
||||
class TestExtractFilterBaseName:
|
||||
def test_plain_name(self) -> None:
|
||||
@@ -2938,11 +2887,11 @@ class TestRollbackJail:
|
||||
|
||||
with (
|
||||
patch(
|
||||
"app.services.config_file_service._start_daemon",
|
||||
"app.services.config_file_service.start_daemon",
|
||||
new=AsyncMock(return_value=True),
|
||||
),
|
||||
patch(
|
||||
"app.services.config_file_service._wait_for_fail2ban",
|
||||
"app.services.config_file_service.wait_for_fail2ban",
|
||||
new=AsyncMock(return_value=True),
|
||||
),
|
||||
patch(
|
||||
@@ -2950,9 +2899,7 @@ class TestRollbackJail:
|
||||
new=AsyncMock(return_value=set()),
|
||||
),
|
||||
):
|
||||
result = await rollback_jail(
|
||||
str(tmp_path), "/fake.sock", "sshd", ["fail2ban-client", "start"]
|
||||
)
|
||||
result = await rollback_jail(str(tmp_path), "/fake.sock", "sshd", ["fail2ban-client", "start"])
|
||||
|
||||
assert result.disabled is True
|
||||
assert result.fail2ban_running is True
|
||||
@@ -2968,26 +2915,22 @@ class TestRollbackJail:
|
||||
|
||||
with (
|
||||
patch(
|
||||
"app.services.config_file_service._start_daemon",
|
||||
"app.services.config_file_service.start_daemon",
|
||||
new=AsyncMock(return_value=False),
|
||||
),
|
||||
patch(
|
||||
"app.services.config_file_service._wait_for_fail2ban",
|
||||
"app.services.config_file_service.wait_for_fail2ban",
|
||||
new=AsyncMock(return_value=False),
|
||||
),
|
||||
):
|
||||
result = await rollback_jail(
|
||||
str(tmp_path), "/fake.sock", "sshd", ["fail2ban-client", "start"]
|
||||
)
|
||||
result = await rollback_jail(str(tmp_path), "/fake.sock", "sshd", ["fail2ban-client", "start"])
|
||||
|
||||
assert result.fail2ban_running is False
|
||||
assert result.disabled is True
|
||||
|
||||
async def test_rollback_rejects_unsafe_name(self, tmp_path: Path) -> None:
|
||||
with pytest.raises(JailNameError):
|
||||
await rollback_jail(
|
||||
str(tmp_path), "/fake.sock", "../evil", ["fail2ban-client", "start"]
|
||||
)
|
||||
await rollback_jail(str(tmp_path), "/fake.sock", "../evil", ["fail2ban-client", "start"])
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
@@ -3096,9 +3039,7 @@ class TestActivateJailBlocking:
|
||||
class TestActivateJailRollback:
|
||||
"""Rollback logic in activate_jail restores the .local file and recovers."""
|
||||
|
||||
async def test_activate_jail_rollback_on_reload_failure(
|
||||
self, tmp_path: Path
|
||||
) -> None:
|
||||
async def test_activate_jail_rollback_on_reload_failure(self, tmp_path: Path) -> None:
|
||||
"""Rollback when reload_all raises on the activation reload.
|
||||
|
||||
Expects:
|
||||
@@ -3135,23 +3076,17 @@ class TestActivateJailRollback:
|
||||
),
|
||||
patch(
|
||||
"app.services.config_file_service._validate_jail_config_sync",
|
||||
return_value=JailValidationResult(
|
||||
jail_name="apache-auth", valid=True
|
||||
),
|
||||
return_value=JailValidationResult(jail_name="apache-auth", valid=True),
|
||||
),
|
||||
):
|
||||
mock_js.reload_all = AsyncMock(side_effect=reload_side_effect)
|
||||
result = await activate_jail(
|
||||
str(tmp_path), "/fake.sock", "apache-auth", req
|
||||
)
|
||||
result = await activate_jail(str(tmp_path), "/fake.sock", "apache-auth", req)
|
||||
|
||||
assert result.active is False
|
||||
assert result.recovered is True
|
||||
assert local_path.read_text() == original_local
|
||||
|
||||
async def test_activate_jail_rollback_on_health_check_failure(
|
||||
self, tmp_path: Path
|
||||
) -> None:
|
||||
async def test_activate_jail_rollback_on_health_check_failure(self, tmp_path: Path) -> None:
|
||||
"""Rollback when fail2ban is unreachable after the activation reload.
|
||||
|
||||
Expects:
|
||||
@@ -3190,15 +3125,11 @@ class TestActivateJailRollback:
|
||||
),
|
||||
patch(
|
||||
"app.services.config_file_service._validate_jail_config_sync",
|
||||
return_value=JailValidationResult(
|
||||
jail_name="apache-auth", valid=True
|
||||
),
|
||||
return_value=JailValidationResult(jail_name="apache-auth", valid=True),
|
||||
),
|
||||
):
|
||||
mock_js.reload_all = AsyncMock()
|
||||
result = await activate_jail(
|
||||
str(tmp_path), "/fake.sock", "apache-auth", req
|
||||
)
|
||||
result = await activate_jail(str(tmp_path), "/fake.sock", "apache-auth", req)
|
||||
|
||||
assert result.active is False
|
||||
assert result.recovered is True
|
||||
@@ -3232,25 +3163,17 @@ class TestActivateJailRollback:
|
||||
),
|
||||
patch(
|
||||
"app.services.config_file_service._validate_jail_config_sync",
|
||||
return_value=JailValidationResult(
|
||||
jail_name="apache-auth", valid=True
|
||||
),
|
||||
return_value=JailValidationResult(jail_name="apache-auth", valid=True),
|
||||
),
|
||||
):
|
||||
# Both the activation reload and the recovery reload fail.
|
||||
mock_js.reload_all = AsyncMock(
|
||||
side_effect=RuntimeError("fail2ban unavailable")
|
||||
)
|
||||
result = await activate_jail(
|
||||
str(tmp_path), "/fake.sock", "apache-auth", req
|
||||
)
|
||||
mock_js.reload_all = AsyncMock(side_effect=RuntimeError("fail2ban unavailable"))
|
||||
result = await activate_jail(str(tmp_path), "/fake.sock", "apache-auth", req)
|
||||
|
||||
assert result.active is False
|
||||
assert result.recovered is False
|
||||
|
||||
async def test_activate_jail_rollback_on_jail_not_found_error(
|
||||
self, tmp_path: Path
|
||||
) -> None:
|
||||
async def test_activate_jail_rollback_on_jail_not_found_error(self, tmp_path: Path) -> None:
|
||||
"""Rollback when reload_all raises JailNotFoundError (invalid config).
|
||||
|
||||
When fail2ban cannot create a jail due to invalid configuration
|
||||
@@ -3294,16 +3217,12 @@ class TestActivateJailRollback:
|
||||
),
|
||||
patch(
|
||||
"app.services.config_file_service._validate_jail_config_sync",
|
||||
return_value=JailValidationResult(
|
||||
jail_name="apache-auth", valid=True
|
||||
),
|
||||
return_value=JailValidationResult(jail_name="apache-auth", valid=True),
|
||||
),
|
||||
):
|
||||
mock_js.reload_all = AsyncMock(side_effect=reload_side_effect)
|
||||
mock_js.JailNotFoundError = JailNotFoundError
|
||||
result = await activate_jail(
|
||||
str(tmp_path), "/fake.sock", "apache-auth", req
|
||||
)
|
||||
result = await activate_jail(str(tmp_path), "/fake.sock", "apache-auth", req)
|
||||
|
||||
assert result.active is False
|
||||
assert result.recovered is True
|
||||
@@ -3311,9 +3230,7 @@ class TestActivateJailRollback:
|
||||
# Verify the error message mentions logpath issues.
|
||||
assert "logpath" in result.message.lower() or "check that all logpath" in result.message.lower()
|
||||
|
||||
async def test_activate_jail_rollback_deletes_file_when_no_prior_local(
|
||||
self, tmp_path: Path
|
||||
) -> None:
|
||||
async def test_activate_jail_rollback_deletes_file_when_no_prior_local(self, tmp_path: Path) -> None:
|
||||
"""Rollback deletes the .local file when none existed before activation.
|
||||
|
||||
When a jail had no .local override before activation, activate_jail
|
||||
@@ -3355,15 +3272,11 @@ class TestActivateJailRollback:
|
||||
),
|
||||
patch(
|
||||
"app.services.config_file_service._validate_jail_config_sync",
|
||||
return_value=JailValidationResult(
|
||||
jail_name="apache-auth", valid=True
|
||||
),
|
||||
return_value=JailValidationResult(jail_name="apache-auth", valid=True),
|
||||
),
|
||||
):
|
||||
mock_js.reload_all = AsyncMock(side_effect=reload_side_effect)
|
||||
result = await activate_jail(
|
||||
str(tmp_path), "/fake.sock", "apache-auth", req
|
||||
)
|
||||
result = await activate_jail(str(tmp_path), "/fake.sock", "apache-auth", req)
|
||||
|
||||
assert result.active is False
|
||||
assert result.recovered is True
|
||||
@@ -3376,7 +3289,7 @@ class TestActivateJailRollback:
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
class TestRollbackJail:
|
||||
class TestRollbackJailIntegration:
|
||||
"""Integration tests for :func:`~app.services.config_file_service.rollback_jail`."""
|
||||
|
||||
async def test_local_file_written_enabled_false(self, tmp_path: Path) -> None:
|
||||
@@ -3419,15 +3332,11 @@ class TestRollbackJail:
|
||||
AsyncMock(return_value={"other"}),
|
||||
),
|
||||
):
|
||||
await rollback_jail(
|
||||
str(tmp_path), "/fake.sock", "sshd", ["fail2ban-client", "start"]
|
||||
)
|
||||
await rollback_jail(str(tmp_path), "/fake.sock", "sshd", ["fail2ban-client", "start"])
|
||||
|
||||
mock_start.assert_awaited_once_with(["fail2ban-client", "start"])
|
||||
|
||||
async def test_fail2ban_running_reflects_socket_probe_not_subprocess_exit(
|
||||
self, tmp_path: Path
|
||||
) -> None:
|
||||
async def test_fail2ban_running_reflects_socket_probe_not_subprocess_exit(self, tmp_path: Path) -> None:
|
||||
"""fail2ban_running in the response reflects the socket probe result.
|
||||
|
||||
Even when start_daemon returns True (subprocess exit 0), if the socket
|
||||
@@ -3443,15 +3352,11 @@ class TestRollbackJail:
|
||||
AsyncMock(return_value=False), # socket still unresponsive
|
||||
),
|
||||
):
|
||||
result = await rollback_jail(
|
||||
str(tmp_path), "/fake.sock", "sshd", ["fail2ban-client", "start"]
|
||||
)
|
||||
result = await rollback_jail(str(tmp_path), "/fake.sock", "sshd", ["fail2ban-client", "start"])
|
||||
|
||||
assert result.fail2ban_running is False
|
||||
|
||||
async def test_active_jails_zero_when_fail2ban_not_running(
|
||||
self, tmp_path: Path
|
||||
) -> None:
|
||||
async def test_active_jails_zero_when_fail2ban_not_running(self, tmp_path: Path) -> None:
|
||||
"""active_jails is 0 in the response when fail2ban_running is False."""
|
||||
with (
|
||||
patch(
|
||||
@@ -3463,15 +3368,11 @@ class TestRollbackJail:
|
||||
AsyncMock(return_value=False),
|
||||
),
|
||||
):
|
||||
result = await rollback_jail(
|
||||
str(tmp_path), "/fake.sock", "sshd", ["fail2ban-client", "start"]
|
||||
)
|
||||
result = await rollback_jail(str(tmp_path), "/fake.sock", "sshd", ["fail2ban-client", "start"])
|
||||
|
||||
assert result.active_jails == 0
|
||||
|
||||
async def test_active_jails_count_from_socket_when_running(
|
||||
self, tmp_path: Path
|
||||
) -> None:
|
||||
async def test_active_jails_count_from_socket_when_running(self, tmp_path: Path) -> None:
|
||||
"""active_jails reflects the actual jail count from the socket when fail2ban is up."""
|
||||
with (
|
||||
patch(
|
||||
@@ -3487,15 +3388,11 @@ class TestRollbackJail:
|
||||
AsyncMock(return_value={"sshd", "nginx", "apache-auth"}),
|
||||
),
|
||||
):
|
||||
result = await rollback_jail(
|
||||
str(tmp_path), "/fake.sock", "sshd", ["fail2ban-client", "start"]
|
||||
)
|
||||
result = await rollback_jail(str(tmp_path), "/fake.sock", "sshd", ["fail2ban-client", "start"])
|
||||
|
||||
assert result.active_jails == 3
|
||||
|
||||
async def test_fail2ban_down_at_start_still_succeeds_file_write(
|
||||
self, tmp_path: Path
|
||||
) -> None:
|
||||
async def test_fail2ban_down_at_start_still_succeeds_file_write(self, tmp_path: Path) -> None:
|
||||
"""rollback_jail writes the local file even when fail2ban is down at call time."""
|
||||
# fail2ban is down: start_daemon fails and wait_for_fail2ban returns False.
|
||||
with (
|
||||
@@ -3508,12 +3405,9 @@ class TestRollbackJail:
|
||||
AsyncMock(return_value=False),
|
||||
),
|
||||
):
|
||||
result = await rollback_jail(
|
||||
str(tmp_path), "/fake.sock", "sshd", ["fail2ban-client", "start"]
|
||||
)
|
||||
result = await rollback_jail(str(tmp_path), "/fake.sock", "sshd", ["fail2ban-client", "start"])
|
||||
|
||||
local = tmp_path / "jail.d" / "sshd.local"
|
||||
assert local.is_file(), "local file must be written even when fail2ban is down"
|
||||
assert result.disabled is True
|
||||
assert result.fail2ban_running is False
|
||||
|
||||
|
||||
Reference in New Issue
Block a user