test(backend): add tests for conf-file parser, file-config service and router

- test_conffile_parser.py: unit tests for section/key parsing, comment
  preservation, and round-trip write correctness
- test_file_config_service.py: service-level tests with mock filesystem
- test_file_config.py: router integration tests covering GET / PUT
  endpoints for jails, actions, and filters
This commit is contained in:
2026-03-13 13:47:35 +01:00
parent 673eb4c7c2
commit f5c3635258
3 changed files with 1146 additions and 0 deletions

View File

@@ -12,6 +12,12 @@ from httpx import ASGITransport, AsyncClient
from app.config import Settings
from app.db import init_db
from app.main import create_app
from app.models.config import (
ActionConfig,
FilterConfig,
JailFileConfig,
JailSectionConfig,
)
from app.models.file_config import (
ConfFileContent,
ConfFileEntry,
@@ -377,3 +383,331 @@ class TestCreateActionFile:
assert resp.status_code == 201
assert resp.json()["filename"] == "myaction.conf"
# ---------------------------------------------------------------------------
# POST /api/config/jail-files
# ---------------------------------------------------------------------------
class TestCreateJailConfigFile:
async def test_201_creates_file(self, file_config_client: AsyncClient) -> None:
with patch(
"app.routers.file_config.file_config_service.create_jail_config_file",
AsyncMock(return_value="myjail.conf"),
):
resp = await file_config_client.post(
"/api/config/jail-files",
json={"name": "myjail", "content": "[myjail]\nenabled = true\n"},
)
assert resp.status_code == 201
assert resp.json()["filename"] == "myjail.conf"
async def test_409_conflict(self, file_config_client: AsyncClient) -> None:
with patch(
"app.routers.file_config.file_config_service.create_jail_config_file",
AsyncMock(side_effect=ConfigFileExistsError("myjail.conf")),
):
resp = await file_config_client.post(
"/api/config/jail-files",
json={"name": "myjail", "content": "[myjail]\nenabled = true\n"},
)
assert resp.status_code == 409
async def test_400_invalid_name(self, file_config_client: AsyncClient) -> None:
with patch(
"app.routers.file_config.file_config_service.create_jail_config_file",
AsyncMock(side_effect=ConfigFileNameError("bad/../name")),
):
resp = await file_config_client.post(
"/api/config/jail-files",
json={"name": "../escape", "content": "[Definition]\n"},
)
assert resp.status_code == 400
async def test_503_on_config_dir_error(
self, file_config_client: AsyncClient
) -> None:
with patch(
"app.routers.file_config.file_config_service.create_jail_config_file",
AsyncMock(side_effect=ConfigDirError("no dir")),
):
resp = await file_config_client.post(
"/api/config/jail-files",
json={"name": "anyjail", "content": "[anyjail]\nenabled = false\n"},
)
assert resp.status_code == 503
# ---------------------------------------------------------------------------
# GET /api/config/filters/{name}/parsed
# ---------------------------------------------------------------------------
class TestGetParsedFilter:
async def test_200_returns_parsed_config(
self, file_config_client: AsyncClient
) -> None:
cfg = FilterConfig(name="nginx", filename="nginx.conf")
with patch(
"app.routers.file_config.file_config_service.get_parsed_filter_file",
AsyncMock(return_value=cfg),
):
resp = await file_config_client.get("/api/config/filters/nginx/parsed")
assert resp.status_code == 200
data = resp.json()
assert data["name"] == "nginx"
assert data["filename"] == "nginx.conf"
async def test_404_not_found(self, file_config_client: AsyncClient) -> None:
with patch(
"app.routers.file_config.file_config_service.get_parsed_filter_file",
AsyncMock(side_effect=ConfigFileNotFoundError("missing")),
):
resp = await file_config_client.get(
"/api/config/filters/missing/parsed"
)
assert resp.status_code == 404
async def test_503_on_config_dir_error(
self, file_config_client: AsyncClient
) -> None:
with patch(
"app.routers.file_config.file_config_service.get_parsed_filter_file",
AsyncMock(side_effect=ConfigDirError("no dir")),
):
resp = await file_config_client.get("/api/config/filters/nginx/parsed")
assert resp.status_code == 503
# ---------------------------------------------------------------------------
# PUT /api/config/filters/{name}/parsed
# ---------------------------------------------------------------------------
class TestUpdateParsedFilter:
async def test_204_on_success(self, file_config_client: AsyncClient) -> None:
with patch(
"app.routers.file_config.file_config_service.update_parsed_filter_file",
AsyncMock(return_value=None),
):
resp = await file_config_client.put(
"/api/config/filters/nginx/parsed",
json={"failregex": ["^<HOST> "]},
)
assert resp.status_code == 204
async def test_404_not_found(self, file_config_client: AsyncClient) -> None:
with patch(
"app.routers.file_config.file_config_service.update_parsed_filter_file",
AsyncMock(side_effect=ConfigFileNotFoundError("missing")),
):
resp = await file_config_client.put(
"/api/config/filters/missing/parsed",
json={"failregex": []},
)
assert resp.status_code == 404
async def test_400_write_error(self, file_config_client: AsyncClient) -> None:
with patch(
"app.routers.file_config.file_config_service.update_parsed_filter_file",
AsyncMock(side_effect=ConfigFileWriteError("disk full")),
):
resp = await file_config_client.put(
"/api/config/filters/nginx/parsed",
json={"failregex": ["^<HOST> "]},
)
assert resp.status_code == 400
# ---------------------------------------------------------------------------
# GET /api/config/actions/{name}/parsed
# ---------------------------------------------------------------------------
class TestGetParsedAction:
async def test_200_returns_parsed_config(
self, file_config_client: AsyncClient
) -> None:
cfg = ActionConfig(name="iptables", filename="iptables.conf")
with patch(
"app.routers.file_config.file_config_service.get_parsed_action_file",
AsyncMock(return_value=cfg),
):
resp = await file_config_client.get(
"/api/config/actions/iptables/parsed"
)
assert resp.status_code == 200
data = resp.json()
assert data["name"] == "iptables"
assert data["filename"] == "iptables.conf"
async def test_404_not_found(self, file_config_client: AsyncClient) -> None:
with patch(
"app.routers.file_config.file_config_service.get_parsed_action_file",
AsyncMock(side_effect=ConfigFileNotFoundError("missing")),
):
resp = await file_config_client.get(
"/api/config/actions/missing/parsed"
)
assert resp.status_code == 404
async def test_503_on_config_dir_error(
self, file_config_client: AsyncClient
) -> None:
with patch(
"app.routers.file_config.file_config_service.get_parsed_action_file",
AsyncMock(side_effect=ConfigDirError("no dir")),
):
resp = await file_config_client.get(
"/api/config/actions/iptables/parsed"
)
assert resp.status_code == 503
# ---------------------------------------------------------------------------
# PUT /api/config/actions/{name}/parsed
# ---------------------------------------------------------------------------
class TestUpdateParsedAction:
async def test_204_on_success(self, file_config_client: AsyncClient) -> None:
with patch(
"app.routers.file_config.file_config_service.update_parsed_action_file",
AsyncMock(return_value=None),
):
resp = await file_config_client.put(
"/api/config/actions/iptables/parsed",
json={"actionban": "iptables -I INPUT -s <ip> -j DROP"},
)
assert resp.status_code == 204
async def test_404_not_found(self, file_config_client: AsyncClient) -> None:
with patch(
"app.routers.file_config.file_config_service.update_parsed_action_file",
AsyncMock(side_effect=ConfigFileNotFoundError("missing")),
):
resp = await file_config_client.put(
"/api/config/actions/missing/parsed",
json={"actionban": ""},
)
assert resp.status_code == 404
async def test_400_write_error(self, file_config_client: AsyncClient) -> None:
with patch(
"app.routers.file_config.file_config_service.update_parsed_action_file",
AsyncMock(side_effect=ConfigFileWriteError("disk full")),
):
resp = await file_config_client.put(
"/api/config/actions/iptables/parsed",
json={"actionban": "iptables -I INPUT -s <ip> -j DROP"},
)
assert resp.status_code == 400
# ---------------------------------------------------------------------------
# GET /api/config/jail-files/{filename}/parsed
# ---------------------------------------------------------------------------
class TestGetParsedJailFile:
async def test_200_returns_parsed_config(
self, file_config_client: AsyncClient
) -> None:
section = JailSectionConfig(enabled=True, port="ssh")
cfg = JailFileConfig(filename="sshd.conf", jails={"sshd": section})
with patch(
"app.routers.file_config.file_config_service.get_parsed_jail_file",
AsyncMock(return_value=cfg),
):
resp = await file_config_client.get(
"/api/config/jail-files/sshd.conf/parsed"
)
assert resp.status_code == 200
data = resp.json()
assert data["filename"] == "sshd.conf"
assert "sshd" in data["jails"]
async def test_404_not_found(self, file_config_client: AsyncClient) -> None:
with patch(
"app.routers.file_config.file_config_service.get_parsed_jail_file",
AsyncMock(side_effect=ConfigFileNotFoundError("missing.conf")),
):
resp = await file_config_client.get(
"/api/config/jail-files/missing.conf/parsed"
)
assert resp.status_code == 404
async def test_503_on_config_dir_error(
self, file_config_client: AsyncClient
) -> None:
with patch(
"app.routers.file_config.file_config_service.get_parsed_jail_file",
AsyncMock(side_effect=ConfigDirError("no dir")),
):
resp = await file_config_client.get(
"/api/config/jail-files/sshd.conf/parsed"
)
assert resp.status_code == 503
# ---------------------------------------------------------------------------
# PUT /api/config/jail-files/{filename}/parsed
# ---------------------------------------------------------------------------
class TestUpdateParsedJailFile:
async def test_204_on_success(self, file_config_client: AsyncClient) -> None:
with patch(
"app.routers.file_config.file_config_service.update_parsed_jail_file",
AsyncMock(return_value=None),
):
resp = await file_config_client.put(
"/api/config/jail-files/sshd.conf/parsed",
json={"jails": {"sshd": {"enabled": False}}},
)
assert resp.status_code == 204
async def test_404_not_found(self, file_config_client: AsyncClient) -> None:
with patch(
"app.routers.file_config.file_config_service.update_parsed_jail_file",
AsyncMock(side_effect=ConfigFileNotFoundError("missing.conf")),
):
resp = await file_config_client.put(
"/api/config/jail-files/missing.conf/parsed",
json={"jails": {}},
)
assert resp.status_code == 404
async def test_400_write_error(self, file_config_client: AsyncClient) -> None:
with patch(
"app.routers.file_config.file_config_service.update_parsed_jail_file",
AsyncMock(side_effect=ConfigFileWriteError("disk full")),
):
resp = await file_config_client.put(
"/api/config/jail-files/sshd.conf/parsed",
json={"jails": {"sshd": {"enabled": True}}},
)
assert resp.status_code == 400

View File

@@ -0,0 +1,624 @@
"""Tests for conffile_parser — fail2ban INI file parser and serializer."""
from __future__ import annotations
from pathlib import Path
import pytest
from app.services.conffile_parser import (
merge_action_update,
merge_filter_update,
parse_action_file,
parse_filter_file,
serialize_action_config,
serialize_filter_config,
)
# Path to the bundled fail2ban reference configs shipped with this project.
_REF_DIR = Path(__file__).parent.parent.parent.parent / "fail2ban-master" / "config"
_FILTER_DIR = _REF_DIR / "filter.d"
_ACTION_DIR = _REF_DIR / "action.d"
# ---------------------------------------------------------------------------
# Filter — parse helpers
# ---------------------------------------------------------------------------
MINIMAL_FILTER = """\
[Definition]
failregex = ^<HOST> bad request$
ignoreregex =
"""
FULL_FILTER = """\
[INCLUDES]
before = common.conf
[DEFAULT]
_daemon = sshd
__prefix = (?:sshd )?
[Definition]
prefregex = ^<F-MLFID>%(__prefix)s</F-MLFID>%(__pref)s
failregex = ^Authentication failure for .* from <HOST> via
^User not known .* from <HOST>
ignoreregex = ^Authorised key from <HOST>
maxlines = 10
datepattern = %%Y-%%m-%%d %%H:%%M:%%S
journalmatch = _SYSTEMD_UNIT=sshd.service
"""
class TestParseFilterFile:
"""Unit tests for parse_filter_file."""
def test_minimal_filter(self) -> None:
cfg = parse_filter_file(MINIMAL_FILTER, name="test", filename="test.conf")
assert cfg.name == "test"
assert cfg.filename == "test.conf"
assert cfg.failregex == ["^<HOST> bad request$"]
assert cfg.ignoreregex == []
assert cfg.before is None
assert cfg.after is None
def test_full_filter_includes(self) -> None:
cfg = parse_filter_file(FULL_FILTER, name="sshd")
assert cfg.before == "common.conf"
assert cfg.after is None
def test_full_filter_defaults(self) -> None:
cfg = parse_filter_file(FULL_FILTER, name="sshd")
assert "_daemon" in cfg.variables
assert cfg.variables["_daemon"] == "sshd"
def test_full_filter_failregex_multiline(self) -> None:
cfg = parse_filter_file(FULL_FILTER, name="sshd")
assert len(cfg.failregex) == 2
assert "Authentication failure" in cfg.failregex[0]
assert "User not known" in cfg.failregex[1]
def test_full_filter_ignoreregex(self) -> None:
cfg = parse_filter_file(FULL_FILTER, name="sshd")
assert len(cfg.ignoreregex) == 1
assert "Authorised key" in cfg.ignoreregex[0]
def test_full_filter_optional_fields(self) -> None:
cfg = parse_filter_file(FULL_FILTER, name="sshd")
assert cfg.maxlines == 10
assert cfg.datepattern is not None
assert cfg.journalmatch == "_SYSTEMD_UNIT=sshd.service"
def test_empty_failregex(self) -> None:
content = "[Definition]\nfailregex =\nignoreregex =\n"
cfg = parse_filter_file(content, name="empty")
assert cfg.failregex == []
assert cfg.ignoreregex == []
def test_comment_lines_stripped_from_failregex(self) -> None:
content = (
"[Definition]\n"
"failregex = ^<HOST> good$\n"
" # this is a comment\n"
" ^<HOST> also good$\n"
)
cfg = parse_filter_file(content, name="t")
assert len(cfg.failregex) == 2
assert cfg.failregex[1] == "^<HOST> also good$"
def test_malformed_content_does_not_raise(self) -> None:
"""Parser should not crash on malformed content; return partial result."""
cfg = parse_filter_file("not an ini file at all!!!", name="bad")
assert cfg.failregex == []
def test_no_definition_section(self) -> None:
cfg = parse_filter_file("[INCLUDES]\nbefore = common.conf\n", name="x")
assert cfg.before == "common.conf"
assert cfg.failregex == []
# ---------------------------------------------------------------------------
# Filter — round-trip (parse → serialize → parse)
# ---------------------------------------------------------------------------
class TestFilterRoundTrip:
"""Serialize then re-parse and verify the result matches."""
def test_round_trip_minimal(self) -> None:
original = parse_filter_file(MINIMAL_FILTER, name="t")
serialized = serialize_filter_config(original)
restored = parse_filter_file(serialized, name="t")
assert restored.failregex == original.failregex
assert restored.ignoreregex == original.ignoreregex
def test_round_trip_full(self) -> None:
original = parse_filter_file(FULL_FILTER, name="sshd")
serialized = serialize_filter_config(original)
restored = parse_filter_file(serialized, name="sshd")
assert restored.before == original.before
assert restored.failregex == original.failregex
assert restored.ignoreregex == original.ignoreregex
assert restored.maxlines == original.maxlines
assert restored.datepattern == original.datepattern
assert restored.journalmatch == original.journalmatch
# ---------------------------------------------------------------------------
# Filter — reference file (sshd.conf shipped with fail2ban-master)
# ---------------------------------------------------------------------------
@pytest.mark.skipif(
not (_FILTER_DIR / "sshd.conf").exists(),
reason="fail2ban-master reference configs not present",
)
class TestParseRealSshdFilter:
"""Parse the real sshd.conf that ships with fail2ban-master."""
def test_parses_without_exception(self) -> None:
content = (_FILTER_DIR / "sshd.conf").read_text()
cfg = parse_filter_file(content, name="sshd", filename="sshd.conf")
assert cfg.name == "sshd"
def test_has_failregex_patterns(self) -> None:
content = (_FILTER_DIR / "sshd.conf").read_text()
cfg = parse_filter_file(content, name="sshd")
assert len(cfg.failregex) > 0
def test_has_includes(self) -> None:
content = (_FILTER_DIR / "sshd.conf").read_text()
cfg = parse_filter_file(content, name="sshd")
assert cfg.before is not None
def test_round_trip_preserves_failregex_count(self) -> None:
content = (_FILTER_DIR / "sshd.conf").read_text()
cfg = parse_filter_file(content, name="sshd")
serialized = serialize_filter_config(cfg)
restored = parse_filter_file(serialized, name="sshd")
assert len(restored.failregex) == len(cfg.failregex)
# ---------------------------------------------------------------------------
# FilterConfigUpdate — merge
# ---------------------------------------------------------------------------
class TestMergeFilterUpdate:
"""Tests for merge_filter_update."""
def test_merge_only_failregex(self) -> None:
from app.models.config import FilterConfigUpdate
base = parse_filter_file(FULL_FILTER, name="sshd")
update = FilterConfigUpdate(failregex=["^new pattern <HOST>$"])
merged = merge_filter_update(base, update)
assert merged.failregex == ["^new pattern <HOST>$"]
# Everything else unchanged:
assert merged.before == base.before
assert merged.maxlines == base.maxlines
def test_merge_none_fields_unchanged(self) -> None:
from app.models.config import FilterConfigUpdate
base = parse_filter_file(FULL_FILTER, name="sshd")
update = FilterConfigUpdate() # All None
merged = merge_filter_update(base, update)
assert merged.failregex == base.failregex
assert merged.ignoreregex == base.ignoreregex
def test_merge_clear_ignoreregex(self) -> None:
from app.models.config import FilterConfigUpdate
base = parse_filter_file(FULL_FILTER, name="sshd")
update = FilterConfigUpdate(ignoreregex=[])
merged = merge_filter_update(base, update)
assert merged.ignoreregex == []
def test_merge_update_variables(self) -> None:
from app.models.config import FilterConfigUpdate
base = parse_filter_file(FULL_FILTER, name="sshd")
update = FilterConfigUpdate(variables={"_daemon": "mynewdaemon"})
merged = merge_filter_update(base, update)
assert merged.variables["_daemon"] == "mynewdaemon"
# ---------------------------------------------------------------------------
# Action — parse helpers
# ---------------------------------------------------------------------------
MINIMAL_ACTION = """\
[Definition]
actionban = iptables -I INPUT -s <ip> -j DROP
actionunban = iptables -D INPUT -s <ip> -j DROP
"""
FULL_ACTION = """\
[INCLUDES]
before = iptables-common.conf
[Definition]
actionstart = { iptables -N f2b-<name>
iptables -A INPUT -p <protocol> -j f2b-<name> }
actionstop = iptables -D INPUT -p <protocol> -j f2b-<name>
iptables -F f2b-<name>
iptables -X f2b-<name>
actioncheck = iptables -n -L INPUT | grep -q f2b-<name>[ \\t]
actionban = iptables -I f2b-<name> 1 -s <ip> -j <blocktype>
actionunban = iptables -D f2b-<name> -s <ip> -j <blocktype>
actionflush = iptables -F f2b-<name>
name = default
protocol = tcp
chain = INPUT
blocktype = REJECT
[Init]
blocktype = REJECT --reject-with icmp-port-unreachable
name = default
"""
class TestParseActionFile:
"""Unit tests for parse_action_file."""
def test_minimal_action(self) -> None:
cfg = parse_action_file(MINIMAL_ACTION, name="test", filename="test.conf")
assert cfg.name == "test"
assert cfg.actionban is not None
assert "<ip>" in cfg.actionban
assert cfg.actionunban is not None
def test_full_action_includes(self) -> None:
cfg = parse_action_file(FULL_ACTION, name="iptables")
assert cfg.before == "iptables-common.conf"
def test_full_action_lifecycle_keys(self) -> None:
cfg = parse_action_file(FULL_ACTION, name="iptables")
assert cfg.actionstart is not None
assert cfg.actionstop is not None
assert cfg.actioncheck is not None
assert cfg.actionban is not None
assert cfg.actionunban is not None
assert cfg.actionflush is not None
def test_full_action_definition_vars(self) -> None:
cfg = parse_action_file(FULL_ACTION, name="iptables")
# 'name', 'protocol', 'chain', 'blocktype' are definition vars
assert "protocol" in cfg.definition_vars
assert "chain" in cfg.definition_vars
def test_full_action_init_vars(self) -> None:
cfg = parse_action_file(FULL_ACTION, name="iptables")
assert "blocktype" in cfg.init_vars
assert "REJECT" in cfg.init_vars["blocktype"]
def test_empty_action(self) -> None:
cfg = parse_action_file("[Definition]\n", name="empty")
assert cfg.actionban is None
assert cfg.definition_vars == {}
assert cfg.init_vars == {}
def test_malformed_does_not_raise(self) -> None:
cfg = parse_action_file("@@@@not valid ini@@@@", name="bad")
assert cfg.actionban is None
# ---------------------------------------------------------------------------
# Action — round-trip
# ---------------------------------------------------------------------------
class TestActionRoundTrip:
"""Serialize then re-parse and verify key fields are preserved."""
def test_round_trip_minimal(self) -> None:
original = parse_action_file(MINIMAL_ACTION, name="t")
serialized = serialize_action_config(original)
restored = parse_action_file(serialized, name="t")
assert restored.actionban == original.actionban
assert restored.actionunban == original.actionunban
def test_round_trip_full(self) -> None:
original = parse_action_file(FULL_ACTION, name="iptables")
serialized = serialize_action_config(original)
restored = parse_action_file(serialized, name="iptables")
assert restored.actionban == original.actionban
assert restored.actionflush == original.actionflush
assert restored.init_vars.get("blocktype") == original.init_vars.get("blocktype")
# ---------------------------------------------------------------------------
# Action — reference file
# ---------------------------------------------------------------------------
@pytest.mark.skipif(
not (_ACTION_DIR / "iptables.conf").exists(),
reason="fail2ban-master reference configs not present",
)
class TestParseRealIptablesAction:
"""Parse the real iptables.conf that ships with fail2ban-master."""
def test_parses_without_exception(self) -> None:
content = (_ACTION_DIR / "iptables.conf").read_text()
cfg = parse_action_file(content, name="iptables", filename="iptables.conf")
assert cfg.name == "iptables"
def test_has_actionban(self) -> None:
content = (_ACTION_DIR / "iptables.conf").read_text()
cfg = parse_action_file(content, name="iptables")
assert cfg.actionban is not None
def test_round_trip_preserves_actionban(self) -> None:
content = (_ACTION_DIR / "iptables.conf").read_text()
cfg = parse_action_file(content, name="iptables")
serialized = serialize_action_config(cfg)
restored = parse_action_file(serialized, name="iptables")
assert restored.actionban == cfg.actionban
# ---------------------------------------------------------------------------
# ActionConfigUpdate — merge
# ---------------------------------------------------------------------------
class TestMergeActionUpdate:
"""Tests for merge_action_update."""
def test_merge_only_actionban(self) -> None:
from app.models.config import ActionConfigUpdate
base = parse_action_file(FULL_ACTION, name="iptables")
update = ActionConfigUpdate(actionban="iptables -I INPUT -s <ip> -j DROP")
merged = merge_action_update(base, update)
assert merged.actionban == "iptables -I INPUT -s <ip> -j DROP"
assert merged.actionunban == base.actionunban
def test_merge_none_fields_unchanged(self) -> None:
from app.models.config import ActionConfigUpdate
base = parse_action_file(FULL_ACTION, name="iptables")
update = ActionConfigUpdate()
merged = merge_action_update(base, update)
assert merged.actionban == base.actionban
assert merged.init_vars == base.init_vars
def test_merge_update_init_vars(self) -> None:
from app.models.config import ActionConfigUpdate
base = parse_action_file(FULL_ACTION, name="iptables")
update = ActionConfigUpdate(init_vars={"blocktype": "DROP"})
merged = merge_action_update(base, update)
assert merged.init_vars["blocktype"] == "DROP"
# ---------------------------------------------------------------------------
# Jail file test fixtures
# ---------------------------------------------------------------------------
MINIMAL_JAIL = """\
[sshd]
enabled = false
port = ssh
filter = sshd
logpath = /var/log/auth.log
"""
FULL_JAIL = """\
# fail2ban jail configuration
[sshd]
enabled = true
port = ssh,22
filter = sshd
backend = polling
maxretry = 3
findtime = 600
bantime = 3600
logpath = /var/log/auth.log
/var/log/syslog
[nginx-botsearch]
enabled = false
port = http,https
filter = nginx-botsearch
logpath = /var/log/nginx/error.log
maxretry = 2
action = iptables-multiport[name=botsearch, port="http,https"]
sendmail-whois
"""
JAIL_WITH_EXTRA = """\
[sshd]
enabled = true
port = ssh
filter = sshd
logpath = /var/log/auth.log
custom_key = custom_value
another_key = 42
"""
# ---------------------------------------------------------------------------
# parse_jail_file
# ---------------------------------------------------------------------------
class TestParseJailFile:
"""Unit tests for parse_jail_file."""
def test_minimal_parses_correctly(self) -> None:
from app.services.conffile_parser import parse_jail_file
cfg = parse_jail_file(MINIMAL_JAIL, filename="sshd.conf")
assert cfg.filename == "sshd.conf"
assert "sshd" in cfg.jails
jail = cfg.jails["sshd"]
assert jail.enabled is False
assert jail.port == "ssh"
assert jail.filter == "sshd"
assert jail.logpath == ["/var/log/auth.log"]
def test_full_parses_multiple_jails(self) -> None:
from app.services.conffile_parser import parse_jail_file
cfg = parse_jail_file(FULL_JAIL)
assert len(cfg.jails) == 2
assert "sshd" in cfg.jails
assert "nginx-botsearch" in cfg.jails
def test_full_jail_numeric_fields(self) -> None:
from app.services.conffile_parser import parse_jail_file
jail = parse_jail_file(FULL_JAIL).jails["sshd"]
assert jail.maxretry == 3
assert jail.findtime == 600
assert jail.bantime == 3600
def test_full_jail_multiline_logpath(self) -> None:
from app.services.conffile_parser import parse_jail_file
jail = parse_jail_file(FULL_JAIL).jails["sshd"]
assert len(jail.logpath) == 2
assert "/var/log/auth.log" in jail.logpath
assert "/var/log/syslog" in jail.logpath
def test_full_jail_multiline_action(self) -> None:
from app.services.conffile_parser import parse_jail_file
jail = parse_jail_file(FULL_JAIL).jails["nginx-botsearch"]
assert len(jail.action) == 2
assert "sendmail-whois" in jail.action
def test_enabled_true(self) -> None:
from app.services.conffile_parser import parse_jail_file
jail = parse_jail_file(FULL_JAIL).jails["sshd"]
assert jail.enabled is True
def test_enabled_false(self) -> None:
from app.services.conffile_parser import parse_jail_file
jail = parse_jail_file(FULL_JAIL).jails["nginx-botsearch"]
assert jail.enabled is False
def test_extra_keys_captured(self) -> None:
from app.services.conffile_parser import parse_jail_file
jail = parse_jail_file(JAIL_WITH_EXTRA).jails["sshd"]
assert jail.extra["custom_key"] == "custom_value"
assert jail.extra["another_key"] == "42"
def test_extra_keys_not_in_named_fields(self) -> None:
from app.services.conffile_parser import parse_jail_file
jail = parse_jail_file(JAIL_WITH_EXTRA).jails["sshd"]
assert "enabled" not in jail.extra
assert "logpath" not in jail.extra
def test_empty_file_yields_no_jails(self) -> None:
from app.services.conffile_parser import parse_jail_file
cfg = parse_jail_file("")
assert cfg.jails == {}
def test_invalid_ini_does_not_raise(self) -> None:
from app.services.conffile_parser import parse_jail_file
# Should not raise; just parse what it can.
cfg = parse_jail_file("@@@ not valid ini @@@", filename="bad.conf")
assert isinstance(cfg.jails, dict)
def test_default_section_ignored(self) -> None:
from app.services.conffile_parser import parse_jail_file
content = "[DEFAULT]\nignoreip = 127.0.0.1\n\n[sshd]\nenabled = true\n"
cfg = parse_jail_file(content)
assert "DEFAULT" not in cfg.jails
assert "sshd" in cfg.jails
# ---------------------------------------------------------------------------
# Jail file round-trip
# ---------------------------------------------------------------------------
class TestJailFileRoundTrip:
"""Tests that parse → serialize → parse preserves values."""
def test_minimal_round_trip(self) -> None:
from app.services.conffile_parser import parse_jail_file, serialize_jail_file_config
original = parse_jail_file(MINIMAL_JAIL, filename="sshd.conf")
serialized = serialize_jail_file_config(original)
restored = parse_jail_file(serialized, filename="sshd.conf")
assert restored.jails["sshd"].enabled == original.jails["sshd"].enabled
assert restored.jails["sshd"].port == original.jails["sshd"].port
assert restored.jails["sshd"].logpath == original.jails["sshd"].logpath
def test_full_round_trip(self) -> None:
from app.services.conffile_parser import parse_jail_file, serialize_jail_file_config
original = parse_jail_file(FULL_JAIL)
serialized = serialize_jail_file_config(original)
restored = parse_jail_file(serialized)
for name, jail in original.jails.items():
restored_jail = restored.jails[name]
assert restored_jail.enabled == jail.enabled
assert restored_jail.maxretry == jail.maxretry
assert sorted(restored_jail.logpath) == sorted(jail.logpath)
assert sorted(restored_jail.action) == sorted(jail.action)
def test_extra_keys_round_trip(self) -> None:
from app.services.conffile_parser import parse_jail_file, serialize_jail_file_config
original = parse_jail_file(JAIL_WITH_EXTRA)
serialized = serialize_jail_file_config(original)
restored = parse_jail_file(serialized)
assert restored.jails["sshd"].extra["custom_key"] == "custom_value"
# ---------------------------------------------------------------------------
# merge_jail_file_update
# ---------------------------------------------------------------------------
class TestMergeJailFileUpdate:
"""Unit tests for merge_jail_file_update."""
def test_none_update_returns_original(self) -> None:
from app.models.config import JailFileConfigUpdate
from app.services.conffile_parser import merge_jail_file_update, parse_jail_file
cfg = parse_jail_file(FULL_JAIL)
update = JailFileConfigUpdate()
merged = merge_jail_file_update(cfg, update)
assert merged.jails == cfg.jails
def test_update_replaces_jail(self) -> None:
from app.models.config import JailFileConfigUpdate, JailSectionConfig
from app.services.conffile_parser import merge_jail_file_update, parse_jail_file
cfg = parse_jail_file(FULL_JAIL)
new_sshd = JailSectionConfig(enabled=False, port="2222")
update = JailFileConfigUpdate(jails={"sshd": new_sshd})
merged = merge_jail_file_update(cfg, update)
assert merged.jails["sshd"].enabled is False
assert merged.jails["sshd"].port == "2222"
# Other jails unchanged
assert "nginx-botsearch" in merged.jails
def test_update_adds_new_jail(self) -> None:
from app.models.config import JailFileConfigUpdate, JailSectionConfig
from app.services.conffile_parser import merge_jail_file_update, parse_jail_file
cfg = parse_jail_file(MINIMAL_JAIL)
new_jail = JailSectionConfig(enabled=True, port="443")
update = JailFileConfigUpdate(jails={"https": new_jail})
merged = merge_jail_file_update(cfg, update)
assert "sshd" in merged.jails
assert "https" in merged.jails
assert merged.jails["https"].port == "443"

View File

@@ -6,6 +6,7 @@ from pathlib import Path
import pytest
from app.models.config import ActionConfigUpdate, FilterConfigUpdate, JailFileConfigUpdate
from app.models.file_config import ConfFileCreateRequest, ConfFileUpdateRequest
from app.services.file_config_service import (
ConfigDirError,
@@ -18,13 +19,20 @@ from app.services.file_config_service import (
_validate_new_name,
create_action_file,
create_filter_file,
create_jail_config_file,
get_action_file,
get_filter_file,
get_jail_config_file,
get_parsed_action_file,
get_parsed_filter_file,
get_parsed_jail_file,
list_action_files,
list_filter_files,
list_jail_config_files,
set_jail_config_enabled,
update_parsed_action_file,
update_parsed_filter_file,
update_parsed_jail_file,
write_action_file,
write_filter_file,
)
@@ -399,3 +407,183 @@ async def test_create_action_file_creates_file(tmp_path: Path) -> None:
assert result == "my-action.conf"
assert (config_dir / "action.d" / "my-action.conf").is_file()
# ---------------------------------------------------------------------------
# create_jail_config_file
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_create_jail_config_file_creates_file(tmp_path: Path) -> None:
config_dir = _make_config_dir(tmp_path)
req = ConfFileCreateRequest(name="myjail", content="[myjail]\nenabled = true\n")
result = await create_jail_config_file(str(config_dir), req)
assert result == "myjail.conf"
assert (config_dir / "jail.d" / "myjail.conf").is_file()
@pytest.mark.asyncio
async def test_create_jail_config_file_conflict(tmp_path: Path) -> None:
config_dir = _make_config_dir(tmp_path)
(config_dir / "jail.d" / "sshd.conf").write_text("[sshd]\nenabled = true\n")
req = ConfFileCreateRequest(name="sshd", content="[sshd]\nenabled = true\n")
with pytest.raises(ConfigFileExistsError):
await create_jail_config_file(str(config_dir), req)
@pytest.mark.asyncio
async def test_create_jail_config_file_invalid_name(tmp_path: Path) -> None:
config_dir = _make_config_dir(tmp_path)
req = ConfFileCreateRequest(name="../escape", content="[x]\nenabled = true\n")
with pytest.raises(ConfigFileNameError):
await create_jail_config_file(str(config_dir), req)
# ---------------------------------------------------------------------------
# get_parsed_filter_file / update_parsed_filter_file
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_get_parsed_filter_file_returns_structured_model(tmp_path: Path) -> None:
config_dir = _make_config_dir(tmp_path)
content = "[Definition]\nfailregex = ^<HOST>\nignoreregex = ^.*ignore.*$\n"
(config_dir / "filter.d" / "nginx.conf").write_text(content)
result = await get_parsed_filter_file(str(config_dir), "nginx")
assert result.name == "nginx"
assert result.filename == "nginx.conf"
assert result.failregex == ["^<HOST>"]
assert result.ignoreregex == ["^.*ignore.*$"]
@pytest.mark.asyncio
async def test_get_parsed_filter_file_not_found(tmp_path: Path) -> None:
config_dir = _make_config_dir(tmp_path)
with pytest.raises(ConfigFileNotFoundError):
await get_parsed_filter_file(str(config_dir), "missing")
@pytest.mark.asyncio
async def test_update_parsed_filter_file_writes_changes(tmp_path: Path) -> None:
config_dir = _make_config_dir(tmp_path)
(config_dir / "filter.d" / "nginx.conf").write_text(
"[Definition]\nfailregex = ^<HOST> old\n"
)
update = FilterConfigUpdate(failregex=["^<HOST> new"])
await update_parsed_filter_file(str(config_dir), "nginx", update)
written = (config_dir / "filter.d" / "nginx.conf").read_text()
assert "new" in written
@pytest.mark.asyncio
async def test_update_parsed_filter_file_not_found(tmp_path: Path) -> None:
config_dir = _make_config_dir(tmp_path)
update = FilterConfigUpdate(failregex=["^<HOST>"])
with pytest.raises(ConfigFileNotFoundError):
await update_parsed_filter_file(str(config_dir), "missing", update)
# ---------------------------------------------------------------------------
# get_parsed_action_file / update_parsed_action_file
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_get_parsed_action_file_returns_structured_model(tmp_path: Path) -> None:
config_dir = _make_config_dir(tmp_path)
content = "[Definition]\nactionban = iptables -I INPUT -s <ip> -j DROP\n"
(config_dir / "action.d" / "iptables.conf").write_text(content)
result = await get_parsed_action_file(str(config_dir), "iptables")
assert result.name == "iptables"
assert result.filename == "iptables.conf"
assert result.actionban is not None
assert "<ip>" in result.actionban
@pytest.mark.asyncio
async def test_get_parsed_action_file_not_found(tmp_path: Path) -> None:
config_dir = _make_config_dir(tmp_path)
with pytest.raises(ConfigFileNotFoundError):
await get_parsed_action_file(str(config_dir), "missing")
@pytest.mark.asyncio
async def test_update_parsed_action_file_writes_changes(tmp_path: Path) -> None:
config_dir = _make_config_dir(tmp_path)
(config_dir / "action.d" / "iptables.conf").write_text(
"[Definition]\nactionban = iptables -I INPUT -s <ip> -j DROP\n"
)
update = ActionConfigUpdate(actionban="nft add element inet f2b-table <ip>")
await update_parsed_action_file(str(config_dir), "iptables", update)
written = (config_dir / "action.d" / "iptables.conf").read_text()
assert "nft" in written
@pytest.mark.asyncio
async def test_update_parsed_action_file_not_found(tmp_path: Path) -> None:
config_dir = _make_config_dir(tmp_path)
update = ActionConfigUpdate(actionban="iptables -I INPUT -s <ip> -j DROP")
with pytest.raises(ConfigFileNotFoundError):
await update_parsed_action_file(str(config_dir), "missing", update)
# ---------------------------------------------------------------------------
# get_parsed_jail_file / update_parsed_jail_file
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_get_parsed_jail_file_returns_structured_model(tmp_path: Path) -> None:
config_dir = _make_config_dir(tmp_path)
content = "[sshd]\nenabled = true\nport = ssh\nmaxretry = 5\n"
(config_dir / "jail.d" / "sshd.conf").write_text(content)
result = await get_parsed_jail_file(str(config_dir), "sshd.conf")
assert result.filename == "sshd.conf"
assert "sshd" in result.jails
assert result.jails["sshd"].enabled is True
assert result.jails["sshd"].maxretry == 5
@pytest.mark.asyncio
async def test_get_parsed_jail_file_not_found(tmp_path: Path) -> None:
config_dir = _make_config_dir(tmp_path)
with pytest.raises(ConfigFileNotFoundError):
await get_parsed_jail_file(str(config_dir), "missing.conf")
@pytest.mark.asyncio
async def test_update_parsed_jail_file_writes_changes(tmp_path: Path) -> None:
config_dir = _make_config_dir(tmp_path)
(config_dir / "jail.d" / "sshd.conf").write_text(
"[sshd]\nenabled = true\nport = ssh\n"
)
from app.models.config import JailSectionConfig
update = JailFileConfigUpdate(jails={"sshd": JailSectionConfig(enabled=False)})
await update_parsed_jail_file(str(config_dir), "sshd.conf", update)
written = (config_dir / "jail.d" / "sshd.conf").read_text()
assert "false" in written.lower()
@pytest.mark.asyncio
async def test_update_parsed_jail_file_not_found(tmp_path: Path) -> None:
config_dir = _make_config_dir(tmp_path)
update = JailFileConfigUpdate(jails={})
with pytest.raises(ConfigFileNotFoundError):
await update_parsed_jail_file(str(config_dir), "missing.conf", update)