- test_conffile_parser.py: unit tests for section/key parsing, comment preservation, and round-trip write correctness - test_file_config_service.py: service-level tests with mock filesystem - test_file_config.py: router integration tests covering GET / PUT endpoints for jails, actions, and filters
625 lines
22 KiB
Python
625 lines
22 KiB
Python
"""Tests for conffile_parser — fail2ban INI file parser and serializer."""
|
|
|
|
from __future__ import annotations
|
|
|
|
from pathlib import Path
|
|
|
|
import pytest
|
|
|
|
from app.services.conffile_parser import (
|
|
merge_action_update,
|
|
merge_filter_update,
|
|
parse_action_file,
|
|
parse_filter_file,
|
|
serialize_action_config,
|
|
serialize_filter_config,
|
|
)
|
|
|
|
# Path to the bundled fail2ban reference configs shipped with this project.
|
|
_REF_DIR = Path(__file__).parent.parent.parent.parent / "fail2ban-master" / "config"
|
|
_FILTER_DIR = _REF_DIR / "filter.d"
|
|
_ACTION_DIR = _REF_DIR / "action.d"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Filter — parse helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
MINIMAL_FILTER = """\
|
|
[Definition]
|
|
failregex = ^<HOST> bad request$
|
|
ignoreregex =
|
|
"""
|
|
|
|
FULL_FILTER = """\
|
|
[INCLUDES]
|
|
before = common.conf
|
|
|
|
[DEFAULT]
|
|
_daemon = sshd
|
|
__prefix = (?:sshd )?
|
|
|
|
[Definition]
|
|
prefregex = ^<F-MLFID>%(__prefix)s</F-MLFID>%(__pref)s
|
|
failregex = ^Authentication failure for .* from <HOST> via
|
|
^User not known .* from <HOST>
|
|
ignoreregex = ^Authorised key from <HOST>
|
|
|
|
maxlines = 10
|
|
datepattern = %%Y-%%m-%%d %%H:%%M:%%S
|
|
journalmatch = _SYSTEMD_UNIT=sshd.service
|
|
"""
|
|
|
|
|
|
class TestParseFilterFile:
|
|
"""Unit tests for parse_filter_file."""
|
|
|
|
def test_minimal_filter(self) -> None:
|
|
cfg = parse_filter_file(MINIMAL_FILTER, name="test", filename="test.conf")
|
|
assert cfg.name == "test"
|
|
assert cfg.filename == "test.conf"
|
|
assert cfg.failregex == ["^<HOST> bad request$"]
|
|
assert cfg.ignoreregex == []
|
|
assert cfg.before is None
|
|
assert cfg.after is None
|
|
|
|
def test_full_filter_includes(self) -> None:
|
|
cfg = parse_filter_file(FULL_FILTER, name="sshd")
|
|
assert cfg.before == "common.conf"
|
|
assert cfg.after is None
|
|
|
|
def test_full_filter_defaults(self) -> None:
|
|
cfg = parse_filter_file(FULL_FILTER, name="sshd")
|
|
assert "_daemon" in cfg.variables
|
|
assert cfg.variables["_daemon"] == "sshd"
|
|
|
|
def test_full_filter_failregex_multiline(self) -> None:
|
|
cfg = parse_filter_file(FULL_FILTER, name="sshd")
|
|
assert len(cfg.failregex) == 2
|
|
assert "Authentication failure" in cfg.failregex[0]
|
|
assert "User not known" in cfg.failregex[1]
|
|
|
|
def test_full_filter_ignoreregex(self) -> None:
|
|
cfg = parse_filter_file(FULL_FILTER, name="sshd")
|
|
assert len(cfg.ignoreregex) == 1
|
|
assert "Authorised key" in cfg.ignoreregex[0]
|
|
|
|
def test_full_filter_optional_fields(self) -> None:
|
|
cfg = parse_filter_file(FULL_FILTER, name="sshd")
|
|
assert cfg.maxlines == 10
|
|
assert cfg.datepattern is not None
|
|
assert cfg.journalmatch == "_SYSTEMD_UNIT=sshd.service"
|
|
|
|
def test_empty_failregex(self) -> None:
|
|
content = "[Definition]\nfailregex =\nignoreregex =\n"
|
|
cfg = parse_filter_file(content, name="empty")
|
|
assert cfg.failregex == []
|
|
assert cfg.ignoreregex == []
|
|
|
|
def test_comment_lines_stripped_from_failregex(self) -> None:
|
|
content = (
|
|
"[Definition]\n"
|
|
"failregex = ^<HOST> good$\n"
|
|
" # this is a comment\n"
|
|
" ^<HOST> also good$\n"
|
|
)
|
|
cfg = parse_filter_file(content, name="t")
|
|
assert len(cfg.failregex) == 2
|
|
assert cfg.failregex[1] == "^<HOST> also good$"
|
|
|
|
def test_malformed_content_does_not_raise(self) -> None:
|
|
"""Parser should not crash on malformed content; return partial result."""
|
|
cfg = parse_filter_file("not an ini file at all!!!", name="bad")
|
|
assert cfg.failregex == []
|
|
|
|
def test_no_definition_section(self) -> None:
|
|
cfg = parse_filter_file("[INCLUDES]\nbefore = common.conf\n", name="x")
|
|
assert cfg.before == "common.conf"
|
|
assert cfg.failregex == []
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Filter — round-trip (parse → serialize → parse)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestFilterRoundTrip:
|
|
"""Serialize then re-parse and verify the result matches."""
|
|
|
|
def test_round_trip_minimal(self) -> None:
|
|
original = parse_filter_file(MINIMAL_FILTER, name="t")
|
|
serialized = serialize_filter_config(original)
|
|
restored = parse_filter_file(serialized, name="t")
|
|
assert restored.failregex == original.failregex
|
|
assert restored.ignoreregex == original.ignoreregex
|
|
|
|
def test_round_trip_full(self) -> None:
|
|
original = parse_filter_file(FULL_FILTER, name="sshd")
|
|
serialized = serialize_filter_config(original)
|
|
restored = parse_filter_file(serialized, name="sshd")
|
|
assert restored.before == original.before
|
|
assert restored.failregex == original.failregex
|
|
assert restored.ignoreregex == original.ignoreregex
|
|
assert restored.maxlines == original.maxlines
|
|
assert restored.datepattern == original.datepattern
|
|
assert restored.journalmatch == original.journalmatch
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Filter — reference file (sshd.conf shipped with fail2ban-master)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@pytest.mark.skipif(
|
|
not (_FILTER_DIR / "sshd.conf").exists(),
|
|
reason="fail2ban-master reference configs not present",
|
|
)
|
|
class TestParseRealSshdFilter:
|
|
"""Parse the real sshd.conf that ships with fail2ban-master."""
|
|
|
|
def test_parses_without_exception(self) -> None:
|
|
content = (_FILTER_DIR / "sshd.conf").read_text()
|
|
cfg = parse_filter_file(content, name="sshd", filename="sshd.conf")
|
|
assert cfg.name == "sshd"
|
|
|
|
def test_has_failregex_patterns(self) -> None:
|
|
content = (_FILTER_DIR / "sshd.conf").read_text()
|
|
cfg = parse_filter_file(content, name="sshd")
|
|
assert len(cfg.failregex) > 0
|
|
|
|
def test_has_includes(self) -> None:
|
|
content = (_FILTER_DIR / "sshd.conf").read_text()
|
|
cfg = parse_filter_file(content, name="sshd")
|
|
assert cfg.before is not None
|
|
|
|
def test_round_trip_preserves_failregex_count(self) -> None:
|
|
content = (_FILTER_DIR / "sshd.conf").read_text()
|
|
cfg = parse_filter_file(content, name="sshd")
|
|
serialized = serialize_filter_config(cfg)
|
|
restored = parse_filter_file(serialized, name="sshd")
|
|
assert len(restored.failregex) == len(cfg.failregex)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# FilterConfigUpdate — merge
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestMergeFilterUpdate:
|
|
"""Tests for merge_filter_update."""
|
|
|
|
def test_merge_only_failregex(self) -> None:
|
|
from app.models.config import FilterConfigUpdate
|
|
|
|
base = parse_filter_file(FULL_FILTER, name="sshd")
|
|
update = FilterConfigUpdate(failregex=["^new pattern <HOST>$"])
|
|
merged = merge_filter_update(base, update)
|
|
assert merged.failregex == ["^new pattern <HOST>$"]
|
|
# Everything else unchanged:
|
|
assert merged.before == base.before
|
|
assert merged.maxlines == base.maxlines
|
|
|
|
def test_merge_none_fields_unchanged(self) -> None:
|
|
from app.models.config import FilterConfigUpdate
|
|
|
|
base = parse_filter_file(FULL_FILTER, name="sshd")
|
|
update = FilterConfigUpdate() # All None
|
|
merged = merge_filter_update(base, update)
|
|
assert merged.failregex == base.failregex
|
|
assert merged.ignoreregex == base.ignoreregex
|
|
|
|
def test_merge_clear_ignoreregex(self) -> None:
|
|
from app.models.config import FilterConfigUpdate
|
|
|
|
base = parse_filter_file(FULL_FILTER, name="sshd")
|
|
update = FilterConfigUpdate(ignoreregex=[])
|
|
merged = merge_filter_update(base, update)
|
|
assert merged.ignoreregex == []
|
|
|
|
def test_merge_update_variables(self) -> None:
|
|
from app.models.config import FilterConfigUpdate
|
|
|
|
base = parse_filter_file(FULL_FILTER, name="sshd")
|
|
update = FilterConfigUpdate(variables={"_daemon": "mynewdaemon"})
|
|
merged = merge_filter_update(base, update)
|
|
assert merged.variables["_daemon"] == "mynewdaemon"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Action — parse helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
MINIMAL_ACTION = """\
|
|
[Definition]
|
|
actionban = iptables -I INPUT -s <ip> -j DROP
|
|
actionunban = iptables -D INPUT -s <ip> -j DROP
|
|
"""
|
|
|
|
FULL_ACTION = """\
|
|
[INCLUDES]
|
|
before = iptables-common.conf
|
|
|
|
[Definition]
|
|
actionstart = { iptables -N f2b-<name>
|
|
iptables -A INPUT -p <protocol> -j f2b-<name> }
|
|
actionstop = iptables -D INPUT -p <protocol> -j f2b-<name>
|
|
iptables -F f2b-<name>
|
|
iptables -X f2b-<name>
|
|
actioncheck = iptables -n -L INPUT | grep -q f2b-<name>[ \\t]
|
|
actionban = iptables -I f2b-<name> 1 -s <ip> -j <blocktype>
|
|
actionunban = iptables -D f2b-<name> -s <ip> -j <blocktype>
|
|
actionflush = iptables -F f2b-<name>
|
|
name = default
|
|
protocol = tcp
|
|
chain = INPUT
|
|
blocktype = REJECT
|
|
|
|
[Init]
|
|
blocktype = REJECT --reject-with icmp-port-unreachable
|
|
name = default
|
|
"""
|
|
|
|
|
|
class TestParseActionFile:
|
|
"""Unit tests for parse_action_file."""
|
|
|
|
def test_minimal_action(self) -> None:
|
|
cfg = parse_action_file(MINIMAL_ACTION, name="test", filename="test.conf")
|
|
assert cfg.name == "test"
|
|
assert cfg.actionban is not None
|
|
assert "<ip>" in cfg.actionban
|
|
assert cfg.actionunban is not None
|
|
|
|
def test_full_action_includes(self) -> None:
|
|
cfg = parse_action_file(FULL_ACTION, name="iptables")
|
|
assert cfg.before == "iptables-common.conf"
|
|
|
|
def test_full_action_lifecycle_keys(self) -> None:
|
|
cfg = parse_action_file(FULL_ACTION, name="iptables")
|
|
assert cfg.actionstart is not None
|
|
assert cfg.actionstop is not None
|
|
assert cfg.actioncheck is not None
|
|
assert cfg.actionban is not None
|
|
assert cfg.actionunban is not None
|
|
assert cfg.actionflush is not None
|
|
|
|
def test_full_action_definition_vars(self) -> None:
|
|
cfg = parse_action_file(FULL_ACTION, name="iptables")
|
|
# 'name', 'protocol', 'chain', 'blocktype' are definition vars
|
|
assert "protocol" in cfg.definition_vars
|
|
assert "chain" in cfg.definition_vars
|
|
|
|
def test_full_action_init_vars(self) -> None:
|
|
cfg = parse_action_file(FULL_ACTION, name="iptables")
|
|
assert "blocktype" in cfg.init_vars
|
|
assert "REJECT" in cfg.init_vars["blocktype"]
|
|
|
|
def test_empty_action(self) -> None:
|
|
cfg = parse_action_file("[Definition]\n", name="empty")
|
|
assert cfg.actionban is None
|
|
assert cfg.definition_vars == {}
|
|
assert cfg.init_vars == {}
|
|
|
|
def test_malformed_does_not_raise(self) -> None:
|
|
cfg = parse_action_file("@@@@not valid ini@@@@", name="bad")
|
|
assert cfg.actionban is None
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Action — round-trip
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestActionRoundTrip:
|
|
"""Serialize then re-parse and verify key fields are preserved."""
|
|
|
|
def test_round_trip_minimal(self) -> None:
|
|
original = parse_action_file(MINIMAL_ACTION, name="t")
|
|
serialized = serialize_action_config(original)
|
|
restored = parse_action_file(serialized, name="t")
|
|
assert restored.actionban == original.actionban
|
|
assert restored.actionunban == original.actionunban
|
|
|
|
def test_round_trip_full(self) -> None:
|
|
original = parse_action_file(FULL_ACTION, name="iptables")
|
|
serialized = serialize_action_config(original)
|
|
restored = parse_action_file(serialized, name="iptables")
|
|
assert restored.actionban == original.actionban
|
|
assert restored.actionflush == original.actionflush
|
|
assert restored.init_vars.get("blocktype") == original.init_vars.get("blocktype")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Action — reference file
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@pytest.mark.skipif(
|
|
not (_ACTION_DIR / "iptables.conf").exists(),
|
|
reason="fail2ban-master reference configs not present",
|
|
)
|
|
class TestParseRealIptablesAction:
|
|
"""Parse the real iptables.conf that ships with fail2ban-master."""
|
|
|
|
def test_parses_without_exception(self) -> None:
|
|
content = (_ACTION_DIR / "iptables.conf").read_text()
|
|
cfg = parse_action_file(content, name="iptables", filename="iptables.conf")
|
|
assert cfg.name == "iptables"
|
|
|
|
def test_has_actionban(self) -> None:
|
|
content = (_ACTION_DIR / "iptables.conf").read_text()
|
|
cfg = parse_action_file(content, name="iptables")
|
|
assert cfg.actionban is not None
|
|
|
|
def test_round_trip_preserves_actionban(self) -> None:
|
|
content = (_ACTION_DIR / "iptables.conf").read_text()
|
|
cfg = parse_action_file(content, name="iptables")
|
|
serialized = serialize_action_config(cfg)
|
|
restored = parse_action_file(serialized, name="iptables")
|
|
assert restored.actionban == cfg.actionban
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# ActionConfigUpdate — merge
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestMergeActionUpdate:
|
|
"""Tests for merge_action_update."""
|
|
|
|
def test_merge_only_actionban(self) -> None:
|
|
from app.models.config import ActionConfigUpdate
|
|
|
|
base = parse_action_file(FULL_ACTION, name="iptables")
|
|
update = ActionConfigUpdate(actionban="iptables -I INPUT -s <ip> -j DROP")
|
|
merged = merge_action_update(base, update)
|
|
assert merged.actionban == "iptables -I INPUT -s <ip> -j DROP"
|
|
assert merged.actionunban == base.actionunban
|
|
|
|
def test_merge_none_fields_unchanged(self) -> None:
|
|
from app.models.config import ActionConfigUpdate
|
|
|
|
base = parse_action_file(FULL_ACTION, name="iptables")
|
|
update = ActionConfigUpdate()
|
|
merged = merge_action_update(base, update)
|
|
assert merged.actionban == base.actionban
|
|
assert merged.init_vars == base.init_vars
|
|
|
|
def test_merge_update_init_vars(self) -> None:
|
|
from app.models.config import ActionConfigUpdate
|
|
|
|
base = parse_action_file(FULL_ACTION, name="iptables")
|
|
update = ActionConfigUpdate(init_vars={"blocktype": "DROP"})
|
|
merged = merge_action_update(base, update)
|
|
assert merged.init_vars["blocktype"] == "DROP"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Jail file test fixtures
|
|
# ---------------------------------------------------------------------------
|
|
|
|
MINIMAL_JAIL = """\
|
|
[sshd]
|
|
enabled = false
|
|
port = ssh
|
|
filter = sshd
|
|
logpath = /var/log/auth.log
|
|
"""
|
|
|
|
FULL_JAIL = """\
|
|
# fail2ban jail configuration
|
|
[sshd]
|
|
enabled = true
|
|
port = ssh,22
|
|
filter = sshd
|
|
backend = polling
|
|
maxretry = 3
|
|
findtime = 600
|
|
bantime = 3600
|
|
logpath = /var/log/auth.log
|
|
/var/log/syslog
|
|
|
|
[nginx-botsearch]
|
|
enabled = false
|
|
port = http,https
|
|
filter = nginx-botsearch
|
|
logpath = /var/log/nginx/error.log
|
|
maxretry = 2
|
|
action = iptables-multiport[name=botsearch, port="http,https"]
|
|
sendmail-whois
|
|
"""
|
|
|
|
JAIL_WITH_EXTRA = """\
|
|
[sshd]
|
|
enabled = true
|
|
port = ssh
|
|
filter = sshd
|
|
logpath = /var/log/auth.log
|
|
custom_key = custom_value
|
|
another_key = 42
|
|
"""
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# parse_jail_file
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestParseJailFile:
|
|
"""Unit tests for parse_jail_file."""
|
|
|
|
def test_minimal_parses_correctly(self) -> None:
|
|
from app.services.conffile_parser import parse_jail_file
|
|
|
|
cfg = parse_jail_file(MINIMAL_JAIL, filename="sshd.conf")
|
|
assert cfg.filename == "sshd.conf"
|
|
assert "sshd" in cfg.jails
|
|
jail = cfg.jails["sshd"]
|
|
assert jail.enabled is False
|
|
assert jail.port == "ssh"
|
|
assert jail.filter == "sshd"
|
|
assert jail.logpath == ["/var/log/auth.log"]
|
|
|
|
def test_full_parses_multiple_jails(self) -> None:
|
|
from app.services.conffile_parser import parse_jail_file
|
|
|
|
cfg = parse_jail_file(FULL_JAIL)
|
|
assert len(cfg.jails) == 2
|
|
assert "sshd" in cfg.jails
|
|
assert "nginx-botsearch" in cfg.jails
|
|
|
|
def test_full_jail_numeric_fields(self) -> None:
|
|
from app.services.conffile_parser import parse_jail_file
|
|
|
|
jail = parse_jail_file(FULL_JAIL).jails["sshd"]
|
|
assert jail.maxretry == 3
|
|
assert jail.findtime == 600
|
|
assert jail.bantime == 3600
|
|
|
|
def test_full_jail_multiline_logpath(self) -> None:
|
|
from app.services.conffile_parser import parse_jail_file
|
|
|
|
jail = parse_jail_file(FULL_JAIL).jails["sshd"]
|
|
assert len(jail.logpath) == 2
|
|
assert "/var/log/auth.log" in jail.logpath
|
|
assert "/var/log/syslog" in jail.logpath
|
|
|
|
def test_full_jail_multiline_action(self) -> None:
|
|
from app.services.conffile_parser import parse_jail_file
|
|
|
|
jail = parse_jail_file(FULL_JAIL).jails["nginx-botsearch"]
|
|
assert len(jail.action) == 2
|
|
assert "sendmail-whois" in jail.action
|
|
|
|
def test_enabled_true(self) -> None:
|
|
from app.services.conffile_parser import parse_jail_file
|
|
|
|
jail = parse_jail_file(FULL_JAIL).jails["sshd"]
|
|
assert jail.enabled is True
|
|
|
|
def test_enabled_false(self) -> None:
|
|
from app.services.conffile_parser import parse_jail_file
|
|
|
|
jail = parse_jail_file(FULL_JAIL).jails["nginx-botsearch"]
|
|
assert jail.enabled is False
|
|
|
|
def test_extra_keys_captured(self) -> None:
|
|
from app.services.conffile_parser import parse_jail_file
|
|
|
|
jail = parse_jail_file(JAIL_WITH_EXTRA).jails["sshd"]
|
|
assert jail.extra["custom_key"] == "custom_value"
|
|
assert jail.extra["another_key"] == "42"
|
|
|
|
def test_extra_keys_not_in_named_fields(self) -> None:
|
|
from app.services.conffile_parser import parse_jail_file
|
|
|
|
jail = parse_jail_file(JAIL_WITH_EXTRA).jails["sshd"]
|
|
assert "enabled" not in jail.extra
|
|
assert "logpath" not in jail.extra
|
|
|
|
def test_empty_file_yields_no_jails(self) -> None:
|
|
from app.services.conffile_parser import parse_jail_file
|
|
|
|
cfg = parse_jail_file("")
|
|
assert cfg.jails == {}
|
|
|
|
def test_invalid_ini_does_not_raise(self) -> None:
|
|
from app.services.conffile_parser import parse_jail_file
|
|
|
|
# Should not raise; just parse what it can.
|
|
cfg = parse_jail_file("@@@ not valid ini @@@", filename="bad.conf")
|
|
assert isinstance(cfg.jails, dict)
|
|
|
|
def test_default_section_ignored(self) -> None:
|
|
from app.services.conffile_parser import parse_jail_file
|
|
|
|
content = "[DEFAULT]\nignoreip = 127.0.0.1\n\n[sshd]\nenabled = true\n"
|
|
cfg = parse_jail_file(content)
|
|
assert "DEFAULT" not in cfg.jails
|
|
assert "sshd" in cfg.jails
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Jail file round-trip
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestJailFileRoundTrip:
|
|
"""Tests that parse → serialize → parse preserves values."""
|
|
|
|
def test_minimal_round_trip(self) -> None:
|
|
from app.services.conffile_parser import parse_jail_file, serialize_jail_file_config
|
|
|
|
original = parse_jail_file(MINIMAL_JAIL, filename="sshd.conf")
|
|
serialized = serialize_jail_file_config(original)
|
|
restored = parse_jail_file(serialized, filename="sshd.conf")
|
|
assert restored.jails["sshd"].enabled == original.jails["sshd"].enabled
|
|
assert restored.jails["sshd"].port == original.jails["sshd"].port
|
|
assert restored.jails["sshd"].logpath == original.jails["sshd"].logpath
|
|
|
|
def test_full_round_trip(self) -> None:
|
|
from app.services.conffile_parser import parse_jail_file, serialize_jail_file_config
|
|
|
|
original = parse_jail_file(FULL_JAIL)
|
|
serialized = serialize_jail_file_config(original)
|
|
restored = parse_jail_file(serialized)
|
|
for name, jail in original.jails.items():
|
|
restored_jail = restored.jails[name]
|
|
assert restored_jail.enabled == jail.enabled
|
|
assert restored_jail.maxretry == jail.maxretry
|
|
assert sorted(restored_jail.logpath) == sorted(jail.logpath)
|
|
assert sorted(restored_jail.action) == sorted(jail.action)
|
|
|
|
def test_extra_keys_round_trip(self) -> None:
|
|
from app.services.conffile_parser import parse_jail_file, serialize_jail_file_config
|
|
|
|
original = parse_jail_file(JAIL_WITH_EXTRA)
|
|
serialized = serialize_jail_file_config(original)
|
|
restored = parse_jail_file(serialized)
|
|
assert restored.jails["sshd"].extra["custom_key"] == "custom_value"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# merge_jail_file_update
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestMergeJailFileUpdate:
|
|
"""Unit tests for merge_jail_file_update."""
|
|
|
|
def test_none_update_returns_original(self) -> None:
|
|
from app.models.config import JailFileConfigUpdate
|
|
from app.services.conffile_parser import merge_jail_file_update, parse_jail_file
|
|
|
|
cfg = parse_jail_file(FULL_JAIL)
|
|
update = JailFileConfigUpdate()
|
|
merged = merge_jail_file_update(cfg, update)
|
|
assert merged.jails == cfg.jails
|
|
|
|
def test_update_replaces_jail(self) -> None:
|
|
from app.models.config import JailFileConfigUpdate, JailSectionConfig
|
|
from app.services.conffile_parser import merge_jail_file_update, parse_jail_file
|
|
|
|
cfg = parse_jail_file(FULL_JAIL)
|
|
new_sshd = JailSectionConfig(enabled=False, port="2222")
|
|
update = JailFileConfigUpdate(jails={"sshd": new_sshd})
|
|
merged = merge_jail_file_update(cfg, update)
|
|
assert merged.jails["sshd"].enabled is False
|
|
assert merged.jails["sshd"].port == "2222"
|
|
# Other jails unchanged
|
|
assert "nginx-botsearch" in merged.jails
|
|
|
|
def test_update_adds_new_jail(self) -> None:
|
|
from app.models.config import JailFileConfigUpdate, JailSectionConfig
|
|
from app.services.conffile_parser import merge_jail_file_update, parse_jail_file
|
|
|
|
cfg = parse_jail_file(MINIMAL_JAIL)
|
|
new_jail = JailSectionConfig(enabled=True, port="443")
|
|
update = JailFileConfigUpdate(jails={"https": new_jail})
|
|
merged = merge_jail_file_update(cfg, update)
|
|
assert "sshd" in merged.jails
|
|
assert "https" in merged.jails
|
|
assert merged.jails["https"].port == "443"
|