"""Tests for conffile_parser — fail2ban INI file parser and serializer.""" from __future__ import annotations from pathlib import Path import pytest from app.services.conffile_parser import ( merge_action_update, merge_filter_update, parse_action_file, parse_filter_file, serialize_action_config, serialize_filter_config, ) # Path to the bundled fail2ban reference configs shipped with this project. _REF_DIR = Path(__file__).parent.parent.parent.parent / "fail2ban-master" / "config" _FILTER_DIR = _REF_DIR / "filter.d" _ACTION_DIR = _REF_DIR / "action.d" # --------------------------------------------------------------------------- # Filter — parse helpers # --------------------------------------------------------------------------- MINIMAL_FILTER = """\ [Definition] failregex = ^ bad request$ ignoreregex = """ FULL_FILTER = """\ [INCLUDES] before = common.conf [DEFAULT] _daemon = sshd __prefix = (?:sshd )? [Definition] prefregex = ^%(__prefix)s%(__pref)s failregex = ^Authentication failure for .* from via ^User not known .* from ignoreregex = ^Authorised key from maxlines = 10 datepattern = %%Y-%%m-%%d %%H:%%M:%%S journalmatch = _SYSTEMD_UNIT=sshd.service """ class TestParseFilterFile: """Unit tests for parse_filter_file.""" def test_minimal_filter(self) -> None: cfg = parse_filter_file(MINIMAL_FILTER, name="test", filename="test.conf") assert cfg.name == "test" assert cfg.filename == "test.conf" assert cfg.failregex == ["^ bad request$"] assert cfg.ignoreregex == [] assert cfg.before is None assert cfg.after is None def test_full_filter_includes(self) -> None: cfg = parse_filter_file(FULL_FILTER, name="sshd") assert cfg.before == "common.conf" assert cfg.after is None def test_full_filter_defaults(self) -> None: cfg = parse_filter_file(FULL_FILTER, name="sshd") assert "_daemon" in cfg.variables assert cfg.variables["_daemon"] == "sshd" def test_full_filter_failregex_multiline(self) -> None: cfg = parse_filter_file(FULL_FILTER, name="sshd") assert len(cfg.failregex) == 2 assert "Authentication failure" in cfg.failregex[0] assert "User not known" in cfg.failregex[1] def test_full_filter_ignoreregex(self) -> None: cfg = parse_filter_file(FULL_FILTER, name="sshd") assert len(cfg.ignoreregex) == 1 assert "Authorised key" in cfg.ignoreregex[0] def test_full_filter_optional_fields(self) -> None: cfg = parse_filter_file(FULL_FILTER, name="sshd") assert cfg.maxlines == 10 assert cfg.datepattern is not None assert cfg.journalmatch == "_SYSTEMD_UNIT=sshd.service" def test_empty_failregex(self) -> None: content = "[Definition]\nfailregex =\nignoreregex =\n" cfg = parse_filter_file(content, name="empty") assert cfg.failregex == [] assert cfg.ignoreregex == [] def test_comment_lines_stripped_from_failregex(self) -> None: content = ( "[Definition]\n" "failregex = ^ good$\n" " # this is a comment\n" " ^ also good$\n" ) cfg = parse_filter_file(content, name="t") assert len(cfg.failregex) == 2 assert cfg.failregex[1] == "^ also good$" def test_malformed_content_does_not_raise(self) -> None: """Parser should not crash on malformed content; return partial result.""" cfg = parse_filter_file("not an ini file at all!!!", name="bad") assert cfg.failregex == [] def test_no_definition_section(self) -> None: cfg = parse_filter_file("[INCLUDES]\nbefore = common.conf\n", name="x") assert cfg.before == "common.conf" assert cfg.failregex == [] # --------------------------------------------------------------------------- # Filter — round-trip (parse → serialize → parse) # --------------------------------------------------------------------------- class TestFilterRoundTrip: """Serialize then re-parse and verify the result matches.""" def test_round_trip_minimal(self) -> None: original = parse_filter_file(MINIMAL_FILTER, name="t") serialized = serialize_filter_config(original) restored = parse_filter_file(serialized, name="t") assert restored.failregex == original.failregex assert restored.ignoreregex == original.ignoreregex def test_round_trip_full(self) -> None: original = parse_filter_file(FULL_FILTER, name="sshd") serialized = serialize_filter_config(original) restored = parse_filter_file(serialized, name="sshd") assert restored.before == original.before assert restored.failregex == original.failregex assert restored.ignoreregex == original.ignoreregex assert restored.maxlines == original.maxlines assert restored.datepattern == original.datepattern assert restored.journalmatch == original.journalmatch # --------------------------------------------------------------------------- # Filter — reference file (sshd.conf shipped with fail2ban-master) # --------------------------------------------------------------------------- @pytest.mark.skipif( not (_FILTER_DIR / "sshd.conf").exists(), reason="fail2ban-master reference configs not present", ) class TestParseRealSshdFilter: """Parse the real sshd.conf that ships with fail2ban-master.""" def test_parses_without_exception(self) -> None: content = (_FILTER_DIR / "sshd.conf").read_text() cfg = parse_filter_file(content, name="sshd", filename="sshd.conf") assert cfg.name == "sshd" def test_has_failregex_patterns(self) -> None: content = (_FILTER_DIR / "sshd.conf").read_text() cfg = parse_filter_file(content, name="sshd") assert len(cfg.failregex) > 0 def test_has_includes(self) -> None: content = (_FILTER_DIR / "sshd.conf").read_text() cfg = parse_filter_file(content, name="sshd") assert cfg.before is not None def test_round_trip_preserves_failregex_count(self) -> None: content = (_FILTER_DIR / "sshd.conf").read_text() cfg = parse_filter_file(content, name="sshd") serialized = serialize_filter_config(cfg) restored = parse_filter_file(serialized, name="sshd") assert len(restored.failregex) == len(cfg.failregex) # --------------------------------------------------------------------------- # FilterConfigUpdate — merge # --------------------------------------------------------------------------- class TestMergeFilterUpdate: """Tests for merge_filter_update.""" def test_merge_only_failregex(self) -> None: from app.models.config import FilterConfigUpdate base = parse_filter_file(FULL_FILTER, name="sshd") update = FilterConfigUpdate(failregex=["^new pattern $"]) merged = merge_filter_update(base, update) assert merged.failregex == ["^new pattern $"] # Everything else unchanged: assert merged.before == base.before assert merged.maxlines == base.maxlines def test_merge_none_fields_unchanged(self) -> None: from app.models.config import FilterConfigUpdate base = parse_filter_file(FULL_FILTER, name="sshd") update = FilterConfigUpdate() # All None merged = merge_filter_update(base, update) assert merged.failregex == base.failregex assert merged.ignoreregex == base.ignoreregex def test_merge_clear_ignoreregex(self) -> None: from app.models.config import FilterConfigUpdate base = parse_filter_file(FULL_FILTER, name="sshd") update = FilterConfigUpdate(ignoreregex=[]) merged = merge_filter_update(base, update) assert merged.ignoreregex == [] def test_merge_update_variables(self) -> None: from app.models.config import FilterConfigUpdate base = parse_filter_file(FULL_FILTER, name="sshd") update = FilterConfigUpdate(variables={"_daemon": "mynewdaemon"}) merged = merge_filter_update(base, update) assert merged.variables["_daemon"] == "mynewdaemon" # --------------------------------------------------------------------------- # Action — parse helpers # --------------------------------------------------------------------------- MINIMAL_ACTION = """\ [Definition] actionban = iptables -I INPUT -s -j DROP actionunban = iptables -D INPUT -s -j DROP """ FULL_ACTION = """\ [INCLUDES] before = iptables-common.conf [Definition] actionstart = { iptables -N f2b- iptables -A INPUT -p -j f2b- } actionstop = iptables -D INPUT -p -j f2b- iptables -F f2b- iptables -X f2b- actioncheck = iptables -n -L INPUT | grep -q f2b-[ \\t] actionban = iptables -I f2b- 1 -s -j actionunban = iptables -D f2b- -s -j actionflush = iptables -F f2b- name = default protocol = tcp chain = INPUT blocktype = REJECT [Init] blocktype = REJECT --reject-with icmp-port-unreachable name = default """ class TestParseActionFile: """Unit tests for parse_action_file.""" def test_minimal_action(self) -> None: cfg = parse_action_file(MINIMAL_ACTION, name="test", filename="test.conf") assert cfg.name == "test" assert cfg.actionban is not None assert "" in cfg.actionban assert cfg.actionunban is not None def test_full_action_includes(self) -> None: cfg = parse_action_file(FULL_ACTION, name="iptables") assert cfg.before == "iptables-common.conf" def test_full_action_lifecycle_keys(self) -> None: cfg = parse_action_file(FULL_ACTION, name="iptables") assert cfg.actionstart is not None assert cfg.actionstop is not None assert cfg.actioncheck is not None assert cfg.actionban is not None assert cfg.actionunban is not None assert cfg.actionflush is not None def test_full_action_definition_vars(self) -> None: cfg = parse_action_file(FULL_ACTION, name="iptables") # 'name', 'protocol', 'chain', 'blocktype' are definition vars assert "protocol" in cfg.definition_vars assert "chain" in cfg.definition_vars def test_full_action_init_vars(self) -> None: cfg = parse_action_file(FULL_ACTION, name="iptables") assert "blocktype" in cfg.init_vars assert "REJECT" in cfg.init_vars["blocktype"] def test_empty_action(self) -> None: cfg = parse_action_file("[Definition]\n", name="empty") assert cfg.actionban is None assert cfg.definition_vars == {} assert cfg.init_vars == {} def test_malformed_does_not_raise(self) -> None: cfg = parse_action_file("@@@@not valid ini@@@@", name="bad") assert cfg.actionban is None # --------------------------------------------------------------------------- # Action — round-trip # --------------------------------------------------------------------------- class TestActionRoundTrip: """Serialize then re-parse and verify key fields are preserved.""" def test_round_trip_minimal(self) -> None: original = parse_action_file(MINIMAL_ACTION, name="t") serialized = serialize_action_config(original) restored = parse_action_file(serialized, name="t") assert restored.actionban == original.actionban assert restored.actionunban == original.actionunban def test_round_trip_full(self) -> None: original = parse_action_file(FULL_ACTION, name="iptables") serialized = serialize_action_config(original) restored = parse_action_file(serialized, name="iptables") assert restored.actionban == original.actionban assert restored.actionflush == original.actionflush assert restored.init_vars.get("blocktype") == original.init_vars.get("blocktype") # --------------------------------------------------------------------------- # Action — reference file # --------------------------------------------------------------------------- @pytest.mark.skipif( not (_ACTION_DIR / "iptables.conf").exists(), reason="fail2ban-master reference configs not present", ) class TestParseRealIptablesAction: """Parse the real iptables.conf that ships with fail2ban-master.""" def test_parses_without_exception(self) -> None: content = (_ACTION_DIR / "iptables.conf").read_text() cfg = parse_action_file(content, name="iptables", filename="iptables.conf") assert cfg.name == "iptables" def test_has_actionban(self) -> None: content = (_ACTION_DIR / "iptables.conf").read_text() cfg = parse_action_file(content, name="iptables") assert cfg.actionban is not None def test_round_trip_preserves_actionban(self) -> None: content = (_ACTION_DIR / "iptables.conf").read_text() cfg = parse_action_file(content, name="iptables") serialized = serialize_action_config(cfg) restored = parse_action_file(serialized, name="iptables") assert restored.actionban == cfg.actionban # --------------------------------------------------------------------------- # ActionConfigUpdate — merge # --------------------------------------------------------------------------- class TestMergeActionUpdate: """Tests for merge_action_update.""" def test_merge_only_actionban(self) -> None: from app.models.config import ActionConfigUpdate base = parse_action_file(FULL_ACTION, name="iptables") update = ActionConfigUpdate(actionban="iptables -I INPUT -s -j DROP") merged = merge_action_update(base, update) assert merged.actionban == "iptables -I INPUT -s -j DROP" assert merged.actionunban == base.actionunban def test_merge_none_fields_unchanged(self) -> None: from app.models.config import ActionConfigUpdate base = parse_action_file(FULL_ACTION, name="iptables") update = ActionConfigUpdate() merged = merge_action_update(base, update) assert merged.actionban == base.actionban assert merged.init_vars == base.init_vars def test_merge_update_init_vars(self) -> None: from app.models.config import ActionConfigUpdate base = parse_action_file(FULL_ACTION, name="iptables") update = ActionConfigUpdate(init_vars={"blocktype": "DROP"}) merged = merge_action_update(base, update) assert merged.init_vars["blocktype"] == "DROP" # --------------------------------------------------------------------------- # Jail file test fixtures # --------------------------------------------------------------------------- MINIMAL_JAIL = """\ [sshd] enabled = false port = ssh filter = sshd logpath = /var/log/auth.log """ FULL_JAIL = """\ # fail2ban jail configuration [sshd] enabled = true port = ssh,22 filter = sshd backend = polling maxretry = 3 findtime = 600 bantime = 3600 logpath = /var/log/auth.log /var/log/syslog [nginx-botsearch] enabled = false port = http,https filter = nginx-botsearch logpath = /var/log/nginx/error.log maxretry = 2 action = iptables-multiport[name=botsearch, port="http,https"] sendmail-whois """ JAIL_WITH_EXTRA = """\ [sshd] enabled = true port = ssh filter = sshd logpath = /var/log/auth.log custom_key = custom_value another_key = 42 """ # --------------------------------------------------------------------------- # parse_jail_file # --------------------------------------------------------------------------- class TestParseJailFile: """Unit tests for parse_jail_file.""" def test_minimal_parses_correctly(self) -> None: from app.services.conffile_parser import parse_jail_file cfg = parse_jail_file(MINIMAL_JAIL, filename="sshd.conf") assert cfg.filename == "sshd.conf" assert "sshd" in cfg.jails jail = cfg.jails["sshd"] assert jail.enabled is False assert jail.port == "ssh" assert jail.filter == "sshd" assert jail.logpath == ["/var/log/auth.log"] def test_full_parses_multiple_jails(self) -> None: from app.services.conffile_parser import parse_jail_file cfg = parse_jail_file(FULL_JAIL) assert len(cfg.jails) == 2 assert "sshd" in cfg.jails assert "nginx-botsearch" in cfg.jails def test_full_jail_numeric_fields(self) -> None: from app.services.conffile_parser import parse_jail_file jail = parse_jail_file(FULL_JAIL).jails["sshd"] assert jail.maxretry == 3 assert jail.findtime == 600 assert jail.bantime == 3600 def test_full_jail_multiline_logpath(self) -> None: from app.services.conffile_parser import parse_jail_file jail = parse_jail_file(FULL_JAIL).jails["sshd"] assert len(jail.logpath) == 2 assert "/var/log/auth.log" in jail.logpath assert "/var/log/syslog" in jail.logpath def test_full_jail_multiline_action(self) -> None: from app.services.conffile_parser import parse_jail_file jail = parse_jail_file(FULL_JAIL).jails["nginx-botsearch"] assert len(jail.action) == 2 assert "sendmail-whois" in jail.action def test_enabled_true(self) -> None: from app.services.conffile_parser import parse_jail_file jail = parse_jail_file(FULL_JAIL).jails["sshd"] assert jail.enabled is True def test_enabled_false(self) -> None: from app.services.conffile_parser import parse_jail_file jail = parse_jail_file(FULL_JAIL).jails["nginx-botsearch"] assert jail.enabled is False def test_extra_keys_captured(self) -> None: from app.services.conffile_parser import parse_jail_file jail = parse_jail_file(JAIL_WITH_EXTRA).jails["sshd"] assert jail.extra["custom_key"] == "custom_value" assert jail.extra["another_key"] == "42" def test_extra_keys_not_in_named_fields(self) -> None: from app.services.conffile_parser import parse_jail_file jail = parse_jail_file(JAIL_WITH_EXTRA).jails["sshd"] assert "enabled" not in jail.extra assert "logpath" not in jail.extra def test_empty_file_yields_no_jails(self) -> None: from app.services.conffile_parser import parse_jail_file cfg = parse_jail_file("") assert cfg.jails == {} def test_invalid_ini_does_not_raise(self) -> None: from app.services.conffile_parser import parse_jail_file # Should not raise; just parse what it can. cfg = parse_jail_file("@@@ not valid ini @@@", filename="bad.conf") assert isinstance(cfg.jails, dict) def test_default_section_ignored(self) -> None: from app.services.conffile_parser import parse_jail_file content = "[DEFAULT]\nignoreip = 127.0.0.1\n\n[sshd]\nenabled = true\n" cfg = parse_jail_file(content) assert "DEFAULT" not in cfg.jails assert "sshd" in cfg.jails # --------------------------------------------------------------------------- # Jail file round-trip # --------------------------------------------------------------------------- class TestJailFileRoundTrip: """Tests that parse → serialize → parse preserves values.""" def test_minimal_round_trip(self) -> None: from app.services.conffile_parser import parse_jail_file, serialize_jail_file_config original = parse_jail_file(MINIMAL_JAIL, filename="sshd.conf") serialized = serialize_jail_file_config(original) restored = parse_jail_file(serialized, filename="sshd.conf") assert restored.jails["sshd"].enabled == original.jails["sshd"].enabled assert restored.jails["sshd"].port == original.jails["sshd"].port assert restored.jails["sshd"].logpath == original.jails["sshd"].logpath def test_full_round_trip(self) -> None: from app.services.conffile_parser import parse_jail_file, serialize_jail_file_config original = parse_jail_file(FULL_JAIL) serialized = serialize_jail_file_config(original) restored = parse_jail_file(serialized) for name, jail in original.jails.items(): restored_jail = restored.jails[name] assert restored_jail.enabled == jail.enabled assert restored_jail.maxretry == jail.maxretry assert sorted(restored_jail.logpath) == sorted(jail.logpath) assert sorted(restored_jail.action) == sorted(jail.action) def test_extra_keys_round_trip(self) -> None: from app.services.conffile_parser import parse_jail_file, serialize_jail_file_config original = parse_jail_file(JAIL_WITH_EXTRA) serialized = serialize_jail_file_config(original) restored = parse_jail_file(serialized) assert restored.jails["sshd"].extra["custom_key"] == "custom_value" # --------------------------------------------------------------------------- # merge_jail_file_update # --------------------------------------------------------------------------- class TestMergeJailFileUpdate: """Unit tests for merge_jail_file_update.""" def test_none_update_returns_original(self) -> None: from app.models.config import JailFileConfigUpdate from app.services.conffile_parser import merge_jail_file_update, parse_jail_file cfg = parse_jail_file(FULL_JAIL) update = JailFileConfigUpdate() merged = merge_jail_file_update(cfg, update) assert merged.jails == cfg.jails def test_update_replaces_jail(self) -> None: from app.models.config import JailFileConfigUpdate, JailSectionConfig from app.services.conffile_parser import merge_jail_file_update, parse_jail_file cfg = parse_jail_file(FULL_JAIL) new_sshd = JailSectionConfig(enabled=False, port="2222") update = JailFileConfigUpdate(jails={"sshd": new_sshd}) merged = merge_jail_file_update(cfg, update) assert merged.jails["sshd"].enabled is False assert merged.jails["sshd"].port == "2222" # Other jails unchanged assert "nginx-botsearch" in merged.jails def test_update_adds_new_jail(self) -> None: from app.models.config import JailFileConfigUpdate, JailSectionConfig from app.services.conffile_parser import merge_jail_file_update, parse_jail_file cfg = parse_jail_file(MINIMAL_JAIL) new_jail = JailSectionConfig(enabled=True, port="443") update = JailFileConfigUpdate(jails={"https": new_jail}) merged = merge_jail_file_update(cfg, update) assert "sshd" in merged.jails assert "https" in merged.jails assert merged.jails["https"].port == "443"