From f5c363525845cb27eda8a06d5ad5d9f5e8a33b49 Mon Sep 17 00:00:00 2001 From: Lukas Date: Fri, 13 Mar 2026 13:47:35 +0100 Subject: [PATCH] test(backend): add tests for conf-file parser, file-config service and router - test_conffile_parser.py: unit tests for section/key parsing, comment preservation, and round-trip write correctness - test_file_config_service.py: service-level tests with mock filesystem - test_file_config.py: router integration tests covering GET / PUT endpoints for jails, actions, and filters --- .../tests/test_routers/test_file_config.py | 334 ++++++++++ .../test_services/test_conffile_parser.py | 624 ++++++++++++++++++ .../test_services/test_file_config_service.py | 188 ++++++ 3 files changed, 1146 insertions(+) create mode 100644 backend/tests/test_services/test_conffile_parser.py diff --git a/backend/tests/test_routers/test_file_config.py b/backend/tests/test_routers/test_file_config.py index 6d47f6d..cede4f3 100644 --- a/backend/tests/test_routers/test_file_config.py +++ b/backend/tests/test_routers/test_file_config.py @@ -12,6 +12,12 @@ from httpx import ASGITransport, AsyncClient from app.config import Settings from app.db import init_db from app.main import create_app +from app.models.config import ( + ActionConfig, + FilterConfig, + JailFileConfig, + JailSectionConfig, +) from app.models.file_config import ( ConfFileContent, ConfFileEntry, @@ -377,3 +383,331 @@ class TestCreateActionFile: assert resp.status_code == 201 assert resp.json()["filename"] == "myaction.conf" + + +# --------------------------------------------------------------------------- +# POST /api/config/jail-files +# --------------------------------------------------------------------------- + + +class TestCreateJailConfigFile: + async def test_201_creates_file(self, file_config_client: AsyncClient) -> None: + with patch( + "app.routers.file_config.file_config_service.create_jail_config_file", + AsyncMock(return_value="myjail.conf"), + ): + resp = await file_config_client.post( + "/api/config/jail-files", + json={"name": "myjail", "content": "[myjail]\nenabled = true\n"}, + ) + + assert resp.status_code == 201 + assert resp.json()["filename"] == "myjail.conf" + + async def test_409_conflict(self, file_config_client: AsyncClient) -> None: + with patch( + "app.routers.file_config.file_config_service.create_jail_config_file", + AsyncMock(side_effect=ConfigFileExistsError("myjail.conf")), + ): + resp = await file_config_client.post( + "/api/config/jail-files", + json={"name": "myjail", "content": "[myjail]\nenabled = true\n"}, + ) + + assert resp.status_code == 409 + + async def test_400_invalid_name(self, file_config_client: AsyncClient) -> None: + with patch( + "app.routers.file_config.file_config_service.create_jail_config_file", + AsyncMock(side_effect=ConfigFileNameError("bad/../name")), + ): + resp = await file_config_client.post( + "/api/config/jail-files", + json={"name": "../escape", "content": "[Definition]\n"}, + ) + + assert resp.status_code == 400 + + async def test_503_on_config_dir_error( + self, file_config_client: AsyncClient + ) -> None: + with patch( + "app.routers.file_config.file_config_service.create_jail_config_file", + AsyncMock(side_effect=ConfigDirError("no dir")), + ): + resp = await file_config_client.post( + "/api/config/jail-files", + json={"name": "anyjail", "content": "[anyjail]\nenabled = false\n"}, + ) + + assert resp.status_code == 503 + + +# --------------------------------------------------------------------------- +# GET /api/config/filters/{name}/parsed +# --------------------------------------------------------------------------- + + +class TestGetParsedFilter: + async def test_200_returns_parsed_config( + self, file_config_client: AsyncClient + ) -> None: + cfg = FilterConfig(name="nginx", filename="nginx.conf") + with patch( + "app.routers.file_config.file_config_service.get_parsed_filter_file", + AsyncMock(return_value=cfg), + ): + resp = await file_config_client.get("/api/config/filters/nginx/parsed") + + assert resp.status_code == 200 + data = resp.json() + assert data["name"] == "nginx" + assert data["filename"] == "nginx.conf" + + async def test_404_not_found(self, file_config_client: AsyncClient) -> None: + with patch( + "app.routers.file_config.file_config_service.get_parsed_filter_file", + AsyncMock(side_effect=ConfigFileNotFoundError("missing")), + ): + resp = await file_config_client.get( + "/api/config/filters/missing/parsed" + ) + + assert resp.status_code == 404 + + async def test_503_on_config_dir_error( + self, file_config_client: AsyncClient + ) -> None: + with patch( + "app.routers.file_config.file_config_service.get_parsed_filter_file", + AsyncMock(side_effect=ConfigDirError("no dir")), + ): + resp = await file_config_client.get("/api/config/filters/nginx/parsed") + + assert resp.status_code == 503 + + +# --------------------------------------------------------------------------- +# PUT /api/config/filters/{name}/parsed +# --------------------------------------------------------------------------- + + +class TestUpdateParsedFilter: + async def test_204_on_success(self, file_config_client: AsyncClient) -> None: + with patch( + "app.routers.file_config.file_config_service.update_parsed_filter_file", + AsyncMock(return_value=None), + ): + resp = await file_config_client.put( + "/api/config/filters/nginx/parsed", + json={"failregex": ["^ "]}, + ) + + assert resp.status_code == 204 + + async def test_404_not_found(self, file_config_client: AsyncClient) -> None: + with patch( + "app.routers.file_config.file_config_service.update_parsed_filter_file", + AsyncMock(side_effect=ConfigFileNotFoundError("missing")), + ): + resp = await file_config_client.put( + "/api/config/filters/missing/parsed", + json={"failregex": []}, + ) + + assert resp.status_code == 404 + + async def test_400_write_error(self, file_config_client: AsyncClient) -> None: + with patch( + "app.routers.file_config.file_config_service.update_parsed_filter_file", + AsyncMock(side_effect=ConfigFileWriteError("disk full")), + ): + resp = await file_config_client.put( + "/api/config/filters/nginx/parsed", + json={"failregex": ["^ "]}, + ) + + assert resp.status_code == 400 + + +# --------------------------------------------------------------------------- +# GET /api/config/actions/{name}/parsed +# --------------------------------------------------------------------------- + + +class TestGetParsedAction: + async def test_200_returns_parsed_config( + self, file_config_client: AsyncClient + ) -> None: + cfg = ActionConfig(name="iptables", filename="iptables.conf") + with patch( + "app.routers.file_config.file_config_service.get_parsed_action_file", + AsyncMock(return_value=cfg), + ): + resp = await file_config_client.get( + "/api/config/actions/iptables/parsed" + ) + + assert resp.status_code == 200 + data = resp.json() + assert data["name"] == "iptables" + assert data["filename"] == "iptables.conf" + + async def test_404_not_found(self, file_config_client: AsyncClient) -> None: + with patch( + "app.routers.file_config.file_config_service.get_parsed_action_file", + AsyncMock(side_effect=ConfigFileNotFoundError("missing")), + ): + resp = await file_config_client.get( + "/api/config/actions/missing/parsed" + ) + + assert resp.status_code == 404 + + async def test_503_on_config_dir_error( + self, file_config_client: AsyncClient + ) -> None: + with patch( + "app.routers.file_config.file_config_service.get_parsed_action_file", + AsyncMock(side_effect=ConfigDirError("no dir")), + ): + resp = await file_config_client.get( + "/api/config/actions/iptables/parsed" + ) + + assert resp.status_code == 503 + + +# --------------------------------------------------------------------------- +# PUT /api/config/actions/{name}/parsed +# --------------------------------------------------------------------------- + + +class TestUpdateParsedAction: + async def test_204_on_success(self, file_config_client: AsyncClient) -> None: + with patch( + "app.routers.file_config.file_config_service.update_parsed_action_file", + AsyncMock(return_value=None), + ): + resp = await file_config_client.put( + "/api/config/actions/iptables/parsed", + json={"actionban": "iptables -I INPUT -s -j DROP"}, + ) + + assert resp.status_code == 204 + + async def test_404_not_found(self, file_config_client: AsyncClient) -> None: + with patch( + "app.routers.file_config.file_config_service.update_parsed_action_file", + AsyncMock(side_effect=ConfigFileNotFoundError("missing")), + ): + resp = await file_config_client.put( + "/api/config/actions/missing/parsed", + json={"actionban": ""}, + ) + + assert resp.status_code == 404 + + async def test_400_write_error(self, file_config_client: AsyncClient) -> None: + with patch( + "app.routers.file_config.file_config_service.update_parsed_action_file", + AsyncMock(side_effect=ConfigFileWriteError("disk full")), + ): + resp = await file_config_client.put( + "/api/config/actions/iptables/parsed", + json={"actionban": "iptables -I INPUT -s -j DROP"}, + ) + + assert resp.status_code == 400 + + +# --------------------------------------------------------------------------- +# GET /api/config/jail-files/{filename}/parsed +# --------------------------------------------------------------------------- + + +class TestGetParsedJailFile: + async def test_200_returns_parsed_config( + self, file_config_client: AsyncClient + ) -> None: + section = JailSectionConfig(enabled=True, port="ssh") + cfg = JailFileConfig(filename="sshd.conf", jails={"sshd": section}) + with patch( + "app.routers.file_config.file_config_service.get_parsed_jail_file", + AsyncMock(return_value=cfg), + ): + resp = await file_config_client.get( + "/api/config/jail-files/sshd.conf/parsed" + ) + + assert resp.status_code == 200 + data = resp.json() + assert data["filename"] == "sshd.conf" + assert "sshd" in data["jails"] + + async def test_404_not_found(self, file_config_client: AsyncClient) -> None: + with patch( + "app.routers.file_config.file_config_service.get_parsed_jail_file", + AsyncMock(side_effect=ConfigFileNotFoundError("missing.conf")), + ): + resp = await file_config_client.get( + "/api/config/jail-files/missing.conf/parsed" + ) + + assert resp.status_code == 404 + + async def test_503_on_config_dir_error( + self, file_config_client: AsyncClient + ) -> None: + with patch( + "app.routers.file_config.file_config_service.get_parsed_jail_file", + AsyncMock(side_effect=ConfigDirError("no dir")), + ): + resp = await file_config_client.get( + "/api/config/jail-files/sshd.conf/parsed" + ) + + assert resp.status_code == 503 + + +# --------------------------------------------------------------------------- +# PUT /api/config/jail-files/{filename}/parsed +# --------------------------------------------------------------------------- + + +class TestUpdateParsedJailFile: + async def test_204_on_success(self, file_config_client: AsyncClient) -> None: + with patch( + "app.routers.file_config.file_config_service.update_parsed_jail_file", + AsyncMock(return_value=None), + ): + resp = await file_config_client.put( + "/api/config/jail-files/sshd.conf/parsed", + json={"jails": {"sshd": {"enabled": False}}}, + ) + + assert resp.status_code == 204 + + async def test_404_not_found(self, file_config_client: AsyncClient) -> None: + with patch( + "app.routers.file_config.file_config_service.update_parsed_jail_file", + AsyncMock(side_effect=ConfigFileNotFoundError("missing.conf")), + ): + resp = await file_config_client.put( + "/api/config/jail-files/missing.conf/parsed", + json={"jails": {}}, + ) + + assert resp.status_code == 404 + + async def test_400_write_error(self, file_config_client: AsyncClient) -> None: + with patch( + "app.routers.file_config.file_config_service.update_parsed_jail_file", + AsyncMock(side_effect=ConfigFileWriteError("disk full")), + ): + resp = await file_config_client.put( + "/api/config/jail-files/sshd.conf/parsed", + json={"jails": {"sshd": {"enabled": True}}}, + ) + + assert resp.status_code == 400 diff --git a/backend/tests/test_services/test_conffile_parser.py b/backend/tests/test_services/test_conffile_parser.py new file mode 100644 index 0000000..e69e4f0 --- /dev/null +++ b/backend/tests/test_services/test_conffile_parser.py @@ -0,0 +1,624 @@ +"""Tests for conffile_parser — fail2ban INI file parser and serializer.""" + +from __future__ import annotations + +from pathlib import Path + +import pytest + +from app.services.conffile_parser import ( + merge_action_update, + merge_filter_update, + parse_action_file, + parse_filter_file, + serialize_action_config, + serialize_filter_config, +) + +# Path to the bundled fail2ban reference configs shipped with this project. +_REF_DIR = Path(__file__).parent.parent.parent.parent / "fail2ban-master" / "config" +_FILTER_DIR = _REF_DIR / "filter.d" +_ACTION_DIR = _REF_DIR / "action.d" + + +# --------------------------------------------------------------------------- +# Filter — parse helpers +# --------------------------------------------------------------------------- + + +MINIMAL_FILTER = """\ +[Definition] +failregex = ^ bad request$ +ignoreregex = +""" + +FULL_FILTER = """\ +[INCLUDES] +before = common.conf + +[DEFAULT] +_daemon = sshd +__prefix = (?:sshd )? + +[Definition] +prefregex = ^%(__prefix)s%(__pref)s +failregex = ^Authentication failure for .* from via + ^User not known .* from +ignoreregex = ^Authorised key from + +maxlines = 10 +datepattern = %%Y-%%m-%%d %%H:%%M:%%S +journalmatch = _SYSTEMD_UNIT=sshd.service +""" + + +class TestParseFilterFile: + """Unit tests for parse_filter_file.""" + + def test_minimal_filter(self) -> None: + cfg = parse_filter_file(MINIMAL_FILTER, name="test", filename="test.conf") + assert cfg.name == "test" + assert cfg.filename == "test.conf" + assert cfg.failregex == ["^ bad request$"] + assert cfg.ignoreregex == [] + assert cfg.before is None + assert cfg.after is None + + def test_full_filter_includes(self) -> None: + cfg = parse_filter_file(FULL_FILTER, name="sshd") + assert cfg.before == "common.conf" + assert cfg.after is None + + def test_full_filter_defaults(self) -> None: + cfg = parse_filter_file(FULL_FILTER, name="sshd") + assert "_daemon" in cfg.variables + assert cfg.variables["_daemon"] == "sshd" + + def test_full_filter_failregex_multiline(self) -> None: + cfg = parse_filter_file(FULL_FILTER, name="sshd") + assert len(cfg.failregex) == 2 + assert "Authentication failure" in cfg.failregex[0] + assert "User not known" in cfg.failregex[1] + + def test_full_filter_ignoreregex(self) -> None: + cfg = parse_filter_file(FULL_FILTER, name="sshd") + assert len(cfg.ignoreregex) == 1 + assert "Authorised key" in cfg.ignoreregex[0] + + def test_full_filter_optional_fields(self) -> None: + cfg = parse_filter_file(FULL_FILTER, name="sshd") + assert cfg.maxlines == 10 + assert cfg.datepattern is not None + assert cfg.journalmatch == "_SYSTEMD_UNIT=sshd.service" + + def test_empty_failregex(self) -> None: + content = "[Definition]\nfailregex =\nignoreregex =\n" + cfg = parse_filter_file(content, name="empty") + assert cfg.failregex == [] + assert cfg.ignoreregex == [] + + def test_comment_lines_stripped_from_failregex(self) -> None: + content = ( + "[Definition]\n" + "failregex = ^ good$\n" + " # this is a comment\n" + " ^ also good$\n" + ) + cfg = parse_filter_file(content, name="t") + assert len(cfg.failregex) == 2 + assert cfg.failregex[1] == "^ also good$" + + def test_malformed_content_does_not_raise(self) -> None: + """Parser should not crash on malformed content; return partial result.""" + cfg = parse_filter_file("not an ini file at all!!!", name="bad") + assert cfg.failregex == [] + + def test_no_definition_section(self) -> None: + cfg = parse_filter_file("[INCLUDES]\nbefore = common.conf\n", name="x") + assert cfg.before == "common.conf" + assert cfg.failregex == [] + + +# --------------------------------------------------------------------------- +# Filter — round-trip (parse → serialize → parse) +# --------------------------------------------------------------------------- + + +class TestFilterRoundTrip: + """Serialize then re-parse and verify the result matches.""" + + def test_round_trip_minimal(self) -> None: + original = parse_filter_file(MINIMAL_FILTER, name="t") + serialized = serialize_filter_config(original) + restored = parse_filter_file(serialized, name="t") + assert restored.failregex == original.failregex + assert restored.ignoreregex == original.ignoreregex + + def test_round_trip_full(self) -> None: + original = parse_filter_file(FULL_FILTER, name="sshd") + serialized = serialize_filter_config(original) + restored = parse_filter_file(serialized, name="sshd") + assert restored.before == original.before + assert restored.failregex == original.failregex + assert restored.ignoreregex == original.ignoreregex + assert restored.maxlines == original.maxlines + assert restored.datepattern == original.datepattern + assert restored.journalmatch == original.journalmatch + + +# --------------------------------------------------------------------------- +# Filter — reference file (sshd.conf shipped with fail2ban-master) +# --------------------------------------------------------------------------- + + +@pytest.mark.skipif( + not (_FILTER_DIR / "sshd.conf").exists(), + reason="fail2ban-master reference configs not present", +) +class TestParseRealSshdFilter: + """Parse the real sshd.conf that ships with fail2ban-master.""" + + def test_parses_without_exception(self) -> None: + content = (_FILTER_DIR / "sshd.conf").read_text() + cfg = parse_filter_file(content, name="sshd", filename="sshd.conf") + assert cfg.name == "sshd" + + def test_has_failregex_patterns(self) -> None: + content = (_FILTER_DIR / "sshd.conf").read_text() + cfg = parse_filter_file(content, name="sshd") + assert len(cfg.failregex) > 0 + + def test_has_includes(self) -> None: + content = (_FILTER_DIR / "sshd.conf").read_text() + cfg = parse_filter_file(content, name="sshd") + assert cfg.before is not None + + def test_round_trip_preserves_failregex_count(self) -> None: + content = (_FILTER_DIR / "sshd.conf").read_text() + cfg = parse_filter_file(content, name="sshd") + serialized = serialize_filter_config(cfg) + restored = parse_filter_file(serialized, name="sshd") + assert len(restored.failregex) == len(cfg.failregex) + + +# --------------------------------------------------------------------------- +# FilterConfigUpdate — merge +# --------------------------------------------------------------------------- + + +class TestMergeFilterUpdate: + """Tests for merge_filter_update.""" + + def test_merge_only_failregex(self) -> None: + from app.models.config import FilterConfigUpdate + + base = parse_filter_file(FULL_FILTER, name="sshd") + update = FilterConfigUpdate(failregex=["^new pattern $"]) + merged = merge_filter_update(base, update) + assert merged.failregex == ["^new pattern $"] + # Everything else unchanged: + assert merged.before == base.before + assert merged.maxlines == base.maxlines + + def test_merge_none_fields_unchanged(self) -> None: + from app.models.config import FilterConfigUpdate + + base = parse_filter_file(FULL_FILTER, name="sshd") + update = FilterConfigUpdate() # All None + merged = merge_filter_update(base, update) + assert merged.failregex == base.failregex + assert merged.ignoreregex == base.ignoreregex + + def test_merge_clear_ignoreregex(self) -> None: + from app.models.config import FilterConfigUpdate + + base = parse_filter_file(FULL_FILTER, name="sshd") + update = FilterConfigUpdate(ignoreregex=[]) + merged = merge_filter_update(base, update) + assert merged.ignoreregex == [] + + def test_merge_update_variables(self) -> None: + from app.models.config import FilterConfigUpdate + + base = parse_filter_file(FULL_FILTER, name="sshd") + update = FilterConfigUpdate(variables={"_daemon": "mynewdaemon"}) + merged = merge_filter_update(base, update) + assert merged.variables["_daemon"] == "mynewdaemon" + + +# --------------------------------------------------------------------------- +# Action — parse helpers +# --------------------------------------------------------------------------- + + +MINIMAL_ACTION = """\ +[Definition] +actionban = iptables -I INPUT -s -j DROP +actionunban = iptables -D INPUT -s -j DROP +""" + +FULL_ACTION = """\ +[INCLUDES] +before = iptables-common.conf + +[Definition] +actionstart = { iptables -N f2b- + iptables -A INPUT -p -j f2b- } +actionstop = iptables -D INPUT -p -j f2b- + iptables -F f2b- + iptables -X f2b- +actioncheck = iptables -n -L INPUT | grep -q f2b-[ \\t] +actionban = iptables -I f2b- 1 -s -j +actionunban = iptables -D f2b- -s -j +actionflush = iptables -F f2b- +name = default +protocol = tcp +chain = INPUT +blocktype = REJECT + +[Init] +blocktype = REJECT --reject-with icmp-port-unreachable +name = default +""" + + +class TestParseActionFile: + """Unit tests for parse_action_file.""" + + def test_minimal_action(self) -> None: + cfg = parse_action_file(MINIMAL_ACTION, name="test", filename="test.conf") + assert cfg.name == "test" + assert cfg.actionban is not None + assert "" in cfg.actionban + assert cfg.actionunban is not None + + def test_full_action_includes(self) -> None: + cfg = parse_action_file(FULL_ACTION, name="iptables") + assert cfg.before == "iptables-common.conf" + + def test_full_action_lifecycle_keys(self) -> None: + cfg = parse_action_file(FULL_ACTION, name="iptables") + assert cfg.actionstart is not None + assert cfg.actionstop is not None + assert cfg.actioncheck is not None + assert cfg.actionban is not None + assert cfg.actionunban is not None + assert cfg.actionflush is not None + + def test_full_action_definition_vars(self) -> None: + cfg = parse_action_file(FULL_ACTION, name="iptables") + # 'name', 'protocol', 'chain', 'blocktype' are definition vars + assert "protocol" in cfg.definition_vars + assert "chain" in cfg.definition_vars + + def test_full_action_init_vars(self) -> None: + cfg = parse_action_file(FULL_ACTION, name="iptables") + assert "blocktype" in cfg.init_vars + assert "REJECT" in cfg.init_vars["blocktype"] + + def test_empty_action(self) -> None: + cfg = parse_action_file("[Definition]\n", name="empty") + assert cfg.actionban is None + assert cfg.definition_vars == {} + assert cfg.init_vars == {} + + def test_malformed_does_not_raise(self) -> None: + cfg = parse_action_file("@@@@not valid ini@@@@", name="bad") + assert cfg.actionban is None + + +# --------------------------------------------------------------------------- +# Action — round-trip +# --------------------------------------------------------------------------- + + +class TestActionRoundTrip: + """Serialize then re-parse and verify key fields are preserved.""" + + def test_round_trip_minimal(self) -> None: + original = parse_action_file(MINIMAL_ACTION, name="t") + serialized = serialize_action_config(original) + restored = parse_action_file(serialized, name="t") + assert restored.actionban == original.actionban + assert restored.actionunban == original.actionunban + + def test_round_trip_full(self) -> None: + original = parse_action_file(FULL_ACTION, name="iptables") + serialized = serialize_action_config(original) + restored = parse_action_file(serialized, name="iptables") + assert restored.actionban == original.actionban + assert restored.actionflush == original.actionflush + assert restored.init_vars.get("blocktype") == original.init_vars.get("blocktype") + + +# --------------------------------------------------------------------------- +# Action — reference file +# --------------------------------------------------------------------------- + + +@pytest.mark.skipif( + not (_ACTION_DIR / "iptables.conf").exists(), + reason="fail2ban-master reference configs not present", +) +class TestParseRealIptablesAction: + """Parse the real iptables.conf that ships with fail2ban-master.""" + + def test_parses_without_exception(self) -> None: + content = (_ACTION_DIR / "iptables.conf").read_text() + cfg = parse_action_file(content, name="iptables", filename="iptables.conf") + assert cfg.name == "iptables" + + def test_has_actionban(self) -> None: + content = (_ACTION_DIR / "iptables.conf").read_text() + cfg = parse_action_file(content, name="iptables") + assert cfg.actionban is not None + + def test_round_trip_preserves_actionban(self) -> None: + content = (_ACTION_DIR / "iptables.conf").read_text() + cfg = parse_action_file(content, name="iptables") + serialized = serialize_action_config(cfg) + restored = parse_action_file(serialized, name="iptables") + assert restored.actionban == cfg.actionban + + +# --------------------------------------------------------------------------- +# ActionConfigUpdate — merge +# --------------------------------------------------------------------------- + + +class TestMergeActionUpdate: + """Tests for merge_action_update.""" + + def test_merge_only_actionban(self) -> None: + from app.models.config import ActionConfigUpdate + + base = parse_action_file(FULL_ACTION, name="iptables") + update = ActionConfigUpdate(actionban="iptables -I INPUT -s -j DROP") + merged = merge_action_update(base, update) + assert merged.actionban == "iptables -I INPUT -s -j DROP" + assert merged.actionunban == base.actionunban + + def test_merge_none_fields_unchanged(self) -> None: + from app.models.config import ActionConfigUpdate + + base = parse_action_file(FULL_ACTION, name="iptables") + update = ActionConfigUpdate() + merged = merge_action_update(base, update) + assert merged.actionban == base.actionban + assert merged.init_vars == base.init_vars + + def test_merge_update_init_vars(self) -> None: + from app.models.config import ActionConfigUpdate + + base = parse_action_file(FULL_ACTION, name="iptables") + update = ActionConfigUpdate(init_vars={"blocktype": "DROP"}) + merged = merge_action_update(base, update) + assert merged.init_vars["blocktype"] == "DROP" + + +# --------------------------------------------------------------------------- +# Jail file test fixtures +# --------------------------------------------------------------------------- + +MINIMAL_JAIL = """\ +[sshd] +enabled = false +port = ssh +filter = sshd +logpath = /var/log/auth.log +""" + +FULL_JAIL = """\ +# fail2ban jail configuration +[sshd] +enabled = true +port = ssh,22 +filter = sshd +backend = polling +maxretry = 3 +findtime = 600 +bantime = 3600 +logpath = /var/log/auth.log + /var/log/syslog + +[nginx-botsearch] +enabled = false +port = http,https +filter = nginx-botsearch +logpath = /var/log/nginx/error.log +maxretry = 2 +action = iptables-multiport[name=botsearch, port="http,https"] + sendmail-whois +""" + +JAIL_WITH_EXTRA = """\ +[sshd] +enabled = true +port = ssh +filter = sshd +logpath = /var/log/auth.log +custom_key = custom_value +another_key = 42 +""" + + +# --------------------------------------------------------------------------- +# parse_jail_file +# --------------------------------------------------------------------------- + + +class TestParseJailFile: + """Unit tests for parse_jail_file.""" + + def test_minimal_parses_correctly(self) -> None: + from app.services.conffile_parser import parse_jail_file + + cfg = parse_jail_file(MINIMAL_JAIL, filename="sshd.conf") + assert cfg.filename == "sshd.conf" + assert "sshd" in cfg.jails + jail = cfg.jails["sshd"] + assert jail.enabled is False + assert jail.port == "ssh" + assert jail.filter == "sshd" + assert jail.logpath == ["/var/log/auth.log"] + + def test_full_parses_multiple_jails(self) -> None: + from app.services.conffile_parser import parse_jail_file + + cfg = parse_jail_file(FULL_JAIL) + assert len(cfg.jails) == 2 + assert "sshd" in cfg.jails + assert "nginx-botsearch" in cfg.jails + + def test_full_jail_numeric_fields(self) -> None: + from app.services.conffile_parser import parse_jail_file + + jail = parse_jail_file(FULL_JAIL).jails["sshd"] + assert jail.maxretry == 3 + assert jail.findtime == 600 + assert jail.bantime == 3600 + + def test_full_jail_multiline_logpath(self) -> None: + from app.services.conffile_parser import parse_jail_file + + jail = parse_jail_file(FULL_JAIL).jails["sshd"] + assert len(jail.logpath) == 2 + assert "/var/log/auth.log" in jail.logpath + assert "/var/log/syslog" in jail.logpath + + def test_full_jail_multiline_action(self) -> None: + from app.services.conffile_parser import parse_jail_file + + jail = parse_jail_file(FULL_JAIL).jails["nginx-botsearch"] + assert len(jail.action) == 2 + assert "sendmail-whois" in jail.action + + def test_enabled_true(self) -> None: + from app.services.conffile_parser import parse_jail_file + + jail = parse_jail_file(FULL_JAIL).jails["sshd"] + assert jail.enabled is True + + def test_enabled_false(self) -> None: + from app.services.conffile_parser import parse_jail_file + + jail = parse_jail_file(FULL_JAIL).jails["nginx-botsearch"] + assert jail.enabled is False + + def test_extra_keys_captured(self) -> None: + from app.services.conffile_parser import parse_jail_file + + jail = parse_jail_file(JAIL_WITH_EXTRA).jails["sshd"] + assert jail.extra["custom_key"] == "custom_value" + assert jail.extra["another_key"] == "42" + + def test_extra_keys_not_in_named_fields(self) -> None: + from app.services.conffile_parser import parse_jail_file + + jail = parse_jail_file(JAIL_WITH_EXTRA).jails["sshd"] + assert "enabled" not in jail.extra + assert "logpath" not in jail.extra + + def test_empty_file_yields_no_jails(self) -> None: + from app.services.conffile_parser import parse_jail_file + + cfg = parse_jail_file("") + assert cfg.jails == {} + + def test_invalid_ini_does_not_raise(self) -> None: + from app.services.conffile_parser import parse_jail_file + + # Should not raise; just parse what it can. + cfg = parse_jail_file("@@@ not valid ini @@@", filename="bad.conf") + assert isinstance(cfg.jails, dict) + + def test_default_section_ignored(self) -> None: + from app.services.conffile_parser import parse_jail_file + + content = "[DEFAULT]\nignoreip = 127.0.0.1\n\n[sshd]\nenabled = true\n" + cfg = parse_jail_file(content) + assert "DEFAULT" not in cfg.jails + assert "sshd" in cfg.jails + + +# --------------------------------------------------------------------------- +# Jail file round-trip +# --------------------------------------------------------------------------- + + +class TestJailFileRoundTrip: + """Tests that parse → serialize → parse preserves values.""" + + def test_minimal_round_trip(self) -> None: + from app.services.conffile_parser import parse_jail_file, serialize_jail_file_config + + original = parse_jail_file(MINIMAL_JAIL, filename="sshd.conf") + serialized = serialize_jail_file_config(original) + restored = parse_jail_file(serialized, filename="sshd.conf") + assert restored.jails["sshd"].enabled == original.jails["sshd"].enabled + assert restored.jails["sshd"].port == original.jails["sshd"].port + assert restored.jails["sshd"].logpath == original.jails["sshd"].logpath + + def test_full_round_trip(self) -> None: + from app.services.conffile_parser import parse_jail_file, serialize_jail_file_config + + original = parse_jail_file(FULL_JAIL) + serialized = serialize_jail_file_config(original) + restored = parse_jail_file(serialized) + for name, jail in original.jails.items(): + restored_jail = restored.jails[name] + assert restored_jail.enabled == jail.enabled + assert restored_jail.maxretry == jail.maxretry + assert sorted(restored_jail.logpath) == sorted(jail.logpath) + assert sorted(restored_jail.action) == sorted(jail.action) + + def test_extra_keys_round_trip(self) -> None: + from app.services.conffile_parser import parse_jail_file, serialize_jail_file_config + + original = parse_jail_file(JAIL_WITH_EXTRA) + serialized = serialize_jail_file_config(original) + restored = parse_jail_file(serialized) + assert restored.jails["sshd"].extra["custom_key"] == "custom_value" + + +# --------------------------------------------------------------------------- +# merge_jail_file_update +# --------------------------------------------------------------------------- + + +class TestMergeJailFileUpdate: + """Unit tests for merge_jail_file_update.""" + + def test_none_update_returns_original(self) -> None: + from app.models.config import JailFileConfigUpdate + from app.services.conffile_parser import merge_jail_file_update, parse_jail_file + + cfg = parse_jail_file(FULL_JAIL) + update = JailFileConfigUpdate() + merged = merge_jail_file_update(cfg, update) + assert merged.jails == cfg.jails + + def test_update_replaces_jail(self) -> None: + from app.models.config import JailFileConfigUpdate, JailSectionConfig + from app.services.conffile_parser import merge_jail_file_update, parse_jail_file + + cfg = parse_jail_file(FULL_JAIL) + new_sshd = JailSectionConfig(enabled=False, port="2222") + update = JailFileConfigUpdate(jails={"sshd": new_sshd}) + merged = merge_jail_file_update(cfg, update) + assert merged.jails["sshd"].enabled is False + assert merged.jails["sshd"].port == "2222" + # Other jails unchanged + assert "nginx-botsearch" in merged.jails + + def test_update_adds_new_jail(self) -> None: + from app.models.config import JailFileConfigUpdate, JailSectionConfig + from app.services.conffile_parser import merge_jail_file_update, parse_jail_file + + cfg = parse_jail_file(MINIMAL_JAIL) + new_jail = JailSectionConfig(enabled=True, port="443") + update = JailFileConfigUpdate(jails={"https": new_jail}) + merged = merge_jail_file_update(cfg, update) + assert "sshd" in merged.jails + assert "https" in merged.jails + assert merged.jails["https"].port == "443" diff --git a/backend/tests/test_services/test_file_config_service.py b/backend/tests/test_services/test_file_config_service.py index 227a5ba..202b4b4 100644 --- a/backend/tests/test_services/test_file_config_service.py +++ b/backend/tests/test_services/test_file_config_service.py @@ -6,6 +6,7 @@ from pathlib import Path import pytest +from app.models.config import ActionConfigUpdate, FilterConfigUpdate, JailFileConfigUpdate from app.models.file_config import ConfFileCreateRequest, ConfFileUpdateRequest from app.services.file_config_service import ( ConfigDirError, @@ -18,13 +19,20 @@ from app.services.file_config_service import ( _validate_new_name, create_action_file, create_filter_file, + create_jail_config_file, get_action_file, get_filter_file, get_jail_config_file, + get_parsed_action_file, + get_parsed_filter_file, + get_parsed_jail_file, list_action_files, list_filter_files, list_jail_config_files, set_jail_config_enabled, + update_parsed_action_file, + update_parsed_filter_file, + update_parsed_jail_file, write_action_file, write_filter_file, ) @@ -399,3 +407,183 @@ async def test_create_action_file_creates_file(tmp_path: Path) -> None: assert result == "my-action.conf" assert (config_dir / "action.d" / "my-action.conf").is_file() + + +# --------------------------------------------------------------------------- +# create_jail_config_file +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_create_jail_config_file_creates_file(tmp_path: Path) -> None: + config_dir = _make_config_dir(tmp_path) + req = ConfFileCreateRequest(name="myjail", content="[myjail]\nenabled = true\n") + + result = await create_jail_config_file(str(config_dir), req) + + assert result == "myjail.conf" + assert (config_dir / "jail.d" / "myjail.conf").is_file() + + +@pytest.mark.asyncio +async def test_create_jail_config_file_conflict(tmp_path: Path) -> None: + config_dir = _make_config_dir(tmp_path) + (config_dir / "jail.d" / "sshd.conf").write_text("[sshd]\nenabled = true\n") + + req = ConfFileCreateRequest(name="sshd", content="[sshd]\nenabled = true\n") + with pytest.raises(ConfigFileExistsError): + await create_jail_config_file(str(config_dir), req) + + +@pytest.mark.asyncio +async def test_create_jail_config_file_invalid_name(tmp_path: Path) -> None: + config_dir = _make_config_dir(tmp_path) + req = ConfFileCreateRequest(name="../escape", content="[x]\nenabled = true\n") + with pytest.raises(ConfigFileNameError): + await create_jail_config_file(str(config_dir), req) + + +# --------------------------------------------------------------------------- +# get_parsed_filter_file / update_parsed_filter_file +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_get_parsed_filter_file_returns_structured_model(tmp_path: Path) -> None: + config_dir = _make_config_dir(tmp_path) + content = "[Definition]\nfailregex = ^\nignoreregex = ^.*ignore.*$\n" + (config_dir / "filter.d" / "nginx.conf").write_text(content) + + result = await get_parsed_filter_file(str(config_dir), "nginx") + + assert result.name == "nginx" + assert result.filename == "nginx.conf" + assert result.failregex == ["^"] + assert result.ignoreregex == ["^.*ignore.*$"] + + +@pytest.mark.asyncio +async def test_get_parsed_filter_file_not_found(tmp_path: Path) -> None: + config_dir = _make_config_dir(tmp_path) + with pytest.raises(ConfigFileNotFoundError): + await get_parsed_filter_file(str(config_dir), "missing") + + +@pytest.mark.asyncio +async def test_update_parsed_filter_file_writes_changes(tmp_path: Path) -> None: + config_dir = _make_config_dir(tmp_path) + (config_dir / "filter.d" / "nginx.conf").write_text( + "[Definition]\nfailregex = ^ old\n" + ) + update = FilterConfigUpdate(failregex=["^ new"]) + + await update_parsed_filter_file(str(config_dir), "nginx", update) + + written = (config_dir / "filter.d" / "nginx.conf").read_text() + assert "new" in written + + +@pytest.mark.asyncio +async def test_update_parsed_filter_file_not_found(tmp_path: Path) -> None: + config_dir = _make_config_dir(tmp_path) + update = FilterConfigUpdate(failregex=["^"]) + with pytest.raises(ConfigFileNotFoundError): + await update_parsed_filter_file(str(config_dir), "missing", update) + + +# --------------------------------------------------------------------------- +# get_parsed_action_file / update_parsed_action_file +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_get_parsed_action_file_returns_structured_model(tmp_path: Path) -> None: + config_dir = _make_config_dir(tmp_path) + content = "[Definition]\nactionban = iptables -I INPUT -s -j DROP\n" + (config_dir / "action.d" / "iptables.conf").write_text(content) + + result = await get_parsed_action_file(str(config_dir), "iptables") + + assert result.name == "iptables" + assert result.filename == "iptables.conf" + assert result.actionban is not None + assert "" in result.actionban + + +@pytest.mark.asyncio +async def test_get_parsed_action_file_not_found(tmp_path: Path) -> None: + config_dir = _make_config_dir(tmp_path) + with pytest.raises(ConfigFileNotFoundError): + await get_parsed_action_file(str(config_dir), "missing") + + +@pytest.mark.asyncio +async def test_update_parsed_action_file_writes_changes(tmp_path: Path) -> None: + config_dir = _make_config_dir(tmp_path) + (config_dir / "action.d" / "iptables.conf").write_text( + "[Definition]\nactionban = iptables -I INPUT -s -j DROP\n" + ) + update = ActionConfigUpdate(actionban="nft add element inet f2b-table ") + + await update_parsed_action_file(str(config_dir), "iptables", update) + + written = (config_dir / "action.d" / "iptables.conf").read_text() + assert "nft" in written + + +@pytest.mark.asyncio +async def test_update_parsed_action_file_not_found(tmp_path: Path) -> None: + config_dir = _make_config_dir(tmp_path) + update = ActionConfigUpdate(actionban="iptables -I INPUT -s -j DROP") + with pytest.raises(ConfigFileNotFoundError): + await update_parsed_action_file(str(config_dir), "missing", update) + + +# --------------------------------------------------------------------------- +# get_parsed_jail_file / update_parsed_jail_file +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_get_parsed_jail_file_returns_structured_model(tmp_path: Path) -> None: + config_dir = _make_config_dir(tmp_path) + content = "[sshd]\nenabled = true\nport = ssh\nmaxretry = 5\n" + (config_dir / "jail.d" / "sshd.conf").write_text(content) + + result = await get_parsed_jail_file(str(config_dir), "sshd.conf") + + assert result.filename == "sshd.conf" + assert "sshd" in result.jails + assert result.jails["sshd"].enabled is True + assert result.jails["sshd"].maxretry == 5 + + +@pytest.mark.asyncio +async def test_get_parsed_jail_file_not_found(tmp_path: Path) -> None: + config_dir = _make_config_dir(tmp_path) + with pytest.raises(ConfigFileNotFoundError): + await get_parsed_jail_file(str(config_dir), "missing.conf") + + +@pytest.mark.asyncio +async def test_update_parsed_jail_file_writes_changes(tmp_path: Path) -> None: + config_dir = _make_config_dir(tmp_path) + (config_dir / "jail.d" / "sshd.conf").write_text( + "[sshd]\nenabled = true\nport = ssh\n" + ) + from app.models.config import JailSectionConfig + + update = JailFileConfigUpdate(jails={"sshd": JailSectionConfig(enabled=False)}) + + await update_parsed_jail_file(str(config_dir), "sshd.conf", update) + + written = (config_dir / "jail.d" / "sshd.conf").read_text() + assert "false" in written.lower() + + +@pytest.mark.asyncio +async def test_update_parsed_jail_file_not_found(tmp_path: Path) -> None: + config_dir = _make_config_dir(tmp_path) + update = JailFileConfigUpdate(jails={}) + with pytest.raises(ConfigFileNotFoundError): + await update_parsed_jail_file(str(config_dir), "missing.conf", update)