- PUT /api/config/filters/{name}: updates failregex/ignoreregex/datepattern/
journalmatch in filter.d/{name}.local; validates regex via re.compile();
supports ?reload=true
- POST /api/config/filters: creates filter.d/{name}.local from FilterCreateRequest;
returns 409 if file already exists
- DELETE /api/config/filters/{name}: deletes .local only; returns 409 for
conf-only (readonly) filters
- POST /api/config/jails/{name}/filter: assigns filter to jail by writing
'filter = {name}' to jail.d/{jail}.local; supports ?reload=true
- New models: FilterUpdateRequest, FilterCreateRequest, AssignFilterRequest
- New service helpers: _safe_filter_name, _validate_regex_patterns,
_write_filter_local_sync, _set_jail_local_key_sync
- Fixed .local-only filter discovery in _parse_filters_sync (5-tuple return)
- Fixed get_filter extension stripping (.local is 6 chars not 5)
- Renamed file_config.py raw-write routes to /raw suffix
(PUT /filters/{name}/raw, POST /filters/raw) to avoid routing conflicts
- Full service + router tests; all 930 tests pass
1490 lines
54 KiB
Python
1490 lines
54 KiB
Python
"""Tests for config_file_service — fail2ban jail config parser and activator."""
|
|
|
|
from __future__ import annotations
|
|
|
|
from pathlib import Path
|
|
from unittest.mock import AsyncMock, patch
|
|
|
|
import pytest
|
|
|
|
from app.services.config_file_service import (
|
|
JailAlreadyActiveError,
|
|
JailAlreadyInactiveError,
|
|
JailNameError,
|
|
JailNotFoundInConfigError,
|
|
_build_inactive_jail,
|
|
_ordered_config_files,
|
|
_parse_jails_sync,
|
|
_resolve_filter,
|
|
_safe_jail_name,
|
|
_write_local_override_sync,
|
|
activate_jail,
|
|
deactivate_jail,
|
|
list_inactive_jails,
|
|
)
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _write(path: Path, content: str) -> None:
    """Store *content* at *path* as UTF-8, creating missing parent directories first."""
    parent_dir = path.parent
    # exist_ok lets several fixtures share one directory without racing on mkdir.
    parent_dir.mkdir(parents=True, exist_ok=True)
    path.write_text(content, encoding="utf-8")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# _safe_jail_name
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestSafeJailName:
    """Accepted and rejected inputs of ``_safe_jail_name``."""

    def test_valid_simple(self) -> None:
        """A plain lowercase name is returned unchanged."""
        candidate = "sshd"
        assert _safe_jail_name(candidate) == candidate

    def test_valid_with_hyphen(self) -> None:
        """Hyphens inside the name are accepted."""
        candidate = "apache-auth"
        assert _safe_jail_name(candidate) == candidate

    def test_valid_with_dot(self) -> None:
        """Dots inside the name are accepted."""
        candidate = "nginx.http"
        assert _safe_jail_name(candidate) == candidate

    def test_valid_with_underscore(self) -> None:
        """Underscores inside the name are accepted."""
        candidate = "my_jail"
        assert _safe_jail_name(candidate) == candidate

    def test_invalid_path_traversal(self) -> None:
        """A ``../`` prefix raises ``JailNameError``."""
        with pytest.raises(JailNameError):
            _safe_jail_name("../evil")

    def test_invalid_slash(self) -> None:
        """A path separator inside the name raises ``JailNameError``."""
        with pytest.raises(JailNameError):
            _safe_jail_name("a/b")

    def test_invalid_starts_with_dash(self) -> None:
        """A leading dash raises ``JailNameError``."""
        with pytest.raises(JailNameError):
            _safe_jail_name("-bad")

    def test_invalid_empty(self) -> None:
        """The empty string raises ``JailNameError``."""
        with pytest.raises(JailNameError):
            _safe_jail_name("")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# _resolve_filter
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestResolveFilter:
    """Placeholder expansion performed by ``_resolve_filter``."""

    def test_name_substitution(self) -> None:
        """``%(__name__)s`` expands to the jail name."""
        assert _resolve_filter("%(__name__)s", "sshd", "normal") == "sshd"

    def test_mode_substitution(self) -> None:
        """``%(mode)s`` expands to the supplied mode alongside the name."""
        resolved = _resolve_filter("%(__name__)s[mode=%(mode)s]", "sshd", "aggressive")
        assert resolved == "sshd[mode=aggressive]"

    def test_no_substitution_needed(self) -> None:
        """A raw value without placeholders is returned as-is."""
        assert _resolve_filter("my-filter", "sshd", "normal") == "my-filter"

    def test_empty_raw(self) -> None:
        """An empty raw value stays empty."""
        assert _resolve_filter("", "sshd", "normal") == ""
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# _ordered_config_files
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestOrderedConfigFiles:
    """Ordering of the config files returned by ``_ordered_config_files``."""

    def test_empty_dir(self, tmp_path: Path) -> None:
        """A directory without config files yields an empty list."""
        assert _ordered_config_files(tmp_path) == []

    def test_jail_conf_only(self, tmp_path: Path) -> None:
        """A lone ``jail.conf`` is the only entry returned."""
        jail_conf = tmp_path / "jail.conf"
        _write(jail_conf, "[sshd]\nenabled=true\n")
        assert _ordered_config_files(tmp_path) == [jail_conf]

    def test_full_merge_order(self, tmp_path: Path) -> None:
        """Order is jail.conf, jail.local, jail.d/*.conf, then jail.d/*.local."""
        expected_order = [
            (tmp_path / "jail.conf", "[DEFAULT]\n"),
            (tmp_path / "jail.local", "[DEFAULT]\n"),
            (tmp_path / "jail.d" / "custom.conf", "[sshd]\n"),
            (tmp_path / "jail.d" / "custom.local", "[sshd]\n"),
        ]
        for cfg_path, body in expected_order:
            _write(cfg_path, body)

        ordered = _ordered_config_files(tmp_path)

        # Checked position by position; the total length is deliberately not asserted.
        for position, (cfg_path, _body) in enumerate(expected_order):
            assert ordered[position] == cfg_path

    def test_jail_d_sorted_alphabetically(self, tmp_path: Path) -> None:
        """Entries inside ``jail.d`` come back sorted by file name."""
        jail_d = tmp_path / "jail.d"
        jail_d.mkdir()
        for filename in ("zzz.conf", "aaa.conf", "mmm.conf"):
            _write(jail_d / filename, "")
        found = [entry.name for entry in _ordered_config_files(tmp_path)]
        assert found == ["aaa.conf", "mmm.conf", "zzz.conf"]
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# _parse_jails_sync
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# Main jail.conf fixture: shared [DEFAULT] values, one enabled jail (sshd)
# and one disabled jail (apache-auth).
JAIL_CONF = """\
[DEFAULT]
bantime = 10m
findtime = 5m
maxretry = 5

[sshd]
enabled = true
filter = sshd
port = ssh
logpath = /var/log/auth.log

[apache-auth]
enabled = false
filter = apache-auth
port = http,https
logpath = /var/log/apache2/error.log
"""

# jail.local fixture: redefines [sshd] with a different bantime.
JAIL_LOCAL = """\
[sshd]
bantime = 1h
"""

# jail.d/custom.conf fixture: a further, disabled jail defined outside jail.conf.
JAIL_D_CUSTOM = """\
[nginx-http-auth]
enabled = false
filter = nginx-http-auth
port = http,https
logpath = /var/log/nginx/error.log
"""
|
|
|
|
|
|
class TestParseJailsSync:
    """Jail settings and source-file tracking returned by ``_parse_jails_sync``."""

    def test_parses_all_jails(self, tmp_path: Path) -> None:
        """Every jail section of jail.conf shows up in the result."""
        _write(tmp_path / "jail.conf", JAIL_CONF)
        parsed, _sources = _parse_jails_sync(tmp_path)
        for expected in ("sshd", "apache-auth"):
            assert expected in parsed

    def test_enabled_flag_parsing(self, tmp_path: Path) -> None:
        """The ``enabled`` value is exposed as the raw config string."""
        _write(tmp_path / "jail.conf", JAIL_CONF)
        parsed, _sources = _parse_jails_sync(tmp_path)
        assert parsed["sshd"]["enabled"] == "true"
        assert parsed["apache-auth"]["enabled"] == "false"

    def test_default_inheritance(self, tmp_path: Path) -> None:
        """Values from [DEFAULT] are visible inside each jail section."""
        _write(tmp_path / "jail.conf", JAIL_CONF)
        parsed, _sources = _parse_jails_sync(tmp_path)
        # Neither jail sets these keys itself; they come from [DEFAULT].
        assert parsed["sshd"]["bantime"] == "10m"
        assert parsed["apache-auth"]["maxretry"] == "5"

    def test_local_override_wins(self, tmp_path: Path) -> None:
        """A value in jail.local replaces the one from jail.conf."""
        _write(tmp_path / "jail.conf", JAIL_CONF)
        _write(tmp_path / "jail.local", JAIL_LOCAL)
        parsed, _sources = _parse_jails_sync(tmp_path)
        # bantime for sshd: 10m in jail.conf, 1h in jail.local.
        assert parsed["sshd"]["bantime"] == "1h"

    def test_jail_d_conf_included(self, tmp_path: Path) -> None:
        """Jails defined only under jail.d are picked up."""
        _write(tmp_path / "jail.conf", JAIL_CONF)
        _write(tmp_path / "jail.d" / "custom.conf", JAIL_D_CUSTOM)
        parsed, _sources = _parse_jails_sync(tmp_path)
        assert "nginx-http-auth" in parsed

    def test_source_file_tracked(self, tmp_path: Path) -> None:
        """Each jail is mapped to the path of the file that defines it."""
        main_conf = tmp_path / "jail.conf"
        drop_in = tmp_path / "jail.d" / "custom.conf"
        _write(main_conf, JAIL_CONF)
        _write(drop_in, JAIL_D_CUSTOM)
        _jails, origins = _parse_jails_sync(tmp_path)
        assert origins["sshd"] == str(main_conf)
        assert origins["nginx-http-auth"] == str(drop_in)

    def test_source_file_local_override_tracked(self, tmp_path: Path) -> None:
        """When jail.local redefines a jail, jail.local is reported as its source."""
        local_conf = tmp_path / "jail.local"
        _write(tmp_path / "jail.conf", JAIL_CONF)
        _write(local_conf, JAIL_LOCAL)
        _jails, origins = _parse_jails_sync(tmp_path)
        assert origins["sshd"] == str(local_conf)

    def test_default_section_excluded(self, tmp_path: Path) -> None:
        """[DEFAULT] is not reported as a jail."""
        _write(tmp_path / "jail.conf", JAIL_CONF)
        parsed, _sources = _parse_jails_sync(tmp_path)
        assert "DEFAULT" not in parsed

    def test_includes_section_excluded(self, tmp_path: Path) -> None:
        """[INCLUDES] is not reported as a jail."""
        with_includes = "[INCLUDES]\nbefore = paths-debian.conf\n" + JAIL_CONF
        _write(tmp_path / "jail.conf", with_includes)
        parsed, _sources = _parse_jails_sync(tmp_path)
        assert "INCLUDES" not in parsed

    def test_corrupt_file_skipped_gracefully(self, tmp_path: Path) -> None:
        """A malformed file does not raise; a dict is still returned."""
        _write(tmp_path / "jail.conf", "[[bad section\n")
        parsed, _sources = _parse_jails_sync(tmp_path)
        assert isinstance(parsed, dict)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# _build_inactive_jail
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestBuildInactiveJail:
    """Field mapping performed by ``_build_inactive_jail``."""

    def test_basic_fields(self) -> None:
        """All common settings are carried over to the built jail object."""
        raw_settings = {
            "enabled": "false",
            "filter": "sshd",
            "port": "ssh",
            "logpath": "/var/log/auth.log",
            "bantime": "10m",
            "findtime": "5m",
            "maxretry": "5",
            "action": "",
        }
        built = _build_inactive_jail("sshd", raw_settings, "/etc/fail2ban/jail.d/sshd.conf")
        assert built.name == "sshd"
        assert built.filter == "sshd"
        assert built.port == "ssh"
        assert built.logpath == ["/var/log/auth.log"]
        assert built.bantime == "10m"
        assert built.findtime == "5m"
        assert built.maxretry == 5  # string "5" arrives as an int
        assert built.enabled is False
        assert "sshd.conf" in built.source_file

    def test_filter_name_substitution(self) -> None:
        """``%(__name__)s`` in the filter setting resolves to the jail name."""
        built = _build_inactive_jail(
            "myservice",
            {"enabled": "false", "filter": "%(__name__)s"},
            "/etc/fail2ban/jail.conf",
        )
        assert built.filter == "myservice"

    def test_missing_optional_fields(self) -> None:
        """With no settings, the filter defaults to the jail name and the rest is empty."""
        built = _build_inactive_jail("minimal", {}, "/etc/fail2ban/jail.conf")
        assert built.filter == "minimal"
        assert built.port is None
        assert built.logpath == []
        assert built.bantime is None
        assert built.maxretry is None

    def test_multiline_logpath(self) -> None:
        """A newline-separated logpath value yields one entry per line."""
        built = _build_inactive_jail(
            "app",
            {"logpath": "/var/log/app.log\n/var/log/app2.log"},
            "/etc/fail2ban/jail.conf",
        )
        for expected in ("/var/log/app.log", "/var/log/app2.log"):
            assert expected in built.logpath

    def test_multiline_actions(self) -> None:
        """A newline-separated action value yields one action per line."""
        built = _build_inactive_jail(
            "app",
            {"action": "iptables-multiport\niptables-ipset"},
            "/etc/fail2ban/jail.conf",
        )
        assert len(built.actions) == 2

    def test_enabled_true(self) -> None:
        """``enabled = true`` is reflected as a boolean ``True``."""
        built = _build_inactive_jail(
            "active-jail", {"enabled": "true"}, "/etc/fail2ban/jail.conf"
        )
        assert built.enabled is True
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# _write_local_override_sync
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestWriteLocalOverrideSync:
    """Contents of the ``jail.d/<jail>.local`` file written by ``_write_local_override_sync``."""

    @staticmethod
    def _local_text(config_dir: Path, jail: str) -> str:
        """Return the text of ``jail.d/<jail>.local`` under *config_dir*.

        The file is decoded explicitly as UTF-8 so the assertions do not depend
        on the locale's default encoding, matching what the module-level
        ``_write`` helper does when it creates fixtures.
        """
        return (config_dir / "jail.d" / f"{jail}.local").read_text(encoding="utf-8")

    def test_creates_local_file(self, tmp_path: Path) -> None:
        """Writing an override creates ``jail.d/sshd.local``."""
        _write_local_override_sync(tmp_path, "sshd", True, {})
        assert (tmp_path / "jail.d" / "sshd.local").is_file()

    def test_enabled_true_written(self, tmp_path: Path) -> None:
        """Enabling writes ``enabled = true``."""
        _write_local_override_sync(tmp_path, "sshd", True, {})
        assert "enabled = true" in self._local_text(tmp_path, "sshd")

    def test_enabled_false_written(self, tmp_path: Path) -> None:
        """Disabling writes ``enabled = false``."""
        _write_local_override_sync(tmp_path, "sshd", False, {})
        assert "enabled = false" in self._local_text(tmp_path, "sshd")

    def test_section_header_written(self, tmp_path: Path) -> None:
        """The file contains a section header named after the jail."""
        _write_local_override_sync(tmp_path, "apache-auth", True, {})
        assert "[apache-auth]" in self._local_text(tmp_path, "apache-auth")

    def test_override_bantime(self, tmp_path: Path) -> None:
        """A bantime override appears in the file."""
        _write_local_override_sync(tmp_path, "sshd", True, {"bantime": "1h"})
        content = self._local_text(tmp_path, "sshd")
        assert "bantime" in content
        assert "1h" in content

    def test_override_findtime(self, tmp_path: Path) -> None:
        """A findtime override appears in the file."""
        _write_local_override_sync(tmp_path, "sshd", True, {"findtime": "30m"})
        content = self._local_text(tmp_path, "sshd")
        assert "findtime" in content
        assert "30m" in content

    def test_override_maxretry(self, tmp_path: Path) -> None:
        """An integer maxretry override appears in the file."""
        _write_local_override_sync(tmp_path, "sshd", True, {"maxretry": 3})
        content = self._local_text(tmp_path, "sshd")
        assert "maxretry" in content
        assert "3" in content

    def test_override_port(self, tmp_path: Path) -> None:
        """A port override appears in the file."""
        _write_local_override_sync(tmp_path, "sshd", True, {"port": "2222"})
        assert "2222" in self._local_text(tmp_path, "sshd")

    def test_override_logpath_list(self, tmp_path: Path) -> None:
        """Every entry of a logpath list appears in the file."""
        log_paths = ["/var/log/auth.log", "/var/log/secure"]
        _write_local_override_sync(tmp_path, "sshd", True, {"logpath": log_paths})
        content = self._local_text(tmp_path, "sshd")
        assert "/var/log/auth.log" in content
        assert "/var/log/secure" in content

    def test_bang_gui_header_comment(self, tmp_path: Path) -> None:
        """The generated file mentions ``BanGUI``."""
        _write_local_override_sync(tmp_path, "sshd", True, {})
        assert "BanGUI" in self._local_text(tmp_path, "sshd")

    def test_overwrites_existing_file(self, tmp_path: Path) -> None:
        """Pre-existing content of the .local file is replaced, not appended to."""
        local = tmp_path / "jail.d" / "sshd.local"
        # Use the shared fixture helper so the seed file is written as UTF-8 too.
        _write(local, "old content")
        _write_local_override_sync(tmp_path, "sshd", True, {})
        assert "old content" not in local.read_text(encoding="utf-8")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# list_inactive_jails
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@pytest.mark.asyncio
class TestListInactiveJails:
    """Filtering of config-defined jails against the active set in ``list_inactive_jails``."""

    @staticmethod
    def _active_jails(names: set[str]):
        """Patch ``_get_active_jail_names`` so it resolves to *names*."""
        return patch(
            "app.services.config_file_service._get_active_jail_names",
            new=AsyncMock(return_value=names),
        )

    async def test_returns_only_inactive(self, tmp_path: Path) -> None:
        """Jails reported as active are left out of the listing."""
        _write(tmp_path / "jail.conf", JAIL_CONF)
        with self._active_jails({"sshd"}):
            listing = await list_inactive_jails(str(tmp_path), "/fake.sock")

        listed = [jail.name for jail in listing.jails]
        assert "sshd" not in listed
        assert "apache-auth" in listed

    async def test_total_matches_jails_count(self, tmp_path: Path) -> None:
        """``total`` equals the number of jails returned."""
        _write(tmp_path / "jail.conf", JAIL_CONF)
        with self._active_jails({"sshd"}):
            listing = await list_inactive_jails(str(tmp_path), "/fake.sock")

        assert listing.total == len(listing.jails)

    async def test_empty_config_dir(self, tmp_path: Path) -> None:
        """An empty config directory produces an empty listing."""
        with self._active_jails(set()):
            listing = await list_inactive_jails(str(tmp_path), "/fake.sock")

        assert listing.jails == []
        assert listing.total == 0

    async def test_all_active_returns_empty(self, tmp_path: Path) -> None:
        """When every configured jail is active, nothing is listed."""
        _write(tmp_path / "jail.conf", JAIL_CONF)
        with self._active_jails({"sshd", "apache-auth"}):
            listing = await list_inactive_jails(str(tmp_path), "/fake.sock")

        assert listing.jails == []

    async def test_fail2ban_unreachable_shows_all(self, tmp_path: Path) -> None:
        """An empty active set makes every configured jail appear as inactive."""
        # Simulates an unreachable fail2ban by having the active-name lookup
        # resolve to an empty set.
        _write(tmp_path / "jail.conf", JAIL_CONF)
        with self._active_jails(set()):
            listing = await list_inactive_jails(str(tmp_path), "/fake.sock")

        listed = {jail.name for jail in listing.jails}
        assert "sshd" in listed
        assert "apache-auth" in listed
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# activate_jail
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@pytest.mark.asyncio
class TestActivateJail:
    """Behaviour of ``activate_jail``: validation, error cases and the written .local file."""

    @staticmethod
    def _active_jails(names: set[str]):
        """Patch ``_get_active_jail_names`` so it resolves to *names*."""
        return patch(
            "app.services.config_file_service._get_active_jail_names",
            new=AsyncMock(return_value=names),
        )

    async def test_activates_known_inactive_jail(self, tmp_path: Path) -> None:
        """Activating a configured, inactive jail writes ``enabled = true`` to its .local file."""
        from app.models.config import ActivateJailRequest

        _write(tmp_path / "jail.conf", JAIL_CONF)
        req = ActivateJailRequest()
        with (
            self._active_jails(set()),
            patch("app.services.config_file_service.jail_service") as mock_js,
        ):
            # reload_all is awaited by the service, so it must be an AsyncMock.
            mock_js.reload_all = AsyncMock()
            result = await activate_jail(str(tmp_path), "/fake.sock", "apache-auth", req)

        assert result.active is True
        assert "apache-auth" in result.name
        local = tmp_path / "jail.d" / "apache-auth.local"
        assert local.is_file()
        # Decode explicitly so the check does not depend on the locale encoding.
        assert "enabled = true" in local.read_text(encoding="utf-8")

    async def test_raises_not_found_for_unknown_jail(self, tmp_path: Path) -> None:
        """A jail absent from the config raises ``JailNotFoundInConfigError``."""
        from app.models.config import ActivateJailRequest

        _write(tmp_path / "jail.conf", JAIL_CONF)
        req = ActivateJailRequest()
        with (
            self._active_jails(set()),
            pytest.raises(JailNotFoundInConfigError),
        ):
            await activate_jail(str(tmp_path), "/fake.sock", "nonexistent", req)

    async def test_raises_already_active(self, tmp_path: Path) -> None:
        """Activating a jail that is already active raises ``JailAlreadyActiveError``."""
        from app.models.config import ActivateJailRequest

        _write(tmp_path / "jail.conf", JAIL_CONF)
        req = ActivateJailRequest()
        with (
            self._active_jails({"sshd"}),
            pytest.raises(JailAlreadyActiveError),
        ):
            await activate_jail(str(tmp_path), "/fake.sock", "sshd", req)

    async def test_raises_name_error_for_bad_name(self, tmp_path: Path) -> None:
        """A path-traversal style name raises ``JailNameError``."""
        from app.models.config import ActivateJailRequest

        req = ActivateJailRequest()
        with pytest.raises(JailNameError):
            await activate_jail(str(tmp_path), "/fake.sock", "../etc/passwd", req)

    async def test_writes_overrides_to_local_file(self, tmp_path: Path) -> None:
        """Override values from the request end up in the jail's .local file."""
        from app.models.config import ActivateJailRequest

        _write(tmp_path / "jail.conf", JAIL_CONF)
        req = ActivateJailRequest(bantime="2h", maxretry=3)
        with (
            self._active_jails(set()),
            patch("app.services.config_file_service.jail_service") as mock_js,
        ):
            mock_js.reload_all = AsyncMock()
            await activate_jail(str(tmp_path), "/fake.sock", "apache-auth", req)

        content = (tmp_path / "jail.d" / "apache-auth.local").read_text(encoding="utf-8")
        assert "2h" in content
        assert "3" in content
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# deactivate_jail
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@pytest.mark.asyncio
class TestDeactivateJail:
    """Behaviour of ``deactivate_jail``: validation, error cases and the written .local file."""

    @staticmethod
    def _active_jails(names: set[str]):
        """Patch ``_get_active_jail_names`` so it resolves to *names*."""
        return patch(
            "app.services.config_file_service._get_active_jail_names",
            new=AsyncMock(return_value=names),
        )

    async def test_deactivates_active_jail(self, tmp_path: Path) -> None:
        """Deactivating an active jail writes ``enabled = false`` to its .local file."""
        _write(tmp_path / "jail.conf", JAIL_CONF)
        with (
            self._active_jails({"sshd"}),
            patch("app.services.config_file_service.jail_service") as mock_js,
        ):
            # reload_all is awaited by the service, so it must be an AsyncMock.
            mock_js.reload_all = AsyncMock()
            result = await deactivate_jail(str(tmp_path), "/fake.sock", "sshd")

        assert result.active is False
        local = tmp_path / "jail.d" / "sshd.local"
        assert local.is_file()
        # Decode explicitly so the check does not depend on the locale encoding.
        assert "enabled = false" in local.read_text(encoding="utf-8")

    async def test_raises_not_found(self, tmp_path: Path) -> None:
        """A jail absent from the config raises ``JailNotFoundInConfigError``."""
        _write(tmp_path / "jail.conf", JAIL_CONF)
        with (
            self._active_jails({"sshd"}),
            pytest.raises(JailNotFoundInConfigError),
        ):
            await deactivate_jail(str(tmp_path), "/fake.sock", "nonexistent")

    async def test_raises_already_inactive(self, tmp_path: Path) -> None:
        """Deactivating a jail that is not active raises ``JailAlreadyInactiveError``."""
        _write(tmp_path / "jail.conf", JAIL_CONF)
        with (
            self._active_jails(set()),
            pytest.raises(JailAlreadyInactiveError),
        ):
            await deactivate_jail(str(tmp_path), "/fake.sock", "apache-auth")

    async def test_raises_name_error(self, tmp_path: Path) -> None:
        """A name containing a slash raises ``JailNameError``."""
        with pytest.raises(JailNameError):
            await deactivate_jail(str(tmp_path), "/fake.sock", "a/b")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# _extract_filter_base_name
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestExtractFilterBaseName:
    """Stripping of option suffixes and whitespace by ``_extract_filter_base_name``."""

    @staticmethod
    def _base_name(raw: str) -> str:
        """Import ``_extract_filter_base_name`` at call time and apply it to *raw*."""
        from app.services.config_file_service import _extract_filter_base_name

        return _extract_filter_base_name(raw)

    def test_simple_name(self) -> None:
        """A bare filter name is returned unchanged."""
        assert self._base_name("sshd") == "sshd"

    def test_name_with_mode(self) -> None:
        """A ``[mode=...]`` suffix is dropped."""
        assert self._base_name("sshd[mode=aggressive]") == "sshd"

    def test_name_with_variable_mode(self) -> None:
        """A suffix holding an unresolved ``%(mode)s`` placeholder is dropped too."""
        assert self._base_name("sshd[mode=%(mode)s]") == "sshd"

    def test_whitespace_stripped(self) -> None:
        """Surrounding whitespace is removed."""
        assert self._base_name(" nginx ") == "nginx"

    def test_empty_string(self) -> None:
        """The empty string maps to the empty string."""
        assert self._base_name("") == ""
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# _build_filter_to_jails_map
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestBuildFilterToJailsMap:
    """Filter-name to active-jail mapping built by ``_build_filter_to_jails_map``."""

    @staticmethod
    def _mapping(all_jails: dict[str, dict[str, str]], active: set[str]) -> dict[str, list[str]]:
        """Import ``_build_filter_to_jails_map`` at call time and invoke it."""
        from app.services.config_file_service import _build_filter_to_jails_map

        return _build_filter_to_jails_map(all_jails, active)

    def test_active_jail_maps_to_filter(self) -> None:
        """An active jail is listed under the filter it uses."""
        mapping = self._mapping({"sshd": {"filter": "sshd"}}, {"sshd"})
        assert mapping == {"sshd": ["sshd"]}

    def test_inactive_jail_not_included(self) -> None:
        """A jail missing from the active set contributes nothing."""
        mapping = self._mapping({"apache-auth": {"filter": "apache-auth"}}, set())
        assert mapping == {}

    def test_multiple_jails_sharing_filter(self) -> None:
        """Several active jails using one filter are all listed under it."""
        jails_by_name = {
            "sshd": {"filter": "sshd"},
            "sshd-ddos": {"filter": "sshd"},
        }
        mapping = self._mapping(jails_by_name, {"sshd", "sshd-ddos"})
        assert sorted(mapping["sshd"]) == ["sshd", "sshd-ddos"]

    def test_mode_suffix_stripped(self) -> None:
        """A ``[mode=...]`` suffix is ignored when keying the map."""
        mapping = self._mapping({"sshd": {"filter": "sshd[mode=aggressive]"}}, {"sshd"})
        assert "sshd" in mapping

    def test_missing_filter_key_falls_back_to_jail_name(self) -> None:
        """Without a ``filter`` setting the jail name is used as the key."""
        mapping = self._mapping({"sshd": {}}, {"sshd"})
        assert "sshd" in mapping
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# _parse_filters_sync
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# Minimal filter.d fixture: a [Definition] section with one failregex and an
# empty ignoreregex.
_FILTER_CONF = """\
[Definition]
failregex = ^Host: <HOST>
ignoreregex =
"""
|
|
|
|
|
|
class TestParseFiltersSync:
    """Tuples returned by ``_parse_filters_sync`` for .conf filters and their .local overrides."""

    def test_returns_empty_for_missing_dir(self, tmp_path: Path) -> None:
        """A non-existent directory yields an empty list."""
        from app.services.config_file_service import _parse_filters_sync

        assert _parse_filters_sync(tmp_path / "nonexistent") == []

    def test_single_filter_returned(self, tmp_path: Path) -> None:
        """One .conf file produces one 5-tuple describing it."""
        from app.services.config_file_service import _parse_filters_sync

        filters_dir = tmp_path / "filter.d"
        _write(filters_dir / "nginx.conf", _FILTER_CONF)

        entries = _parse_filters_sync(filters_dir)

        assert len(entries) == 1
        name, filename, content, has_local, source_path = entries[0]
        assert name == "nginx"
        assert filename == "nginx.conf"
        assert "failregex" in content
        assert has_local is False
        assert source_path.endswith("nginx.conf")

    def test_local_override_detected(self, tmp_path: Path) -> None:
        """A sibling .local file sets the has-local flag."""
        from app.services.config_file_service import _parse_filters_sync

        filters_dir = tmp_path / "filter.d"
        _write(filters_dir / "nginx.conf", _FILTER_CONF)
        _write(filters_dir / "nginx.local", "[Definition]\nignoreregex = ^safe\n")

        entries = _parse_filters_sync(filters_dir)

        _name, _filename, _content, has_local, _source = entries[0]
        assert has_local is True

    def test_local_content_appended_to_content(self, tmp_path: Path) -> None:
        """Text from the .local file is present in the returned content."""
        from app.services.config_file_service import _parse_filters_sync

        filters_dir = tmp_path / "filter.d"
        _write(filters_dir / "nginx.conf", _FILTER_CONF)
        _write(filters_dir / "nginx.local", "[Definition]\n# local tweak\n")

        entries = _parse_filters_sync(filters_dir)

        _name, _filename, content, _has_local, _source = entries[0]
        assert "local tweak" in content

    def test_sorted_alphabetically(self, tmp_path: Path) -> None:
        """Entries are ordered by filter name."""
        from app.services.config_file_service import _parse_filters_sync

        filters_dir = tmp_path / "filter.d"
        for stem in ("zzz", "aaa", "mmm"):
            _write(filters_dir / f"{stem}.conf", _FILTER_CONF)

        entries = _parse_filters_sync(filters_dir)

        assert [entry[0] for entry in entries] == ["aaa", "mmm", "zzz"]
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# list_filters
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@pytest.mark.asyncio
class TestListFilters:
    """Listing produced by ``list_filters``: totals, active flags and override detection."""

    @staticmethod
    def _active_jails(names: set[str]):
        """Patch ``_get_active_jail_names`` so it resolves to *names*."""
        return patch(
            "app.services.config_file_service._get_active_jail_names",
            new=AsyncMock(return_value=names),
        )

    async def test_returns_all_filters(self, tmp_path: Path) -> None:
        """Every .conf file under filter.d is listed."""
        from app.services.config_file_service import list_filters

        filters_dir = tmp_path / "filter.d"
        _write(filters_dir / "sshd.conf", _FILTER_CONF)
        _write(filters_dir / "nginx.conf", _FILTER_CONF)

        with self._active_jails(set()):
            listing = await list_filters(str(tmp_path), "/fake.sock")

        assert listing.total == 2
        listed = {entry.name for entry in listing.filters}
        assert "sshd" in listed
        assert "nginx" in listed

    async def test_active_flag_set_for_used_filter(self, tmp_path: Path) -> None:
        """A filter used by an active jail is flagged active and names that jail."""
        from app.services.config_file_service import list_filters

        _write(tmp_path / "filter.d" / "sshd.conf", _FILTER_CONF)
        _write(tmp_path / "jail.conf", JAIL_CONF)

        with self._active_jails({"sshd"}):
            listing = await list_filters(str(tmp_path), "/fake.sock")

        sshd = next(entry for entry in listing.filters if entry.name == "sshd")
        assert sshd.active is True
        assert "sshd" in sshd.used_by_jails

    async def test_inactive_filter_not_marked_active(self, tmp_path: Path) -> None:
        """A filter that no active jail uses is not flagged active."""
        from app.services.config_file_service import list_filters

        _write(tmp_path / "filter.d" / "nginx.conf", _FILTER_CONF)
        _write(tmp_path / "jail.conf", JAIL_CONF)

        with self._active_jails({"sshd"}):
            listing = await list_filters(str(tmp_path), "/fake.sock")

        nginx = next(entry for entry in listing.filters if entry.name == "nginx")
        assert nginx.active is False
        assert nginx.used_by_jails == []

    async def test_has_local_override_detected(self, tmp_path: Path) -> None:
        """A .conf filter with a sibling .local file reports ``has_local_override``."""
        from app.services.config_file_service import list_filters

        filters_dir = tmp_path / "filter.d"
        _write(filters_dir / "sshd.conf", _FILTER_CONF)
        _write(filters_dir / "sshd.local", "[Definition]\n")

        with self._active_jails(set()):
            listing = await list_filters(str(tmp_path), "/fake.sock")

        sshd = next(entry for entry in listing.filters if entry.name == "sshd")
        assert sshd.has_local_override is True

    async def test_empty_filter_d_returns_empty(self, tmp_path: Path) -> None:
        """Without any filter files the listing is empty."""
        from app.services.config_file_service import list_filters

        with self._active_jails(set()):
            listing = await list_filters(str(tmp_path), "/fake.sock")

        assert listing.filters == []
        assert listing.total == 0
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# get_filter
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@pytest.mark.asyncio
class TestGetFilter:
    """Single-filter lookup via ``get_filter``."""

    @staticmethod
    def _active_jails(names: set[str]):
        """Patch ``_get_active_jail_names`` so it resolves to *names*."""
        return patch(
            "app.services.config_file_service._get_active_jail_names",
            new=AsyncMock(return_value=names),
        )

    async def test_returns_filter_config(self, tmp_path: Path) -> None:
        """A filter used by an active jail is returned as active with that jail listed."""
        from app.services.config_file_service import get_filter

        _write(tmp_path / "filter.d" / "sshd.conf", _FILTER_CONF)
        _write(tmp_path / "jail.conf", JAIL_CONF)

        with self._active_jails({"sshd"}):
            found = await get_filter(str(tmp_path), "/fake.sock", "sshd")

        assert found.name == "sshd"
        assert found.active is True
        assert "sshd" in found.used_by_jails

    async def test_accepts_conf_extension(self, tmp_path: Path) -> None:
        """Passing ``sshd.conf`` resolves to the filter named ``sshd``."""
        from app.services.config_file_service import get_filter

        _write(tmp_path / "filter.d" / "sshd.conf", _FILTER_CONF)

        with self._active_jails(set()):
            found = await get_filter(str(tmp_path), "/fake.sock", "sshd.conf")

        assert found.name == "sshd"

    async def test_raises_filter_not_found(self, tmp_path: Path) -> None:
        """An unknown filter name raises ``FilterNotFoundError``."""
        from app.services.config_file_service import FilterNotFoundError, get_filter

        with self._active_jails(set()), pytest.raises(FilterNotFoundError):
            await get_filter(str(tmp_path), "/fake.sock", "nonexistent")

    async def test_has_local_override_detected(self, tmp_path: Path) -> None:
        """A .conf filter with a sibling .local file reports ``has_local_override``."""
        from app.services.config_file_service import get_filter

        filters_dir = tmp_path / "filter.d"
        _write(filters_dir / "sshd.conf", _FILTER_CONF)
        _write(filters_dir / "sshd.local", "[Definition]\n")

        with self._active_jails(set()):
            found = await get_filter(str(tmp_path), "/fake.sock", "sshd")

        assert found.has_local_override is True
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# _parse_filters_sync — .local-only filters (Task 2.2)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestParseFiltersSyncLocalOnly:
    """Verify that .local-only user-created filters appear in results."""

    def test_local_only_included(self, tmp_path: Path) -> None:
        """A lone ``custom.local`` produces exactly one entry sourced from that file."""
        from app.services.config_file_service import _parse_filters_sync

        filters_dir = tmp_path / "filter.d"
        _write(filters_dir / "custom.local", "[Definition]\nfailregex = ^fail\n")

        entries = _parse_filters_sync(filters_dir)

        assert len(entries) == 1
        # Each entry is a 5-tuple; the content field is not inspected here.
        name, filename, _content, has_local, source_path = entries[0]
        assert name == "custom"
        assert filename == "custom.local"
        assert has_local is False  # .local-only: no conf to override
        assert source_path.endswith("custom.local")

    def test_local_only_not_duplicated_when_conf_exists(self, tmp_path: Path) -> None:
        """A conf/local pair is reported once, flagged as having a local override."""
        from app.services.config_file_service import _parse_filters_sync

        filters_dir = tmp_path / "filter.d"
        _write(filters_dir / "sshd.conf", _FILTER_CONF)
        _write(filters_dir / "sshd.local", "[Definition]\n")

        entries = _parse_filters_sync(filters_dir)

        # sshd should appear exactly once (conf + local, not as separate entry)
        sshd_hits = sum(1 for entry in entries if entry[0] == "sshd")
        assert sshd_hits == 1
        _name, _filename, _content, has_local, _source = entries[0]
        assert has_local is True  # conf + local → has_local_override

    def test_local_only_sorted_with_conf_filters(self, tmp_path: Path) -> None:
        """Entries from ``.local`` and ``.conf`` files come back in one name-sorted list."""
        from app.services.config_file_service import _parse_filters_sync

        filters_dir = tmp_path / "filter.d"
        _write(filters_dir / "zzz.conf", _FILTER_CONF)
        _write(filters_dir / "aaa.local", "[Definition]\nfailregex = x\n")

        entries = _parse_filters_sync(filters_dir)

        assert [entry[0] for entry in entries] == ["aaa", "zzz"]
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# get_filter — .local-only filter (Task 2.2)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@pytest.mark.asyncio
class TestGetFilterLocalOnly:
    """Verify that get_filter handles .local-only user-created filters."""

    @staticmethod
    def _no_active_jails():
        """Return a patcher making ``_get_active_jail_names`` resolve to an empty set."""
        return patch(
            "app.services.config_file_service._get_active_jail_names",
            new=AsyncMock(return_value=set()),
        )

    async def test_returns_local_only_filter(self, tmp_path: Path) -> None:
        """A filter that exists only as ``custom.local`` is returned with its regex."""
        from app.services.config_file_service import get_filter

        _write(
            tmp_path / "filter.d" / "custom.local",
            "[Definition]\nfailregex = ^fail from <HOST>\n",
        )

        with self._no_active_jails():
            found = await get_filter(str(tmp_path), "/fake.sock", "custom")

        assert found.name == "custom"
        assert found.has_local_override is False
        assert found.source_file.endswith("custom.local")
        assert len(found.failregex) == 1

    async def test_raises_when_neither_conf_nor_local(self, tmp_path: Path) -> None:
        """With no filter file at all, the lookup raises FilterNotFoundError."""
        from app.services.config_file_service import FilterNotFoundError, get_filter

        with self._no_active_jails():
            with pytest.raises(FilterNotFoundError):
                await get_filter(str(tmp_path), "/fake.sock", "nonexistent")

    async def test_accepts_local_extension(self, tmp_path: Path) -> None:
        """A name passed as ``custom.local`` is returned under the bare name ``custom``."""
        from app.services.config_file_service import get_filter

        _write(tmp_path / "filter.d" / "custom.local", "[Definition]\nfailregex = x\n")

        with self._no_active_jails():
            found = await get_filter(str(tmp_path), "/fake.sock", "custom.local")

        assert found.name == "custom"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# _validate_regex_patterns (Task 2.2)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestValidateRegexPatterns:
    """Exercise ``_validate_regex_patterns`` with valid, empty and invalid input."""

    def test_valid_patterns_pass(self) -> None:
        """Well-formed patterns are accepted without raising."""
        from app.services.config_file_service import _validate_regex_patterns

        well_formed = [r"^fail from \S+", r"\d+\.\d+"]
        _validate_regex_patterns(well_formed)

    def test_empty_list_passes(self) -> None:
        """An empty pattern list is accepted without raising."""
        from app.services.config_file_service import _validate_regex_patterns

        _validate_regex_patterns([])

    def test_invalid_pattern_raises(self) -> None:
        """A malformed pattern raises, and the error carries the offending pattern."""
        from app.services.config_file_service import (
            FilterInvalidRegexError,
            _validate_regex_patterns,
        )

        with pytest.raises(FilterInvalidRegexError) as raised:
            _validate_regex_patterns([r"[unclosed"])

        offending = raised.value.pattern
        assert "[unclosed" in offending

    def test_mixed_valid_invalid_raises_on_first_invalid(self) -> None:
        """In a mixed list, the error reports the malformed entry."""
        from app.services.config_file_service import (
            FilterInvalidRegexError,
            _validate_regex_patterns,
        )

        with pytest.raises(FilterInvalidRegexError) as raised:
            _validate_regex_patterns([r"\d+", r"[bad", r"\w+"])

        offending = raised.value.pattern
        assert "[bad" in offending
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# _write_filter_local_sync (Task 2.2)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestWriteFilterLocalSync:
    """Tests for ``_write_filter_local_sync`` writing ``filter.d/<name>.local``."""

    def test_writes_file(self, tmp_path: Path) -> None:
        """The given content is written to ``<name>.local`` in an existing directory."""
        from app.services.config_file_service import _write_filter_local_sync

        filters_dir = tmp_path / "filter.d"
        filters_dir.mkdir()
        _write_filter_local_sync(filters_dir, "myfilter", "[Definition]\n")

        written = filters_dir / "myfilter.local"
        assert written.is_file()
        assert "[Definition]" in written.read_text()

    def test_creates_filter_d_if_missing(self, tmp_path: Path) -> None:
        """The file is written even when the ``filter.d`` directory does not exist yet."""
        from app.services.config_file_service import _write_filter_local_sync

        filters_dir = tmp_path / "filter.d"
        _write_filter_local_sync(filters_dir, "test", "[Definition]\n")
        assert (filters_dir / "test.local").is_file()

    def test_overwrites_existing_file(self, tmp_path: Path) -> None:
        """Existing file content is replaced by the new content."""
        from app.services.config_file_service import _write_filter_local_sync

        filters_dir = tmp_path / "filter.d"
        filters_dir.mkdir()
        target = filters_dir / "myfilter.local"
        target.write_text("old content")

        _write_filter_local_sync(filters_dir, "myfilter", "[Definition]\nnew=1\n")

        final_text = target.read_text()
        assert "new=1" in final_text
        assert "old content" not in final_text
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# _set_jail_local_key_sync (Task 2.2)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestSetJailLocalKeySync:
    """Tests for ``_set_jail_local_key_sync`` writing keys into ``jail.d/<jail>.local``.

    Assertions inspect the ``key = value`` lines of the written file rather
    than using bare substring checks: ``"filter" in content`` is always true
    once ``"myfilter"`` is present, and a substring check for the new value
    cannot detect a stale assignment left behind by a failed overwrite.
    """

    @staticmethod
    def _values_for(content: str, key: str) -> list[str]:
        """Return the value of every ``key = value`` line in *content*, in file order."""
        values = []
        for line in content.splitlines():
            name, separator, value = line.partition("=")
            # Lines without "=" (section headers, comments, blanks) are ignored.
            if separator and name.strip() == key:
                values.append(value.strip())
        return values

    def test_creates_new_local_file(self, tmp_path: Path) -> None:
        """With no existing file, ``jail.d/sshd.local`` is created holding the key."""
        from app.services.config_file_service import _set_jail_local_key_sync

        _set_jail_local_key_sync(tmp_path, "sshd", "filter", "myfilter")

        local = tmp_path / "jail.d" / "sshd.local"
        assert local.is_file()
        assert self._values_for(local.read_text(), "filter") == ["myfilter"]

    def test_updates_existing_local_file(self, tmp_path: Path) -> None:
        """Adding a key to an existing file keeps the keys that were already there."""
        from app.services.config_file_service import _set_jail_local_key_sync

        jail_d = tmp_path / "jail.d"
        jail_d.mkdir()
        (jail_d / "sshd.local").write_text("[sshd]\nenabled = true\n")

        _set_jail_local_key_sync(tmp_path, "sshd", "filter", "newfilter")

        content = (jail_d / "sshd.local").read_text()
        assert self._values_for(content, "filter") == ["newfilter"]
        # Existing key is preserved
        assert self._values_for(content, "enabled") == ["true"]

    def test_overwrites_existing_key(self, tmp_path: Path) -> None:
        """Setting a key that already exists replaces its value instead of adding one."""
        from app.services.config_file_service import _set_jail_local_key_sync

        jail_d = tmp_path / "jail.d"
        jail_d.mkdir()
        (jail_d / "sshd.local").write_text("[sshd]\nfilter = old\n")

        _set_jail_local_key_sync(tmp_path, "sshd", "filter", "newfilter")

        content = (jail_d / "sshd.local").read_text()
        # Exactly one assignment must remain, and it must be the new value.
        assert self._values_for(content, "filter") == ["newfilter"]
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# update_filter (Task 2.2)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
# Filter definition whose ``failregex`` holds two patterns.  The second pattern
# is a continuation line and must be indented: an INI parser reads an
# unindented line as a new option, and one without a delimiter is a parse error.
_FILTER_CONF_WITH_REGEX = """\
[Definition]

failregex = ^fail from <HOST>
            ^error from <HOST>

ignoreregex =
"""
|
|
|
|
|
|
@pytest.mark.asyncio
class TestUpdateFilter:
    """Tests for ``update_filter``: override writing, error cases and reload."""

    @staticmethod
    def _no_active_jails():
        """Return a patcher making ``_get_active_jail_names`` resolve to an empty set."""
        return patch(
            "app.services.config_file_service._get_active_jail_names",
            new=AsyncMock(return_value=set()),
        )

    async def test_writes_local_override(self, tmp_path: Path) -> None:
        """Updating ``failregex`` creates ``sshd.local`` and returns the new pattern."""
        from app.models.config import FilterUpdateRequest
        from app.services.config_file_service import update_filter

        filters_dir = tmp_path / "filter.d"
        _write(filters_dir / "sshd.conf", _FILTER_CONF_WITH_REGEX)

        with self._no_active_jails():
            request = FilterUpdateRequest(failregex=[r"^new pattern <HOST>"])
            updated = await update_filter(str(tmp_path), "/fake.sock", "sshd", request)

        assert (filters_dir / "sshd.local").is_file()
        assert updated.name == "sshd"
        assert any("new pattern" in pattern for pattern in updated.failregex)

    async def test_accepts_conf_extension(self, tmp_path: Path) -> None:
        """A name passed as ``sshd.conf`` is updated and returned as ``sshd``."""
        from app.models.config import FilterUpdateRequest
        from app.services.config_file_service import update_filter

        _write(tmp_path / "filter.d" / "sshd.conf", _FILTER_CONF_WITH_REGEX)

        with self._no_active_jails():
            request = FilterUpdateRequest(datepattern="%Y-%m-%d")
            updated = await update_filter(
                str(tmp_path), "/fake.sock", "sshd.conf", request
            )

        assert updated.name == "sshd"

    async def test_raises_filter_not_found(self, tmp_path: Path) -> None:
        """Updating a filter that has no file raises FilterNotFoundError."""
        from app.models.config import FilterUpdateRequest
        from app.services.config_file_service import FilterNotFoundError, update_filter

        with self._no_active_jails():
            with pytest.raises(FilterNotFoundError):
                await update_filter(
                    str(tmp_path),
                    "/fake.sock",
                    "missing",
                    FilterUpdateRequest(),
                )

    async def test_raises_on_invalid_regex(self, tmp_path: Path) -> None:
        """A malformed ``failregex`` entry raises FilterInvalidRegexError."""
        from app.models.config import FilterUpdateRequest
        from app.services.config_file_service import (
            FilterInvalidRegexError,
            update_filter,
        )

        _write(tmp_path / "filter.d" / "sshd.conf", _FILTER_CONF_WITH_REGEX)

        with self._no_active_jails():
            with pytest.raises(FilterInvalidRegexError):
                await update_filter(
                    str(tmp_path),
                    "/fake.sock",
                    "sshd",
                    FilterUpdateRequest(failregex=[r"[unclosed"]),
                )

    async def test_raises_filter_name_error_for_invalid_name(self, tmp_path: Path) -> None:
        """A path-like filter name raises FilterNameError."""
        from app.models.config import FilterUpdateRequest
        from app.services.config_file_service import FilterNameError, update_filter

        with pytest.raises(FilterNameError):
            await update_filter(
                str(tmp_path),
                "/fake.sock",
                "../etc/passwd",
                FilterUpdateRequest(),
            )

    async def test_triggers_reload_when_requested(self, tmp_path: Path) -> None:
        """Passing ``do_reload=True`` awaits ``jail_service.reload_all`` exactly once."""
        from app.models.config import FilterUpdateRequest
        from app.services.config_file_service import update_filter

        _write(tmp_path / "filter.d" / "sshd.conf", _FILTER_CONF)

        reload_mock = AsyncMock()
        reload_patch = patch(
            "app.services.config_file_service.jail_service.reload_all",
            new=reload_mock,
        )
        with self._no_active_jails(), reload_patch:
            await update_filter(
                str(tmp_path),
                "/fake.sock",
                "sshd",
                FilterUpdateRequest(journalmatch="_SYSTEMD_UNIT=sshd.service"),
                do_reload=True,
            )

        reload_mock.assert_awaited_once()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# create_filter (Task 2.2)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@pytest.mark.asyncio
class TestCreateFilter:
    """Tests for ``create_filter``: file creation, conflicts, validation and reload."""

    @staticmethod
    def _no_active_jails():
        """Return a patcher making ``_get_active_jail_names`` resolve to an empty set."""
        return patch(
            "app.services.config_file_service._get_active_jail_names",
            new=AsyncMock(return_value=set()),
        )

    async def test_creates_local_file(self, tmp_path: Path) -> None:
        """Creating a filter writes ``filter.d/<name>.local`` and returns it."""
        from app.models.config import FilterCreateRequest
        from app.services.config_file_service import create_filter

        with self._no_active_jails():
            request = FilterCreateRequest(
                name="my-custom",
                failregex=[r"^fail from <HOST>"],
            )
            created = await create_filter(str(tmp_path), "/fake.sock", request)

        assert (tmp_path / "filter.d" / "my-custom.local").is_file()
        assert created.name == "my-custom"
        assert created.source_file.endswith("my-custom.local")

    async def test_raises_already_exists_when_conf_exists(self, tmp_path: Path) -> None:
        """A name that already has a ``.conf`` file raises FilterAlreadyExistsError."""
        from app.models.config import FilterCreateRequest
        from app.services.config_file_service import FilterAlreadyExistsError, create_filter

        _write(tmp_path / "filter.d" / "sshd.conf", _FILTER_CONF)

        with self._no_active_jails():
            with pytest.raises(FilterAlreadyExistsError):
                await create_filter(
                    str(tmp_path),
                    "/fake.sock",
                    FilterCreateRequest(name="sshd"),
                )

    async def test_raises_already_exists_when_local_exists(self, tmp_path: Path) -> None:
        """A name that already has a ``.local`` file raises FilterAlreadyExistsError."""
        from app.models.config import FilterCreateRequest
        from app.services.config_file_service import FilterAlreadyExistsError, create_filter

        _write(tmp_path / "filter.d" / "custom.local", "[Definition]\n")

        with self._no_active_jails():
            with pytest.raises(FilterAlreadyExistsError):
                await create_filter(
                    str(tmp_path),
                    "/fake.sock",
                    FilterCreateRequest(name="custom"),
                )

    async def test_raises_invalid_regex(self, tmp_path: Path) -> None:
        """A malformed ``failregex`` entry raises FilterInvalidRegexError."""
        from app.models.config import FilterCreateRequest
        from app.services.config_file_service import FilterInvalidRegexError, create_filter

        with self._no_active_jails():
            with pytest.raises(FilterInvalidRegexError):
                await create_filter(
                    str(tmp_path),
                    "/fake.sock",
                    FilterCreateRequest(name="bad", failregex=[r"[unclosed"]),
                )

    async def test_raises_filter_name_error_for_invalid_name(self, tmp_path: Path) -> None:
        """A path-like filter name raises FilterNameError."""
        from app.models.config import FilterCreateRequest
        from app.services.config_file_service import FilterNameError, create_filter

        with pytest.raises(FilterNameError):
            await create_filter(
                str(tmp_path),
                "/fake.sock",
                FilterCreateRequest(name="../etc/evil"),
            )

    async def test_triggers_reload_when_requested(self, tmp_path: Path) -> None:
        """Passing ``do_reload=True`` awaits ``jail_service.reload_all`` exactly once."""
        from app.models.config import FilterCreateRequest
        from app.services.config_file_service import create_filter

        reload_mock = AsyncMock()
        reload_patch = patch(
            "app.services.config_file_service.jail_service.reload_all",
            new=reload_mock,
        )
        with self._no_active_jails(), reload_patch:
            await create_filter(
                str(tmp_path),
                "/fake.sock",
                FilterCreateRequest(name="newfilter"),
                do_reload=True,
            )

        reload_mock.assert_awaited_once()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# delete_filter (Task 2.2)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@pytest.mark.asyncio
class TestDeleteFilter:
    """Tests for ``delete_filter``: only ``.local`` files are ever removed."""

    async def test_deletes_local_file_when_conf_and_local_exist(
        self, tmp_path: Path
    ) -> None:
        """With a conf/local pair, only the ``.local`` file is removed."""
        from app.services.config_file_service import delete_filter

        filter_d = tmp_path / "filter.d"
        _write(filter_d / "sshd.conf", _FILTER_CONF)
        _write(filter_d / "sshd.local", "[Definition]\n")

        await delete_filter(str(tmp_path), "sshd")

        assert not (filter_d / "sshd.local").exists()
        assert (filter_d / "sshd.conf").exists()

    async def test_deletes_local_only_filter(self, tmp_path: Path) -> None:
        """A filter that exists only as ``.local`` is removed."""
        from app.services.config_file_service import delete_filter

        filter_d = tmp_path / "filter.d"
        _write(filter_d / "custom.local", "[Definition]\n")

        await delete_filter(str(tmp_path), "custom")

        assert not (filter_d / "custom.local").exists()

    async def test_raises_readonly_for_conf_only(self, tmp_path: Path) -> None:
        """A filter that exists only as ``.conf`` raises FilterReadonlyError."""
        from app.services.config_file_service import FilterReadonlyError, delete_filter

        filter_d = tmp_path / "filter.d"
        _write(filter_d / "sshd.conf", _FILTER_CONF)

        with pytest.raises(FilterReadonlyError):
            await delete_filter(str(tmp_path), "sshd")

    async def test_raises_not_found_for_missing_filter(self, tmp_path: Path) -> None:
        """Deleting a filter that has no file raises FilterNotFoundError."""
        from app.services.config_file_service import FilterNotFoundError, delete_filter

        with pytest.raises(FilterNotFoundError):
            await delete_filter(str(tmp_path), "nonexistent")

    # Named "raises" (not "accepts"): the test asserts that the call fails.
    async def test_raises_filter_name_error_for_invalid_name(
        self, tmp_path: Path
    ) -> None:
        """A path-like filter name raises FilterNameError."""
        from app.services.config_file_service import FilterNameError, delete_filter

        with pytest.raises(FilterNameError):
            await delete_filter(str(tmp_path), "../evil")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# assign_filter_to_jail (Task 2.2)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@pytest.mark.asyncio
class TestAssignFilterToJail:
    """Tests for ``assign_filter_to_jail``: writing the key, error cases and reload."""

    async def test_writes_filter_key_to_jail_local(self, tmp_path: Path) -> None:
        """Assigning a filter creates ``jail.d/sshd.local`` mentioning the filter name."""
        from app.models.config import AssignFilterRequest
        from app.services.config_file_service import assign_filter_to_jail

        # Setup: jail.conf with sshd jail, filter.conf for "myfilter"
        _write(tmp_path / "jail.conf", JAIL_CONF)
        _write(tmp_path / "filter.d" / "myfilter.conf", _FILTER_CONF)

        no_active_jails = patch(
            "app.services.config_file_service._get_active_jail_names",
            new=AsyncMock(return_value=set()),
        )
        with no_active_jails:
            request = AssignFilterRequest(filter_name="myfilter")
            await assign_filter_to_jail(str(tmp_path), "/fake.sock", "sshd", request)

        jail_local = tmp_path / "jail.d" / "sshd.local"
        assert jail_local.is_file()
        assert "myfilter" in jail_local.read_text()

    async def test_raises_jail_not_found(self, tmp_path: Path) -> None:
        """A jail name absent from the config raises JailNotFoundInConfigError."""
        from app.models.config import AssignFilterRequest
        from app.services.config_file_service import (
            JailNotFoundInConfigError,
            assign_filter_to_jail,
        )

        _write(tmp_path / "filter.d" / "sshd.conf", _FILTER_CONF)

        with pytest.raises(JailNotFoundInConfigError):
            await assign_filter_to_jail(
                str(tmp_path),
                "/fake.sock",
                "nonexistent-jail",
                AssignFilterRequest(filter_name="sshd"),
            )

    async def test_raises_filter_not_found(self, tmp_path: Path) -> None:
        """A filter name with no file raises FilterNotFoundError."""
        from app.models.config import AssignFilterRequest
        from app.services.config_file_service import FilterNotFoundError, assign_filter_to_jail

        _write(tmp_path / "jail.conf", JAIL_CONF)

        with pytest.raises(FilterNotFoundError):
            await assign_filter_to_jail(
                str(tmp_path),
                "/fake.sock",
                "sshd",
                AssignFilterRequest(filter_name="nonexistent-filter"),
            )

    async def test_raises_jail_name_error_for_invalid_name(self, tmp_path: Path) -> None:
        """A path-like jail name raises JailNameError."""
        from app.models.config import AssignFilterRequest
        from app.services.config_file_service import JailNameError, assign_filter_to_jail

        with pytest.raises(JailNameError):
            await assign_filter_to_jail(
                str(tmp_path),
                "/fake.sock",
                "../etc/evil",
                AssignFilterRequest(filter_name="sshd"),
            )

    async def test_raises_filter_name_error_for_invalid_filter(
        self, tmp_path: Path
    ) -> None:
        """A path-like filter name raises FilterNameError."""
        from app.models.config import AssignFilterRequest
        from app.services.config_file_service import FilterNameError, assign_filter_to_jail

        with pytest.raises(FilterNameError):
            await assign_filter_to_jail(
                str(tmp_path),
                "/fake.sock",
                "sshd",
                AssignFilterRequest(filter_name="../etc/evil"),
            )

    async def test_triggers_reload_when_requested(self, tmp_path: Path) -> None:
        """Passing ``do_reload=True`` awaits ``jail_service.reload_all`` exactly once."""
        from app.models.config import AssignFilterRequest
        from app.services.config_file_service import assign_filter_to_jail

        _write(tmp_path / "jail.conf", JAIL_CONF)
        _write(tmp_path / "filter.d" / "myfilter.conf", _FILTER_CONF)

        reload_mock = AsyncMock()
        reload_patch = patch(
            "app.services.config_file_service.jail_service.reload_all",
            new=reload_mock,
        )
        with reload_patch:
            await assign_filter_to_jail(
                str(tmp_path),
                "/fake.sock",
                "sshd",
                AssignFilterRequest(filter_name="myfilter"),
                do_reload=True,
            )

        reload_mock.assert_awaited_once()
|
|
|