"""Tests for app.utils.config_parser.Fail2BanConfigParser.""" from __future__ import annotations from pathlib import Path from app.utils.config_parser import Fail2BanConfigParser # --------------------------------------------------------------------------- # Fixtures and helpers # --------------------------------------------------------------------------- _FILTER_CONF = """\ [INCLUDES] before = common.conf [Definition] failregex = ^%(host)s .*$ ignoreregex = """ _COMMON_CONF = """\ [DEFAULT] host = """ _FILTER_LOCAL = """\ [Definition] failregex = ^OVERRIDE %(host)s$ """ _ACTION_CONF = """\ [Definition] actionstart = iptables -N f2b- actionstop = iptables -X f2b- actionban = iptables -I INPUT -s -j DROP actionunban = iptables -D INPUT -s -j DROP [Init] name = default ip = 1.2.3.4 """ def _write(tmp_path: Path, name: str, content: str) -> Path: """Write *content* to *tmp_path/name* and return the path.""" p = tmp_path / name p.write_text(content, encoding="utf-8") return p # --------------------------------------------------------------------------- # TestOrderedConfFiles # --------------------------------------------------------------------------- class TestOrderedConfFiles: def test_empty_dir_returns_empty(self, tmp_path: Path) -> None: result = Fail2BanConfigParser.ordered_conf_files(tmp_path, "jail") assert result == [] def test_conf_only(self, tmp_path: Path) -> None: conf = _write(tmp_path, "jail.conf", "[DEFAULT]\n") result = Fail2BanConfigParser.ordered_conf_files(tmp_path, "jail") assert result == [conf] def test_conf_then_local(self, tmp_path: Path) -> None: conf = _write(tmp_path, "jail.conf", "[DEFAULT]\n") local = _write(tmp_path, "jail.local", "[DEFAULT]\n") result = Fail2BanConfigParser.ordered_conf_files(tmp_path, "jail") assert result == [conf, local] def test_d_dir_overrides_appended(self, tmp_path: Path) -> None: conf = _write(tmp_path, "jail.conf", "[DEFAULT]\n") local = _write(tmp_path, "jail.local", "[DEFAULT]\n") d_dir = tmp_path / "jail.d" d_dir.mkdir() d_conf = _write(d_dir, "extra.conf", "[DEFAULT]\n") d_local = _write(d_dir, "extra.local", "[DEFAULT]\n") result = Fail2BanConfigParser.ordered_conf_files(tmp_path, "jail") assert result == [conf, local, d_conf, d_local] def test_missing_local_skipped(self, tmp_path: Path) -> None: conf = _write(tmp_path, "jail.conf", "[DEFAULT]\n") result = Fail2BanConfigParser.ordered_conf_files(tmp_path, "jail") assert conf in result assert len(result) == 1 def test_d_dir_sorted(self, tmp_path: Path) -> None: d_dir = tmp_path / "jail.d" d_dir.mkdir() _write(d_dir, "zzz.conf", "[DEFAULT]\n") _write(d_dir, "aaa.conf", "[DEFAULT]\n") result = Fail2BanConfigParser.ordered_conf_files(tmp_path, "jail") names = [p.name for p in result] assert names == ["aaa.conf", "zzz.conf"] # --------------------------------------------------------------------------- # TestReadFile # --------------------------------------------------------------------------- class TestReadFile: def test_reads_single_file(self, tmp_path: Path) -> None: _write(tmp_path, "common.conf", _COMMON_CONF) p = _write(tmp_path, "filter.conf", _FILTER_CONF) parser = Fail2BanConfigParser() parser.read_file(p) assert parser.has_section("Definition") def test_before_include_loaded(self, tmp_path: Path) -> None: _write(tmp_path, "common.conf", _COMMON_CONF) p = _write(tmp_path, "filter.conf", _FILTER_CONF) parser = Fail2BanConfigParser() parser.read_file(p) # DEFAULT from common.conf should be merged. defaults = parser.defaults() assert "host" in defaults def test_missing_file_is_silent(self, tmp_path: Path) -> None: parser = Fail2BanConfigParser() parser.read_file(tmp_path / "nonexistent.conf") assert parser.sections() == [] def test_after_include_overrides(self, tmp_path: Path) -> None: after_content = """\ [Definition] key = after_value """ after = _write(tmp_path, "after.conf", after_content) _ = after # used via [INCLUDES] main_content = """\ [INCLUDES] after = after.conf [Definition] key = main_value """ p = _write(tmp_path, "main.conf", main_content) parser = Fail2BanConfigParser() parser.read_file(p) # 'after' was loaded last → highest priority. assert parser.get("Definition", "key") == "after_value" def test_cycle_detection(self, tmp_path: Path) -> None: # A includes B, B includes A. a_content = """\ [INCLUDES] before = b.conf [Definition] key = from_a """ b_content = """\ [INCLUDES] before = a.conf [Definition] key = from_b """ _write(tmp_path, "a.conf", a_content) _write(tmp_path, "b.conf", b_content) parser = Fail2BanConfigParser() # Should not infinite-loop; terminates via cycle detection. parser.read_file(tmp_path / "a.conf") assert parser.has_section("Definition") def test_max_depth_guard(self, tmp_path: Path) -> None: # Create a chain: 0→1→2→…→max+1 max_depth = 3 for i in range(max_depth + 2): content = f"[INCLUDES]\nbefore = {i + 1}.conf\n\n[s{i}]\nk = v\n" _write(tmp_path, f"{i}.conf", content) parser = Fail2BanConfigParser(max_include_depth=max_depth) parser.read_file(tmp_path / "0.conf") # Should complete without recursion error; some sections will be missing. assert isinstance(parser.sections(), list) def test_invalid_ini_is_ignored(self, tmp_path: Path) -> None: bad = _write(tmp_path, "bad.conf", "this is not valid [[[ini\nstuff") parser = Fail2BanConfigParser() # Should not raise; parser logs and continues. parser.read_file(bad) # --------------------------------------------------------------------------- # TestReadWithOverrides # --------------------------------------------------------------------------- class TestReadWithOverrides: def test_local_overrides_conf(self, tmp_path: Path) -> None: _write(tmp_path, "sshd.conf", "[Definition]\nfailregex = original\n") _write(tmp_path, "sshd.local", "[Definition]\nfailregex = overridden\n") parser = Fail2BanConfigParser() parser.read_with_overrides(tmp_path / "sshd.conf") assert parser.get("Definition", "failregex") == "overridden" def test_no_local_just_reads_conf(self, tmp_path: Path) -> None: _write(tmp_path, "sshd.conf", "[Definition]\nfailregex = only_conf\n") parser = Fail2BanConfigParser() parser.read_with_overrides(tmp_path / "sshd.conf") assert parser.get("Definition", "failregex") == "only_conf" def test_local_adds_new_key(self, tmp_path: Path) -> None: _write(tmp_path, "sshd.conf", "[Definition]\nfailregex = orig\n") _write(tmp_path, "sshd.local", "[Definition]\nextrakey = newval\n") parser = Fail2BanConfigParser() parser.read_with_overrides(tmp_path / "sshd.conf") assert parser.get("Definition", "extrakey") == "newval" def test_conf_keys_preserved_when_local_overrides_other( self, tmp_path: Path ) -> None: _write( tmp_path, "sshd.conf", "[Definition]\nfailregex = orig\nignoreregex = keep_me\n", ) _write(tmp_path, "sshd.local", "[Definition]\nfailregex = new\n") parser = Fail2BanConfigParser() parser.read_with_overrides(tmp_path / "sshd.conf") assert parser.get("Definition", "ignoreregex") == "keep_me" assert parser.get("Definition", "failregex") == "new" # --------------------------------------------------------------------------- # TestSections # --------------------------------------------------------------------------- class TestSections: def test_sections_excludes_default(self, tmp_path: Path) -> None: content = "[DEFAULT]\nfoo = bar\n\n[Definition]\nbaz = qux\n" p = _write(tmp_path, "x.conf", content) parser = Fail2BanConfigParser() parser.read_file(p) secs = parser.sections() assert "DEFAULT" not in secs assert "Definition" in secs def test_has_section_true(self, tmp_path: Path) -> None: p = _write(tmp_path, "x.conf", "[Init]\nname = test\n") parser = Fail2BanConfigParser() parser.read_file(p) assert parser.has_section("Init") is True def test_has_section_false(self, tmp_path: Path) -> None: p = _write(tmp_path, "x.conf", "[Init]\nname = test\n") parser = Fail2BanConfigParser() parser.read_file(p) assert parser.has_section("Nonexistent") is False def test_get_returns_none_for_missing_section(self, tmp_path: Path) -> None: p = _write(tmp_path, "x.conf", "[Init]\nname = test\n") parser = Fail2BanConfigParser() parser.read_file(p) assert parser.get("NoSection", "key") is None def test_get_returns_none_for_missing_key(self, tmp_path: Path) -> None: p = _write(tmp_path, "x.conf", "[Init]\nname = test\n") parser = Fail2BanConfigParser() parser.read_file(p) assert parser.get("Init", "nokey") is None # --------------------------------------------------------------------------- # TestSectionDict # --------------------------------------------------------------------------- class TestSectionDict: def test_returns_all_keys(self, tmp_path: Path) -> None: p = _write(tmp_path, "x.conf", "[Definition]\na = 1\nb = 2\n") parser = Fail2BanConfigParser() parser.read_file(p) d = parser.section_dict("Definition") assert d == {"a": "1", "b": "2"} def test_empty_for_missing_section(self, tmp_path: Path) -> None: p = _write(tmp_path, "x.conf", "[Definition]\na = 1\n") parser = Fail2BanConfigParser() parser.read_file(p) assert parser.section_dict("Init") == {} def test_skip_excludes_keys(self, tmp_path: Path) -> None: p = _write(tmp_path, "x.conf", "[Definition]\na = 1\nb = 2\nc = 3\n") parser = Fail2BanConfigParser() parser.read_file(p) d = parser.section_dict("Definition", skip=frozenset({"b"})) assert "b" not in d assert d["a"] == "1" def test_dunder_keys_excluded(self, tmp_path: Path) -> None: # configparser can inject __name__, __add__ etc. from DEFAULT. content = "[DEFAULT]\n__name__ = foo\n\n[Definition]\nreal = val\n" p = _write(tmp_path, "x.conf", content) parser = Fail2BanConfigParser() parser.read_file(p) d = parser.section_dict("Definition") assert "__name__" not in d assert "real" in d # --------------------------------------------------------------------------- # TestDefaults # --------------------------------------------------------------------------- class TestDefaults: def test_defaults_from_default_section(self, tmp_path: Path) -> None: content = "[DEFAULT]\nhost = \n\n[Definition]\nfailregex = ^\n" p = _write(tmp_path, "x.conf", content) parser = Fail2BanConfigParser() parser.read_file(p) assert parser.defaults().get("host") == "" def test_defaults_empty_when_no_default_section(self, tmp_path: Path) -> None: p = _write(tmp_path, "x.conf", "[Definition]\nfailregex = ^\n") parser = Fail2BanConfigParser() parser.read_file(p) assert parser.defaults() == {} # --------------------------------------------------------------------------- # TestInterpolate # --------------------------------------------------------------------------- class TestInterpolate: def _parser_with(self, tmp_path: Path, content: str) -> Fail2BanConfigParser: p = _write(tmp_path, "x.conf", content) parser = Fail2BanConfigParser() parser.read_file(p) return parser def test_substitutes_default_var(self, tmp_path: Path) -> None: parser = self._parser_with( tmp_path, "[DEFAULT]\nhost = \n\n[Definition]\nrule = match %(host)s\n", ) assert parser.interpolate("match %(host)s") == "match " def test_substitutes_init_var(self, tmp_path: Path) -> None: parser = self._parser_with( tmp_path, "[Init]\nname = sshd\n", ) assert parser.interpolate("f2b-%(name)s") == "f2b-sshd" def test_extra_vars_highest_priority(self, tmp_path: Path) -> None: parser = self._parser_with( tmp_path, "[DEFAULT]\nname = default_name\n", ) result = parser.interpolate("%(name)s", extra_vars={"name": "override"}) assert result == "override" def test_unresolvable_left_unchanged(self, tmp_path: Path) -> None: parser = Fail2BanConfigParser() result = parser.interpolate("value %(unknown)s end") assert result == "value %(unknown)s end" def test_nested_interpolation(self, tmp_path: Path) -> None: # %(outer)s → %(inner)s → final parser = self._parser_with( tmp_path, "[DEFAULT]\ninner = final\nouter = %(inner)s\n", ) assert parser.interpolate("%(outer)s") == "final" def test_no_references_returned_unchanged(self, tmp_path: Path) -> None: parser = Fail2BanConfigParser() assert parser.interpolate("plain value") == "plain value" def test_empty_string(self, tmp_path: Path) -> None: parser = Fail2BanConfigParser() assert parser.interpolate("") == "" # --------------------------------------------------------------------------- # TestSplitMultiline # --------------------------------------------------------------------------- class TestSplitMultiline: def test_strips_blank_lines(self) -> None: raw = "line1\n\nline2\n\n" assert Fail2BanConfigParser.split_multiline(raw) == ["line1", "line2"] def test_strips_comment_lines(self) -> None: raw = "line1\n# comment\nline2" assert Fail2BanConfigParser.split_multiline(raw) == ["line1", "line2"] def test_strips_leading_whitespace(self) -> None: raw = " line1\n line2" assert Fail2BanConfigParser.split_multiline(raw) == ["line1", "line2"] def test_empty_input(self) -> None: assert Fail2BanConfigParser.split_multiline("") == [] def test_all_comments(self) -> None: raw = "# first\n# second" assert Fail2BanConfigParser.split_multiline(raw) == [] def test_single_line(self) -> None: assert Fail2BanConfigParser.split_multiline("single") == ["single"] def test_preserves_internal_spaces(self) -> None: raw = "iptables -I INPUT -s -j DROP" assert Fail2BanConfigParser.split_multiline(raw) == [ "iptables -I INPUT -s -j DROP" ] def test_multiline_regex_list(self) -> None: raw = ( "\n" " ^%(__prefix_line)s Authentication failure for .* from \n" " # inline comment, skip\n" " ^%(__prefix_line)s BREAK-IN ATTEMPT by \n" ) result = Fail2BanConfigParser.split_multiline(raw) assert len(result) == 2 assert all("HOST" in r for r in result) # --------------------------------------------------------------------------- # TestMultipleFiles (integration-style tests) # --------------------------------------------------------------------------- class TestMultipleFilesIntegration: """Tests that combine several files to verify merge order.""" def test_local_only_override(self, tmp_path: Path) -> None: _write(tmp_path, "test.conf", "[Definition]\nfailregex = base\n") _write(tmp_path, "test.local", "[Definition]\nfailregex = local\n") parser = Fail2BanConfigParser() parser.read_with_overrides(tmp_path / "test.conf") assert parser.get("Definition", "failregex") == "local" def test_before_then_conf_then_local(self, tmp_path: Path) -> None: # before.conf → test.conf → test.local (ascending priority) _write(tmp_path, "before.conf", "[Definition]\nsource = before\n") _write( tmp_path, "test.conf", "[INCLUDES]\nbefore = before.conf\n\n[Definition]\nsource = conf\n", ) _write(tmp_path, "test.local", "[Definition]\nsource = local\n") parser = Fail2BanConfigParser() parser.read_with_overrides(tmp_path / "test.conf") assert parser.get("Definition", "source") == "local" def test_before_key_preserved_if_not_overridden(self, tmp_path: Path) -> None: _write(tmp_path, "common.conf", "[DEFAULT]\nhost = \n") _write( tmp_path, "filter.conf", "[INCLUDES]\nbefore = common.conf\n\n[Definition]\nfailregex = ^%(host)s\n", ) parser = Fail2BanConfigParser() parser.read_file(tmp_path / "filter.conf") assert parser.defaults().get("host") == "" assert parser.get("Definition", "failregex") == "^%(host)s" def test_fresh_parser_has_no_state(self) -> None: p1 = Fail2BanConfigParser() p2 = Fail2BanConfigParser() assert p1.sections() == [] assert p2.sections() == [] assert p1 is not p2