- Fail2BanConfigParser class: merge order, include directives (before/after), variable interpolation %(var)s, split_multiline, ordered_conf_files
- config_writer: write_local_override, remove_local_key, delete_local_file with atomic writes (os.replace), per-file threading locks, .local-only guard
- 79 tests in tests/test_utils/ (all passing)
- mypy --strict: 60 source files, 0 errors
- ruff: all checks passed
291 lines
11 KiB
Python
291 lines
11 KiB
Python
"""Tests for app.utils.config_writer."""
|
|
|
|
from __future__ import annotations
|
|
|
|
from pathlib import Path
|
|
|
|
import pytest # noqa: F401 — used by pytest.raises
|
|
|
|
from app.utils.config_writer import (
|
|
_get_file_lock,
|
|
delete_local_file,
|
|
remove_local_key,
|
|
write_local_override,
|
|
)
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _write(tmp_path: Path, name: str, content: str) -> Path:
    """Create ``tmp_path / name`` holding *content* (UTF-8) and return its path."""
    target = tmp_path.joinpath(name)
    with target.open("w", encoding="utf-8") as handle:
        handle.write(content)
    return target
|
|
|
|
|
|
def _read(path: Path) -> str:
    """Return the UTF-8 decoded text stored at *path*."""
    with path.open(encoding="utf-8") as handle:
        return handle.read()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# TestGetFileLock
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestGetFileLock:
    """Identity checks on the lock objects handed out by ``_get_file_lock``."""

    def test_same_path_returns_same_lock(self, tmp_path: Path) -> None:
        """Two lookups for one path yield the very same lock object."""
        target = tmp_path / "test.local"
        first = _get_file_lock(target)
        second = _get_file_lock(target)
        assert first is second

    def test_different_paths_return_different_locks(self, tmp_path: Path) -> None:
        """Lookups for two distinct paths yield two distinct lock objects."""
        lock_for_a = _get_file_lock(tmp_path / "a.local")
        lock_for_b = _get_file_lock(tmp_path / "b.local")
        assert lock_for_a is not lock_for_b
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# TestWriteLocalOverride
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestWriteLocalOverride:
    """Exercise ``write_local_override`` against fresh and pre-populated files."""

    def test_creates_new_file(self, tmp_path: Path) -> None:
        """Writing to a path that does not exist yet produces a regular file."""
        target = tmp_path / "sshd.local"
        write_local_override(target, "Definition", {"failregex": "^bad$"})
        assert target.is_file()

    def test_file_contains_written_key(self, tmp_path: Path) -> None:
        """Both the key name and its value appear in the written text."""
        target = tmp_path / "sshd.local"
        write_local_override(target, "Definition", {"failregex": "^bad$"})
        written = _read(target)
        for expected in ("failregex", "^bad$"):
            assert expected in written

    def test_creates_parent_directory(self, tmp_path: Path) -> None:
        """A target inside a not-yet-existing subdirectory is still written."""
        target = tmp_path / "subdir" / "sshd.local"
        write_local_override(target, "Definition", {"key": "val"})
        assert target.is_file()

    def test_updates_existing_key(self, tmp_path: Path) -> None:
        """A second write of the same key replaces the earlier value."""
        target = tmp_path / "sshd.local"
        for value in ("original", "updated"):
            write_local_override(target, "Definition", {"failregex": value})
        written = _read(target)
        assert "updated" in written
        assert "original" not in written

    def test_preserves_other_sections(self, tmp_path: Path) -> None:
        """Overriding a key in one section leaves an unrelated section in place."""
        seed = "[Init]\nname = sshd\n\n[Definition]\nfailregex = orig\n"
        target = _write(tmp_path, "sshd.local", seed)
        write_local_override(target, "Definition", {"failregex": "new"})
        written = _read(target)
        for expected in ("Init", "name"):
            assert expected in written

    def test_preserves_other_keys_in_section(self, tmp_path: Path) -> None:
        """Sibling keys of the overridden key survive the rewrite."""
        seed = "[Definition]\nfailregex = orig\nignoreregex = keep\n"
        target = _write(tmp_path, "sshd.local", seed)
        write_local_override(target, "Definition", {"failregex": "new"})
        written = _read(target)
        for expected in ("ignoreregex", "keep"):
            assert expected in written

    def test_adds_new_section(self, tmp_path: Path) -> None:
        """Writing to a section absent from the file adds that section."""
        seed = "[Definition]\nfailregex = orig\n"
        target = _write(tmp_path, "sshd.local", seed)
        write_local_override(target, "Init", {"name": "sshd"})
        written = _read(target)
        for expected in ("[Init]", "name"):
            assert expected in written

    def test_writes_multiple_keys(self, tmp_path: Path) -> None:
        """Every key of a multi-key mapping ends up in the file."""
        target = tmp_path / "sshd.local"
        write_local_override(target, "Definition", {"a": "1", "b": "2", "c": "3"})
        written = _read(target)
        for key in ("a", "b", "c"):
            assert key in written

    def test_raises_for_conf_path(self, tmp_path: Path) -> None:
        """A ``.conf`` target is rejected with a ``ValueError`` naming ``.local``."""
        conf_path = tmp_path / "sshd.conf"
        with pytest.raises(ValueError, match=r"\.local"):
            write_local_override(conf_path, "Definition", {"key": "val"})

    def test_raises_for_non_local_extension(self, tmp_path: Path) -> None:
        """Any other non-``.local`` extension is rejected with ``ValueError``."""
        ini_path = tmp_path / "sshd.ini"
        with pytest.raises(ValueError):
            write_local_override(ini_path, "Definition", {"key": "val"})
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# TestRemoveLocalKey
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestRemoveLocalKey:
    """Tests for ``remove_local_key``: removal, no-op cases and file cleanup."""

    def test_removes_existing_key(self, tmp_path: Path) -> None:
        """The named key disappears while its sibling key stays."""
        path = _write(
            tmp_path,
            "sshd.local",
            "[Definition]\nfailregex = bad\nignoreregex = keep\n",
        )
        remove_local_key(path, "Definition", "failregex")
        content = _read(path)
        assert "failregex" not in content
        assert "ignoreregex" in content

    def test_noop_for_missing_key(self, tmp_path: Path) -> None:
        """Removing an absent key must not raise and must keep existing keys."""
        path = _write(tmp_path, "sshd.local", "[Definition]\nother = val\n")
        # Should not raise.
        remove_local_key(path, "Definition", "nonexistent")
        assert path.is_file()
        # A no-op must leave the key that was already there untouched.
        assert "other" in _read(path)

    def test_noop_for_missing_section(self, tmp_path: Path) -> None:
        """Targeting an absent section must not raise and must keep existing keys."""
        path = _write(tmp_path, "sshd.local", "[Definition]\nother = val\n")
        remove_local_key(path, "Init", "name")
        assert path.is_file()
        # The unrelated section and its key must survive the no-op.
        assert "other" in _read(path)

    def test_noop_for_missing_file(self, tmp_path: Path) -> None:
        """Calling on a nonexistent file must not raise and must not create it."""
        path = tmp_path / "missing.local"
        # Should not raise even if file doesn't exist.
        remove_local_key(path, "Definition", "key")
        # A removal has nothing to write, so the file must still be absent.
        assert not path.exists()

    def test_removes_empty_section(self, tmp_path: Path) -> None:
        """A section emptied by the removal is dropped from the file."""
        # [Definition] will become empty and be removed; [Init] keeps the file.
        path = _write(
            tmp_path,
            "sshd.local",
            "[Definition]\nonly_key = val\n\n[Init]\nname = sshd\n",
        )
        remove_local_key(path, "Definition", "only_key")
        content = _read(path)
        assert "[Definition]" not in content
        assert "[Init]" in content

    def test_deletes_file_when_no_sections_remain(self, tmp_path: Path) -> None:
        """Removing the only key of the only section deletes the file itself."""
        path = _write(tmp_path, "sshd.local", "[Definition]\nonly_key = val\n")
        remove_local_key(path, "Definition", "only_key")
        assert not path.exists()

    def test_preserves_other_sections_after_removal(self, tmp_path: Path) -> None:
        """Sections other than the one being edited are kept intact."""
        path = _write(
            tmp_path,
            "sshd.local",
            "[Definition]\nkey = val\n\n[Init]\nname = sshd\n",
        )
        remove_local_key(path, "Definition", "key")
        content = _read(path)
        assert "[Init]" in content
        assert "name" in content

    def test_raises_for_conf_path(self, tmp_path: Path) -> None:
        """A ``.conf`` target is rejected with a ``ValueError`` naming ``.local``."""
        bad = tmp_path / "sshd.conf"
        with pytest.raises(ValueError, match=r"\.local"):
            remove_local_key(bad, "Definition", "key")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# TestDeleteLocalFile
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestDeleteLocalFile:
    """Exercise ``delete_local_file``, including its orphan and extension guards."""

    def test_deletes_existing_local_with_conf(self, tmp_path: Path) -> None:
        """A ``.local`` file with a sibling ``.conf`` is removed."""
        _write(tmp_path, "sshd.conf", "[Definition]\n")
        local = _write(tmp_path, "sshd.local", "[Definition]\nkey = val\n")
        delete_local_file(local)
        assert not local.exists()

    def test_raises_file_not_found(self, tmp_path: Path) -> None:
        """Deleting a ``.local`` path that does not exist raises ``FileNotFoundError``."""
        _write(tmp_path, "sshd.conf", "[Definition]\n")
        absent = tmp_path / "sshd.local"
        with pytest.raises(FileNotFoundError):
            delete_local_file(absent)

    def test_raises_oserror_for_orphan_without_flag(self, tmp_path: Path) -> None:
        """Without ``allow_orphan`` a ``.local`` lacking a ``.conf`` is refused."""
        orphan = _write(tmp_path, "orphan.local", "[Definition]\nkey = val\n")
        with pytest.raises(OSError, match="No corresponding .conf"):
            delete_local_file(orphan)

    def test_allow_orphan_deletes_local_only_file(self, tmp_path: Path) -> None:
        """With ``allow_orphan=True`` the same orphan file is deleted."""
        orphan = _write(tmp_path, "orphan.local", "[Definition]\nkey = val\n")
        delete_local_file(orphan, allow_orphan=True)
        assert not orphan.exists()

    def test_raises_for_conf_path(self, tmp_path: Path) -> None:
        """An existing ``.conf`` file is rejected with a ``ValueError`` naming ``.local``."""
        conf_file = _write(tmp_path, "sshd.conf", "[Definition]\n")
        with pytest.raises(ValueError, match=r"\.local"):
            delete_local_file(conf_file)

    def test_raises_for_non_local_extension(self, tmp_path: Path) -> None:
        """An existing file with another extension is rejected with ``ValueError``."""
        ini_file = _write(tmp_path, "sshd.ini", "x")
        with pytest.raises(ValueError):
            delete_local_file(ini_file)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# TestAtomicWrite (integration)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestAtomicWrite:
    """Integration checks on what ``write_local_override`` leaves on disk."""

    def test_no_temp_files_left_after_write(self, tmp_path: Path) -> None:
        """After one write the directory holds the target file and nothing else."""
        target = tmp_path / "sshd.local"
        write_local_override(target, "Definition", {"key": "val"})
        entries = [entry.name for entry in tmp_path.iterdir()]
        # Only the target file should exist.
        assert len(entries) == 1
        assert entries[0] == "sshd.local"

    def test_write_is_idempotent(self, tmp_path: Path) -> None:
        """Repeating an identical write does not duplicate the key."""
        target = tmp_path / "sshd.local"
        attempts = 5
        while attempts:
            write_local_override(target, "Definition", {"key": "val"})
            attempts -= 1
        # 'key' should appear exactly once.
        occurrences = _read(target).count("key")
        assert occurrences == 1
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# TestEdgeCases
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestEdgeCases:
    """Edge cases for ``write_local_override`` and ``remove_local_key``."""

    def test_write_empty_key_values_creates_empty_section(
        self, tmp_path: Path
    ) -> None:
        """An empty mapping still produces the section header in the file."""
        path = tmp_path / "sshd.local"
        write_local_override(path, "Definition", {})
        content = _read(path)
        assert "[Definition]" in content

    def test_remove_key_with_unicode_value(self, tmp_path: Path) -> None:
        """A key whose value is non-ASCII is removed; its sibling key stays."""
        path = _write(
            tmp_path,
            "sshd.local",
            "[Definition]\nkey = 日本語\nother = keep\n",
        )
        remove_local_key(path, "Definition", "key")
        content = _read(path)
        assert "日本語" not in content
        assert "other" in content

    def test_write_value_with_newlines(self, tmp_path: Path) -> None:
        """A multi-line value is written and none of its lines is lost."""
        path = tmp_path / "sshd.local"
        # configparser stores multi-line values with continuation indent.
        multiline = "line1\n line2\n line3"
        write_local_override(path, "Definition", {"failregex": multiline})
        assert path.is_file()
        # Checking only for file existence would let a truncated value pass,
        # so verify that each line of the value reached the file.
        content = _read(path)
        for fragment in ("line1", "line2", "line3"):
            assert fragment in content

    def test_remove_last_key_of_last_section_deletes_file(
        self, tmp_path: Path
    ) -> None:
        """Removing the final key of the final section deletes the file."""
        path = _write(tmp_path, "sshd.local", "[Definition]\nlast_key = val\n")
        remove_local_key(path, "Definition", "last_key")
        assert not path.exists()
|