"""Tests for file_config_service functions.""" from __future__ import annotations from pathlib import Path import pytest from app.models.file_config import ConfFileCreateRequest, ConfFileUpdateRequest from app.services.file_config_service import ( ConfigDirError, ConfigFileExistsError, ConfigFileNameError, ConfigFileNotFoundError, ConfigFileWriteError, _parse_enabled, _set_enabled_in_content, _validate_new_name, create_action_file, create_filter_file, get_action_file, get_filter_file, get_jail_config_file, list_action_files, list_filter_files, list_jail_config_files, set_jail_config_enabled, write_action_file, write_filter_file, ) # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _make_config_dir(tmp_path: Path) -> Path: """Create a minimal fail2ban config directory structure.""" config_dir = tmp_path / "fail2ban" (config_dir / "jail.d").mkdir(parents=True) (config_dir / "filter.d").mkdir(parents=True) (config_dir / "action.d").mkdir(parents=True) return config_dir # --------------------------------------------------------------------------- # _parse_enabled # --------------------------------------------------------------------------- def test_parse_enabled_explicit_true(tmp_path: Path) -> None: f = tmp_path / "sshd.conf" f.write_text("[sshd]\nenabled = true\n") assert _parse_enabled(f) is True def test_parse_enabled_explicit_false(tmp_path: Path) -> None: f = tmp_path / "sshd.conf" f.write_text("[sshd]\nenabled = false\n") assert _parse_enabled(f) is False def test_parse_enabled_default_true_when_absent(tmp_path: Path) -> None: f = tmp_path / "sshd.conf" f.write_text("[sshd]\nbantime = 600\n") assert _parse_enabled(f) is True def test_parse_enabled_in_default_section(tmp_path: Path) -> None: f = tmp_path / "custom.conf" f.write_text("[DEFAULT]\nenabled = false\n") assert _parse_enabled(f) is False # --------------------------------------------------------------------------- # _set_enabled_in_content # --------------------------------------------------------------------------- def test_set_enabled_replaces_existing_line() -> None: src = "[sshd]\nenabled = false\nbantime = 600\n" result = _set_enabled_in_content(src, True) assert "enabled = true" in result assert "enabled = false" not in result def test_set_enabled_inserts_after_section() -> None: src = "[sshd]\nbantime = 600\n" result = _set_enabled_in_content(src, False) assert "enabled = false" in result def test_set_enabled_prepends_default_when_no_section() -> None: result = _set_enabled_in_content("bantime = 600\n", True) assert "enabled = true" in result # --------------------------------------------------------------------------- # _validate_new_name # --------------------------------------------------------------------------- @pytest.mark.parametrize("name", ["sshd", "my-filter", "test.local", "A1_filter"]) def test_validate_new_name_valid(name: str) -> None: _validate_new_name(name) # should not raise @pytest.mark.parametrize( "name", [ "", ".", ".hidden", "../escape", "bad/slash", "a" * 129, # too long "hello world", # space ], ) def test_validate_new_name_invalid(name: str) -> None: with pytest.raises(ConfigFileNameError): _validate_new_name(name) # --------------------------------------------------------------------------- # list_jail_config_files # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_list_jail_config_files_empty(tmp_path: Path) -> None: config_dir = _make_config_dir(tmp_path) resp = await list_jail_config_files(str(config_dir)) assert resp.files == [] assert resp.total == 0 @pytest.mark.asyncio async def test_list_jail_config_files_returns_conf_files(tmp_path: Path) -> None: config_dir = _make_config_dir(tmp_path) (config_dir / "jail.d" / "sshd.conf").write_text("[sshd]\nenabled = true\n") (config_dir / "jail.d" / "nginx.conf").write_text("[nginx]\n") (config_dir / "jail.d" / "other.txt").write_text("ignored") resp = await list_jail_config_files(str(config_dir)) names = {f.filename for f in resp.files} assert names == {"sshd.conf", "nginx.conf"} assert resp.total == 2 @pytest.mark.asyncio async def test_list_jail_config_files_enabled_state(tmp_path: Path) -> None: config_dir = _make_config_dir(tmp_path) (config_dir / "jail.d" / "a.conf").write_text("[a]\nenabled = false\n") (config_dir / "jail.d" / "b.conf").write_text("[b]\n") resp = await list_jail_config_files(str(config_dir)) by_name = {f.filename: f for f in resp.files} assert by_name["a.conf"].enabled is False assert by_name["b.conf"].enabled is True @pytest.mark.asyncio async def test_list_jail_config_files_missing_config_dir(tmp_path: Path) -> None: with pytest.raises(ConfigDirError): await list_jail_config_files(str(tmp_path / "nonexistent")) # --------------------------------------------------------------------------- # get_jail_config_file # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_get_jail_config_file_returns_content(tmp_path: Path) -> None: config_dir = _make_config_dir(tmp_path) (config_dir / "jail.d" / "sshd.conf").write_text("[sshd]\nenabled = true\n") result = await get_jail_config_file(str(config_dir), "sshd.conf") assert result.filename == "sshd.conf" assert result.name == "sshd" assert result.enabled is True assert "[sshd]" in result.content @pytest.mark.asyncio async def test_get_jail_config_file_not_found(tmp_path: Path) -> None: config_dir = _make_config_dir(tmp_path) with pytest.raises(ConfigFileNotFoundError): await get_jail_config_file(str(config_dir), "missing.conf") @pytest.mark.asyncio async def test_get_jail_config_file_invalid_extension(tmp_path: Path) -> None: config_dir = _make_config_dir(tmp_path) (config_dir / "jail.d" / "bad.txt").write_text("content") with pytest.raises(ConfigFileNameError): await get_jail_config_file(str(config_dir), "bad.txt") @pytest.mark.asyncio async def test_get_jail_config_file_path_traversal(tmp_path: Path) -> None: config_dir = _make_config_dir(tmp_path) with pytest.raises((ConfigFileNameError, ConfigFileNotFoundError)): await get_jail_config_file(str(config_dir), "../jail.conf") # --------------------------------------------------------------------------- # set_jail_config_enabled # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_set_jail_config_enabled_writes_false(tmp_path: Path) -> None: config_dir = _make_config_dir(tmp_path) path = config_dir / "jail.d" / "sshd.conf" path.write_text("[sshd]\nenabled = true\n") await set_jail_config_enabled(str(config_dir), "sshd.conf", False) assert "enabled = false" in path.read_text() @pytest.mark.asyncio async def test_set_jail_config_enabled_inserts_when_missing(tmp_path: Path) -> None: config_dir = _make_config_dir(tmp_path) path = config_dir / "jail.d" / "sshd.conf" path.write_text("[sshd]\nbantime = 600\n") await set_jail_config_enabled(str(config_dir), "sshd.conf", False) assert "enabled = false" in path.read_text() @pytest.mark.asyncio async def test_set_jail_config_enabled_file_not_found(tmp_path: Path) -> None: config_dir = _make_config_dir(tmp_path) with pytest.raises(ConfigFileNotFoundError): await set_jail_config_enabled(str(config_dir), "missing.conf", True) # --------------------------------------------------------------------------- # list_filter_files / list_action_files # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_list_filter_files_empty(tmp_path: Path) -> None: config_dir = _make_config_dir(tmp_path) resp = await list_filter_files(str(config_dir)) assert resp.files == [] @pytest.mark.asyncio async def test_list_filter_files_returns_files(tmp_path: Path) -> None: config_dir = _make_config_dir(tmp_path) (config_dir / "filter.d" / "nginx.conf").write_text("[Definition]\n") (config_dir / "filter.d" / "sshd.local").write_text("[Definition]\n") (config_dir / "filter.d" / "ignore.py").write_text("# ignored") resp = await list_filter_files(str(config_dir)) names = {f.filename for f in resp.files} assert names == {"nginx.conf", "sshd.local"} @pytest.mark.asyncio async def test_list_action_files_returns_files(tmp_path: Path) -> None: config_dir = _make_config_dir(tmp_path) (config_dir / "action.d" / "iptables.conf").write_text("[Definition]\n") resp = await list_action_files(str(config_dir)) assert resp.files[0].filename == "iptables.conf" # --------------------------------------------------------------------------- # get_filter_file / get_action_file # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_get_filter_file_by_stem(tmp_path: Path) -> None: config_dir = _make_config_dir(tmp_path) (config_dir / "filter.d" / "nginx.conf").write_text("[Definition]\nfailregex = test\n") result = await get_filter_file(str(config_dir), "nginx") assert result.name == "nginx" assert "failregex" in result.content @pytest.mark.asyncio async def test_get_filter_file_by_full_name(tmp_path: Path) -> None: config_dir = _make_config_dir(tmp_path) (config_dir / "filter.d" / "nginx.conf").write_text("[Definition]\n") result = await get_filter_file(str(config_dir), "nginx.conf") assert result.filename == "nginx.conf" @pytest.mark.asyncio async def test_get_filter_file_not_found(tmp_path: Path) -> None: config_dir = _make_config_dir(tmp_path) with pytest.raises(ConfigFileNotFoundError): await get_filter_file(str(config_dir), "nonexistent") @pytest.mark.asyncio async def test_get_action_file_returns_content(tmp_path: Path) -> None: config_dir = _make_config_dir(tmp_path) (config_dir / "action.d" / "iptables.conf").write_text("[Definition]\nactionban = \n") result = await get_action_file(str(config_dir), "iptables") assert "actionban" in result.content # --------------------------------------------------------------------------- # write_filter_file / write_action_file # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_write_filter_file_updates_content(tmp_path: Path) -> None: config_dir = _make_config_dir(tmp_path) (config_dir / "filter.d" / "nginx.conf").write_text("[Definition]\n") req = ConfFileUpdateRequest(content="[Definition]\nfailregex = new\n") await write_filter_file(str(config_dir), "nginx", req) assert "failregex = new" in (config_dir / "filter.d" / "nginx.conf").read_text() @pytest.mark.asyncio async def test_write_filter_file_not_found(tmp_path: Path) -> None: config_dir = _make_config_dir(tmp_path) req = ConfFileUpdateRequest(content="[Definition]\n") with pytest.raises(ConfigFileNotFoundError): await write_filter_file(str(config_dir), "missing", req) @pytest.mark.asyncio async def test_write_filter_file_too_large(tmp_path: Path) -> None: config_dir = _make_config_dir(tmp_path) (config_dir / "filter.d" / "nginx.conf").write_text("[Definition]\n") big_content = "x" * (512 * 1024 + 1) req = ConfFileUpdateRequest(content=big_content) with pytest.raises(ConfigFileWriteError): await write_filter_file(str(config_dir), "nginx", req) @pytest.mark.asyncio async def test_write_action_file_updates_content(tmp_path: Path) -> None: config_dir = _make_config_dir(tmp_path) (config_dir / "action.d" / "iptables.conf").write_text("[Definition]\n") req = ConfFileUpdateRequest(content="[Definition]\nactionban = new\n") await write_action_file(str(config_dir), "iptables", req) assert "actionban = new" in (config_dir / "action.d" / "iptables.conf").read_text() # --------------------------------------------------------------------------- # create_filter_file / create_action_file # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_create_filter_file_creates_file(tmp_path: Path) -> None: config_dir = _make_config_dir(tmp_path) req = ConfFileCreateRequest(name="myfilter", content="[Definition]\n") result = await create_filter_file(str(config_dir), req) assert result == "myfilter.conf" assert (config_dir / "filter.d" / "myfilter.conf").is_file() @pytest.mark.asyncio async def test_create_filter_file_conflict(tmp_path: Path) -> None: config_dir = _make_config_dir(tmp_path) (config_dir / "filter.d" / "ngx.conf").write_text("[Definition]\n") req = ConfFileCreateRequest(name="ngx", content="[Definition]\n") with pytest.raises(ConfigFileExistsError): await create_filter_file(str(config_dir), req) @pytest.mark.asyncio async def test_create_filter_file_invalid_name(tmp_path: Path) -> None: config_dir = _make_config_dir(tmp_path) req = ConfFileCreateRequest(name="../escape", content="[Definition]\n") with pytest.raises(ConfigFileNameError): await create_filter_file(str(config_dir), req) @pytest.mark.asyncio async def test_create_action_file_creates_file(tmp_path: Path) -> None: config_dir = _make_config_dir(tmp_path) req = ConfFileCreateRequest(name="my-action", content="[Definition]\n") result = await create_action_file(str(config_dir), req) assert result == "my-action.conf" assert (config_dir / "action.d" / "my-action.conf").is_file()